|
| 1 | +import os |
1 | 2 | import sys
|
2 | 3 | from collections import defaultdict
|
3 | 4 | import configparser
|
|
6 | 7 | import arrow
|
7 | 8 | import msgpack
|
8 | 9 | import statistics
|
9 |
| -from confluent_kafka import Consumer, Producer, TopicPartition |
10 | 10 | import confluent_kafka
|
| 11 | +from confluent_kafka import Consumer, Producer, TopicPartition |
| 12 | +from confluent_kafka.admin import AdminClient, NewTopic |
11 | 13 |
|
12 | 14 | # TODO: how to handle data holes?
|
13 | 15 |
|
@@ -62,17 +64,29 @@ def __init__(self, conf_fname='anomalydetector.conf'):
|
62 | 64 |
|
63 | 65 | # Initialize kafka consumer
|
64 | 66 | self.consumer = Consumer({
|
65 |
| - 'bootstrap.servers': 'kafka1:9092, kafka2:9092, kafka3:9092', |
| 67 | + 'bootstrap.servers': KAFKA_HOST, |
66 | 68 | 'group.id': self.kafka_consumer_group,
|
67 | 69 | 'auto.offset.reset': 'earliest',
|
68 | 70 | })
|
69 | 71 |
|
70 | 72 | self.consumer.subscribe([self.kafka_topic_in])
|
71 | 73 |
|
72 | 74 | # Initialize kafka producer
|
73 |
| - self.producer = Producer({'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092', |
| 75 | + self.producer = Producer({'bootstrap.servers': KAFKA_HOST, |
74 | 76 | 'default.topic.config': {'compression.codec': 'snappy'}})
|
75 | 77 |
|
| 78 | + # Create kafka topic |
| 79 | + admin_client = AdminClient({'bootstrap.servers': KAFKA_HOST}) |
| 80 | + |
| 81 | + topic_list = [NewTopic(self.kafka_topic_out, num_partitions=10, replication_factor=1)] |
| 82 | + created_topic = admin_client.create_topics(topic_list) |
| 83 | + for topic, f in created_topic.items(): |
| 84 | + try: |
| 85 | + f.result() # The result itself is None |
| 86 | + logging.warning("Topic {} created".format(topic)) |
| 87 | + except Exception as e: |
| 88 | + logging.warning("Failed to create topic {}: {}".format(topic, e)) |
| 89 | + |
76 | 90 | # Set start time
|
77 | 91 | self.detection_starttime = self.get_current_timestamp()
|
78 | 92 | logging.info('Detection starttime set to: {}'.format(self.detection_starttime))
|
@@ -200,6 +214,13 @@ def report_anomaly(self, timestamp, datapoint, deviation):
|
200 | 214 |
|
201 | 215 |
|
202 | 216 | if __name__ == "__main__":
|
| 217 | + |
| 218 | + global KAFKA_HOST |
| 219 | + if 'KAFKA_HOST' in os.environ: |
| 220 | + KAFKA_HOST = os.environ["KAFKA_HOST"] |
| 221 | + else: |
| 222 | + KAFKA_HOST = 'kafka1:9092,kafka2:9092,kafka3:9092' |
| 223 | + |
203 | 224 | if len(sys.argv)<2:
|
204 | 225 | print("usage: %s config_file" % sys.argv[0])
|
205 | 226 | sys.exit()
|
|
0 commit comments