Skip to content

Commit ca81dbb

Browse files
authored
Merge pull request #283 from ydb-platform/find_export_methods
Added methods to find exisiting operations
2 parents ef49df2 + 1808433 commit ca81dbb

File tree

5 files changed

+93
-10
lines changed

5 files changed

+93
-10
lines changed

export/src/main/java/tech/ydb/export/ExportClient.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import tech.ydb.export.result.ExportToYtResult;
1515
import tech.ydb.export.settings.ExportToS3Settings;
1616
import tech.ydb.export.settings.ExportToYtSettings;
17+
import tech.ydb.export.settings.FindExportSettings;
1718

1819
/**
1920
* @author Kirill Kurdyukov
@@ -24,6 +25,14 @@ static ExportClient newClient(@WillNotClose GrpcTransport transport) {
2425
return new ExportClientImpl(GrpcExportRpcImpl.useTransport(transport));
2526
}
2627

28+
CompletableFuture<Operation<Result<ExportToS3Result>>> findExportToS3(
29+
String operationId, FindExportSettings settings
30+
);
31+
32+
CompletableFuture<Operation<Result<ExportToYtResult>>> findExportToYT(
33+
String operationId, FindExportSettings settings
34+
);
35+
2736
CompletableFuture<Operation<Result<ExportToS3Result>>> startExportToS3(
2837
String endpoint, String bucket, String accessKey, String secretKey, ExportToS3Settings settings
2938
);
@@ -40,7 +49,7 @@ default CompletableFuture<Result<ExportToS3Result>> exportToS3(
4049
.thenCompose(operation -> OperationTray.fetchOperation(operation, updateRateSeconds));
4150
}
4251

43-
default CompletableFuture<Result<ExportToYtResult>> startExportToYt(
52+
default CompletableFuture<Result<ExportToYtResult>> exportToYt(
4453
String host, String token, ExportToYtSettings settings, int updateRateSeconds
4554
) {
4655
return startExportToYt(host, token, settings)

export/src/main/java/tech/ydb/export/ExportRpc.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@
1212
*/
1313
public interface ExportRpc {
1414

15+
CompletableFuture<Operation<Result<YdbExport.ExportToS3Result>>> findExportToS3(
16+
String operationId, GrpcRequestSettings settings
17+
);
18+
19+
CompletableFuture<Operation<Result<YdbExport.ExportToYtResult>>> findExportToYT(
20+
String operationId, GrpcRequestSettings settings
21+
);
22+
1523
CompletableFuture<Operation<Result<YdbExport.ExportToS3Result>>> exportS3(
1624
YdbExport.ExportToS3Request request, GrpcRequestSettings settings
1725
);

export/src/main/java/tech/ydb/export/impl/ExportClientImpl.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import tech.ydb.export.result.ExportToYtResult;
1414
import tech.ydb.export.settings.ExportToS3Settings;
1515
import tech.ydb.export.settings.ExportToYtSettings;
16+
import tech.ydb.export.settings.FindExportSettings;
1617
import tech.ydb.proto.export.YdbExport;
1718

1819
/**
@@ -26,11 +27,8 @@ public ExportClientImpl(ExportRpc exportRpc) {
2627
this.exportRpc = exportRpc;
2728
}
2829

29-
private String getTraceIdOrGenerateNew(String traceId) {
30-
return traceId == null ? UUID.randomUUID().toString() : traceId;
31-
}
32-
33-
private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings, String traceId) {
30+
private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {
31+
String traceId = settings.getTraceId() == null ? UUID.randomUUID().toString() : settings.getTraceId();
3432
return GrpcRequestSettings.newBuilder()
3533
.withDeadline(settings.getRequestTimeout())
3634
.withTraceId(traceId)
@@ -85,8 +83,7 @@ public CompletableFuture<Operation<Result<ExportToS3Result>>> startExportToS3(
8583
.setOperationParams(Operation.buildParams(settings))
8684
.build();
8785

88-
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
89-
return exportRpc.exportS3(request, makeGrpcRequestSettings(settings, traceId))
86+
return exportRpc.exportS3(request, makeGrpcRequestSettings(settings))
9087
.thenApply(op -> op.transform(r -> r.map(ExportToS3Result::new)));
9188
}
9289

@@ -128,8 +125,23 @@ public CompletableFuture<Operation<Result<ExportToYtResult>>> startExportToYt(
128125
.setOperationParams(Operation.buildParams(settings))
129126
.build();
130127

131-
String traceId = getTraceIdOrGenerateNew(settings.getTraceId());
132-
return exportRpc.exportYt(request, makeGrpcRequestSettings(settings, traceId))
128+
return exportRpc.exportYt(request, makeGrpcRequestSettings(settings))
129+
.thenApply(op -> op.transform(r -> r.map(ExportToYtResult::new)));
130+
}
131+
132+
@Override
133+
public CompletableFuture<Operation<Result<ExportToS3Result>>> findExportToS3(
134+
String operationId, FindExportSettings settings
135+
) {
136+
return exportRpc.findExportToS3(operationId, makeGrpcRequestSettings(settings))
137+
.thenApply(op -> op.transform(r -> r.map(ExportToS3Result::new)));
138+
}
139+
140+
@Override
141+
public CompletableFuture<Operation<Result<ExportToYtResult>>> findExportToYT(
142+
String operationId, FindExportSettings settings
143+
) {
144+
return exportRpc.findExportToYT(operationId, makeGrpcRequestSettings(settings))
133145
.thenApply(op -> op.transform(r -> r.map(ExportToYtResult::new)));
134146
}
135147
}

export/src/main/java/tech/ydb/export/impl/GrpcExportRpcImpl.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import tech.ydb.core.operation.Operation;
1111
import tech.ydb.core.operation.OperationBinder;
1212
import tech.ydb.export.ExportRpc;
13+
import tech.ydb.proto.OperationProtos;
1314
import tech.ydb.proto.export.YdbExport;
1415
import tech.ydb.proto.export.v1.ExportServiceGrpc;
16+
import tech.ydb.proto.operation.v1.OperationServiceGrpc;
1517

1618
/**
1719
* @author Kirill Kurdyukov
@@ -49,4 +51,32 @@ public CompletableFuture<Operation<Result<YdbExport.ExportToYtResult>>> exportYt
4951
)
5052
);
5153
}
54+
55+
@Override
56+
public CompletableFuture<Operation<Result<YdbExport.ExportToS3Result>>> findExportToS3(
57+
String operationId, GrpcRequestSettings settings
58+
) {
59+
OperationProtos.GetOperationRequest request = OperationProtos.GetOperationRequest.newBuilder()
60+
.setId(operationId)
61+
.build();
62+
return transport
63+
.unaryCall(OperationServiceGrpc.getGetOperationMethod(), settings, request)
64+
.thenApply(OperationBinder.bindAsync(
65+
transport, OperationProtos.GetOperationResponse::getOperation, YdbExport.ExportToS3Result.class
66+
));
67+
}
68+
69+
@Override
70+
public CompletableFuture<Operation<Result<YdbExport.ExportToYtResult>>> findExportToYT(
71+
String operationId, GrpcRequestSettings settings
72+
) {
73+
OperationProtos.GetOperationRequest request = OperationProtos.GetOperationRequest.newBuilder()
74+
.setId(operationId)
75+
.build();
76+
return transport
77+
.unaryCall(OperationServiceGrpc.getGetOperationMethod(), settings, request)
78+
.thenApply(OperationBinder.bindAsync(
79+
transport, OperationProtos.GetOperationResponse::getOperation, YdbExport.ExportToYtResult.class
80+
));
81+
}
5282
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package tech.ydb.export.settings;
2+
3+
import tech.ydb.core.settings.OperationSettings;
4+
5+
/**
6+
*
7+
* @author Aleksandr Gorshenin
8+
*/
9+
public class FindExportSettings extends OperationSettings {
10+
private FindExportSettings(Builder builder) {
11+
super(builder);
12+
}
13+
14+
public static Builder newBuilder() {
15+
return new Builder().withAsyncMode(true);
16+
}
17+
18+
public static class Builder extends OperationSettings.OperationBuilder<Builder> {
19+
@Override
20+
public FindExportSettings build() {
21+
return new FindExportSettings(this);
22+
}
23+
}
24+
}

0 commit comments

Comments
 (0)