Consumer Metrics Example

This sample demonstrates how to collect consumer metrics from the DXL Databus client by running a consumer against a live Kafka cluster. The kinds of metrics that can be obtained are as follows (a short sketch of both follows the list):

  • Consumer metrics associated with a clientId.
  • Consumer metrics associated with a clientId and a topic.
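
Both shapes are exposed by the metric accessors of the DatabusConsumer used later in this sample. As a quick orientation, the sketch below (not part of the sample source) reads the records-consumed total in both shapes, assuming a consumer instance already created with getConsumer() as shown further down:

ConsumerMetricPerClientIdAndTopics total = consumer.recordsTotalMetric();

// 1) Value associated with the clientId as a whole
LOG.info("records-consumed-total for the clientId: " + total.getValue());

// 2) Values associated with the clientId and each topic it consumes from
for (Map.Entry<String, ConsumerMetric> topicMetric : total.getTopicMetrics().entrySet()) {
    LOG.info(topicMetric.getKey() + ": " + topicMetric.getValue().getValue());
}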

The purpose of this example is to show Consumer metric values changing over time, so a Consumer instance is created that continuously consumes records from Kafka. To provide those records, the example also instantiates a Producer that keeps writing records to the topic the Consumer is subscribed to. The Producer plays only a helper role in this example, so it is not covered in detail.

Code highlights are shown below:

Sample Code

public static void main(String[] args) throws InterruptedException {
    LOG.info("Ctrl-C to finish");
    new ConsumerMetricsExample().startExample();
}

public ConsumerMetricsExample() {

    // Start Kafka cluster
    ClusterHelper
        .getInstance()
        .addBroker(BROKER_PORT)
        .zookeeperPort(ZOOKEEPER_PORT)
        .start();

    // Prepare a Producer
    this.producer = getProducer();

    // Prepare a Consumer
    this.consumer = getConsumer();

    // Subscribe Consumer to topic
    this.consumer.subscribe(Collections.singletonList(consumerTopic));

    // Set up two threads: one to produce records and another to consume them
    this.executor = Executors.newFixedThreadPool(2);

    // Set up a thread to periodically collect consumer metrics
    this.reportMetricsScheduler = Executors.newScheduledThreadPool(1);

    // Decimal format to apply to collected consumer metrics
    decimalFormat = new DecimalFormat(INTEGER_FORMAT_PATTERN);

}

// Create consumer instance
public Consumer<byte[]> getConsumer() {
    final Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfiguration.GROUP_ID_CONFIG, "consumer-group-1");
    consumerProps.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProps.put(ConsumerConfiguration.SESSION_TIMEOUT_MS_CONFIG, "30000");
    consumerProps.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id-sample");
    return new DatabusConsumer<>(consumerProps, new ByteArrayDeserializer());
}

// Start the consumer metrics sample
public void startExample() throws InterruptedException {

    Runnable consumerTask = getConsumerTask();
    Runnable producerTask = getProducerTask();

    executor.submit(consumerTask);
    executor.submit(producerTask);

    reportMetricsScheduler.scheduleAtFixedRate(reportMetrics(),
            REPORT_METRICS_INITIAL_DELAY,
            REPORT_METRICS_PERIOD,
            TimeUnit.MILLISECONDS);

    Runtime.getRuntime().addShutdownHook(
            new Thread(
                new Runnable() {
                    public void run() {
                        stopExample(executor);
                        LOG.info("Example finished");
                    }
                }));

}

// Consumer loop to continuously get records from Kafka
private Runnable getConsumerTask() {
    return () -> {
        try {
            LOG.info("Consumer started");
            while (!closed.get()) {

                // Poll the databus; the records themselves are discarded here,
                // only the metrics the consumer accumulates while fetching them matter
                consumer.poll(CONSUMER_POLL_TIMEOUT);

                // Commit the offsets of the records just polled
                consumer.commitSync();
                justWait(CONSUMER_TIME_CADENCE_MS);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage());
        } finally {
            consumer.unsubscribe();
            consumer.close();
            LOG.info("Consumer closed");
        }
    };
}

// Create the task that periodically collects Kafka consumer metrics
private Runnable reportMetrics() {
    return () -> {
        try {
            ConsumerMetricPerClientIdAndTopics recordsTotalMetric
                    = consumer.recordsTotalMetric();
            ConsumerMetricPerClientIdAndTopics bytesTotalMetric
                    = consumer.bytesTotalMetric();
            ConsumerMetricPerClientIdAndTopics recordsPerSecMetric
                    = consumer.recordsPerSecondAvgMetric();
            ConsumerMetricPerClientIdAndTopics bytesPerSecondAvgMetric
                    = consumer.bytesPerSecondAvgMetric();
            ConsumerMetricPerClientIdAndTopicPartitions recordsLagPerTopicPartition
                    = consumer.recordsLagPerTopicPartition();

            LOG.info("");
            LOG.info("CONSUMER TOTAL:"
                    + decimalFormat.format(recordsTotalMetric.getValue()) + "rec "
                    + decimalFormat.format(bytesTotalMetric.getValue()) + "bytes");
             LOG.info("CONSUMER RATE:"
                    + decimalFormat.format(recordsPerSecMetric.getValue()) + "rec "
                    + decimalFormat.format(bytesPerSecondAvgMetric.getValue()) + "bytes");

            for (Map.Entry<String, ConsumerMetric> topicMetric
                    : recordsPerSecMetric.getTopicMetrics().entrySet()) {
                LOG.info(" - " + topicMetric.getKey()
                         + ":" + decimalFormat.format(topicMetric.getValue().getValue())
                         + "rec/sec");
            }

            for (Map.Entry<String, ConsumerMetric> topicMetric
                    : bytesPerSecondAvgMetric.getTopicMetrics().entrySet()) {
                LOG.info(" - " + topicMetric.getKey()
                         + ":" + decimalFormat.format(topicMetric.getValue().getValue())
                         + "bytes/sec");
            }
            LOG.info("CONSUMER MAX LAG FOR ANY PARTITION:"
                     + decimalFormat.format(consumer.recordsLagMaxMetric().getValue())
                     + "rec");

            Map<TopicPartition, ConsumerMetric> topicPartitionsMetrics =
                    recordsLagPerTopicPartition.getTopicPartitionsMetrics();

            for (Map.Entry<TopicPartition, ConsumerMetric> tpMetric
                   : topicPartitionsMetrics.entrySet()) {
                LOG.info(" - " + tpMetric.getKey().topic()
                         + "-" + tpMetric.getKey().partition()
                         + " " + decimalFormat.format(tpMetric.getValue().getValue())
                         + "rec");
            }
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    };
}

// Producer loop to continuously produce records to Kafka
private Runnable getProducerTask() {
    return () -> {
        LOG.info("Producer started");
        while (!closed.get()) {

            // Prepare a record
            final String message = "Hello World at:" + LocalDateTime.now();

            // The user is responsible for choosing the payload encoding
            final byte[] payload = message.getBytes(Charset.defaultCharset());
            final ProducerRecord<byte[]> producerRecord = getProducerRecord(producerTopic,
                                                                            payload);

            // Send the record
            producer.send(producerRecord);

            justWait(PRODUCER_TIME_CADENCE_MS);
        }
        producer.close();
        LOG.info("Producer closed");

    };
}

The first step is to start the Kafka cluster and create the Consumer and the helper Producer used to run the example. The ConsumerMetricsExample() constructor is responsible for all of that, and it also subscribes the Consumer to the selected topic (CONSUMER_TOPIC).
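
The fields and constants the constructor relies on are not part of the highlights above. The declarations below are only a plausible sketch: apart from the names already referenced in the code, every name and value is illustrative (the ports and topic name are taken from the sample output):

// Illustrative field declarations; the actual sample source may differ.
private static final int ZOOKEEPER_PORT = 2182;                 // matches "Zookeeper node started: localhost:2182"
private static final int BROKER_PORT = 9092;                    // matches "Kafka broker started: localhost:9092"
private static final String INTEGER_FORMAT_PATTERN = "#,###";   // yields the "12,220"-style figures in the output
private static final String CONSUMER_TOPIC = "topic1";          // topic name seen in the sample output
private final String consumerTopic = CONSUMER_TOPIC;
private final String producerTopic = CONSUMER_TOPIC;            // the helper Producer feeds the same topic
private final AtomicBoolean closed = new AtomicBoolean(false);  // shutdown flag checked by both loops
private final Producer<byte[]> producer;                        // assumed type for the helper Producer
private final Consumer<byte[]> consumer;
private final ExecutorService executor;
private final ScheduledExecutorService reportMetricsScheduler;
private DecimalFormat decimalFormat;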

Second, the startExample() method obtains the consumer task from getConsumerTask() and submits it to the executor. This starts the thread in which the Consumer instance continuously receives records.

Third, the startExample() method obtains the producer task from getProducerTask() and submits it as well. This starts the thread in which the Producer sends records to the topic the Consumer is subscribed to.

Finally, the startExample() method schedules the metrics-collecting task, which periodically runs the Runnable returned by reportMetrics(). The scheduler takes two parameters that must be configured (illustrative values are sketched after the list):

  • REPORT_METRICS_INITIAL_DELAY, the time to wait before the first metrics collection.
  • REPORT_METRICS_PERIOD, the interval at which metrics are collected from Kafka.
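
Both values are interpreted in milliseconds because scheduleAtFixedRate() is called with TimeUnit.MILLISECONDS. The actual constants are defined in the sample source; the values below are only an illustrative sketch:

// Illustrative values only; the real constants live in the sample source.
private static final long REPORT_METRICS_INITIAL_DELAY = 1000;  // wait 1 second before the first collection
private static final long REPORT_METRICS_PERIOD = 5000;         // then collect and log metrics every 5 seconds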

After the consumer, producer and metrics threads have started, the sample periodically displays the consumer metrics.

The reportMetrics() method is responsible for collecting consumer metrics from the Kafka cluster.

The Consumer metrics obtained by the sample are as follows (their raw Kafka counterparts are sketched after the table):

Metric Name             Description
records-consumed-total  Total number of records consumed per consumer and its topics.
bytes-consumed-total    Total number of bytes consumed per consumer and its topics.
records-consumed-rate   Average number of records consumed per second per consumer and its topics.
bytes-consumed-rate     Average number of bytes consumed per second per consumer and its topics.
records-lag-max         Maximum lag, in number of records, for any partition.
records-lag             Latest lag of the partition.
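
The metric names above correspond to the standard Kafka consumer fetch metrics. For reference only, the sketch below shows how the equivalent raw values could be read from a plain Apache Kafka consumer through its metrics() map; the method is not part of this sample, and the group name consumer-fetch-manager-metrics corresponds to recent Kafka versions:

// Sketch: dump the raw fetch metrics of a plain Apache Kafka consumer.
static void printRawFetchMetrics(org.apache.kafka.clients.consumer.KafkaConsumer<byte[], byte[]> kafkaConsumer) {
    kafkaConsumer.metrics().forEach((name, metric) -> {
        if ("consumer-fetch-manager-metrics".equals(name.group())) {
            // name.tags() carries the client-id and, for per-topic metrics, the topic name
            LOG.info(name.name() + " " + name.tags() + " = " + metric.metricValue());
        }
    });
}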

Further information about Kafka monitoring and metrics can be found in the Apache Kafka documentation.

Finally, pressing CTRL+C shuts down the example. The shutdown steps are as follows (a possible stopExample() implementation is sketched after the list):

  • Stop the producer thread and close the Producer instance.
  • Stop the consumer thread, unsubscribe the Consumer instance from its topics, and close it.
  • Stop the Kafka cluster.
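
The stopExample() method invoked by the shutdown hook is not shown in the highlights. A minimal sketch, assuming ClusterHelper exposes a stop() counterpart to start(), could look like this:

// Sketch of a possible stopExample(); the real implementation may differ.
private void stopExample(ExecutorService executorService) {
    // Make both while (!closed.get()) loops exit; each task then closes its own client.
    closed.set(true);
    executorService.shutdown();
    reportMetricsScheduler.shutdown();
    try {
        // Give the consumer and producer tasks a chance to unsubscribe and close cleanly.
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // Stop the embedded Kafka broker and Zookeeper node (assumed ClusterHelper API).
    ClusterHelper.getInstance().stop();
}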

Run the sample

Prerequisites

  • Java Development Kit 8 (JDK 8) or later.

Running

To run this sample, execute the runsample script as follows:

$ ./runsample sample.ConsumerMetricsExample

The output will be similar to the following:

Ctrl-C to finish
Zookeeper node started: localhost:2182
Kafka broker started: localhost:9092
Consumer started
Producer started

CONSUMER TOTAL:12,220rec 822,133bytes
CONSUMER RATE:382rec 25,698bytes
 - topic1:382rec/sec
 - topic1:25,697bytes/sec
CONSUMER MAX LAG FOR ANY PARTITION:9,984rec
 - topic1-1 0rec
 - topic1-0 6,984rec
 - topic1-3 0rec
 - topic1-2 0rec
 - topic1-5 0rec
 - topic1-4 0rec

CONSUMER TOTAL:49,703rec 3,343,564bytes
CONSUMER RATE:1,184rec 79,641bytes
 - topic1:1,184rec/sec
 - topic1:79,641bytes/sec
CONSUMER MAX LAG FOR ANY PARTITION:18,847rec
 - topic1-1 0rec
 - topic1-0 0rec
 - topic1-3 0rec
 - topic1-2 0rec
 - topic1-5 10,847rec
 - topic1-4 0rec

CONSUMER TOTAL:94,409rec 6,351,728bytes
CONSUMER RATE:1,816rec 122,189bytes
 - topic1:1,816rec/sec
 - topic1:122,186bytes/sec
CONSUMER MAX LAG FOR ANY PARTITION:63,553rec
 - topic1-1 18,007rec
 - topic1-0 55,553rec
 - topic1-3 0rec
 - topic1-2 0rec
 - topic1-5 3,948rec
 - topic1-4 7,511rec

CONSUMER TOTAL:138,297rec 9,306,623bytes
CONSUMER RATE:4,324rec 290,987bytes
 - topic1:4,324rec/sec
 - topic1:290,987bytes/sec
CONSUMER MAX LAG FOR ANY PARTITION:238,941rec
 - topic1-1 18,007rec
 - topic1-0 48,235rec
 - topic1-3 100,829rec
 - topic1-2 157,391rec
 - topic1-5 231,441rec
 - topic1-4 7,511rec