diff --git a/common/src/main/scala/hmda/query/ts/TransmittalSheetAltEntity.scala b/common/src/main/scala/hmda/query/ts/TransmittalSheetAltEntity.scala new file mode 100644 index 0000000000..979af51a53 --- /dev/null +++ b/common/src/main/scala/hmda/query/ts/TransmittalSheetAltEntity.scala @@ -0,0 +1,145 @@ +package hmda.query.ts + +import hmda.parser.filing.ts.TsCsvParser.dateFromString +import hmda.util.PsvParsingCompanion +import hmda.util.conversion.ColumnDataFormatter +import io.chrisdavenport.cormorant +import io.chrisdavenport.cormorant.CSV +import io.chrisdavenport.cormorant.implicits._ + +case class TransmittalSheetAltEntity( + lei: String = "", + id: Int = 0, + institutionName: String = "", + year: Int = 0, + quarter: Int = 0, + name: String = "", + phone: String = "", + email: String = "", + street: String = "", + city: String = "", + state: String = "", + zipCode: String = "", + agency: Int = 0, + totalLines: Int = 0, + taxId: String = "", + submissionId: Option[String] = Some(""), + createdAt: Option[java.sql.Timestamp] = Some(new java.sql.Timestamp(System.currentTimeMillis())), + isQuarterly: Option[Boolean] = Some(false), + signDate: Option[Long] = Some(0L), + firstSignDate: Option[Long] = Some(0L) + ) extends ColumnDataFormatter { + def isEmpty: Boolean = lei == "" + + def toRegulatorPSV: String = + s"$id|$institutionName|$year|" + + s"$quarter|$name|$phone|" + + s"$email|$street|$city|" + + s"$state|$zipCode|$agency|" + + s"$totalLines|$taxId|$lei|${dateToString(signDate)}|${dateToString(firstSignDate)}" + + def toPublicPSV: String = + s"$year|$quarter|$lei|$taxId|$agency|" + + s"$institutionName|$state|$city|$zipCode|$totalLines" + + def toPublicCSV: String = + s"$year,$quarter,$lei,$taxId,$agency," + + s"${escapeCommas(institutionName)},$state,${escapeCommas(city)},$zipCode,$totalLines" + +} + +object TransmittalSheetAltEntity { + + object PublicParser extends PsvParsingCompanion[TransmittalSheetAltEntity] { + override val psvReader: cormorant.Read[TransmittalSheetAltEntity] = { (a: CSV.Row) => + for { + (rest, year) <- enforcePartialRead(readNext[Int], a) + (rest, quarter) <- enforcePartialRead(readNext[Int], rest) + (rest, lei) <- enforcePartialRead(readNext[String], rest) + (rest, taxId) <- enforcePartialRead(readNext[String], rest) + (rest, agency) <- enforcePartialRead(readNext[Int], rest) + (rest, institutionName) <- enforcePartialRead(readNext[String], rest) + (rest, state) <- enforcePartialRead(readNext[String], rest) + (rest, city) <- enforcePartialRead(readNext[String], rest) + (rest, zipCode) <- enforcePartialRead(readNext[String], rest) + totalLinesOrMore <- readNext[Int].readPartial(rest) + } yield { + def create(totalLines: Int) = TransmittalSheetAltEntity( + lei = lei, + institutionName = institutionName, + year = year, + quarter = quarter, + city = city, + state = state, + zipCode = zipCode, + agency = agency, + totalLines = totalLines, + taxId = taxId + ) + totalLinesOrMore match { + case Left((more, totalLines)) => Left(more -> create(totalLines)) + case Right(totalLines) => Right(create(totalLines)) + } + } + } + } + + /** + * s"$id|$institutionName|$year|" + + s"$quarter|$name|$phone|" + + s"$email|$street|$city|" + + s"$state|$zipCode|$agency|" + + s"$totalLines|$taxId|$lei|${dateToString(signDate)}|${dateToString(firstSignDate)}" + */ + object RegulatorParser extends PsvParsingCompanion[TransmittalSheetAltEntity] { + override val psvReader: cormorant.Read[TransmittalSheetAltEntity] = { (a: CSV.Row) => + for { + (rest, id) <- enforcePartialRead(readNext[Int], a) + (rest, institutionName) <- enforcePartialRead(readNext[String], rest) + (rest, year) <- enforcePartialRead(readNext[Int], rest) + (rest, quarter) <- enforcePartialRead(readNext[Int], rest) + (rest, name) <- enforcePartialRead(readNext[String], rest) + (rest, phone) <- enforcePartialRead(readNext[String], rest) + (rest, email) <- enforcePartialRead(readNext[String], rest) + (rest, street) <- enforcePartialRead(readNext[String], rest) + (rest, city) <- enforcePartialRead(readNext[String], rest) + (rest, state) <- enforcePartialRead(readNext[String], rest) + (rest, zipCode) <- enforcePartialRead(readNext[String], rest) + (rest, agency) <- enforcePartialRead(readNext[Int], rest) + (rest, totalLines) <- enforcePartialRead(readNext[Int], rest) + (rest, taxId) <- enforcePartialRead(readNext[String], rest) + (rest, lei) <- enforcePartialRead(readNext[String], rest) + (rest, signDate) <- enforcePartialRead(readNext[String], rest) + firstSignDateOrMore <- readNext[String].readPartial(rest) + } yield { + def create(firstSignDate: String) = TransmittalSheetAltEntity( + lei = lei, + id = id, + institutionName = institutionName, + year = year, + quarter = quarter, + name = name, + phone = phone, + email = email, + street = street, + city = city, + state = state, + zipCode = zipCode, + agency = agency, + totalLines = totalLines, + taxId = taxId, + signDate = dateFromString(signDate), + firstSignDate = dateFromString(firstSignDate) + ) + + + firstSignDateOrMore match { + case Left((more, firstSignDate)) => Left(more -> create(firstSignDate)) + case Right(firstSignDate) => Right(create(firstSignDate)) + } + } + } + + } + +} \ No newline at end of file diff --git a/common/src/main/scala/hmda/query/ts/TransmittalSheetEntity.scala b/common/src/main/scala/hmda/query/ts/TransmittalSheetEntity.scala index ee4ac45269..5f4dadd7c2 100644 --- a/common/src/main/scala/hmda/query/ts/TransmittalSheetEntity.scala +++ b/common/src/main/scala/hmda/query/ts/TransmittalSheetEntity.scala @@ -35,7 +35,7 @@ case class TransmittalSheetEntity( s"$quarter|$name|$phone|" + s"$email|$street|$city|" + s"$state|$zipCode|$agency|" + - s"$totalLines|$taxId|$lei|${dateToString(signDate)}" + s"$totalLines|$taxId|$lei|${dateToString(signDate)}}" def toPublicPSV: String = s"$year|$quarter|$lei|$taxId|$agency|" + @@ -88,7 +88,7 @@ object TransmittalSheetEntity { s"$quarter|$name|$phone|" + s"$email|$street|$city|" + s"$state|$zipCode|$agency|" + - s"$totalLines|$taxId|$lei|${dateToString(signDate)}" + s"$totalLines|$taxId|$lei|${dateToString(signDate)}|${dateToString(firstSignDate)}" */ object RegulatorParser extends PsvParsingCompanion[TransmittalSheetEntity] { override val psvReader: cormorant.Read[TransmittalSheetEntity] = { (a: CSV.Row) => @@ -108,9 +108,10 @@ object TransmittalSheetEntity { (rest, totalLines) <- enforcePartialRead(readNext[Int], rest) (rest, taxId) <- enforcePartialRead(readNext[String], rest) (rest, lei) <- enforcePartialRead(readNext[String], rest) - signDateOrMore <- readNext[String].readPartial(rest) + (rest, signDate) <- enforcePartialRead(readNext[String], rest) + firstSignDateOrMore <- readNext[String].readPartial(rest) } yield { - def create(signDate: String) = TransmittalSheetEntity( + def create(firstSignDate: String) = TransmittalSheetEntity( lei = lei, id = id, institutionName = institutionName, @@ -126,12 +127,12 @@ object TransmittalSheetEntity { agency = agency, totalLines = totalLines, taxId = taxId, - signDate = dateFromString(signDate) - ) + signDate = dateFromString(signDate)) - signDateOrMore match { - case Left((more, signDate)) => Left(more -> create(signDate)) - case Right(signDate) => Right(create(signDate)) + + firstSignDateOrMore match { + case Left((more, firstSignDate)) => Left(more -> create(firstSignDate)) + case Right(firstSignDate) => Right(create(firstSignDate)) } } } diff --git a/docs/spec/Agency_TS_Spec.csv b/docs/spec/Agency_TS_Spec.csv index d291ca7949..efb564a75a 100644 --- a/docs/spec/Agency_TS_Spec.csv +++ b/docs/spec/Agency_TS_Spec.csv @@ -14,4 +14,5 @@ agency, Federal Agency, Numeric, HMDA Filing Instructions Guide totalLines, Total Number of Entries Contained in Submission, Numeric, HMDA Filing Instructions Guide taxId, Federal Taxpayer Identification Number, Alphanumeric, HMDA Filing Instructions Guide lei, Legal Entity Identifier (LEI), Alphanumeric, HMDA Filing Instructions Guide -signDate, HMDA Filing Sign Datetime, Alphanumeric, Collected by the HMDA Platform \ No newline at end of file +signDate, HMDA Filing Sign Datetime, Alphanumeric, Collected by the HMDA Platform +firstSignDate, HMDA Filing First Sign Datetime, Alphanumeric, Collected by the HMDA Platform \ No newline at end of file diff --git a/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala b/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala index 9c80020caf..6fc314dd05 100644 --- a/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala +++ b/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala @@ -140,6 +140,10 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo .map(l => l.submission.end) .runWith(Sink.lastOption) + def firstSignDateSubmissionHistory: Future[Seq[Long]] = { + submissionHistoryRepository.firstSignDate(submissionId) + } + def deleteTsRow: Future[Done] = rawData.take(1) .map(s => TsCsvParser(s, fromCassandra = true)) @@ -189,16 +193,29 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo val enforceQuarterly = submissionId.period.quarter.isDefined for { signdate <- signDate - insertorupdate <- repo.insert(copyTs(ts, Some(signdate.getOrElse(0L)), enforceQuarterly)) + firstsigndate <- firstSignDateSubmissionHistory + insertorupdate <- { + println("first sign date") + println(firstsigndate) + val resolvedSignDate = Some(signdate.getOrElse(0L)) + val resolvedFirstSignDate = { + if (firstsigndate.isEmpty) resolvedSignDate + else Some(firstsigndate.head) + } + repo.insert(copyTs(ts, resolvedSignDate, resolvedFirstSignDate, enforceQuarterly)) + } } yield insertorupdate } .runWith(Sink.ignore) - def copyTs(ts: TransmittalSheetEntity, signdate: Option[Long], enforceQuarterly: Boolean): TransmittalSheetEntity = + def copyTs(ts: TransmittalSheetEntity, signdate: Option[Long], firstsigndate: Option[Long], enforceQuarterly: Boolean): TransmittalSheetEntity = if (enforceQuarterly) { - ts.copy(lei = ts.lei.toUpperCase, signDate = signdate, isQuarterly = Some(true)) + ts.copy(lei = ts.lei.toUpperCase, signDate = signdate, firstSignDate = firstsigndate, isQuarterly = Some(true)) } else { - ts.copy(lei = ts.lei.toUpperCase, signDate = signdate) + val newts = ts.copy(lei = ts.lei.toUpperCase, firstSignDate = firstsigndate, signDate = signdate) + println("new ts") + println(newts) + newts } def deleteLarRows: Future[Done] = @@ -248,6 +265,9 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo def result = for { + firstSignDate <- firstSignDateSubmissionHistory + _ = log.info(s"First date signed $firstSignDate") + _ <- deleteTsRow _ = if(tsDeletion) log.info(s"Attempt to remove data from TS table for $submissionId completed.") @@ -268,7 +288,7 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo _ = log.info(s"Attempt to add data to LAR table for $submissionId completed.") - dateSigned <- signDate + dateSigned <- signDate _ = log.info(s"Date signed $dateSigned") res <- insertSubmissionHistory diff --git a/hmda-analytics/src/main/scala/hmda/analytics/query/SubmissionHistoryComponent.scala b/hmda-analytics/src/main/scala/hmda/analytics/query/SubmissionHistoryComponent.scala index 2eacab6bff..3bbb6f7295 100644 --- a/hmda-analytics/src/main/scala/hmda/analytics/query/SubmissionHistoryComponent.scala +++ b/hmda-analytics/src/main/scala/hmda/analytics/query/SubmissionHistoryComponent.scala @@ -28,6 +28,18 @@ trait SubmissionHistoryComponent { sign_date = ${signDate} """ } + + def firstSignDate(submissionId: SubmissionId): Future[Seq[Long]] = { + val period = submissionId.period.year + val lei = submissionId.lei + val submissionIdLikeStatment = s"${lei}-${period}-%" + config.db.run { + sql""" + SELECT MIN(sign_date) from #${tableName} + WHERE submission_id LIKE $submissionIdLikeStatment + """.as[Long] + } + } } } diff --git a/hmda-analytics/src/main/scala/hmda/analytics/query/TransmittalSheetComponent.scala b/hmda-analytics/src/main/scala/hmda/analytics/query/TransmittalSheetComponent.scala index 51043b91ca..06cac5b3cb 100644 --- a/hmda-analytics/src/main/scala/hmda/analytics/query/TransmittalSheetComponent.scala +++ b/hmda-analytics/src/main/scala/hmda/analytics/query/TransmittalSheetComponent.scala @@ -35,6 +35,7 @@ trait TransmittalSheetComponent { def createdAt = column[Option[Timestamp]]("created_at") def isQuarterly = column[Option[Boolean]]("is_quarterly") def signDate = column[Option[Long]]("sign_date") + def firstSignDate = column[Option[Long]]("first_sign_data") override def * = ( @@ -56,7 +57,8 @@ trait TransmittalSheetComponent { submissionId, createdAt, isQuarterly, - signDate + signDate, + firstSignDate ) <> ((TransmittalSheetEntity.apply _).tupled, TransmittalSheetEntity.unapply) } @@ -84,6 +86,9 @@ trait TransmittalSheetComponent { def findByLei(lei: String): Future[Seq[TransmittalSheetEntity]] = db.run(table.filter(_.lei.toUpperCase === lei.toUpperCase).result) + def findByLeiAndQuarter(lei: String): Future[Seq[TransmittalSheetEntity]] = + db.run(table.filter(x => x.lei.toUpperCase === lei.toUpperCase && x.isQuarterly === true).result) + def deleteByLei(lei: String): Future[Int] = db.run(table.filter(_.lei.toUpperCase === lei.toUpperCase).delete) diff --git a/hmda-data-publisher/src/main/resources/application.conf b/hmda-data-publisher/src/main/resources/application.conf index c7a8cc28a4..c391be7cbd 100644 --- a/hmda-data-publisher/src/main/resources/application.conf +++ b/hmda-data-publisher/src/main/resources/application.conf @@ -416,6 +416,8 @@ pg-tables { //Common PG Email Table + submissionHistoryTableName="submission_history" + submissionHistoryTableName=${?SUBMISSION_HISTORY_TABLE} emailTableName ="institutions_emails_2018" emailTableName =${?EMAIL_TABLE} diff --git a/hmda-data-publisher/src/main/scala/hmda/publisher/helper/PGTableNameLoader.scala b/hmda-data-publisher/src/main/scala/hmda/publisher/helper/PGTableNameLoader.scala index 2ac095ab62..4b8db3b6bf 100644 --- a/hmda-data-publisher/src/main/scala/hmda/publisher/helper/PGTableNameLoader.scala +++ b/hmda-data-publisher/src/main/scala/hmda/publisher/helper/PGTableNameLoader.scala @@ -117,6 +117,7 @@ trait PGTableNameLoader { //common table names val emailTableName: String = pgTableConfig.getString("emailTableName") + val submissionHistoryTableName: String = pgTableConfig.getString("submissionHistoryTableName") val panelTableBase: String = pgTableConfig.getString("panelTableBase") val tsAnnualTableBase: String = pgTableConfig.getString("tsAnnualTableBase") val tsQuarterTableBase: String = pgTableConfig.getString("tsQuarterTableBase") diff --git a/hmda-data-publisher/src/main/scala/hmda/publisher/query/component/SubmissionHistoryComponent.scala b/hmda-data-publisher/src/main/scala/hmda/publisher/query/component/SubmissionHistoryComponent.scala new file mode 100644 index 0000000000..6c521b985b --- /dev/null +++ b/hmda-data-publisher/src/main/scala/hmda/publisher/query/component/SubmissionHistoryComponent.scala @@ -0,0 +1,57 @@ +package hmda.publisher.query.component + +import hmda.publisher.helper.PGTableNameLoader +import hmda.publisher.query.submissionhistory.SubmissionHistoryEntity +import hmda.query.DbConfiguration.dbConfig +import hmda.query.repository.TableRepository +import hmda.query.ts.TransmittalSheetEntity +import slick.basic.DatabaseConfig +import slick.jdbc.JdbcProfile + +import scala.concurrent.Future + + +trait SubmissionHistoryComponent extends PGTableNameLoader{ + + import dbConfig.profile.api._ + + class SubmissionHistoryTable(tag: Tag) + extends Table[SubmissionHistoryEntity](tag, submissionHistoryTableName) with SubmissionHistoryComponent { + def id = column[Int]("id") + def lei = column[String]("lei") + def submissionId= column[String]("submission_id") + def signDate = column[Option[Long]]("sign_date") + + def * = + (lei, submissionId, signDate) <> (SubmissionHistoryEntity.tupled, SubmissionHistoryEntity.unapply) + + } + val submissionHistoryTable = TableQuery[SubmissionHistoryTable] + +class SubmissionHistoryRepository(val config: DatabaseConfig[JdbcProfile]) + extends TableRepository[SubmissionHistoryTable, Int] { + val table = submissionHistoryTable + def getId(table: SubmissionHistoryTable) = table.id + def deleteById(id: Int) = db.run(filterById(id).delete) + + def createSchema() = db.run(table.schema.create) + def dropSchema() = db.run(table.schema.drop) + + def findByLei(lei: String) = { + db.run(table.filter(_.lei === lei).result) + } + + + def findFirstSignDate(lei: String,year: Int): Future[Seq[Long]] = { + val submissionIdLikeStatment = s"${lei}-${year}-%" + val excludeQuarterlyLikeStatment = s"%-Q%" + db.run { + sql""" + SELECT MIN(sign_date) from #${submissionHistoryTableName} + WHERE submission_id LIKE $submissionIdLikeStatment AND + NOT LIKE $excludeQuarterlyLikeStatment + """.as[Long] + } + } +} +} \ No newline at end of file diff --git a/hmda-data-publisher/src/main/scala/hmda/publisher/query/submissionhistory/SubmissionHistoryEntity.scala b/hmda-data-publisher/src/main/scala/hmda/publisher/query/submissionhistory/SubmissionHistoryEntity.scala new file mode 100644 index 0000000000..bb5d914f0d --- /dev/null +++ b/hmda-data-publisher/src/main/scala/hmda/publisher/query/submissionhistory/SubmissionHistoryEntity.scala @@ -0,0 +1,7 @@ +package hmda.publisher.query.submissionhistory + +case class SubmissionHistoryEntity( + lei: String, + submissionId: String, + signDate: Option[Long] = Some(0L) +) diff --git a/hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/TsScheduler.scala b/hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/TsScheduler.scala index 60bb67000a..334dbcff3f 100644 --- a/hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/TsScheduler.scala +++ b/hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/TsScheduler.scala @@ -13,7 +13,8 @@ import com.typesafe.config.ConfigFactory import hmda.actor.HmdaActor import hmda.publisher.helper.CronConfigLoader.{CronString, specificTsCron, specificTsYears, tsCron, tsQuarterlyCron, tsQuarterlyYears, tsYears} import hmda.publisher.helper.{PrivateAWSConfigLoader, QuarterTimeBarrier, S3Utils, SnapshotCheck} -import hmda.publisher.query.component.{PublisherComponent, PublisherComponent2018, PublisherComponent2019, PublisherComponent2020, PublisherComponent2021, PublisherComponent2022, PublisherComponent2023, TransmittalSheetTable, TsRepository, YearPeriod} +import hmda.publisher.query.component.{PublisherComponent, PublisherComponent2018, PublisherComponent2019, PublisherComponent2020, PublisherComponent2021, PublisherComponent2022, PublisherComponent2023, SubmissionHistoryComponent, TransmittalSheetTable, TsRepository, YearPeriod} +import hmda.publisher.query.submissionhistory.SubmissionHistoryEntity import hmda.publisher.scheduler.schedules.{Schedule, ScheduleWithYear} import hmda.publisher.scheduler.schedules.Schedules.{TsQuarterlySchedule, TsSchedule} import hmda.publisher.util.{PublishingReporter, ScheduleCoordinator} @@ -37,7 +38,9 @@ class TsScheduler(publishingReporter: ActorRef[PublishingReporter.Command], sche with PublisherComponent2021 with PublisherComponent2022 with PublisherComponent2023 - with PrivateAWSConfigLoader { + with PrivateAWSConfigLoader + with SubmissionHistoryComponent + { implicit val ec = context.system.dispatcher implicit val materializer = Materializer(context) @@ -58,6 +61,7 @@ class TsScheduler(publishingReporter: ActorRef[PublishingReporter.Command], sche ) }).toMap + def submissionHistoryRepository = new SubmissionHistoryRepository(dbConfig) val publishingGuard: PublishingGuard = PublishingGuard.create(this)(context.system) val timeBarrier: QuarterTimeBarrier = new QuarterTimeBarrier(Clock.systemDefaultZone()) @@ -98,6 +102,7 @@ class TsScheduler(publishingReporter: ActorRef[PublishingReporter.Command], sche val source = Source .future(transmittalSheets) .mapConcat(_.toList) + .mapAsync(1)(transmittalsheet => appendMinSignDate(transmittalsheet)) .map(transmittalSheet => transmittalSheet.toRegulatorPSV + "\n") .map(ByteString(_)) S3Utils.uploadWithRetry(source, s3Sink) @@ -228,5 +233,33 @@ class TsScheduler(publishingReporter: ActorRef[PublishingReporter.Command], sche log.error(s"An error has occurred while publishing $bucketPrivate/$fullFilePath: " + message.getMessage, message) } + def appendMinSignDate(transmittalsheet: TransmittalSheetEntity): Future[TransmittalSheetAltEntity] = { + val firstSignDates: Future[Seq[Long]] = + submissionHistoryRepository.findFirstSignDate(transmittalsheet.lei,transmittalsheet.year) + firstSignDates.map( signDateList => + TransmittalSheetAltEntity( + lei = transmittalsheet.lei, + institutionName= transmittalsheet.institutionName, + year= transmittalsheet.year, + quarter= transmittalsheet.quarter, + name= transmittalsheet.name, + phone= transmittalsheet.phone, + email= transmittalsheet.email, + street= transmittalsheet.street, + city= transmittalsheet.city, + state= transmittalsheet.state, + zipCode= transmittalsheet.zipCode, + agency= transmittalsheet.agency, + totalLines= transmittalsheet.totalLines, + taxId= transmittalsheet.taxId, + submissionId=transmittalsheet.submissionId, + createdAt= transmittalsheet.createdAt, + isQuarterly= transmittalsheet.isQuarterly, + signDate= transmittalsheet.signDate, + firstSignDate = Option(signDateList.toList.last) + ) + ) + } + } // $COVERAGE-ON$ \ No newline at end of file