-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add support for HA and multishard functionality in import APIs #9406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,16 +7,13 @@ package dgraphimport | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"math" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/dgraph-io/badger/v4" | ||
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2" | ||
"github.com/dgraph-io/ristretto/v2/z" | ||
"github.com/hypermodeinc/dgraph/v25/worker" | ||
|
||
"github.com/golang/glog" | ||
"golang.org/x/sync/errgroup" | ||
|
@@ -48,10 +45,12 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD | |
} | ||
|
||
// startPDirStream initiates a snapshot stream session with the Dgraph server. | ||
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.InitiatePDirStreamResponse, error) { | ||
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.UpdateExtSnapshotStreamingStateResponse, error) { | ||
glog.Info("Initiating pdir stream") | ||
req := &apiv2.InitiatePDirStreamRequest{} | ||
resp, err := dc.InitiatePDirStream(ctx, req) | ||
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{ | ||
Start: true, | ||
} | ||
resp, err := dc.UpdateExtSnapshotStreamingState(ctx, req) | ||
if err != nil { | ||
glog.Errorf("failed to initiate pdir stream: %v", err) | ||
return nil, fmt.Errorf("failed to initiate pdir stream: %v", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
|
@@ -63,7 +62,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat | |
// sendPDir takes a p directory and a set of group IDs and streams the data from the | ||
// p directory to the corresponding group IDs. It first scans the provided directory for | ||
// subdirectories named with numeric group IDs. | ||
func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups []uint32) error { | ||
func sendPDir(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. streamSnapshot |
||
glog.Infof("Starting to stream pdir from directory: %s", baseDir) | ||
|
||
errG, ctx := errgroup.WithContext(ctx) | ||
|
@@ -74,31 +73,43 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups | |
if _, err := os.Stat(pDir); err != nil { | ||
return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir) | ||
} | ||
|
||
glog.Infof("Streaming data for group [%d] from directory: [%s]", group, pDir) | ||
if err := streamData(ctx, dg, pDir, group); err != nil { | ||
if err := streamData(ctx, dc, pDir, group); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. streamDataForGroup | streamSnapshotForGroup |
||
glog.Errorf("Failed to stream data for groups [%v] from directory: [%s]: %v", group, pDir, err) | ||
return err | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
if err := errG.Wait(); err != nil { | ||
return err | ||
if err1 := errG.Wait(); err1 != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. err1 --> err |
||
// If the p directory doesn't exist for this group, it indicates that | ||
shivaji-kharse marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If an error occurs during streaming of the external snapshot, we drop all the data and go back to ensure a clean slate, so that the cluster remains in a working state. |
||
// streaming might be in progress to other groups. We disable drain mode | ||
// to prevent interference and drop any streamed data to ensure a clean state. | ||
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{ | ||
Start: false, | ||
Finish: true, | ||
DropData: true, | ||
} | ||
if _, err := dc.UpdateExtSnapshotStreamingState(context.Background(), req); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should pass the context received in this function as argument |
||
return fmt.Errorf("failed to stream data :%v failed to off drain mode: %v", err1, err) | ||
shivaji-kharse marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
glog.Info("successfully disabled drain mode") | ||
shivaji-kharse marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return err1 | ||
} | ||
|
||
glog.Infof("Completed streaming all pdirs") | ||
glog.Info("Completed streaming all pdirs") | ||
return nil | ||
} | ||
|
||
// streamData handles the actual data streaming process for a single group. | ||
// It opens the BadgerDB at the specified directory and streams all data to the server. | ||
func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId uint32) error { | ||
func streamData(ctx context.Context, dc apiv2.DgraphClient, pdir string, groupId uint32) error { | ||
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir) | ||
|
||
// Initialize stream with the server | ||
out, err := dg.StreamPDir(ctx) | ||
out, err := dc.StreamExtSnapshot(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to start pdir stream for group %d: %w", groupId, err) | ||
} | ||
|
@@ -118,41 +129,23 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId | |
|
||
// Send group ID as the first message in the stream | ||
glog.Infof("Sending group ID [%d] to server", groupId) | ||
groupReq := &apiv2.StreamPDirRequest{GroupId: groupId} | ||
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId} | ||
if err := out.Send(groupReq); err != nil { | ||
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err) | ||
} | ||
|
||
// Configure and start the BadgerDB stream | ||
glog.Infof("Starting BadgerDB stream for group [%d]", groupId) | ||
stream := ps.NewStreamAt(math.MaxUint64) | ||
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId) | ||
stream.KeyToList = nil | ||
stream.Send = func(buf *z.Buffer) error { | ||
p := &apiv2.StreamPacket{Data: buf.Bytes()} | ||
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) { | ||
return fmt.Errorf("failed to send data chunk: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
// Execute the stream process | ||
if err := stream.Orchestrate(ctx); err != nil { | ||
return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err) | ||
} | ||
|
||
// Send the final 'done' signal to mark completion | ||
glog.Infof("Sending completion signal for group [%d]", groupId) | ||
done := &apiv2.StreamPacket{Done: true} | ||
// if err := RunBadgerStream(ctx, ps, out, groupId); err != nil { | ||
shivaji-kharse marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err) | ||
// } | ||
|
||
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) { | ||
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err) | ||
if err := worker.RunBadgerStream(ctx, ps, out, groupId); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move the function `RunBadgerStream` back into this package |
||
return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. badger stream -> badger streaming |
||
} | ||
// Wait for acknowledgment from the server | ||
if _, err := out.CloseAndRecv(); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to close the stream even if there is an error before this line? |
||
return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err) | ||
} | ||
glog.Infof("Group [%d]: Received ACK ", groupId) | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initiateSnapshotStream, we can also get rid of this function