Skip to content
This repository was archived by the owner on Jul 1, 2024. It is now read-only.

Commit 9fe7b3b

Browse files
authored
Merge pull request #33 from SoMind/mqtt-merge
refactor
2 parents 666ea89 + d5b5264 commit 9fe7b3b

File tree

12 files changed

+238
-146
lines changed

12 files changed

+238
-146
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
.bsp/
2+
runme*sh
13
target/
24
project/target/
35
project/project/

README.md

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,35 @@
22

33
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/25295eb70b8d463486773644f4b1c215)](https://app.codacy.com/gh/SoMind/dtlab-ingest-mqtt?utm_source=github.com&utm_medium=referral&utm_content=SoMind/dtlab-ingest-mqtt&utm_campaign=Badge_Grade_Dashboard)
44

5-
## UNDER CONSTRUCTION
6-
75
Listen for data on an MQTT topic and post each message to a remote HTTP service (probably a dtlab-ingest instance).
86

97
```console
108
sbt assembly && MQTT_URL=ssl://YOUR_HOST:8883 MQTT_TOPIC=test/c/ MQTT_CLIENT_ID=YOUR_CLIENT_ID MQTT_USER=YOUR_USER MQTT_PWD=YOUR_PWD java -jar target/scala-2.12/DtLabMqttIngest.jar
11-
```
9+
```
10+
11+
### Build and run
12+
13+
```console
14+
sbt assembly
15+
java -jar target/scala-2.12/MqttToKafka.jar
16+
```
17+
18+
### Test and Code Coverage
19+
20+
```console
21+
sbt clean coverage test
22+
sbt coverageReport
23+
open target/scala-2.12/scoverage-report/index.html
24+
```
25+
26+
### Code Stats
27+
28+
```console
29+
sbt stats
30+
```
31+
### Check dependencies
32+
33+
```console
34+
sbt dependencyUpdates
35+
```
36+

build.sbt

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,53 @@
1-
name := "DtLabIngestMqtt"
1+
name := "DtlabIngestMqtt"
22

33
fork := true
44
javaOptions in test ++= Seq(
5-
"-Xms128M", "-Xmx256M",
6-
"-XX:MaxPermSize=256M",
5+
"-Xms512M", "-Xmx2048M",
6+
"-XX:MaxPermSize=2048M",
77
"-XX:+CMSClassUnloadingEnabled"
88
)
99

1010
parallelExecution in test := false
1111

12+
scalacOptions += "-Ypartial-unification"
13+
1214
version := "1.0"
1315

14-
scalaVersion := "2.12.12"
15-
val akkaVersion = "2.6.10"
16+
scalaVersion := "2.12.8"
17+
val akkaVersion = "2.5.21"
1618

1719
libraryDependencies ++=
1820
Seq(
19-
20-
"tech.navicore" %% "navipath" % "4.0.2",
21+
"com.typesafe" % "config" % "1.3.3",
2122
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
22-
"com.typesafe" % "config" % "1.4.1",
2323
"ch.qos.logback" % "logback-classic" % "1.2.3",
24+
"org.slf4j" % "jul-to-slf4j" % "1.7.30",
25+
26+
"org.typelevel" %% "cats-core" % "1.6.0",
2427
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
2528
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
29+
2630
"com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "2.0.2",
27-
"org.scalatest" %% "scalatest" % "3.2.2" % "test"
2831

32+
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
2933
)
3034

35+
dependencyOverrides ++= Seq(
36+
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
37+
"com.typesafe.akka" %% "akka-stream" % akkaVersion
38+
)
39+
3140
mainClass in assembly := Some("somind.dtlab.ingest.mqtt.Main")
32-
assemblyJarName in assembly := "DtLabIngestMqtt.jar"
41+
assemblyJarName in assembly := "DtlabIngestMqtt.jar"
3342

3443
assemblyMergeStrategy in assembly := {
3544
case PathList("reference.conf") => MergeStrategy.concat
3645
case x if x.endsWith("io.netty.versions.properties") => MergeStrategy.first
37-
case PathList("META-INF", _ @ _*) => MergeStrategy.discard
46+
//case PathList("META-INF", _ @ _*) => MergeStrategy.discard
47+
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
48+
case PathList("META-INF", "ECLIPSE_.RSA") => MergeStrategy.discard
49+
case PathList("META-INF", "ECLIPSE_.SF") => MergeStrategy.discard
50+
//case x if x.endsWith("MANIFEST.MF") => MergeStrategy.last
3851
case _ => MergeStrategy.first
3952
}
4053

src/main/resources/application.conf

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,34 @@
11
main {
22

3-
appName = "DtLabIngestMqtt"
3+
appName = "DtlabIngestMqtt"
4+
appName = ${?APP_NAME}
45

5-
}
6+
keyStorePath = "~/secrets/keystore.jks"
7+
keyStorePath = ${?KEYSTORE_PATH}
8+
keyStorePassword = "mysecret"
9+
keyStorePassword = ${?KEYSTORE_PASSWORD}
10+
11+
mqttTopic = "data"
12+
mqttTopic = ${?MQTT_TOPIC}
613

7-
mqtt {
8-
url = ${MQTT_URL}
9-
user = ${MQTT_USER}
10-
pwd = ${MQTT_PWD}
11-
topic = ${MQTT_TOPIC}
12-
clientId = ${MQTT_CLIENT_ID}
14+
mqttUrl = "ssl://my.mqtt.host:12345"
15+
mqttUrl = ${MQTT_URL}
16+
17+
mqttClientId = "dtLabClient"
18+
mqttClientId = ${?MQTT_CLIENT_ID}
1319
}
1420

1521
akka {
22+
1623
# Options: OFF, ERROR, WARNING, INFO, DEBUG
24+
loglevel = "ERROR"
1725
loglevel = ${?AKKA_LOG_LEVEL}
18-
loglevel = "INFO"
19-
}
2026

27+
actor {
28+
serializers {
29+
}
30+
serialization-bindings {
31+
}
32+
}
33+
34+
}

src/main/resources/logback.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
</encoder>
77
</appender>
88

9-
<root level="${SYS_LOG_LEVEL:-warn}">
9+
<root level="${SYS_LOG_LEVEL:-error}">
1010
<appender-ref ref="STDOUT"/>
1111
</root>
1212

13-
<logger name="somind" level="${LOG_LEVEL:-debug}"/>
13+
<logger name="somind" level="${LOG_LEVEL:-info}"/>
1414

1515
</configuration>
1616

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,23 @@
11
package somind.dtlab.ingest.mqtt
22

33
import akka.actor.ActorSystem
4-
import akka.stream._
4+
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
55
import com.typesafe.config.{Config, ConfigFactory}
66
import com.typesafe.scalalogging.LazyLogging
77

8-
object Conf extends Conf with LazyLogging {
8+
object Conf extends LazyLogging {
99

10-
implicit val actorSystem: ActorSystem = ActorSystem("MqttKafka")
11-
12-
val decider: Supervision.Decider = { e: Throwable =>
13-
logger.error(s"decider can not decide: $e - restarting...", e)
14-
Supervision.Restart
15-
}
10+
InitJavaLogging()
1611

12+
implicit val actorSystem: ActorSystem = ActorSystem("MqttToKafkaSystem")
1713
implicit val materializer: ActorMaterializer = ActorMaterializer(
18-
ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
19-
}
20-
21-
trait Conf {
14+
ActorMaterializerSettings(actorSystem))
2215

2316
val conf: Config = ConfigFactory.load()
24-
25-
val mqttClientId: String = conf.getString("mqtt.clientId")
26-
val mqttUrl: String = conf.getString("mqtt.url")
27-
val mqttUser: String = conf.getString("mqtt.user")
28-
val mqttPwd: String = conf.getString("mqtt.pwd")
29-
val mqttTopic: String = conf.getString("mqtt.topic")
17+
val mqttUrl: String = conf.getString("main.mqttUrl")
18+
val mqttClientId: String = conf.getString("main.mqttClientId")
19+
val mqttTopic: String = conf.getString("main.mqttTopic")
20+
val keyStorePassword: String = conf.getString("main.keyStorePassword")
21+
val keyStorePath: String = conf.getString("main.keyStorePath")
3022

3123
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package somind.dtlab.ingest.mqtt
2+
3+
import java.util.logging.Level
4+
5+
import com.typesafe.scalalogging.LazyLogging
6+
import org.slf4j.LoggerFactory
7+
8+
import scala.sys.env
9+
10+
/**
11+
* If you have java a lib dep like eclipse.paho that you want to see logging
12+
* from, it sucks to be you.
13+
*
14+
* You must init the java.util.logging subsystem with jul-to-slf4j to get
15+
* Logback and LazyLogging to all play together :(
16+
*
17+
* dependent on build.sbt libs
18+
* "ch.qos.logback" % "logback-classic" % "?.?.?",
19+
* "org.slf4j" % "jul-to-slf4j" % "?.?.?",
20+
*/
21+
object InitJavaLogging extends LazyLogging {
22+
23+
def apply(): Unit = {
24+
import java.util.logging.LogManager
25+
26+
import org.slf4j.bridge.SLF4JBridgeHandler
27+
28+
val pin: List[java.util.logging.Logger] = List(
29+
java.util.logging.Logger.getLogger(""),
30+
java.util.logging.Logger.getLogger("org.eclipse.paho.client.mqttv"),
31+
java.util.logging.Logger.getLogger("org.eclipse.paho")
32+
)
33+
34+
def canInstallBridgeHandler = {
35+
try {
36+
Class.forName("org.slf4j.impl.JDK14LoggerFactory",
37+
false,
38+
this.getClass.getClassLoader)
39+
LoggerFactory
40+
.getLogger(this.getClass)
41+
.warn("Detected [org.slf4j.impl.JDK14LoggerFactory] on classpath. " +
42+
"SLF4JBridgeHandler cannot be installed, see: http://www.slf4j.org/legacy.html#julRecursion")
43+
false
44+
} catch {
45+
case _: ClassNotFoundException =>
46+
true
47+
}
48+
}
49+
50+
val sysLogLevel = env.getOrElse("SYS_LOG_LEVEL", "info") match {
51+
case "debug" => Level.FINER
52+
case "trace" => Level.FINEST
53+
case "error" => Level.WARNING
54+
case _ => Level.INFO
55+
}
56+
57+
if (canInstallBridgeHandler) {
58+
LogManager.getLogManager.reset()
59+
SLF4JBridgeHandler.removeHandlersForRootLogger()
60+
SLF4JBridgeHandler.install()
61+
62+
java.util.logging.Logger
63+
.getLogger("")
64+
.addHandler(new SLF4JBridgeHandler())
65+
pin.foreach(l => l.setLevel(sysLogLevel))
66+
67+
logger.info("org.slf4j.bridge.SLF4JBridgeHandler installed.")
68+
}
69+
70+
}
71+
72+
}
Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,45 @@
11
package somind.dtlab.ingest.mqtt
22

33
import akka.Done
4+
import akka.stream.alpakka.mqtt._
5+
import akka.stream.alpakka.mqtt.scaladsl.{MqttMessageWithAck, MqttSource}
6+
import akka.stream.scaladsl.Source
47
import com.typesafe.scalalogging.LazyLogging
5-
6-
import scala.concurrent.Future
7-
import scala.util.{Failure, Success}
8+
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
89
import Conf._
910

10-
import scala.concurrent.ExecutionContext.Implicits.global
11+
import scala.concurrent.Future
1112

1213
object Main extends App with LazyLogging {
1314

14-
def handleTerminate(result: Future[Done]): Unit = {
15-
result onComplete {
16-
case Success(_) =>
17-
logger.warn("success. but stream should not end!")
18-
actorSystem.terminate()
19-
case Failure(e) =>
20-
logger.error(s"failure. stream should not end! $e", e)
21-
actorSystem.terminate()
22-
}
23-
}
24-
25-
handleTerminate(Stream())
15+
val sslContext = SslContextUtil()
16+
17+
val connectionSettings = MqttConnectionSettings(
18+
mqttUrl,
19+
mqttClientId,
20+
new MemoryPersistence
21+
).withAutomaticReconnect(true)
22+
.withServerUri(mqttUrl)
23+
.withSocketFactory(sslContext.getSocketFactory)
24+
.withCleanSession(false)
25+
26+
logger.debug(s"connection settings:\n$connectionSettings")
27+
28+
val src: Source[MqttMessageWithAck, Future[Done]] =
29+
MqttSource.atLeastOnce(
30+
connectionSettings.withClientId(
31+
clientId = "somind-mqtt-gateway-" + System.nanoTime().toString),
32+
MqttSubscriptions(mqttTopic, MqttQoS.AtLeastOnce),
33+
bufferSize = 8
34+
)
35+
36+
logger.info("starting...")
37+
logger.debug("debugging...")
38+
src
39+
.map(m => {
40+
m.ack()
41+
m.message.payload.map(_.toChar).mkString
42+
})
43+
.runWith(akka.stream.scaladsl.Sink.foreach(println))
2644

2745
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package somind.dtlab.ingest.mqtt
2+
3+
import java.io.FileInputStream
4+
import java.security.KeyStore
5+
6+
import com.typesafe.scalalogging.LazyLogging
7+
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
8+
import Conf._
9+
10+
object SslContextUtil extends LazyLogging {
11+
12+
def apply(): SSLContext = {
13+
14+
val keyStore = KeyStore.getInstance("pkcs12")
15+
keyStore.load(new FileInputStream(keyStorePath),
16+
keyStorePassword.toCharArray)
17+
val kmf =
18+
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
19+
kmf.init(keyStore, keyStorePassword.toCharArray)
20+
21+
val trustManagerFactory =
22+
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
23+
trustManagerFactory.init(keyStore)
24+
25+
val ctx: SSLContext = SSLContext.getInstance("TLS")
26+
ctx.init(kmf.getKeyManagers, trustManagerFactory.getTrustManagers, null)
27+
ctx
28+
29+
}
30+
31+
}

0 commit comments

Comments
 (0)