Skip to content

Commit c602cb8

Browse files
authored
Implement CSVSource (#490)
Implement CSVSource: a base CSV source that reads data from a single CSV file.
1 parent 1240e61 commit c602cb8

File tree

6 files changed

+178
-3
lines changed

6 files changed

+178
-3
lines changed

quixstreams/models/topics/topic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,4 @@ def deserialize(self, message: ConfluentKafkaMessageProto):
313313
)
314314

315315
def __repr__(self):
    """Return a debug-friendly representation including the topic name."""
    return f'<{type(self).__name__} name="{self.name}">'

quixstreams/sinks/csv.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ def __init__(
2727
See the ["csv" module docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more info.
2828
Default - `"excel"`.
2929
:param key_serializer: a callable to convert keys to strings.
30-
Default - `str()`.
30+
Default - `str`.
3131
:param value_serializer: a callable to convert values to strings.
32-
Default - `json.dumps()`.
32+
Default - `json.dumps`.
3333
"""
3434
super().__init__()
3535
self.path = path

quixstreams/sources/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from .base import *
22
from .manager import SourceException
33
from .multiprocessing import multiprocessing
4+
from .csv import CSVSource

quixstreams/sources/csv.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import csv
2+
import json
3+
4+
from typing import Optional, Callable, Any
5+
6+
from quixstreams.models.topics import Topic
7+
8+
from .base import Source
9+
10+
11+
class CSVSource(Source):
    def __init__(
        self,
        path: str,
        dialect: str = "excel",
        name: Optional[str] = None,
        shutdown_timeout: float = 10,
        key_deserializer: Callable[[Any], str] = str,
        value_deserializer: Callable[[Any], str] = json.loads,
    ) -> None:
        """
        A base CSV source that reads data from a single CSV file.
        Best used with :class:`quixstreams.sinks.csv.CSVSink`.

        Required columns: key, value
        Optional columns: timestamp

        :param path: path to the CSV file
        :param dialect: a CSV dialect to use. It affects quoting and delimiters.
            See the ["csv" module docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more info.
            Default - `"excel"`.
        :param name: a unique name for the source; used as the default topic name.
            Default - the file path.
        :param shutdown_timeout: time in seconds to wait for the source to
            gracefully shut down.
            Default - `10`.
        :param key_deserializer: a callable to convert strings to key.
            Default - `str`
        :param value_deserializer: a callable to convert strings to value.
            Default - `json.loads`
        """
        super().__init__(name or path, shutdown_timeout)
        self.path = path
        self.dialect = dialect

        self._key_deserializer = key_deserializer
        self._value_deserializer = value_deserializer

    def run(self):
        """Read the CSV file row by row and produce each row as a message.

        Stops when the file is exhausted or the source is asked to shut down.
        """
        # Bind to locals to avoid repeated attribute lookups in the loop.
        key_deserializer = self._key_deserializer
        value_deserializer = self._value_deserializer

        with open(self.path, "r") as f:
            reader = csv.DictReader(f, dialect=self.dialect)

            # Check `self.running` between rows so a shutdown request
            # interrupts the read promptly.
            while self.running:
                try:
                    item = next(reader)
                except StopIteration:
                    return

                # If a "timestamp" column exists but the cell is empty,
                # DictReader yields "" — treat that as "no timestamp".
                timestamp = item.get("timestamp") or None
                if timestamp is not None:
                    timestamp = int(timestamp)

                msg = self.serialize(
                    key=key_deserializer(item["key"]),
                    value=value_deserializer(item["value"]),
                    timestamp_ms=timestamp,
                )

                self.produce(
                    key=msg.key,
                    value=msg.value,
                    timestamp=msg.timestamp,
                    headers=msg.headers,
                )

    def default_topic(self) -> Topic:
        """Return the default output topic, named after the source,
        with string keys and JSON values."""
        return Topic(
            name=self.name,
            key_serializer="string",
            key_deserializer="string",
            value_deserializer="json",
            value_serializer="json",
        )

tests/test_quixstreams/test_models/test_topics/test_topics.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ def __call__(self, value: bytes, ctx: SerializationContext):
4444

4545

4646
class TestTopic:
47+
48+
def test_repr(self, topic_manager_topic_factory):
    # The string form should include the topic's name.
    created = topic_manager_topic_factory(name="foo")
    assert str(created) == '<Topic name="foo">'
51+
4752
@pytest.mark.parametrize(
4853
"key_deserializer, value_deserializer, key, value, expected_key, expected_value",
4954
[
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import csv
2+
import json
3+
import pytest
4+
5+
from unittest.mock import MagicMock
6+
7+
from quixstreams.sources import CSVSource
8+
from quixstreams.rowproducer import RowProducer
9+
10+
11+
class TestCSVSource:

    @pytest.fixture
    def producer(self):
        # A producer stand-in that records calls; flush reports zero
        # outstanding messages so the source can shut down cleanly.
        mock_producer = MagicMock(spec=RowProducer)
        mock_producer.flush.return_value = 0
        return mock_producer

    def test_read(self, tmp_path, producer):
        path = tmp_path / "source.csv"

        # Four rows without a timestamp, plus a final row that has one.
        rows = [
            {"key": f"key{i}", "value": json.dumps({"value": f"value{i}"})}
            for i in range(1, 5)
        ]
        rows.append(
            {
                "key": "key5",
                "value": json.dumps({"value": "value5"}),
                "timestamp": 10000,
            }
        )

        with open(path, "w") as f:
            writer = csv.DictWriter(
                f, dialect="excel", fieldnames=("key", "value", "timestamp")
            )
            writer.writeheader()
            writer.writerows(rows)

        source = CSVSource(path)
        source.configure(source.default_topic(), producer)
        source.start()

        assert producer.produce.called
        assert producer.produce.call_count == 5
        # The last produced message carries the explicit timestamp.
        assert producer.produce.call_args.kwargs == {
            "buffer_error_max_tries": 3,
            "headers": None,
            "key": b"key5",
            "partition": None,
            "poll_timeout": 5.0,
            "timestamp": 10000,
            "topic": path,
            "value": b'{"value":"value5"}',
        }

    def test_read_no_timestamp(self, tmp_path, producer):
        path = tmp_path / "source.csv"

        rows = [
            {"key": f"key{i}", "value": json.dumps({"value": f"value{i}"})}
            for i in range(1, 6)
        ]

        with open(path, "w") as f:
            writer = csv.DictWriter(f, dialect="excel", fieldnames=("key", "value"))
            writer.writeheader()
            writer.writerows(rows)

        source = CSVSource(path)
        source.configure(source.default_topic(), producer)
        source.start()

        assert producer.produce.called
        assert producer.produce.call_count == 5
        # With no "timestamp" column, the produced timestamp is None.
        assert producer.produce.call_args.kwargs == {
            "buffer_error_max_tries": 3,
            "headers": None,
            "key": b"key5",
            "partition": None,
            "poll_timeout": 5.0,
            "timestamp": None,
            "topic": path,
            "value": b'{"value":"value5"}',
        }

0 commit comments

Comments
 (0)