Tuesday, January 7, 2020

send your data async on kafka

For a project, I'm trying to log the basic transactions of the user such as addition and removal of an item and for multiple types of items and sending a message to kafka for each transaction. The accuracy of the log mechanism is not crucial and I don't want it to block my business code in the case of kafka server downtime. In this case an async approach for sending data to kafka is a better way to go.

My kafka producer code is in its boot project. For making it async, I just have to add two annotations: @EnableAsync and @Async.

@EnableAsync will be used in your configuration class (also remember that your class with @SpringBootApplication is also a config class) and will try to find a TaskExecutor bean. If not, it creates a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it's a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean.

So,
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }
}
will become
@EnableAsync
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }
}
As you can see there's not much change here. The default values I set should be tweaked based on your app's needs.

The second thing we need is addition of @Async.


My old code was:
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;

    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus);
    }
}
As you can see the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the "logs" topic. My new code is a bit longer than that.
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Async
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(final SendResult<String, KafkaInfo> message) {
                // left empty intentionally
            }

            @Override
            public void onFailure(final Throwable throwable) {
                // left empty intentionally

            }
        });
    }
}
Here onSuccess() is not really meaningful for me. But onFailure() I can log the exception so I'm informed if there's a problem with my kafka server.

There's another thing I have to share with you. For sending an object through kafkatemplate, I have to equip it with the serializer file I have.


public class KafkaInfoSerializer implements Serializer<kafkainfo> {

    @Override
    public void configure(Map map, boolean b) {
    }

    @Override
    public byte[] serialize(String arg0, KafkaInfo info) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(info).getBytes();
        } catch (Exception e) {
            // log the exception
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}
Also, don't forget to add the configuration for it. There are several ways of defining serializers for kafka. One of the easiest ways is adding it to application.properties. 

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

Now you have a boot project that can send async objects to the desired topic.