Skip to content

Commit ac910c5

Browse files
committed
fix(topo): register lag state type (#3719)
Signed-off-by: Jiyong Huang <huangjy@emqx.io> (cherry picked from commit 3ef13de)
1 parent 67301cb commit ac910c5

File tree

1 file changed

+27
-25
lines changed

1 file changed

+27
-25
lines changed

internal/binder/function/funcs_misc.go

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
1+
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import (
2020
"crypto/sha256"
2121
"crypto/sha512"
2222
b64 "encoding/base64"
23+
"encoding/gob"
2324
"encoding/json"
2425
"fmt"
2526
"hash/crc32"
@@ -43,6 +44,7 @@ import (
4344
)
4445

4546
func registerMiscFunc() {
47+
gob.Register(&ringqueue{})
4648
builtins["bypass"] = builtinFunc{
4749
fType: ast.FuncTypeScalar,
4850
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
@@ -692,19 +694,19 @@ func jsonCall(ctx api.StreamContext, args []interface{}) (interface{}, error) {
692694
// page Rotate storage for in memory cache
693695
// Not thread safe!
694696
type ringqueue struct {
695-
data []interface{}
696-
h int
697-
t int
698-
l int
699-
size int
697+
Data []any
698+
H int
699+
T int
700+
L int
701+
Size int
700702
}
701703

702704
func newRingqueue(size int) *ringqueue {
703705
return &ringqueue{
704-
data: make([]interface{}, size),
705-
h: 0, // When deleting, head++, if tail == head, it is empty
706-
t: 0, // When append, tail++, if tail== head, it is full
707-
size: size,
706+
Data: make([]interface{}, size),
707+
H: 0, // When deleting, head++, if tail == head, it is empty
708+
T: 0, // When append, tail++, if tail== head, it is full
709+
Size: size,
708710
}
709711
}
710712

@@ -719,41 +721,41 @@ func (p *ringqueue) fill(item interface{}) {
719721

720722
// append item if list is not full and return true; otherwise return false
721723
func (p *ringqueue) append(item interface{}) bool {
722-
if p.l == p.size { // full
724+
if p.L == p.Size { // full
723725
return false
724726
}
725-
p.data[p.t] = item
726-
p.t++
727-
if p.t == p.size {
728-
p.t = 0
727+
p.Data[p.T] = item
728+
p.T++
729+
if p.T == p.Size {
730+
p.T = 0
729731
}
730-
p.l++
732+
p.L++
731733
return true
732734
}
733735

734736
// fetch get the first item in the cache and remove
735737
func (p *ringqueue) fetch() (interface{}, bool) {
736-
if p.l == 0 {
738+
if p.L == 0 {
737739
return nil, false
738740
}
739-
result := p.data[p.h]
740-
p.h++
741-
if p.h == p.size {
742-
p.h = 0
741+
result := p.Data[p.H]
742+
p.H++
743+
if p.H == p.Size {
744+
p.H = 0
743745
}
744-
p.l--
746+
p.L--
745747
return result, true
746748
}
747749

748750
// peek get the first item in the cache but keep it
749751
func (p *ringqueue) peek() (interface{}, bool) {
750-
if p.l == 0 {
752+
if p.L == 0 {
751753
return nil, false
752754
}
753-
result := p.data[p.h]
755+
result := p.Data[p.H]
754756
return result, true
755757
}
756758

757759
func (p *ringqueue) isFull() bool {
758-
return p.l == p.size
760+
return p.L == p.Size
759761
}

0 commit comments

Comments
 (0)