Skip to content
This repository was archived by the owner on Jul 1, 2024. It is now read-only.

Commit ef38461

Browse files
committed
good long running tests
1 parent 3a25aa6 commit ef38461

File tree

4 files changed

+25
-18
lines changed

4 files changed

+25
-18
lines changed

src/main/resources/application.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ main {
2323
mqttClientId = "dtLabClient"
2424
mqttClientId = ${?MQTT_CLIENT_ID}
2525

26-
dtlabIngestUri = "http://localhost:8082/dtlab-alligator/extractor/telemetry/CHANGEME"
27-
dtlabIngestUri = ${?DTLAB_INGEST_PATH}
26+
dtlabIngestUris = "http://localhost:8082/dtlab-alligator/extractor/telemetry/CHANGEME http://localhost:8082/dtlab-alligator/extractor/array/CHANGEME"
27+
dtlabIngestUris = ${?DTLAB_INGEST_PATHS}
2828

2929
telemetryContentType = "json"
3030
telemetryContentType = ${?TELEMETRY_CONTENT_TYPE}

src/main/scala/somind/dtlab/ingest/mqtt/Conf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ object Conf extends LazyLogging {
2626
val mqttTopic: String = conf.getString("main.mqttTopic")
2727
val keyStorePassword: String = conf.getString("main.keyStorePassword")
2828
val keyStorePath: String = conf.getString("main.keyStorePath")
29-
val dtlabIngestUri: String = conf.getString("main.dtlabIngestUri")
29+
val dtlabIngestUris: List[String] = conf.getString("main.dtlabIngestUris").split(' ').toList
3030
import scala.concurrent.duration._
3131
val webhookTimeoutSeconds: Duration =
3232
conf.getInt("main.webhookTimeoutSeconds").seconds

src/main/scala/somind/dtlab/ingest/mqtt/MqttSourceWebhookSinkStream.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ object MqttSourceWebhookSinkStream extends LazyLogging {
4040

4141
src
4242
.runWith(akka.stream.scaladsl.Sink.foreach(m => {
43-
43+
// todo: move to an async flow that can scale horizontally
4444
Observer("mqtt_message_processing_by_sink")
4545
val payload = m.message.payload.map(_.toChar).mkString
4646
logger.debug(payload)
4747
PostString(payload) match {
48-
case Some(code) if code == StatusCodes.Accepted =>
48+
case Some(code)
49+
if code == StatusCodes.Accepted || code == StatusCodes.OK =>
4950
m.ack()
5051
Observer("mqtt_message_processed_by_sink_fitness")
5152
case code =>

src/main/scala/somind/dtlab/ingest/mqtt/utils/PostString.scala

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,25 @@ import scala.concurrent.{Await, Future, TimeoutException}
1010
object PostString extends LazyLogging {
1111

1212
val http: HttpExt = Http(actorSystem)
13-
val request: HttpRequest = HttpRequest()
14-
.withUri(uri = Uri(dtlabIngestUri))
13+
val request: HttpRequest = HttpRequest().withMethod(HttpMethods.POST)
1514

1615
def apply(payload: String): Option[StatusCode] = {
1716
try {
18-
val f = PostString.applyAsync(payload)
19-
val r = Await.result(f, webhookTimeoutSeconds)
20-
if (!r.status.isSuccess()) {
21-
logger.warn(s"post not successful: ${r._3}")
22-
}
23-
Some(r.status)
17+
18+
val results = dtlabIngestUris.map(uriString => {
19+
val f = PostString.applyAsync(payload, uriString)
20+
val r = Await.result(f, webhookTimeoutSeconds)
21+
if (!r.status.isSuccess()) {
22+
logger.warn(s"post not successful: $r")
23+
return None
24+
}
25+
logger.debug(s"post successful")
26+
Some(r.status)
27+
})
28+
results.last
2429
} catch {
2530
case e: TimeoutException =>
26-
logger.error(
27-
s"remote system is timing out: ${e.getMessage}")
31+
logger.error(s"remote system is timing out: ${e.getMessage}")
2832
System.exit(1)
2933
None
3034
case e: Throwable =>
@@ -34,12 +38,14 @@ object PostString extends LazyLogging {
3438
}
3539
}
3640

37-
def applyAsync(telem: String): Future[HttpResponse] = {
41+
def applyAsync(telem: String, uriString: String): Future[HttpResponse] = {
3842

3943
val newRequest =
40-
request.withEntity(entity = HttpEntity(telemetryContentType, telem))
44+
request
45+
.withEntity(entity = HttpEntity(telemetryContentType, telem))
46+
.withUri(uri = Uri(uriString))
4147

42-
logger.debug(s"sending request to: " + newRequest)
48+
logger.debug(s"posting request to: " + newRequest)
4349
http.singleRequest(newRequest)
4450
}
4551

0 commit comments

Comments
 (0)