Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.

Commit 33668aa

Browse files
aearlyAlexander Early
andauthored
Support publishing v2j batches in the event publisher sidecar. (#590)
* Support publishing v2j event batches * lint * add test Co-authored-by: Alexander Early <alexander.early@reddit.com>
1 parent 3a04f08 commit 33668aa

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

baseplate/sidecars/event_publisher.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,21 @@ def reset(self) -> None:
8686
self._size = len(self._header) + len(self._end)
8787

8888

89+
class V2JBatch(V2Batch):
90+
# Send a batch as a plain JSON array. Useful when your events are not
91+
# Thrift JSON
92+
_header = "["
93+
_end = b"]"
94+
95+
def serialize(self) -> SerializedBatch:
96+
serialized = self._header.encode() + b",".join(self._items) + self._end
97+
return SerializedBatch(item_count=len(self._items), serialized=serialized)
98+
99+
89100
class BatchPublisher:
90101
def __init__(self, metrics_client: metrics.Client, cfg: Any):
91102
self.metrics = metrics_client
92-
self.url = f"https://{cfg.collector.hostname}/v{cfg.collector.version:d}"
103+
self.url = f"https://{cfg.collector.hostname}/v{cfg.collector.version}"
93104
self.key_name = cfg.key.name
94105
self.key_secret = cfg.key.secret
95106
self.session = requests.Session()
@@ -149,7 +160,7 @@ def publish(self, payload: SerializedBatch) -> None:
149160
raise MaxRetriesError("could not sent batch")
150161

151162

152-
SERIALIZER_BY_VERSION = {2: V2Batch}
163+
SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}
153164

154165

155166
def publish_events() -> None:
@@ -181,7 +192,7 @@ def publish_events() -> None:
181192
{
182193
"collector": {
183194
"hostname": config.String,
184-
"version": config.Optional(config.Integer, default=1),
195+
"version": config.Optional(config.String, default="2"),
185196
},
186197
"key": {"name": config.String, "secret": config.Base64},
187198
"max_queue_size": config.Optional(config.Integer, MAX_QUEUE_SIZE),

tests/unit/lib/events/publisher_tests.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,25 @@ def test_v2(self):
5656
self.assertEqual(result.item_count, 0)
5757

5858

59+
class V2JBatchTests(unittest.TestCase):
60+
def test_v2j(self):
61+
batch = event_publisher.V2JBatch(max_size=50)
62+
batch.add(None)
63+
batch.add(b'{"source":1}')
64+
batch.add(b'{"source":2}')
65+
66+
result = batch.serialize()
67+
self.assertEqual(result.item_count, 2)
68+
self.assertEqual(result.serialized, b'[{"source":1},{"source":2}]')
69+
70+
with self.assertRaises(event_publisher.BatchFull):
71+
batch.add(b"x" * 100)
72+
73+
batch.reset()
74+
result = batch.serialize()
75+
self.assertEqual(result.item_count, 0)
76+
77+
5978
class PublisherTests(unittest.TestCase):
6079
@mock.patch("requests.Session", autospec=True)
6180
def setUp(self, Session):

0 commit comments

Comments
 (0)