Big data messaging with Kafka, Part 1

Sunil Patil | April 26, 2016
Build a continuous big data messaging system with Kafka.

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, the consumer reads that message and prints it to the console. The consumer code is quite similar to the producer code: we start by creating a java.util.Properties object, set its consumer-specific properties, and then use it to create a new KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (group.id)
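
The ConsumerConfig constants are simply the string names shown in parentheses, so the four mandatory settings can be sketched with nothing but java.util.Properties. This is a minimal illustration, not the article's code: the group name "demo-group" is a hypothetical placeholder, and the deserializer class names assume the producer setup described below. The resulting Properties object is what would be handed to the KafkaConsumer constructor.

```java
import java.util.Properties;

public class MandatoryConsumerProps {
    // Builds the four mandatory consumer settings using their literal string keys
    // (the same strings that ConsumerConfig's *_CONFIG constants hold).
    static Properties mandatoryProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // "demo-group" is a hypothetical group name used only for illustration.
        Properties props = mandatoryProps("localhost:9092", "demo-group");
        // Print the keys in a stable (sorted) order.
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```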

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kafka cluster; its value is a comma-separated list in the host1:port1,host2:port2,... format.
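
To make the host1:port1,host2:port2,... format concrete, here is a small, hypothetical parser that splits such a string into host/port pairs. It is only an illustration of the format, not how the Kafka client parses the setting internally, and it deliberately ignores IPv6 literals. Because the list is used only for the initial connection, from which the client discovers the rest of the cluster, it does not need to name every broker; broker1 and broker2 are placeholder host names.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServers {
    // Minimal host/port pair used by this sketch.
    static final class HostPort {
        final String host;
        final int port;
        HostPort(String host, int port) { this.host = host; this.port = port; }
        @Override public String toString() { return host + ":" + port; }
    }

    // Splits "host1:port1,host2:port2,..." into pairs.
    // Illustration only: IPv6 literals such as [::1]:9092 are not handled.
    static List<HostPort> parse(String bootstrapServers) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) continue;          // tolerate stray commas
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // broker1 and broker2 are hypothetical host names.
        System.out.println(parse("broker1:9092, broker2:9093"));
    }
}
```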

As I previously noted, Kafka stores and transmits message keys and values as byte[], and the Kafka client library ships its own implementations for serializing different types into byte[]. Just as the producer needed a serializer, the consumer needs a deserializer to convert byte[] back into the appropriate type.
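
The round trip between typed values and byte[] can be sketched with the JDK alone. The helper names below are hypothetical and this is not Kafka's code; it assumes the encodings Kafka's built-in StringSerializer/StringDeserializer (UTF-8 by default) and IntegerSerializer/IntegerDeserializer (four big-endian bytes) use, and returns null for null input as Kafka's deserializers do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteRoundTrip {
    // String <-> byte[] using UTF-8, the default encoding of Kafka's String serializers.
    static byte[] serializeString(String s) {
        return s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    }
    static String deserializeString(byte[] b) {
        return b == null ? null : new String(b, StandardCharsets.UTF_8);
    }

    // int <-> byte[] as 4 big-endian bytes (ByteBuffer's default byte order).
    static byte[] serializeInt(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }
    static int deserializeInt(byte[] b) {
        if (b == null || b.length != 4) {
            throw new IllegalArgumentException("Expected exactly 4 bytes");
        }
        return ByteBuffer.wrap(b).getInt();
    }

    public static void main(String[] args) {
        byte[] valueBytes = serializeString("hello kafka");
        byte[] intBytes = serializeInt(42);
        System.out.println(valueBytes.length + " bytes -> " + deserializeString(valueBytes));
        System.out.println(intBytes.length + " bytes -> " + deserializeInt(intBytes));
    }
}
```

The key point is that the consumer must decode with the same scheme the producer encoded with; the bytes themselves carry no type information.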

In the example application, we know the producer uses ByteArraySerializer for the key and StringSerializer for the value. On the consumer side we therefore need to use org.apache.kafka.common.serialization.ByteArrayDeserializer for the key and org.apache.kafka.common.serialization.StringDeserializer for the value. Setting those classes as the values of KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG enables the consumer to deserialize the byte[]-encoded keys and values sent by the producer.
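
The rule in the paragraph above, that each producer serializer must be matched by its counterpart deserializer, can be captured as a simple lookup table. This is a hypothetical helper for illustration only; it lists a few of the built-in classes in the org.apache.kafka.common.serialization package by name and does not load them, so it runs without Kafka on the classpath.

```java
import java.util.Map;

public class SerdePairs {
    private static final String PKG = "org.apache.kafka.common.serialization.";

    // A few built-in serializers and the deserializer that reverses each one.
    private static final Map<String, String> MATCHING = Map.of(
            PKG + "ByteArraySerializer", PKG + "ByteArrayDeserializer",
            PKG + "StringSerializer", PKG + "StringDeserializer",
            PKG + "IntegerSerializer", PKG + "IntegerDeserializer",
            PKG + "LongSerializer", PKG + "LongDeserializer");

    // Returns the deserializer class name to configure on the consumer side.
    static String deserializerFor(String serializerClass) {
        String match = MATCHING.get(serializerClass);
        if (match == null) {
            throw new IllegalArgumentException("No known deserializer for " + serializerClass);
        }
        return match;
    }

    public static void main(String[] args) {
        // The producer in this example: ByteArraySerializer for keys, StringSerializer for values.
        System.out.println("key.deserializer=" + deserializerFor(PKG + "ByteArraySerializer"));
        System.out.println("value.deserializer=" + deserializerFor(PKG + "StringSerializer"));
    }
}
```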

Finally, we need to set the value of GROUP_ID_CONFIG, which should be a group name in string format. I'll explain more about this config in a minute. For now, take a look at the Kafka consumer with the four mandatory properties set (it also sets an optional client ID):

Listing 2. KafkaConsumer


  import java.time.Duration;
  import java.util.Arrays;
  import java.util.Properties;
  import java.util.Scanner;

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.errors.WakeupException;

  public class Consumer {
      private static Scanner in;

      public static void main(String[] argv) throws Exception {
          if (argv.length != 2) {
              System.err.printf("Usage: %s <topicName> <groupId>%n",
                      Consumer.class.getSimpleName());
              System.exit(-1);
          }
          in = new Scanner(System.in);
          String topicName = argv[0];
          String groupId = argv[1];

          ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
          consumerRunnable.start();
          // Keep reading console input until the user types "exit"
          String line = "";
          while (!line.equals("exit")) {
              line = in.next();
          }
          // wakeup() is the one KafkaConsumer method that is safe to call from another thread;
          // it makes the blocked poll() in ConsumerThread throw a WakeupException
          consumerRunnable.getKafkaConsumer().wakeup();
          System.out.println("Stopping consumer .....");
          consumerRunnable.join();
      }

      private static class ConsumerThread extends Thread {
          private final String topicName;
          private final KafkaConsumer<byte[], String> kafkaConsumer;

          public ConsumerThread(String topicName, String groupId) {
              this.topicName = topicName;
              Properties configProperties = new Properties();
              configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              // Match the producer: ByteArraySerializer for keys, StringSerializer for values
              configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
              configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
              // Create the consumer before the thread starts, so getKafkaConsumer()
              // never returns null even if the user types "exit" right away
              this.kafkaConsumer = new KafkaConsumer<>(configProperties);
          }

          public void run() {
              kafkaConsumer.subscribe(Arrays.asList(topicName));
              // Poll in a loop and print each message value until wakeup() is called
              try {
                  while (true) {
                      // poll(Duration) requires kafka-clients 2.0 or later; older clients use poll(long)
                      ConsumerRecords<byte[], String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                      for (ConsumerRecord<byte[], String> record : records)
                          System.out.println(record.value());
                  }
              } catch (WakeupException ex) {
                  System.out.println("WakeupException caught, leaving the poll loop");
              } finally {
                  // Always close on the polling thread to release network resources
                  kafkaConsumer.close();
                  System.out.println("After closing KafkaConsumer");
              }
          }

          public KafkaConsumer<byte[], String> getKafkaConsumer() {
              return this.kafkaConsumer;
          }
      }
  }
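
Listing 2 stops the consumer with a two-thread handshake: the main thread only calls wakeup(), and the polling thread catches the resulting exception and closes the consumer in its finally block. The sketch below reproduces that control flow with the JDK alone so it can run without a broker. FakeConsumer and WakeupSignal are hypothetical stand-ins, not Kafka classes, and the fake only checks for a wakeup at the start of each poll, whereas the real wakeup() also aborts a poll that is already blocked.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class WakeupShutdownSketch {
    // Hypothetical stand-in for Kafka's WakeupException.
    static final class WakeupSignal extends RuntimeException {}

    // Hypothetical stand-in for KafkaConsumer: poll() throws WakeupSignal
    // once wakeup() has been requested from another thread.
    static final class FakeConsumer {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        final AtomicInteger polls = new AtomicInteger();
        volatile boolean closed = false;

        void poll(long timeoutMs) throws InterruptedException {
            if (wakeupRequested.getAndSet(false)) throw new WakeupSignal();
            polls.incrementAndGet();
            Thread.sleep(timeoutMs);                      // simulate waiting for records
        }
        void wakeup() { wakeupRequested.set(true); }      // safe to call from any thread
        void close() { closed = true; }
    }

    // Runs a polling thread for roughly runForMs, then shuts it down via wakeup().
    static FakeConsumer runAndStop(long runForMs) throws InterruptedException {
        FakeConsumer consumer = new FakeConsumer();
        Thread worker = new Thread(() -> {
            try {
                while (true) consumer.poll(10);
            } catch (WakeupSignal e) {
                // Expected: this is how the poll loop is told to stop.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                consumer.close();                          // close on the polling thread
            }
        });
        worker.start();
        Thread.sleep(runForMs);
        consumer.wakeup();                                 // main thread only signals
        worker.join();
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeConsumer c = runAndStop(50);
        System.out.println("closed=" + c.closed + ", polls=" + c.polls.get());
    }
}
```

Keeping close() on the polling thread matters because KafkaConsumer is not thread-safe; wakeup() is the one method documented as safe to call from another thread.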
