commit 178168261d961e4347c7d7eb21af8e333e6477ff Author: Alain Zscheile Date: Sat Dec 3 02:20:52 2022 +0100 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f53a38b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/config.py +__pycache__ +result +result-* diff --git a/config_template.py b/config_template.py new file mode 100644 index 0000000..cacaf76 --- /dev/null +++ b/config_template.py @@ -0,0 +1,9 @@ +CONFIG = { + 'sasl_mechanism': 'PLAIN', + 'security_protocol': 'SASL_SSL', + 'sasl_plain_username': '$NAME', + 'sasl_plain_password': password, + 'bootstrap_servers': 'kafka.ytrizja.de:9092' +} + +GROUP_ID = 'chat-$NAME' diff --git a/consume.py b/consume.py new file mode 100644 index 0000000..bcbc60e --- /dev/null +++ b/consume.py @@ -0,0 +1,9 @@ +from kafka import KafkaConsumer +from config import CONFIG, GROUP_ID + +consumer = KafkaConsumer('chat', group_id=GROUP_ID, auto_offset_reset='earliest', enable_auto_commit=True, **CONFIG) +try: + for message in consumer: + print("%d:%d: %s := %s" % (message.partition, message.offset, message.key.decode(), message.value.decode())) +except KeyboardInterrupt: + pass diff --git a/produce.py b/produce.py new file mode 100644 index 0000000..46fa4f8 --- /dev/null +++ b/produce.py @@ -0,0 +1,15 @@ +import sys +from kafka import KafkaProducer +from config import CONFIG + +producer = KafkaProducer(compression_type='zstd', **CONFIG) +topic = 'chat' + +key = input('Key: ').encode() + +for line in sys.stdin: + line = line.rstrip() + if '.' == line: + break + + producer.send(topic, key=key, value=line.encode()) diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..b95fbf9 --- /dev/null +++ b/shell.nix @@ -0,0 +1,13 @@ +{ pkgs ? import {} }: +let + my-python = pkgs.python3; + python-with-my-packages = my-python.withPackages (p: with p; [ + kafka-python + zstandard + ]); +in +pkgs.mkShell { + packages = [ + python-with-my-packages + ]; +}