Tuesday, January 7, 2020

send your data async on kafka

For a project, I needed to log basic user transactions, such as adding or removing an item, across multiple item types, and to send a message to Kafka for each transaction. The accuracy of the log is not crucial, and I don't want logging to block my business code if the Kafka server is down. In that case, sending the data to Kafka asynchronously is the better way to go.

My Kafka producer code lives in its own Spring Boot project. To make it async, I just have to add two annotations: @EnableAsync and @Async.

@EnableAsync goes on a configuration class (remember that the class annotated with @SpringBootApplication is also a configuration class). Spring then looks for a TaskExecutor bean; if it doesn't find one, it falls back to a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is fine for toy projects, but for anything larger it's a bit risky, since it neither limits the number of concurrent threads nor reuses threads. So to be safe, we will also add a task executor bean.
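To see what "does not reuse threads" means in practice, here is a small plain-Java sketch (no Spring involved). The thread-per-task executor is only my stand-in for how SimpleAsyncTaskExecutor behaves by default, and the class and method names are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseSketch {

    /** Runs `tasks` trivial jobs on the executor and returns how many distinct threads ran them. */
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            // Wait until every job has run (or give up after 5 seconds).
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Stand-in for a thread-per-task executor: every job gets a brand-new thread.
        Executor threadPerTask = task -> new Thread(task).start();
        System.out.println("thread-per-task: " + distinctThreads(threadPerTask, 20));

        // A fixed pool reuses its two threads for all jobs.
        ExecutorService pooled = Executors.newFixedThreadPool(2);
        try {
            System.out.println("fixed pool of 2: " + distinctThreads(pooled, 20));
        } finally {
            pooled.shutdown();
        }
    }
}
```

The first count grows with the number of jobs, while the pooled count never exceeds the pool size. With one Kafka message per user transaction, that difference adds up quickly.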

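So this application class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }
}
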
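will become

import java.util.concurrent.Executor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }

    // Bounded pool used for @Async methods: 2 threads, up to 500 queued tasks.
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }
}
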
As you can see, there's not much change here. The values I set are only a starting point and should be tuned to your app's needs.
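To get a feel for what the pool size and queue capacity do, here is a rough plain-JDK sketch. ThreadPoolTaskExecutor wraps a java.util.concurrent.ThreadPoolExecutor, so I use that directly; the class name, the helper method, and the small queue size (3 instead of 500) are assumptions chosen to keep the example short. Every task blocks, simulating sends that hang, so once both threads are busy and the queue is full, further tasks are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    /** Submits `tasks` blocking jobs to a 2-thread pool and returns how many were rejected. */
    static int countRejected(int queueCapacity, int tasks) {
        // Same shape as the bean above: core = max = 2 threads, bounded queue,
        // default rejection policy (AbortPolicy, which throws).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release)); // simulate a send that hangs
            } catch (RejectedExecutionException e) {
                rejected++; // both threads busy and the queue is full
            }
        }
        release.countDown(); // let the blocked jobs finish
        pool.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 10 jobs: 2 run, 3 wait in the queue, the remaining 5 are rejected.
        System.out.println("rejected with queue of 3: " + countRejected(3, 10));
        // With room for 500 queued jobs nothing is rejected.
        System.out.println("rejected with queue of 500: " + countRejected(500, 10));
    }
}
```

With the real ThreadPoolTaskExecutor, the rejection should, as far as I know, reach the caller as a TaskRejectedException. Since losing a log message is acceptable in my case, setting a more forgiving handler through setRejectedExecutionHandler (for example ThreadPoolExecutor.DiscardPolicy) is worth considering, so a long Kafka outage does not leak exceptions into the business code.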

The second thing we need is to add @Async to the method that sends the message.


My old code was:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;

    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
    }
}

As you can see the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the "logs" topic. My new code is a bit longer than that.
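
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;

    // Runs on the task executor, so the caller is not blocked by the send.
    @Async
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(final SendResult<String, KafkaInfo> message) {
                // left empty intentionally
            }

            @Override
            public void onFailure(final Throwable throwable) {
                // left empty intentionally
            }
        });
    }
}
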
Here onSuccess() is not really meaningful for me, but in onFailure() I can log the exception so I'm informed if there's a problem with my Kafka server.
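As a rough idea of what filling in the failure branch could look like, here is a plain-Java sketch that runs without Spring or a broker. It uses CompletableFuture with a whenComplete callback as a stand-in for the ListenableFuture callback above (as far as I know, newer Spring for Apache Kafka versions return a CompletableFuture from send anyway). The fakeSend method, the brokerUp flag, and the in-memory list that replaces a real logger are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SendFailureLoggingSketch {

    // Collected in a list only so the sketch is easy to inspect; real code would use a logger.
    static final List<String> loggedErrors = new ArrayList<>();

    /** Hypothetical stand-in for kafkaTemplate.send(...): fails when the broker is "down". */
    static CompletableFuture<String> fakeSend(String payload, boolean brokerUp) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (brokerUp) {
            future.complete("sent:" + payload);
        } else {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        }
        return future;
    }

    /** Sends the payload and records a message only when the send fails. */
    static void sendMessage(String payload, boolean brokerUp) {
        fakeSend(payload, brokerUp).whenComplete((result, throwable) -> {
            if (throwable != null) {
                // Failure branch: the counterpart of onFailure() above.
                loggedErrors.add("Could not send " + payload + ": " + throwable.getMessage());
            }
            // Success branch left empty intentionally, like onSuccess() above.
        });
    }

    public static void main(String[] args) {
        sendMessage("item-1", true);
        sendMessage("item-2", false);
        loggedErrors.forEach(System.out::println);
    }
}
```

Only the failed send leaves a trace, which matches what I want from this logging path: silence when things work, a log line when Kafka is unreachable.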

There's one more thing I have to share with you. To send an object through KafkaTemplate, I have to provide a serializer for it. Here is the one I use:


import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaInfoSerializer implements Serializer<KafkaInfo> {

    // ObjectMapper is thread-safe once configured, so one instance can be reused.
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String arg0, KafkaInfo info) {
        byte[] retVal = null;
        try {
            // Writes the object as UTF-8 encoded JSON.
            retVal = objectMapper.writeValueAsBytes(info);
        } catch (Exception e) {
            // log the exception
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}

Also, don't forget to add the configuration for it. There are several ways of defining serializers for Kafka. One of the easiest is adding them to application.properties:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

Now you have a Spring Boot project that can send objects asynchronously to the desired topic.
