Skip to content

Commit 64f7e7f

Browse files
authored
NOISSUE - Refactor single algorithm processing (#117)
* Refactor single algorithm processing Simplified the agent service's algorithm handling logic to process a single algorithm instead of multiple. This change: - Removed the `Algorithms` type and associated stringer implementation. - Updated the state machine and service logic to expect a singular algorithm, aligning the agent's internal state transitions with the new model. - Adjusted the manager service and computations test server to mirror these changes in their respective payload structures, ensuring API and test consistency. - Altered README files to reflect the simplified interaction model and removed outdated descriptions. - Reverted the protoc-gen-go version used for generating protobuf files to maintain compatibility with the rest of the codebase. The single-algorithm approach streamlines the computation running process, reducing complexity and potential error conditions. It directly impacts how external services will construct and send computation requests. Signed-off-by: SammyOina <sammyoina@gmail.com> * Update protoc-gen-go version to v1.33.0 Signed-off-by: SammyOina <sammyoina@gmail.com> * Refactor variable name in computations.go and grpc.go Signed-off-by: SammyOina <sammyoina@gmail.com> --------- Signed-off-by: SammyOina <sammyoina@gmail.com>
1 parent 3a14896 commit 64f7e7f

File tree

13 files changed

+88
-111
lines changed

13 files changed

+88
-111
lines changed

agent/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ The service is configured using the environment variables from the following tab
1515
| AGENT_GRPC_SERVER_KEY | Path to gRPC server key in pem format | "" |
1616
| AGENT_GRPC_SERVER_CA_CERTS | Path to gRPC server CA certificate | "" |
1717
| AGENT_GRPC_CLIENT_CA_CERTS | Path to gRPC client CA certificate | "" |
18-
| COCOS_NOTIFICATION_SERVER_URL | Server to receive notification events from agent. | http:/localhost:9000 |
1918

2019

2120
## Deployment

agent/computations.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ import (
88
"reflect"
99
)
1010

11-
var (
12-
_ fmt.Stringer = (*Datasets)(nil)
13-
_ fmt.Stringer = (*Algorithms)(nil)
14-
)
11+
var _ fmt.Stringer = (*Datasets)(nil)
1512

1613
type AgentConfig struct {
1714
LogLevel string `json:"log_level"`
@@ -29,7 +26,7 @@ type Computation struct {
2926
Name string `json:"name,omitempty"`
3027
Description string `json:"description,omitempty"`
3128
Datasets Datasets `json:"datasets,omitempty"`
32-
Algorithms Algorithms `json:"algorithms,omitempty"`
29+
Algorithm Algorithm `json:"algorithms,omitempty"`
3330
ResultConsumers []string `json:"result_consumers,omitempty"`
3431
AgentConfig AgentConfig `json:"agent_config,omitempty"`
3532
}
@@ -42,14 +39,6 @@ func (d *Datasets) String() string {
4239
return string(dat)
4340
}
4441

45-
func (a *Algorithms) String() string {
46-
dat, err := json.Marshal(a)
47-
if err != nil {
48-
return ""
49-
}
50-
return string(dat)
51-
}
52-
5342
type Dataset struct {
5443
Dataset []byte `json:"-"`
5544
Hash [32]byte `json:"hash,omitempty"`
@@ -66,8 +55,6 @@ type Algorithm struct {
6655
ID string `json:"id,omitempty"`
6756
}
6857

69-
type Algorithms []Algorithm
70-
7158
func containsID(slice interface{}, id string) int {
7259
rangeOnMe := reflect.ValueOf(slice)
7360
for i := 0; i < rangeOnMe.Len(); i++ {

agent/service.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type Service interface {
5959

6060
type agentService struct {
6161
computation Computation // Holds the current computation request details.
62-
algorithms [][]byte // Stores the algorithms received for the computation.
62+
algorithm []byte // Stores the algorithm received for the computation.
6363
datasets [][]byte // Stores the datasets received for the computation.
6464
result []byte // Stores the result of the computation.
6565
sm *StateMachine // Manages the state transitions of the agent service.
@@ -84,46 +84,45 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp
8484
go svc.sm.Start(ctx)
8585
svc.sm.SendEvent(start)
8686
svc.sm.StateFunctions[idle] = svc.publishEvent("in-progress", json.RawMessage{})
87-
svc.sm.StateFunctions[receivingManifests] = svc.publishEvent("in-progress", json.RawMessage{})
88-
svc.sm.StateFunctions[receivingAlgorithms] = svc.publishEvent("in-progress", json.RawMessage{})
87+
svc.sm.StateFunctions[receivingManifest] = svc.publishEvent("in-progress", json.RawMessage{})
88+
svc.sm.StateFunctions[receivingAlgorithm] = svc.publishEvent("in-progress", json.RawMessage{})
8989
svc.sm.StateFunctions[receivingData] = svc.publishEvent("in-progress", json.RawMessage{})
9090
svc.sm.StateFunctions[resultsReady] = svc.publishEvent("in-progress", json.RawMessage{})
9191
svc.sm.StateFunctions[complete] = svc.publishEvent("in-progress", json.RawMessage{})
9292
svc.sm.StateFunctions[running] = svc.runComputation
9393

9494
svc.computation = cmp
95-
svc.sm.SendEvent(manifestsReceived)
95+
svc.sm.SendEvent(manifestReceived)
9696
return svc
9797
}
9898

9999
func (as *agentService) Algo(ctx context.Context, algorithm Algorithm) error {
100-
if as.sm.GetState() != receivingAlgorithms {
100+
if as.sm.GetState() != receivingAlgorithm {
101101
return errStateNotReady
102102
}
103-
if len(as.computation.Algorithms) == 0 {
103+
if as.algorithm != nil {
104104
return errAllManifestItemsReceived
105105
}
106106

107107
hash := sha3.Sum256(algorithm.Algorithm)
108108

109-
index := containsID(as.computation.Algorithms, algorithm.ID)
109+
index := containsID(as.computation.Algorithm, algorithm.ID)
110110
switch index {
111111
case -1:
112112
return errUndeclaredAlgorithm
113113
default:
114-
if as.computation.Algorithms[index].Provider != algorithm.Provider {
114+
if as.computation.Algorithm.Provider != algorithm.Provider {
115115
return errProviderMissmatch
116116
}
117-
if hash != as.computation.Algorithms[index].Hash {
117+
if hash != as.computation.Algorithm.Hash {
118118
return errHashMismatch
119119
}
120-
as.computation.Algorithms = slices.Delete(as.computation.Algorithms, index, index+1)
121120
}
122121

123-
as.algorithms = append(as.algorithms, algorithm.Algorithm)
122+
as.algorithm = algorithm.Algorithm
124123

125-
if len(as.computation.Algorithms) == 0 {
126-
as.sm.SendEvent(algorithmsReceived)
124+
if as.algorithm != nil {
125+
as.sm.SendEvent(algorithmReceived)
127126
}
128127

129128
return nil
@@ -202,7 +201,7 @@ func (as *agentService) runComputation() {
202201
as.sm.logger.Debug("computation run started")
203202
defer as.sm.SendEvent(runComplete)
204203
as.publishEvent("in-progress", json.RawMessage{})()
205-
result, err := run(as.algorithms[0], as.datasets[0])
204+
result, err := run(as.algorithm, as.datasets[0])
206205
if err != nil {
207206
as.runError = err
208207
as.publishEvent("failed", json.RawMessage{})()

agent/state.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ type state int
1414

1515
const (
1616
idle state = iota
17-
receivingManifests
18-
receivingAlgorithms
17+
receivingManifest
18+
receivingAlgorithm
1919
receivingData
2020
running
2121
resultsReady
@@ -26,8 +26,8 @@ type event int
2626

2727
const (
2828
start event = iota
29-
manifestsReceived
30-
algorithmsReceived
29+
manifestReceived
30+
algorithmReceived
3131
dataReceived
3232
runComplete
3333
resultsConsumed
@@ -56,13 +56,13 @@ func NewStateMachine(logger *slog.Logger) *StateMachine {
5656
}
5757

5858
sm.Transitions[idle] = make(map[event]state)
59-
sm.Transitions[idle][start] = receivingManifests
59+
sm.Transitions[idle][start] = receivingManifest
6060

61-
sm.Transitions[receivingManifests] = make(map[event]state)
62-
sm.Transitions[receivingManifests][manifestsReceived] = receivingAlgorithms
61+
sm.Transitions[receivingManifest] = make(map[event]state)
62+
sm.Transitions[receivingManifest][manifestReceived] = receivingAlgorithm
6363

64-
sm.Transitions[receivingAlgorithms] = make(map[event]state)
65-
sm.Transitions[receivingAlgorithms][algorithmsReceived] = receivingData
64+
sm.Transitions[receivingAlgorithm] = make(map[event]state)
65+
sm.Transitions[receivingAlgorithm][algorithmReceived] = receivingData
6666

6767
sm.Transitions[receivingData] = make(map[event]state)
6868
sm.Transitions[receivingData][dataReceived] = running

agent/state_string.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/state_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ func TestStateMachineTransitions(t *testing.T) {
1616
event event
1717
expected state
1818
}{
19-
{idle, start, receivingManifests},
20-
{receivingManifests, manifestsReceived, receivingAlgorithms},
21-
{receivingAlgorithms, algorithmsReceived, receivingData},
19+
{idle, start, receivingManifest},
20+
{receivingManifest, manifestReceived, receivingAlgorithm},
21+
{receivingAlgorithm, algorithmReceived, receivingData},
2222
{receivingData, dataReceived, running},
2323
{running, runComplete, resultsReady},
2424
{resultsReady, resultsConsumed, complete},

internal/server/grpc/grpc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type serviceRegister func(srv *grpc.Server)
5353

5454
var _ server.Server = (*Server)(nil)
5555

56-
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger *slog.Logger, agent *agent.Service) server.Server {
56+
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger *slog.Logger, agentSvc *agent.Service) server.Server {
5757
listenFullAddress := fmt.Sprintf("%s:%s", config.Host, config.Port)
5858
return &Server{
5959
BaseServer: server.BaseServer{
@@ -65,7 +65,7 @@ func New(ctx context.Context, cancel context.CancelFunc, name string, config ser
6565
Logger: logger,
6666
},
6767
registerService: registerService,
68-
agent: agent,
68+
agent: agentSvc,
6969
}
7070
}
7171

manager/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,10 @@ MANAGER_QEMU_KERNEL_HASH=true \
205205

206206
### Verifying VM launch
207207

208-
NB: To verify that the manager successfully launched the VM, you need to open three terminals on the same machine. In one terminal, you need to launch the Manager test server by executing (with the environment variables of choice):
208+
NB: To verify that the manager successfully launched the VM, you need to open three terminals on the same machine. In one terminal, you need to launch the computations server by executing (with the environment variables of choice):
209209

210210
```bash
211-
go run ./test/manager-server/main.go
211+
go run ./test/computations/main.go <dataset path> <algo path>
212212
```
213213

214214
and in the second the manager by executing (with the environment variables of choice):
@@ -217,7 +217,7 @@ and in the second the manager by executing (with the environment variables of ch
217217
go run ./cmd/manager/main.go
218218
```
219219

220-
Ensure that the Manager can connect to the Manager test server by setting the MANAGER_GRPC_PORT with the port value of the Manager test server. The Manager test server is listening on the default value of the MANAGER_GRPC_PORT. In the last one, you can run the verification commands.
220+
Ensure that the Manager can connect to the Manager test server by setting the MANAGER_GRPC_PORT with the port value of the Manager test server. In the last terminal, you can run the verification commands.
221221

222222
To verify that the manager launched the VM successfully, run the following command:
223223

manager/manager.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ message ComputationRunReq {
4747
string name = 2;
4848
string description = 3;
4949
repeated Dataset datasets = 4;
50-
repeated Algorithm algorithms = 5;
50+
Algorithm algorithm = 5;
5151
repeated string result_consumers = 6;
5252
AgentConfig agent_config = 7;
5353
}

manager/service.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,8 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
8484
LogLevel: c.AgentConfig.LogLevel,
8585
},
8686
}
87-
for _, algo := range c.Algorithms {
88-
if len(algo.Hash) != hashLength {
89-
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
90-
return "", errInvalidHashLength
91-
}
92-
ac.Algorithms = append(ac.Algorithms, agent.Algorithm{ID: algo.Id, Provider: algo.Provider, Hash: [hashLength]byte(algo.Hash)})
93-
}
87+
ac.Algorithm = agent.Algorithm{ID: c.Algorithm.Id, Provider: c.Algorithm.Provider, Hash: [hashLength]byte(c.Algorithm.Hash)}
88+
9489
for _, data := range c.Datasets {
9590
if len(data.Hash) != hashLength {
9691
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})

0 commit comments

Comments
 (0)