Skip to content

Commit 41627b4

Browse files
committed
fix(stream): use cursors and _time ordering
1 parent 4dfe3ed commit 41627b4

File tree

1 file changed

+19
-22
lines changed

1 file changed

+19
-22
lines changed

internal/cmd/stream/stream.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/AlecAivazis/survey/v2"
1010
"github.com/MakeNowJust/heredoc"
11+
"github.com/axiomhq/axiom-go/axiom/ingest"
1112
"github.com/axiomhq/axiom-go/axiom/querylegacy"
1213
"github.com/spf13/cobra"
1314

@@ -17,8 +18,6 @@ import (
1718
"github.com/axiomhq/cli/pkg/iofmt"
1819
)
1920

20-
const streamingDuration = time.Second * 2
21-
2221
type options struct {
2322
*cmdutil.Factory
2423

@@ -131,28 +130,32 @@ func run(ctx context.Context, opts *options) error {
131130
fmt.Fprintf(opts.IO.Out(), "Streaming events from dataset %s:\n\n", cs.Bold(opts.Dataset))
132131
}
133132

134-
t := time.NewTicker(streamingDuration)
135-
defer t.Stop()
136-
137-
lastRequest := time.Now().Add(-time.Nanosecond)
133+
var (
134+
start = time.Now().Add(-time.Nanosecond)
135+
cursor = ""
136+
)
138137
for {
139-
queryCtx, queryCancel := context.WithTimeout(ctx, streamingDuration)
140-
141-
res, err := client.Datasets.QueryLegacy(queryCtx, opts.Dataset, querylegacy.Query{
142-
StartTime: lastRequest,
138+
res, err := client.Datasets.QueryLegacy(ctx, opts.Dataset, querylegacy.Query{
139+
StartTime: start,
143140
EndTime: time.Now(),
141+
Order: []querylegacy.Order{
142+
{
143+
Field: ingest.TimestampField,
144+
},
145+
},
146+
Cursor: cursor,
144147
}, querylegacy.Options{
145-
StreamingDuration: streamingDuration,
148+
StreamingDuration: time.Second * 5,
146149
})
147-
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
148-
queryCancel()
150+
if err != nil {
151+
if errors.Is(err, context.Canceled) {
152+
return nil
153+
}
149154
return err
150155
}
151156

152-
queryCancel()
153-
154157
if res != nil && len(res.Matches) > 0 {
155-
lastRequest = res.Matches[len(res.Matches)-1].Time.Add(time.Nanosecond)
158+
cursor = res.Matches[len(res.Matches)-1].RowID
156159

157160
for _, entry := range res.Matches {
158161
var data any
@@ -168,11 +171,5 @@ func run(ctx context.Context, opts *options) error {
168171
}
169172
}
170173
}
171-
172-
select {
173-
case <-ctx.Done():
174-
return nil
175-
case <-t.C:
176-
}
177174
}
178175
}

0 commit comments

Comments
 (0)