snippets_js
Integration
New integration from credential
// Build the Kafka client through the YepCode integration helper,
// identifying the credential to use by its slug.
const credentialSlug = 'credential-slug';
const kafka = yepcode.integration.kafka(credentialSlug);
New integration from plain authentication data
// Build a Kafka client directly from connection details written inline.
const Kafka = require('kafkajs').Kafka;

// SASL settings (mechanism 'plain') passed to the client below.
const saslSettings = {
  mechanism: 'plain',
  username: 'my-username',
  password: 'my-password',
};

// Client options: application id, broker list, SSL enabled, SASL login.
const clientOptions = {
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  ssl: true,
  sasl: saslSettings,
};

const kafka = new Kafka(clientOptions);
Producer
Producer
// Publish a single message to 'test-topic' and make sure the producer
// is disconnected afterwards, even when sending fails.
const producer = kafka.producer();
await producer.connect();
try {
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello Kafka user!' },
    ],
  });
} finally {
  // Always disconnect so a rejected send() does not leave the connection open.
  await producer.disconnect();
}
Consumer
Consumer
// Join consumer group 'test-group', subscribe to 'test-topic' with
// fromBeginning enabled, and log the value of every message received.
const consumer = kafka.consumer({ groupId: 'test-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
await consumer.run({
  // `topic` and `partition` are also handed to the handler; only `message` is used here.
  eachMessage: async ({ topic, partition, message }) => {
    // A message value can be null (e.g. a tombstone record), so guard
    // before calling toString() to avoid a TypeError inside the handler.
    const { value } = message;
    console.log({
      value: value == null ? null : value.toString(),
    });
  },
});