Skip to content

Commit f48977c

Browse files
author
Nick Grippin
authored
Merge pull request cfpb#1145 from jmarin/loader-refactor
Loader refactor
2 parents 81e58b9 + bca0fb4 commit f48977c

File tree

15 files changed

+255
-108
lines changed

15 files changed

+255
-108
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
hmda {
2+
connectionFlowParallelism = 5
3+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package hmda.api.util
2+
3+
import akka.NotUsed
4+
import akka.actor.ActorSystem
5+
import akka.http.scaladsl.Http
6+
import akka.http.scaladsl.model.{ HttpMethods, HttpRequest, HttpResponse, Uri }
7+
import akka.http.scaladsl.unmarshalling.Unmarshal
8+
import akka.stream.ActorMaterializer
9+
import akka.stream.scaladsl.{ Flow, Framing }
10+
import akka.util.ByteString
11+
import com.typesafe.config.ConfigFactory
12+
import org.slf4j.Logger
13+
import scala.concurrent.ExecutionContext
14+
15+
trait FlowUtils {
16+
17+
implicit val system: ActorSystem
18+
implicit val materializer: ActorMaterializer
19+
implicit val ec: ExecutionContext
20+
val config = ConfigFactory.load()
21+
val parallelism = config.getInt("hmda.connectionFlowParallelism")
22+
23+
def singleConnectionFlow: Flow[HttpRequest, HttpResponse, NotUsed] =
24+
Flow[HttpRequest]
25+
.mapAsync[HttpResponse](parallelism)(request => {
26+
for {
27+
response <- Http().singleRequest(request)
28+
} yield response
29+
})
30+
31+
def framing: Flow[ByteString, ByteString, NotUsed] = {
32+
Framing.delimiter(ByteString("\n"), maximumFrameLength = 65536, allowTruncation = true)
33+
}
34+
35+
def byte2StringFlow: Flow[ByteString, String, NotUsed] =
36+
Flow[ByteString].map(bs => bs.utf8String)
37+
38+
def sendGetRequest(req: String, url: Uri) = {
39+
val request = HttpRequest(HttpMethods.GET, uri = s"$url/$req")
40+
for {
41+
response <- Http().singleRequest(request)
42+
content <- Unmarshal(response.entity).to[String]
43+
} yield content
44+
}
45+
46+
def exitSys(log: Logger, errorMessage: String, code: Int) = {
47+
log.error(errorMessage)
48+
system.terminate()
49+
Thread.sleep(100)
50+
sys.exit(code)
51+
}
52+
}

api/src/main/resources/application-dev.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ hmda {
2828
timeout = ${?HMDA_HTTP_TIMEOUT}
2929
}
3030
isDemo = true
31+
panel {
32+
tcp {
33+
host = "0.0.0.0"
34+
host = ${?HMDA_PANEL_LOADER_HOST}
35+
port = "8888"
36+
port = ${?HMDA_PANEL_LOADER_PORT}
37+
timeout = 5
38+
}
39+
}
3140
actor-flow-parallelism = 4
3241
}
3342

api/src/main/resources/application.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ hmda {
2929
}
3030
isDemo = false
3131
isDemo = ${?HMDA_IS_DEMO}
32+
panel {
33+
tcp {
34+
host = "0.0.0.0"
35+
host = ${?HMDA_PANEL_LOADER_HOST}
36+
port = "8888"
37+
port = ${?HMDA_PANEL_LOADER_PORT}
38+
timeout = 5
39+
parallelism = 5
40+
}
41+
}
3242
}
3343

3444
api-dispatcher {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package hmda.api.tcp
2+
3+
import java.net.InetSocketAddress
4+
5+
import akka.actor.{ Actor, ActorSystem, Status }
6+
import akka.stream.ActorMaterializer
7+
import akka.stream.scaladsl.Tcp
8+
import akka.stream.scaladsl.Tcp.ServerBinding
9+
import hmda.persistence.model.HmdaActor
10+
11+
import scala.concurrent.{ ExecutionContext, Future }
12+
13+
abstract class TcpApi extends HmdaActor {
14+
15+
val name: String
16+
val host: String
17+
val port: Int
18+
19+
implicit val system: ActorSystem
20+
implicit val materializer: ActorMaterializer
21+
implicit val ec: ExecutionContext
22+
23+
val tcp: Future[ServerBinding]
24+
25+
override def receive = {
26+
case Tcp.ServerBinding(s) => handleServerBinding(s)
27+
case Status.Failure(e) => handleBindFailure(e)
28+
}
29+
30+
private def handleServerBinding(address: InetSocketAddress): Unit = {
31+
log.info(s"$name started on {}", address)
32+
context.become(Actor.emptyBehavior)
33+
}
34+
35+
private def handleBindFailure(error: Throwable): Unit = {
36+
log.error(error, s"Failed to bind to $host:$port")
37+
context stop self
38+
}
39+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package hmda.api.tcp.admin
2+
3+
import akka.NotUsed
4+
import akka.pattern.{ ask, pipe }
5+
import akka.actor.{ ActorRef, ActorSystem, Props }
6+
import akka.stream.ActorMaterializer
7+
import akka.stream.scaladsl.{ Flow, Tcp }
8+
import akka.util.{ ByteString, Timeout }
9+
import hmda.api.tcp.TcpApi
10+
import hmda.api.util.FlowUtils
11+
import hmda.model.fi.Filing
12+
import hmda.model.institution.Institution
13+
import hmda.parser.fi.InstitutionParser
14+
import hmda.persistence.HmdaSupervisor.FindFilings
15+
import hmda.persistence.institutions.FilingPersistence.CreateFiling
16+
import hmda.persistence.institutions.{ FilingPersistence, InstitutionPersistence }
17+
import hmda.persistence.institutions.InstitutionPersistence.CreateInstitution
18+
import hmda.persistence.model.HmdaSupervisorActor.FindActorByName
19+
20+
import scala.concurrent.{ ExecutionContext, Future }
21+
import scala.concurrent.duration._
22+
23+
object InstitutionAdminTcpApi {
24+
def props(supervisor: ActorRef): Props = {
25+
Props(new InstitutionAdminTcpApi(supervisor))
26+
}
27+
}
28+
29+
class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils {
30+
override val name: String = "hmda-institutions-tcp-api"
31+
32+
override implicit val system: ActorSystem = context.system
33+
override implicit val materializer: ActorMaterializer = ActorMaterializer()
34+
override implicit val ec: ExecutionContext = context.dispatcher
35+
36+
override val host: String = config.getString("hmda.panel.tcp.host")
37+
override val port: Int = config.getInt("hmda.panel.tcp.port")
38+
val duration = config.getInt("hmda.panel.tcp.timeout").seconds
39+
implicit val timeout = Timeout(duration)
40+
val buffer = config.getInt("hmda.panel.tcp.parallelism")
41+
42+
val tcpHandler: Flow[ByteString, ByteString, NotUsed] =
43+
Flow[ByteString]
44+
.via(framing)
45+
.drop(1)
46+
.via(byte2StringFlow)
47+
.map(x => InstitutionParser(x))
48+
.mapAsync(parallelism = buffer)(i => createInstitution(i))
49+
.mapAsync(parallelism = buffer)(i => createFiling(i))
50+
.map(e => ByteString(e.toString))
51+
52+
override val tcp: Future[Tcp.ServerBinding] = Tcp().bindAndHandle(
53+
tcpHandler,
54+
host,
55+
port
56+
)
57+
58+
tcp pipeTo self
59+
60+
private def createInstitution(i: Institution): Future[Institution] = {
61+
val fInstitutionsActor = (supervisor ? FindActorByName(InstitutionPersistence.name)).mapTo[ActorRef]
62+
for {
63+
actor <- fInstitutionsActor
64+
i <- (actor ? CreateInstitution(i))
65+
.mapTo[Option[Institution]]
66+
.map(i => i.getOrElse(Institution.empty))
67+
} yield i
68+
}
69+
70+
private def createFiling(institution: Institution): Future[Filing] = {
71+
val fFilingPersistence = (supervisor ? FindFilings(FilingPersistence.name, institution.id)).mapTo[ActorRef]
72+
for {
73+
actor <- fFilingPersistence
74+
f <- (actor ? CreateFiling(Filing(institution.activityYear.toString, institution.id)))
75+
.mapTo[Option[Filing]]
76+
.map(x => x.getOrElse(Filing()))
77+
} yield f
78+
}
79+
80+
}

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,13 @@ lazy val validation = (project in file("validation"))
126126
).dependsOn(parserJVM % "compile->compile;test->test")
127127
.dependsOn(persistenceModel % "compile->compile;test->test")
128128

129-
lazy val panel = (project in file("panel"))
129+
lazy val loader = (project in file("loader"))
130130
.settings(hmdaBuildSettings: _*)
131131
.settings(Revolver.settings:_*)
132132
.settings(
133133
Seq(
134134
assemblyJarName in assembly := {s"${name.value}.jar"},
135-
mainClass in assembly := Some("hmda.panel.PanelCsvLoader"),
135+
mainClass in assembly := Some("hmda.loader.panel.PanelCsvLoader"),
136136
assemblyMergeStrategy in assembly := {
137137
case "application.conf" => MergeStrategy.concat
138138
case "META-INF/io.netty.versions.properties" => MergeStrategy.concat

cluster/src/main/scala/hmda/cluster/HmdaPlatform.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import akka.cluster.http.management.ClusterHttpManagement
1010
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings }
1111
import akka.util.Timeout
1212
import com.typesafe.config.ConfigFactory
13+
import hmda.api.tcp.admin.InstitutionAdminTcpApi
1314
import hmda.api.{ HmdaAdminApi, HmdaFilingApi, HmdaPublicApi }
1415
import hmda.persistence.HmdaSupervisor
1516
import hmda.persistence.institutions.InstitutionPersistence
@@ -79,6 +80,7 @@ object HmdaPlatform extends App {
7980
system.actorOf(HmdaFilingApi.props(supervisorProxy, querySupervisorProxy, validationStatsProxy).withDispatcher("api-dispatcher"), "hmda-filing-api")
8081
system.actorOf(HmdaAdminApi.props(supervisorProxy, querySupervisorProxy).withDispatcher("api-dispatcher"), "hmda-admin-api")
8182
system.actorOf(HmdaPublicApi.props(querySupervisorProxy).withDispatcher("api-dispatcher"), "hmda-public-api")
83+
system.actorOf(InstitutionAdminTcpApi.props(supervisorProxy), "panel-loader-tcp")
8284
}
8385

8486
//Start Persistence

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ services:
77
- '8080:8080'
88
- '8081:8081'
99
- '8082:8082'
10+
- '8888:8888'
1011
volumes:
1112
- ./target/scala-2.12/hmda.jar:/opt/hmda.jar
1213
depends_on:

panel/src/main/resources/application.conf renamed to loader/src/main/resources/application.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,14 @@ akka {
77
hmda {
88
adminUrl = "http://0.0.0.0:8081"
99
adminUrl = ${?HMDA_HTTP_ADMIN_URL}
10+
httpTimeout = 5
11+
panel {
12+
tcp {
13+
host = "127.0.0.1"
14+
host = ${?HMDA_PANEL_LOADER_HOST}
15+
port = "8888"
16+
port = ${?HMDA_PANEL_LOADER_PORT}
17+
}
18+
}
1019
}
1120

0 commit comments

Comments
 (0)