initial commit
This commit is contained in:
commit
178168261d
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
@ -0,0 +1,4 @@
|
|||
/config.py
|
||||
__pycache__
|
||||
result
|
||||
result-*
|
9
config_template.py
Normal file
9
config_template.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
CONFIG = {
|
||||
'sasl_mechanism': 'PLAIN',
|
||||
'security_protocol': 'SASL_SSL',
|
||||
'sasl_plain_username': '$NAME',
|
||||
'sasl_plain_password': password,
|
||||
'bootstrap_servers': 'kafka.ytrizja.de:9092'
|
||||
}
|
||||
|
||||
GROUP_ID = 'chat-$NAME'
|
9
consume.py
Normal file
9
consume.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
from kafka import KafkaConsumer
|
||||
from config import CONFIG, GROUP_ID
|
||||
|
||||
consumer = KafkaConsumer('chat', group_id=GROUP_ID, auto_offset_reset='earliest', enable_auto_commit=True, **CONFIG)
|
||||
try:
|
||||
for message in consumer:
|
||||
print("%d:%d: %s := %s" % (message.partition, message.offset, message.key.decode(), message.value.decode()))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
15
produce.py
Normal file
15
produce.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
import sys
|
||||
from kafka import KafkaProducer
|
||||
from config import CONFIG
|
||||
|
||||
producer = KafkaProducer(compression_type='zstd', **CONFIG)
|
||||
topic = 'chat'
|
||||
|
||||
key = input('Key: ').encode()
|
||||
|
||||
for line in sys.stdin:
|
||||
line = line.rstrip()
|
||||
if '.' == line:
|
||||
break
|
||||
|
||||
producer.send(topic, key=key, value=line.encode())
|
Loading…
Reference in a new issue