A simple producer/consumer application
You've seen how Kafka works out of the box. Next, let's develop a custom producer/consumer application. The producer will retrieve user input from the console and send each new line as a message to a Kafka server. The consumer will retrieve messages for a given topic and print them to the console. The producer and consumer components in this case are your own implementations of the kafka-console-producer.sh and kafka-console-consumer.sh scripts.
Let's start by creating a Producer.java class. This client class contains logic to read user input from the console and send that input as a message to the Kafka server.
We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the different properties available, but Kafka's default values are sufficient for most uses. For the default config we only need to set three mandatory properties:
BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kafka cluster, in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the first broker's host:port. The Kafka client uses this value to make a discovery call on the broker, which returns a list of all the brokers in the cluster. It's a good idea to specify more than one broker in BOOTSTRAP_SERVERS_CONFIG, so that if the first broker is down the client can try the others.
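A minimal configuration sketch under these assumptions might look like the following. The broker addresses localhost:9092 and localhost:9093 are placeholders; the string keys shown are the values behind the ProducerConfig constants BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, and VALUE_SERIALIZER_CLASS_CONFIG:

```java
import java.util.Properties;

public class ProducerConfigExample {

    // Build the three mandatory producer properties. In real code you would
    // normally reference the ProducerConfig constants instead of the raw
    // string keys used here.
    static Properties buildProducerConfig() {
        Properties props = new Properties();
        // Two brokers listed so the client can fall back if the first is down.
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        // null keys need no conversion, so a byte-array serializer suffices.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Console input is a String, so use the String serializer for values.
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```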
The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value by hand, Kafka's client-side library lets us use friendlier types like String and int for sending messages, and converts them to the appropriate type for us. The sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.
To configure the message key, we set KEY_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].
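Putting the pieces together, a sketch of Producer.java's read-and-send loop might look like this. The topic name "test", the single-broker address, and the "exit" sentinel are assumptions for illustration, not part of the original:

```java
import java.util.Properties;
import java.util.Scanner;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {

    // Hypothetical sentinel: typing "exit" ends the session.
    static boolean isExit(String line) {
        return "exit".equalsIgnoreCase(line.trim());
    }

    public static void main(String[] args) {
        String topic = "test"; // assumed topic name
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer (flushing buffered messages)
        // and the scanner when the loop ends.
        try (Producer<byte[], String> producer = new KafkaProducer<>(props);
             Scanner in = new Scanner(System.in)) {
            System.out.println("Enter messages; type 'exit' to quit.");
            while (in.hasNextLine()) {
                String line = in.nextLine();
                if (isExit(line)) {
                    break;
                }
                // null key: the broker spreads messages across partitions.
                producer.send(new ProducerRecord<byte[], String>(topic, null, line));
            }
        }
    }
}
```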
Custom key/value objects
In addition to the StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize that class into byte[]. We would also have to use a corresponding deserializer in our consumer code.
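As a sketch of that approach, here's what a custom serializer might look like. The ChatMessage class and its pipe-delimited encoding are hypothetical illustrations; a real application might use JSON or Avro instead:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical domain object used as the message value.
class ChatMessage {
    final String user;
    final String text;

    ChatMessage(String user, String text) {
        this.user = user;
        this.text = text;
    }
}

public class ChatMessageSerializer implements Serializer<ChatMessage> {

    // Encode the object as "user|text" in UTF-8 bytes. The matching
    // deserializer on the consumer side would split on the same delimiter.
    @Override
    public byte[] serialize(String topic, ChatMessage data) {
        if (data == null) {
            return null;
        }
        return (data.user + "|" + data.text).getBytes(StandardCharsets.UTF_8);
    }
}
```

To use it, we would set VALUE_SERIALIZER_CLASS_CONFIG to ChatMessageSerializer.class.getName() instead of the StringSerializer.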