Kafka for Developers — Part 2 — Spring Boot and Embedded Kafka
This is second in a series of articles for developers about working with Kafka. We are covering the practicalities of running Kafka on your development machine, how to write code that uses it, and how to write reliable tests. In the previous article (Kafka for Developers — Part 1) we started a local Kafka broker using Docker containers, and wrote a simple JUnit test to show it was working.
All the example code for these articles is at https://github.com/cyberycon/kafka-howto. This repo consists of numbered directories each of which has a separate project. The code for this article is in folder 02-embedded-kafka. You’ll need Java 8 and Maven installed to build and run the examples. To verify everything is working, run command mvn package
from the 02-embedded-kafka directory. Import the Maven project into the IDE of your choice to work with the code.
In this article we’ll:
- see how Spring Boot minimizes configuration and boiler-plate for using Kafka
- write a test that uses Embedded Kafka for reliability
An Example Application
I’ll be using a simple example application in order to demonstrate everything with working code. The application is a simulation of electricity meters producing regular readings. The readings are read by consumers for different purposes — for example for billing system, or for monitoring overall demand. In this article, we’re only looking at the application that simulates the meters. The application will post a new reading onto a Kafka topic every few seconds.
The application is a simple Spring Boot command line application that emits regular meter readings onto a Kafka topic. We’ll look at consuming these readings in the next article on Topics, Partitions, and Consumers.
Although there is no consumer application yet, we’ll have unit tests for the basic functionality of the meter, and an integration test to verify that the application is posting events in the correct format to a Kafka topic.
Application Structure
The main business logic of this application is in Meter.java
. Invoke the start()
method to start emitting meter readings, and the stop()
method to stop. Each reading is sent to an implementation of MeterReadingsSender
. The diagram below shows the class structure, with the two supplied implementations of MeterReadingSender
. MockReadingSender
is used for testing.
Each meter reading is generated by adding a random number to the previous reading (you can’t use a negative amount of electricity so each reading must be more than the previous one). Meter
implements the Runnable
interface, and the readings are generated in a separate thread by the run()
method.
The main application class SmartMeterApplication
implements CommandLineRunner
and will start the meter running and generating random readings. Each reading event posted onto the queue is a String in the format:
meterId:timestamp:meterReading
You can override the properties for meter id, area, and interval on the command line. This will be helpful in later articles when we start looking at multiple producers and consumers. The area property is used as the message key, which is something looked at in the next article.
If you compare this example with the one from the previous article, there is no boilerplate code to set up producer properties. Spring Boot has a set of defaults that match the most common uses. You can override any default in your application.properties file with properties that start spring.kafka
. Producer defaults start spring.kafka.producer
and consumer defaults start spring.kafka.consumer
.
Tests
There are two test classes:
SmartMeterTests
SmartMeterIntegrationTests
The SmartMeterTests
class tests the basic functionality of the SmartMeter
— does it emit readings, do they meet the expected criteria? It uses the MockReadingSender
to record the readings sent by the SmartMeter
class. Nothing is sent to Kafka. The MockReadingSender
asserts that each reading has a higher value than the previous one. When electricity meters run backwards it usually means someone is cheating.
The SmartMeterIntegrationTests
class is an integration test that sets up a Kafka listener to see whether the application is posting readings to Kafka when the MeterReadingSenderKafkaImpl
is the MeterReadingSender
wired into the Meter
class.
It’s hard to write reliable tests that use stateful services like databases or message queues. We can write a test that sends messages to our local Kafka broker, and then add a listener to check that they have been sent, but because Kafka topics are persistent it’s hard to know whether a test passed or failed because of events previously sent to the topic.
Spring Boot provides an Embedded Kafka for testing that is clean every time the Spring context for a test class is created. By adding the @DirtiesContext
annotation at the start of a test class, you can ensure the context is recreated for every test in the class.
The listing below shows the SmartMeterIntegrationTests
class (omitting imports). The@EmbeddedKafka
notation starts up a clean Kafka instance as part of the Spring test context. The brokerProperties
sets a non-standard port for running the Embedded Kafka. In the application-test.properties file, property spring.kafka.bootstrap-servers=localhost:19092
configures the Spring context to use this address for Kafka producers and consumers.
The test autowires a Meter
and TestListener
. The test starts the meter running and uses a CountDownLatch
to wait for three messages to be received by the TestListener
, which is listed below.
This class has a single method which is annotated as a @KafkaListener
. Spring Boot wires this up to consume topics from the meter.reading
topic. It asserts the received messages are strings in the format and decrements a CountDownLatch
. The test is reliable because using Embedded Kafka ensures that the Kafka topics always start empty.
Summary
In this article we used Spring Boot’s Kafka support to avoid all the boilerplate configuration that is otherwise needed to configure producers and consumers. And we used Embedded Kafka to write a reliable integration test.
In the next article, consumers, topics, and partitions.