Skip to content

Commit a51b192

Browse files
Merge pull request #16 from koupleless/feat.support_http_tunnel
Feat.support http tunnel
2 parents 1fb5a4d + 40a5e04 commit a51b192

23 files changed

+1309
-161
lines changed

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ COPY --from=builder /workspace/module_controller .
3434

3535
EXPOSE 9090
3636
EXPOSE 8080
37+
EXPOSE 7777
3738

3839
ENTRYPOINT ["./module_controller"]

cmd/module-controller/main.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/koupleless/module_controller/common/model"
2121
"github.com/koupleless/module_controller/controller/module_deployment_controller"
2222
"github.com/koupleless/module_controller/module_tunnels"
23+
"github.com/koupleless/module_controller/module_tunnels/koupleless_http_tunnel"
2324
"github.com/koupleless/module_controller/module_tunnels/koupleless_mqtt_tunnel"
2425
"github.com/koupleless/module_controller/report_server"
2526
"github.com/koupleless/virtual-kubelet/common/log"
@@ -71,13 +72,20 @@ func main() {
7172

7273
if err != nil {
7374
log.G(ctx).WithError(err).Error("failed to parse WORKLOAD_MAX_LEVEL, will be set to 3 default")
75+
workloadMaxLevel = 3
76+
}
77+
78+
vnodeWorkerNum, err := strconv.Atoi(utils.GetEnv("VNODE_WORKER_NUM", "8"))
79+
if err != nil {
80+
log.G(ctx).WithError(err).Error("failed to parse VNODE_WORKER_NUM, will be set to 8 default")
81+
vnodeWorkerNum = 8
7482
}
7583

7684
kubeConfig := config.GetConfigOrDie()
7785
mgr, err := manager.New(kubeConfig, manager.Options{
7886
Cache: cache.Options{},
7987
Metrics: server.Options{
80-
BindAddress: "0",
88+
BindAddress: ":9090",
8189
},
8290
})
8391

@@ -88,9 +96,36 @@ func main() {
8896

8997
tracker.SetTracker(&tracker.DefaultTracker{})
9098

91-
tl := &koupleless_mqtt_tunnel.MqttTunnel{
92-
Cache: mgr.GetCache(),
93-
Client: mgr.GetClient(),
99+
tunnels := make([]tunnel.Tunnel, 0)
100+
moduleTunnels := make([]module_tunnels.ModuleTunnel, 0)
101+
102+
mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
103+
if mqttTunnelEnable == "true" {
104+
mqttTl := &koupleless_mqtt_tunnel.MqttTunnel{
105+
Cache: mgr.GetCache(),
106+
Client: mgr.GetClient(),
107+
}
108+
109+
tunnels = append(tunnels, mqttTl)
110+
moduleTunnels = append(moduleTunnels, mqttTl)
111+
}
112+
113+
httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
114+
if httpTunnelEnable == "true" {
115+
httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))
116+
117+
if err != nil {
118+
log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
119+
httpTunnelListenPort = 7777
120+
}
121+
122+
httpTl := &koupleless_http_tunnel.HttpTunnel{
123+
Cache: mgr.GetCache(),
124+
Client: mgr.GetClient(),
125+
Port: httpTunnelListenPort,
126+
}
127+
tunnels = append(tunnels, httpTl)
128+
moduleTunnels = append(moduleTunnels, httpTl)
94129
}
95130

96131
rcc := vkModel.BuildVNodeControllerConfig{
@@ -99,11 +134,10 @@ func main() {
99134
VPodIdentity: model.ComponentModule,
100135
IsCluster: isCluster,
101136
WorkloadMaxLevel: workloadMaxLevel,
137+
VNodeWorkerNum: vnodeWorkerNum,
102138
}
103139

104-
vc, err := vnode_controller.NewVNodeController(&rcc, []tunnel.Tunnel{
105-
tl,
106-
})
140+
vc, err := vnode_controller.NewVNodeController(&rcc, tunnels)
107141
if err != nil {
108142
log.G(ctx).Error(err, "unable to set up VNodeController")
109143
return
@@ -118,9 +152,7 @@ func main() {
118152
enableModuleDeploymentController := utils.GetEnv("ENABLE_MODULE_DEPLOYMENT_CONTROLLER", "false")
119153

120154
if enableModuleDeploymentController == "true" {
121-
mdc, err := module_deployment_controller.NewModuleDeploymentController(env, []module_tunnels.ModuleTunnel{
122-
tl,
123-
})
155+
mdc, err := module_deployment_controller.NewModuleDeploymentController(env, moduleTunnels)
124156
if err != nil {
125157
log.G(ctx).Error(err, "unable to set up module_deployment_controller")
126158
return
@@ -133,11 +165,13 @@ func main() {
133165
}
134166
}
135167

136-
err = tl.Start(ctx, clientID, env)
137-
if err != nil {
138-
log.G(ctx).WithError(err).Error("failed to start tunnel", tl.Key())
139-
} else {
140-
log.G(ctx).Info("Tunnel started: ", tl.Key())
168+
for _, t := range tunnels {
169+
err = t.Start(ctx, clientID, env)
170+
if err != nil {
171+
log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
172+
} else {
173+
log.G(ctx).Info("Tunnel started: ", t.Key())
174+
}
141175
}
142176

143177
log.G(ctx).Info("Module controller running")

common/model/consts.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,6 @@ const (
4444
)
4545

4646
const (
47-
LabelKeyOfTechStack = "base.koupleless.io/stack"
47+
LabelKeyOfTechStack = "base.koupleless.io/stack"
48+
LabelKeyOfArkletPort = "base.koupleless.io/arklet-port"
4849
)

common/model/model.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package model
22

33
import (
4-
"github.com/koupleless/arkctl/v1/service/ark"
4+
"github.com/koupleless/module_controller/module_tunnels/koupleless_http_tunnel/ark_service"
55
)
66

77
// ArkMqttMsg is the response of mqtt message payload.
@@ -17,6 +17,7 @@ type Metadata struct {
1717

1818
// HeartBeatData is the data of base heart beat.
1919
type HeartBeatData struct {
20+
BaseID string `json:"baseID"`
2021
State string `json:"state"`
2122
MasterBizInfo Metadata `json:"masterBizInfo"`
2223
NetworkInfo NetworkInfo `json:"networkInfo"`
@@ -29,10 +30,10 @@ type NetworkInfo struct {
2930
}
3031

3132
type BizOperationResponse struct {
32-
Command string `json:"command"`
33-
BizName string `json:"bizName"`
34-
BizVersion string `json:"bizVersion"`
35-
Response ark.ArkResponseBase `json:"response"`
33+
Command string `json:"command"`
34+
BizName string `json:"bizName"`
35+
BizVersion string `json:"bizVersion"`
36+
Response ark_service.ArkResponse `json:"response"`
3637
}
3738

3839
// QueryBaselineRequest is the request parameters of query baseline func

common/utils/utils.go

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
package utils
22

33
import (
4+
"context"
45
"fmt"
56
"github.com/koupleless/arkctl/common/fileutil"
67
"github.com/koupleless/arkctl/v1/service/ark"
78
"github.com/koupleless/module_controller/common/model"
9+
"github.com/koupleless/virtual-kubelet/common/log"
810
"github.com/koupleless/virtual-kubelet/common/utils"
911
vkModel "github.com/koupleless/virtual-kubelet/model"
1012
"github.com/sirupsen/logrus"
1113
corev1 "k8s.io/api/core/v1"
14+
apiErrors "k8s.io/apimachinery/pkg/api/errors"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
"strconv"
1217
"strings"
1318
"time"
1419
)
@@ -61,6 +66,10 @@ func TranslateHeartBeatDataToNodeInfo(data model.HeartBeatData) vkModel.NodeInfo
6166
if data.State == "ACTIVATED" {
6267
state = vkModel.NodeStatusActivated
6368
}
69+
labels := map[string]string{}
70+
if data.NetworkInfo.ArkletPort != 0 {
71+
labels[model.LabelKeyOfArkletPort] = strconv.Itoa(data.NetworkInfo.ArkletPort)
72+
}
6473
return vkModel.NodeInfo{
6574
Metadata: vkModel.NodeMetadata{
6675
Name: data.MasterBizInfo.Name,
@@ -71,6 +80,7 @@ func TranslateHeartBeatDataToNodeInfo(data model.HeartBeatData) vkModel.NodeInfo
7180
NodeIP: data.NetworkInfo.LocalIP,
7281
HostName: data.NetworkInfo.LocalHostName,
7382
},
83+
CustomLabels: labels,
7484
}
7585
}
7686

@@ -150,12 +160,14 @@ func TranslateSimpleBizDataToArkBizInfo(data model.ArkSimpleBizInfoData) *ark.Ar
150160
}
151161

152162
func GetContainerStateFromBizState(bizStateIndex string) vkModel.ContainerState {
153-
switch bizStateIndex {
154-
case "RESOLVED":
163+
switch strings.ToLower(bizStateIndex) {
164+
case "resolved":
155165
return vkModel.ContainerStateResolved
156-
case "ACTIVATED":
166+
case "activated":
157167
return vkModel.ContainerStateActivated
158-
case "DEACTIVATED":
168+
case "deactivated":
169+
return vkModel.ContainerStateDeactivated
170+
case "broken":
159171
return vkModel.ContainerStateDeactivated
160172
}
161173
return vkModel.ContainerStateWaiting
@@ -164,11 +176,13 @@ func GetContainerStateFromBizState(bizStateIndex string) vkModel.ContainerState
164176
func GetArkBizStateFromSimpleBizState(bizStateIndex string) string {
165177
switch bizStateIndex {
166178
case "2":
167-
return "RESOLVED"
179+
return "resolved"
168180
case "3":
169-
return "ACTIVATED"
181+
return "activated"
170182
case "4":
171-
return "DEACTIVATED"
183+
return "deactivated"
184+
case "5":
185+
return "broken"
172186
}
173187
return ""
174188
}
@@ -197,3 +211,39 @@ func GetLatestState(state string, records []ark.ArkBizStateRecord) (time.Time, s
197211
}
198212
return latestStateTime, reason, message
199213
}
214+
215+
func OnBaseUnreachable(ctx context.Context, info vkModel.UnreachableNodeInfo, env string, k8sClient client.Client) {
216+
// base not ready, delete from api server
217+
node := corev1.Node{}
218+
nodeName := utils.FormatNodeName(info.NodeID, env)
219+
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName}, &node)
220+
logger := log.G(ctx).WithField("nodeID", info.NodeID).WithField("func", "OnNodeNotReady")
221+
if err == nil {
222+
// delete node from api server
223+
logger.Info("DeleteBaseNode")
224+
deleteErr := k8sClient.Delete(ctx, &node)
225+
if deleteErr != nil && !apiErrors.IsNotFound(err) {
226+
logger.WithError(deleteErr).Info("delete base node failed")
227+
}
228+
} else if apiErrors.IsNotFound(err) {
229+
logger.Info("Node not found, skipping delete operation")
230+
} else {
231+
logger.WithError(err).Error("Failed to get node, cannot delete")
232+
}
233+
}
234+
235+
func ExtractNetworkInfoFromNodeInfoData(initData vkModel.NodeInfo) model.NetworkInfo {
236+
portStr := initData.CustomLabels[model.LabelKeyOfArkletPort]
237+
238+
port, err := strconv.Atoi(portStr)
239+
if err != nil {
240+
logrus.Errorf("failed to parse port %s from node info", portStr)
241+
port = 1238
242+
}
243+
244+
return model.NetworkInfo{
245+
LocalIP: initData.NetworkInfo.NodeIP,
246+
LocalHostName: initData.NetworkInfo.HostName,
247+
ArkletPort: port,
248+
}
249+
}

common/utils/utils_test.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package utils
22

33
import (
4+
"context"
45
"fmt"
56
"github.com/koupleless/arkctl/v1/service/ark"
67
"github.com/koupleless/module_controller/common/model"
@@ -9,6 +10,7 @@ import (
910
corev1 "k8s.io/api/core/v1"
1011
"k8s.io/apimachinery/pkg/api/resource"
1112
"reflect"
13+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
1214
"testing"
1315
"time"
1416
)
@@ -277,6 +279,7 @@ func TestTranslateHeartBeatDataToNodeInfo(t *testing.T) {
277279
NetworkInfo: model.NetworkInfo{
278280
LocalIP: "192.168.1.1",
279281
LocalHostName: "host1",
282+
ArkletPort: 1238,
280283
},
281284
},
282285
expected: vkModel.NodeInfo{
@@ -289,6 +292,9 @@ func TestTranslateHeartBeatDataToNodeInfo(t *testing.T) {
289292
NodeIP: "192.168.1.1",
290293
HostName: "host1",
291294
},
295+
CustomLabels: map[string]string{
296+
model.LabelKeyOfArkletPort: "1238",
297+
},
292298
},
293299
},
294300
{
@@ -313,6 +319,7 @@ func TestTranslateHeartBeatDataToNodeInfo(t *testing.T) {
313319
NodeIP: "192.168.1.2",
314320
HostName: "host2",
315321
},
322+
CustomLabels: map[string]string{},
316323
},
317324
},
318325
}
@@ -625,9 +632,9 @@ func TestTranslateSimpleBizDataToArkBizInfo(t *testing.T) {
625632

626633
func TestGetArkBizStateFromSimpleBizState(t *testing.T) {
627634
testCases := map[string]string{
628-
"2": "RESOLVED",
629-
"3": "ACTIVATED",
630-
"4": "DEACTIVATED",
635+
"2": "resolved",
636+
"3": "activated",
637+
"4": "deactivated",
631638
"123": "",
632639
}
633640
for input, expected := range testCases {
@@ -647,3 +654,16 @@ func TestGetLatestState_ChangeTimeLenLt3(t *testing.T) {
647654
assert.Empty(t, reason)
648655
assert.Empty(t, message)
649656
}
657+
658+
func TestExtractNetworkInfoFromNodeInfoData(t *testing.T) {
659+
data := ExtractNetworkInfoFromNodeInfoData(vkModel.NodeInfo{
660+
CustomLabels: map[string]string{
661+
model.LabelKeyOfArkletPort: ";",
662+
},
663+
})
664+
assert.Equal(t, data.ArkletPort, 1238)
665+
}
666+
667+
func TestOnBaseUnreachable(t *testing.T) {
668+
OnBaseUnreachable(context.Background(), vkModel.UnreachableNodeInfo{}, "test", fake.NewFakeClient())
669+
}

example/quick-start/base.yaml

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,13 @@ metadata:
77
spec:
88
containers:
99
- name: base
10-
image: serverless-registry.cn-shanghai.cr.aliyuncs.com/opensource/test/base-web:latest # 已经打包好的镜像
10+
image: serverless-registry.cn-shanghai.cr.aliyuncs.com/opensource/test/base-web:1.1.1 # 已经打包好的镜像
1111
imagePullPolicy: Always
1212
ports:
1313
- name: base
1414
containerPort: 8080
15+
- name: arklet
16+
containerPort: 1238
1517
env:
16-
- name: KUPLELESS_ARKLET_MQTT_BROKER
17-
value: 10.244.0.27
18-
- name: KUPLELESS_ARKLET_MQTT_PORT
19-
value: "1883"
20-
- name: KUPLELESS_ARKLET_MQTT_USERNAME
21-
value: koupleless_base
22-
- name: KUPLELESS_ARKLET_MQTT_PASSWORD
23-
value: public
24-
- name: KUPLELESS_ARKLET_MQTT_CLIENT_PREFIX
25-
value: koupleless
26-
- name: KUPLELESS_ARKLET_CUSTOM_TUNNEL_CLASSNAME
27-
value: com.alipay.sofa.koupleless.arklet.tunnel.mqtt.MqttTunnel
28-
- name: KUPLELESS_ARKLET_CUSTOM_BASE_METADATA_CLASSNAME
29-
value: com.alipay.sofa.web.base.metadata.MetadataHook
18+
- name: MODULE_CONTROLLER_ADDRESS
19+
value: {YOUR_MODULE_CONTROLLER_IP}

0 commit comments

Comments
 (0)