Skip to content

Commit 3503340

Browse files
committed
Refactor panel -> loader project
1 parent e161e3b commit 3503340

File tree

10 files changed

+147
-108
lines changed

10 files changed

+147
-108
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
hmda {
2+
connectionFlowParallelism = 5
3+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package hmda.api.util
2+
3+
import akka.NotUsed
4+
import akka.actor.ActorSystem
5+
import akka.http.scaladsl.Http
6+
import akka.http.scaladsl.model.{ HttpMethods, HttpRequest, HttpResponse, Uri }
7+
import akka.http.scaladsl.unmarshalling.Unmarshal
8+
import akka.stream.ActorMaterializer
9+
import akka.stream.scaladsl.{ Flow, Framing }
10+
import akka.util.ByteString
11+
import com.typesafe.config.ConfigFactory
12+
import org.slf4j.Logger
13+
import scala.concurrent.ExecutionContext
14+
15+
trait FlowUtils {
16+
17+
implicit val system: ActorSystem
18+
implicit val materializer: ActorMaterializer
19+
implicit val ec: ExecutionContext
20+
val config = ConfigFactory.load()
21+
val parallelism = config.getInt("hmda.connectionFlowParallelism")
22+
23+
def singleConnectionFlow: Flow[HttpRequest, HttpResponse, NotUsed] =
24+
Flow[HttpRequest]
25+
.mapAsync[HttpResponse](parallelism)(request => {
26+
for {
27+
response <- Http().singleRequest(request)
28+
} yield response
29+
})
30+
31+
def framing: Flow[ByteString, ByteString, NotUsed] = {
32+
Framing.delimiter(ByteString("\n"), maximumFrameLength = 65536, allowTruncation = true)
33+
}
34+
35+
def byte2StringFlow: Flow[ByteString, String, NotUsed] =
36+
Flow[ByteString].map(bs => bs.utf8String)
37+
38+
def sendGetRequest(req: String, url: Uri) = {
39+
val request = HttpRequest(HttpMethods.GET, uri = s"$url/$req")
40+
for {
41+
response <- Http().singleRequest(request)
42+
content <- Unmarshal(response.entity).to[String]
43+
} yield content
44+
}
45+
46+
def exitSys(log: Logger, errorMessage: String, code: Int) = {
47+
log.error(errorMessage)
48+
system.terminate()
49+
Thread.sleep(100)
50+
sys.exit(code)
51+
}
52+
}

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,13 @@ lazy val validation = (project in file("validation"))
126126
).dependsOn(parserJVM % "compile->compile;test->test")
127127
.dependsOn(persistenceModel % "compile->compile;test->test")
128128

129-
lazy val panel = (project in file("panel"))
129+
lazy val loader = (project in file("loader"))
130130
.settings(hmdaBuildSettings: _*)
131131
.settings(Revolver.settings:_*)
132132
.settings(
133133
Seq(
134134
assemblyJarName in assembly := {s"${name.value}.jar"},
135-
mainClass in assembly := Some("hmda.panel.PanelCsvLoader"),
135+
mainClass in assembly := Some("hmda.loader.panel.PanelCsvLoader"),
136136
assemblyMergeStrategy in assembly := {
137137
case "application.conf" => MergeStrategy.concat
138138
case "META-INF/io.netty.versions.properties" => MergeStrategy.concat

panel/src/main/resources/application.conf renamed to loader/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ akka {
77
hmda {
88
adminUrl = "http://0.0.0.0:8081"
99
adminUrl = ${?HMDA_HTTP_ADMIN_URL}
10+
httpTimeout = 5
1011
}
1112

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package hmda.loader.http
2+
3+
import akka.NotUsed
4+
import akka.http.scaladsl.model._
5+
import akka.stream.scaladsl.Flow
6+
import akka.util.ByteString
7+
import hmda.api.protocol.admin.WriteInstitutionProtocol
8+
import hmda.parser.fi.InstitutionParser
9+
import spray.json._
10+
11+
trait InstitutionFlowUtils extends WriteInstitutionProtocol {
12+
13+
def institutionStringToHttpFlow(url: Uri): Flow[String, HttpRequest, NotUsed] =
14+
Flow[String]
15+
.map { x =>
16+
val payload = ByteString(InstitutionParser(x).toJson.toString)
17+
HttpRequest(
18+
HttpMethods.POST,
19+
uri = url,
20+
entity = HttpEntity(MediaTypes.`application/json`, payload)
21+
)
22+
}
23+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package hmda.panel
2+
3+
import java.io.File
4+
5+
import akka.actor.ActorSystem
6+
import akka.http.scaladsl.model.HttpResponse
7+
import akka.stream.ActorMaterializer
8+
import akka.stream.scaladsl.{ FileIO, Framing, Sink }
9+
import akka.util.ByteString
10+
import hmda.loader.http.InstitutionFlowUtils
11+
import org.slf4j.LoggerFactory
12+
import hmda.api.util.FlowUtils
13+
import scala.concurrent.duration._
14+
15+
object PanelCsvLoader extends FlowUtils with InstitutionFlowUtils {
16+
val httpTimeout = config.getInt("hmda.httpTimeout")
17+
val adminUrl = config.getString("hmda.adminUrl")
18+
val url = s"$adminUrl/institutions"
19+
val duration = httpTimeout.seconds
20+
override implicit val system: ActorSystem = ActorSystem("hmda-loader")
21+
override implicit val materializer = ActorMaterializer()
22+
override implicit val ec = system.dispatcher
23+
val log = LoggerFactory.getLogger("hmda")
24+
25+
def main(args: Array[String]): Unit = {
26+
27+
if (args.length < 1) {
28+
exitSys(log, "No file argument provided", 1)
29+
}
30+
31+
val file = new File(args(0))
32+
if (!file.exists() || !file.isFile) {
33+
exitSys(log, "File does not exist", 2)
34+
}
35+
36+
sendGetRequest("delete", url).map(_ => {
37+
val response = sendGetRequest("create", url)
38+
39+
val source = FileIO.fromPath(file.toPath)
40+
41+
response.onComplete(s =>
42+
if (s.getOrElse("").equals("InstitutionSchemaCreated()")) {
43+
log.info(s.get)
44+
val completedF = source
45+
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
46+
.drop(1)
47+
.via(byte2StringFlow)
48+
.via(institutionStringToHttpFlow(url))
49+
.via(singleConnectionFlow)
50+
.runWith(Sink.foreach[HttpResponse](elem => log.info(elem.entity.toString)))
51+
52+
completedF.onComplete(result => {
53+
if (result.isSuccess)
54+
sys.exit(0)
55+
else {
56+
exitSys(log, "Error while processing institutions", 4)
57+
}
58+
})
59+
} else {
60+
exitSys(log, "Error creating institutions schema", 3)
61+
})
62+
})
63+
}
64+
65+
}
66+

panel/src/main/scala/hmda/panel/PanelCsvLoader.scala

Lines changed: 0 additions & 106 deletions
This file was deleted.

0 commit comments

Comments
 (0)