@@ -8,11 +8,15 @@ import akka.stream.scaladsl.{ Flow, Tcp }
8
8
import akka .util .{ ByteString , Timeout }
9
9
import hmda .api .tcp .TcpApi
10
10
import hmda .api .util .FlowUtils
11
+ import hmda .model .fi .Filing
11
12
import hmda .model .institution .Institution
12
13
import hmda .parser .fi .InstitutionParser
13
- import hmda .persistence .institutions .InstitutionPersistence
14
+ import hmda .persistence .HmdaSupervisor .FindFilings
15
+ import hmda .persistence .institutions .FilingPersistence .CreateFiling
16
+ import hmda .persistence .institutions .{ FilingPersistence , InstitutionPersistence }
14
17
import hmda .persistence .institutions .InstitutionPersistence .CreateInstitution
15
18
import hmda .persistence .model .HmdaSupervisorActor .FindActorByName
19
+
16
20
import scala .concurrent .{ ExecutionContext , Future }
17
21
import scala .concurrent .duration ._
18
22
@@ -35,15 +39,14 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
35
39
implicit val timeout = Timeout (duration)
36
40
val buffer = config.getInt(" hmda.panel.tcp.parallelism" )
37
41
38
- val fInstitutionsActor = (supervisor ? FindActorByName (InstitutionPersistence .name)).mapTo[ActorRef ]
39
-
40
42
val tcpHandler : Flow [ByteString , ByteString , NotUsed ] =
41
43
Flow [ByteString ]
42
44
.via(framing)
43
45
.drop(1 )
44
46
.via(byte2StringFlow)
45
47
.map(x => InstitutionParser (x))
46
- .mapAsync(parallelism = buffer)(i => createInstitution(fInstitutionsActor, i))
48
+ .mapAsync(parallelism = buffer)(i => createInstitution(i))
49
+ .mapAsync(parallelism = buffer)(i => createFiling(i))
47
50
.map(e => ByteString (e.toString))
48
51
49
52
override val tcp : Future [Tcp .ServerBinding ] = Tcp ().bindAndHandle(
@@ -54,11 +57,24 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
54
57
55
58
tcp pipeTo self
56
59
57
- private def createInstitution (fActor : Future [ActorRef ], i : Institution ): Future [Institution ] = {
60
+ private def createInstitution (i : Institution ): Future [Institution ] = {
61
+ val fInstitutionsActor = (supervisor ? FindActorByName (InstitutionPersistence .name)).mapTo[ActorRef ]
58
62
for {
59
- actor <- fActor
60
- i <- (actor ? CreateInstitution (i)).mapTo[Option [Institution ]].map(i => i.getOrElse(Institution .empty))
63
+ actor <- fInstitutionsActor
64
+ i <- (actor ? CreateInstitution (i))
65
+ .mapTo[Option [Institution ]]
66
+ .map(i => i.getOrElse(Institution .empty))
61
67
} yield i
62
68
}
63
69
70
+ private def createFiling (institution : Institution ): Future [Filing ] = {
71
+ val fFilingPersistence = (supervisor ? FindFilings (FilingPersistence .name, institution.id)).mapTo[ActorRef ]
72
+ for {
73
+ actor <- fFilingPersistence
74
+ f <- (actor ? CreateFiling (Filing (institution.activityYear.toString, institution.id)))
75
+ .mapTo[Option [Filing ]]
76
+ .map(x => x.getOrElse(Filing ()))
77
+ } yield f
78
+ }
79
+
64
80
}
0 commit comments