Kafka
Kafka provides a unified, high-throughput, low-latency platform for handling real-time data feeds. It can also connect to external systems for data import and export.
Official Website: https://kafka.apache.org/
Tags: queue, kafka
- JavaScript
  - Documentation: https://kafka.js.org/docs/getting-started
  - NodeJS package: https://www.npmjs.com/package/kafkajs
  - Version: 2.2.4
  - Source Code: https://github.com/tulios/kafkajs
- Python
  - Documentation: https://kafka-python.readthedocs.io/en/stable/
  - Pypi package: https://pypi.org/project/kafka-python/
  - Version: 2.0.2
  - Source Code: https://github.com/dpkp/kafka-python
Network Connection needs
This integration needs network access to the server where the service is running.
See the Network access page for details about how to achieve that.
Credential configuration
To configure this credential, you need a Client ID, one or more broker hosts, and, if authentication is required, a username and password.
Optionally, you can set any of the extra config parameters you can see here.
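As a rough illustration of how these credential fields relate to client settings, the sketch below maps them onto the keyword arguments used by the kafka-python snippets on this page. The helper name `build_kafka_python_config` is hypothetical and not part of YepCode or kafka-python, and the choice of `SASL_SSL` with the `PLAIN` mechanism is an assumption copied from those snippets rather than a requirement.

```python
from typing import List, Optional


def build_kafka_python_config(
    client_id: str,
    brokers: List[str],
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> dict:
    """Hypothetical helper: turn credential fields into kafka-python keyword arguments."""
    if not brokers:
        raise ValueError("at least one broker host is required")

    config = {
        "client_id": client_id,
        "bootstrap_servers": list(brokers),
    }
    # Add SASL settings only when both a username and a password are supplied.
    # SASL_SSL + PLAIN is an assumption taken from the snippets on this page.
    if username is not None and password is not None:
        config.update(
            security_protocol="SASL_SSL",
            sasl_mechanism="PLAIN",
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
    return config


config = build_kafka_python_config(
    client_id="my-app",
    brokers=["kafka1:9092", "kafka2:9092"],
    username="my-username",
    password="my-password",
)
print(sorted(config))
```

The resulting dictionary could then be unpacked into a client constructor, for example `KafkaProducer(**config)`, when the kafka-python package is installed and the brokers are reachable.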
Here is an example of a filled credential configuration form in YepCode:
Kafka Snippets available in Editor
Note: The title is the triggering text for YepCode to autocomplete the script.
- JavaScript
- Python
Integration
New integration from credential
const kafka = yepcode.integration.kafka('credential-slug')
New integration from plain authentication data
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'my-username',
    password: 'my-password'
  }
})
Producer
Producer
const producer = kafka.producer()

await producer.connect()
await producer.send({
  topic: 'test-topic',
  messages: [
    { value: 'Hello Kafka user!' },
  ],
})
await producer.disconnect()
Consumer
Consumer
const consumer = kafka.consumer({ groupId: 'test-group' })

await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
})
Integration
New integration from credential
kafka = yepcode.integration.kafka('credential-slug')
consumer = kafka.consumer()
producer = kafka.producer()
New integration from plain authentication data (Consumer)
from kafka import KafkaConsumer
import msgpack

kafka = KafkaConsumer(
    client_id='my-app',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='my-username',
    sasl_plain_password='my-password',
    # consume msgpack
    value_deserializer=msgpack.unpackb
)
New integration from plain authentication data (Producer)
from kafka import KafkaProducer
import msgpack

kafka = KafkaProducer(
    client_id='my-app',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='my-username',
    sasl_plain_password='my-password',
    # encode objects via msgpack
    value_serializer=msgpack.dumps
)
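The `value_serializer` and `value_deserializer` arguments are plain callables: the producer applies the serializer to each value before sending, and the consumer applies the deserializer to the raw bytes it receives. The sketch below shows that round trip in isolation. It swaps msgpack for JSON from the standard library so it runs without extra packages, and the function names `serialize_value` and `deserialize_value` are illustrative only.

```python
import json


def serialize_value(value) -> bytes:
    """Encode a Python object as UTF-8 JSON bytes (stand-in for msgpack.dumps)."""
    return json.dumps(value).encode("utf-8")


def deserialize_value(raw: bytes):
    """Decode UTF-8 JSON bytes back into a Python object (stand-in for msgpack.unpackb)."""
    return json.loads(raw.decode("utf-8"))


payload = {"value": "Hello Kafka user!"}
raw = serialize_value(payload)      # what would travel over the wire
restored = deserialize_value(raw)   # what the consumer would hand back
print(restored == payload)
```

If JSON suits the topic better than msgpack, these two functions could be passed as `value_serializer=serialize_value` on the producer and `value_deserializer=deserialize_value` on the consumer. Both sides have to agree on the format.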
Producer
Producer
# Asynchronous by default
kafka.send('test-topic', {'value': 'Hello Kafka user!'})
# block until all async messages are sent
kafka.flush()
Consumer
Consumer
for message in kafka:
    print(message.value)
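Each item yielded by the consumer loop is a record that carries, besides `value`, metadata such as `topic`, `partition` and `offset`. The sketch below shows one way a loop body might use those fields. To stay runnable without a broker it iterates over a stand-in `FakeRecord` namedtuple, which is an assumption made for illustration and not a kafka-python type. `describe_record` is likewise a hypothetical helper.

```python
from collections import namedtuple

# Stand-in for the records a KafkaConsumer yields; only the fields used below are modelled.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "value"])


def describe_record(record) -> str:
    """Format a record's position and payload for logging."""
    return f"{record.topic}[{record.partition}]@{record.offset}: {record.value!r}"


records = [
    FakeRecord("test-topic", 0, 0, {"value": "Hello Kafka user!"}),
    FakeRecord("test-topic", 0, 1, {"value": "Second message"}),
]

lines = [describe_record(record) for record in records]
for line in lines:
    print(line)
```

With a real consumer, the same helper could be called inside `for message in kafka:` in place of the bare `print(message.value)`.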