Skip to content

Commit e14ae6a

Browse files
authored
HTTP-121 Ensure SerializationSchema is used in thread-safe way (#127)
1 parent 3f86bf4 commit e14ae6a

File tree

3 files changed

+49
-2
lines changed

3 files changed

+49
-2
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
- Added support for OIDC Bearer tokens.
88

9+
### Fixed
10+
11+
- Ensured SerializationSchema is used in thread-safe way.
12+
913
## [0.15.0] - 2024-07-30
1014

1115
### Added

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreatorFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.getindata.connectors.http.LookupQueryCreator;
1616
import com.getindata.connectors.http.LookupQueryCreatorFactory;
1717
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
18+
import com.getindata.connectors.http.internal.utils.SynchronizedSerializationSchema;
19+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING;
1820
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_REQUEST_FORMAT;
1921

2022
/**
@@ -47,8 +49,14 @@ public LookupQueryCreator createLookupQueryCreator(
4749
queryFormatAwareConfiguration
4850
);
4951

50-
SerializationSchema<RowData> serializationSchema =
51-
encoder.createRuntimeEncoder(null, lookupRow.getLookupPhysicalRowDataType());
52+
final SerializationSchema<RowData> serializationSchema;
53+
if (readableConfig.get(ASYNC_POLLING)) {
54+
serializationSchema = new SynchronizedSerializationSchema<>(
55+
encoder.createRuntimeEncoder(null, lookupRow.getLookupPhysicalRowDataType()));
56+
} else {
57+
serializationSchema =
58+
encoder.createRuntimeEncoder(null, lookupRow.getLookupPhysicalRowDataType());
59+
}
5260

5361
return new GenericJsonQueryCreator(serializationSchema);
5462
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.getindata.connectors.http.internal.utils;
2+
3+
import org.apache.flink.api.common.serialization.SerializationSchema;
4+
5+
/**
6+
* Decorator which ensures that underlying SerializationSchema is called in thread-safe way.
7+
*
8+
* @param <T> type
9+
*/
10+
public class SynchronizedSerializationSchema<T> implements SerializationSchema<T> {
11+
12+
private final SerializationSchema<T> delegate;
13+
14+
public SynchronizedSerializationSchema(SerializationSchema<T> delegate) {
15+
this.delegate = delegate;
16+
}
17+
18+
@Override
19+
public void open(InitializationContext context) throws Exception {
20+
doOpen(context);
21+
}
22+
23+
private synchronized void doOpen(InitializationContext context) throws Exception {
24+
this.delegate.open(context);
25+
}
26+
27+
@Override
28+
public byte[] serialize(T element) {
29+
return syncSerialize(element);
30+
}
31+
32+
private synchronized byte[] syncSerialize(T element) {
33+
return delegate.serialize(element);
34+
}
35+
}

0 commit comments

Comments
 (0)