Skip to content

Commit 04445c2

Browse files
committed
added filter for vector lengths
1 parent aba5675 commit 04445c2

File tree

2 files changed

+7
-11
lines changed

2 files changed

+7
-11
lines changed

dgraph/cmd/zero/oracle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (o *Oracle) sendDeltasToSubscribers() {
229229
newDelta := &pb.OracleDelta{}
230230
useNewDelta := false
231231
if o.doneUntil.DoneUntil() < waitFor() {
232-
if len(delta.Txns) > 5 {
232+
if len(delta.Txns) > 10 {
233233
replacementTxn := []*pb.TxnStatus{}
234234

235235
ts := o.doneUntil.DoneUntil()

worker/draft.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -980,13 +980,15 @@ func (n *node) processApplyCh() {
980980

981981
// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
982982
func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
983-
_, span := otel.Tracer("alpha.CommitLoop").Start(context.Background(), "commitOrAbort")
983+
pctx, span := otel.Tracer("alpha.CommitLoop").Start(context.Background(), "alpha.commitOrAbort")
984984
defer span.End()
985985

986986
x.PrintOracleDelta(delta)
987987
// First let's commit all mutations to disk.
988988
writer := posting.NewTxnWriter(pstore)
989989
toDisk := func(start, commit uint64) {
990+
_, tspan := otel.Tracer("alpha.CommitLoop").Start(pctx, "alpha.toDisk")
991+
defer tspan.End()
990992
txn := posting.Oracle().GetTxn(start)
991993
if txn == nil || commit == 0 {
992994
return
@@ -1013,17 +1015,11 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
10131015
panic(err)
10141016
}
10151017

1016-
span.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1018+
tspan.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
10171019
attribute.Int64("start_ts", int64(start)),
10181020
attribute.Int64("commit_ts", int64(commit)),
10191021
))
1020-
1021-
if txn.Span != nil {
1022-
txn.Span.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1023-
attribute.Int64("start_ts", int64(start)),
1024-
attribute.Int64("commit_ts", int64(commit)),
1025-
))
1026-
}
1022+
tspan.SetAttributes(attribute.Int64("start_ts", int64(start)), attribute.Int64("commit_ts", int64(commit)))
10271023
}
10281024

10291025
for _, status := range delta.Txns {
@@ -1036,7 +1032,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
10361032
span.AddEvent("Flushed to disk")
10371033
for _, status := range delta.Txns {
10381034
txn := posting.Oracle().GetTxn(status.StartTs)
1039-
if txn.Span != nil {
1035+
if txn != nil && txn.Span != nil {
10401036
txn.Span.AddEvent("Flushed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
10411037
attribute.Int64("start_ts", int64(status.StartTs)),
10421038
attribute.Int64("commit_ts", int64(status.CommitTs)),

0 commit comments

Comments
 (0)