Skip to content

perf(zero): Update delta frequency #9461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,56 @@
// Let's ensure that we have all the commits up until the max here.
// Otherwise, we'll be sending commit timestamps out of order, which
// would cause Alphas to drop some of them, during writes to Badger.

newDelta := &pb.OracleDelta{}
useNewDelta := false
if o.doneUntil.DoneUntil() < waitFor() {
continue // The for loop doing blocking reads from o.updates.
// We need at least one entry from the updates channel to pick up a missing update.
// Don't goto slurp_loop, because it would break from select immediately.
if len(delta.Txns) > 10 {
replacementTxn := []*pb.TxnStatus{}

ts := o.doneUntil.DoneUntil()
for _, txn := range delta.Txns {
if txn.CommitTs > ts {
replacementTxn = append(replacementTxn, txn)
} else {
newDelta.Txns = append(newDelta.Txns, txn)
newDelta.MaxAssigned = x.Max(newDelta.MaxAssigned, txn.CommitTs)
}
}

if len(newDelta.Txns) == 0 {
continue
}

useNewDelta = true
delta.Txns = replacementTxn
} else {
continue // The for loop doing blocking reads from o.updates.
// We need at least one entry from the updates channel to pick up a missing update.
// Don't goto slurp_loop, because it would break from select immediately.
}
}

if glog.V(3) {
glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
}
o.Lock()
k := *delta

Check failure on line 262 in dgraph/cmd/zero/oracle.go

View workflow job for this annotation

GitHub Actions / Trunk Check

golangci-lint2(govet)

[new] copylocks: assignment copies lock value to k: github.com/hypermodeinc/dgraph/v25/protos/pb.OracleDelta contains google.golang.org/protobuf/runtime/protoimpl.MessageState contains sync.Mutex
if useNewDelta {
k = *newDelta

Check failure on line 264 in dgraph/cmd/zero/oracle.go

View workflow job for this annotation

GitHub Actions / Trunk Check

golangci-lint2(govet)

[new] copylocks: assignment copies lock value to k: github.com/hypermodeinc/dgraph/v25/protos/pb.OracleDelta contains google.golang.org/protobuf/runtime/protoimpl.MessageState contains sync.Mutex
}
for id, ch := range o.subscribers {
select {
case ch <- *delta:
case ch <- k:
default:
close(ch)
delete(o.subscribers, id)
}
}
o.Unlock()
delta = &pb.OracleDelta{}
if useNewDelta {
delta = &pb.OracleDelta{}
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions posting/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/dgraph-io/badger/v4"
"github.com/golang/glog"
ostats "go.opencensus.io/stats"
"go.opentelemetry.io/otel/trace"

"github.com/hypermodeinc/dgraph/v25/protos/pb"
"github.com/hypermodeinc/dgraph/v25/tok/index"
Expand Down Expand Up @@ -54,6 +55,8 @@ type Txn struct {
lastUpdate time.Time

cache *LocalCache // This pointer does not get modified.

Span trace.Span
}

// struct to implement Txn interface from vector-indexer
Expand Down
41 changes: 40 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,12 @@ func (n *node) processApplyCh() {
glog.V(3).Infof("handling element in applyCh with #entries %v", len(entries))
defer glog.V(3).Infof("done handling element in applyCh")

_, spanHandler := otel.Tracer("applyCh").Start(context.Background(), "Alpha.processApplyCh")
defer spanHandler.End()

spanHandler.AddEvent("handling element in applyCh with #entries %v", trace.WithAttributes(
attribute.Int64("numEntries", int64(len(entries)))))

var totalSize int64
for _, entry := range entries {
x.AssertTrue(len(entry.Data) > 0)
Expand Down Expand Up @@ -909,7 +915,7 @@ func (n *node) processApplyCh() {
p := &P{err: perr, size: psz, seen: time.Now()}
previous[key] = p
}
span := trace.SpanFromContext(n.ctx)
span := trace.SpanFromContext(n.Ctx(key))
if perr != nil {
glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, &proposal)
span.AddEvent(fmt.Sprintf("Applying proposal failed. Error: %v Proposal: %q", perr, &proposal))
Expand All @@ -919,11 +925,22 @@ func (n *node) processApplyCh() {
attribute.Int64("key", int64(key)),
attribute.Int64("index", int64(proposal.Index)),
))
spanHandler.AddEvent("Applied proposal with key: %d, index: %d. Err: %v",
trace.WithAttributes(
attribute.Int64("key", int64(key)),
attribute.Int64("index", int64(proposal.Index)),
))

var tags []tag.Mutator
switch {
case proposal.Mutations != nil:
tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Mutations"))
span.SetAttributes(attribute.Int64("start_ts", int64(proposal.Mutations.StartTs)))
txn := posting.Oracle().GetTxn(proposal.Mutations.StartTs)
if txn != nil {
txn.Span = span
}

case proposal.Delta != nil:
tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Delta"))
}
Expand Down Expand Up @@ -966,10 +983,15 @@ func (n *node) processApplyCh() {

// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
pctx, span := otel.Tracer("alpha.CommitLoop").Start(context.Background(), "alpha.commitOrAbort")
defer span.End()

x.PrintOracleDelta(delta)
// First let's commit all mutations to disk.
writer := posting.NewTxnWriter(pstore)
toDisk := func(start, commit uint64) {
_, tspan := otel.Tracer("alpha.CommitLoop").Start(pctx, "alpha.toDisk")
defer tspan.End()
txn := posting.Oracle().GetTxn(start)
if txn == nil || commit == 0 {
return
Expand All @@ -995,6 +1017,12 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
start, commit, err)
panic(err)
}

tspan.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
attribute.Int64("start_ts", int64(start)),
attribute.Int64("commit_ts", int64(commit)),
))
tspan.SetAttributes(attribute.Int64("start_ts", int64(start)), attribute.Int64("commit_ts", int64(commit)))
}

for _, status := range delta.Txns {
Expand All @@ -1004,6 +1032,17 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
return errors.Wrapf(err, "while flushing to disk")
}

span.AddEvent("Flushed to disk")
for _, status := range delta.Txns {
txn := posting.Oracle().GetTxn(status.StartTs)
if txn != nil && txn.Span != nil {
txn.Span.AddEvent("Flushed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
attribute.Int64("start_ts", int64(status.StartTs)),
attribute.Int64("commit_ts", int64(status.CommitTs)),
))
}
}

if x.WorkerConfig.HardSync {
if err := pstore.Sync(); err != nil {
glog.Errorf("Error while calling Sync while commitOrAbort: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
cctx, cancel := context.WithCancel(context.Background())
defer cancel()

cctx = trace.ContextWithSpan(cctx, span)

errCh := make(chan error, 1)
pctx := &conn.ProposalCtx{
ErrCh: errCh,
Expand Down
Loading