For a project, I'm trying to log the user's basic transactions, such as the addition and removal of an item (for multiple item types), sending a message to Kafka for each transaction. The accuracy of the logging mechanism is not crucial, and I don't want it to block my business code if the Kafka server is down. In this case, an async approach for sending data to Kafka is a better way to go.
My Kafka producer code is in its own Boot project. To make it async, I just have to add two annotations: @EnableAsync and @Async.
@EnableAsync goes on your configuration class (remember that your class annotated with @SpringBootApplication is also a configuration class). Spring will then look for a TaskExecutor bean and, if it doesn't find one, will create a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is fine for toy projects, but for anything larger it's a bit risky since it neither limits concurrent threads nor reuses them. So, to be safe, we will also add a task executor bean.
So,
```java
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }
}
```
will become

```java
@EnableAsync
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }
}
```

As you can see, there's not much change here. The default values I set should be tweaked based on your app's needs.
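If you're wondering what those pool settings actually do: Spring's ThreadPoolTaskExecutor delegates to a plain java.util.concurrent ThreadPoolExecutor, where extra threads beyond the core size are only created once the queue is full, so with core == max the 500-slot queue is the only buffer. A self-contained sketch of the same shape (the class and method names here are mine, just for illustration):

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorSketch {

    // Same shape as the Spring bean above: 2 core threads, 2 max,
    // and a 500-slot queue. Threads beyond the core size would only
    // be created once the queue is full.
    static String runOnce() throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(500),
                r -> new Thread(r, "KafkaMsgExecutor-" + counter.incrementAndGet()));
        Future<String> result = executor.submit(() -> Thread.currentThread().getName());
        String name = result.get();
        executor.shutdown();
        return name;
    }

    public static void main(String[] args) throws Exception {
        // prints the worker thread's name, e.g. KafkaMsgExecutor-1
        System.out.println(runOnce());
    }
}
```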
The second thing we need is the addition of @Async.
My old code was:
```java
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;

    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
    }
}
```

As you can see, the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the "logs" topic.
My new code is a bit longer than that.

```java
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;

    @Async
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        ListenableFuture<SendResult<String, KafkaInfo>> future =
                kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(final SendResult<String, KafkaInfo> message) {
                // left empty intentionally
            }

            @Override
            public void onFailure(final Throwable throwable) {
                // log the exception so we know something is wrong with Kafka
            }
        });
    }
}
```

Here onSuccess() is not really meaningful for me, but in onFailure() I can log the exception so I'm informed if there's a problem with my Kafka server.
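A caveat if you're reading this on a newer stack: ListenableFuture has since been deprecated, and in Spring for Apache Kafka 3.x KafkaTemplate.send() returns a CompletableFuture instead, so the onSuccess()/onFailure() pair collapses into a single whenComplete(). The sketch below fakes the send with a plain CompletableFuture just to show that shape; sendToKafka is a hypothetical stand-in, not the real template API.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncCallbackSketch {

    // Hypothetical stand-in for kafkaTemplate.send(topic, value), which in
    // Spring for Apache Kafka 3.x returns CompletableFuture<SendResult<K, V>>.
    static CompletableFuture<String> sendToKafka(String topic, String payload) {
        return CompletableFuture.supplyAsync(() -> "acked: " + topic);
    }

    public static void main(String[] args) {
        sendToKafka("logs", "{\"id\":\"42\"}")
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        // same role as onFailure(): log it so we know Kafka is down
                    } else {
                        // same role as onSuccess(): not really meaningful here
                    }
                })
                .join();
    }
}
```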
There's another thing I have to share with you: to send an object through KafkaTemplate, I have to equip it with a serializer.
```java
public class KafkaInfoSerializer implements Serializer<KafkaInfo> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String arg0, KafkaInfo info) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(info).getBytes();
        } catch (Exception e) {
            // log the exception
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}
```
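KafkaInfo, KafkaType, and KafkaStatus are classes from my project that aren't shown here. For a self-contained picture, a minimal KafkaInfo might look like the sketch below (the field names are illustrative, and the enums are nested only to keep the sketch in one file; Jackson serializes it through the getters):

```java
public class KafkaInfo {

    // In the real project these would be top-level enums; nested here
    // only so the sketch stays self-contained.
    public enum KafkaType { ADD, REMOVE }
    public enum KafkaStatus { SUCCESS, FAILURE }

    private final String id;
    private final KafkaType kafkaType;
    private final KafkaStatus kafkaStatus;

    public KafkaInfo(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        this.id = id;
        this.kafkaType = kafkaType;
        this.kafkaStatus = kafkaStatus;
    }

    public String getId() { return id; }
    public KafkaType getKafkaType() { return kafkaType; }
    public KafkaStatus getKafkaStatus() { return kafkaStatus; }
}
```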
Also, don't forget to add the configuration for it. There are several ways of defining serializers for Kafka, and one of the easiest is adding it to application.properties:

```
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
```
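As an alternative to application.properties, the serializers can also be wired programmatically through a ProducerFactory bean. A sketch, assuming a broker on localhost:9092 and the classes above:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, KafkaInfo> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Broker address is an assumption; point it at your own cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaInfoSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, KafkaInfo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```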
Now you have a Boot project that can send objects asynchronously to the desired topic.