Skip to content

Add support for HA and multishard functionality in import APIs #9406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 32 additions & 39 deletions dgraph/cmd/dgraphimport/import_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ package dgraphimport

import (
"context"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"

"github.com/dgraph-io/badger/v4"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/dgraph-io/ristretto/v2/z"
"github.com/hypermodeinc/dgraph/v25/worker"

"github.com/golang/glog"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -48,10 +45,12 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
}

// startPDirStream initiates a snapshot stream session with the Dgraph server.
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.InitiatePDirStreamResponse, error) {
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.UpdateExtSnapshotStreamingStateResponse, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initiateSnapshotStream, we can also get rid of this function

glog.Info("Initiating pdir stream")
req := &apiv2.InitiatePDirStreamRequest{}
resp, err := dc.InitiatePDirStream(ctx, req)
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
Start: true,
}
resp, err := dc.UpdateExtSnapshotStreamingState(ctx, req)
if err != nil {
glog.Errorf("failed to initiate pdir stream: %v", err)
return nil, fmt.Errorf("failed to initiate pdir stream: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Expand All @@ -63,7 +62,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat
// sendPDir takes a p directory and a set of group IDs and streams the data from the
// p directory to the corresponding group IDs. It first scans the provided directory for
// subdirectories named with numeric group IDs.
func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups []uint32) error {
func sendPDir(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamSnapshot

glog.Infof("Starting to stream pdir from directory: %s", baseDir)

errG, ctx := errgroup.WithContext(ctx)
Expand All @@ -74,31 +73,43 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups
if _, err := os.Stat(pDir); err != nil {
return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
}

glog.Infof("Streaming data for group [%d] from directory: [%s]", group, pDir)
if err := streamData(ctx, dg, pDir, group); err != nil {
if err := streamData(ctx, dc, pDir, group); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamDataForGroup | streamSnapshotForGroup

glog.Errorf("Failed to stream data for groups [%v] from directory: [%s]: %v", group, pDir, err)
return err
}

return nil
})
}
if err := errG.Wait(); err != nil {
return err
if err1 := errG.Wait(); err1 != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err1 --> err

// If the p directory doesn't exist for this group, it indicates that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If errors occurs during streaming of the external snapshot, we drop all the data and go back to ensure a clean slate and the cluster remains in working state.

// streaming might be in progress to other groups. We disable drain mode
// to prevent interference and drop any streamed data to ensure a clean state.
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
Start: false,
Finish: true,
DropData: true,
}
if _, err := dc.UpdateExtSnapshotStreamingState(context.Background(), req); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should pass the context received in this function as argument

return fmt.Errorf("failed to stream data :%v failed to off drain mode: %v", err1, err)
}

glog.Info("successfully disabled drain mode")
return err1
}

glog.Infof("Completed streaming all pdirs")
glog.Info("Completed streaming all pdirs")
return nil
}

// streamData handles the actual data streaming process for a single group.
// It opens the BadgerDB at the specified directory and streams all data to the server.
func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId uint32) error {
func streamData(ctx context.Context, dc apiv2.DgraphClient, pdir string, groupId uint32) error {
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)

// Initialize stream with the server
out, err := dg.StreamPDir(ctx)
out, err := dc.StreamExtSnapshot(ctx)
if err != nil {
return fmt.Errorf("failed to start pdir stream for group %d: %w", groupId, err)
}
Expand All @@ -118,41 +129,23 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId

// Send group ID as the first message in the stream
glog.Infof("Sending group ID [%d] to server", groupId)
groupReq := &apiv2.StreamPDirRequest{GroupId: groupId}
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
if err := out.Send(groupReq); err != nil {
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err)
}

// Configure and start the BadgerDB stream
glog.Infof("Starting BadgerDB stream for group [%d]", groupId)
stream := ps.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
p := &apiv2.StreamPacket{Data: buf.Bytes()}
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send data chunk: %w", err)
}
return nil
}

// Execute the stream process
if err := stream.Orchestrate(ctx); err != nil {
return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
}

// Send the final 'done' signal to mark completion
glog.Infof("Sending completion signal for group [%d]", groupId)
done := &apiv2.StreamPacket{Done: true}
// if err := RunBadgerStream(ctx, ps, out, groupId); err != nil {
// return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
// }

if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
if err := worker.RunBadgerStream(ctx, ps, out, groupId); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move the function 'RunBadgerStream` back into this package

return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

badger stream -> badger streaming

}
// Wait for acknowledgment from the server
if _, err := out.CloseAndRecv(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to close the steam even if there is an error before this line

return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err)
}
glog.Infof("Group [%d]: Received ACK ", groupId)

return nil
}
Loading
Loading