Transactions producer and consumer example

This sample demonstrates how to produce and consume messages with the DXL Databus client, using a DatabusProducer and a DatabusConsumer against a running Kafka cluster with transactions enabled.

Code highlights are shown below:

Sample Code

public static void main(String[] args) throws Exception {
    LOG.info("Ctrl-C to finish");
    new TransactionConsumerProducerExample().startExample();
}

public TransactionConsumerProducerExample() throws Exception {

    // Start Kafka cluster
    ClusterHelper
            .getInstance()
            .addBroker(9092)
            .addBroker(9093)
            .addBroker(9094)
            .zookeeperPort(2181)
            .start();

    // Create a new Kafka Transactional topic
    ClusterHelper.getInstance().addNewKafkaTopic(producerTopic, TRANSACTIONAL_TOPIC_REPLICATION_FACTOR,
            TRANSACTIONAL_TOPIC_PARTITION_NUMBER);

    // Prepare a Producer
    this.producer = getProducer();

    // Prepare a Consumer
    this.consumer = getConsumer();

    // Subscribe to topic
    this.consumer.subscribe(Collections.singletonList(consumerTopic));

    this.executor = Executors.newFixedThreadPool(2);
}

public void startExample() throws InterruptedException {

    Runnable consumerTask = getConsumerTask();
    Runnable producerTask = getProducerTask();

    executor.submit(consumerTask);
    executor.submit(producerTask);

    Runtime.getRuntime().addShutdownHook(
            new Thread(
                    new Runnable() {
                        public void run() {
                            stopExample(executor);
                            LOG.info("Example finished");
                        }
                    }));

}

private Runnable getConsumerTask() {
    return () -> {
        try {
            LOG.info("Consumer started");
            while (!closed.get()) {
                // Polling the databus
                final ConsumerRecords<byte[]> records = consumer.poll(CONSUMER_TIME_CADENCE_MS);

                // Iterate records
                for (ConsumerRecord<byte[]> record : records) {

                    // Get headers as String
                    final StringBuilder headers = new StringBuilder().append("[");
                    record.getHeaders().getAll().forEach((k, v) -> headers.append("[" + k + ":" + v + "]"));
                    headers.append("]");

                    LOG.info("[CONSUMER <- KAFKA][MSG RECEIVED] ID " + record.getKey() +
                            " TOPIC:" + record.getComposedTopic() +
                            " KEY:" + record.getKey() +
                            " PARTITION:" + record.getPartition() +
                            " OFFSET:" + record.getOffset() +
                            " TIMESTAMP:" + record.getTimestamp() +
                            " HEADERS:" + headers +
                            " PAYLOAD:" + new String(record.getMessagePayload().getPayload()));
                }
                consumer.commitAsync();
            }
        } catch (Exception e) {
            LOG.error(e.getMessage());
        } finally {
            consumer.unsubscribe();
            consumer.close();
            LOG.info("Consumer closed");
        }
    };
}

private Runnable getProducerTask() {
    return () -> {
        LOG.info("Producer started");
        producer.initTransactions();
        while (!closed.get()) {
            try {

                // Start Transaction
                producer.beginTransaction();

                LOG.info("[TRANSACTION BEGIN]");

                // Send Transaction messages
                for (int i = 0; i < TRANSACTION_MESSAGES_NUMBER; i++) {
                    // Prepare a record
                    String message = "Hello World at:" + LocalDateTime.now() + "-" + i;

                    // user should provide the encoding
                    final byte[] payload = message.getBytes(Charset.defaultCharset());
                    final ProducerRecord<byte[]> producerRecord = getProducerRecord(producerTopic, payload);

                    // Send the record
                    producer.send(producerRecord);
                    LOG.info("[PRODUCER -> KAFKA][SENDING MSG] ID " + producerRecord.getRoutingData().getShardingKey() +
                            " TOPIC:" + TopicNameBuilder.getTopicName(producerTopic, null) +
                            " PAYLOAD:" + message);
                }

                // Commit transaction
                producer.commitTransaction();

                LOG.info("[TRANSACTION COMMITTED SUCCESSFUL]");
            } catch (Exception e) {
                // In case of exceptions, just abort the transaction.
                LOG.info("[TRANSACTION ERROR][ABORTING TRANSACTION] CAUSE " + e.getMessage());
                producer.abortTransaction();
            }

            justWait(PRODUCER_TIME_CADENCE_MS);
        }

        producer.flush();
        producer.close();
        LOG.info("Producer closed");
    };
}

private synchronized void stopExample(final ExecutorService executor) {
    try {
        closed.set(true);
        consumer.wakeup();
        ClusterHelper.getInstance().stop();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Ignored: the finally block forces a shutdown in any case.
    } finally {
        executor.shutdownNow();
    }
}

The first step is to create the Kafka cluster instance with which to run the example. The TransactionConsumerProducerExample() constructor is responsible for accomplishing that. It also creates a DatabusConsumer instance by invoking the getConsumer() method and a DatabusProducer instance by invoking the getProducer() method. Finally, the ClusterHelper.getInstance().addNewKafkaTopic() method is invoked, which creates a transactional topic. A transactional topic is a Kafka topic with at least 3 partitions and a replication factor of at least 3, so it must be created on a minimum of 3 running brokers. Thus, the constructor adds 3 broker instances:

ClusterHelper
        .getInstance()
        .addBroker(9092)
        .addBroker(9093)
        .addBroker(9094)
        .zookeeperPort(2181)
        .start();
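
The transactional topic is then created with a partition count and replication factor that satisfy these requirements. A minimal sketch, assuming the sample's constants use the minimum values of 3 (the actual values are defined in the sample source):

// Assumed values: a transactional topic needs at least 3 partitions and a
// replication factor of at least 3, matching the 3 brokers started above.
private static final int TRANSACTIONAL_TOPIC_REPLICATION_FACTOR = 3;
private static final int TRANSACTIONAL_TOPIC_PARTITION_NUMBER = 3;

ClusterHelper.getInstance().addNewKafkaTopic(producerTopic,
        TRANSACTIONAL_TOPIC_REPLICATION_FACTOR,
        TRANSACTIONAL_TOPIC_PARTITION_NUMBER);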

The getConsumer() and getProducer() methods have custom configurations to enable transactions:

public Consumer<byte[]> getConsumer() {
    final Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfiguration.GROUP_ID_CONFIG, "consumer-group-1");
    consumerProps.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProps.put(ConsumerConfiguration.SESSION_TIMEOUT_MS_CONFIG, "30000");
    consumerProps.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id-sample");
    // Configure isolation level as read_committed in order to consume transactional messages
    consumerProps.put(ConsumerConfiguration.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DatabusConsumer<>(consumerProps, new ByteArrayDeserializer());
}

public Producer<byte[]> getProducer() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
    config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
    // Configure transactional Id and transaction timeout to produce transactional messages
    config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transactional-id-sample");
    config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "7000");
    config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
    return new DatabusProducer<>(config, new ByteArraySerializer());
}

DatabusConsumer receives the following basic configuration:

Config Parameter Name        Description
BOOTSTRAP_SERVERS_CONFIG     The Kafka broker host and port to connect to.
GROUP_ID_CONFIG              The consumer group this consumer belongs to.
ENABLE_AUTO_COMMIT_CONFIG    Whether offsets are committed automatically.
SESSION_TIMEOUT_MS_CONFIG    The session timeout in ms: if the broker receives no heartbeat from the consumer within this window, the consumer is considered dead and removed from the group.
CLIENT_ID_CONFIG             The client ID used to identify this consumer to the broker.

And this configuration parameter to consume transactional messages:

Config Parameter Name     Description
ISOLATION_LEVEL_CONFIG    Controls how messages written transactionally are read. If set to read_committed, consumer.poll() will only return transactional messages that have been committed.

DatabusProducer receives the following basic configuration:

Config Parameter Name       Description
BOOTSTRAP_SERVERS_CONFIG    The Kafka broker host and port to connect to.
CLIENT_ID_CONFIG            The client ID used to identify this producer to the broker.
LINGER_MS_CONFIG            The amount of time in ms to wait for additional messages before sending the current batch.
BATCH_SIZE_CONFIG           The amount of memory in bytes (not messages!) used for each batch.

And these configuration parameters to produce transactional messages:

Config Parameter Name         Description
TRANSACTIONAL_ID_CONFIG       This enables reliability semantics which span multiple producer sessions, since it allows the client to guarantee that transactions using the same transactional ID have been completed prior to starting any new transactions.
TRANSACTION_TIMEOUT_CONFIG    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

After invoking the getProducer() and getConsumer() methods, the consumer subscribes to a topic in the following line:

this.consumer.subscribe(Collections.singletonList(consumerTopic));

Then, once the TransactionConsumerProducerExample() constructor has run, the startExample() method is called. This method calls two internal methods, getConsumerTask() and getProducerTask(), each of which returns a task that is submitted to the executor, so that messages are consumed and produced on separate threads.

Both methods are explained in detail below:

getConsumerTask()

private Runnable getConsumerTask() {
        return () -> {
            try {
                LOG.info("Consumer started");
                while (!closed.get()) {

                    // Polling the databus
                    final ConsumerRecords<byte[]> records = consumer.poll(CONSUMER_TIME_CADENCE_MS);

                    // Iterate records
                    for (ConsumerRecord<byte[]> record : records) {

                        // Get headers as String
                        final StringBuilder headers = new StringBuilder().append("[");
                        record.getHeaders().getAll().forEach((k, v) -> headers.append("[" + k + ":" + v + "]"));
                        headers.append("]");

                        LOG.info("[CONSUMER <- KAFKA][MSG RCEIVED] ID " + record.getKey() +
                                " TOPIC:" + record.getComposedTopic() +
                                " KEY:" + record.getKey() +
                                " PARTITION:" + record.getPartition() +
                                " OFFSET:" + record.getOffset() +
                                " TIMESTAMP:" + record.getTimestamp() +
                                " HEADERS:" + headers +
                                " PAYLOAD:" + new String(record.getMessagePayload().getPayload()));
                    }
                    consumer.commitAsync();
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
            } finally {
                consumer.unsubscribe();
                consumer.close();
                LOG.info("Consumer closed");
            }
        };
    }

The consumer thread runs, polling for produced records, until the sample stops or an exception is triggered.

final ConsumerRecords<byte[]> records = consumer.poll(CONSUMER_TIME_CADENCE_MS);

CONSUMER_TIME_CADENCE_MS is the maximum time, in ms, that poll() blocks while waiting for data.

When the poll completes, the consumer logs the data of received messages and calls the commit method.

consumer.commitAsync();

commitAsync() commits the last received offsets and carries on without blocking.
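
Because commitAsync() does not block, commit failures surface only through an optional callback. A minimal sketch, assuming the Databus consumer mirrors Kafka's commitAsync(OffsetCommitCallback) overload (this overload is an assumption, not shown by the sample):

// Hypothetical: assumes a Kafka-style commitAsync overload with a callback.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        LOG.error("Offset commit failed: " + exception.getMessage());
    }
});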

When the sample stops, the unsubscribe() and close() methods of the consumer are invoked.

These methods do the following:

  • Unsubscribe from topics currently subscribed.
  • Close the consumer. This will close the network connections and sockets.

consumer.unsubscribe();
consumer.close();

getProducerTask()

private Runnable getProducerTask() {
        return () -> {
            LOG.info("Producer started");
            producer.initTransactions();
            while (!closed.get()) {
                try {

                    // Start Transaction
                    producer.beginTransaction();

                    LOG.info("[TRANSACTION BEGIN]");

                    // Send Transaction messages
                    for (int i = 0; i < TRANSACTION_MESSAGES_NUMBER; i++) {
                        // Prepare a record
                        String message = "Hello World at:" + LocalDateTime.now() + "-" + i;

                        // user should provide the encoding
                        final byte[] payload = message.getBytes(Charset.defaultCharset());
                        final ProducerRecord<byte[]> producerRecord = getProducerRecord(producerTopic, payload);

                        // Send the record
                        producer.send(producerRecord);
                        LOG.info("[PRODUCER -> KAFKA][SENDING MSG] ID " + producerRecord.getRoutingData().getShardingKey() +
                                " TOPIC:" + TopicNameBuilder.getTopicName(producerTopic, null) +
                                " PAYLOAD:" + message);
                    }

                    // Commit transaction
                    producer.commitTransaction();

                    LOG.info("[TRANSACTION COMMITTED SUCCESSFUL]");
                } catch (Exception e) {
                    // In case of exceptions, just abort the transaction.
                    LOG.info("[TRANSACTION ERROR][ABORTING TRANSACTION] CAUSE " + e.getMessage());
                    producer.abortTransaction();
                }

                justWait(PRODUCER_TIME_CADENCE_MS);
            }

            producer.flush();
            producer.close();
            LOG.info("Producer closed");
        };
    }

The producer thread runs, producing records in transactions, until the sample stops or an exception occurs.

First, the producer invokes the initTransactions() method to enable transactions; it must be called once, before any other transactional operation.

Next, in the loop, it invokes the beginTransaction() method to start a new transaction.

After this, the producer creates a batch of messages (each wrapped in a producer record) to send in the transaction. The number of messages per transaction is determined by the value of TRANSACTION_MESSAGES_NUMBER. Each producer record is created by invoking the getProducerRecord() method.

public ProducerRecord<byte[]> getProducerRecord(final String topic, final byte[] payload) {
    String key = String.valueOf(System.currentTimeMillis());
    RoutingData routingData = new RoutingData(topic, key, null);
    Headers headers = null;
    MessagePayload<byte[]> messagePayload = new MessagePayload<>(payload);
    return new ProducerRecord<>(routingData, headers, messagePayload);
}

In this method a ProducerRecord instance is created from a RoutingData object carrying the topic and key, a Headers object (null in this sample), and a MessagePayload object with the message content.

At this point, the message is sent by invoking the following method.

producer.send(producerRecord);

This method sends a producer record asynchronously: it returns immediately once the record has been stored in the buffer of records waiting to be sent, which allows many records to be sent in parallel without blocking to wait for the response after each one. For fully non-blocking usage, an overload of send() accepts a callback that is invoked once the request completes.
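
A minimal sketch of such a callback, assuming the Databus producer mirrors Kafka's Callback interface with an onCompletion(RecordMetadata, Exception) method (the exact types are assumptions, not confirmed by this sample):

// Hypothetical: assumes a Kafka-style Callback functional interface.
producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        LOG.error("[PRODUCER -> KAFKA][SEND FAILED] CAUSE " + exception.getMessage());
    }
});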

When all messages are sent, the commitTransaction() method is called. This flushes any unsent records and then commits the ongoing transaction.

After each transaction, the justWait() method is invoked to pause before starting a new one. PRODUCER_TIME_CADENCE_MS is the time in ms that the producer waits between transactions.
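
The justWait() helper is not shown here; a minimal sketch of what it presumably does, namely an interruptible sleep (the actual implementation may differ):

private void justWait(final long timeMs) {
    try {
        Thread.sleep(timeMs);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the enclosing loop can exit cleanly.
        Thread.currentThread().interrupt();
    }
}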

Finally, when the sample stops, the flush and close methods are invoked.

producer.flush();
producer.close();

The flush() method makes all buffered records immediately available to send and blocks until the requests associated with those records complete. Flush is a convenient way to ensure that all previously sent messages have actually completed.

The close method closes the producer and frees resources such as connections, threads, and buffers associated with the producer.

If for any reason a transaction fails, the abortTransaction() method is invoked. At this point, any unflushed messages will be aborted.
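
Note that the sample aborts on every exception for simplicity. In the underlying Kafka producer API, some transactional errors are fatal and cannot be aborted. A more defensive sketch, assuming the Databus producer surfaces the standard Kafka exceptions from org.apache.kafka.common (an assumption; the sample does not show this):

try {
    producer.beginTransaction();
    // ... send records ...
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot continue in this state; close it.
    producer.close();
} catch (KafkaException e) {
    // Recoverable errors: abort this transaction and start a new one.
    producer.abortTransaction();
}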

Run the sample

Prerequisites

  • Java Development Kit 8 (JDK 8) or later.

Running

To run this sample, execute the runsample script as follows:

$ ./runsample sample.TransactionConsumerProducerExample

The output shows:

Zookeeper node started: localhost:2181
Kafka broker started: localhost:9092
Kafka broker started: localhost:9093
Kafka broker started: localhost:9094
Created topic topic1.
Consumer started
Producer started
[TRANSACTION BEGIN]
[PRODUCER -> KAFKA][SENDING MSG] ID 1569250588449 TOPIC:topic1 PAYLOAD:Hello World at:2019-09-23T11:56:28.449-0
[PRODUCER -> KAFKA][SENDING MSG] ID 1569250588449 TOPIC:topic1 PAYLOAD:Hello World at:2019-09-23T11:56:28.449-1
[PRODUCER -> KAFKA][SENDING MSG] ID 1569250588450 TOPIC:topic1 PAYLOAD:Hello World at:2019-09-23T11:56:28.450-2
[PRODUCER -> KAFKA][SENDING MSG] ID 1569250588450 TOPIC:topic1 PAYLOAD:Hello World at:2019-09-23T11:56:28.450-3
[PRODUCER -> KAFKA][SENDING MSG] ID 1569250588450 TOPIC:topic1 PAYLOAD:Hello World at:2019-09-23T11:56:28.450-4
[TRANSACTION COMMITTED SUCCESSFUL]
[CONSUMER <- KAFKA][MSG RECEIVED] ID 1569250588449 TOPIC:topic1 KEY:1569250588449 PARTITION:2 OFFSET:5 TIMESTAMP:1569250588449 HEADERS:[] PAYLOAD:Hello World at:2019-09-23T11:56:28.449-0
[CONSUMER <- KAFKA][MSG RECEIVED] ID 1569250588449 TOPIC:topic1 KEY:1569250588449 PARTITION:2 OFFSET:6 TIMESTAMP:1569250588450 HEADERS:[] PAYLOAD:Hello World at:2019-09-23T11:56:28.449-1
[CONSUMER <- KAFKA][MSG RECEIVED] ID 1569250588450 TOPIC:topic1 KEY:1569250588450 PARTITION:2 OFFSET:7 TIMESTAMP:1569250588450 HEADERS:[] PAYLOAD:Hello World at:2019-09-23T11:56:28.450-2
[CONSUMER <- KAFKA][MSG RECEIVED] ID 1569250588450 TOPIC:topic1 KEY:1569250588450 PARTITION:2 OFFSET:8 TIMESTAMP:1569250588450 HEADERS:[] PAYLOAD:Hello World at:2019-09-23T11:56:28.450-3
[CONSUMER <- KAFKA][MSG RECEIVED] ID 1569250588450 TOPIC:topic1 KEY:1569250588450 PARTITION:2 OFFSET:9 TIMESTAMP:1569250588450 HEADERS:[] PAYLOAD:Hello World at:2019-09-23T11:56:28.450-4