Skip to content

DPA-1666: Support Virtual Stream IDs #17764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deployment/data-streams/changeset/jd_register_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func registerNodesForDON(e cldf.Environment, donName string, donID uint64, nodes
})

labels = append(labels, &ptypes.Label{
Key: utils.DonIdentifier(donID, donName),
Key: utils.DonIDLabel(donID, donName),
})

nodeID, err := e.Offchain.RegisterNode(e.GetContext(), &nodev1.RegisterNodeRequest{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package changeset
package jobs

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ This file contains test helpers for the changeset package.
The filename has a suffix of "_test.go" in order to not be included in the production build.
*/

package changeset
package jobs

import (
"context"
"math/rand"
"strconv"
"testing"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/smartcontractkit/chainlink/deployment/data-streams/changeset/testutil"
"github.com/smartcontractkit/chainlink/deployment/data-streams/jd"
"github.com/smartcontractkit/chainlink/deployment/data-streams/jobs"
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer"
"github.com/smartcontractkit/chainlink/deployment/environment/devenv"
"github.com/smartcontractkit/chainlink/deployment/environment/test"
)
Expand Down Expand Up @@ -99,15 +102,17 @@ func sendTestStreamJobs(t *testing.T, e cldf.Environment, numOracles int, autoAp
},
Streams: []StreamSpecConfig{
{
StreamID: 1000001038,
StreamID: randomStreamID(),
Name: "ICP/USD-RefPrice",
StreamType: jobs.StreamTypeQuote,
ReportFields: jobs.QuoteReportFields{
Bid: jobs.ReportFieldLLO{
ResultPath: "data,bid",
StreamID: pointer.To(strconv.FormatUint(uint64(randomStreamID()), 10)),
},
Benchmark: jobs.ReportFieldLLO{
ResultPath: "data,mid",
StreamID: pointer.To(strconv.FormatUint(uint64(randomStreamID()), 10)),
},
Ask: jobs.ReportFieldLLO{
ResultPath: "data,ask",
Expand Down Expand Up @@ -159,3 +164,18 @@ func collectNodeNames(t *testing.T, e cldf.Environment, numOracles, numBootstrap

return bootstrapNodeNames, oracleNodeNames
}

// Cache the streamIDs we've used during this test cycle, so we don't repeat them.
var usedStreamIDs = make(map[uint32]struct{})

func randomStreamID() uint32 {
var id uint32
for {
id = rand.Uint32()%1000000000 + 1000000000
if _, ok := usedStreamIDs[id]; !ok {
break
}
}
usedStreamIDs[id] = struct{}{}
return id
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package changeset
package jobs

import (
"context"
Expand Down Expand Up @@ -65,7 +65,7 @@ func (CsDistributeLLOJobSpecs) Apply(e cldf.Environment, cfg CsDistributeLLOJobS
// Add a label to the job spec to identify the related DON
cfg.Labels = append(cfg.Labels,
&ptypes.Label{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
},
&ptypes.Label{
Key: devenv.LabelJobTypeKey,
Expand Down Expand Up @@ -93,7 +93,7 @@ func (CsDistributeLLOJobSpecs) Apply(e cldf.Environment, cfg CsDistributeLLOJobS
return cldf.ChangesetOutput{}, fmt.Errorf("failed to propose all jobs: %w", err)
}

err = labelNodesForProposals(e.GetContext(), e.Offchain, allProposals, utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName))
err = labelNodesForProposals(e.GetContext(), e.Offchain, allProposals, utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName))
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to label nodes for proposals: %w", err)
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func generateBootstrapProposals(
Op: ptypes.SelectorOp_EQ,
},
{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
Op: ptypes.SelectorOp_EXIST,
},
})
Expand Down Expand Up @@ -243,7 +243,7 @@ func generateOracleProposals(
Op: ptypes.SelectorOp_EQ,
},
{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
Op: ptypes.SelectorOp_EXIST,
},
})
Expand Down Expand Up @@ -309,10 +309,10 @@ func getBootstrapMultiAddr(ctx context.Context, e cldf.Environment, cfg CsDistri
respBoots, err := e.Offchain.ListNodes(ctx, &node.ListNodesRequest{
Filter: &node.ListNodesRequest_Filter{
Selectors: []*ptypes.Selector{
// We can afford to filter by DonIdentifier here because if the caller didn't provide any bootstrap node IDs,
// then they are updating an existing job spec and the bootstrap nodes are already labeled with the DON ID.
// We can afford to filter by DonIDLabel here because if the caller didn't provide any bootstrap node IDs,
// then they are updating an existing job spec and the bootstrap nodes are already labelled with the DON ID.
{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
Op: ptypes.SelectorOp_EXIST,
},
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package changeset
package jobs

import (
"strings"
Expand Down Expand Up @@ -69,7 +69,7 @@ donID = 1
servers = {'mercury-pipeline-testnet-producer.TEST.cldev.cloud:1340' = '0000005187b1498c0ccb2e56d5ee8040a03a4955822ed208749b474058fc3f9c'}
`

bootstrapSpec := `name = 'bootstrap'
bootstrapSpec := `name = 'don | 1'
type = 'bootstrap'
schemaVersion = 1
contractID = '0x4170ed0880ac9a755fd29b2688956bd959f923f4'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package changeset
package jobs

import (
"context"
Expand Down Expand Up @@ -56,7 +56,11 @@
// Add a label to the job spec to identify the related DON
cfg.Labels = append(cfg.Labels,
&ptypes.Label{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Key: utils.DonIDLabel(cfg.Filter.DONID, cfg.Filter.DONName),
},
&ptypes.Label{
Key: devenv.LabelJobTypeKey,
Value: pointer.To(devenv.LabelJobTypeValueStream),
},
)

Expand All @@ -67,24 +71,38 @@

var proposals []*jobv1.ProposeJobRequest
for _, s := range cfg.Streams {
for _, n := range oracleNodes {
localLabels := append(cfg.Labels, //nolint: gocritic // locally modified copy of labels
&ptypes.Label{
Key: devenv.LabelStreamIDKey,
Value: pointer.To(strconv.FormatUint(uint64(s.StreamID), 10)),
},
&ptypes.Label{
Key: devenv.LabelJobTypeKey,
Value: pointer.To(devenv.LabelJobTypeValueStream),
},
)
// Start with the common labels.
streamLabels := append([]*ptypes.Label{}, cfg.Labels...)
// Some streams might not have an ID.
if s.StreamID > 0 {
streamLabels = append(streamLabels, &ptypes.Label{
Key: utils.StreamIDLabel(s.StreamID),
Value: pointer.To(s.Name),
})
}
virtualStreamIDLabels, err := streamIDLabelsFromReportFields(s.ReportFields)
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to get streamID labels: %w", err)
}
streamLabels = append(streamLabels, virtualStreamIDLabels...)

for _, n := range oracleNodes {
// Check if there is already a job spec for this stream on this node:
streamID := s.StreamID
if streamID == 0 {
if len(virtualStreamIDLabels) == 0 {
return cldf.ChangesetOutput{}, fmt.Errorf("no top level or virtual streamID found for stream %s", s.Name)
}
streamID, err = utils.StreamIDFromLabel(virtualStreamIDLabels[0].Key)
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to parse streamID from label: %w", err)
}
}
// Check if there is already a job spec for this stream on this node:
externalJobID, err := fetchExternalJobID(e, n.Id, []*ptypes.Selector{
{
Key: devenv.LabelStreamIDKey,
Value: pointer.To(strconv.FormatUint(uint64(s.StreamID), 10)),
Op: ptypes.SelectorOp_EQ,
Key: utils.StreamIDLabel(streamID),
Op: ptypes.SelectorOp_EXIST,
},
})
if err != nil {
Expand All @@ -104,7 +122,7 @@
proposals = append(proposals, &jobv1.ProposeJobRequest{
NodeId: n.Id,
Spec: string(renderedSpec),
Labels: localLabels,
Labels: streamLabels,
})
}
}
Expand Down Expand Up @@ -156,6 +174,63 @@
return dss
}

// streamIDLabelsFromReportFields returns a list of labels for the virtual streamIDs from the report fields.
// This function does NOT return nil, it returns an empty slice if no labels are found.
func streamIDLabelsFromReportFields(rf jobs.ReportFields) ([]*ptypes.Label, error) {
labels := []*ptypes.Label{}

switch rf.(type) {

Check failure on line 182 in deployment/data-streams/changeset/jobs/jd_distribute_stream_jobs.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (deployment)

typeSwitchVar: 2 cases can benefit from type switch with assignment (gocritic)
case jobs.MedianReportFields:
median := rf.(jobs.MedianReportFields)

Check failure on line 184 in deployment/data-streams/changeset/jobs/jd_distribute_stream_jobs.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (deployment)

S1034(related information): could eliminate this type assertion (gosimple)
l, err := streamIDLabelsFor(median.Benchmark.StreamID)
if err != nil {
return nil, err
}
labels = append(labels, l...)

case jobs.QuoteReportFields:
quote := rf.(jobs.QuoteReportFields)

Check failure on line 192 in deployment/data-streams/changeset/jobs/jd_distribute_stream_jobs.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (deployment)

S1034(related information): could eliminate this type assertion (gosimple)
l, err := streamIDLabelsFor(quote.Benchmark.StreamID)
if err != nil {
return nil, err
}
labels = append(labels, l...)
l, err = streamIDLabelsFor(quote.Bid.StreamID)
if err != nil {
return nil, err
}
labels = append(labels, l...)
l, err = streamIDLabelsFor(quote.Ask.StreamID)
if err != nil {
return nil, err
}
labels = append(labels, l...)

default:
return nil, fmt.Errorf("unknown report fields type: %T", rf)
}

return labels, nil
}

// streamIDLabelsFor returns a list of labels for the streamID.
// We intentionally return a list, so we can return an empty one.
func streamIDLabelsFor(sid *string) ([]*ptypes.Label, error) {
if sid == nil {
// It's fine to not have a streamID in the report fields.
return nil, nil
}
id, err := strconv.ParseUint(*sid, 10, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse streamID: %w", err)
}
return []*ptypes.Label{
{
Key: utils.StreamIDLabel(uint32(id)),
},
}, nil
}

func (f CsDistributeStreamJobSpecs) VerifyPreconditions(_ cldf.Environment, config CsDistributeStreamJobSpecsConfig) error {
if config.Filter == nil {
return errors.New("filter is required")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package changeset
package jobs

import (
"testing"
Expand Down Expand Up @@ -96,7 +96,7 @@ ds1_payload -> ds1_benchmark -> benchmark_price;
ds2_payload -> ds2_benchmark -> benchmark_price;
ds3_payload -> ds3_benchmark -> benchmark_price;
ds4_payload -> ds4_benchmark -> benchmark_price;
benchmark_price [type=median allowedFaults=3 index=0];
benchmark_price [type=median allowedFaults=3 streamID=1234 index=0];

ds1_payload -> ds1_bid -> bid_price;
ds2_payload -> ds2_bid -> bid_price;
Expand Down Expand Up @@ -130,6 +130,8 @@ ask_price [type=median allowedFaults=3 index=2];
},
Benchmark: jobs.ReportFieldLLO{
ResultPath: "data,mid",
// We intentionally set just one virtual stream ID.
StreamID: pointer.To("1234"),
},
Ask: jobs.ReportFieldLLO{
ResultPath: "data,ask",
Expand Down
Loading
Loading