21 June 2022

JSON is a popular, well-supported format for data transmission. There are plenty of tools and frameworks in virtually every language to serialize objects to a JSON text representation and deserialize them back. JSON is also popular in the context of event-driven architecture, since producers and consumers must be kept loosely coupled. In this article, I'm going to explain how to configure a publisher/subscriber pair to encode and decode event values automatically to and from their JSON representation.

Serialization/De-serialization in Kafka

There are two ways to configure serializers in the kafka-clients API:

- declare the serializer classes in the producer's configuration properties (the key.serializer and value.serializer entries), or
- pass serializer instances directly to the KafkaProducer constructor.
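Both ways are shown in the minimal sketch below, assuming a broker on localhost:9092 and using the built-in StringSerializer for brevity:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigExamples {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Way 1: name the serializer classes in the configuration properties.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> byConfig = new KafkaProducer<>(props);

        // Way 2: pass serializer instances to the constructor; these take
        // precedence, so the two property entries above are not needed here.
        KafkaProducer<String, String> byInstance =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        byConfig.close();
        byInstance.close();
    }
}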

There are lots of built-in serializers, such as ByteArraySerializer, ByteBufferSerializer, IntegerSerializer, and UUIDSerializer, in the kafka-clients library. These serializers are enough for event keys, which are mostly simple data types. For values, however, we are looking for richer formats such as JSON, XML, Avro, Thrift, and Protocol Buffers, because values, in comparison to keys, have a complex structure and more details. In cases where the built-in serializers are not a good fit, you first need to choose a schema for data transmission, JSON for example, and then choose one of these two options:

- implement Kafka's Serializer/Deserializer interfaces for your payload type, or
- convert the payload to a JSON string in application code and send it with the built-in StringSerializer.

The first option seems more natural to the Kafka design. Besides, you can put the serializer class in a common library and re-use it in many microservices.
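For contrast, here is a minimal sketch of what the second option would look like, with serialization handled manually in application code (the "orders" topic name and the wrapper class are illustrative assumptions; OrderPayload is the event value defined later in this article):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ManualJsonPublisher {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final KafkaProducer<String, String> producer;

    public ManualJsonPublisher(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    // The payload is converted to a JSON string by hand and sent through
    // the built-in StringSerializer.
    public void publish(OrderPayload payload) throws JsonProcessingException {
        String json = objectMapper.writeValueAsString(payload);
        producer.send(new ProducerRecord<>("orders", json));
    }
}

Every producing service would have to repeat this boilerplate, which is exactly the scattering that the first option avoids.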

Configuration

To implement a custom JSON serializer and deserializer, we are going to use Jackson (the com.fasterxml.jackson artifacts), one of the oldest and most widely used JSON libraries in Java.

<dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson.version}</version>
</dependency>
<dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.version}</version>
</dependency>

Now I just implement Kafka's Serializer interface, which has only one method that must be implemented, serialize (the configure and close methods come with default implementations).

public interface Serializer<T> extends Closeable {
    byte[] serialize(String topic, T payload);
}

In my application, the implementation looks like this:

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderPayloadSerializer implements Serializer<OrderPayload> {
    // ObjectMapper is thread-safe once configured, so a single shared
    // instance can serve all serialize calls.
    private final ObjectMapper objectMapper = new ObjectMapper();

    // serialize may only throw unchecked exceptions, so the checked
    // JsonProcessingException is wrapped in a RuntimeException subclass.
    private static final class MarshallingException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private MarshallingException(Throwable cause) { super(cause); }
    }

    @Override
    public byte[] serialize(String topic, OrderPayload data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new MarshallingException(e);
        }
    }
}
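The consumer side mirrors this with Kafka's Deserializer interface. Here is a minimal sketch of the matching deserializer, assuming a class named OrderPayloadDeserializer (the class in the repository may differ in details):

import java.io.IOException;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderPayloadDeserializer implements Deserializer<OrderPayload> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public OrderPayload deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // tombstones and null values pass through untouched
        }
        try {
            return objectMapper.readValue(data, OrderPayload.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

The two classes are then wired in through the value.serializer entry on the producer side and the value.deserializer entry on the consumer side.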

OrderPayload is my event's value, which is going to be transmitted in JSON format. Please note that this class also needs some Jackson annotations to be properly converted to and from JSON text.

import java.util.Date;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public final class OrderPayload extends EventPayload {
    private final Date orderDate;
    private final String merchandise;
    private final String user;

    // @JsonCreator tells Jackson to build instances through this constructor,
    // with @JsonProperty mapping each JSON field to its parameter.
    @JsonCreator
    public OrderPayload(@JsonProperty("id") UUID id, @JsonProperty("order_date") Date orderDate,
                        @JsonProperty("merchandise") String merchandise, @JsonProperty("user") String user) {
        super(id);
        this.orderDate = orderDate;
        this.merchandise = merchandise;
        this.user = user;
    }

    // The getter is annotated too, so serialization writes "order_date"
    // instead of the getter-derived name "orderDate".
    @JsonProperty("order_date")
    public Date getOrderDate() {
        return orderDate;
    }

    public String getMerchandise() {
        return merchandise;
    }

    public String getUser() {
        return user;
    }

    @Override
    public String toString() {
        return "OrderPayload{" +
                "orderDate=" + orderDate +
                ", merchandise='" + merchandise + '\'' +
                ", user='" + user + '\'' +
                '}';
    }
}
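With these annotations in place, a serialized event value would look something like this (illustrative values; by default Jackson writes java.util.Date as epoch milliseconds, and the exact JSON name of the id field depends on how EventPayload exposes it):

{
  "id": "4f6c2d8e-0b1a-4c3f-9d2e-7a5b6c8d9e0f",
  "order_date": 1655800000000,
  "merchandise": "laptop",
  "user": "john.doe"
}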

Note that OrderPayload is immutable, which is completely in line with the nature of Kafka, where published events are immutable records.

Dockerization

This is a Maven multi-module project with two child modules, json-producer and json-consumer. Both modules are dockerized with their own Dockerfile. The parent project is equipped with a docker-compose file, which runs Kafka and Zookeeper in addition to the producer/consumer projects as Docker containers.
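A minimal sketch of what such a compose file can look like, assuming the Confluent images and single-broker defaults (the actual file in the repository may differ in image choice, service names, and configuration):

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  producer-service:
    build: ./json-producer
    depends_on:
      - kafka
  consumer-service:
    build: ./json-consumer
    depends_on:
      - kafka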

Run Application

Run the application by executing the following commands in the project's root directory:

    mvn clean package
    docker-compose up -d

You should see logs indicating that the containers started successfully. You can then check each individual container's logs to see what's happening inside:

    docker-compose logs producer-service

The full source code of this article can be found in my GitHub repository.