Basic Produce and Consume using token-based authentication
This sample demonstrates how to produce records to, and consume records
from, the DXL streaming service using Producer and Consumer objects
respectively. Producer objects implement the Producer interface, which
exposes only the producer methods (e.g., produce) of the Channel class.
Consumer objects implement the Consumer interface, which exposes the
consumer methods of the Channel class (e.g., create, subscribe,
subscriptions, consume, commit, delete). Two builders, ProducerBuilder
and ConsumerBuilder, are available to instantiate Producer and Consumer
objects respectively; internally, these are Channel objects initialized
either to produce or to consume.
The rationale for using Producer and Consumer objects instead of Channel
is to avoid trying to consume with a Channel object that was not
properly set up. For example, assume a Channel object is instantiated
only to produce, so the Channel constructor sets just the produce
parameters, such as producerPathPrefix, and none of the consumer ones.
Such a Channel object still exposes its consume methods even though it
was not initialized to call them. If its create() method were called, an
error would be thrown because no consumer group had been set. If a
Producer object is used instead of a Channel object, this error cannot
occur because Producer exposes neither create() nor any other
consume-related API.
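The idea in the paragraph above can be sketched without the streaming client at all. The following is a minimal illustration, not the library's implementation: DemoProducer, DemoConsumer, DemoChannel and newProducer() are hypothetical stand-ins invented for this example. It shows one class implementing both roles and a factory that hands back only the narrow producer view.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the OpenDXL streaming client classes.
class InterfaceViewDemo {
    public static void main(String[] args) {
        // Using the full class: create() compiles but fails at run time.
        DemoChannel channel = new DemoChannel(null);
        try {
            channel.create();
        } catch (IllegalStateException e) {
            System.out.println("Channel misuse detected at run time: " + e.getMessage());
        }

        // Using the narrow interface: producer.create() would not even compile.
        DemoProducer producer = DemoChannel.newProducer();
        producer.produce("my-topic", "Hello");
        System.out.println("Produced through the producer-only view");
    }
}

// Producer-only view (hypothetical).
interface DemoProducer {
    void produce(String topic, String payload);
}

// Consumer-only view (hypothetical), reduced to a single method.
interface DemoConsumer {
    void create();
}

// One class implements both roles, much like the Channel described above.
class DemoChannel implements DemoProducer, DemoConsumer {
    private final String consumerGroup; // null when set up for producing only
    final List<String> sent = new ArrayList<>();

    DemoChannel(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    @Override
    public void produce(String topic, String payload) {
        sent.add(topic + ":" + payload);
    }

    @Override
    public void create() {
        if (consumerGroup == null) {
            throw new IllegalStateException("no consumer group was set");
        }
    }

    // Builder-style factory: the caller only ever sees the producer view.
    static DemoProducer newProducer() {
        return new DemoChannel(null);
    }
}
```

With the narrow type, the misuse is rejected by the compiler rather than surfacing as a run-time error.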
Code highlights are shown below:
Sample Code
...
// topics to consume from
static final List<String> CONSUMER_TOPICS = Arrays.asList("case-mgmt-events",
        "my-topic",
        "topic-abc123",
        "topic1");
// topics to produce to
static final String PRODUCER_TOPIC_1 = "my-topic";
static final String PRODUCER_TOPIC_2 = "topic1";
// logger instance
static Logger logger = Logger.getLogger(ProduceAndConsumeRecordsUsingInterfacesWithToken.class);
...
// 1st step
// Create new Consumer object
Consumer consumer = new ConsumerBuilder(channelUrl, new ChannelAuthToken(token), channelConsumerGroup)
        .withRetryOnFail(true)
        .withCertificateBundle(verifyCertificateBundle)
        .withExtraConfigs(extraConfigs)
        .withHttpProxy(new HttpProxySettings(PROXY_ENABLED,
                PROXY_HOST,
                PROXY_PORT,
                PROXY_USR,
                PROXY_PWD))
        .build();
// 2nd step
// Create new Producer object
Producer producer = new ProducerBuilder(channelUrl, new ChannelAuthToken(token))
        .withCertificateBundle(verifyCertificateBundle)
        .withHttpProxy(new HttpProxySettings(PROXY_ENABLED,
                PROXY_HOST,
                PROXY_PORT,
                PROXY_USR,
                PROXY_PWD))
        .build();
// 3rd step
// Create the object whose processCallback() method is called by consumer.run() (see below)
// when records are received from the channel
ConsumerRecordProcessor consumerRecordCallback = new ConsumerRecordProcessor() {
    @Override
    public boolean processCallback(ConsumerRecords consumerRecords, String consumerId) {
        // Print the received records. consumerRecords.getRecords() returns the
        // list of records received from the channel.
        logger.info(new StringBuilder("Received ")
                .append(consumerRecords.getRecords().size())
                .append(" records")
                .toString());
        for (ConsumerRecords.ConsumerRecord record : consumerRecords.getRecords()) {
            logger.info("topic = " + record.getTopic());
            logger.info("partition = " + record.getPartition());
            logger.info("offset = " + record.getOffset());
            logger.info("sharding key = " + record.getShardingKey());
            logger.info("headers = " + record.getHeaders());
            logger.info("payload = " + record.getPayload());
            logger.info("decoded payload = " + new String(record.getDecodedPayload()));
            logger.info("");
        }
        // Return true so that the run() call continues attempting to consume records.
        logger.info("let commit records");
        return true;
    }
};
// 4th step
// Create thread to produce records to selected topics. These records will be consumed and printed out
// by the consumerRecordCallback once consumer.run() is called.
final Thread produceThread = new Thread(new Runnable() {
    @Override
    public void run() {
        // counter to append to produce record payloads so each payload will be unique
        int recordCounter = 1;
        while (!Thread.interrupted()) {
            // Set up a ProducerRecords object containing two producer records.
            // The produce API can send many records, even to different topics, in a single call.
            // Only the mandatory record parameters, topic and payload, are shown.
            final ProducerRecords producerRecords = new ProducerRecords();
            producerRecords.add(
                    new ProducerRecords.ProducerRecord
                            .Builder(PRODUCER_TOPIC_1,
                            "Hello from OpenDXL - " + recordCounter)
                            .build()
            );
            producerRecords.add(
                    new ProducerRecords.ProducerRecord
                            .Builder(PRODUCER_TOPIC_2,
                            "Hello from OpenDXL - " + (recordCounter + 1))
                            .build()
            );
            try {
                logger.info("produce records " + recordCounter + " and " + (recordCounter + 1));
                // produce records to DXL streaming service
                producer.produce(producerRecords);
                recordCounter += 2;
            } catch (final PermanentError | TemporaryError e) {
                printError(e);
            }
        }
    }
});
// Finally let Producer and Consumer run
// Produce records indefinitely
produceThread.start();
// Consume records indefinitely
final int consumePollTimeoutMs = 500;
consumer.run(consumerRecordCallback, channelTopicSubscriptions, consumePollTimeoutMs);
The first step is to create a Consumer instance, which establishes a
channel to the streaming service. The Consumer must be given the URL
(host and port) of the streaming service, channelUrl; the token that the
client uses to authenticate itself to the service, wrapped as
new ChannelAuthToken(token); and the consumer group,
channelConsumerGroup. It may also be given a certificate bundle,
verifyCertificateBundle; additional consumer configuration parameters,
extraConfigs; HTTP proxy settings, new HttpProxySettings(...); and a
path to the streaming service consumer API, consumerPathPrefix. If
consumerPathPrefix is not specified, its default value,
"/databus/consumer-service/v1", is used.
The second step is to create a Producer instance, which establishes
another channel to the streaming service, this one to produce only. The
Producer must be given the URL (host and port) of the streaming service,
channelUrl, and the token that the client uses to authenticate itself to
the service, wrapped as new ChannelAuthToken(token). It may also be
given a path to the streaming service producer API, producerPathPrefix;
a certificate bundle, verifyCertificateBundle; and HTTP proxy settings,
new HttpProxySettings(...). If producerPathPrefix is not specified, its
default value, "/databus/cloudproxy/v1", is used.
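The way an optional path prefix falls back to its default can be illustrated with a small builder. This is only a sketch under stated assumptions: DemoSettings, DemoSettingsBuilder and the withConsumerPathPrefix() and withProducerPathPrefix() methods are hypothetical names made up for this example and are not taken from the streaming client; only the two default path values come from the text above.

```java
// Hypothetical builder for illustration only; NOT the OpenDXL ProducerBuilder or ConsumerBuilder.
class PathPrefixDemo {
    public static void main(String[] args) {
        // No prefix specified: both defaults apply.
        DemoSettings defaults = new DemoSettingsBuilder("http://127.0.0.1:50080").build();
        System.out.println(defaults.consumerPathPrefix);
        System.out.println(defaults.producerPathPrefix);

        // Prefix specified: it replaces the default ("/custom/consumer/v1" is a made-up value).
        DemoSettings custom = new DemoSettingsBuilder("http://127.0.0.1:50080")
                .withConsumerPathPrefix("/custom/consumer/v1")
                .build();
        System.out.println(custom.consumerPathPrefix);
    }
}

// Immutable result of the builder.
final class DemoSettings {
    final String channelUrl;
    final String consumerPathPrefix;
    final String producerPathPrefix;

    DemoSettings(String channelUrl, String consumerPathPrefix, String producerPathPrefix) {
        this.channelUrl = channelUrl;
        this.consumerPathPrefix = consumerPathPrefix;
        this.producerPathPrefix = producerPathPrefix;
    }
}

final class DemoSettingsBuilder {
    // Default values as stated in the documentation above.
    static final String DEFAULT_CONSUMER_PATH_PREFIX = "/databus/consumer-service/v1";
    static final String DEFAULT_PRODUCER_PATH_PREFIX = "/databus/cloudproxy/v1";

    private final String channelUrl; // mandatory, so it is a constructor argument
    private String consumerPathPrefix = DEFAULT_CONSUMER_PATH_PREFIX;
    private String producerPathPrefix = DEFAULT_PRODUCER_PATH_PREFIX;

    DemoSettingsBuilder(String channelUrl) {
        this.channelUrl = channelUrl;
    }

    DemoSettingsBuilder withConsumerPathPrefix(String prefix) {
        this.consumerPathPrefix = prefix;
        return this;
    }

    DemoSettingsBuilder withProducerPathPrefix(String prefix) {
        this.producerPathPrefix = prefix;
        return this;
    }

    DemoSettings build() {
        return new DemoSettings(channelUrl, consumerPathPrefix, producerPathPrefix);
    }
}
```

Mandatory values go through the constructor and optional ones through with...() methods, which mirrors the split between required and optional settings described in the first two steps.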
The third step is to define a consumerRecordCallback instance, which is
invoked with the consumerRecords obtained by the Consumer. Its
processCallback() method logs the contents and metadata of each record
and returns true to indicate that the channel should continue consuming
records. If processCallback() were to return false instead, the run()
method would stop polling the service for new records and return.
The fourth step is to define a thread, produceThread, which continuously
calls the Producer produce method to send records to the DXL streaming
service. On each iteration a ProducerRecords object is created and
populated with two producer records, one for the "my-topic" topic and
another for the "topic1" topic. This ProducerRecords object is then
passed to the produce method, sending both records in a single API call.
The final step is to start produceThread and consumer. Once
produceThread.start() is called, records are produced continuously.
Then the consumer.run() method establishes a consumer instance with the
DXL streaming service, subscribes it to the topics listed in the
channelTopicSubscriptions variable, and continuously polls the streaming
service for available records. The records returned by each poll are
passed to the consumerRecordCallback instance. Note that if a poll
returns no records, an empty list of records is passed to
consumerRecordCallback.
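The contract between run() and the callback described above (keep polling while the callback returns true, pass an empty list when a poll yields nothing, stop on false) can be sketched as a plain loop. This is not the client's run() implementation: DemoCallback and the queue of canned poll results are hypothetical stand-ins used so the sketch runs without a streaming service.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical poll loop for illustration only; NOT the OpenDXL Consumer.run() code.
class PollLoopDemo {
    // Stand-in for the record callback: return false to stop the loop.
    interface DemoCallback {
        boolean process(List<String> records);
    }

    // Polls until the callback returns false and reports how many polls were made.
    // Each queue element plays the role of one poll response from the service.
    static int run(Queue<List<String>> pollResults, DemoCallback callback) {
        int pollCount = 0;
        while (true) {
            List<String> batch = pollResults.poll();
            if (batch == null) {
                // Nothing available: the callback still gets called, with an empty list.
                batch = Collections.emptyList();
            }
            pollCount++;
            if (!callback.process(batch)) {
                return pollCount;
            }
        }
    }

    public static void main(String[] args) {
        Queue<List<String>> pollResults = new ArrayDeque<>();
        pollResults.add(Arrays.asList("record-1", "record-2"));
        pollResults.add(Collections.<String>emptyList());
        pollResults.add(Arrays.asList("stop"));

        int polls = run(pollResults, records -> {
            System.out.println("Received " + records.size() + " records");
            // Keep consuming until a record named "stop" shows up.
            return !records.contains("stop");
        });
        System.out.println("Stopped after " + polls + " polls");
    }
}
```

Because the callback is also invoked for empty polls, it must not assume that the list it receives contains at least one record.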
As records are produced and received by the sample, the contents of the messages should be displayed in the output window. The output should appear similar to the following:
...
INFO [Thread-0] ... - produce records 65 and 66
INFO [Thread-0] ... - produce records 67 and 68
INFO [main] ... - Received 3 records
INFO [main] ... - topic = my-topic
INFO [main] ... - partition = 2
INFO [main] ... - offset = 29
INFO [main] ... - sharding key = 29598919
INFO [main] ... - headers = {sourceId=D5452543-E2FB-4585-8BE5-A61C3636819C, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNTk=
INFO [main] ... - decoded payload = Hello from OpenDXL - 59
INFO [main] ... -
INFO [main] ... - topic = topic1
INFO [main] ... - partition = 0
INFO [main] ... - offset = 28
INFO [main] ... - sharding key = 176927523
INFO [main] ... - headers = {sourceId=F567D6A2-500E-4D35-AE15-A707f165D4FA, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNTg=
INFO [main] ... - decoded payload = Hello from OpenDXL - 58
INFO [main] ... -
INFO [main] ... - topic = my-topic
INFO [main] ... - partition = 0
INFO [main] ... - offset = 29
INFO [main] ... - sharding key = 176927523
INFO [main] ... - headers = {sourceId=F567D6A2-500E-4D35-AE15-A707f165D4FA, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNjA=
INFO [main] ... - decoded payload = Hello from OpenDXL - 60
INFO [main] ... -
INFO [main] ... - let commit records
INFO [Thread-0] ... - produce records 69 and 70
INFO [Thread-0] ... - produce records 71 and 72
INFO [main] ... - Received 5 records
...
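In the output above, each "payload" line is the Base64 form of the corresponding "decoded payload" line. The following minimal sketch reproduces that relationship with the standard java.util.Base64 class; it assumes the payload text is UTF-8, and the PayloadDecodeDemo class and its decode() helper are names made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustration only: decodes a Base64 payload string such as those shown in the sample output.
class PayloadDecodeDemo {
    // Assumes the decoded bytes are UTF-8 text.
    static String decode(String base64Payload) {
        byte[] raw = Base64.getDecoder().decode(base64Payload);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Payload copied from the sample output above.
        System.out.println(decode("SGVsbG8gZnJvbSBPcGVuRFhMIC0gNTk="));
        // prints "Hello from OpenDXL - 59"
    }
}
```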
Finally, when CTRL+C is pressed, the example ends and logs the following lines:
INFO [Thread-1] (ProduceAndConsumeRecordsUsingInterfacesWithToken.java:175) - Shutdown app requested. Exiting
INFO [Thread-1] (Channel.java:844) - Channel was stopped.
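The shutdown lines above are logged from a thread other than main, which is consistent with a JVM shutdown hook, although the sample's actual shutdown code is not shown here. The following is a minimal sketch of that pattern, assuming a hook that interrupts a worker loop written like the produce thread (while (!Thread.interrupted())). ShutdownDemo, startWorker() and stopWorker() are hypothetical names, and the worker is a daemon thread only so that the demo ends on its own.

```java
// Hypothetical shutdown handling for illustration only; NOT the sample's actual code.
class ShutdownDemo {
    // Starts a worker that loops until it is interrupted.
    static Thread startWorker() {
        Thread worker = new Thread(() -> {
            while (!Thread.interrupted()) {
                try {
                    Thread.sleep(50); // stands in for one produce call
                } catch (InterruptedException e) {
                    return; // interrupted while sleeping: leave the loop
                }
            }
        });
        worker.setDaemon(true); // demo only: lets the JVM exit when main returns
        worker.start();
        return worker;
    }

    // Interrupts the worker and waits up to two seconds; returns true if it stopped.
    static boolean stopWorker(Thread worker) {
        worker.interrupt();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        final Thread worker = startWorker();

        // The hook runs when the JVM shuts down, for example on CTRL+C or normal exit.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutdown app requested. Exiting");
            stopWorker(worker);
        }));

        Thread.sleep(200); // let the worker run briefly, then return and trigger the hook
    }
}
```

Interrupting the thread, rather than killing it, lets the loop finish its current iteration before it exits.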
Run the sample
Prerequisites
- A DXL streaming service is available for the sample to connect to.
- Credentials for producer and consumer are available for use with the sample.
Setup
Modify the example to include the appropriate settings for the streaming service channel:
private static final String CHANNEL_URL = "http://127.0.0.1:50080";
private static final String TOKEN = "Your_Token";
private static final List<String> CONSUMER_TOPICS = Arrays.asList("topic1", "my-topic");
private static final String CONSUMER_GROUP = "sample_consumer_group";
private static final String PRODUCER_TOPIC_1 = "my-topic";
private static final String PRODUCER_TOPIC_2 = "topic1";
private static final String VERIFY_CERTIFICATE_BUNDLE = "-----BEGIN CERTIFICATE-----"
        + "Your Certificate if needed"
        + "-----END CERTIFICATE-----";
private static final boolean PROXY_ENABLED = true;
private static final String PROXY_HOST = "10.20.30.40";
private static final int PROXY_PORT = 8080;
private static final String PROXY_USR = "";
private static final String PROXY_PWD = "";
Running
To run this sample, execute the runsample script as follows:
$ ./runsample sample.ProduceAndConsumeRecordsUsingInterfacesWithToken
The initial line in the output window should be similar to the following:
INFO [main] (Channel.java:745) - Channel is running
As records are sent and received by the sample, the Producer and
Consumer log lines and the contents of the message payloads should be
displayed in the output window:
INFO [Thread-0] ... - produce records 1 and 2
INFO [Thread-0] ... - produce records 3 and 4
INFO [Thread-0] ... - produce records 5 and 6
INFO [main] ... - Received 2 records
INFO [main] ... - topic = my-topic
INFO [main] ... - partition = 2
INFO [main] ... - offset = 0
INFO [main] ... - sharding key = 29598919
INFO [main] ... - headers = {sourceId=D5452543-E2FB-4585-8BE5-A61C3636819C, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gMQ==
INFO [main] ... - decoded payload = Hello from OpenDXL - 1
INFO [main] ... -
INFO [main] ... - topic = topic1
INFO [main] ... - partition = 0
INFO [main] ... - offset = 0
INFO [main] ... - sharding key = 176927523
INFO [main] ... - headers = {sourceId=F567D6A2-500E-4D35-AE15-A707f165D4FA, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gMg==
INFO [main] ... - decoded payload = Hello from OpenDXL - 2
INFO [main] ... -
INFO [main] ... - let commit records
INFO [Thread-0] ... - produce records 7 and 8
INFO [Thread-0] ... - produce records 9 and 10
INFO [main] ... - Received 1 records
INFO [main] ... - topic = my-topic
INFO [main] ... - partition = 2
INFO [main] ... - offset = 1
INFO [main] ... - sharding key = 29598919
INFO [main] ... - headers = {sourceId=D5452543-E2FB-4585-8BE5-A61C3636819C, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gMw==
INFO [main] ... - decoded payload = Hello from OpenDXL - 3
INFO [main] ... -
INFO [main] ... - let commit records
INFO [Thread-0] ... - produce records 11 and 12
INFO [Thread-0] ... - produce records 13 and 14
INFO [main] ... - Received 4 records
INFO [main] ... - topic = topic1
INFO [main] ... - partition = 2
INFO [main] ... - offset = 2
INFO [main] ... - sharding key = 29598919
INFO [main] ... - headers = {sourceId=D5452543-E2FB-4585-8BE5-A61C3636819C, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNQ==
INFO [main] ... - decoded payload = Hello from OpenDXL - 5
INFO [main] ... -
INFO [main] ... - topic = my-topic
INFO [main] ... - partition = 2
INFO [main] ... - offset = 3
INFO [main] ... - sharding key = 29598919
INFO [main] ... - headers = {sourceId=D5452543-E2FB-4585-8BE5-A61C3636819C, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNw==
INFO [main] ... - decoded payload = Hello from OpenDXL - 7
INFO [main] ... -
INFO [main] ... - topic = topic1
INFO [main] ... - partition = 0
INFO [main] ... - offset = 1
INFO [main] ... - sharding key = 176927523
INFO [main] ... - headers = {sourceId=F567D6A2-500E-4D35-AE15-A707f165D4FA, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNA==
INFO [main] ... - decoded payload = Hello from OpenDXL - 4
INFO [main] ... -
INFO [main] ... - topic = my-topic
INFO [main] ... - partition = 0
INFO [main] ... - offset = 2
INFO [main] ... - sharding key = 176927523
INFO [main] ... - headers = {sourceId=F567D6A2-500E-4D35-AE15-A707f165D4FA, scope=soc.etv.vi, tenantId=5ca969eb-2757-46ed-bc3f-f9266ccccea7, zoneId=TMP.Identity.TRUCHATOR}
INFO [main] ... - payload = SGVsbG8gZnJvbSBPcGVuRFhMIC0gNg==
INFO [main] ... - decoded payload = Hello from OpenDXL - 6
INFO [main] ... -
INFO [main] ... - let commit records
...