Skip to content

Commit b724f83

Browse files
authored
DPA-1666: Support Virtual Stream IDs (#17764)
* Support virtual stream IDs in median and quote streams. * Move job-handling changesets to a submodule. * Add support virtual stream IDs. * lint * Better switch. * lint
1 parent 55a2c94 commit b724f83

13 files changed

+244
-87
lines changed

deployment/data-streams/changeset/jd_register_nodes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func registerNodesForDON(e cldf.Environment, donName string, donID uint64, nodes
5656
})
5757

5858
labels = append(labels, &ptypes.Label{
59-
Key: utils.DonIdentifier(donID, donName),
59+
Key: utils.DonIDLabel(donID, donName),
6060
})
6161

6262
nodeID, err := e.Offchain.RegisterNode(e.GetContext(), &nodev1.RegisterNodeRequest{

deployment/data-streams/changeset/common.go renamed to deployment/data-streams/changeset/jobs/common.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package changeset
1+
package jobs
22

33
import (
44
"context"

deployment/data-streams/changeset/helpers_test.go renamed to deployment/data-streams/changeset/jobs/helpers_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ This file contains test helpers for the changeset package.
33
The filename has a suffix of "_test.go" in order to not be included in the production build.
44
*/
55

6-
package changeset
6+
package jobs
77

88
import (
99
"context"
10+
"math/rand"
11+
"strconv"
1012
"testing"
1113

1214
"github.com/ethereum/go-ethereum/common"
@@ -22,6 +24,7 @@ import (
2224
"github.com/smartcontractkit/chainlink/deployment/data-streams/changeset/testutil"
2325
"github.com/smartcontractkit/chainlink/deployment/data-streams/jd"
2426
"github.com/smartcontractkit/chainlink/deployment/data-streams/jobs"
27+
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer"
2528
"github.com/smartcontractkit/chainlink/deployment/environment/devenv"
2629
"github.com/smartcontractkit/chainlink/deployment/environment/test"
2730
)
@@ -99,15 +102,17 @@ func sendTestStreamJobs(t *testing.T, e cldf.Environment, numOracles int, autoAp
99102
},
100103
Streams: []StreamSpecConfig{
101104
{
102-
StreamID: 1000001038,
105+
StreamID: randomStreamID(),
103106
Name: "ICP/USD-RefPrice",
104107
StreamType: jobs.StreamTypeQuote,
105108
ReportFields: jobs.QuoteReportFields{
106109
Bid: jobs.ReportFieldLLO{
107110
ResultPath: "data,bid",
111+
StreamID: pointer.To(strconv.FormatUint(uint64(randomStreamID()), 10)),
108112
},
109113
Benchmark: jobs.ReportFieldLLO{
110114
ResultPath: "data,mid",
115+
StreamID: pointer.To(strconv.FormatUint(uint64(randomStreamID()), 10)),
111116
},
112117
Ask: jobs.ReportFieldLLO{
113118
ResultPath: "data,ask",
@@ -159,3 +164,18 @@ func collectNodeNames(t *testing.T, e cldf.Environment, numOracles, numBootstrap
159164

160165
return bootstrapNodeNames, oracleNodeNames
161166
}
167+
168+
// Cache the streamIDs we've used during this test cycle, so we don't repeat them.
169+
var usedStreamIDs = make(map[uint32]struct{})
170+
171+
func randomStreamID() uint32 {
172+
var id uint32
173+
for {
174+
id = rand.Uint32()%1000000000 + 1000000000
175+
if _, ok := usedStreamIDs[id]; !ok {
176+
break
177+
}
178+
}
179+
usedStreamIDs[id] = struct{}{}
180+
return id
181+
}

deployment/data-streams/changeset/jd_distribute_llo_jobs.go renamed to deployment/data-streams/changeset/jobs/jd_distribute_llo_jobs.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package changeset
1+
package jobs
22

33
import (
44
"context"
@@ -65,7 +65,7 @@ func (CsDistributeLLOJobSpecs) Apply(e cldf.Environment, cfg CsDistributeLLOJobS
6565
// Add a label to the job spec to identify the related DON
6666
cfg.Labels = append(cfg.Labels,
6767
&ptypes.Label{
68-
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
68+
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
6969
},
7070
&ptypes.Label{
7171
Key: devenv.LabelJobTypeKey,
@@ -93,7 +93,7 @@ func (CsDistributeLLOJobSpecs) Apply(e cldf.Environment, cfg CsDistributeLLOJobS
9393
return cldf.ChangesetOutput{}, fmt.Errorf("failed to propose all jobs: %w", err)
9494
}
9595

96-
err = labelNodesForProposals(e.GetContext(), e.Offchain, allProposals, utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName))
96+
err = labelNodesForProposals(e.GetContext(), e.Offchain, allProposals, utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName))
9797
if err != nil {
9898
return cldf.ChangesetOutput{}, fmt.Errorf("failed to label nodes for proposals: %w", err)
9999
}
@@ -136,7 +136,7 @@ func generateBootstrapProposals(
136136
Op: ptypes.SelectorOp_EQ,
137137
},
138138
{
139-
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
139+
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
140140
Op: ptypes.SelectorOp_EXIST,
141141
},
142142
})
@@ -243,7 +243,7 @@ func generateOracleProposals(
243243
Op: ptypes.SelectorOp_EQ,
244244
},
245245
{
246-
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
246+
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
247247
Op: ptypes.SelectorOp_EXIST,
248248
},
249249
})
@@ -309,10 +309,10 @@ func getBootstrapMultiAddr(ctx context.Context, e cldf.Environment, cfg CsDistri
309309
respBoots, err := e.Offchain.ListNodes(ctx, &node.ListNodesRequest{
310310
Filter: &node.ListNodesRequest_Filter{
311311
Selectors: []*ptypes.Selector{
312-
// We can afford to filter by DonIdentifier here because if the caller didn't provide any bootstrap node IDs,
313-
// then they are updating an existing job spec and the bootstrap nodes are already labeled with the DON ID.
312+
// We can afford to filter by DonIDLabel here because if the caller didn't provide any bootstrap node IDs,
313+
// then they are updating an existing job spec and the bootstrap nodes are already labelled with the DON ID.
314314
{
315-
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
315+
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
316316
Op: ptypes.SelectorOp_EXIST,
317317
},
318318
{

deployment/data-streams/changeset/jd_distribute_llo_jobs_test.go renamed to deployment/data-streams/changeset/jobs/jd_distribute_llo_jobs_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package changeset
1+
package jobs
22

33
import (
44
"strings"
@@ -69,7 +69,7 @@ donID = 1
6969
servers = {'mercury-pipeline-testnet-producer.TEST.cldev.cloud:1340' = '0000005187b1498c0ccb2e56d5ee8040a03a4955822ed208749b474058fc3f9c'}
7070
`
7171

72-
bootstrapSpec := `name = 'bootstrap'
72+
bootstrapSpec := `name = 'don | 1'
7373
type = 'bootstrap'
7474
schemaVersion = 1
7575
contractID = '0x4170ed0880ac9a755fd29b2688956bd959f923f4'

deployment/data-streams/changeset/jd_distribute_stream_jobs.go renamed to deployment/data-streams/changeset/jobs/jd_distribute_stream_jobs.go

Lines changed: 90 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package changeset
1+
package jobs
22

33
import (
44
"context"
@@ -56,7 +56,11 @@ func (CsDistributeStreamJobSpecs) Apply(e cldf.Environment, cfg CsDistributeStre
5656
// Add a label to the job spec to identify the related DON
5757
cfg.Labels = append(cfg.Labels,
5858
&ptypes.Label{
59-
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
59+
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
60+
},
61+
&ptypes.Label{
62+
Key: devenv.LabelJobTypeKey,
63+
Value: pointer.To(devenv.LabelJobTypeValueStream),
6064
},
6165
)
6266

@@ -67,24 +71,38 @@ func (CsDistributeStreamJobSpecs) Apply(e cldf.Environment, cfg CsDistributeStre
6771

6872
var proposals []*jobv1.ProposeJobRequest
6973
for _, s := range cfg.Streams {
70-
for _, n := range oracleNodes {
71-
localLabels := append(cfg.Labels, //nolint: gocritic // locally modified copy of labels
72-
&ptypes.Label{
73-
Key: devenv.LabelStreamIDKey,
74-
Value: pointer.To(strconv.FormatUint(uint64(s.StreamID), 10)),
75-
},
76-
&ptypes.Label{
77-
Key: devenv.LabelJobTypeKey,
78-
Value: pointer.To(devenv.LabelJobTypeValueStream),
79-
},
80-
)
74+
// Start with the common labels.
75+
streamLabels := append([]*ptypes.Label{}, cfg.Labels...)
76+
// Some streams might not have an ID.
77+
if s.StreamID > 0 {
78+
streamLabels = append(streamLabels, &ptypes.Label{
79+
Key: utils.StreamIDLabel(s.StreamID),
80+
Value: pointer.To(s.Name),
81+
})
82+
}
83+
virtualStreamIDLabels, err := streamIDLabelsFromReportFields(s.ReportFields)
84+
if err != nil {
85+
return cldf.ChangesetOutput{}, fmt.Errorf("failed to get streamID labels: %w", err)
86+
}
87+
streamLabels = append(streamLabels, virtualStreamIDLabels...)
8188

89+
for _, n := range oracleNodes {
90+
// Check if there is already a job spec for this stream on this node:
91+
streamID := s.StreamID
92+
if streamID == 0 {
93+
if len(virtualStreamIDLabels) == 0 {
94+
return cldf.ChangesetOutput{}, fmt.Errorf("no top level or virtual streamID found for stream %s", s.Name)
95+
}
96+
streamID, err = utils.StreamIDFromLabel(virtualStreamIDLabels[0].Key)
97+
if err != nil {
98+
return cldf.ChangesetOutput{}, fmt.Errorf("failed to parse streamID from label: %w", err)
99+
}
100+
}
82101
// Check if there is already a job spec for this stream on this node:
83102
externalJobID, err := fetchExternalJobID(e, n.Id, []*ptypes.Selector{
84103
{
85-
Key: devenv.LabelStreamIDKey,
86-
Value: pointer.To(strconv.FormatUint(uint64(s.StreamID), 10)),
87-
Op: ptypes.SelectorOp_EQ,
104+
Key: utils.StreamIDLabel(streamID),
105+
Op: ptypes.SelectorOp_EXIST,
88106
},
89107
})
90108
if err != nil {
@@ -104,7 +122,7 @@ func (CsDistributeStreamJobSpecs) Apply(e cldf.Environment, cfg CsDistributeStre
104122
proposals = append(proposals, &jobv1.ProposeJobRequest{
105123
NodeId: n.Id,
106124
Spec: string(renderedSpec),
107-
Labels: localLabels,
125+
Labels: streamLabels,
108126
})
109127
}
110128
}
@@ -156,6 +174,61 @@ func generateDatasources(cc StreamSpecConfig) []jobs.Datasource {
156174
return dss
157175
}
158176

177+
// streamIDLabelsFromReportFields returns a list of labels for the virtual streamIDs from the report fields.
178+
// This function does NOT return nil, it returns an empty slice if no labels are found.
179+
func streamIDLabelsFromReportFields(rf jobs.ReportFields) ([]*ptypes.Label, error) {
180+
labels := []*ptypes.Label{}
181+
182+
switch rf := rf.(type) {
183+
case jobs.MedianReportFields:
184+
l, err := streamIDLabelsFor(rf.Benchmark.StreamID)
185+
if err != nil {
186+
return nil, err
187+
}
188+
labels = append(labels, l...)
189+
190+
case jobs.QuoteReportFields:
191+
l, err := streamIDLabelsFor(rf.Benchmark.StreamID)
192+
if err != nil {
193+
return nil, err
194+
}
195+
labels = append(labels, l...)
196+
l, err = streamIDLabelsFor(rf.Bid.StreamID)
197+
if err != nil {
198+
return nil, err
199+
}
200+
labels = append(labels, l...)
201+
l, err = streamIDLabelsFor(rf.Ask.StreamID)
202+
if err != nil {
203+
return nil, err
204+
}
205+
labels = append(labels, l...)
206+
207+
default:
208+
return nil, fmt.Errorf("unknown report fields type: %T", rf)
209+
}
210+
211+
return labels, nil
212+
}
213+
214+
// streamIDLabelsFor returns a list of labels for the streamID.
215+
// We intentionally return a list, so we can return an empty one.
216+
func streamIDLabelsFor(sid *string) ([]*ptypes.Label, error) {
217+
if sid == nil {
218+
// It's fine to not have a streamID in the report fields.
219+
return nil, nil
220+
}
221+
id, err := strconv.ParseUint(*sid, 10, 32)
222+
if err != nil {
223+
return nil, fmt.Errorf("failed to parse streamID: %w", err)
224+
}
225+
return []*ptypes.Label{
226+
{
227+
Key: utils.StreamIDLabel(uint32(id)),
228+
},
229+
}, nil
230+
}
231+
159232
func (f CsDistributeStreamJobSpecs) VerifyPreconditions(_ cldf.Environment, config CsDistributeStreamJobSpecsConfig) error {
160233
if config.Filter == nil {
161234
return errors.New("filter is required")

deployment/data-streams/changeset/jd_distribute_stream_jobs_test.go renamed to deployment/data-streams/changeset/jobs/jd_distribute_stream_jobs_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package changeset
1+
package jobs
22

33
import (
44
"testing"
@@ -96,7 +96,7 @@ ds1_payload -> ds1_benchmark -> benchmark_price;
9696
ds2_payload -> ds2_benchmark -> benchmark_price;
9797
ds3_payload -> ds3_benchmark -> benchmark_price;
9898
ds4_payload -> ds4_benchmark -> benchmark_price;
99-
benchmark_price [type=median allowedFaults=3 index=0];
99+
benchmark_price [type=median allowedFaults=3 streamID=1234 index=0];
100100
101101
ds1_payload -> ds1_bid -> bid_price;
102102
ds2_payload -> ds2_bid -> bid_price;
@@ -130,6 +130,8 @@ ask_price [type=median allowedFaults=3 index=2];
130130
},
131131
Benchmark: jobs.ReportFieldLLO{
132132
ResultPath: "data,mid",
133+
// We intentionally set just one virtual stream ID.
134+
StreamID: pointer.To("1234"),
133135
},
134136
Ask: jobs.ReportFieldLLO{
135137
ResultPath: "data,ask",

0 commit comments

Comments
 (0)