snippets_py
Integration
New integration from credential
# Build the Kafka integration from a stored credential, referenced by its slug.
kafka = yepcode.integration.kafka('credential-slug')
# Obtain a consumer and a producer from that integration object.
consumer = kafka.consumer()
producer = kafka.producer()
New integration from plain authentication data (Consumer)
from kafka import KafkaConsumer
import msgpack
# Build a consumer directly from plain connection and authentication data.
kafka = KafkaConsumer(
    client_id='my-app',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    # SASL settings: protocol, mechanism and the plain username/password
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='my-username',
    sasl_plain_password='my-password',
    # consume msgpack: each message value is decoded with msgpack.unpackb
    value_deserializer=msgpack.unpackb,
)
New integration from plain authentication data (Producer)
from kafka import KafkaProducer
import msgpack
# Build a producer directly from plain connection and authentication data.
kafka = KafkaProducer(
    client_id='my-app',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    # SASL settings: protocol, mechanism and the plain username/password
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='my-username',
    sasl_plain_password='my-password',
    # encode objects via msgpack: each message value is packed with msgpack.dumps
    value_serializer=msgpack.dumps,
)
Producer
Producer
# send() is asynchronous by default
topic = 'test-topic'
payload = {'value': 'Hello Kafka user!'}
kafka.send(topic, payload)
# flush() blocks until all pending async messages are sent
kafka.flush()
Consumer
Consumer
# Iterate over the consumer and print the value of each message received.
for message in kafka:
    print(message.value)