Skip to content

Commit e51a06b

Browse files
authored
[venice-samza] Enabled pipelineV2 for the d2 client being used in samza producer (#1728)
`pipelineV2` is required in D2 Client to talk H2 to the backend. In the future, we should construct a D2Client from the internal factory, where we can manage all the D2 configs in a centralized fashion.
1 parent a6bc59c commit e51a06b

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

integrations/venice-samza/src/main/java/com/linkedin/venice/samza/VeniceSystemProducer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
1111
import com.linkedin.d2.balancer.D2Client;
1212
import com.linkedin.d2.balancer.D2ClientBuilder;
13+
import com.linkedin.r2.transport.common.TransportClientFactory;
14+
import com.linkedin.r2.transport.http.client.HttpClientFactory;
1315
import com.linkedin.venice.D2.D2ClientUtils;
1416
import com.linkedin.venice.client.store.ClientConfig;
1517
import com.linkedin.venice.client.store.ClientFactory;
@@ -933,6 +935,16 @@ protected void setControllerClient(D2ControllerClient controllerClient) {
933935
}
934936

935937
private D2Client getStartedD2Client(String d2ZkHost) {
938+
/**
939+
* Create {@link HttpClientFactory} with {@link com.linkedin.r2.transport.http.client.HttpClientFactory#_usePipelineV2} enabled,
940+
* so that it can use the right clients when H2 is enabled.
941+
*
942+
* TODO: leverage the internal factory to create a proper D2 Client, so that it can follow the default global config.
943+
*/
944+
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
945+
TransportClientFactory transportClientFactory = new HttpClientFactory.Builder().setUsePipelineV2(true).build();
946+
clientFactories.put("http", transportClientFactory);
947+
clientFactories.put("https", transportClientFactory);
936948
D2ClientEnvelope d2ClientEnvelope = d2ZkHostToClientEnvelopeMap.computeIfAbsent(d2ZkHost, zkHost -> {
937949
String fsBasePath = Utils.getUniqueTempPath("d2");
938950
D2Client d2Client = new D2ClientBuilder().setZkHosts(d2ZkHost)
@@ -941,6 +953,8 @@ private D2Client getStartedD2Client(String d2ZkHost) {
941953
.setSSLParameters(sslFactory.map(SSLFactory::getSSLParameters).orElse(null))
942954
.setFsBasePath(fsBasePath)
943955
.setEnableSaveUriDataOnDisk(true)
956+
.setClientFactories(clientFactories)
957+
.setRestOverStream(true)
944958
.build();
945959
D2ClientUtils.startClient(d2Client);
946960
return new D2ClientEnvelope(d2Client, fsBasePath);

0 commit comments

Comments
 (0)