

Integrating Kafka with Node.js

Apache Kafka is a popular open-source distributed event streaming platform that uses a publish-subscribe mechanism to stream records (data).

Kafka Terminology

  1. Distributed system: a computing environment in which software components located on different machines (often across multiple locations) coordinate to get work done as a single unit.
  2. Kafka Broker: a broker is a single Kafka server. The messages of each topic are split among the brokers, and brokers handle all requests from clients to write and read events. A Kafka cluster is simply a collection of one or more Kafka brokers.
  3. Topics: a topic is a stream of "related" messages; its name is unique throughout the application. Kafka producers write messages to topics.
  4. Producer: a producer publishes data to topics. It sends messages to a broker, and the broker receives and stores them.
  5. Consumers: consumers read data from topics. A consumer connects to a broker and requests the messages available on the stream.
  6. Consumer Group: a group of consumers identify themselves (using a configuration property) as belonging to the same group. Within a consumer group, multiple consumers read from the same topic, but each consumer reads from exclusive partitions.
  7. Topic Partitions: topics in Kafka are partitioned, meaning a topic is broken into multiple log files (partitions) that can live on separate Kafka brokers. This splitting of the message stream is referred to as "partitioning" (see the admin-client sketch after the client setup below).
  8. ZooKeeper: a centralized service that helps you coordinate and manage distributed applications.


Note: a single topic can be subscribed to by multiple consumers.

Install KafkaJS using npm:

npm install kafkajs

Set up the KafkaJS client
Create a file called kafka.js:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'app1',
  brokers: ['kafka1:9096', 'kafka2:9096'],
});

module.exports = kafka;
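
Topics and their partition counts are often created by an administrator or with the Kafka CLI, but KafkaJS also exposes an admin client for this. Below is a minimal sketch that uses the client above; the topic name and partition count are just placeholders:

const kafka = require('./kafka');

const createTopics = async () => {
  const admin = kafka.admin();
  await admin.connect();
  // A topic with 3 partitions lets up to 3 consumers in the
  // same consumer group read from it in parallel.
  await admin.createTopics({
    topics: [{ topic: 'test-topic', numPartitions: 3 }],
  });
  await admin.disconnect();
};

createTopics().catch(console.error);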
    
Create a producer using our client:

const kafka = require('./kafka');

const producer = kafka.producer();

const runProducer = async () => {
  await producer.connect();
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello Apache Kafka!' },
    ],
  });
  await producer.disconnect();
};

runProducer().catch(console.error);
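
Messages can also carry a key. The default partitioner hashes the key to choose a partition, so messages with the same key always land on the same partition and keep their order. A small sketch that could go inside the same async function as above (the key is just an example):

await producer.send({
  topic: 'test-topic',
  messages: [
    // Both messages share a key, so they go to the same partition.
    { key: 'user-42', value: 'login' },
    { key: 'user-42', value: 'logout' },
  ],
});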

Create a consumer to consume our message:

const kafka = require('./kafka');

const consumer = kafka.consumer({ groupId: 'consumerGroup' });

const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value.toString(),
      });
    },
  });
};

runConsumer().catch(console.error);
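
When the process stops, it is worth disconnecting the consumer so the group can rebalance immediately instead of waiting for the session timeout. A minimal sketch, assuming the consumer created above:

const shutdown = async () => {
  try {
    await consumer.disconnect();
  } finally {
    process.exit(0);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);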

Subscribe a consumer group to multiple topics:

const kafka = require('./kafka');

const topicGroup1 = ['topic1', 'topic2'];
const consumer1 = kafka.consumer({ groupId: 'consumer1' });

const funConsumer1 = async () => {
  await consumer1.connect();
  for (const topic of topicGroup1) {
    await consumer1.subscribe({ topic, fromBeginning: true });
  }
  // In recent KafkaJS versions we can also subscribe in one call:
  // await consumer1.subscribe({ topics: topicGroup1, fromBeginning: true });

  await consumer1.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({ topic, partition, value: message.value.toString() });
      // Commit the offset of the next message to read, hence the +1.
      await consumer1.commitOffsets([
        { topic, partition, offset: (Number(message.offset) + 1).toString() },
      ]);
    },
  });
};

funConsumer1().catch((error) => {
  console.error('Error running the consumer:', error);
});
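
With autoCommit: false, KafkaJS only commits the offsets we pass to commitOffsets. The committed value is the offset of the next message the group should read, which is why 1 is added to the offset of the message just processed. Committing after processing gives at-least-once delivery: if the consumer crashes before the commit, the message will be read again.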


        Error Handling

consumer1.on('consumer.crash', async () => {
  try {
    await consumer1.disconnect();
  } catch (error) {
    console.log(error);
  } finally {
    // Try to restart the consumer after 5 seconds.
    setTimeout(() => {
      funConsumer1().catch((error) => {
        console.error('Error running the consumer after a crash:', error);
      });
    }, 5000);
  }
});
