Skip to content

Commit d0450e2

Browse files
authored
Add configuration to exclude topic on file path (#1172)
* Add configuration to exclude topic on file path * add docs
1 parent 7ddce6a commit d0450e2

File tree

8 files changed

+119
-13
lines changed

8 files changed

+119
-13
lines changed

docs/aws-s3-sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
139139
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
140140
| `timePartitionDuration` | String | False | false | "86400000" | The time interval for time-based partitioning. Support formatted interval string, such as `30d`, `24h`, `30m`, `10s`, and also support number in milliseconds precision, such as `86400000` refers to `24h` or `1d`. |
141141
| `pathPrefix` | String | False | false | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
142+
| `partitionerWithTopicName` | Boolean | False | false | true | Indicates whether to include the topic name in the file path. Default is true. If not included, the path like: `pathPrefix/24.45.0.json` |
142143
| `partitionerUseIndexAsOffset` | Boolean | False | false | false | Whether to use the Pulsar's message index as offset or the record sequence. It's recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it's not present on the record, the sequence will be used. See [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) for more details. |
143144
| `withTopicPartitionNumber` | Boolean | False | false | true | When it is set to `true`, include the topic partition number to the object path. |
144145
| `sliceTopicPartitionPath` | Boolean | False | false | false | When it is set to `true`, split the partitioned topic name into separate folders in the bucket path. |

docs/azure-blob-storage-sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ Before using the Azure Blob Storage sink connector, you need to configure it. Th
120120
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
121121
| `timePartitionDuration` | String | False | false | "86400000" | The time interval for time-based partitioning. Support formatted interval string, such as `30d`, `24h`, `30m`, `10s`, and also support number in milliseconds precision, such as `86400000` refers to `24h` or `1d`. |
122122
| `pathPrefix` | String | False | false | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
123+
| `partitionerWithTopicName` | Boolean | False | false | true | Indicates whether to include the topic name in the file path. Default is true. If not included, the path like: `pathPrefix/24.45.0.json` |
123124
| `partitionerUseIndexAsOffset` | Boolean | False | false | false | Whether to use the Pulsar's message index as offset or the record sequence. It's recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it's not present on the record, the sequence will be used. See [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) for more details. |
124125
| `withTopicPartitionNumber` | Boolean | False | false | true | When it is set to `true`, include the topic partition number to the object path. |
125126
| `sliceTopicPartitionPath` | Boolean | False | false | false | When it is set to `true`, split the partitioned topic name into separate folders in the bucket path. |

docs/google-cloud-storage-sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
125125
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
126126
| `timePartitionDuration` | String | False | false | "86400000" | The time interval for time-based partitioning. Support formatted interval string, such as `30d`, `24h`, `30m`, `10s`, and also support number in milliseconds precision, such as `86400000` refers to `24h` or `1d`. |
127127
| `pathPrefix` | String | False | false | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
128+
| `partitionerWithTopicName` | Boolean | False | false | true | Indicates whether to include the topic name in the file path. Default is true. If not included, the path like: `pathPrefix/24.45.0.json` |
128129
| `partitionerUseIndexAsOffset` | Boolean | False | false | false | Whether to use the Pulsar's message index as offset or the record sequence. It's recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it's not present on the record, the sequence will be used. See [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) for more details. |
129130
| `withTopicPartitionNumber` | Boolean | False | false | true | When it is set to `true`, include the topic partition number to the object path. |
130131
| `sliceTopicPartitionPath` | Boolean | False | false | false | When it is set to `true`, split the partitioned topic name into separate folders in the bucket path. |

src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class BlobStoreAbstractConfig implements Serializable {
8686
private String partitionerType;
8787
private String pathPrefix;
8888
private boolean withTopicPartitionNumber = true;
89+
private boolean partitionerWithTopicName = true;
8990
private boolean partitionerUseIndexAsOffset;
9091
private String timePartitionPattern;
9192
private String timePartitionDuration;

src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,34 +41,36 @@ public abstract class AbstractPartitioner<T> implements Partitioner<T> {
4141

4242
private boolean sliceTopicPartitionPath;
4343
private boolean withTopicPartitionNumber;
44+
private boolean partitionerWithTopicName;
4445
private boolean useIndexAsOffset;
4546

4647
@Override
4748
public void configure(BlobStoreAbstractConfig config) {
4849
this.sliceTopicPartitionPath = config.isSliceTopicPartitionPath();
4950
this.withTopicPartitionNumber = config.isWithTopicPartitionNumber();
5051
this.useIndexAsOffset = config.isPartitionerUseIndexAsOffset();
52+
this.partitionerWithTopicName = config.isPartitionerWithTopicName();
5153
}
5254

5355
@Override
5456
public String generatePartitionedPath(String topic, String encodedPartition) {
55-
5657
List<String> joinList = new ArrayList<>();
57-
TopicName topicName = TopicName.get(topic);
58-
joinList.add(topicName.getTenant());
59-
joinList.add(topicName.getNamespacePortion());
60-
61-
if (topicName.isPartitioned() && withTopicPartitionNumber) {
62-
if (sliceTopicPartitionPath) {
58+
if (partitionerWithTopicName) {
59+
TopicName topicName = TopicName.get(topic);
60+
joinList.add(topicName.getTenant());
61+
joinList.add(topicName.getNamespacePortion());
62+
if (topicName.isPartitioned() && withTopicPartitionNumber) {
63+
if (sliceTopicPartitionPath) {
64+
TopicName newTopicName = TopicName.get(topicName.getPartitionedTopicName());
65+
joinList.add(newTopicName.getLocalName());
66+
joinList.add(Integer.toString(topicName.getPartitionIndex()));
67+
} else {
68+
joinList.add(topicName.getLocalName());
69+
}
70+
} else {
6371
TopicName newTopicName = TopicName.get(topicName.getPartitionedTopicName());
6472
joinList.add(newTopicName.getLocalName());
65-
joinList.add(Integer.toString(topicName.getPartitionIndex()));
66-
} else {
67-
joinList.add(topicName.getLocalName());
6873
}
69-
} else {
70-
TopicName newTopicName = TopicName.get(topicName.getPartitionedTopicName());
71-
joinList.add(newTopicName.getLocalName());
7274
}
7375
joinList.add(encodedPartition);
7476
return StringUtils.join(joinList, PATH_SEPARATOR);

src/test/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfigTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void loadBasicConfigTest() throws IOException {
6565
Assert.assertEquals(config.get("timePartitionPattern"), cloudStorageSinkConfig.getTimePartitionPattern());
6666
Assert.assertEquals(config.get("timePartitionDuration"), cloudStorageSinkConfig.getTimePartitionDuration());
6767
Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize());
68+
Assert.assertTrue(cloudStorageSinkConfig.isPartitionerWithTopicName());
6869
Assert.assertEquals(10000000L, cloudStorageSinkConfig.getMaxBatchBytes());
6970
}
7071

@@ -243,6 +244,7 @@ public void byteConfigTest() throws IOException {
243244
config.put("timePartitionDuration", "2d");
244245
config.put("batchSize", 10);
245246
config.put("bytesFormatTypeSeparator", "0x10");
247+
config.put("partitionerWithTopicName", "false");
246248
CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
247249
cloudStorageSinkConfig.validate();
248250

@@ -258,6 +260,7 @@ public void byteConfigTest() throws IOException {
258260
Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize());
259261
Assert.assertEquals(config.get("bytesFormatTypeSeparator"),
260262
cloudStorageSinkConfig.getBytesFormatTypeSeparator());
263+
Assert.assertFalse(cloudStorageSinkConfig.isPartitionerWithTopicName());
261264
}
262265

263266
@Test

src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ public static Object[][] data() {
9090
numberConfig.setTimePartitionPattern("yyyy-MM-dd-HH");
9191
TimePartitioner<Object> numberPartitioner = new TimePartitioner<>();
9292
numberPartitioner.configure(numberConfig);
93+
94+
BlobStoreAbstractConfig withoutTopicNameConfig = new BlobStoreAbstractConfig();
95+
withoutTopicNameConfig.setPartitionerWithTopicName(false);
96+
SimplePartitioner<Object> simpleWithoutTopicNamePartitioner = new SimplePartitioner<>();
97+
simpleWithoutTopicNamePartitioner.configure(withoutTopicNameConfig);
98+
99+
BlobStoreAbstractConfig withoutTopicNameConfig2 = new BlobStoreAbstractConfig();
100+
withoutTopicNameConfig2.setPartitionerWithTopicName(false);
101+
withoutTopicNameConfig2.setTimePartitionDuration("7200000");
102+
withoutTopicNameConfig2.setTimePartitionPattern("yyyy-MM-dd-HH");
103+
TimePartitioner<Object> timeWithoutTopicNamePartitioner = new TimePartitioner<>();
104+
timeWithoutTopicNamePartitioner.configure(withoutTopicNameConfig2);
93105
return new Object[][]{
94106
new Object[]{
95107
simplePartitioner,
@@ -146,6 +158,18 @@ public static Object[][] data() {
146158
+ Partitioner.PATH_SEPARATOR + testMsgIdFileName,
147159
getPartitionedTopic()
148160
},
161+
new Object[]{
162+
simpleWithoutTopicNamePartitioner,
163+
testMsgIdFileName,
164+
testMsgIdFileName,
165+
getTopic()
166+
},
167+
new Object[]{
168+
timeWithoutTopicNamePartitioner,
169+
"2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMsgIdFileName,
170+
"2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMsgIdFileName,
171+
getPartitionedTopic()
172+
},
149173
};
150174
}
151175

0 commit comments

Comments
 (0)