21 June 2022
JSON is a well-known format for data transmission, and every language has plenty of tools and frameworks for serializing objects to JSON text and deserializing them back. JSON is also popular in event-driven architectures, where producers and receivers must be kept loosely coupled. In this article, I explain how to configure a publisher and a subscriber to convert event values to and from JSON automatically.
There are two ways to configure serializers: by setting the key.serializer and value.serializer properties, or by passing serializer instances to the KafkaProducer constructor. The kafka-clients library ships with built-in serializers such as ByteArraySerializer, ByteBufferSerializer, IntegerSerializer, and UUIDSerializer. These are enough for event keys, which are mostly simple data types. Values, however, usually have a much more complex structure than keys, so they call for a richer format such as JSON, XML, Avro, Thrift, or Protocol Buffers.
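Both configuration routes can be sketched with a plain java.util.Properties object. This is a minimal sketch, not the article's own code: the UUID key type, the bootstrap address, and the fully qualified name com.example.kafka.OrderPayloadSerializer are assumptions made for illustration.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Option 1: name the serializer classes in the producer properties.
    // Kafka instantiates them by reflection, so each class needs a public no-arg constructor.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Built-in serializer from kafka-clients; a UUID key is an assumption of this sketch.
        props.put("key.serializer", "org.apache.kafka.common.serialization.UUIDSerializer");
        // Hypothetical fully qualified name of the custom serializer described in this article.
        props.put("value.serializer", "com.example.kafka.OrderPayloadSerializer");
        return props;
    }

    // Option 2 (needs kafka-clients on the classpath): pass instances to the constructor, e.g.
    //   new KafkaProducer<>(props, new UUIDSerializer(), new OrderPayloadSerializer());
    // in which case the key.serializer/value.serializer entries can be left out.

    public static void main(String[] args) {
        producerProperties("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

kafka-clients also exposes these keys as constants such as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, which protects against typos in the property names.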
If the built-in serializers are not a good fit, you first need to choose a schema for data transmission, JSON for example, and then choose one of two options. This article covers implementing a custom serializer and deserializer.
To implement a custom JSON serializer and deserializer, we use FasterXML's Jackson library, one of the oldest and best-known JSON libraries for Java. Add the following Maven dependencies:
```xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>${jackson.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
```
Next, I implement Kafka's Serializer interface. Its only abstract method is serialize; the remaining methods have default implementations.
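```java
import java.io.Closeable;

// Simplified view of org.apache.kafka.common.serialization.Serializer: the default methods
// configure(...), serialize(topic, headers, data), and close() are omitted here.
public interface Serializer<T> extends Closeable {
    byte[] serialize(String topic, T data);
}
```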
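Because serialize is the only abstract method (in kafka-clients 2.1 and later, where configure and close became default methods), a Serializer can even be written as a lambda. This sketch is illustrative only: kafka-clients already provides StringSerializer for this exact job, and the constant name UTF8_STRING is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class LambdaSerializerSketch {

    // The lambda implements serialize(String topic, String data); the topic is ignored here.
    // A null value is passed through as null, as Kafka's built-in serializers do.
    static final Serializer<String> UTF8_STRING =
            (topic, data) -> data == null ? null : data.getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] bytes = UTF8_STRING.serialize("orders", "hello");
        System.out.println(bytes.length); // 5
    }
}
```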
In my application, the implementation looks like this:
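```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

public class OrderPayloadSerializer implements Serializer<OrderPayload> {

    // ObjectMapper is thread-safe once configured, so one instance is shared across calls.
    private final ObjectMapper objectMapper = new ObjectMapper();

    // Unchecked wrapper, because serialize() cannot declare checked exceptions.
    private static final class MarshallingException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private MarshallingException(Throwable cause) {
            super(cause);
        }
    }

    @Override
    public byte[] serialize(String topic, OrderPayload data) {
        // Like Kafka's built-in serializers, map a null value to null rather than the JSON text "null".
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new MarshallingException(e);
        }
    }
}
```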
OrderPayload is my event's value, which is transmitted in JSON format. Note that this class also needs some Jackson configuration, here @JsonProperty annotations, to be converted to and from JSON text properly.
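```java
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Date;
import java.util.UUID;

// EventPayload (not shown) is the base class that receives the event id.
public final class OrderPayload extends EventPayload {

    private final Date orderDate;
    private final String merchandise;
    private final String user;

    // @JsonProperty names each constructor argument, so Jackson can build this immutable
    // object directly from JSON without setters or a no-arg constructor.
    public OrderPayload(@JsonProperty("id") UUID id,
                        @JsonProperty("order_date") Date orderDate,
                        @JsonProperty("merchandise") String merchandise,
                        @JsonProperty("user") String user) {
        super(id);
        this.orderDate = orderDate;
        this.merchandise = merchandise;
        this.user = user;
    }

    // Without this annotation Jackson would write the property as "orderDate",
    // which does not match the "order_date" name the constructor expects.
    @JsonProperty("order_date")
    public Date getOrderDate() {
        return orderDate;
    }

    public String getMerchandise() {
        return merchandise;
    }

    public String getUser() {
        return user;
    }

    @Override
    public String toString() {
        return "OrderPayload{" +
                "orderDate=" + orderDate +
                ", merchandise='" + merchandise + '\'' +
                ", user='" + user + '\'' +
                '}';
    }
}
```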
Note that OrderPayload is immutable, which suits Kafka well: a record written to a topic is never modified afterwards.
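The subscriber needs the reverse operation. Below is a minimal sketch of a matching deserializer, assuming the same Jackson dependencies plus kafka-clients. The class name JsonPayloadDeserializer is hypothetical, and it is written generically so it can read OrderPayload or any other Jackson-compatible class. Like Serializer, Deserializer has a single abstract method, deserialize(String topic, byte[] data).

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonPayloadDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonPayloadDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Mirror Kafka's built-in deserializers: a null value (e.g. a tombstone) stays null.
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, targetType);
        } catch (IOException e) {
            // SerializationException is Kafka's own unchecked exception for (de)serialization errors.
            throw new SerializationException("Could not read JSON from topic " + topic, e);
        }
    }
}
```

Because the constructor takes the target class, Kafka cannot create this deserializer from the value.deserializer property (that path requires a public no-arg constructor). Instead, pass an instance, for example new KafkaConsumer<>(props, new UUIDDeserializer(), new JsonPayloadDeserializer<>(OrderPayload.class)). Reading OrderPayload this way relies on its @JsonProperty-annotated constructor.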
This is a Maven multi-module project with two child modules, json-producer and json-consumer. Both modules are dockerized with their own Dockerfile. The parent project includes a docker-compose file that runs Kafka and ZooKeeper, as well as the producer and consumer, as Docker containers.
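The json-consumer module needs the mirror image of the producer settings. A minimal sketch with plain java.util.Properties, assuming a UUID key, a hypothetical com.example.kafka.OrderPayloadDeserializer with a public no-arg constructor, a hypothetical group id, and a broker reachable as kafka:9092. Inside a docker-compose network, containers reach each other by service name rather than localhost, so the real host depends on how the broker service is named and advertised in the compose file.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Consumers sharing a group id split the topic's partitions between them.
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer");
        // Hypothetical custom deserializer; alternatively pass instances to the KafkaConsumer constructor.
        props.put("value.deserializer", "com.example.kafka.OrderPayloadDeserializer");
        // When the group has no committed offset yet, start from the beginning of the topic.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // "kafka:9092" assumes the broker's docker-compose service is named "kafka".
        consumerProperties("kafka:9092", "json-consumer")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```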
Run the application by executing the following commands in the project's root directory:

```shell
mvn clean package
docker-compose up -d
```
docker-compose should report that the containers started successfully. You can then log in to each container, or simply check its logs, to see what is happening inside:

```shell
docker-compose logs producer-service
```
The full source code for this article is available in my GitHub repository.