For a project, I'm trying to log the basic transactions of the user such as addition and removal of an item and for multiple types of items and sending a message to kafka for each transaction. The accuracy of the log mechanism is not crucial and I don't want it to block my business code in the case of kafka server downtime. In this case an async approach for sending data to kafka is a better way to go.
My kafka producer code is in its boot project. For making it async, I just have to add two annotations: @EnableAsync and @Async.
@EnableAsync will be used in your configuration class (also remember that your class with @SpringBootApplication is also a config class) and will try to find a TaskExecutor bean. If not, it creates a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it's a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean.
So,
The second thing we need is addition of @Async.
My old code was:
There's another thing I have to share with you. For sending an object through kafkatemplate, I have to equip it with the serializer file I have.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Now you have a boot project that can send async objects to the desired topic.
My kafka producer code is in its boot project. For making it async, I just have to add two annotations: @EnableAsync and @Async.
@EnableAsync will be used in your configuration class (also remember that your class with @SpringBootApplication is also a config class) and will try to find a TaskExecutor bean. If not, it creates a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it's a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean.
So,
@SpringBootApplication
public class KafkaUtilsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaUtilsApplication.class, args);
}
}
will become
@EnableAsync
@SpringBootApplication
public class KafkaUtilsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaUtilsApplication.class, args);
}
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("KafkaMsgExecutor-");
executor.initialize();
return executor;
}
}
As you can see there's not much change here. The default values I set should be tweaked based on your app's needs.
The second thing we need is addition of @Async.
My old code was:
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {
private static final String TOPIC = "logs";
@Autowired
private KafkaTemplate<String, KafkaInfo> kafkaTemplate;
@Override
public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus);
}
}
As you can see the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the "logs" topic.
My new code is a bit longer than that.
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {
private static final String TOPIC = "logs";
@Autowired
private KafkaTemplate kafkaTemplate;
@Async
@Override
public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(final SendResult<String, KafkaInfo> message) {
// left empty intentionally
}
@Override
public void onFailure(final Throwable throwable) {
// left empty intentionally
}
});
}
}
Here onSuccess() is not really meaningful for me. But onFailure() I can log the exception so I'm informed if there's a problem with my kafka server.There's another thing I have to share with you. For sending an object through kafkatemplate, I have to equip it with the serializer file I have.
public class KafkaInfoSerializer implements Serializer<kafkainfo> {
@Override
public void configure(Map map, boolean b) {
}
@Override
public byte[] serialize(String arg0, KafkaInfo info) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(info).getBytes();
} catch (Exception e) {
// log the exception
}
return retVal;
}
@Override
public void close() {
}
}
Also, don't forget to add the configuration for it. There are several ways of defining serializers for kafka.
One of the easiest ways is adding it to application.properties.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Now you have a boot project that can send async objects to the desired topic.