Skip to content

Commit daa9fa6

Browse files
authored
feat: add data schema support (#182)
1 parent 14d245a commit daa9fa6

File tree

24 files changed

+392
-109
lines changed

24 files changed

+392
-109
lines changed

.github/workflows/bench.yaml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
# Licensed under the Apache License, Version 2.0 (the "License");
16-
# you may not use this file except in compliance with the License.
17-
# You may obtain a copy of the License at
18-
#
19-
# http://www.apache.org/licenses/LICENSE-2.0
20-
#
21-
# Unless required by applicable law or agreed to in writing, software
22-
# distributed under the License is distributed on an "AS IS" BASIS,
23-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24-
# See the License for the specific language governing permissions and
25-
# limitations under the License.
2615

2716
name: Benchmark
2817
on:

.github/workflows/ci.yaml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
# Licensed under the Apache License, Version 2.0 (the "License");
16-
# you may not use this file except in compliance with the License.
17-
# You may obtain a copy of the License at
18-
#
19-
# http://www.apache.org/licenses/LICENSE-2.0
20-
#
21-
# Unless required by applicable law or agreed to in writing, software
22-
# distributed under the License is distributed on an "AS IS" BASIS,
23-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24-
# See the License for the specific language governing permissions and
25-
# limitations under the License.
2615

2716
name: CI
2817
on:

.github/workflows/license.yaml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
# Licensed under the Apache License, Version 2.0 (the "License");
16-
# you may not use this file except in compliance with the License.
17-
# You may obtain a copy of the License at
18-
#
19-
# http://www.apache.org/licenses/LICENSE-2.0
20-
#
21-
# Unless required by applicable law or agreed to in writing, software
22-
# distributed under the License is distributed on an "AS IS" BASIS,
23-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24-
# See the License for the specific language governing permissions and
25-
# limitations under the License.
2615

2716
name: License Check
2817
on:

.golangci.yml

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright 2024 Function Stream Org.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
run:
16+
deadline: 5m
17+
allow-parallel-runners: true
18+
19+
issues:
20+
# don't skip warning about doc comments
21+
# don't exclude the default set of lint
22+
exclude-use-default: false
23+
# restore some of the defaults
24+
# (fill in the rest as needed)
25+
exclude-rules:
26+
- path: "api/*"
27+
linters:
28+
- lll
29+
- path: "internal/*"
30+
linters:
31+
- dupl
32+
- lll
33+
linters:
34+
disable-all: true
35+
enable:
36+
- dupl
37+
- errcheck
38+
- exportloopref
39+
- goconst
40+
- gocyclo
41+
- gofmt
42+
- goimports
43+
- gosimple
44+
- govet
45+
- ineffassign
46+
- lll
47+
- misspell
48+
- nakedret
49+
- prealloc
50+
- staticcheck
51+
- typecheck
52+
- unconvert
53+
- unparam
54+
- unused

benchmark/bench_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@ package benchmark
1818

1919
import (
2020
"context"
21-
"github.com/functionstream/function-stream/fs/api"
22-
"github.com/functionstream/function-stream/fs/runtime/wazero"
2321
"math/rand"
2422
"os"
2523
"runtime/pprof"
2624
"strconv"
2725
"testing"
2826
"time"
2927

28+
"github.com/functionstream/function-stream/fs/api"
29+
"github.com/functionstream/function-stream/fs/runtime/wazero"
30+
3031
"github.com/apache/pulsar-client-go/pulsaradmin"
3132
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
3233
adminclient "github.com/functionstream/function-stream/admin/client"
@@ -115,12 +116,14 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
115116
memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background())
116117

117118
s, err := server.NewServer(
118-
server.WithRuntimeFactoryBuilder(common.WASMRuntime, func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) {
119-
return wazero.NewWazeroFunctionRuntimeFactory(), nil
120-
}),
121-
server.WithTubeFactoryBuilder(common.MemoryTubeType, func(configMap common.ConfigMap) (contube.TubeFactory, error) {
122-
return memoryQueueFactory, nil
123-
}),
119+
server.WithRuntimeFactoryBuilder(common.WASMRuntime,
120+
func(configMap common.ConfigMap) (api.FunctionRuntimeFactory, error) {
121+
return wazero.NewWazeroFunctionRuntimeFactory(), nil
122+
}),
123+
server.WithTubeFactoryBuilder(common.MemoryTubeType,
124+
func(configMap common.ConfigMap) (contube.TubeFactory, error) {
125+
return memoryQueueFactory, nil
126+
}),
124127
)
125128
if err != nil {
126129
b.Fatal(err)
@@ -134,7 +137,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
134137
inputTopic := "test-input-" + strconv.Itoa(rand.Int())
135138
outputTopic := "test-output-" + strconv.Itoa(rand.Int())
136139

137-
replicas := int32(1)
140+
replicas := int32(5)
138141

139142
pConfig := &perf.Config{
140143
RequestRate: 200000.0,

clients/gofs/gofs.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package gofs
18+
19+
import "C"
20+
import (
21+
"encoding/json"
22+
"fmt"
23+
"io"
24+
"os"
25+
26+
. "github.com/functionstream/function-stream/common/wasm_utils"
27+
"github.com/wirelessr/avroschema"
28+
)
29+
30+
var processFile *os.File
31+
32+
func init() {
33+
processFile, _ = os.Open("/process")
34+
}
35+
36+
var processFunc func([]byte) []byte
37+
38+
//go:wasmimport fs registerSchema
39+
func registerSchema(inputSchemaPtrSize, outputSchemaPtrSize uint64)
40+
41+
func Register[I any, O any](process func(*I) *O) error {
42+
inputSchema, err := avroschema.Reflect(new(I))
43+
if err != nil {
44+
return err
45+
}
46+
outputSchema, err := avroschema.Reflect(new(O))
47+
if err != nil {
48+
return err
49+
}
50+
processFunc = func(payload []byte) []byte {
51+
input := new(I)
52+
err = json.Unmarshal(payload, input)
53+
if err != nil {
54+
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload)
55+
}
56+
output := process(input)
57+
outputPayload, _ := json.Marshal(output)
58+
return outputPayload
59+
}
60+
registerSchema(PtrSize(StringToPtr(inputSchema)), PtrSize(StringToPtr(outputSchema)))
61+
return nil
62+
}
63+
64+
//export process
65+
func process() {
66+
payload, _ := io.ReadAll(processFile)
67+
outputPayload := processFunc(payload)
68+
_, _ = processFile.Write(outputPayload)
69+
}

cmd/client/reload/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ package reload
1919
import (
2020
"context"
2121
"fmt"
22+
"os"
23+
2224
adminclient "github.com/functionstream/function-stream/admin/client"
2325
"github.com/functionstream/function-stream/cmd/client/common"
2426
"github.com/spf13/cobra"
25-
"os"
2627
)
2728

2829
var Cmd = &cobra.Command{

common/wasm_utils/wasm_utils.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package wasm_utils
18+
19+
import "unsafe"
20+
21+
// StringToPtr returns a pointer and size pair for the given string in a way
22+
// compatible with WebAssembly numeric types.
23+
// The returned pointer aliases the string hence the string must be kept alive
24+
// until ptr is no longer needed.
25+
func StringToPtr(s string) (uint32, uint32) {
26+
ptr := unsafe.Pointer(unsafe.StringData(s))
27+
return uint32(uintptr(ptr)), uint32(len(s))
28+
}
29+
30+
//// StringToLeakedPtr returns a pointer and size pair for the given string in a way
31+
//// compatible with WebAssembly numeric types.
32+
//// The pointer is not automatically managed by TinyGo hence it must be freed by the host.
33+
//func StringToLeakedPtr(s string) (uint32, uint32) {
34+
// size := C.ulong(len(s))
35+
// ptr := unsafe.Pointer(C.malloc(size))
36+
// copy(unsafe.Slice((*byte)(ptr), size), s)
37+
// return uint32(uintptr(ptr)), uint32(size)
38+
//}
39+
40+
func PtrSize(ptr, size uint32) uint64 {
41+
return (uint64(ptr) << 32) | uint64(size)
42+
}
43+
44+
func ExtractPtrSize(ptrSize uint64) (uint32, uint32) {
45+
return uint32(ptrSize >> 32), uint32(ptrSize)
46+
}

examples/basic/main.go

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,26 @@
1717
package main
1818

1919
import (
20-
"encoding/json"
2120
"fmt"
22-
"io"
21+
"github.com/functionstream/function-stream/clients/gofs"
2322
"os"
2423
)
2524

25+
func main() {
26+
_, _ = fmt.Fprintln(os.Stderr, "Hello from Go!")
27+
}
28+
2629
type Person struct {
2730
Name string `json:"name"`
2831
Money int `json:"money"`
2932
Expected int `json:"expected"`
3033
}
3134

32-
func main() {
33-
_, _ = fmt.Fprintln(os.Stderr, "Hello from Go!")
35+
func init() {
36+
_ = gofs.Register(myProcess)
3437
}
3538

36-
//export process
37-
func process() {
38-
dataBytes, err := io.ReadAll(os.Stdin)
39-
if err != nil {
40-
_, _ = fmt.Fprintln(os.Stderr, "Failed to read data:", err)
41-
os.Exit(1)
42-
}
43-
44-
var person Person
45-
err = json.Unmarshal(dataBytes, &person)
46-
if err != nil {
47-
_, _ = fmt.Fprintln(os.Stderr, "Failed to parse JSON:", err)
48-
os.Exit(1)
49-
}
50-
51-
person.Money++
52-
53-
jsonBytes, err := json.Marshal(person)
54-
if err != nil {
55-
_, _ = fmt.Fprintln(os.Stderr, "Failed to encode JSON:", err)
56-
os.Exit(1)
57-
}
58-
59-
_, err = fmt.Printf("%s", jsonBytes)
60-
if err != nil {
61-
_, _ = fmt.Fprintln(os.Stderr, "Failed to print JSON:", err)
62-
os.Exit(1)
63-
}
39+
func myProcess(person *Person) *Person {
40+
person.Money += 1
41+
return person
6442
}

fs/contube/contube.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package contube
1919
import (
2020
"context"
2121
"encoding/json"
22+
2223
"github.com/go-playground/validator/v10"
2324
"github.com/pkg/errors"
2425
)
@@ -31,6 +32,7 @@ var (
3132

3233
type Record interface {
3334
GetPayload() []byte
35+
GetSchema() string
3436
Commit()
3537
}
3638

@@ -110,6 +112,7 @@ type TubeFactory interface {
110112

111113
type RecordImpl struct {
112114
payload []byte
115+
schema string
113116
commitFunc func()
114117
}
115118

@@ -120,10 +123,22 @@ func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl {
120123
}
121124
}
122125

126+
func NewSchemaRecordImpl(payload []byte, schema string, ackFunc func()) *RecordImpl {
127+
return &RecordImpl{
128+
payload: payload,
129+
schema: schema,
130+
commitFunc: ackFunc,
131+
}
132+
}
133+
123134
func (e *RecordImpl) GetPayload() []byte {
124135
return e.payload
125136
}
126137

138+
func (e *RecordImpl) GetSchema() string {
139+
return e.schema
140+
}
141+
127142
func (e *RecordImpl) Commit() {
128143
if e.commitFunc != nil {
129144
e.commitFunc()

0 commit comments

Comments
 (0)