Using Kafka
Features
Kafka is a popular publish-subscribe messaging system. JHipster has optional support for Kafka, which will:
- Configure Spring Cloud Stream with JHipster.
- Add the necessary configuration to the application-*.yml files to have a sample topic-jhipster topic, and to have a health check monitor for Kafka (which will be available in the health administration screen).
- Generate a Docker Compose configuration file, with the sample topic-jhipster topic, so Kafka is usable by simply typing docker-compose -f src/main/docker/kafka.yml up -d.
- Provide support for Kafka in a microservice environment, when using Docker. The Docker Compose sub-generator will generate a specific Kafka configuration if one microservice or one gateway uses Kafka. All microservices and gateways will then use that Kafka broker for all their messages. The broker is shared by all applications, as it is typically used as a message broker between applications.
Tutorial on using Kafka with Spring Cloud Stream in a JHipster application
Prerequisite
Generate a new application and make sure to select Asynchronous messages using Apache Kafka when prompted for the technologies you would like to use. A Docker Compose configuration file is generated, and you can start Kafka with the command:
docker-compose -f src/main/docker/kafka.yml up -d
Model
Create a simple model to represent the messages we will send through a Kafka topic.
public class Greeting {

    private String message;

    public Greeting() {
    }

    public String getMessage() {
        return message;
    }

    // Fluent setter: returns this so that calls can be chained.
    public Greeting setMessage(String message) {
        this.message = message;
        return this;
    }
}
Message Channels
Spring Cloud Stream introduced an abstraction layer called message channels. Producers send messages to output channels, and consumers subscribe to input channels to receive them. This gives you the flexibility to work with different messaging systems (through pluggable binders) without writing a lot of platform-specific code.
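The idea can be illustrated with a minimal plain-Java sketch. This is not how Spring Cloud Stream is implemented: InMemoryChannel is a hypothetical, in-process stand-in for a channel, with no binder or broker involved. It only shows the shape of the contract, where producers call send and consumers register a handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class ChannelSketch {

    /** Hypothetical in-memory channel; a real binder would route through Kafka instead. */
    static final class InMemoryChannel<T> {

        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        /** Consumer side: register a handler for incoming messages. */
        void subscribe(Consumer<T> handler) {
            subscribers.add(handler);
        }

        /** Producer side: deliver the message to every registered handler. */
        boolean send(T message) {
            subscribers.forEach(handler -> handler.accept(message));
            return true;
        }
    }

    public static void main(String[] args) {
        InMemoryChannel<String> greetings = new InMemoryChannel<>();
        List<String> received = new ArrayList<>();

        greetings.subscribe(received::add);   // consumer subscribes first
        greetings.send("Hello world!: 1");    // producer sends afterwards

        System.out.println(received);         // prints [Hello world!: 1]
    }
}
```

Because the producer and the consumer only depend on the channel, swapping the transport behind it does not change their code, which is the point of the abstraction.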
Let’s create our output and input channels.
Output channel
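import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface ProducerChannel {

    String CHANNEL = "messageChannel";

    @Output(CHANNEL)
    MessageChannel messageChannel();
}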
Input channel
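import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ConsumerChannel {

    String CHANNEL = "subscribableChannel";

    @Input(CHANNEL)
    SubscribableChannel subscribableChannel();
}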
Configuration
We need to tell Spring Cloud Stream about our channels in the configuration class generated by JHipster.
@EnableBinding(value = {Source.class, ProducerChannel.class, ConsumerChannel.class})
public class MessagingConfiguration {
}
We also need to configure our application to talk to Kafka.
spring:
    cloud:
        stream:
            bindings:
                messageChannel:
                    destination: greetings
                    content-type: application/json
                subscribableChannel:
                    destination: greetings
Each binding follows the pattern:
spring.cloud.stream.bindings.<channelName>.destination=<topic>
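Spring Boot flattens nested YAML keys into dotted property names, so the bindings above can also be read as flat key/value pairs. The sketch below only builds and prints those keys to make the mapping explicit; BindingPropertiesSketch and its methods are hypothetical helpers for illustration, not part of JHipster or Spring Cloud Stream.

```java
import java.util.Map;
import java.util.TreeMap;

final class BindingPropertiesSketch {

    /** Builds the flat property key for one setting of the given channel binding. */
    static String bindingKey(String channelName, String property) {
        return "spring.cloud.stream.bindings." + channelName + "." + property;
    }

    /** Flat equivalents of the YAML bindings used in this tutorial. */
    static Map<String, String> greetingsBindings() {
        Map<String, String> props = new TreeMap<>();
        props.put(bindingKey("messageChannel", "destination"), "greetings");
        props.put(bindingKey("messageChannel", "content-type"), "application/json");
        props.put(bindingKey("subscribableChannel", "destination"), "greetings");
        return props;
    }

    public static void main(String[] args) {
        // One key=value line per binding setting, sorted by key.
        greetingsBindings().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Both channels point at the same destination, greetings, which is why messages sent on the output channel arrive on the input channel.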
Producer and Consumer
Producer Resource
Let’s create a simple REST endpoint that we can invoke to send messages to the Kafka topic, greetings.
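import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// @Timed: import it from the metrics library used by your generated application.

@RestController
@RequestMapping("/api")
public class ProducerResource {

    private final MessageChannel channel;

    public ProducerResource(ProducerChannel channel) {
        this.channel = channel.messageChannel();
    }

    // Sends `count` greetings to the output channel, counting down to 1.
    @GetMapping("/greetings/{count}")
    @Timed
    public void produce(@PathVariable int count) {
        while (count > 0) {
            channel.send(MessageBuilder.withPayload(new Greeting().setMessage("Hello world!: " + count)).build());
            count--;
        }
    }
}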
Consumer Service
We can consume the messages using StreamListener for message mapping and automatic type conversion.
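import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    private final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    // The JSON payload is converted to a Greeting before this method is called.
    @StreamListener(ConsumerChannel.CHANNEL)
    public void consume(Greeting greeting) {
        log.info("Received message: {}.", greeting.getMessage());
    }
}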
Running the app
Allow access to the endpoint in SecurityConfiguration.java:
.antMatchers("/api/greetings/**").permitAll()
If you invoke the endpoint http://localhost:8080/api/greetings/5, you should see the messages logged to the console.
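If you prefer to trigger the endpoint from code rather than a browser, the following sketch uses the JDK's built-in HTTP client (Java 11 or later). It assumes the application is running locally on port 8080 and that the endpoint has been opened up as described above; GreetingsClient and buildRequest are hypothetical names chosen for this example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class GreetingsClient {

    /** Builds a GET request for /api/greetings/{count} under the given base URL. */
    static HttpRequest buildRequest(String baseUrl, int count) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/greetings/" + count))
            .GET()
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Assumes the application is listening on localhost:8080.
        HttpResponse<Void> response = client.send(
            buildRequest("http://localhost:8080", 5),
            HttpResponse.BodyHandlers.discarding());

        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

The response body is discarded because the endpoint returns nothing; the greetings themselves appear in the application's console log, written by the consumer.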
You can find the full source code on eosimosu/jhipster-kafka.