Skip to content

Commit 8106247

Browse files
authored
feat: supports mqtt (#22)
* feat: supports mqtt * x * x * x * x * x * x
1 parent 9ed2993 commit 8106247

File tree

8 files changed

+482
-9
lines changed

8 files changed

+482
-9
lines changed

Cargo.lock

Lines changed: 74 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ reqwest = { version = "0.12", features = ["json"] }
4545
clap = { version = "4.3", features = ["derive"] }
4646
colored = "2.0"
4747
flume = "=0.10"
48+
rumqttc = "0.24.0"

examples/mqtt_example.yaml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# 流式SQL处理示例配置
2+
3+
streams:
4+
- input:
5+
type: "mqtt"
6+
# 内存输入配置,提供一些测试消息
7+
host: "broker.emqx.io"
8+
port: 1883
9+
qos: 1
10+
client_id: "flow_xx_xdsfsfsfsf"
11+
topics: ["flow_xx/#"]
12+
13+
# - input:
14+
# type: "memory"
15+
# # 内存输入配置,提供一些测试消息
16+
# messages:
17+
# - '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
18+
# - '{ "timestamp": 1625000001000, "value": 15, "sensor": "temp_1" }'
19+
# - '{ "timestamp": 1625000002000, "value": 12, "sensor": "temp_1" }'
20+
21+
22+
pipeline:
23+
thread_num: 4
24+
processors:
25+
- type: "sql"
26+
# SQL查询语句,支持标准SQL语法
27+
query: "SELECT * ,cast(value as string) as tx FROM flow WHERE value > 10"
28+
# 表名(用于SQL查询中引用)
29+
# table_name: "events"
30+
31+
32+
output:
33+
## type: "stdout"
34+
# type: "file"
35+
# append: false
36+
# path: "examples/output.txt"
37+
type: "mqtt"
38+
# 内存输入配置,提供一些测试消息
39+
host: "broker.emqx.io"
40+
port: 1883
41+
qos: 1
42+
client_id: "flow_xx_xdsfsfsfsfsdsd"
43+
topic: "flow_xx_y"

src/input/memory.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ impl Input for MemoryInput {
7373
if let Some(msg) = msg_option {
7474
Ok(msg)
7575
} else {
76-
// 如果队列为空,则等待一段时间后返回错误
77-
// 在实际应用中,这里可能需要实现更复杂的等待机制
78-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
7976
Err(Error::Done)
8077
}
8178
}
@@ -85,7 +82,7 @@ impl Input for MemoryInput {
8582
Ok(())
8683
}
8784

88-
async fn close(&self) -> Result<(), Error> {
85+
async fn close(&self) -> Result<(), Error> {
8986
self.connected.store(false, std::sync::atomic::Ordering::SeqCst);
9087
Ok(())
9188
}

src/input/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod file;
1212
pub mod http;
1313
pub mod kafka;
1414
pub mod memory;
15+
pub mod mqtt;
1516

1617
/// 输入组件的特征接口
1718
#[async_trait]
@@ -77,6 +78,7 @@ pub enum InputConfig {
7778
Http(http::HttpInputConfig),
7879
// Kafka(kafka::KafkaInputConfig),
7980
Memory(memory::MemoryInputConfig),
81+
Mqtt(mqtt::MqttInputConfig),
8082
}
8183

8284
impl InputConfig {
@@ -87,6 +89,7 @@ impl InputConfig {
8789
InputConfig::Http(config) => Ok(Arc::new(http::HttpInput::new(config)?)),
8890
// InputConfig::Kafka(config) => Ok(Arc::new(kafka::KafkaInput::new(config)?)),
8991
InputConfig::Memory(config) => Ok(Arc::new(memory::MemoryInput::new(config)?)),
92+
InputConfig::Mqtt(config) => Ok(Arc::new(mqtt::MqttInput::new(config)?)),
9093
}
9194
}
9295
}

0 commit comments

Comments
 (0)