Tutorials on Apache Camel, Apache Kafka, JBoss Fuse

Search our tutorials

In this tutorial we will learn how to set up a Maven project to run a Kafka Java Consumer and Producer.

This project is composed of the following Classes:

  • SampleKafkaProducer: A standalone Java class which sends messages to a Kafka topic.
  • SampleKafkaConsumer: A standalone Java class which consumes Kafka messages from a Topic.

It is required that you start a single Node Kafka cluster as discussed in this tutorial: Kafka tutorial #2: Getting started with Kafka

Let's start from the SampleKafkaProducer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Example class sending text Messages to Kafka cluster

/**
 * Standalone producer that sends {@link #MESSAGES} numbered text messages to a Kafka topic
 * from its own thread.
 *
 * <p>Each record uses the message number as its {@code Integer} key and a short text as its
 * {@code String} value. Depending on the {@code isAsync} flag, a send either blocks until the
 * broker acknowledges the record or returns immediately and reports the outcome through a
 * {@link MessageCallBack}.
 *
 * <p>The underlying {@link KafkaProducer} is closed at the end of {@link #run()}, so an
 * instance can only be run once.
 */
public class SampleKafkaProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";
    public static final String TOPIC = "testTopic";
    public static final int MESSAGES = 100;

    /**
     * Creates a producer connected to {@code KAFKA_SERVER_URL:KAFKA_SERVER_PORT}.
     *
     * @param topic   name of the topic the messages are sent to
     * @param isAsync {@code true} to send without waiting for the acknowledgement;
     *                {@code false} or {@code null} to block on every send
     */
    public SampleKafkaProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        // Treat a null flag as "synchronous" instead of failing later on unboxing.
        this.isAsync = Boolean.TRUE.equals(isAsync);
    }

    /** Starts a synchronous producer for {@link #TOPIC}. */
    public static void main(String[] args) {
        boolean isAsync = false;
        SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
        // start the producer
        producer.start();
    }

    /**
     * Sends messages numbered {@code 1..MESSAGES} (inclusive) and then closes the producer.
     *
     * <p>If the thread is interrupted while waiting for an acknowledgement, sending stops and
     * the interrupt status is restored once the producer has been closed.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            for (int messageNo = 1; messageNo <= MESSAGES; messageNo++) {
                String messageStr = "This is Message number:" + messageNo;
                ProducerRecord<Integer, String> record =
                        new ProducerRecord<>(topic, messageNo, messageStr);
                if (isAsync) { // Send asynchronously; the callback reports the outcome
                    long startTime = System.currentTimeMillis();
                    producer.send(record, new MessageCallBack(startTime, messageNo, messageStr));
                } else { // Send synchronously: block until the broker has answered
                    try {
                        producer.send(record).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (ExecutionException e) {
                        // The send itself failed; report it and carry on with the next message.
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Defer restoring the flag until after close() so the close can
                        // still wait for in-flight records.
                        interrupted = true;
                        break;
                    }
                }
            }
        } finally {
            try {
                // close() waits for previously sent records to complete, which matters for
                // asynchronous sends that may still be buffered when the loop ends.
                producer.close();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

class MessageCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public MessageCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }


    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

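This class extends Thread so that messages are sent from a separate thread to a Topic named "testTopic". With isAsync set to false (as in the main method), each send blocks until the broker acknowledges the message; set it to true to send asynchronously and have the result reported through the MessageCallBack class.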

Let's check now the Kafka Consumer class:

package com.masteringintegration.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * Standalone consumer that subscribes to a Kafka topic and logs every record it receives.
 *
 * <p>Polling happens on a dedicated thread (see {@link ConsumerRunnable}); the calling thread
 * blocks until that thread has finished. A JVM shutdown hook wakes the consumer up so that it
 * can close cleanly before the process exits.
 */
public class SampleKafkaConsumer {

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String groupId = "SampleKafkaConsumer";
        String topic = "testTopic";

        new SampleKafkaConsumer(server, groupId, topic).run();
    }

    // Variables

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleKafkaConsumer.class);
    private final String mBootstrapServer;
    private final String mGroupId;
    private final String mTopic;

    // Constructor

    /**
     * @param bootstrapServer {@code host:port} of the Kafka broker to connect to
     * @param groupId         consumer group id
     * @param topic           topic to subscribe to
     */
    SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
        mBootstrapServer = bootstrapServer;
        mGroupId = groupId;
        mTopic = topic;
    }

    // Public

    /**
     * Starts the consumer thread, registers the shutdown hook and blocks until the consumer
     * thread has closed its consumer.
     */
    void run() {
        LOGGER.info("Creating consumer thread");

        CountDownLatch latch = new CountDownLatch(1);

        ConsumerRunnable consumerRunnable = new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
        Thread thread = new Thread(consumerRunnable);
        thread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOGGER.info("Caught shutdown hook");
            consumerRunnable.shutdown();
            // Keep the JVM alive until the consumer thread has closed the consumer.
            await(latch);

            LOGGER.info("Application has exited");
        }));

        await(latch);
    }

    // Private

    /**
     * Blocks until {@code latch} reaches zero. If the waiting thread is interrupted, the error
     * is logged and the thread's interrupt status is restored.
     */
    void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Application got interrupted", e);
        } finally {
            LOGGER.info("Application is closing");
        }
    }

    // Inner classes

    /**
     * Polls the subscribed topic in a loop and logs each record until {@link #shutdown()} is
     * called, then closes the consumer and releases the latch.
     */
    private static class ConsumerRunnable implements Runnable {

        private final CountDownLatch mLatch;
        private final KafkaConsumer<Integer, String> mConsumer;

        ConsumerRunnable(String bootstrapServer, String groupId, String topic, CountDownLatch latch) {
            mLatch = latch;

            Properties props = consumerProps(bootstrapServer, groupId);
            mConsumer = new KafkaConsumer<>(props);
            mConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<Integer, String> records = mConsumer.poll(Duration.ofMillis(100));

                    for (ConsumerRecord<Integer, String> record : records) {
                        LOGGER.info("Key: {}, Value: {}", record.key(), record.value());
                        LOGGER.info("Partition: {}, Offset: {}", record.partition(), record.offset());
                    }
                }
            } catch (WakeupException e) {
                // Thrown by poll() after shutdown() called wakeup(); this is the normal exit path.
                LOGGER.info("Received shutdown signal!");
            } finally {
                mConsumer.close();
                mLatch.countDown();
            }
        }

        /** Interrupts a running or upcoming {@code poll()} so that {@link #run()} can finish. */
        void shutdown() {
            mConsumer.wakeup();
        }

        /**
         * Builds the consumer configuration. Keys are read as integers and values as strings,
         * mirroring the serializers configured in SampleKafkaProducer; reading the integer keys
         * with a string deserializer would turn them into unprintable control characters.
         */
        private static Properties consumerProps(String bootstrapServer, String groupId) {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    IntegerDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
            // Start from the beginning of the topic when the group has no committed offset yet.
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            return properties;
        }
    }
}

The consumer delegates polling to an inner class, ConsumerRunnable, which implements Runnable so that messages are consumed on a separate thread while the main thread waits for shutdown.

In order to build the project we have included the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<!-- Maven build for the Kafka demo: compiles SampleKafkaProducer and SampleKafkaConsumer and
     lets each of them be launched through the exec-maven-plugin. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.masteringintegration.kafka</groupId>
  <artifactId>kafka-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>kafka-demo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Java 8 is the minimum: the sources use lambdas and java.time.Duration -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Kafka client library: provides the KafkaProducer and KafkaConsumer APIs -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>


    <!-- SLF4J binding for the org.slf4j Logger used by the consumer -->
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <!-- One execution per application; pick one on the command line with
           mvn exec:java@consumer or mvn exec:java@producer -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
          <execution>
            <id>consumer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaConsumer</mainClass>
            </configuration>
          </execution>
          <execution>
            <id>producer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaProducer</mainClass>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

As you can see, we have included two different executions so that you can run each of the two standalone Java applications. You can run the Producer as follows:

mvn exec:java@producer

The messages will be sent to the destination:

Sent message: (1, This is Message number:1)
Sent message: (2, This is Message number:2)
Sent message: (3, This is Message number:3)
Sent message: (4, This is Message number:4)
Sent message: (5, This is Message number:5)
Sent message: (6, This is Message number:6)
Sent message: (7, This is Message number:7)
Sent message: (8, This is Message number:8)
Sent message: (9, This is Message number:9)
Sent message: (10, This is Message number:10)
 . . . . .

Now start the Consumer application:

mvn exec:java@consumer

You will see that messages are consumed from the Topic:

[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:1
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24276
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:2
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24277
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:3
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24278
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:4
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24279
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:5
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24280
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:6
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24281
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: This is Message number:7
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24282
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key:, Value: This is Message number:8
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24283
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: 	, Value: This is Message number:9
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24284
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: 
, Value: This is Message number:10
 . . . . .

Congratulations! You have just managed to connect to a Kafka cluster using a Java Producer and a Consumer. Continue your Kafka learning path with the following tutorial: Kafka Tutorial: Creating a Java Producer and Consumer using Serializers and Deserializers

Source code for this tutorial: https://github.com/fmarchioni/masteringintegration/tree/master/kafka/java

 

FREE WildFly Application Server - JBoss - Quarkus - Drools Tutorials