
Commit 8543c17

Add the message offset to record like message_key (#1210)
* fix kafka dataset OOM: when the Kafka dataset is used in online learning, evaluation creates a session and a dataset many times, which makes the OOM problem show up very clearly
* test commit
* fix lint errors
* Add the message offset to the record, like message_key (#1209)
* fix test bug
* fix the output tensor string handling method
* optimize string generation
* adjust the code for string operations
1 parent a49dca2 commit 8543c17

File tree

5 files changed, +93 -15 lines changed

tensorflow_io/core/kernels/kafka_kernels_deprecated.cc

Lines changed: 32 additions & 6 deletions
@@ -83,9 +83,13 @@ class KafkaDatasetOp : public DatasetOpKernel {
     OP_REQUIRES_OK(
         ctx, data::ParseScalarArgument<bool>(ctx, "message_key", &message_key));
 
+    bool message_offset = false;
+    OP_REQUIRES_OK(
+        ctx, ParseScalarArgument<bool>(ctx, "message_offset", &message_offset));
+
     *output = new Dataset(ctx, std::move(topics), servers, group, eof, timeout,
                           std::move(config_global), std::move(config_topic),
-                          message_key);
+                          message_key, message_offset);
   }
 
  private:
@@ -94,7 +98,8 @@ class KafkaDatasetOp : public DatasetOpKernel {
     Dataset(OpKernelContext* ctx, std::vector<string> topics,
             const string& servers, const string& group, const bool eof,
             const int64 timeout, std::vector<string> config_global,
-            std::vector<string> config_topic, const bool message_key)
+            std::vector<string> config_topic, const bool message_key,
+            const bool message_offset)
         : DatasetBase(DatasetContext(ctx)),
           topics_(std::move(topics)),
           servers_(servers),
@@ -103,7 +108,8 @@ class KafkaDatasetOp : public DatasetOpKernel {
           timeout_(timeout),
           config_global_(std::move(config_global)),
           config_topic_(std::move(config_topic)),
-          message_key_(message_key) {}
+          message_key_(message_key),
+          message_offset_(message_offset) {}
 
     std::unique_ptr<IteratorBase> MakeIteratorInternal(
         const string& prefix) const override {
@@ -112,20 +118,28 @@ class KafkaDatasetOp : public DatasetOpKernel {
     }
 
     const DataTypeVector& output_dtypes() const override {
-      if (message_key_) {
+      if (message_key_ ^ message_offset_) {
        static DataTypeVector* dtypes =
            new DataTypeVector({DT_STRING, DT_STRING});
        return *dtypes;
+      } else if (message_key_ && message_offset_) {
+        static DataTypeVector* dtypes =
+            new DataTypeVector({DT_STRING, DT_STRING, DT_STRING});
+        return *dtypes;
       }
       static DataTypeVector* dtypes = new DataTypeVector({DT_STRING});
       return *dtypes;
     }
 
     const std::vector<PartialTensorShape>& output_shapes() const override {
-      if (message_key_) {
+      if (message_key_ ^ message_offset_) {
        static std::vector<PartialTensorShape>* shapes =
            new std::vector<PartialTensorShape>({{}, {}});
        return *shapes;
+      } else if (message_key_ && message_offset_) {
+        static std::vector<PartialTensorShape>* shapes =
+            new std::vector<PartialTensorShape>({{}, {}, {}});
+        return *shapes;
       }
       static std::vector<PartialTensorShape>* shapes =
           new std::vector<PartialTensorShape>({{}});
@@ -156,10 +170,12 @@ class KafkaDatasetOp : public DatasetOpKernel {
       TF_RETURN_IF_ERROR(b->AddVector(config_topic_, &config_topic));
       Node* message_key = nullptr;
       TF_RETURN_IF_ERROR(b->AddScalar(message_key_, &message_key));
+      Node* message_offset = nullptr;
+      TF_RETURN_IF_ERROR(b->AddScalar(message_offset_, &message_offset));
       TF_RETURN_IF_ERROR(
           b->AddDataset(this,
                         {topics, servers, group, eof, timeout, config_global,
-                         config_topic, message_key},
+                         config_topic, message_key, message_offset},
                         output));
       return Status::OK();
     }
@@ -242,6 +258,15 @@ class KafkaDatasetOp : public DatasetOpKernel {
           }
           out_tensors->emplace_back(std::move(key_tensor));
         }
+        if (dataset()->message_offset_) {
+          Tensor offset_tensor(cpu_allocator(), DT_STRING, {});
+          int64_t offset = message->offset();
+          int32_t partition = message->partition();
+          string offset_string =
+              std::to_string(partition) + ":" + std::to_string(offset);
+          offset_tensor.scalar<tstring>()() = offset_string;
+          out_tensors->emplace_back(std::move(offset_tensor));
+        }
         *end_of_sequence = false;
         // Sync offset
         offset_ = message->offset();
@@ -508,6 +533,7 @@ class KafkaDatasetOp : public DatasetOpKernel {
     const std::vector<string> config_global_;
     const std::vector<string> config_topic_;
     const bool message_key_;
+    const bool message_offset_;
   };
 };
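
A note on the branch logic above: `message_key_ ^ message_offset_` is true only when exactly one of the two flags is set, which selects the two-tensor signature (value plus key, or value plus offset); when both flags are set, the `else if` branch yields three tensors, with the key emitted before the offset. A minimal Python sketch of that truth table (the function name is hypothetical, not part of the commit):

    # Exactly one extra column -> 2 tensors; both -> 3; neither -> value only.
    def output_columns(message_key: bool, message_offset: bool):
        if message_key ^ message_offset:
            return ("value", "key" if message_key else "offset")
        elif message_key and message_offset:
            return ("value", "key", "offset")
        return ("value",)

    assert output_columns(False, False) == ("value",)
    assert output_columns(True, False) == ("value", "key")
    assert output_columns(False, True) == ("value", "offset")
    assert output_columns(True, True) == ("value", "key", "offset")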

tensorflow_io/core/ops/kafka_ops_deprecated.cc

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ REGISTER_OP("IO>KafkaDataset")
     .Input("config_global: string")
     .Input("config_topic: string")
     .Input("message_key: bool")
+    .Input("message_offset: bool")
     .Output("handle: variant")
     .SetIsStateful()
     .SetShapeFn(shape_inference::ScalarShape)
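
Since op inputs bind positionally, the Python wrapper in the next file has to pass `message_offset` immediately after `message_key`, mirroring the `.Input(...)` order registered here. Schematically (the generated-op call below is an assumption based on the `IO>KafkaDataset` registration, not code from this diff):

    # Assumed generated wrapper; the argument order must match .Input() order.
    core_ops.io_kafka_dataset(
        topics, servers, group, eof, timeout,
        config_global, config_topic,
        message_key,     # .Input("message_key: bool")
        message_offset,  # .Input("message_offset: bool"), the new final input
    )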

tensorflow_io/kafka/python/ops/kafka_dataset_ops.py

Lines changed: 27 additions & 9 deletions
@@ -47,6 +47,7 @@ def __init__(
         config_global=None,
         config_topic=None,
         message_key=False,
+        message_offset=False,
     ):
         """Create a KafkaReader.
 
@@ -76,6 +77,8 @@ def __init__(
               please refer to 'Topic configuration properties'
               in librdkafka doc.
           message_key: If True, the kafka will output both message value and key.
+          message_offset: If True, the kafka will output both message value and
+              offset; the offset string is formatted like 'partition-index:offset'.
         """
         self._topics = tf.convert_to_tensor(topics, dtype=dtypes.string, name="topics")
         self._servers = tf.convert_to_tensor(
@@ -95,6 +98,7 @@ def __init__(
             config_topic, dtype=dtypes.string, name="config_topic"
         )
         self._message_key = message_key
+        self._message_offset = message_offset
         super().__init__()
 
     def _inputs(self):
@@ -110,25 +114,39 @@ def _as_variant_tensor(self):
             self._config_global,
             self._config_topic,
             self._message_key,
+            self._message_offset,
         )
 
     @property
     def output_classes(self):
-        return (tf.Tensor) if not self._message_key else (tf.Tensor, tf.Tensor)
+        if self._message_key ^ self._message_offset:
+            return (tf.Tensor, tf.Tensor)
+        elif self._message_key and self._message_offset:
+            return (tf.Tensor, tf.Tensor, tf.Tensor)
+        else:
+            return tf.Tensor
 
     @property
     def output_shapes(self):
-        return (
-            (tf.TensorShape([]))
-            if not self._message_key
-            else (tf.TensorShape([]), tf.TensorShape([]))
-        )
+        if self._message_key ^ self._message_offset:
+            return (tf.TensorShape([]), tf.TensorShape([]))
+        elif self._message_key and self._message_offset:
+            return (
+                tf.TensorShape([]),
+                tf.TensorShape([]),
+                tf.TensorShape([]),
+            )
+        else:
+            return tf.TensorShape([])
 
     @property
     def output_types(self):
-        return (
-            (dtypes.string) if not self._message_key else (dtypes.string, dtypes.string)
-        )
+        if self._message_key ^ self._message_offset:
+            return (dtypes.string, dtypes.string)
+        elif self._message_key and self._message_offset:
+            return (dtypes.string, dtypes.string, dtypes.string)
+        else:
+            return dtypes.string
 
 
 def write_kafka(message, topic, servers="localhost", name=None):
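
For orientation, here is a minimal usage sketch of the new flag, mirroring the session-style iteration used by the tests below; the topic name and message contents are hypothetical:

    import tensorflow as tf
    import tensorflow_io.kafka as kafka_io

    # With message_offset=True each element is a (value, offset) pair, where
    # the offset string is formatted as 'partition-index:offset', e.g. b'0:3'.
    dataset = kafka_io.KafkaDataset(
        ["my-topic:0:0:4"], group="demo", eof=True, message_offset=True
    )
    iterator = tf.compat.v1.data.Iterator.from_structure(dataset.output_types)
    init_op = iterator.make_initializer(dataset)
    get_next = iterator.get_next()
    with tf.compat.v1.Session() as sess:
        sess.run(init_op)
        try:
            while True:
                value, offset = sess.run(get_next)
                print(value, offset)  # e.g. b'message-0' b'0:0'
        except tf.errors.OutOfRangeError:
            pass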

tests/test_kafka.py

Lines changed: 27 additions & 0 deletions
@@ -479,6 +479,33 @@ def test_kafka_dataset_with_partitioned_key(self):
                 sess.run(get_next),
             )
 
+    def test_kafka_dataset_with_offset(self):
+        """Tests for KafkaDataset when reading non-keyed messages
+        and their offsets from a single-partitioned topic"""
+        topics = tf.compat.v1.placeholder(dtypes.string, shape=[None])
+        num_epochs = tf.compat.v1.placeholder(dtypes.int64, shape=[])
+        batch_size = tf.compat.v1.placeholder(dtypes.int64, shape=[])
+
+        repeat_dataset = kafka_io.KafkaDataset(
+            topics, group="test", eof=True, message_offset=True
+        ).repeat(num_epochs)
+        batch_dataset = repeat_dataset.batch(batch_size)
+
+        iterator = data.Iterator.from_structure(batch_dataset.output_types)
+        init_op = iterator.make_initializer(repeat_dataset)
+        get_next = iterator.get_next()
+
+        with self.cached_session() as sess:
+            # Basic offset test: read a limited number of messages from the topic.
+            sess.run(init_op, feed_dict={topics: ["offset-test:0:0:4"], num_epochs: 1})
+            for i in range(5):
+                self.assertEqual(
+                    (("D" + str(i)).encode(), ("0:" + str(i)).encode()),
+                    sess.run(get_next),
+                )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
 
 if __name__ == "__main__":
     test.main()
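
The feed string `offset-test:0:0:4` packs a read range; judging by the assertions, it means topic `offset-test`, partition 0, offsets 0 through 4 inclusive, so the test expects exactly five (value, offset) pairs (this breakdown is inferred from the test, not from a documented API):

    # Expected pairs: (b'D0', b'0:0'), (b'D1', b'0:1'), ..., (b'D4', b'0:4')
    expected = [(("D" + str(i)).encode(), ("0:" + str(i)).encode()) for i in range(5)]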

tests/test_kafka/kafka_test.sh

Lines changed: 6 additions & 0 deletions
@@ -36,6 +36,7 @@ echo -e "D0\nD1\nD2\nD3\nD4\nD5\nD6\nD7\nD8\nD9" > confluent-$VERSION/test
 echo -e "K0:D0\nK1:D1\nK0:D2\nK1:D3\nK0:D4\nK1:D5\nK0:D6\nK1:D7\nK0:D8\nK1:D9" > confluent-$VERSION/key-test
 echo -e "K0:D0\nK1:D1\nK0:D2\nK1:D3\nK0:D4\nK1:D5\nK0:D6\nK1:D7\nK0:D8\nK1:D9" > confluent-$VERSION/key-partition-test
 echo -e "0:0\n1:1\n0:2\n1:3\n0:4\n1:5\n0:6\n1:7\n0:8\n1:9" > confluent-$VERSION/mini-batch-test
+echo -e "D0\nD1\nD2\nD3\nD4\nD5\nD6\nD7\nD8\nD9" > confluent-$VERSION/offset-test
 echo "Waiting for 30 secs until schema registry is ready and other services are up and running"
 sleep 30
 
@@ -55,6 +56,11 @@ echo "Creating and populating 'mini-batch-test' multi-partition topic with sampl
 sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic mini-batch-test
 sudo confluent-$VERSION/bin/kafka-console-producer --topic mini-batch-test --property "parse.key=true" --property "key.separator=:" --broker-list 127.0.0.1:9092 < confluent-$VERSION/mini-batch-test
 
+echo "Creating and populating 'offset-test' topic with sample non-keyed messages"
+sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic offset-test
+sudo confluent-$VERSION/bin/kafka-console-producer --topic offset-test --broker-list 127.0.0.1:9092 < confluent-$VERSION/offset-test
+
+
 echo "Creating and populating 'avro-test' topic with sample messages."
 sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic avro-test
 echo -e "{\"f1\":\"value1\",\"f2\":1,\"f3\":null}\n{\"f1\":\"value2\",\"f2\":2,\"f3\":{\"string\":\"2\"}}\n{\"f1\":\"value3\",\"f2\":3,\"f3\":null}" > confluent-$VERSION/avro-test
