Skip to content

Commit 1b36788

Browse files
authored
Introduce the concept of instance (#122)
* Use concrete artifacts * Add manifest file
1 parent e910983 commit 1b36788

File tree

6 files changed

+140
-86
lines changed

6 files changed

+140
-86
lines changed

internal/components.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,24 +191,24 @@ func (o *OpGeth) Name() string {
191191

192192
var _ ServiceReady = &OpGeth{}
193193

194-
func (o *OpGeth) Ready(service *service) error {
195-
enodeLine, err := service.logs.FindLog("enode://")
194+
func (o *OpGeth) Ready(instance *instance) error {
195+
enodeLine, err := instance.logs.FindLog("enode://")
196196
if err != nil {
197197
return err
198198
}
199199

200200
parts := strings.Split(enodeLine, "enode://")[1]
201201
enodeID := strings.Split(parts, "@")[0]
202202

203-
enode := fmt.Sprintf("enode://%s@127.0.0.1:%d?discport=0", enodeID, service.MustGetPort("rpc").HostPort)
203+
enode := fmt.Sprintf("enode://%s@127.0.0.1:%d?discport=0", enodeID, instance.service.MustGetPort("rpc").HostPort)
204204
o.Enode = enode
205205
return nil
206206
}
207207

208208
var _ ServiceWatchdog = &OpGeth{}
209209

210-
func (o *OpGeth) Watchdog(out io.Writer, service *service, ctx context.Context) error {
211-
gethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort)
210+
func (o *OpGeth) Watchdog(out io.Writer, instance *instance, ctx context.Context) error {
211+
gethURL := fmt.Sprintf("http://localhost:%d", instance.service.MustGetPort("http").HostPort)
212212
return watchChainHead(out, gethURL, 2*time.Second)
213213
}
214214

@@ -296,8 +296,8 @@ func (r *RethEL) Name() string {
296296

297297
var _ ServiceWatchdog = &RethEL{}
298298

299-
func (r *RethEL) Watchdog(out io.Writer, service *service, ctx context.Context) error {
300-
rethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort)
299+
func (r *RethEL) Watchdog(out io.Writer, instance *instance, ctx context.Context) error {
300+
rethURL := fmt.Sprintf("http://localhost:%d", instance.service.MustGetPort("http").HostPort)
301301
return watchChainHead(out, rethURL, 12*time.Second)
302302
}
303303

@@ -438,8 +438,8 @@ func (m *MevBoostRelay) Name() string {
438438

439439
var _ ServiceWatchdog = &MevBoostRelay{}
440440

441-
func (m *MevBoostRelay) Watchdog(out io.Writer, service *service, ctx context.Context) error {
442-
beaconNodeURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort)
441+
func (m *MevBoostRelay) Watchdog(out io.Writer, instance *instance, ctx context.Context) error {
442+
beaconNodeURL := fmt.Sprintf("http://localhost:%d", instance.service.MustGetPort("http").HostPort)
443443

444444
watchGroup := newWatchGroup()
445445
watchGroup.watch(func() error {
@@ -567,7 +567,7 @@ func (o *OpReth) ReleaseArtifact() *release {
567567

568568
var _ ServiceWatchdog = &OpReth{}
569569

570-
func (p *OpReth) Watchdog(out io.Writer, service *service, ctx context.Context) error {
571-
rethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort)
570+
func (p *OpReth) Watchdog(out io.Writer, instance *instance, ctx context.Context) error {
571+
rethURL := fmt.Sprintf("http://localhost:%d", instance.service.MustGetPort("http").HostPort)
572572
return watchChainHead(out, rethURL, 2*time.Second)
573573
}

internal/local_runner.go

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ type LocalRunner struct {
5757
// signals whether we are running in interactive mode
5858
interactive bool
5959

60+
// TODO: Merge instance with tasks
61+
instances []*instance
62+
6063
// tasks tracks the status of each service
6164
tasksMtx sync.Mutex
6265
tasks map[string]*task
@@ -115,6 +118,49 @@ func NewLocalRunner(out *output, manifest *Manifest, overrides map[string]string
115118
overrides[k] = v
116119
}
117120

121+
// Create the concrete instances to run
122+
instances := []*instance{}
123+
for _, service := range manifest.Services() {
124+
log_output, err := out.LogOutput(service.Name)
125+
if err != nil {
126+
return nil, fmt.Errorf("error getting log output: %w", err)
127+
}
128+
logs := &serviceLogs{
129+
logRef: log_output,
130+
path: log_output.Name(),
131+
}
132+
component := FindComponent(service.componentName)
133+
if component == nil {
134+
return nil, fmt.Errorf("component not found '%s'", service.componentName)
135+
}
136+
instance := &instance{
137+
service: service,
138+
logs: logs,
139+
component: component,
140+
}
141+
instances = append(instances, instance)
142+
}
143+
144+
// download any local release artifacts for the services that require them
145+
// TODO: it feels a bit weird to have all this logic on the new command. We should split it later on.
146+
for _, instance := range instances {
147+
ss := instance.service
148+
if ss.labels[useHostExecutionLabel] == "true" {
149+
// If the service wants to run on the host, it must implement the ReleaseService interface
150+
// which provides functions to download the release artifact.
151+
releaseService, ok := instance.component.(ReleaseService)
152+
if !ok {
153+
return nil, fmt.Errorf("service '%s' must implement the ReleaseService interface", ss.Name)
154+
}
155+
releaseArtifact := releaseService.ReleaseArtifact()
156+
bin, err := DownloadRelease(out.homeDir, releaseArtifact)
157+
if err != nil {
158+
return nil, fmt.Errorf("failed to download release artifact for service '%s': %w", ss.Name, err)
159+
}
160+
overrides[ss.Name] = bin
161+
}
162+
}
163+
118164
// Now, the override can either be one of two things (we are overloading the override map):
119165
// - docker image: In that case, change the manifest and remove from override map
120166
// - a path to an executable: In that case, we need to run it on the host machine
@@ -162,6 +208,7 @@ func NewLocalRunner(out *output, manifest *Manifest, overrides map[string]string
162208
bindHostPortsLocally: bindHostPortsLocally,
163209
sessionID: uuid.New().String(),
164210
networkName: networkName,
211+
instances: instances,
165212
}
166213

167214
if interactive {
@@ -176,6 +223,10 @@ func NewLocalRunner(out *output, manifest *Manifest, overrides map[string]string
176223
return d, nil
177224
}
178225

226+
func (d *LocalRunner) Instances() []*instance {
227+
return d.instances
228+
}
229+
179230
func (d *LocalRunner) printStatus() {
180231
fmt.Print("\033[s")
181232
lineOffset := 0
@@ -921,15 +972,8 @@ func (d *LocalRunner) Run() error {
921972
}
922973

923974
// generate the output log file for each service so that it is available after Run is done
924-
for _, svc := range d.manifest.services {
925-
log_output, err := d.out.LogOutput(svc.Name)
926-
if err != nil {
927-
return fmt.Errorf("error getting log output: %w", err)
928-
}
929-
svc.logs = &serviceLogs{
930-
path: log_output.Name(),
931-
}
932-
d.tasks[svc.Name].logs = log_output
975+
for _, instance := range d.instances {
976+
d.tasks[instance.service.Name].logs = instance.logs.logRef
933977
}
934978

935979
// First start the services that are running in docker-compose

internal/manifest.go

Lines changed: 19 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package internal
22

33
import (
4-
"context"
54
"fmt"
6-
"io"
75
"os"
86
"path/filepath"
97
"strings"
@@ -81,47 +79,7 @@ type Service interface {
8179
}
8280

8381
type ServiceReady interface {
84-
Ready(ouservice *service) error
85-
}
86-
87-
func (m *Manifest) CompleteReady() error {
88-
for _, s := range m.services {
89-
if readyFn, ok := s.component.(ServiceReady); ok {
90-
if err := readyFn.Ready(s); err != nil {
91-
return err
92-
}
93-
}
94-
}
95-
return nil
96-
}
97-
98-
type ServiceWatchdog interface {
99-
Watchdog(out io.Writer, service *service, ctx context.Context) error
100-
}
101-
102-
func RunWatchdog(manifest *Manifest) error {
103-
watchdogErr := make(chan error, len(manifest.Services()))
104-
105-
output, err := manifest.out.LogOutput("watchdog")
106-
if err != nil {
107-
return fmt.Errorf("failed to create log output: %w", err)
108-
}
109-
110-
for _, s := range manifest.Services() {
111-
if watchdogFn, ok := s.component.(ServiceWatchdog); ok {
112-
go func() {
113-
if err := watchdogFn.Watchdog(output, s, context.Background()); err != nil {
114-
watchdogErr <- fmt.Errorf("service %s watchdog failed: %w", s.Name, err)
115-
}
116-
}()
117-
}
118-
}
119-
120-
// If any of the watchdogs fail, we return the error
121-
if err := <-watchdogErr; err != nil {
122-
return fmt.Errorf("failed to run watchdog: %w", err)
123-
}
124-
return nil
82+
Ready(instance *instance) error
12583
}
12684

12785
func (s *Manifest) Services() []*service {
@@ -135,7 +93,7 @@ type ReleaseService interface {
13593

13694
func (s *Manifest) AddService(name string, srv Service) {
13795
service := s.NewService(name)
138-
service.component = srv
96+
service.componentName = srv.Name()
13997
srv.Run(service, s.ctx)
14098

14199
s.services = append(s.services, service)
@@ -191,21 +149,17 @@ func (s *Manifest) Validate() error {
191149
}
192150
}
193151

194-
// download any local release artifacts for the services that require them
152+
// validate that the mounts are correct
195153
for _, ss := range s.services {
196-
if ss.labels[useHostExecutionLabel] == "true" {
197-
// If the service wants to run on the host, it must implement the ReleaseService interface
198-
// which provides functions to download the release artifact.
199-
releaseService, ok := ss.component.(ReleaseService)
200-
if !ok {
201-
return fmt.Errorf("service '%s' must implement the ReleaseService interface", ss.Name)
202-
}
203-
releaseArtifact := releaseService.ReleaseArtifact()
204-
bin, err := DownloadRelease(s.out.homeDir, releaseArtifact)
205-
if err != nil {
206-
return fmt.Errorf("failed to download release artifact for service '%s': %w", ss.Name, err)
154+
for _, fileNameRef := range ss.filesMapped {
155+
fileLoc := filepath.Join(s.out.dst, fileNameRef)
156+
157+
if _, err := os.Stat(fileLoc); err != nil {
158+
if os.IsNotExist(err) {
159+
return fmt.Errorf("service %s includes an unknown file %s does not exist", ss.Name, fileLoc)
160+
}
161+
return fmt.Errorf("failed to stat file %s: %w", fileLoc, err)
207162
}
208-
s.overrides[ss.Name] = bin
209163
}
210164
}
211165

@@ -256,7 +210,8 @@ type NodeRef struct {
256210

257211
// serviceLogs is a service to access the logs of the running service
258212
type serviceLogs struct {
259-
path string
213+
logRef *os.File
214+
path string
260215
}
261216

262217
func (s *serviceLogs) readLogs() (string, error) {
@@ -301,9 +256,15 @@ type service struct {
301256
filesMapped map[string]string
302257
volumesMapped map[string]string
303258

259+
componentName string
260+
304261
tag string
305262
image string
306263
entrypoint string
264+
}
265+
266+
type instance struct {
267+
service *service
307268

308269
logs *serviceLogs
309270
component Service

internal/recipe_opstack.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,14 @@ func (o *OpRecipe) Apply(ctx *ExContext, artifacts *Artifacts) *Manifest {
9393
}
9494

9595
func (o *OpRecipe) Output(manifest *Manifest) map[string]interface{} {
96-
opGeth := manifest.MustGetService("op-geth").component.(*OpGeth)
97-
if opGeth.Enode != "" {
98-
// Only output if enode was set
99-
return map[string]interface{}{
100-
"op-geth-enode": opGeth.Enode,
96+
/*
97+
opGeth := manifest.MustGetService("op-geth").component.(*OpGeth)
98+
if opGeth.Enode != "" {
99+
// Only output if enode was set
100+
return map[string]interface{}{
101+
"op-geth-enode": opGeth.Enode,
102+
}
101103
}
102-
}
104+
*/
103105
return map[string]interface{}{}
104106
}

internal/watchdog.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
)
8+
9+
type ServiceWatchdog interface {
10+
Watchdog(out io.Writer, instance *instance, ctx context.Context) error
11+
}
12+
13+
func RunWatchdog(out *output, instances []*instance) error {
14+
watchdogErr := make(chan error, len(instances))
15+
16+
output, err := out.LogOutput("watchdog")
17+
if err != nil {
18+
return fmt.Errorf("failed to create log output: %w", err)
19+
}
20+
21+
for _, s := range instances {
22+
if watchdogFn, ok := s.component.(ServiceWatchdog); ok {
23+
go func() {
24+
if err := watchdogFn.Watchdog(output, s, context.Background()); err != nil {
25+
watchdogErr <- fmt.Errorf("service %s watchdog failed: %w", s.service.Name, err)
26+
}
27+
}()
28+
}
29+
}
30+
31+
// If any of the watchdogs fail, we return the error
32+
if err := <-watchdogErr; err != nil {
33+
return fmt.Errorf("failed to run watchdog: %w", err)
34+
}
35+
return nil
36+
}
37+
38+
func CompleteReady(instances []*instance) error {
39+
for _, s := range instances {
40+
if readyFn, ok := s.component.(ServiceReady); ok {
41+
if err := readyFn.Ready(s); err != nil {
42+
return err
43+
}
44+
}
45+
}
46+
return nil
47+
}

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func runIt(recipe internal.Recipe) error {
291291
return fmt.Errorf("failed to wait for service readiness: %w", err)
292292
}
293293

294-
if err := svcManager.CompleteReady(); err != nil {
294+
if err := internal.CompleteReady(dockerRunner.Instances()); err != nil {
295295
dockerRunner.Stop()
296296
return fmt.Errorf("failed to complete ready: %w", err)
297297
}
@@ -308,7 +308,7 @@ func runIt(recipe internal.Recipe) error {
308308
watchdogErr := make(chan error, 1)
309309
if watchdog {
310310
go func() {
311-
if err := internal.RunWatchdog(svcManager); err != nil {
311+
if err := internal.RunWatchdog(artifacts.Out, dockerRunner.Instances()); err != nil {
312312
watchdogErr <- fmt.Errorf("watchdog failed: %w", err)
313313
}
314314
}()

0 commit comments

Comments
 (0)