Skip to content

Commit cf2b4c5

Browse files
committed
Add TCP panel loader
1 parent 608a185 commit cf2b4c5

File tree

8 files changed

+70
-63
lines changed

8 files changed

+70
-63
lines changed

api/src/main/resources/application-dev.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ hmda {
2828
timeout = ${?HMDA_HTTP_TIMEOUT}
2929
}
3030
isDemo = true
31+
panel {
32+
tcp {
33+
host = "127.0.0.1"
34+
host = ${?HMDA_PANEL_LOADER_HOST}
35+
port = "8888"
36+
port = ${?HMDA_PANEL_LOADER_PORT}
37+
timeout = 5
38+
}
39+
}
3140
actor-flow-parallelism = 4
3241
}
3342

api/src/main/resources/application.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ hmda {
3535
host = ${?HMDA_PANEL_LOADER_HOST}
3636
port = "8888"
3737
port = ${?HMDA_PANEL_LOADER_PORT}
38+
timeout = 5
39+
parallelism = 5
3840
}
3941
}
4042
}

api/src/main/scala/hmda/api/tcp/TcpApi.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package hmda.api.tcp
22

33
import java.net.InetSocketAddress
44

5-
import akka.actor.{Actor, ActorSystem, Status}
5+
import akka.actor.{ Actor, ActorSystem, Status }
66
import akka.stream.ActorMaterializer
77
import akka.stream.scaladsl.Tcp
88
import akka.stream.scaladsl.Tcp.ServerBinding
99
import hmda.persistence.model.HmdaActor
1010

11-
import scala.concurrent.{ExecutionContext, Future}
11+
import scala.concurrent.{ ExecutionContext, Future }
1212

1313
abstract class TcpApi extends HmdaActor {
1414

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,26 @@
11
package hmda.api.tcp.admin
22

33
import akka.NotUsed
4-
import akka.pattern.pipe
5-
import akka.actor.{ActorRef, ActorSystem}
4+
import akka.pattern.{ ask, pipe }
5+
import akka.actor.{ ActorRef, ActorSystem, Props }
66
import akka.stream.ActorMaterializer
7-
import akka.stream.scaladsl.{Flow, Tcp}
8-
import akka.util.ByteString
7+
import akka.stream.scaladsl.{ Flow, Tcp }
8+
import akka.util.{ ByteString, Timeout }
99
import hmda.api.tcp.TcpApi
1010
import hmda.api.util.FlowUtils
11+
import hmda.model.institution.Institution
12+
import hmda.parser.fi.InstitutionParser
13+
import hmda.persistence.institutions.InstitutionPersistence
14+
import hmda.persistence.institutions.InstitutionPersistence.CreateInstitution
15+
import hmda.persistence.model.HmdaSupervisorActor.FindActorByName
16+
import scala.concurrent.{ ExecutionContext, Future }
17+
import scala.concurrent.duration._
1118

12-
import scala.concurrent.{ExecutionContext, Future}
19+
object InstitutionAdminTcpApi {
20+
def props(supervisor: ActorRef): Props = {
21+
Props(new InstitutionAdminTcpApi(supervisor))
22+
}
23+
}
1324

1425
class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils {
1526
override val name: String = "hmda-institutions-tcp-api"
@@ -20,12 +31,20 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
2031

2132
override val host: String = config.getString("hmda.panel.tcp.host")
2233
override val port: Int = config.getInt("hmda.panel.tcp.port")
34+
val duration = config.getInt("hmda.panel.tcp.timeout").seconds
35+
implicit val timeout = Timeout(duration)
36+
val buffer = config.getInt("hmda.panel.tcp.parallelism")
2337

38+
val fInstitutionsActor = (supervisor ? FindActorByName(InstitutionPersistence.name)).mapTo[ActorRef]
2439

2540
val tcpHandler: Flow[ByteString, ByteString, NotUsed] =
2641
Flow[ByteString]
27-
.map{ e => println(e); e}
28-
42+
.via(framing)
43+
.drop(1)
44+
.via(byte2StringFlow)
45+
.map(x => InstitutionParser(x))
46+
.mapAsync(parallelism = buffer)(i => createInstitution(fInstitutionsActor, i))
47+
.map(e => ByteString(e.toString))
2948

3049
override val tcp: Future[Tcp.ServerBinding] = Tcp().bindAndHandle(
3150
tcpHandler,
@@ -34,4 +53,12 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
3453
)
3554

3655
tcp pipeTo self
56+
57+
private def createInstitution(fActor: Future[ActorRef], i: Institution): Future[Institution] = {
58+
for {
59+
actor <- fActor
60+
i <- (actor ? CreateInstitution(i)).mapTo[Option[Institution]].map(i => i.getOrElse(Institution.empty))
61+
} yield i
62+
}
63+
3764
}

cluster/src/main/scala/hmda/cluster/HmdaPlatform.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import akka.cluster.http.management.ClusterHttpManagement
1010
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings }
1111
import akka.util.Timeout
1212
import com.typesafe.config.ConfigFactory
13+
import hmda.api.tcp.admin.InstitutionAdminTcpApi
1314
import hmda.api.{ HmdaAdminApi, HmdaFilingApi, HmdaPublicApi }
1415
import hmda.persistence.HmdaSupervisor
1516
import hmda.persistence.institutions.InstitutionPersistence
@@ -85,6 +86,7 @@ object HmdaPlatform extends App {
8586
system.actorOf(HmdaFilingApi.props(supervisorProxy, querySupervisorProxy, validationStatsProxy).withDispatcher("api-dispatcher"), "hmda-filing-api")
8687
system.actorOf(HmdaAdminApi.props(supervisorProxy, querySupervisorProxy).withDispatcher("api-dispatcher"), "hmda-admin-api")
8788
system.actorOf(HmdaPublicApi.props(querySupervisorProxy).withDispatcher("api-dispatcher"), "hmda-public-api")
89+
system.actorOf(InstitutionAdminTcpApi.props(supervisorProxy), "panel-loader-tcp")
8890
}
8991

9092
//Start Persistence

loader/src/main/resources/application.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,13 @@ hmda {
88
adminUrl = "http://0.0.0.0:8081"
99
adminUrl = ${?HMDA_HTTP_ADMIN_URL}
1010
httpTimeout = 5
11+
panel {
12+
tcp {
13+
host = "127.0.0.1"
14+
host = ${?HMDA_PANEL_LOADER_HOST}
15+
port = "8888"
16+
port = ${?HMDA_PANEL_LOADER_PORT}
17+
}
18+
}
1119
}
1220

loader/src/main/scala/hmda/loader/http/InstitutionFlowUtils.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
package hmda.panel
1+
package hmda.loader.panel
22

33
import java.io.File
44

55
import akka.actor.ActorSystem
6-
import akka.http.scaladsl.model.HttpResponse
76
import akka.stream.ActorMaterializer
8-
import akka.stream.scaladsl.{ FileIO, Framing, Sink }
9-
import akka.util.ByteString
10-
import hmda.loader.http.InstitutionFlowUtils
7+
import akka.stream.scaladsl.{ FileIO, Sink, Tcp }
118
import org.slf4j.LoggerFactory
129
import hmda.api.util.FlowUtils
10+
1311
import scala.concurrent.duration._
1412

15-
object PanelCsvLoader extends FlowUtils with InstitutionFlowUtils {
13+
object PanelCsvLoader extends FlowUtils {
1614
val httpTimeout = config.getInt("hmda.httpTimeout")
1715
val adminUrl = config.getString("hmda.adminUrl")
1816
val url = s"$adminUrl/institutions"
@@ -22,6 +20,9 @@ object PanelCsvLoader extends FlowUtils with InstitutionFlowUtils {
2220
override implicit val ec = system.dispatcher
2321
val log = LoggerFactory.getLogger("hmda")
2422

23+
val host = config.getString("hmda.panel.tcp.host")
24+
val port = config.getInt("hmda.panel.tcp.port")
25+
2526
def main(args: Array[String]): Unit = {
2627

2728
if (args.length < 1) {
@@ -33,34 +34,15 @@ object PanelCsvLoader extends FlowUtils with InstitutionFlowUtils {
3334
exitSys(log, "File does not exist", 2)
3435
}
3536

36-
sendGetRequest("delete", url).map(_ => {
37-
val response = sendGetRequest("create", url)
37+
val connectionFlow = Tcp().outgoingConnection(host, port)
3838

39-
val source = FileIO.fromPath(file.toPath)
39+
val source = FileIO.fromPath(file.toPath)
4040

41-
response.onComplete(s =>
42-
if (s.getOrElse("").equals("InstitutionSchemaCreated()")) {
43-
log.info(s.get)
44-
val completedF = source
45-
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
46-
.drop(1)
47-
.via(byte2StringFlow)
48-
.via(institutionStringToHttpFlow(url))
49-
.via(singleConnectionFlow)
50-
.runWith(Sink.foreach[HttpResponse](elem => log.info(elem.entity.toString)))
41+
source
42+
.via(connectionFlow)
43+
.runWith(Sink.last)
44+
.onComplete(_ => system.terminate())
5145

52-
completedF.onComplete(result => {
53-
if (result.isSuccess)
54-
sys.exit(0)
55-
else {
56-
exitSys(log, "Error while processing institutions", 4)
57-
}
58-
})
59-
} else {
60-
exitSys(log, "Error creating institutions schema", 3)
61-
})
62-
})
6346
}
64-
6547
}
6648

0 commit comments

Comments
 (0)