Look out, Spark and Storm, here comes Apache Apex

Ian Pointer | April 21, 2016
A new open source streaming analytics solution derived from DataTorrent's RTS platform, Apex offers blazing speed and simplified programmability. Let's give it a spin

Here's the code that implements our LogCounterOperator:

public class LogCounterOperator extends BaseOperator {

  private HashMap<String, Integer> counter;

  public transient DefaultInputPort<String> input = new DefaultInputPort<String>() {
    @Override
    public void process(String text) {
      String type = text.substring(0, text.indexOf(' '));
      Integer currentCounter = counter.getOrDefault(type, 0);
      counter.put(type, currentCounter + 1);
    }
  };

  public transient DefaultOutputPort<Map<String, Integer>> output = new DefaultOutputPort<>();

  @Override
  public void endWindow() {
    output.emit(counter);
  }

  @Override
  public void setup(OperatorContext context) {
    counter = new HashMap<>();
  }

  @Override
  public void teardown() {
    counter.clear();
  }
}

We're using a simple HashMap for counting our types of log, and we define two ports for handling data flowing through the operator: one for input, and one for output. As these are typed, connecting incompatible operators is a compile-time failure rather than something you find out after deployment. Note that although I've only defined one input and one output port here, it's possible to have multiple inputs and outputs.

The lifecycle of a generic operator is simple. Apex will first call setup() for any needed initialization; in the above example, setup() handles the creation of the HashMap. It will then call beginWindow() to indicate that a new window/batch of input processing is beginning, then call process() on every item of data that flows through the operator during the window. When there's no more time left in the current window, Apex calls endWindow(). We don't need any per-window setup logic, so we simply inherit the empty beginWindow() definition from the abstract BaseOperator. However, at the end of every window, we want to send out our current counts, so we emit the HashMap through the output port.

Meanwhile, the overridden process() method handles our business logic of taking the first word from the log line and updating our counters. Finally, we have a teardown() method that is called when Apex brings down the pipeline, for any clean-up that may be needed -- in this case we don't need to do anything, but I've cleared the HashMap as an example.
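To make the lifecycle concrete, here's a stand-alone sketch that simulates the setup() → process() → endWindow() → teardown() sequence in plain Java, with no Apex runtime involved. The class and method names mirror the operator above, but this is an illustration, not Apex's actual dispatch machinery:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java simulation of the operator lifecycle:
// setup() creates the state, process() counts log-line types,
// endWindow() returns the per-window snapshot (where the real
// operator would call output.emit(counter)), teardown() cleans up.
public class LogCounterSimulation {
    private HashMap<String, Integer> counter;

    public void setup() {
        counter = new HashMap<>();
    }

    public void process(String text) {
        // First word of the log line is the type, e.g. "ERROR" or "INFO".
        String type = text.substring(0, text.indexOf(' '));
        counter.put(type, counter.getOrDefault(type, 0) + 1);
    }

    public Map<String, Integer> endWindow() {
        // Return a copy, standing in for output.emit(counter).
        return new HashMap<>(counter);
    }

    public void teardown() {
        counter.clear();
    }

    public static void main(String[] args) {
        LogCounterSimulation op = new LogCounterSimulation();
        op.setup();
        op.process("ERROR disk full");
        op.process("INFO started");
        op.process("ERROR timeout");
        Map<String, Integer> counts = op.endWindow();
        System.out.println(counts.get("ERROR") + " " + counts.get("INFO")); // prints "2 1"
        op.teardown();
    }
}
```

In the real operator, Apex drives these calls itself and the window boundaries are determined by the engine's streaming-window configuration rather than by explicit method calls.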

Having constructed our operator, we can now construct the pipeline itself. Again, if you have experience building a Storm topology, you'll be right at home with this piece of code:

public void populateDAG(DAG dag, Configuration conf) {
  KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("KafkaInput", new KafkaSinglePortStringInputOperator());
  kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
  LogCounterOperator logCounter = dag.addOperator("LogCounterOperator", new LogCounterOperator());
  ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

  dag.addStream("LogLines", kafkaInput.outputPort, logCounter.input);
  dag.addStream("Console", logCounter.output, console.input);
}


First, we define the nodes of our DAG -- the operators. Then we define the edges of the graph ("streams" in Apex parlance). These streams connect an output port of one operator to an input port of another. Here we connect Kafka's output to LogCounterOperator's input, and LogCounterOperator's output port to the ConsoleOutputOperator. That's it! If we compile and run the application, we can see the HashMap printed to standard output:

