Basic Consume user-password-based authentication

This sample demonstrates how to establish a channel connection to the DXL streaming service using username and password authentication. Once the connection is established, the sample repeatedly consumes and displays the records available to the consumer group.

Code highlights are shown below:

Sample Code

final String channelUsername = "me";
final String channelPassword = "secret";
static Logger logger = Logger.getLogger(ConsumeRecordsWithUserPass.class);
// Gather the Http Proxy attributes, e.g.: FQDN or IP Address, TCP port, username, password,
// in a single plain Java object
httpProxySettings = new HttpProxySettings(PROXY_ENABLED,

// Create a new Channel object
Channel channel = new ChannelBuilder(channelUrl,
                        new ChannelAuthUserPass(channelUrl,

// Create an object whose processCallback() method is called by the run() method (see below)
// when records are received from the channel
ConsumerRecordProcessor consumerRecordCallback = new ConsumerRecordProcessor() {

    public boolean processCallback(ConsumerRecords consumerRecords, String consumerId) {
        // Print the number of records received from the channel
        // (assumes getRecords() returns a Collection, so size() is available)
        logger.info(new StringBuilder("Received ")
                .append(consumerRecords.getRecords().size())
                .append(" records")
                .toString());

        for (ConsumerRecords.ConsumerRecord record : consumerRecords.getRecords()) {
            logger.info("topic = " + record.getTopic());
            logger.info("partition = " + record.getPartition());
            logger.info("offset = " + record.getOffset());
            logger.info("sharding key = " + record.getShardingKey());
            logger.info("headers = " + record.getHeaders());
            logger.info("payload = " + record.getPayload());
            logger.info("decoded payload = " + new String(record.getDecodedPayload()));
        }

        // Return true so that the run() call continues attempting to consume records.
        logger.info("let commit records");
        return true;
    }
};

// Consume records indefinitely
channel.run(consumerRecordCallback, channelTopicSubscriptions);

The first step is to create a Channel instance, which establishes a channel to the streaming service. The channel includes the URL of the streaming service, channelUrl, and the credentials that the client uses to authenticate itself to the service, channelUsername and channelPassword. It also includes a certificate, verifyCertificateBundle, and the HTTP proxy settings, httpProxySettings, created previously.

The example defines a consumerRecordCallback instance, which is invoked with the consumerRecords consumed from the channel. The consumerRecordCallback function outputs the contents and metadata of each record and returns true to indicate that the channel should continue consuming records. If the function instead returned false, the run() method would stop polling the service for new records and return.

The final step is to call the run() method. It establishes a consumer instance with the service, subscribes that instance to the topics listed in the channelTopicSubscriptions variable, and continuously polls the streaming service for available records. Any records received from the streaming service are passed in a call to the consumerRecordCallback instance. Note that if a poll attempt returns no records, an empty list of records is passed to the consumerRecordCallback function.
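The callback contract and polling behaviour described above can be illustrated with a minimal, library-free sketch. Everything in it is a hypothetical stand-in rather than the streaming client API: PollLoopSketch, RecordCallback, and the in-memory queue of simulated polls are assumptions made only for illustration. Unlike the real run() method, this loop also ends when the simulated polls run out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only: these types are hypothetical stand-ins,
// not classes from the streaming client library.
final class PollLoopSketch {

    /** Hypothetical callback: return true to keep polling, false to stop. */
    interface RecordCallback {
        boolean process(List<String> records);
    }

    /**
     * Invokes the callback once per simulated poll (including empty polls)
     * until it returns false or the simulated polls run out.
     * Returns the number of polls performed.
     */
    static int run(Deque<List<String>> simulatedPolls, RecordCallback callback) {
        int polls = 0;
        while (!simulatedPolls.isEmpty()) {
            List<String> batch = simulatedPolls.poll();
            polls++;
            if (!callback.process(batch)) {
                break; // the callback asked the loop to stop
            }
        }
        return polls;
    }

    /** Runs the demo and returns every payload the callback saw before stopping. */
    static List<String> demo() {
        Deque<List<String>> polls = new ArrayDeque<>();
        polls.add(Arrays.asList("a", "b"));
        polls.add(Collections.<String>emptyList()); // a poll that found nothing
        polls.add(Arrays.asList("c"));
        polls.add(Arrays.asList("d"));              // never reached: the callback stops first

        final List<String> seen = new ArrayList<>();
        run(polls, records -> {
            seen.addAll(records);   // an empty batch adds nothing
            return seen.size() < 3; // stop once three payloads have been seen
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the callback is still invoked for the empty batch, mirroring the note above, so a callback written for the sample should tolerate an empty list of records.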

As records are received by the sample, the contents of the messages should be displayed in the output window. The output should appear similar to the following:

topic = topic1
partition = 0
offset = 4
sharding key = pool-1-thread-1-0-0
headers = {sourceId=abc, scope=algo, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
decoded payload = Hello World at:2019-05-07T16:02:18.667 Extra: B489M93SHUD0NUMV3ZVJMN76BR4NGPA84R3IR5GSC0M9Y1XOQHJ3278LK66PZX588AN6Z1229JEA86X608KILCH73HTRJD2RSJND

topic = topic1
partition = 0
offset = 5
sharding key = pool-1-thread-1-0-0
headers = {sourceId=abc, scope=algo, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
decoded payload = Hello World at:2019-05-07T16:02:22.078 Extra: 991V07AN9GN9DNN5XEJ6COMO40SVQTRSFVXQFAYQ5Z4EWZZ0NWVEQ4IZVNZO9NFLQ19JTL6CYF5RVWDIEJOBC7939O0SND98ZJMX

Run the sample

Prerequisites:

  • A DXL streaming service is available for the sample to connect to.
  • Credentials for a consumer are available for use with the sample.


Modify the example to include the appropriate settings for the streaming service channel:

private static final String CHANNEL_URL = "";
private static final String USER_NAME = "me";
private static final String USER_PASSWORD = "password";
private static final String CONSUMER_GROUP = "sample_consumer_group";
private static final String VERIFY_CERTIFICATE_BUNDLE = "/mycert.crt";
private static final List<String> TOPICS = Arrays.asList("topic1");

private static final boolean PROXY_ENABLED = true;
private static final String PROXY_HOST = "";
private static final int PROXY_PORT = 8080;
private static final String PROXY_USR = "";
private static final String PROXY_PWD = "";


To run this sample, execute the runsample script as follows:

$ ./runsample sample.ConsumeRecordsWithUserPass

The initial line in the output window should be similar to the following:

INFO [main] ( - Channel is running

As records are received by the sample, the contents of the message payloads should be displayed in the output window, similar to the following:

Received 15 records
topic = topic1
partition = 5
offset = 13
sharding key = 123
headers = {, tenantId=DBB1FA1E-6A68-4837-982E-FB8D839FF4DA, zoneId=TMP.Identity.TRUCHATOR}
payload = SGVsbG8sIFdvcmxkLg==
decoded payload = Hello, World.

topic = topic1
partition = 5
offset = 14
sharding key = 123
headers = {, tenantId=DBB1FA1E-6A68-4837-982E-FB8D839FF4DA, zoneId=TMP.Identity.TRUCHATOR}
payload = SGVsbG8sIFdvcmxkLg==
decoded payload = Hello, World.

topic = topic1
partition = 5
offset = 15
sharding key = 123
headers = {, tenantId=DBB1FA1E-6A68-4837-982E-FB8D839FF4DA, zoneId=TMP.Identity.TRUCHATOR}
payload = SGVsbG8sIFdvcmxkLg==
decoded payload = Hello, World.

topic = topic1
partition = 5
offset = 16
sharding key = 123
headers = {, tenantId=DBB1FA1E-6A68-4837-982E-FB8D839FF4DA, zoneId=TMP.Identity.TRUCHATOR}
payload = SGVsbG8sIFdvcmxkLg==
decoded payload = Hello, World.
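
In the output above, the payload value is the Base64 encoding of the decoded payload text. The following minimal sketch reproduces that relationship using only java.util.Base64 from the standard library. The class name PayloadDecodeSketch and the choice of UTF-8 are assumptions made for illustration; the sample itself obtains the decoded bytes from record.getDecodedPayload().

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative sketch only: not part of the streaming client library.
final class PayloadDecodeSketch {

    /** Decodes a Base64 payload string into text, assuming UTF-8 content. */
    static String decode(String base64Payload) {
        byte[] bytes = Base64.getDecoder().decode(base64Payload);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Payload value copied from the sample output above
        System.out.println(decode("SGVsbG8sIFdvcmxkLg==")); // prints "Hello, World."
    }
}
```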