From af127b712485753558d22921d6ee84804fcb00d7 Mon Sep 17 00:00:00 2001 From: Ian Abreu Date: Mon, 21 May 2018 12:28:53 -0400 Subject: [PATCH 1/5] adding kafka logging --- intelmq/bots/outputs/kafka/README.md | 13 +++++ intelmq/bots/outputs/kafka/REQUIREMENTS.txt | 2 + intelmq/bots/outputs/kafka/__init__.py | 0 intelmq/bots/outputs/kafka/output.py | 55 +++++++++++++++++++++ 4 files changed, 70 insertions(+) create mode 100644 intelmq/bots/outputs/kafka/README.md create mode 100644 intelmq/bots/outputs/kafka/REQUIREMENTS.txt create mode 100644 intelmq/bots/outputs/kafka/__init__.py create mode 100644 intelmq/bots/outputs/kafka/output.py diff --git a/intelmq/bots/outputs/kafka/README.md b/intelmq/bots/outputs/kafka/README.md new file mode 100644 index 0000000000..6dd0f1fc1a --- /dev/null +++ b/intelmq/bots/outputs/kafka/README.md @@ -0,0 +1,13 @@ +# Kafka Output Bot + +### Output Bot that sends events to kafka + + +Bot parameters: + +* kafka_broker_list : comma seperated list of kafka brokers. defaults to 127.0.0.1 +* kafka_topic : Index for the ElasticSearch output, defaults to intelmq +* flatten_fields : In ES, some query and aggregations work better if the fields are flat and not JSON. Here you can provide a list of fields to convert. + Can be a list of strings (fieldnames) or a string with field names separated by a comma (,). 
eg `extra,field2` or `['extra', 'field2']` + Default: ['extra'] + diff --git a/intelmq/bots/outputs/kafka/REQUIREMENTS.txt b/intelmq/bots/outputs/kafka/REQUIREMENTS.txt new file mode 100644 index 0000000000..f31c0ffc4a --- /dev/null +++ b/intelmq/bots/outputs/kafka/REQUIREMENTS.txt @@ -0,0 +1,2 @@ +pykafka + diff --git a/intelmq/bots/outputs/kafka/__init__.py b/intelmq/bots/outputs/kafka/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/intelmq/bots/outputs/kafka/output.py b/intelmq/bots/outputs/kafka/output.py new file mode 100644 index 0000000000..0cb894aa32 --- /dev/null +++ b/intelmq/bots/outputs/kafka/output.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +from json import loads +from json import dumps +from collections.abc import Mapping + +try: + from confluent_kafka import Producer +except ImportError: + Producer = None + +from intelmq.lib.bot import Bot + +class KafkaOutputBot(Bot): + + def init(self): + if Producer is None: + raise ValueError('Missing confluent-kafka-python module.') + + self.broker_list = getattr(self.parameters, + 'kafka_broker_list', '127.0.0.1:9092') + self.kafka_topic = getattr(self.parameters, + 'kafka_topic', 'intelmq') + self.flatten_fields = getattr(self.parameters, + 'flatten_fields', ['extra']) + if isinstance(self.flatten_fields, str): + self.flatten_fields = self.flatten_fields.split(',') + + self.kafka = Producer({'bootstrap.servers': self.broker_list}) + + def process(self): + event = self.receive_message() + event_dict = event.to_dict(hierarchical=False) + + for field in self.flatten_fields: + if field in event_dict: + val = event_dict[field] + # if it's a string try to parse it as JSON + if isinstance(val, str): + try: + val = loads(val) + except ValueError: + pass + + self.kafka.produce(self.kafka_topic, dumps(event_dict).encode('utf-8'), callback=self.delivery_report) + self.acknowledge_message() + self.kafka.flush() + + def delivery_report(self, err, msg): + """ Called once for each message 
produced to flag for failure. + Triggered by poll() or flush().""" + if err is not None: + self.logger.exception('Message delivery failed: {}'.format(err)) + +BOT = KafkaOutputBot From 5506ec4a810df62bd4a6118f57b99f5fdbf6a3a6 Mon Sep 17 00:00:00 2001 From: Ian Abreu Date: Mon, 21 May 2018 12:35:30 -0400 Subject: [PATCH 2/5] adding Confluent requirements --- intelmq/bots/outputs/kafka/REQUIREMENTS.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/intelmq/bots/outputs/kafka/REQUIREMENTS.txt b/intelmq/bots/outputs/kafka/REQUIREMENTS.txt index f31c0ffc4a..dabcfc5c9f 100644 --- a/intelmq/bots/outputs/kafka/REQUIREMENTS.txt +++ b/intelmq/bots/outputs/kafka/REQUIREMENTS.txt @@ -1,2 +1 @@ -pykafka - +confluent-kafka==0.11.4 From 2ebd55d9aca29644a476d9b364fc20edfce17de9 Mon Sep 17 00:00:00 2001 From: Ian Abreu Date: Mon, 4 Jun 2018 14:26:24 -0400 Subject: [PATCH 3/5] making some fixes --- intelmq/bots/BOTS | 9 +++ intelmq/bots/outputs/kafka/README.md | 24 +++++-- intelmq/bots/outputs/kafka/output.py | 94 +++++++++++++++++++++++++--- 3 files changed, 113 insertions(+), 14 deletions(-) diff --git a/intelmq/bots/BOTS b/intelmq/bots/BOTS index 3323545fb4..06a7964cc3 100644 --- a/intelmq/bots/BOTS +++ b/intelmq/bots/BOTS @@ -623,6 +623,15 @@ "replacement_char": "_" } }, + "Kafka": { + "description": "Kafka is the bot responsible to send events to a kafka topic.", + "module": "intelmq.bots.outputs.kafka.output", + "parameters":{ + "kafka_broker_list": "127.0.0.1", + "kafka_topic": "intelmq", + "flatten_fields": "extra" + } + }, "File": { "description": "File is the bot responsible to send events to a file.", "module": "intelmq.bots.outputs.file.output", diff --git a/intelmq/bots/outputs/kafka/README.md b/intelmq/bots/outputs/kafka/README.md index 6dd0f1fc1a..b0f1a17456 100644 --- a/intelmq/bots/outputs/kafka/README.md +++ b/intelmq/bots/outputs/kafka/README.md @@ -5,9 +5,25 @@ Bot parameters: -* kafka_broker_list : comma seperated list of kafka brokers. 
defaults to 127.0.0.1 -* kafka_topic : Index for the ElasticSearch output, defaults to intelmq -* flatten_fields : In ES, some query and aggregations work better if the fields are flat and not JSON. Here you can provide a list of fields to convert. - Can be a list of strings (fieldnames) or a string with field names separated by a comma (,). eg `extra,field2` or `['extra', 'field2']` +Kafka Producer +* kafka_broker_list : comma separated list of kafka brokers. defaults to 127.0.0.1 + +* kafka_topic : Kafka topic the events are published to, defaults to intelmq + +* flatten_fields : In ES, some query and aggregations work better if the fields are flat and not JSON. + Here you can provide a list of fields to convert. Can be a list of strings (fieldnames) + or a string with field names separated by a comma (,). eg `extra,field2` or `['extra', 'field2']` Default: ['extra'] +AVRO Producer +* avro_topic_schema : a file path, pointing to a file containing a dict object containing expected values, and their destination topics. + the keys in this dict should be all expected values in 'avro_topic_field' + + IMPORTANT: schema must contain "other" keyword, with either None, or a topic name as the value. + If None is declared, other values are dropped. + +* avro_topic_field : The field to be used to map intelligence identity to destination topic + +* avro_value_schema_file: The schema file + +* avro_schema_registry: URL where the schema registry is defined. 
diff --git a/intelmq/bots/outputs/kafka/output.py b/intelmq/bots/outputs/kafka/output.py index 0cb894aa32..b36c9576d8 100644 --- a/intelmq/bots/outputs/kafka/output.py +++ b/intelmq/bots/outputs/kafka/output.py @@ -2,31 +2,83 @@ from json import loads from json import dumps +from json import load from collections.abc import Mapping try: from confluent_kafka import Producer except ImportError: Producer = None +try: + from confluent_kafka import avro +except ImportError: + avro = None + +if avro is not None: + from confluent_kafka.avro import AvroProducer from intelmq.lib.bot import Bot +from intelmq.lib.utils import load_configuration + +def replace_keys(obj, key_char='.', replacement='_'): + if isinstance(obj, Mapping): + replacement_obj = {} + for key, val in obj.items(): + replacement_key = key.replace(key_char, replacement) + replacement_obj[replacement_key] = replace_keys(val, key_char, replacement) + return replacement_obj + return obj class KafkaOutputBot(Bot): def init(self): if Producer is None: raise ValueError('Missing confluent-kafka-python module.') + if avro is None: + raise ValueError('Missing python3-avro module') + + self.broker_list = getattr( + self.parameters, 'kafka_broker_list', '127.0.0.1:9092') + self.kafka_topic = getattr( + self.parameters, 'kafka_topic', 'intelmq') + self.flatten_fields = getattr( + self.parameters, 'flatten_fields', ['extra']) + self.enable_avro = getattr( + self.parameters, 'enable_avro', False) + + # Fields below are to define an output schema via AVRO + if self.enable_avro is True: + + self.avro_value_schema = avro.loads( + dumps(load_configuration(self.parameters.avro_value_schema_file))) + self.avro_key_schema = avro.loads( + dumps(load_configuration(self.parameters.avro_key_schema_file))) + + self.avro_topic_schema = load_configuration(self.parameters.avro_topic_schema) + + self.avro_topic_field = getattr( + self.parameters, 'avro_topic_field', None) + self.avro_schema_registry = getattr( + self.parameters, 
'avro_schema_registry', None) + + # Build a list of producers for each destination topic + self.producer = AvroProducer( + { + 'bootstrap.servers': self.broker_list, + 'schema.registry.url': self.avro_schema_registry + }, + default_key_schema=self.avro_key_schema, + default_value_schema = self.avro_value_schema) + else: + self.producer = Producer( + { + 'bootstrap.servers': self.broker_list + }) + - self.broker_list = getattr(self.parameters, - 'kafka_broker_list', '127.0.0.1:9092') - self.kafka_topic = getattr(self.parameters, - 'kafka_topic', 'intelmq') - self.flatten_fields = getattr(self.parameters, - 'flatten_fields', ['extra']) if isinstance(self.flatten_fields, str): self.flatten_fields = self.flatten_fields.split(',') - self.kafka = Producer({'bootstrap.servers': self.broker_list}) def process(self): event = self.receive_message() @@ -41,10 +93,31 @@ def process(self): val = loads(val) except ValueError: pass + if isinstance(val, Mapping): + for key, value in val.items(): + event_dict[field + '.' 
+ key] = value + event_dict.pop(field) + + event_dict = replace_keys(event_dict) - self.kafka.produce(self.kafka_topic, dumps(event_dict).encode('utf-8'), callback=self.delivery_report) - self.acknowledge_message() - self.kafka.flush() + if self.enable_avro is False: + self.producer.produce(self.kafka_topic, dumps(event_dict).encode('utf-8'), callback=self.delivery_report) + self.acknowledge_message() + self.kafka.flush() + else: + try: + key_field = str(event_dict[self.avro_topic_field]).replace('.','_') + except KeyError: + self.logger.debug('Event %s has no field %s, dropping.', format(event_dict),format(self.avro_topic_field)) + self.acknowledge_message() + return + + submit_key = {'indicator':event_dict[key_field]} + event_topic = self.avro_topic_schema[key_field] + self.logger.debug('Shipped %s to topic: %s', format(submit_key),format(event_topic)) + self.producer.produce(topic=event_topic, value=event_dict, key=submit_key) + self.acknowledge_message() + self.producer.poll(0) def delivery_report(self, err, msg): """ Called once for each message produced to flag for failure. 
@@ -52,4 +125,5 @@ def delivery_report(self, err, msg): if err is not None: self.logger.exception('Message delivery failed: {}'.format(err)) + BOT = KafkaOutputBot From fca0525005037beaeb6d8c023dc24786d685afae Mon Sep 17 00:00:00 2001 From: Ian Abreu Date: Mon, 4 Jun 2018 16:33:10 -0400 Subject: [PATCH 4/5] fixing up codestyle --- intelmq/bots/outputs/kafka/output.py | 59 +++++++++++++--------------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/intelmq/bots/outputs/kafka/output.py b/intelmq/bots/outputs/kafka/output.py index b36c9576d8..642f50911c 100644 --- a/intelmq/bots/outputs/kafka/output.py +++ b/intelmq/bots/outputs/kafka/output.py @@ -4,7 +4,8 @@ from json import dumps from json import load from collections.abc import Mapping - +from intelmq.lib.utils import load_configuration +from intelmq.lib.bot import Bot try: from confluent_kafka import Producer except ImportError: @@ -17,8 +18,6 @@ if avro is not None: from confluent_kafka.avro import AvroProducer -from intelmq.lib.bot import Bot -from intelmq.lib.utils import load_configuration def replace_keys(obj, key_char='.', replacement='_'): if isinstance(obj, Mapping): @@ -29,6 +28,7 @@ def replace_keys(obj, key_char='.', replacement='_'): return replacement_obj return obj + class KafkaOutputBot(Bot): def init(self): @@ -37,14 +37,14 @@ def init(self): if avro is None: raise ValueError('Missing python3-avro module') - self.broker_list = getattr( - self.parameters, 'kafka_broker_list', '127.0.0.1:9092') - self.kafka_topic = getattr( - self.parameters, 'kafka_topic', 'intelmq') - self.flatten_fields = getattr( - self.parameters, 'flatten_fields', ['extra']) - self.enable_avro = getattr( - self.parameters, 'enable_avro', False) + self.broker_list = getattr(self.parameters, + 'kafka_broker_list', '127.0.0.1:9092') + self.kafka_topic = getattr(self.parameters, + 'kafka_topic', 'intelmq') + self.flatten_fields = getattr(self.parameters, + 'flatten_fields', ['extra']) + self.enable_avro = 
getattr(self.parameters, + 'enable_avro', False) # Fields below are to define an output schema via AVRO if self.enable_avro is True: @@ -60,26 +60,23 @@ def init(self): self.parameters, 'avro_topic_field', None) self.avro_schema_registry = getattr( self.parameters, 'avro_schema_registry', None) - - # Build a list of producers for each destination topic - self.producer = AvroProducer( - { - 'bootstrap.servers': self.broker_list, - 'schema.registry.url': self.avro_schema_registry - }, - default_key_schema=self.avro_key_schema, - default_value_schema = self.avro_value_schema) - else: - self.producer = Producer( - { - 'bootstrap.servers': self.broker_list - }) + # Build a list of producers for each destination topic + self.producer = AvroProducer({ + 'bootstrap.servers': self.broker_list, + 'schema.registry.url': self.avro_schema_registry + }, + default_key_schema=self.avro_key_schema, + default_value_schema=self.avro_value_schema + ) + else: + self.producer = Producer({ + 'bootstrap.servers': self.broker_list + }) if isinstance(self.flatten_fields, str): self.flatten_fields = self.flatten_fields.split(',') - def process(self): event = self.receive_message() event_dict = event.to_dict(hierarchical=False) @@ -106,15 +103,15 @@ def process(self): self.kafka.flush() else: try: - key_field = str(event_dict[self.avro_topic_field]).replace('.','_') + key_field = str(event_dict[self.avro_topic_field]).replace('.', '_') except KeyError: - self.logger.debug('Event %s has no field %s, dropping.', format(event_dict),format(self.avro_topic_field)) + self.logger.debug('Event %s has no field %s, dropping.', format(event_dict), format(self.avro_topic_field)) self.acknowledge_message() return - - submit_key = {'indicator':event_dict[key_field]} + + submit_key = {'indicator': event_dict[key_field]} event_topic = self.avro_topic_schema[key_field] - self.logger.debug('Shipped %s to topic: %s', format(submit_key),format(event_topic)) + self.logger.debug('Shipped %s to topic: %s', 
format(submit_key), format(event_topic)) self.producer.produce(topic=event_topic, value=event_dict, key=submit_key) self.acknowledge_message() self.producer.poll(0) From 221a980ebd38ec8b16b1e8a40a376a76ad3e249e Mon Sep 17 00:00:00 2001 From: Ian Abreu Date: Mon, 4 Jun 2018 16:48:27 -0400 Subject: [PATCH 5/5] fixing linting --- intelmq/bots/BOTS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intelmq/bots/BOTS b/intelmq/bots/BOTS index 06a7964cc3..86ece48c13 100644 --- a/intelmq/bots/BOTS +++ b/intelmq/bots/BOTS @@ -626,7 +626,7 @@ "Kafka": { "description": "Kafka is the bot responsible to send events to a kafka topic.", "module": "intelmq.bots.outputs.kafka.output", - "parameters":{ + "parameters": { "kafka_broker_list": "127.0.0.1", "kafka_topic": "intelmq", "flatten_fields": "extra"