Push Consumer Example

This sample demonstrates how to consume messages from the DXL Databus client by using a DatabusPushConsumer consumer in a running Kafka cluster.

The DatabusPushConsumer allows clients to focus on receiving message from the databus and not focusing on transport-level mechanism details such as commit and retry.

Benefits

  • Eliminates rebalancing due to internal use of consumer pause and resume mechanisms while records are being processed.
  • Provides ability to reprocess the same set of records automatically when failures occur.
  • Push model instead of polling model. Records are read from Kafka and pushed into client’s logic implementation. Continuous polling is eliminated.
  • Hides complex operations such as poll, commit, pause, resume, and seek which are handled internally.

Code highlights are shown below:

Sample Code

package sample;

import broker.ClusterHelper;
import com.opendxl.databus.common.RecordMetadata;
import com.opendxl.databus.common.internal.builder.TopicNameBuilder;
import com.opendxl.databus.consumer.*;
import com.opendxl.databus.entities.Headers;
import com.opendxl.databus.entities.MessagePayload;
import com.opendxl.databus.entities.RoutingData;
import com.opendxl.databus.producer.*;
import com.opendxl.databus.serialization.ByteArrayDeserializer;
import com.opendxl.databus.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


public class BasicPushConsumerExample {

    private static final String PRODUCE_TOPIC = "topic1";
    private static final String CONSUME_TOPIC = "topic1";
    private DatabusPushConsumerFuture databusPushConsumerFuture;

    private static final long PRODUCER_TIME_CADENCE_MS = 1000L;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private static Logger LOG = LoggerFactory.getLogger(BasicPushConsumerExample.class);
    private ExecutorService executor;

    public BasicPushConsumerExample() {

        // Start Kafka cluster
        startKafkaCluster();

        // Start producing messages to Databus
        produceMessages();

        // Create a Push Consumer
        DatabusPushConsumerListenerStatus databusPushConsumerStatus = null;
        try(DatabusPushConsumer<byte[]> consumer =
                    new DatabusPushConsumer(getConsumerConfig(),
                            new ByteArrayDeserializer(), new MessageProcessor())) {

            // Subscribe to topic
            consumer.subscribe(Collections.singletonList(CONSUME_TOPIC));

            // Start pushing messages into MessageProcessor in an async fashion
            this.databusPushConsumerFuture = consumer.pushAsync();

            // Wait until the example is stopped
            databusPushConsumerStatus = this.databusPushConsumerFuture.get();

        } catch (Exception e) {
            LOG.error("ERROR" + e.getMessage(), e);
        } finally {
            LOG.info("Push consumer status:" + databusPushConsumerStatus.getStatus());
        }
    }

    private void startKafkaCluster() {
        ClusterHelper
                .getInstance()
                .addBroker(9092)
                .zookeeperPort(2181)
                .start();
    }

    private Properties getConsumerConfig() {
        // Start pushing messages coming from Databus
        final Properties config = new Properties();
        config.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfiguration.GROUP_ID_CONFIG, "consumer-group-1");
        config.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id");
        return config;
    }

    /**
     * This is the implementation performed by the Databus SDK client to process messages
     */
    class MessageProcessor implements DatabusPushConsumerListener<byte[]> {

        @Override
        public DatabusPushConsumerListenerResponse onConsume(ConsumerRecords<byte[]> records) {
            // Iterate records
            for (ConsumerRecord<byte[]> record : records) {

                // Get headers as String
                final StringBuilder headers = new StringBuilder().append("[");
                record.getHeaders().getAll().forEach((k, v) -> headers.append("[" + k + ":" + v + "]"));
                headers.append("]");

                LOG.info("[CONSUMER <- KAFKA][MSG RCEIVED] ID " + record.getKey() +
                        " TOPIC:" + record.getComposedTopic() +
                        " KEY:" + record.getKey() +
                        " PARTITION:" + record.getPartition() +
                        " OFFSET:" + record.getOffset() +
                        " TIMESTAMP:" + record.getTimestamp() +
                        " HEADERS:" + headers +
                        " PAYLOAD:" + new String(record.getMessagePayload().getPayload()));
            }

            return DatabusPushConsumerListenerResponse.CONTINUE_AND_COMMIT;
        }
    }

    private void produceMessages() {

        final Map config = new HashMap<String, Object>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
        Producer<byte[]> producer =  new DatabusProducer<>(config, new ByteArraySerializer());

        Runnable produceMessagesTask = () -> {
            LOG.info("Producer started");
            while (!closed.get()) {

                // Prepare a record
                final String message = "Hello World at:" + LocalDateTime.now();

                // user should provide the encoding
                final byte[] payload = message.getBytes(Charset.defaultCharset());

                String key = String.valueOf(System.currentTimeMillis());
                RoutingData routingData = new RoutingData(PRODUCE_TOPIC, key, null);
                Headers headers = null;
                MessagePayload<byte[]> messagePayload = new MessagePayload<>(payload);
                ProducerRecord<byte[]> producerRecord = new ProducerRecord<>(routingData, headers, messagePayload);


                // Send the record
                producer.send(producerRecord, new ProducerCallback(producerRecord.getRoutingData().getShardingKey()));
                LOG.info("[PRODUCER -> KAFKA][SENDING MSG] ID " + producerRecord.getRoutingData().getShardingKey() +
                        " TOPIC:" + TopicNameBuilder.getTopicName(PRODUCE_TOPIC, null) +
                        " PAYLOAD:" + message);

                justWait(PRODUCER_TIME_CADENCE_MS);
            }
            producer.flush();
            producer.close();
            LOG.info("Producer closed");

        };

        executor = Executors.newFixedThreadPool(1);
        executor.submit(produceMessagesTask);
    }



    private void justWait(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static class ProducerCallback implements Callback {

        private String shardingKey;

        public ProducerCallback(String shardingKey) {

            this.shardingKey = shardingKey;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                LOG.warn("Error sending a record " + exception.getMessage());
                return;
            }
            LOG.info("[PRODUCER <- KAFKA][OK MSG SENT] ID " + shardingKey +
                    " TOPIC:" + metadata.topic() +
                    " PARTITION:" + metadata.partition() +
                    " OFFSET:" + metadata.offset());
        }
    }

    synchronized private void stopExample() {
        try {
            closed.set(true);
            ClusterHelper.getInstance().stop();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        } finally {
            executor.shutdownNow();
            LOG.info("Example finished");

        }
    }

    public void startExample() throws InterruptedException {

        Runtime.getRuntime().addShutdownHook(
                new Thread(
                        new Runnable() {
                            public void run() {
                                stopExample();
                                LOG.info("Example finished");
                            }
                        }));

    }


    public static void main(String[] args) throws InterruptedException {
        LOG.info("Ctrl-C to finish");
        new BasicPushConsumerExample().startExample();
    }

}

Steps to create a Push Consumer

1. Create a DatabusPushConsumer by using a specific consumer configuration and a DatabusPushConsumerListener instance that will handle the received records. In this particular case the MessageProcessor class implements the DatabusPushConsumerListener interface to handle messages from the Databus.

...
DatabusPushConsumer<byte[]> consumer =
            new DatabusPushConsumer(getConsumerConfig(),
                    new ByteArrayDeserializer(), new MessageProcessor());
...
  1. Subscribe to a topic
consumer.subscribe(Collections.singletonList(CONSUME_TOPIC));
  1. Start pushing messages into MessageProcessor in an async fashion.
databusPushConsumerFuture = consumer.pushAsync();
  1. Monitor Push Consumer
...
// Wait indefinitely
databusPushConsumerFuture.get();

...
// Wait for a while then check the listener status
while (true)  {
    try {
        DatabusPushConsumerStatus status =
                this.databusPushConsumerFuture.get(100, TimeUnit.MILLISECONDS);
        // if this line is reached, means the listener has finished.
        return;
    } catch (TimeoutException e) {
        // TimeoutException means that listener is still working, so it continue the loop
        continue;
    } finally {
        LOG.info("Push consumer status:" + databusPushConsumerStatus.getStatus());
        LOG.info("Push consumer status:" + databusPushConsumerStatus.getListenerResult());
    }
}
...