
Commit f1a1a22

Add Python logging with same format as Java (#360)
1 parent 796b3d6 commit f1a1a22

5 files changed: +47 -35 lines changed

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/__main__.py

Lines changed: 8 additions & 0 deletions
@@ -15,13 +15,21 @@
 # limitations under the License.
 #
 
+import logging
 import sys
 
 import yaml
 
 from . import runtime
 
 if __name__ == "__main__":
+    logging.addLevelName(logging.WARNING, "WARN")
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)-5s %(name).36s -- %(message)s",  # noqa: E501
+        datefmt="%H:%M:%S",
+    )
+
     print(sys.argv)
     if len(sys.argv) != 2:
         print("Missing pod configuration file argument")

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/kafka_connection.py

Lines changed: 17 additions & 16 deletions
@@ -36,6 +36,7 @@
 )
 from .topic_connector import TopicConsumer, TopicProducer
 
+LOG = logging.getLogger(__name__)
 STRING_DESERIALIZER = StringDeserializer()
 
 SERIALIZERS = {
@@ -102,7 +103,7 @@ def create_dlq_producer(agent_id, streaming_cluster, configuration):
     dlq_conf = configuration.get("deadLetterTopicProducer")
     if not dlq_conf:
         return None
-    logging.info(
+    LOG.info(
         f"Creating dead-letter topic producer for agent {agent_id} using configuration "
         f"{configuration}"
     )
@@ -155,16 +156,16 @@ def __init__(self, configs):
         self.commit_ever_called = False
 
     def start(self):
-        self.consumer = Consumer(self.configs)
-        logging.info(f"Subscribing consumer to {self.topic}")
+        self.consumer = Consumer(self.configs, logger=LOG)
+        LOG.info(f"Subscribing consumer to {self.topic}")
         self.consumer.subscribe(
             [self.topic], on_assign=self.on_assign, on_revoke=self.on_revoke
         )
 
     def close(self):
         with self.lock:
             if self.consumer:
-                logging.info(
+                LOG.info(
                     f"Closing consumer to {self.topic} with {self.pending_commits} "
                     f"pending commits and {len(self.uncommitted)} uncommitted "
                     f"offsets: {self.uncommitted} "
@@ -188,9 +189,9 @@ def read(self) -> List[KafkaRecord]:
         if message is None:
             return []
         if message.error():
-            logging.error(f"Consumer error: {message.error()}")
+            LOG.error(f"Consumer error: {message.error()}")
             return []
-        logging.debug(
+        LOG.debug(
             f"Received message from Kafka topics {self.consumer.assignment()}:"
             f" {message}"
         )
@@ -223,7 +224,7 @@ def commit(self, records: List[KafkaRecord]):
             if committed_tp_offset.error:
                 raise KafkaException(committed_tp_offset.error)
             current_offset = committed_tp_offset.offset
-            logging.info(
+            LOG.info(
                 f"Current position on partition {topic_partition} is "
                 f"{current_offset}"
             )
@@ -269,34 +270,34 @@ def on_commit(self, error, partitions):
         with self.lock:
             self.pending_commits -= 1
             if error:
-                logging.error(f"Error committing offsets on topic {self.topic}: {error}")
+                LOG.error(f"Error committing offsets on topic {self.topic}: {error}")
                 if not self.commit_failure:
                     self.commit_failure = KafkaException(error)
             else:
-                logging.debug(f"Offsets committed: {partitions}")
+                LOG.debug(f"Offsets committed: {partitions}")
 
     def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
         with self.lock:
-            logging.info(f"Partitions assigned: {partitions}")
+            LOG.info(f"Partitions assigned: {partitions}")
             for partition in partitions:
                 offset = consumer.committed([partition])[0].offset
-                logging.info(f"Last committed offset for {partition} is {offset}")
+                LOG.info(f"Last committed offset for {partition} is {offset}")
                 if offset >= 0:
                     self.committed[partition] = offset
 
     def on_revoke(self, _, partitions: List[TopicPartition]):
         with self.lock:
-            logging.info(f"Partitions revoked: {partitions}")
+            LOG.info(f"Partitions revoked: {partitions}")
             for partition in partitions:
                 if partition in self.committed:
                     offset = self.committed.pop(partition)
-                    logging.info(
+                    LOG.info(
                         f"Current offset {offset} on partition {partition} (revoked)"
                     )
                 if partition in self.uncommitted:
                     offsets = self.uncommitted.pop(partition)
                     if len(offsets) > 0:
-                        logging.warning(
+                        LOG.warning(
                             f"There are uncommitted offsets {offsets} on partition "
                             f"{partition} (revoked), these messages will be "
                             f"re-delivered"
@@ -330,13 +331,13 @@ def __init__(self, configs):
         self.delivery_failure: Optional[Exception] = None
 
     def start(self):
-        self.producer = Producer(self.configs)
+        self.producer = Producer(self.configs, logger=LOG)
 
     def write(self, records: List[Record]):
         for record in records:
            if self.delivery_failure:
                raise self.delivery_failure
-            logging.info(f"Sending record {record}")
+            LOG.info(f"Sending record {record}")
            headers = []
            if record.headers():
                for key, value in record.headers():
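Two things change in this file: logging calls move from the root logger to a module-level LOG, and the confluent_kafka Consumer and Producer are constructed with logger=LOG. The logger keyword is supported by confluent-kafka-python and forwards librdkafka's internal client logs (broker state changes, errors) to the given Python logger, so they pick up the same Java-style format instead of going straight to stderr. A minimal sketch, with placeholder broker and topic names:

import logging

from confluent_kafka import Producer

LOG = logging.getLogger(__name__)

# librdkafka's own log lines are routed through LOG rather than stderr,
# so they share the handler/format configured in __main__.py.
producer = Producer(
    {"bootstrap.servers": "localhost:9092"},  # placeholder broker address
    logger=LOG,
)
producer.produce("my-topic", value=b"hello")  # placeholder topic
producer.flush()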

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/runtime.py

Lines changed: 15 additions & 13 deletions
@@ -44,6 +44,8 @@
     TopicConsumerWithDLQSource,
 )
 
+LOG = logging.getLogger(__name__)
+
 
 def current_time_millis():
     return int(time.time_ns() / 1000_000)
@@ -64,7 +66,7 @@ def __init__(self, configuration):
 
     def handle_errors(self, source_record: Record, error) -> ErrorsProcessingOutcome:
         self.failures += 1
-        logging.info(
+        LOG.info(
             f"Handling error {error} for source record {source_record}, "
             f"errors count {self.failures} (max retries {self.retries})"
         )
@@ -264,7 +266,7 @@ def run_with_server(
 
 
 def run(configuration, agent=None, agent_info: AgentInfo = AgentInfo(), max_loops=-1):
-    logging.info(f"Pod Configuration {configuration}")
+    LOG.info(f"Pod Configuration {configuration}")
 
     if "streamingCluster" not in configuration:
         raise ValueError("streamingCluster cannot be null")
@@ -405,7 +407,7 @@ def run_main_loop(
                     source,
                 )
             except Exception as e:
-                logging.exception("Error while processing records")
+                LOG.exception("Error while processing records")
                 # raise the error
                 # this way the consumer will not commit the records
                 raise e
@@ -426,7 +428,7 @@ def run_processor_agent(
     trial_number = 0
     while len(records_to_process) > 0:
         trial_number += 1
-        logging.info(
+        LOG.info(
             f"run processor on {len(records_to_process)} records "
             f"(trial #{trial_number})"
         )
@@ -439,26 +441,26 @@ def run_processor_agent(
         if isinstance(processor_result, Exception):
             action = errors_handler.handle_errors(source_record, processor_result)
             if action == ErrorsProcessingOutcome.SKIP:
-                logging.error(
+                LOG.error(
                     f"Unrecoverable error {processor_result} while processing the "
                     f"records, skipping"
                 )
                 results_by_record[source_record] = (source_record, processor_result)
             elif action == ErrorsProcessingOutcome.RETRY:
-                logging.error(
+                LOG.error(
                     f"Retryable error {processor_result} while processing the "
                     f"records, retrying"
                 )
                 records_to_process.append(source_record)
             elif action == ErrorsProcessingOutcome.FAIL:
-                logging.error(
+                LOG.error(
                     f"Unrecoverable error {processor_result} while processing some "
                     f"records, failing"
                 )
                 # TODO: replace with custom exception ?
                 source.permanent_failure(source_record, processor_result)
                 if errors_handler.fail_processing_on_permanent_errors():
-                    logging.error("Failing processing on permanent error")
+                    LOG.error("Failing processing on permanent error")
                     raise processor_result
                 # in case the source does not throw an exception we mark the record
                 # as "skipped"
@@ -496,26 +498,26 @@ def write_records_to_the_sink(
         action = errors_handler.handle_errors(source_record, error)
         if action == ErrorsProcessingOutcome.SKIP:
             # skip (the whole batch)
-            logging.error(
+            LOG.error(
                 f"Unrecoverable error {error} while processing the records, "
                 f"skipping"
             )
             source_record_tracker.commit(for_the_sink)
             return
         elif action == ErrorsProcessingOutcome.RETRY:
             # retry (the whole batch)
-            logging.error(
+            LOG.error(
                 f"Retryable error {error} while processing the records, retrying"
             )
         elif action == ErrorsProcessingOutcome.FAIL:
-            logging.error(
+            LOG.error(
                 f"Unrecoverable error {error} while processing some records, "
                 f"failing"
             )
             # TODO: replace with custom exception ?
             source.permanent_failure(source_record, error)
             if errors_handler.fail_processing_on_permanent_errors():
-                logging.error("Failing processing on permanent error")
+                LOG.error("Failing processing on permanent error")
                 raise error
             # in case the source does not throw an exception we mark the record as
             # "skipped"
@@ -526,7 +528,7 @@ def write_records_to_the_sink(
 
 class NoopTopicConsumer(TopicConsumer):
     def read(self):
-        logging.info("Sleeping for 1 second, no records...")
+        LOG.info("Sleeping for 1 second, no records...")
         time.sleep(1)
         return []
 
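Replacing root-logger calls such as logging.info(...) with a per-module LOG = logging.getLogger(__name__) means the %(name).36s field in the new format identifies which module emitted each line, and verbosity can be tuned per module through the standard logger hierarchy. A short sketch of such tuning; these setLevel calls are an illustration, not part of the commit:

import logging

# Named loggers form a dot-separated hierarchy, so individual modules
# can be made more or less verbose without touching the root config.
logging.getLogger("langstream_runtime.kafka_connection").setLevel(logging.DEBUG)
logging.getLogger("langstream_runtime.runtime").setLevel(logging.WARNING)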

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/tests/test_runtime.py

Lines changed: 0 additions & 1 deletion
@@ -50,7 +50,6 @@ def test_simple_agent(_):
     agent = runtime.init_agent(config)
     agent_info = runtime.AgentInfo()
     runtime.run(config, agent=agent, agent_info=agent_info, max_loops=2)
-    print(json.dumps(agent_info.worker_status(), indent=2))
     assert (
         json.dumps(agent_info.worker_status(), indent=2)
         == """[

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_runtime/topic_connector.py

Lines changed: 7 additions & 5 deletions
@@ -26,6 +26,8 @@
     TopicConsumer,
 )
 
+LOG = logging.getLogger(__name__)
+
 
 class TopicConsumerSource(Source):
     def __init__(self, consumer: TopicConsumer):
@@ -38,11 +40,11 @@ def commit(self, records: List[Record]):
         self.consumer.commit(records)
 
     def start(self):
-        logging.info(f"Starting consumer {self.consumer}")
+        LOG.info(f"Starting consumer {self.consumer}")
         self.consumer.start()
 
     def close(self):
-        logging.info(f"Closing consumer {self.consumer}")
+        LOG.info(f"Closing consumer {self.consumer}")
         self.consumer.close()
 
     def agent_info(self) -> Dict[str, Any]:
@@ -66,7 +68,7 @@ def close(self):
         self.dlq_producer.close()
 
     def permanent_failure(self, record: Record, error: Exception):
-        logging.error(f"Sending record to DLQ: {record}")
+        LOG.error(f"Sending record to DLQ: {record}")
         self.dlq_producer.write([record])
 
 
@@ -76,11 +78,11 @@ def __init__(self, producer: TopicProducer):
         self.commit_callback = None
 
     def start(self):
-        logging.info(f"Starting producer {self.producer}")
+        LOG.info(f"Starting producer {self.producer}")
         self.producer.start()
 
     def close(self):
-        logging.info(f"Closing producer {self.producer}")
+        LOG.info(f"Closing producer {self.producer}")
         self.producer.close()
 
     def write(self, records: List[Record]):
