Skip to main content

snippets_py

Integration

New integration from credential
kafka = yepcode.integration.kafka('credential-slug')

consumer = kafka.consumer()
producer = kafka.producer()
New integration from plain authentication data (Consumer)
from kafka import KafkaConsumer
import msgpack

kafka = KafkaConsumer(
client_id='my-app',
bootstrap_servers=['kafka1:9092', 'kafka2:9092']
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
sasl_plain_username='my-username',
sasl_plain_password='my-password',
# consume msgpack
value_deserializer=msgpack.unpackb
)
New integration from plain authentication data (Producer)
from kafka import KafkaProducer
import msgpack

kafka = KafkaProducer(
client_id='my-app',
bootstrap_servers=['kafka1:9092', 'kafka2:9092']
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
sasl_plain_username='my-username',
sasl_plain_password='my-password',
# encode objects via msgpack
value_serializer=msgpack.dumps
)

Producer

Producer
# Asynchronous by default
kafka.send('test-topic', {'value': 'Hello Kafka user!'})
# block until all async messages are sent
kafka.flush()

Consumer

Consumer
for message in kafka:
print(message.value)