Add support for HA and multishard functionality in import APIs #9406

Open
wants to merge 4 commits into
base: main
from

Conversation

shivaji-kharse
Contributor

Description

Please explain the changes you made here.

Checklist

  • Code compiles correctly and linting passes locally
  • For all code changes, an entry added to the CHANGELOG.md file describing and linking to
    this PR
  • Tests added for new functionality, or regression tests for bug fixes added as applicable
  • For public APIs, new features, etc., PR on
    docs repo staged and linked here

Instructions

  • The PR title should follow the Conventional Commits
    syntax, leading with fix:, feat:, chore:, ci:, etc.
  • The description should briefly explain what the PR is about. In the case of a bugfix, describe or
    link to the bug.
  • In the checklist section, check the boxes that are applicable, using [x] syntax.
    • If not applicable, remove the entire line. Only leave the box unchecked if you intend to come
      back and check the box later.
  • Delete the Instructions line and everything below it, to indicate you have read and are
    following these instructions. 🙂

Thank you for your contribution to Dgraph!

Copilot AI review requested due to automatic review settings, May 13, 2025 12:37
shivaji-kharse requested a review from a team as a code owner, May 13, 2025 12:37
github-actions bot added the area/testing (Testing related issues), area/core (internal mechanisms), and go (Pull requests that update Go code) labels, May 13, 2025
Copilot AI left a comment

Pull Request Overview

This PR adds support for high availability and multishard functionality in the import APIs by introducing a new streaming mechanism for propagating postings directory (P dir) updates across nodes. Key changes include:

  • Adding new stream processing functions and a helper for managing ongoing P dir tasks.
  • Introducing a new RPC (ReqPDirStream) and related message and client/server implementations.
  • Updating tests and client code to leverage the new streaming workflow.

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.

Show a summary per file
File | Description
worker/import.go | Adds streamProcessorSender support, a new stream task check, and updates the acknowledgment flow for stream processing.
worker/draft.go | Introduces a new opStreamPDir constant and string conversion for task identification.
protos/pb/pb_grpc.pb.go | Implements the new ReqPDirStream RPC handler methods and client stream handling.
protos/pb.proto | Defines the new ReqPDirStreamRequest message and RPC in the Worker service.
edgraph/server.go | Removes legacy drain mode invocation in favor of the new stream-based approach.
dgraph/cmd/dgraphimport/import_test.go | Updates test parameters and introduces a longer fixed sleep period post-import.
dgraph/cmd/dgraphimport/import_client.go | Refactors stream data logic to use the new RunBadgerStream method from the worker package.
Comments suppressed due to low confidence (1)

worker/import.go:319

  • The removal of sending the acknowledgment using stream.SendAndClose in this function appears to be intentional given the updated flow in InStream. Please verify that the ACK signal is being sent exactly once during stream processing to avoid protocol inconsistencies.
return nil
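
One way to make the "exactly once" property in the comment above hold by construction is to guard the acknowledgment with sync.Once. The sketch below is only an illustration: ackOnce and its send field are hypothetical names, and the real code in worker/import.go may structure the ACK differently (for example by calling stream.SendAndClose from a single place).

```go
package main

import "sync"

// ackOnce wraps a hypothetical acknowledgment function so that it is
// invoked at most once, no matter how many code paths call Ack.
type ackOnce struct {
	once sync.Once
	send func() error // assumed stand-in for something like stream.SendAndClose
	err  error
}

// Ack sends the acknowledgment on the first call and returns the result
// of that first send on every later call.
func (a *ackOnce) Ack() error {
	a.once.Do(func() { a.err = a.send() })
	return a.err
}
```

With a guard like this, both the normal completion path and any cleanup path can call Ack without risking a duplicate acknowledgment on the wire.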

Comment on lines 127 to 128
time.Sleep(time.Minute * 3)
verifyImportResults(t, gc)

Copilot AI May 13, 2025

[nitpick] Using a fixed sleep of 3 minutes may unnecessarily prolong test execution. Consider replacing the fixed delay with a synchronization mechanism or a condition-based wait to reduce test runtime.

Suggested change
time.Sleep(time.Minute * 3)
verifyImportResults(t, gc)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()
for {
	if verifyImportResults(t, gc) {
		break
	}
	select {
	case <-ctx.Done():
		t.Fatal("Timeout waiting for import results to be ready")
	default:
		time.Sleep(time.Second * 5) // Poll every 5 seconds
	}
}

Copilot uses AI. Check for mistakes.

trunk-io bot commented May 13, 2025

Failed Test | Failure Summary
TestVectorIndexWithoutSchema | Error occurred while computing euclidean distance on vectors of different lengths.


trunk-io bot commented May 26, 2025

Running Code Quality on PRs by uploading data to Trunk will soon be removed. You can still run checks on your PRs using trunk-action - see the migration guide for more information.

shivaji-kharse force-pushed the shiva/stream-ha branch 3 times, most recently from f2a9fed to 506301a, May 28, 2025 06:52
}
// Wait for acknowledgment from the server
if _, err := out.CloseAndRecv(); err != nil {
Member

Do we need to close the stream even if there is an error before this line?

require.NoError(t, targetCluster.HealthCheck(false))

if waitForSnapshot {
for grp, alphas := range alphaGroups {
Member

why are we waiting for a snapshot?

Contributor Author

So when a stopped Alpha rejoins the group, it receives a Raft proposal and then requests the latest snapshot from the leader. We should wait for that process to complete — that's why we are waiting for the snapshot.

"github.com/pkg/errors"
)

var (
// the drainingMode variable should be accessed through the atomic.Store and atomic.Load
// functions. The value 0 means the draining-mode is disabled, and the value 1 means the
// mode is enabled
drainingMode              uint32
extSnapshotStreamingState uint32
Member

do we need this?

Contributor Author

Yes. We don't want to rely on drain mode alone, because other operations can change it as well.
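
To illustrate the reasoning in this reply, here is a minimal sketch of keeping the streaming state in its own atomic flag, independent of drain mode. The variable names drainingMode and extSnapshotStreamingState come from the diff above; the helper functions are hypothetical and only show how the two flags can change without affecting each other.

```go
package main

import "sync/atomic"

var (
	// 0 means disabled, 1 means enabled; access only through sync/atomic.
	drainingMode              uint32
	extSnapshotStreamingState uint32
)

// setDrainingMode is a hypothetical helper; any operation may toggle drain mode.
func setDrainingMode(enabled bool) {
	var v uint32
	if enabled {
		v = 1
	}
	atomic.StoreUint32(&drainingMode, v)
}

// beginExtSnapshotStream marks streaming as active and reports false if
// a stream was already in progress.
func beginExtSnapshotStream() bool {
	return atomic.CompareAndSwapUint32(&extSnapshotStreamingState, 0, 1)
}

// endExtSnapshotStream clears the streaming flag.
func endExtSnapshotStream() {
	atomic.StoreUint32(&extSnapshotStreamingState, 0)
}

// isExtSnapshotStreaming reports whether a stream is in progress,
// regardless of the current drain mode.
func isExtSnapshotStreaming() bool {
	return atomic.LoadUint32(&extSnapshotStreamingState) == 1
}
```

Because the streaming flag is separate, another operation switching drain mode off in the middle of an import does not make the node appear to have finished streaming.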

Labels
area/core (internal mechanisms), area/testing (Testing related issues), go (Pull requests that update Go code)
2 participants