Skip to content

Commit a95c50a

Browse files
authored
feat: state store (#153)
Signed-off-by: Zike Yang <zike@apache.org>
1 parent eeae101 commit a95c50a

29 files changed

+1042
-302
lines changed

.github/workflows/ci.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ jobs:
5757
- run: make build_all
5858
- name: Wait for Pulsar service
5959
run: until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
60-
- run: nohup bin/function-stream server > nohup.out &
6160
- run: make test
6261
- name: Collect Docker Compose logs
6362
if: failure()

benchmark/bench_test.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
)
3636

3737
func BenchmarkStressForBasicFunc(b *testing.B) {
38-
s, err := server.NewServer(server.LoadConfigFromEnv())
38+
s, err := server.NewDefaultServer()
3939
if err != nil {
4040
b.Fatal(err)
4141
}
@@ -110,15 +110,7 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
110110
func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
111111
memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background())
112112

113-
svrConf := &common.Config{
114-
ListenAddr: common.DefaultAddr,
115-
}
116-
117-
fm, err := fs.NewFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory))
118-
if err != nil {
119-
b.Fatal(err)
120-
}
121-
s, err := server.NewServer(svrConf, server.WithFunctionManager(fm))
113+
s, err := server.NewServer(server.WithFunctionManager(fs.WithDefaultTubeFactory(memoryQueueFactory)))
122114
if err != nil {
123115
b.Fatal(err)
124116
}

cmd/server/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var (
3535

3636
func exec(*cobra.Command, []string) {
3737
common.RunProcess(func() (io.Closer, error) {
38-
s, err := server.NewServer(server.LoadConfigFromEnv())
38+
s, err := server.NewServer()
3939
if err != nil {
4040
return nil, err
4141
}

cmd/standalone/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var (
3535

3636
func exec(*cobra.Command, []string) {
3737
common.RunProcess(func() (io.Closer, error) {
38-
s, err := server.NewServer(server.LoadStandaloneConfigFromEnv())
38+
s, err := server.NewServerWithConfig(server.LoadStandaloneConfigFromEnv())
3939
if err != nil {
4040
return nil, err
4141
}

fs/api/func_ctx.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
type FunctionContext interface {
20+
PutState(key string, value []byte) error
21+
GetState(key string) ([]byte, error)
22+
}

fs/api/instance.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
type FunctionInstance interface {
2727
Context() context.Context
28+
FunctionContext() FunctionContext
2829
Definition() *model.Function
2930
Index() int32
3031
Stop()
@@ -34,5 +35,5 @@ type FunctionInstance interface {
3435
}
3536

3637
type FunctionInstanceFactory interface {
37-
NewFunctionInstance(f *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32, logger *slog.Logger) FunctionInstance
38+
NewFunctionInstance(f *model.Function, funcCtx FunctionContext, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, i int32, logger *slog.Logger) FunctionInstance
3839
}

fs/api/statestore.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import "github.com/pkg/errors"
20+
21+
var ErrNotFound = errors.New("key not found")
22+
23+
type StateStore interface {
24+
PutState(key string, value []byte) error
25+
GetState(key string) ([]byte, error)
26+
Close() error
27+
}

fs/contube/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type HttpTubeFactory struct {
5656
endpoints map[string]*endpointHandler
5757
}
5858

59-
func NewHttpTubeFactory(ctx context.Context) TubeFactory {
59+
func NewHttpTubeFactory(ctx context.Context) *HttpTubeFactory {
6060
return &HttpTubeFactory{
6161
ctx: ctx,
6262
endpoints: make(map[string]*endpointHandler),

fs/contube/http_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
func TestHttpTubeHandleRecord(t *testing.T) {
2626
ctx, cancel := context.WithCancel(context.Background())
27-
f := NewHttpTubeFactory(ctx).(*HttpTubeFactory)
27+
f := NewHttpTubeFactory(ctx)
2828

2929
endpoint := "test"
3030
err := f.Handle(ctx, "test", []byte("test"))
@@ -51,7 +51,7 @@ func TestHttpTubeHandleRecord(t *testing.T) {
5151
}
5252

5353
func TestHttpTubeSinkTubeNotImplement(t *testing.T) {
54-
f := NewHttpTubeFactory(context.Background()).(*HttpTubeFactory)
54+
f := NewHttpTubeFactory(context.Background())
5555
_, err := f.NewSinkTube(context.Background(), make(ConfigMap))
5656
assert.ErrorIs(t, err, ErrSinkTubeNotImplemented)
5757
}

fs/func_ctx_impl.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fs
18+
19+
import (
20+
"github.com/functionstream/function-stream/fs/api"
21+
"github.com/pkg/errors"
22+
)
23+
24+
var ErrStateStoreNotLoaded = errors.New("state store not loaded")
25+
26+
type FuncCtxImpl struct {
27+
api.FunctionContext
28+
store api.StateStore
29+
putStateFunc func(key string, value []byte) error
30+
getStateFunc func(key string) ([]byte, error)
31+
}
32+
33+
func NewFuncCtxImpl(store api.StateStore) *FuncCtxImpl {
34+
putStateFunc := func(key string, value []byte) error {
35+
return ErrStateStoreNotLoaded
36+
}
37+
getStateFunc := func(key string) ([]byte, error) {
38+
return nil, ErrStateStoreNotLoaded
39+
}
40+
if store != nil {
41+
putStateFunc = store.PutState
42+
getStateFunc = store.GetState
43+
}
44+
return &FuncCtxImpl{store: store,
45+
putStateFunc: putStateFunc,
46+
getStateFunc: getStateFunc}
47+
}
48+
49+
func (f *FuncCtxImpl) PutState(key string, value []byte) error {
50+
return f.putStateFunc(key, value)
51+
}
52+
53+
func (f *FuncCtxImpl) GetState(key string) ([]byte, error) {
54+
return f.getStateFunc(key)
55+
}

fs/func_ctx_impl_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fs
18+
19+
import (
20+
"github.com/stretchr/testify/assert"
21+
"testing"
22+
)
23+
24+
func TestFuncCtx_NilStore(t *testing.T) {
25+
f := NewFuncCtxImpl(nil)
26+
assert.ErrorIs(t, f.PutState("key", []byte("value")), ErrStateStoreNotLoaded)
27+
_, err := f.GetState("key")
28+
assert.ErrorIs(t, err, ErrStateStoreNotLoaded)
29+
}

fs/instance_impl.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import (
2828
)
2929

3030
type FunctionInstanceImpl struct {
31-
api.FunctionInstance
3231
ctx context.Context
32+
funcCtx api.FunctionContext
3333
cancelFunc context.CancelFunc
3434
definition *model.Function
3535
sourceFactory contube.SourceTubeFactory
@@ -47,18 +47,21 @@ const (
4747
CtxKeyInstanceIndex CtxKey = "instance-index"
4848
)
4949

50-
type DefaultInstanceFactory struct{}
50+
type DefaultInstanceFactory struct {
51+
api.FunctionInstanceFactory
52+
}
5153

5254
func NewDefaultInstanceFactory() api.FunctionInstanceFactory {
5355
return &DefaultInstanceFactory{}
5456
}
5557

56-
func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32, logger *slog.Logger) api.FunctionInstance {
58+
func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext, sourceFactory contube.SourceTubeFactory, sinkFactory contube.SinkTubeFactory, index int32, logger *slog.Logger) api.FunctionInstance {
5759
ctx, cancelFunc := context.WithCancel(context.Background())
5860
ctx = context.WithValue(ctx, CtxKeyFunctionName, definition.Name)
5961
ctx = context.WithValue(ctx, CtxKeyInstanceIndex, index)
6062
return &FunctionInstanceImpl{
6163
ctx: ctx,
64+
funcCtx: funcCtx,
6265
cancelFunc: cancelFunc,
6366
definition: definition,
6467
sourceFactory: sourceFactory,
@@ -136,6 +139,10 @@ func (instance *FunctionInstanceImpl) Context() context.Context {
136139
return instance.ctx
137140
}
138141

142+
func (instance *FunctionInstanceImpl) FunctionContext() api.FunctionContext {
143+
return instance.funcCtx
144+
}
145+
139146
func (instance *FunctionInstanceImpl) Definition() *model.Function {
140147
return instance.definition
141148
}

fs/instance_impl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestFunctionInstanceContextSetting(t *testing.T) {
2828
Name: "test-function",
2929
}
3030
index := int32(1)
31-
instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, nil, index, slog.Default())
31+
instance := defaultInstanceFactory.NewFunctionInstance(definition, nil, nil, nil, index, slog.Default())
3232

3333
if instance == nil {
3434
t.Error("FunctionInstance should not be nil")

fs/manager.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/functionstream/function-stream/fs/api"
2424
"github.com/functionstream/function-stream/fs/contube"
2525
"github.com/functionstream/function-stream/fs/runtime/wazero"
26+
"github.com/functionstream/function-stream/fs/statestore"
2627
"github.com/pkg/errors"
2728
"log/slog"
2829
"math/rand"
@@ -38,9 +39,11 @@ type FunctionManager struct {
3839
}
3940

4041
type managerOptions struct {
41-
tubeFactoryMap map[string]contube.TubeFactory
42-
runtimeFactoryMap map[string]api.FunctionRuntimeFactory
43-
instanceFactory api.FunctionInstanceFactory
42+
tubeFactoryMap map[string]contube.TubeFactory
43+
runtimeFactoryMap map[string]api.FunctionRuntimeFactory
44+
instanceFactory api.FunctionInstanceFactory
45+
stateStore api.StateStore
46+
dontUseDefaultStateStore bool
4447
}
4548

4649
type ManagerOption interface {
@@ -82,6 +85,14 @@ func WithInstanceFactory(factory api.FunctionInstanceFactory) ManagerOption {
8285
})
8386
}
8487

88+
func WithStateStore(store api.StateStore) ManagerOption {
89+
return managerOptionFunc(func(c *managerOptions) (*managerOptions, error) {
90+
c.dontUseDefaultStateStore = true
91+
c.stateStore = store
92+
return c, nil
93+
})
94+
}
95+
8596
func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) {
8697
options := &managerOptions{
8798
tubeFactoryMap: make(map[string]contube.TubeFactory),
@@ -96,6 +107,13 @@ func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) {
96107
return nil, err
97108
}
98109
}
110+
if !options.dontUseDefaultStateStore {
111+
var err error
112+
options.stateStore, err = statestore.NewTmpPebbleStateStore()
113+
if err != nil {
114+
return nil, errors.Wrap(err, "failed to create default state store")
115+
}
116+
}
99117
log := slog.With()
100118
loadedRuntimeFact := make([]string, 0, len(options.runtimeFactoryMap))
101119
for k := range options.runtimeFactoryMap {
@@ -145,6 +163,10 @@ func (fm *FunctionManager) getRuntimeFactory(t string) (api.FunctionRuntimeFacto
145163
return factory, nil
146164
}
147165

166+
func (fm *FunctionManager) createFuncCtx(f *model.Function) api.FunctionContext {
167+
return NewFuncCtxImpl(fm.options.stateStore)
168+
}
169+
148170
func (fm *FunctionManager) StartFunction(f *model.Function) error {
149171
fm.functionsLock.Lock()
150172
if _, exist := fm.functions[f.Name]; exist {
@@ -156,6 +178,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
156178
if f.Replicas <= 0 {
157179
return errors.New("replicas should be greater than 0")
158180
}
181+
funcCtx := fm.createFuncCtx(f)
159182
for i := int32(0); i < f.Replicas; i++ {
160183
sourceFactory, err := fm.getTubeFactory(f.Source)
161184
if err != nil {
@@ -167,7 +190,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
167190
}
168191
runtimeType := fm.getRuntimeType(f.Runtime)
169192

170-
instance := fm.options.instanceFactory.NewFunctionInstance(f, sourceFactory, sinkFactory, i, slog.With(
193+
instance := fm.options.instanceFactory.NewFunctionInstance(f, funcCtx, sourceFactory, sinkFactory, i, slog.With(
171194
slog.String("name", f.Name),
172195
slog.Int("index", int(i)),
173196
slog.String("runtime", runtimeType),
@@ -181,7 +204,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error {
181204
}
182205
go instance.Run(runtimeFactory)
183206
select {
184-
case <-instance.WaitForReady():
207+
case err := <-instance.WaitForReady():
185208
if err != nil {
186209
fm.log.ErrorContext(instance.Context(), "Error starting function instance", slog.Any("error", err.Error()))
187210
instance.Stop()
@@ -241,3 +264,24 @@ func (fm *FunctionManager) ConsumeEvent(name string) (contube.Record, error) {
241264
}
242265
return <-c, nil
243266
}
267+
268+
// GetStateStore returns the state store used by the function manager
269+
// Return nil if no state store is configured
270+
func (fm *FunctionManager) GetStateStore() api.StateStore {
271+
return fm.options.stateStore
272+
}
273+
274+
func (fm *FunctionManager) Close() error {
275+
fm.functionsLock.Lock()
276+
defer fm.functionsLock.Unlock()
277+
for _, instances := range fm.functions {
278+
for _, instance := range instances {
279+
instance.Stop()
280+
}
281+
}
282+
err := fm.options.stateStore.Close()
283+
if err != nil {
284+
return err
285+
}
286+
return nil
287+
}

0 commit comments

Comments
 (0)