Integrating Kafka with Node.js

Apache Kafka is a popular open-source distributed event streaming platform that uses a publish/subscribe mechanism to stream records (data).

Kafka Terminologies

  1. Distributed system: A distributed system is a computing environment in which software components run on different machines (possibly in multiple locations) and coordinate to get work done as one unit.
  2. Kafka Broker: A broker is a single Kafka server. The messages of each topic are split among the brokers, and the brokers handle all client requests to write and read events. A Kafka cluster is simply a collection of one or more Kafka brokers.
  3. Topics: A topic is a stream of "related" messages. Its name is unique within the cluster. Kafka producers write messages to topics.
  4. Producer: A producer publishes data to topics. It sends a message to a broker, and the broker receives and stores the message.
  5. Consumers: Consumers read data from topics. A consumer connects to a broker and requests the messages available on the stream.
  6. Consumer Group: In Kafka, a set of consumers can identify themselves (using a configuration property) as belonging to the same group. Within a consumer group, multiple consumers read from the same topic, but each consumer reads from its own exclusive partitions.
  7. Topic Partitions: Topics in Kafka are partitioned, meaning a topic is broken into multiple log files that can live on separate Kafka brokers. This split of the message stream is generally referred to as "partitioning".
  8. ZooKeeper: ZooKeeper is a centralized service that helps you coordinate and manage distributed applications.


Note: A single topic can be subscribed to by multiple consumers.
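To make the "exclusive partitions" idea concrete, here is a minimal sketch of how the partitions of one topic might be divided among the consumers of a group. It is a simplified round-robin illustration only: `assignPartitions` and the consumer names are hypothetical, and Kafka's real assignment strategies are more involved.

```javascript
// Simplified illustration: spread partition numbers over the consumers of one group.
// This is NOT the assignment algorithm Kafka or KafkaJS actually runs.
function assignPartitions(partitionCount, consumerIds) {
  if (consumerIds.length === 0) {
    throw new Error('a consumer group needs at least one consumer');
  }
  const assignment = new Map(consumerIds.map((id) => [id, []]));
  for (let partition = 0; partition < partitionCount; partition++) {
    // Each partition gets exactly one owner inside the group.
    const owner = consumerIds[partition % consumerIds.length];
    assignment.get(owner).push(partition);
  }
  return assignment;
}

// Six partitions shared by three (hypothetical) consumers of the same group.
const assignment = assignPartitions(6, ['consumer-a', 'consumer-b', 'consumer-c']);
console.log(Object.fromEntries(assignment));
```

Note that when a group has more consumers than the topic has partitions, the extra consumers receive nothing to read in this sketch, which mirrors why adding consumers beyond the partition count does not add throughput.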

Install KafkaJS using npm

    npm install kafkajs

Setup KafkaJS client

Create a file called kafka.js:

    const { Kafka } = require('kafkajs');

    // One shared client; producers and consumers are created from it
    const kafka = new Kafka({
        clientId: 'app1',
        brokers: ['kafka1:9096', 'kafka2:9096'],
    });

    module.exports = kafka;
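The broker list does not have to be hard-coded. As a sketch, it could be read from an environment variable and passed to the client as the `brokers` option. The variable name `KAFKA_BROKERS` and the helper `parseBrokers` are assumptions made for this example, not part of KafkaJS.

```javascript
// Hypothetical helper: turn "host1:9096, host2:9096" into ['host1:9096', 'host2:9096'].
function parseBrokers(list) {
  return list
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
}

// KAFKA_BROKERS is an assumed variable name; fall back to the brokers used in this article.
const brokers = parseBrokers(process.env.KAFKA_BROKERS || 'kafka1:9096,kafka2:9096');
console.log(brokers);
```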
    
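Create a producer using our client:

    const kafka = require('./kafka');
    const producer = kafka.producer();

    // await is only valid inside an async function in a CommonJS module
    const runProducer = async () => {
        await producer.connect();
        await producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'Hello Apache Kafka!' },
            ],
        });
        await producer.disconnect();
    };

    runProducer().catch((error) => {
        console.error('Error running the producer:', error);
    });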
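Real messages are usually structured data rather than a fixed string. The sketch below shows one way to turn an application event into an entry for the `messages` array, using the `key` and `value` fields of a KafkaJS message. The helper `toKafkaMessage` and the `orderId` field are hypothetical names chosen for this example.

```javascript
// Hypothetical helper: serialize an event into a { key, value } message object.
function toKafkaMessage(event) {
  return {
    // Messages that share a key are routed to the same partition by the default partitioner.
    key: String(event.orderId),
    // Kafka stores bytes, so the object is serialized to a JSON string.
    value: JSON.stringify(event),
  };
}

const message = toKafkaMessage({ orderId: 42, status: 'paid' });
console.log(message);
```

The result can be passed as `messages: [message]` in a `producer.send` call.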

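Create a consumer to consume our message:

    const kafka = require('./kafka');
    const consumer = kafka.consumer({ groupId: 'consumerGroup' });

    const runConsumer = async () => {
        await consumer.connect();
        await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

        // eachMessage is called once for every message received
        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                console.log({
                    value: message.value.toString(),
                });
            },
        });
    };

    runConsumer().catch((error) => {
        console.error('Error running the consumer:', error);
    });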
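The consumer above calls `message.value.toString()` directly, which throws if a message has a `null` value (for example a tombstone record). A slightly more defensive decoding step could look like the following sketch. `decodeMessage` is a hypothetical helper, and treating the payload as JSON with a plain-text fallback is an assumption about the data, not something KafkaJS does for you.

```javascript
// Hypothetical helper: decode a message's value without crashing on null or non-JSON payloads.
function decodeMessage(message) {
  if (message.value === null) {
    return null; // nothing to decode
  }
  const text = message.value.toString('utf8');
  try {
    return JSON.parse(text);
  } catch {
    return text; // not JSON, so return the raw string
  }
}

// Stand-ins for the `message` object that eachMessage receives.
console.log(decodeMessage({ value: Buffer.from('{"orderId":42}') }));
console.log(decodeMessage({ value: Buffer.from('Hello Apache Kafka!') }));
console.log(decodeMessage({ value: null }));
```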

Subscribe a Consumer Group to Multiple Topics

    const kafka = require('./kafka');
    const topicGroup1 = ['topic1', 'topic2'];
    const consumer1 = kafka.consumer({ groupId: 'consumer1' });

    const funConsumer1 = async () => {
        await consumer1.connect();
        // fromBeginning is an option of subscribe(), not of consumer();
        // forEach does not wait for promises, so use a for...of loop
        for (const topic of topicGroup1) {
            await consumer1.subscribe({ topic, fromBeginning: true });
        }
        // we can also use
        // await consumer1.subscribe({ topics: topicGroup1, fromBeginning: true })

        await consumer1.run({
            autoCommit: false,
            eachMessage: async (task) => {
                console.log(task);
                // commit the offset of the next message to read on this partition
                await consumer1.commitOffsets([{
                    topic: task.topic,
                    partition: task.partition,
                    offset: (Number(task.message.offset) + 1).toString(),
                }]);
            },
        });
    };

    funConsumer1().catch((error) => {
        console.error('Error running the consumer:', error);
    });
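The manual commit above computes the next offset with `Number(...)`. Kafka offsets are 64-bit integers that KafkaJS exposes as strings, so on a very long-lived partition they can in principle exceed what a JavaScript number represents exactly. A small sketch of the same "offset + 1" step using `BigInt` follows; `nextOffset` is a hypothetical helper name.

```javascript
// Hypothetical helper: compute the offset to commit (last processed offset + 1)
// without losing precision on very large values.
function nextOffset(offset) {
  return (BigInt(offset) + 1n).toString();
}

console.log(nextOffset('41'));
console.log(nextOffset('9007199254740993')); // larger than Number.MAX_SAFE_INTEGER
```

In the consumer this would replace the `Number(...)` expression with `offset: nextOffset(task.message.offset)`.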


Error Handling

    // When the consumer crashes, disconnect it and restart it after five seconds
    consumer1.on('consumer.crash', async (payload) => {
        try {
            await consumer1.disconnect();
        } catch (error) {
            console.log(error);
        } finally {
            setTimeout(() => {
                funConsumer1().catch((error) => {
                    console.error('Error running the consumer after a crash:', error);
                });
            }, 5000);
        }
    });
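The handler above always waits a fixed 5000 ms before restarting. If the broker stays unavailable for a while, a growing delay avoids hammering it with reconnect attempts. The following is a minimal sketch of an exponential backoff calculation; `backoffDelay` and its default values are assumptions for illustration, not KafkaJS settings.

```javascript
// Hypothetical helper: delay in milliseconds before restart attempt number `attempt`
// (0-based), doubling each time and capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Delays for the first six restart attempts.
for (let attempt = 0; attempt < 6; attempt++) {
  console.log(`attempt ${attempt}: wait ${backoffDelay(attempt)} ms`);
}
```

To use it, the crash handler would keep a counter of consecutive crashes and pass `backoffDelay(counter)` to `setTimeout` instead of the constant.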
