Skip to content

Commit ca20e17

Browse files
add generate mqtt worker from plugin
1 parent a557071 commit ca20e17

File tree

3 files changed

+58
-12
lines changed

3 files changed

+58
-12
lines changed

cmd/candi/plugin.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type plugin struct {
1111
const (
1212
pluginGCPPubSubWorker = "GCPPubSubWorker"
1313
pluginSTOMPWorker = "STOMPWorker"
14+
pluginMQTTWorker = "MQTTWorker"
1415
)
1516

1617
var (
@@ -54,7 +55,7 @@ var (
5455
`import (`: `import (
5556
stompbroker "github.com/golangid/candi-plugin/stomp-broker"`,
5657
"brokerDeps := broker.InitBrokers(": `brokerDeps := broker.InitBrokers(
57-
stompbroker.NewSTOMPBroker(stompbroker.InitDefaultConnection("[broker host]", "[username]", "[password]")),`,
58+
stompbroker.NewSTOMPBroker(stompbroker.InitDefaultConnection("127.0.0.1:61613", "[username]", "[password]")),`,
5859
},
5960
editAppFactory: map[string]string{
6061
`import (`: `import (
@@ -75,5 +76,43 @@ var (
7576
stompbroker.STOMPBroker: workerhandler.NewSTOMPWorkerHandler(usecase.GetSharedUsecase(), deps),`,
7677
},
7778
},
79+
80+
pluginMQTTWorker: {
81+
name: pluginMQTTWorker,
82+
packageName: "github.com/golangid/candi-plugin/mqtt-broker",
83+
editConfig: map[string]string{
84+
`import (`: `import (
85+
mqtt "github.com/eclipse/paho.mqtt.golang"
86+
mqttbroker "github.com/golangid/candi-plugin/mqtt-broker"`,
87+
"brokerDeps := broker.InitBrokers(": `brokerDeps := broker.InitBrokers(
88+
mqttbroker.NewMQTTBroker(mqtt.NewClientOptions().
89+
AddBroker("tcp://127.0.0.1:1883").
90+
SetClientID("MQTTClientID").
91+
SetUsername("MQTTUsername").
92+
SetPassword("MQTTPassword").
93+
SetCleanSession(false).
94+
SetAutoReconnect(true).
95+
SetConnectRetry(true),
96+
),`,
97+
},
98+
editAppFactory: map[string]string{
99+
`import (`: `import (
100+
mqttbroker "github.com/golangid/candi-plugin/mqtt-broker"`,
101+
`return
102+
}`: `apps = append(apps, mqttbroker.NewMQTTSubscriber(
103+
service,
104+
service.GetDependency().GetBroker(mqttbroker.MQTTBroker),
105+
))
106+
return
107+
}`,
108+
},
109+
editModule: map[string]string{
110+
"import (": `import (
111+
mqttbroker "github.com/golangid/candi-plugin/mqtt-broker"
112+
`,
113+
"mod.workerHandlers = map[types.Worker]interfaces.WorkerHandler{": `mod.workerHandlers = map[types.Worker]interfaces.WorkerHandler{
114+
mqttbroker.MQTTBroker: workerhandler.NewMQTTWorkerHandler(usecase.GetSharedUsecase(), deps),`,
115+
},
116+
},
78117
}
79118
)

cmd/candi/project_generator_utils.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ func filterWorkerHandler(cfg *serviceConfig, flagParam *flagParameter) (wording
235235
options = append(options, fmt.Sprintf("%d) AMQ (STOMP) Consumer (plugin)", len(options)+1))
236236
handlers[strconv.Itoa(len(options))] = pluginSTOMPWorker
237237
}
238+
if flagParam.addModule || flagParam.initService || (flagParam.addHandler &&
239+
validateDir(flagParam.getFullModuleChildDir("delivery", "workerhandler", strings.ToLower(pluginMQTTWorker)+"_handler.go")) != nil) {
240+
options = append(options, fmt.Sprintf("%d) MQTT Subscriber (plugin)", len(options)+1))
241+
handlers[strconv.Itoa(len(options))] = pluginMQTTWorker
242+
}
238243

239244
wording = strings.Join(options, "\n")
240245
return
@@ -366,18 +371,16 @@ func getNeedFileUpdates(srvConfig *serviceConfig) (fileUpdates []fileUpdate) {
366371
})
367372
}
368373
for _, module := range srvConfig.Modules {
369-
if module.Skip {
374+
if module.Skip && !srvConfig.flag.addHandler {
370375
continue
371376
}
372377
moduleName := cleanSpecialChar.Replace(module.ModuleName)
373378
deliveryPackageDir := fmt.Sprintf(`"%s/internal/modules/%s/delivery`, module.PackagePrefix, moduleName)
374-
if !module.IsWorkerActive {
375-
fileUpdates = append(fileUpdates, fileUpdate{
376-
filepath: rootDir + "internal/modules/" + moduleName + "/module.go",
377-
oldContent: "// " + deliveryPackageDir + "/workerhandler",
378-
newContent: deliveryPackageDir + "/workerhandler",
379-
})
380-
}
379+
fileUpdates = append(fileUpdates, fileUpdate{
380+
filepath: rootDir + "internal/modules/" + moduleName + "/module.go",
381+
oldContent: "// " + deliveryPackageDir + "/workerhandler",
382+
newContent: deliveryPackageDir + "/workerhandler",
383+
})
381384
for before, after := range pl.editModule {
382385
fileUpdates = append(fileUpdates, fileUpdate{
383386
filepath: rootDir + "internal/modules/" + moduleName + "/module.go",
@@ -505,11 +508,15 @@ func getAllModuleHandler(path string) (wording string, handlers map[string]strin
505508
}
506509
if validateDir(path+"/workerhandler/"+strings.ToLower(pluginGCPPubSubWorker)+"_handler.go") == nil {
507510
options = append(options, fmt.Sprintf("%d) GCP PubSub Subscriber (plugin)", len(options)+1))
508-
handlers[strconv.Itoa(len(options))] = RabbitmqHandler
511+
handlers[strconv.Itoa(len(options))] = pluginGCPPubSubWorker
509512
}
510513
if validateDir(path+"/workerhandler/"+strings.ToLower(pluginSTOMPWorker)+"_handler.go") == nil {
511514
options = append(options, fmt.Sprintf("%d) AMQ (STOMP) Consumer (plugin)", len(options)+1))
512-
handlers[strconv.Itoa(len(options))] = RabbitmqHandler
515+
handlers[strconv.Itoa(len(options))] = pluginSTOMPWorker
516+
}
517+
if validateDir(path+"/workerhandler/"+strings.ToLower(pluginMQTTWorker)+"_handler.go") == nil {
518+
options = append(options, fmt.Sprintf("%d) MQTT Subscriber (plugin)", len(options)+1))
519+
handlers[strconv.Itoa(len(options))] = pluginMQTTWorker
513520
}
514521

515522
wording = strings.Join(options, "\n")

cmd/candi/template_delivery_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func (h *{{.WorkerPluginName}}Handler) handleTopic{{upper (camel .ModuleName)}}(
358358
trace, ctx := tracer.StartTraceWithContext(eventContext.Context(), "{{upper (camel .ModuleName)}}Delivery{{.WorkerPluginName}}:HandleTopic{{upper (camel .ModuleName)}}")
359359
defer trace.Finish()
360360
361-
fmt.Printf("message consumed by module {{.ModuleName}}. message: %s\n", eventContext.Context())
361+
fmt.Printf("message consumed by module {{.ModuleName}}. message: %s\n", eventContext.Message())
362362
363363
// exec usecase
364364
// h.uc.SomethingUsecase()

0 commit comments

Comments
 (0)