Reading from Kafka in Java

To read records from a Kafka cluster with the Java consumer API, follow the steps below (written by OpenAI ChatGPT).

Add the following dependency to your pom.xml file.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

Import the necessary Kafka classes in your Java file.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Create a KafkaConsumer object by setting the necessary configuration properties.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
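
The ConsumerConfig constants used above are plain string keys, so the same settings can also be written out by name. The sketch below is one possible way to do that, using only java.util.Properties so it runs without Kafka on the classpath. The class name ConsumerPropsSketch and the method build are made up for illustration, and the two extra settings (auto.offset.reset and enable.auto.commit) are assumptions about what you might want, not something the steps above require.

```java
import java.util.Properties;

public class ConsumerPropsSketch {

    /** Builds consumer settings using the string keys behind the ConsumerConfig constants. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Deserializers may be given as fully qualified class names
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: a group with no committed offsets should start from the oldest record
        props.setProperty("auto.offset.reset", "earliest");
        // Assumption: keep the client's default of committing offsets automatically
        props.setProperty("enable.auto.commit", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "my-group");
        System.out.println(props.getProperty("group.id"));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in the same way as the props object above.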

Subscribe to the topic you want to read from.

String topic = "my-topic";
consumer.subscribe(Collections.singletonList(topic));

Poll for new records using the poll() method of the KafkaConsumer object.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
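        // process the record; here we just print its position and contents
        System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                record.partition(), record.offset(), record.key(), record.value());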
    }
}

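Close the consumer when you are finished. Because the while (true) loop above never exits on its own, this call is only reached if the loop is given a way to stop; in practice, put the loop in a try block and call close() in the matching finally block.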

consumer.close();
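
One way to give the loop an exit so that close() is actually reached is to check a flag on every iteration and close in a finally block. The sketch below shows only that control flow. RecordSource is a hypothetical stand-in for KafkaConsumer so the example runs without a broker, and runLoop and demo are made-up names. With the real client, a common alternative is to call consumer.wakeup() from another thread and catch the resulting WakeupException.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollLoopSketch {

    /** Hypothetical stand-in for KafkaConsumer: only the two calls the loop needs. */
    interface RecordSource extends AutoCloseable {
        List<String> poll(Duration timeout);

        @Override
        void close();
    }

    /** Polls until the flag is cleared, always closes the source, returns the record count. */
    static int runLoop(RecordSource source, AtomicBoolean running) {
        int processed = 0;
        try {
            while (running.get()) {
                for (String value : source.poll(Duration.ofMillis(100))) {
                    processed++; // process the record here
                }
            }
        } finally {
            source.close(); // runs whether the loop ends normally or with an exception
        }
        return processed;
    }

    /** Runs the loop against a fake source that requests a stop on its third poll. */
    static String demo() {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicBoolean closed = new AtomicBoolean(false);
        RecordSource fake = new RecordSource() {
            private int polls = 0;

            @Override
            public List<String> poll(Duration timeout) {
                if (++polls == 3) {
                    running.set(false); // simulate a shutdown request
                }
                return List.of("a", "b");
            }

            @Override
            public void close() {
                closed.set(true);
            }
        };
        int processed = runLoop(fake, running);
        return processed + " records, closed=" + closed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "6 records, closed=true"
    }
}
```

In a real application the flag would typically be cleared from a shutdown hook or another thread rather than from inside poll().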

That’s it! You can now read records from a Kafka cluster using the consumer API in Java.
