Integrate Kafka with Java

 💡 Introduction

In modern enterprise applications, real-time communication between distributed systems is crucial.
Apache Kafka, a distributed event-streaming platform, enables high-throughput, fault-tolerant, event-driven architectures.

In this blog, we’ll see how to integrate Kafka with both Spring Boot and classic Spring Framework, using practical examples for Producer and Consumer applications.


⚙️ 1️⃣ What Is Apache Kafka?

Apache Kafka is a distributed event streaming platform designed for:

  • Publishing and subscribing to data streams

  • Processing real-time data

  • Connecting systems using reliable message queues

Kafka is often used for:

  • Microservice communication

  • Log aggregation

  • Real-time analytics

  • BPM and workflow triggers (like jBPM event integration)


🧩 2️⃣ Core Concepts

  • Producer: sends (publishes) messages to Kafka topics

  • Consumer: reads messages from Kafka topics

  • Topic: a category or feed name to which messages are published

  • Broker: a Kafka server that stores and serves messages

  • ZooKeeper: coordinates Kafka brokers (no longer required in recent Kafka versions running in KRaft mode)
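To make the Producer/Topic relationship concrete, here is a simplified sketch of how a message key is routed to a partition. Kafka's real DefaultPartitioner uses murmur2 hashing; the modulo logic below is an illustrative approximation, not the actual client code.

```java
// Simplified illustration of key-based partition routing.
// Kafka's DefaultPartitioner actually uses murmur2 hashing,
// but the idea is the same: the same key always maps to the same partition.
public class PartitionRoutingDemo {

    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit instead of Math.abs, which overflows for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Same key -> same partition, which is how Kafka preserves per-key ordering.
        System.out.println("order-42 -> partition " + partitionFor("order-42", partitions));
        System.out.println("order-42 -> partition " + partitionFor("order-42", partitions));
        System.out.println("order-99 -> partition " + partitionFor("order-99", partitions));
    }
}
```

This per-key stickiness is why choosing a good message key (e.g. an order ID) matters: it guarantees ordering for that key without serializing the whole topic.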

🏗️ 3️⃣ Kafka Setup (Local or Docker)

🐳 Using Docker

docker run -d --name zookeeper -p 2181:2181 zookeeper

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  wurstmeister/kafka

🧾 Create a Topic

kafka-topics.sh --create --topic demo-topic --bootstrap-server localhost:9092

🧠 4️⃣ Kafka Integration with Spring Boot

🧱 Step 1: Add Dependencies

Add these to your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

📦 Step 2: Application Configuration

🧩 application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
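If you prefer application.properties over YAML, the equivalent configuration looks like this (same assumptions: broker on localhost:9092, plain String messages):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=demo-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```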

💬 Step 3: Create a Producer Service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private static final String TOPIC = "demo-topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        kafkaTemplate.send(TOPIC, message);
        System.out.println("✅ Sent message: " + message);
    }
}

🎧 Step 4: Create a Consumer Listener

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void consume(String message) {
        System.out.println("🎯 Consumed message: " + message);
    }
}

🚀 Step 5: REST Controller to Publish Message

import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final MessageProducer producer;

    public KafkaController(MessageProducer producer) {
        this.producer = producer;
    }

    @PostMapping("/publish")
    public String sendMessage(@RequestParam String message) {
        producer.send(message);
        return "✅ Message sent to Kafka topic!";
    }
}

Test:
Run the app → hit
POST http://localhost:8080/api/kafka/publish?message=HelloKafka
Observe the logs in both Producer and Consumer consoles.


⚙️ 5️⃣ Integration with Classic Spring (Non-Boot)

If you’re using Spring Framework without Boot, define beans manually in your XML or Java configuration.

🧾 kafka-config.xml

<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092"/>
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
        </map>
    </constructor-arg>
</bean>

<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
</bean>

Then inject and use KafkaTemplate the same way as in Spring Boot:

kafkaTemplate.send("demo-topic", "Message from classic Spring!");
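The same beans can also be declared in Java configuration instead of XML. A minimal sketch, assuming the spring-kafka dependency is on the classpath (class and bean names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    // Equivalent of the XML producerFactory bean above.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```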

🔄 6️⃣ Two-Way Communication (Request-Reply)

Spring Kafka supports request-reply messaging through ReplyingKafkaTemplate, which correlates each request with its reply using correlation-ID headers. Note that plain KafkaTemplate does not have a sendAndReceive method; you need the replying variant:

RequestReplyFuture<String, String, String> future =
        replyingKafkaTemplate.sendAndReceive(new ProducerRecord<>("request-topic", "QueryData"));
ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);

On the consumer side, @SendTo routes the listener's return value to the reply topic while preserving the correlation headers:

@KafkaListener(topics = "request-topic")
@SendTo("response-topic")
public String reply(ConsumerRecord<String, String> record) {
    return "Processed: " + record.value();
}
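For completeness, the ReplyingKafkaTemplate itself has to be wired with a listener container for the reply topic. A minimal sketch, assuming Spring Boot's auto-configured container factory and producer factory are available (bean names and the reply-group id are illustrative):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
public class ReplyConfig {

    // Listener container that receives the correlated replies from the response topic.
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("response-topic");
        container.getContainerProperties().setGroupId("reply-group");
        return container;
    }

    // Template that sends a request and blocks (via a future) until the reply arrives.
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }
}
```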

🧰 7️⃣ Key Tips for Production

  • Error handling: configure DefaultErrorHandler (the successor to the older SeekToCurrentErrorHandler) for retries and dead-letter publishing

  • 🔁 Idempotency: enable enable.idempotence=true so producer retries cannot write duplicate records

  • 🔐 Security: use SASL/SSL configs for cloud brokers

  • ⚙️ Monitoring: integrate Prometheus / Micrometer with Spring Boot Actuator

  • 📦 Batch processing: use @KafkaListener(batch = "true") for high throughput
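For example, an idempotent producer can be enabled through Spring Boot properties. The values below are common starting points, not universal defaults (idempotence requires acks=all and a retry budget):

```properties
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
```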

📈 8️⃣ Architecture Diagram

+---------------------+        +--------------------+
|  Spring Boot App    |        |  Spring Boot App   |
|     (Producer)      |        |     (Consumer)     |
| /api/kafka/publish  |  --->  |   @KafkaListener   |
+----------+----------+        +----------+---------+
           |                              |
           v                              v
      +---------------------------------+
      |      Apache Kafka Broker        |
      |      (Topic: demo-topic)        |
      +---------------------------------+

🧠 9️⃣ Common Errors

  • "Bootstrap broker disconnected": Kafka is not running; start the Kafka broker (port 9092)

  • TimeoutException: wrong broker endpoint; check the bootstrap-servers URL

  • SerializationException: serializer mismatch; make the key/value (de)serializer types match on producer and consumer

  • Consumer rebalance loop: missing group.id or topic mismatch; set group-id in the consumer config

👉 Watch “Integrate Apache Kafka with Java” in Action:

🎬 A step-by-step demo video coming soon on YouTube: Learn IT with Shikha


🏁 Conclusion

🎯 By integrating Apache Kafka with Spring Boot and Spring Framework, you can build event-driven microservices that are scalable, reliable, and fast.

This setup helps your apps:

  • Stream real-time data

  • Connect asynchronously

  • Handle millions of events with minimal latency

💬 Kafka + Spring = Reactive, Resilient, and Real-time Architecture 🚀


💼 Professional Support Available

 If you are facing issues in real projects related to enterprise backend development or workflow automation, I provide paid consulting, production debugging, project support, and focused trainings. 

 Technologies covered include Java, Spring Boot, PL/SQL, Azure, and workflow automation (jBPM, Camunda BPM, RHPAM).

📧 Contact: ishikhanirankari@gmail.com | info@realtechnologiesindia.com

🌐 Website: IT Trainings | Digital metal podium    

