Skip to content

Commit e3c0e90

Browse files
committed
Configure cluster receptionist
1 parent 1aa7314 commit e3c0e90

File tree

5 files changed

+81
-2
lines changed

5 files changed

+81
-2
lines changed

cluster/src/main/resources/application.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ akka {
3939

4040
}
4141

42-
extensions = ["de.heikoseeberger.constructr.ConstructrExtension"]
42+
extensions = ["de.heikoseeberger.constructr.ConstructrExtension", "akka.cluster.client.ClusterClientReceptionist"]
4343

4444
}
4545

@@ -64,6 +64,7 @@ clustering {
6464
name = "hmda"
6565
name = ${?HMDA_CLUSTER_NAME}
6666
ip = "127.0.0.1"
67+
ip = ${?HMDA_CLUSTER_IP}
6768
port = 0
6869
port = ${?APP_PORT}
6970
}

loader/src/main/resources/application.conf

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
akka {
2-
loglevel = "INFO"
2+
loglevel = "DEBUG"
33
loglevel = ${?HMDA_LOGLEVEL}
44
actor.warn-about-java-serializer-usage = off
5+
actor.provider = cluster
6+
7+
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
58
}
69

710
hmda {
11+
clusterName = "hmda"
812
adminUrl = "http://0.0.0.0:8081"
913
adminUrl = ${?HMDA_HTTP_ADMIN_URL}
14+
actorTimeout = 5
1015
httpTimeout = 5
1116
panel {
1217
tcp {
@@ -16,5 +21,11 @@ hmda {
1621
port = ${?HMDA_PANEL_LOADER_PORT}
1722
}
1823
}
24+
lar {
25+
host = "127.0.0.1"
26+
host = ${?HMDA_CLUSTER_HOST}
27+
port = 0
28+
port = ${?HMDA_CLUSTER_PORT}
29+
}
1930
}
2031

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package hmda.loader.lar
2+
3+
import akka.actor.{ActorPath, ActorRef, ActorSystem}
4+
import akka.pattern.ask
5+
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
6+
import akka.util.Timeout
7+
import com.typesafe.config.ConfigFactory
8+
import hmda.model.fi.SubmissionId
9+
import hmda.persistence.HmdaSupervisor.{FindHmdaFiling, FindProcessingActor, FindSubmissions}
10+
import hmda.persistence.institutions.SubmissionPersistence
11+
import hmda.persistence.processing.SubmissionManager
12+
import scala.concurrent.duration._
13+
14+
object HmdaLarLoader extends App {
15+
16+
val config = ConfigFactory.load()
17+
18+
implicit val system = ActorSystem("hmda-cluster-client")
19+
implicit val ec = system.dispatcher
20+
21+
val hmdaClusterName = config.getString("hmda.clusterName")
22+
val hmdaClusterIP = config.getString("hmda.lar.host")
23+
val hmdaClusterPort = config.getInt("hmda.lar.port")
24+
val actorTimeout = config.getInt("hmda.actorTimeout")
25+
26+
implicit val timeout = Timeout(actorTimeout.seconds)
27+
28+
val initialContacts = Set(
29+
ActorPath.fromString(s"akka.tcp://$hmdaClusterName@$hmdaClusterIP:$hmdaClusterPort/system/receptionist")
30+
)
31+
32+
println(initialContacts)
33+
34+
val settings = ClusterClientSettings(system)
35+
.withInitialContacts(initialContacts)
36+
37+
val clusterClient = system.actorOf(ClusterClient.props(settings), "hmda-lar-loader")
38+
39+
val institutionId = "institutionID"
40+
val period = "2017"
41+
42+
val submissionId = SubmissionId(institutionId, period)
43+
44+
val message = FindProcessingActor(SubmissionManager.name, submissionId)
45+
46+
val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", message, localAffinity = true)).mapTo[ActorRef]
47+
val fSubmissionsActor = (clusterClient ? ClusterClient
48+
.Send("/user/supervisor/singleton",
49+
FindSubmissions(SubmissionPersistence.name, institutionId, period),
50+
localAffinity = true)).mapTo[ActorRef]
51+
52+
(clusterClient ? ClusterClient.Send("/user/supervisor/singleton", FindHmdaFiling(period), localAffinity = true)).mapTo[ActorRef]
53+
//(clusterClient ? ClusterClient.Send("/user/query-supervisor", FindHmdaFilingView(period), localAffinity = true)).mapTo[ActorRef]
54+
55+
56+
57+
58+
private def uploadFile(): Unit = ???
59+
60+
}

persistence/src/main/scala/hmda/persistence/HmdaSupervisor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package hmda.persistence
22

33
import akka.actor.{ ActorRef, ActorSystem, Props }
4+
import akka.cluster.client.ClusterClientReceptionist
45
import hmda.model.fi.SubmissionId
56
import hmda.persistence.institutions.{ FilingPersistence, InstitutionPersistence, SubmissionPersistence }
67
import hmda.persistence.model.HmdaSupervisorActor
@@ -25,6 +26,8 @@ class HmdaSupervisor(validationStats: ActorRef) extends HmdaSupervisorActor {
2526

2627
import HmdaSupervisor._
2728

29+
ClusterClientReceptionist(context.system).registerService(self)
30+
2831
override def receive: Receive = super.receive orElse {
2932

3033
case FindHmdaFiling(filingPeriod) =>

query/src/main/scala/hmda/query/HmdaQuerySupervisor.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package hmda.query
22

33
import akka.actor.{ ActorRef, ActorSystem, Props }
4+
import akka.cluster.client.ClusterClientReceptionist
45
import akka.util.Timeout
56
import hmda.persistence.model.HmdaSupervisorActor
67
import hmda.query.view.filing.HmdaFilingView
78
import hmda.query.view.institutions.InstitutionView
89
import hmda.persistence.PersistenceConfig._
910
import hmda.persistence.messages.CommonMessages._
11+
1012
import scala.concurrent.duration._
1113

1214
object HmdaQuerySupervisor {
@@ -28,6 +30,8 @@ class HmdaQuerySupervisor extends HmdaSupervisorActor {
2830
implicit val timeout = Timeout(duration.seconds)
2931
implicit val ec = context.dispatcher
3032

33+
ClusterClientReceptionist(context.system).registerService(self)
34+
3135
override def receive: Receive = super.receive orElse {
3236
case FindHmdaFilingView(period) =>
3337
sender() ! findHmdaFilingView(period)

0 commit comments

Comments
 (0)