This is the second in a series of articles for developers about working with Kafka. We are covering the practicalities of running Kafka on your development machine, how to write code that uses it, and how to write reliable tests. In the previous article (Kafka for Developers — Part 1) we started a local Kafka broker using Docker containers and wrote a simple JUnit test to show it was working.
All the example code for these articles is at https://github.com/cyberycon/kafka-howto. This repo consists of numbered directories, each of which has a separate project. The code for this article is in folder 02-embedded-kafka. You’ll need Java 8 and Maven installed to build and run the examples. To verify everything is working, run the command mvn package from the 02-embedded-kafka directory. Import the Maven project into the IDE of your choice to work with the code.
In this article we’ll:
- see how Spring Boot minimizes configuration and boilerplate for using Kafka
- write a test that uses Embedded Kafka for reliability
An Example Application
I’ll be using a simple example application to demonstrate everything with working code. The application is a simulation of electricity meters producing regular readings. The readings are read by consumers for different purposes, for example by a billing system or for monitoring overall demand. In this article, we’re only looking at the application that simulates the meters. The application will post a new reading onto a Kafka topic every few seconds.
The application is a simple Spring Boot command line application that emits regular meter readings onto a Kafka topic. We’ll look at consuming these readings in the next article on Topics, Partitions, and Consumers.
Although there is no consumer application yet, we’ll have unit tests for the basic functionality of the meter, and an integration test to verify that the application is posting events in the correct format to a Kafka topic.
The main business logic of this application is in Meter.java. Invoke the start() method to start emitting meter readings, and the stop() method to stop. Each reading is sent to an implementation of MeterReadingSender. The diagram below shows the class structure, with the two supplied implementations of MeterReadingSender. MockReadingSender is used for testing.
Each meter reading is generated by adding a random number to the previous reading (you can’t use a negative amount of electricity so each reading must be more than the previous one).
Meter implements the Runnable interface, and the readings are generated in a separate thread.
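To make the reading logic concrete, here is a minimal sketch of a meter that adds a positive random increment to the previous reading and emits readings from its own thread. It is not the code from the repo: the class name MeterSketch, the nested ReadingSender interface, the increment range, and the use of a plain long for the reading are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the article's Meter class; all names here are assumptions.
class MeterSketch implements Runnable {

    // Plays the role of the sender interface that receives each reading.
    interface ReadingSender {
        void send(long reading);
    }

    private final ReadingSender sender;
    private final Random random;
    private final long intervalMillis;
    private volatile boolean running;
    private long lastReading;
    private Thread worker;

    MeterSketch(ReadingSender sender, Random random, long intervalMillis) {
        this.sender = sender;
        this.random = random;
        this.intervalMillis = intervalMillis;
    }

    // The increment is between 1 and 10, so each reading is strictly higher than the last.
    long nextReading() {
        lastReading += 1 + random.nextInt(10);
        return lastReading;
    }

    // Starts emitting readings on a separate thread.
    void start() {
        running = true;
        worker = new Thread(this, "meter");
        worker.start();
    }

    // Signals the worker thread to stop and wakes it if it is sleeping.
    void stop() {
        running = false;
        if (worker != null) {
            worker.interrupt();
        }
    }

    @Override
    public void run() {
        while (running) {
            sender.send(nextReading());
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                return; // stop() interrupts the sleep so the thread exits promptly
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Long> sent = Collections.synchronizedList(new ArrayList<Long>());
        MeterSketch meter = new MeterSketch(reading -> sent.add(reading), new Random(), 10);
        meter.start();
        Thread.sleep(100);
        meter.stop();
        System.out.println("readings: " + sent);
    }
}
```

Passing the sender in through the constructor is what lets a test swap in a mock without touching the reading logic.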
The main application class implements CommandLineRunner and starts the meter running and generating random readings. Each reading event posted onto the topic is a String in the format:
You can override the properties for meter id, area, and interval on the command line. This will be helpful in later articles when we start looking at multiple producers and consumers. The area property is used as the message key, which we’ll look at in the next article.
If you compare this example with the one from the previous article, there is no boilerplate code to set up producer properties. Spring Boot has a set of defaults that match the most common uses. You can override any default in your application.properties file with properties that start with the prefix spring.kafka. Producer defaults start with spring.kafka.producer and consumer defaults start with spring.kafka.consumer.
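As an illustration of these prefixes, an application.properties file overriding a few defaults might look like the fragment below. The property names are standard Spring Boot Kafka properties, but the values (broker address, group id, offset reset policy) are assumptions chosen for the example and not taken from the repo.

```properties
# Assumed values for illustration only; adjust them to your own environment.
spring.kafka.bootstrap-servers=localhost:9092

# Producer overrides use the spring.kafka.producer prefix.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer overrides use the spring.kafka.consumer prefix.
spring.kafka.consumer.group-id=meter-readings-example
spring.kafka.consumer.auto-offset-reset=earliest
```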
There are two test classes:
- The SmartMeterTests class tests the basic functionality of the SmartMeter: does it emit readings, and do they meet the expected criteria? It uses the MockReadingSender to record the readings sent by the SmartMeter class. Nothing is sent to Kafka. The MockReadingSender asserts that each reading has a higher value than the previous one. When electricity meters run backwards it usually means someone is cheating.
- The SmartMeterIntegrationTests class is an integration test that sets up a Kafka listener to see whether the application is posting readings to Kafka when the MeterReadingSenderKafkaImpl is the MeterReadingSender wired into the meter.
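The idea behind the mock sender can be sketched in a few lines of plain Java: record every reading and fail as soon as one is not higher than its predecessor. This is not the MockReadingSender from the repo; the class name RecordingSenderSketch and the choice of a long for the reading value are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical mock sender; records readings instead of sending them to Kafka.
class RecordingSenderSketch {

    private final List<Long> readings = new ArrayList<>();

    // Rejects any reading that is not strictly higher than the previous one.
    synchronized void send(long reading) {
        if (!readings.isEmpty()) {
            long previous = readings.get(readings.size() - 1);
            if (reading <= previous) {
                throw new AssertionError(
                        "reading " + reading + " is not higher than previous reading " + previous);
            }
        }
        readings.add(reading);
    }

    // Returns a read-only copy so a test can inspect what was sent.
    synchronized List<Long> readings() {
        return Collections.unmodifiableList(new ArrayList<>(readings));
    }

    public static void main(String[] args) {
        RecordingSenderSketch sender = new RecordingSenderSketch();
        sender.send(5);
        sender.send(9);
        System.out.println("recorded: " + sender.readings());
    }
}
```

Because nothing leaves the JVM, a unit test built on a mock like this runs quickly and never depends on the state of a broker.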
It’s hard to write reliable tests that use stateful services like databases or message queues. We can write a test that sends messages to our local Kafka broker, and then add a listener to check that they have been sent, but because Kafka topics are persistent it’s hard to know whether a test passed or failed because of events previously sent to the topic.
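The Spring for Apache Kafka test module (spring-kafka-test) provides an Embedded Kafka broker for testing that is clean every time the Spring context for a test class is created. Adding the @DirtiesContext annotation to a test class marks the context as dirty so that it is not reused by later test classes. With classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD, the context is recreated for every test in the class.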
The listing below shows the SmartMeterIntegrationTests class (omitting imports). The @EmbeddedKafka annotation starts up a clean Kafka instance as part of the Spring test context. The brokerProperties attribute sets a non-standard port for running the Embedded Kafka. In the application-test.properties file, the property spring.kafka.bootstrap-servers=localhost:19092 configures the Spring context to use this address for Kafka producers and consumers.
The test autowires a TestListener. The test starts the meter running and uses a CountDownLatch to wait for three messages to be received by the TestListener, which is listed below.
This class has a single method which is annotated as a @KafkaListener. Spring Boot wires this up to consume messages from the meter.reading topic. It asserts that the received messages are strings in the expected format and decrements a CountDownLatch. The test is reliable because using Embedded Kafka ensures that the Kafka topics always start empty.
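The latch pattern itself does not depend on Kafka, so it can be sketched with the standard library alone. In the sketch below a plain thread stands in for the Kafka listener container, and the names LatchWaitSketch, Listener, and deliverAndWait are assumptions for illustration, not the repo's TestListener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of waiting for N messages with a CountDownLatch.
class LatchWaitSketch {

    // Stands in for the test listener: counts down once per valid message.
    static class Listener {
        private final CountDownLatch latch;

        Listener(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages);
        }

        void onMessage(String message) {
            // An invalid message is not counted, so the waiting test would time out.
            if (message == null || message.isEmpty()) {
                throw new AssertionError("empty message");
            }
            latch.countDown();
        }

        // Returns true only if all expected messages arrived before the timeout.
        boolean await(long timeoutMillis) {
            try {
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Delivers 'sent' messages from another thread and waits for 'expected' of them.
    static boolean deliverAndWait(int sent, int expected, long timeoutMillis) {
        Listener listener = new Listener(expected);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < sent; i++) {
                listener.onMessage("reading-" + i);
            }
        });
        producer.start();
        return listener.await(timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("all three received: " + deliverAndWait(3, 3, 1000));
    }
}
```

Waiting with a timeout matters here: if fewer messages arrive than expected, the test fails after a bounded delay instead of hanging.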
In this article we used Spring Boot’s Kafka support to avoid all the boilerplate configuration that is otherwise needed to configure producers and consumers. And we used Embedded Kafka to write a reliable integration test.
The next article covers consumers, topics, and partitions.