Skip to content

Commit 0d0d444

Browse files
authored
Add component outputs (#64)
1 parent 3741238 commit 0d0d444

File tree

8 files changed

+154
-24
lines changed

8 files changed

+154
-24
lines changed

internal/artifacts.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
var (
3939
defaultDiscoveryPrivKey = "a11ac89899cd86e36b6fb881ec1255b8a92a688790b7d950f8b7d8dd626671fb"
40+
defaultDiscoveryEnodeID = "3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f"
4041
)
4142

4243
// minimumGenesisDelay is the minimum delay for the genesis time. This is required

internal/components.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"strings"
78
"time"
89
)
910

@@ -91,6 +92,9 @@ func (o *OpNode) Run(service *service, ctx *ExContext) {
9192

9293
type OpGeth struct {
9394
UseDeterministicP2PKey bool
95+
96+
// outputs
97+
Enode string
9498
}
9599

96100
func logLevelToGethVerbosity(logLevel LogLevel) string {
@@ -155,6 +159,28 @@ func (o *OpGeth) Run(service *service, ctx *ExContext) {
155159
)
156160
}
157161

162+
var _ ServiceReady = &OpGeth{}
163+
164+
func (o *OpGeth) Ready(out io.Writer, service *service, ctx context.Context) error {
165+
logs := service.logs
166+
167+
if err := logs.WaitForLog("HTTP server started", 5*time.Second); err != nil {
168+
return err
169+
}
170+
171+
enodeLine, err := logs.FindLog("enode://")
172+
if err != nil {
173+
return err
174+
}
175+
176+
parts := strings.Split(enodeLine, "enode://")[1]
177+
enodeID := strings.Split(parts, "@")[0]
178+
179+
enode := fmt.Sprintf("enode://%s@127.0.0.1:%d?discport=0", enodeID, service.MustGetPort("rpc").HostPort)
180+
o.Enode = enode
181+
return nil
182+
}
183+
158184
var _ ServiceWatchdog = &OpGeth{}
159185

160186
func (o *OpGeth) Watchdog(out io.Writer, service *service, ctx context.Context) error {
@@ -293,7 +319,7 @@ var _ ServiceReady = &LighthouseBeaconNode{}
293319
func (l *LighthouseBeaconNode) Ready(logOutput io.Writer, service *service, ctx context.Context) error {
294320
beaconNodeURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort)
295321

296-
if err := waitForChainAlive(logOutput, beaconNodeURL, 30*time.Second); err != nil {
322+
if err := waitForChainAlive(ctx, logOutput, beaconNodeURL, 30*time.Second); err != nil {
297323
return err
298324
}
299325
return nil

internal/local_runner.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,15 @@ type LocalRunner struct {
5656

5757
// tasks tracks the status of each service
5858
tasksMtx sync.Mutex
59-
tasks map[string]string
59+
tasks map[string]*task
6060
taskUpdateCh chan struct{}
6161
}
6262

63+
type task struct {
64+
status string
65+
logs *os.File
66+
}
67+
6368
var (
6469
taskStatusPending = "pending"
6570
taskStatusStarted = "started"
@@ -86,9 +91,12 @@ func NewLocalRunner(out *output, manifest *Manifest, overrides map[string]string
8691
overrides[k] = v
8792
}
8893

89-
tasks := map[string]string{}
94+
tasks := map[string]*task{}
9095
for _, svc := range manifest.services {
91-
tasks[svc.Name] = taskStatusPending
96+
tasks[svc.Name] = &task{
97+
status: taskStatusPending,
98+
logs: nil,
99+
}
92100
}
93101

94102
d := &LocalRunner{
@@ -153,7 +161,7 @@ func (d *LocalRunner) printStatus() {
153161
lineOffset = 0
154162
// Use ordered services instead of ranging over map
155163
for _, name := range orderedServices {
156-
status := d.tasks[name]
164+
status := d.tasks[name].status
157165
var statusLine string
158166

159167
switch status {
@@ -183,7 +191,7 @@ func (d *LocalRunner) printStatus() {
183191
func (d *LocalRunner) updateTaskStatus(name string, status string) {
184192
d.tasksMtx.Lock()
185193
defer d.tasksMtx.Unlock()
186-
d.tasks[name] = status
194+
d.tasks[name].status = status
187195

188196
if status == taskStatusDie {
189197
d.exitErr <- fmt.Errorf("container %s failed", name)
@@ -482,9 +490,12 @@ func (d *LocalRunner) runOnHost(ss *service) error {
482490

483491
// trackLogs tracks the logs of a container and writes them to the log output
484492
func (d *LocalRunner) trackLogs(serviceName string, containerID string) error {
485-
log_output, err := d.out.LogOutput(serviceName)
486-
if err != nil {
487-
return fmt.Errorf("error getting log output: %w", err)
493+
d.tasksMtx.Lock()
494+
log_output := d.tasks[serviceName].logs
495+
d.tasksMtx.Unlock()
496+
497+
if log_output == nil {
498+
panic("BUG: log output not found for service " + serviceName)
488499
}
489500

490501
logs, err := d.client.ContainerLogs(context.Background(), containerID, container.LogsOptions{
@@ -546,6 +557,18 @@ func (d *LocalRunner) Run() error {
546557
return fmt.Errorf("failed to write docker-compose.yaml: %w", err)
547558
}
548559

560+
// generate the output log file for each service so that it is available after Run is done
561+
for _, svc := range d.manifest.services {
562+
log_output, err := d.out.LogOutput(svc.Name)
563+
if err != nil {
564+
return fmt.Errorf("error getting log output: %w", err)
565+
}
566+
svc.logs = &serviceLogs{
567+
path: log_output.Name(),
568+
}
569+
d.tasks[svc.Name].logs = log_output
570+
}
571+
549572
// First start the services that are running in docker-compose
550573
cmd := exec.Command("docker", "compose", "-f", d.out.dst+"/docker-compose.yaml", "up", "-d")
551574

internal/manifest.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"os"
78
"strings"
89
"sync"
910
"text/template"
11+
"time"
1012

1113
flag "github.com/spf13/pflag"
1214
)
@@ -19,6 +21,7 @@ type Recipe interface {
1921
Flags() *flag.FlagSet
2022
Artifacts() *ArtifactsBuilder
2123
Apply(ctx *ExContext, artifacts *Artifacts) *Manifest
24+
Output(manifest *Manifest) map[string]interface{}
2225
}
2326

2427
// Manifest describes a list of services and their dependencies
@@ -80,7 +83,7 @@ type ServiceReady interface {
8083
Ready(out io.Writer, service *service, ctx context.Context) error
8184
}
8285

83-
func WaitForReady(manifest *Manifest) error {
86+
func WaitForReady(ctx context.Context, manifest *Manifest) error {
8487
var wg sync.WaitGroup
8588
readyErr := make(chan error, len(manifest.Services()))
8689

@@ -96,7 +99,7 @@ func WaitForReady(manifest *Manifest) error {
9699
go func() {
97100
defer wg.Done()
98101

99-
if err := readyFn.Ready(output, s, context.Background()); err != nil {
102+
if err := readyFn.Ready(output, s, ctx); err != nil {
100103
readyErr <- fmt.Errorf("service %s failed to start: %w", s.Name, err)
101104
}
102105
}()
@@ -240,6 +243,52 @@ type NodeRef struct {
240243
PortLabel string
241244
}
242245

246+
// serviceLogs is a service to access the logs of the running service
247+
type serviceLogs struct {
248+
path string
249+
}
250+
251+
func (s *serviceLogs) readLogs() (string, error) {
252+
content, err := os.ReadFile(s.path)
253+
if err != nil {
254+
return "", fmt.Errorf("failed to read logs: %w", err)
255+
}
256+
return string(content), nil
257+
}
258+
259+
func (s *serviceLogs) WaitForLog(pattern string, timeout time.Duration) error {
260+
timer := time.After(timeout)
261+
for {
262+
select {
263+
case <-timer:
264+
return fmt.Errorf("timeout waiting for log pattern %s", pattern)
265+
case <-time.After(500 * time.Millisecond):
266+
logs, err := s.readLogs()
267+
if err != nil {
268+
return fmt.Errorf("failed to read logs: %w", err)
269+
}
270+
if strings.Contains(logs, pattern) {
271+
return nil
272+
}
273+
}
274+
}
275+
}
276+
277+
func (s *serviceLogs) FindLog(pattern string) (string, error) {
278+
logs, err := s.readLogs()
279+
if err != nil {
280+
return "", fmt.Errorf("failed to read logs: %w", err)
281+
}
282+
283+
lines := strings.Split(logs, "\n")
284+
for _, line := range lines {
285+
if strings.Contains(line, pattern) {
286+
return line, nil
287+
}
288+
}
289+
return "", fmt.Errorf("log pattern %s not found", pattern)
290+
}
291+
243292
type service struct {
244293
Name string
245294
args []string
@@ -253,6 +302,7 @@ type service struct {
253302
image string
254303
entrypoint string
255304

305+
logs *serviceLogs
256306
component Service
257307
}
258308

internal/recipe_l1.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,7 @@ func (l *L1Recipe) Apply(ctx *ExContext, artifacts *Artifacts) *Manifest {
8888
})
8989
return svcManager
9090
}
91+
92+
func (l *L1Recipe) Output(manifest *Manifest) map[string]interface{} {
93+
return map[string]interface{}{}
94+
}

internal/recipe_opstack.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,14 @@ func (o *OpRecipe) Apply(ctx *ExContext, artifacts *Artifacts) *Manifest {
6666
})
6767
return svcManager
6868
}
69+
70+
func (o *OpRecipe) Output(manifest *Manifest) map[string]interface{} {
71+
opGeth := manifest.MustGetService("op-geth").component.(*OpGeth)
72+
if opGeth.Enode != "" {
73+
// Only output if enode was set
74+
return map[string]interface{}{
75+
"op-geth-enode": opGeth.Enode,
76+
}
77+
}
78+
return map[string]interface{}{}
79+
}

internal/watchers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
mevRCommon "github.com/flashbots/mev-boost-relay/common"
1515
)
1616

17-
func waitForChainAlive(logOutput io.Writer, beaconNodeURL string, timeout time.Duration) error {
17+
func waitForChainAlive(ctx context.Context, logOutput io.Writer, beaconNodeURL string, timeout time.Duration) error {
1818
// Test that blocks are being produced
1919
log := mevRCommon.LogSetup(false, "info").WithField("context", "waitForChainAlive")
2020
log.Logger.Out = logOutput
@@ -51,6 +51,8 @@ func waitForChainAlive(logOutput io.Writer, beaconNodeURL string, timeout time.D
5151
select {
5252
case <-syncTimeoutCh:
5353
return fmt.Errorf("beacon client failed to start")
54+
case <-ctx.Done():
55+
return fmt.Errorf("timeout waiting for chain to start")
5456
default:
5557
time.Sleep(1 * time.Second)
5658
}

main.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
_ "embed"
56
"fmt"
67
"log"
@@ -120,14 +121,20 @@ func runIt(recipe internal.Recipe) error {
120121
sig := make(chan os.Signal, 1)
121122
signal.Notify(sig, os.Interrupt)
122123

124+
ctx, cancel := context.WithCancel(context.Background())
125+
go func() {
126+
<-sig
127+
cancel()
128+
}()
129+
123130
if err := dockerRunner.Run(); err != nil {
124131
dockerRunner.Stop()
125132
return fmt.Errorf("failed to run docker: %w", err)
126133
}
127134

128135
if !interactive {
129136
// print services info
130-
fmt.Printf("Services started:\n==================\n")
137+
fmt.Printf("\n========= Services started =========\n")
131138
for _, ss := range svcManager.Services() {
132139
ports := ss.Ports()
133140
sort.Slice(ports, func(i, j int) bool {
@@ -142,33 +149,39 @@ func runIt(recipe internal.Recipe) error {
142149
}
143150
}
144151

145-
watchdogErr := make(chan error, 1)
146-
readyErr := make(chan error, 1)
152+
if err := internal.WaitForReady(ctx, svcManager); err != nil {
153+
dockerRunner.Stop()
154+
return fmt.Errorf("failed to wait for service readiness: %w", err)
155+
}
147156

148-
go func() {
149-
if err := internal.WaitForReady(svcManager); err != nil {
150-
readyErr <- fmt.Errorf("failed to wait for service readiness: %w", err)
157+
// get the output from the recipe
158+
output := recipe.Output(svcManager)
159+
if len(output) > 0 {
160+
fmt.Printf("\n========= Output =========\n")
161+
for k, v := range output {
162+
fmt.Printf("- %s: %v\n", k, v)
151163
}
164+
}
152165

153-
if watchdog {
166+
watchdogErr := make(chan error, 1)
167+
if watchdog {
168+
go func() {
154169
if err := internal.RunWatchdog(svcManager); err != nil {
155170
watchdogErr <- fmt.Errorf("watchdog failed: %w", err)
156171
}
157-
}
158-
}()
172+
}()
173+
}
159174

160175
var timerCh <-chan time.Time
161176
if timeout > 0 {
162177
timerCh = time.After(timeout)
163178
}
164179

165180
select {
166-
case <-sig:
181+
case <-ctx.Done():
167182
fmt.Println("Stopping...")
168183
case err := <-dockerRunner.ExitErr():
169184
fmt.Println("Service failed:", err)
170-
case err := <-readyErr:
171-
fmt.Println("Service failed to start:", err)
172185
case err := <-watchdogErr:
173186
fmt.Println("Watchdog failed:", err)
174187
case <-timerCh:

0 commit comments

Comments
 (0)