For a project, I'm trying to log the basic transactions of the user such as addition and removal of an item and for multiple types of items and sending a message to kafka for each transaction. The accuracy of the log mechanism is not crucial and I don't want it to block my business code in the case of kafka server downtime. In this case an async approach for sending data to kafka is a better way to go.
My kafka producer code is in its boot project. For making it async, I just have to add two annotations: @EnableAsync and @Async.
@EnableAsync will be used in your configuration class (also remember that your class with @SpringBootApplication is also a config class) and will try to find a TaskExecutor bean. If not, it creates a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it's a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean.
So,
The second thing we need is addition of @Async.
My old code was:
There's another thing I have to share with you. For sending an object through kafkatemplate, I have to equip it with the serializer file I have.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Now you have a boot project that can send async objects to the desired topic.
My kafka producer code is in its boot project. For making it async, I just have to add two annotations: @EnableAsync and @Async.
@EnableAsync will be used in your configuration class (also remember that your class with @SpringBootApplication is also a config class) and will try to find a TaskExecutor bean. If not, it creates a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it's a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean.
So,
@SpringBootApplication public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication.class, args); } }will become
@EnableAsync @SpringBootApplication public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication.class, args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(500); executor.setThreadNamePrefix("KafkaMsgExecutor-"); executor.initialize(); return executor; } }As you can see there's not much change here. The default values I set should be tweaked based on your app's needs.
The second thing we need is addition of @Async.
My old code was:
@Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs"; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); } }As you can see the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the "logs" topic. My new code is a bit longer than that.
@Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs"; @Autowired private KafkaTemplateHere onSuccess() is not really meaningful for me. But onFailure() I can log the exception so I'm informed if there's a problem with my kafka server.kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback(new ListenableFutureCallback<>() { @Override public void onSuccess(final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure(final Throwable throwable) { // left empty intentionally } }); } }
There's another thing I have to share with you. For sending an object through kafkatemplate, I have to equip it with the serializer file I have.
public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(MapAlso, don't forget to add the configuration for it. There are several ways of defining serializers for kafka. One of the easiest ways is adding it to application.properties.map, boolean b) { } @Override public byte[] serialize(String arg0, KafkaInfo info) { byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { } }
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Now you have a boot project that can send async objects to the desired topic.