-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add support for HA and multishard functionality in import APIs #9406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for high availability and multishard functionality in the import APIs by introducing a new streaming mechanism for propagating partition directory (P dir) updates across nodes. Key changes include:
- Adding new stream processing functions and a helper for managing ongoing P dir tasks.
- Introducing a new RPC (ReqPDirStream) and related message and client/server implementations.
- Updating tests and client code to leverage the new streaming workflow.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
worker/import.go | Adds streamProcessorSender support, a new stream task check, and updates the acknowledgment flow for stream processing. |
worker/draft.go | Introduces a new opStreamPDir constant and string conversion for task identification. |
protos/pb/pb_grpc.pb.go | Implements the new ReqPDirStream RPC handler methods and client stream handling. |
protos/pb.proto | Defines the new ReqPDirStreamRequest message and RPC in the Worker service. |
edgraph/server.go | Removes legacy drain mode invocation in favor of the new stream-based approach. |
dgraph/cmd/dgraphimport/import_test.go | Updates test parameters and introduces a longer fixed sleep period post-import. |
dgraph/cmd/dgraphimport/import_client.go | Refactors stream data logic to use the new RunBadgerStream method from the worker package. |
Comments suppressed due to low confidence (1)
worker/import.go:319
- The removal of sending the acknowledgment using stream.SendAndClose in this function appears to be intentional given the updated flow in InStream. Please verify that the ACK signal is being sent exactly once during stream processing to avoid protocol inconsistencies.
return nil
time.Sleep(time.Minute * 3) | ||
verifyImportResults(t, gc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Using a fixed sleep of 3 minutes may unnecessarily prolong test execution. Consider replacing the fixed delay with a synchronization mechanism or a condition-based wait to reduce test runtime.
time.Sleep(time.Minute * 3) | |
verifyImportResults(t, gc) | |
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3) | |
defer cancel() | |
for { | |
if verifyImportResults(t, gc) { | |
break | |
} | |
select { | |
case <-ctx.Done(): | |
t.Fatal("Timeout waiting for import results to be ready") | |
default: | |
time.Sleep(time.Second * 5) // Poll every 5 seconds | |
} | |
} |
Copilot uses AI. Check for mistakes.
|
2572f94
to
dc05732
Compare
Running Code Quality on PRs by uploading data to Trunk will soon be removed. You can still run checks on your PRs using trunk-action - see the migration guide for more information. |
f2a9fed
to
506301a
Compare
ada9899
to
eddd33c
Compare
} | ||
// Wait for acknowledgment from the server | ||
if _, err := out.CloseAndRecv(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to close the steam even if there is an error before this line
require.NoError(t, targetCluster.HealthCheck(false)) | ||
|
||
if waitForSnapshot { | ||
for grp, alphas := range alphaGroups { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we waiting for a snapshot?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So when a stopped Alpha rejoins the group, it receives a Raft proposal and then requests the latest snapshot from the leader. We should wait for that process to complete — that's why we are waiting for the snapshot.
"github.com/pkg/errors" | ||
) | ||
|
||
var ( | ||
// the drainingMode variable should be accessed through the atomic.Store and atomic.Load | ||
// functions. The value 0 means the draining-mode is disabled, and the value 1 means the | ||
// mode is enabled | ||
drainingMode uint32 | ||
drainingMode uint32 | ||
extSnapshotStreamingState uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so we don't want to rely on drain mode only, because it can be changed by other operations as well.
Description
Please explain the changes you made here.
Checklist
CHANGELOG.md
file describing and linking tothis PR
docs repo staged and linked here
Instructions
syntax, leading with
fix:
,feat:
,chore:
,ci:
, etc.link to the bug.
[x]
syntax.back and check the box later.
Instructions
line and everything below it, to indicate you have read and arefollowing these instructions. 🙂
Thank you for your contribution to Dgraph!