OpenDXL streaming client can be invoked from command line for testing or
checking purposes. Executing CLI like a standard Java library with no
arguments, will output the help:
$ java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar
ERROR: There are no options
Option (* = required) Description
--------------------- -----------
--auth-url <String: auth-url> The URL to authorization service.
--cg <String: cg> The consumer group name.
--config <String: config> The consumer configuration.
--consume-timeout <String: domain> Consume Poll Timeout. Time that the channel waits for
new records during a consume operation. Optional
parameter. (default: 0)
--consumer-id <String: consumer-id> Consumer Id
--consumer-prefix <String: consumer- Consumer path prefix. (default:
prefix> /databus/consumer-service/v1)
--cookie <String: cookie> Cookie value
--domain <String: domain> Cookie domain value
--http-proxy <String: http-proxy> Http Proxy settings in comma-separated format:
enabled (true/false), host (URL format),
port (integer), username (string),
password (string). Enabled, host and port are
mandatory. (default: no proxy). Example:
true,<0.0.0.0>,<PORT>,<USERNAME>,<PASSWORD>
where <0.0.0.0> must be replaced by the HttpProxy
IP Address or hostname
* --operation <String: operation> Operations: login | create | subscribe | consume
| commit | subscriptions | delete | produce
--password <String: password> The password to send to authorization
service.
--producer-prefix <String: producer- Producer path prefix. (default:
prefix> /databus/cloudproxy/v1)
--records <String: producer-records> Array of records to be produced in a
simplified JSON format.
--retry <String: retry> Retry on fail. (default: true)
--token <String: token> The authorized token.
--topic <String: topic> Comma-separated topic list to
subscribe to: topic1,topic2,...,
topicN.
--url <String: url> The URL to hit consumer service.
--user <String: user> The user name to send to authorization
service.
--verify-cert-bundle <String: verify- The ca certificate. (default: "", e.g.: empty string)
cert-bundle>
Supported Operations
In order to get records from Streaming Service, the user has to invoke a
few CLI operations. Operations arguments are placed after
--operation
option. For instance:
$ java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar --operation <OPERATION_ARGUMENT> ...
Operation Arguments
Operation Arguments |
Description |
login |
get a identity token |
create |
create a consumer |
subscribe |
subscribe to topics |
consume |
poll records from Streaming Service |
commit |
mark records as read for the consumer |
delete |
close and erase a consumer |
subscriptions |
get the list of topics subscribed to |
produce |
send records to Streaming Service |
The sequence invocation of CLI operations to get records assuming a
token has already been received is: create
-> subscribe
->
consume
. Because of the polling model, consume
operation should
be called repeatedly to read the following records.
In order to send records to Streaming Service, the produce
operation
has to be used. Unlike consume
operation, the produce
operation
can be called immediately since it is a stateless operation and it is
completely independent of consume
ones. Like all other operations, a
token is also required.
login
It is an operation argument to get an identity token. The token is a
prerequisite for the rest of the CLI operations.
Mandatory Arguments for login |
Description |
--auth-url |
Identity REST service which should return a token |
--user |
Identity REST service user |
--password |
Identity REST service password |
Optional Arguments for login |
Description |
--verify-cert-bundle |
Certificate filename or certificate in String format to be able to hit Identity REST service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
$ java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation login \
--auth-url http://<0.0.0.0>:<PORT>/login \
--user <USER_NAME> \
--password <PASSWORD> \
--verify-cert-bundle cerf-file-name.crt
{
"code": "200",
"result": <TOKEN>,
"options": {
"password": ["PASSWORD"],
"auth-url": ["http://<0.0.0.0>:<PORT>/login"],
"verify-cert-bundle": ["cerf-file-name.crt"],
"user": ["USER_NAME"],
"http-proxy": [""]
}
}
create
It is an operation argument to create a consumer that will be part of a
specific Consumer Group and will have specific configuration.
Mandatory Arguments for create |
Description |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
--cg |
Consumer Group name |
Optional Arguments for create |
Description |
--config |
Consumer configuration, a string of comma separated values Kafka Consumer properties, e.g.:
max.message.size=1000,min.message.size=200,auto.offset.reset=latest,
session.timeout.ms=300000,request.timeout.ms=310000,
enable.auto.commit=false |
--consumer-prefix |
Consumer prefix URL path. If not present, then its default value will be used instead. Default value: /databus/consumer-service/v1 |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation create \
--url http://<0.0.0.0>:<PORT>/streaming \
--consumer-prefix /v1 \
--token <TOKEN> \
--cg cg1 \
--verify-cert-bundle cert-file-name.crt
{
"code": "200",
"result": {
"consumerId": "6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7",
"cookie": {
"value": "ee92c43b218c",
"domain": "domain"
}
},
"options": {
"cg": ["cg1"],
"consumer-prefix": ["/v1"],
"verify-cert-bundle": ["cert-file-name.crt"],
"http-proxy": [""],
"retry": ["true"],
"url": ["http://<0.0.0.0>:<PORT>/streaming"],
"token": [<TOKEN>]
}
}
subscribe
It is an operation argument to set the topics from which to receive
records. If operation is successful, then its response is HTTP 204 No
Content without body.
Mandatory Arguments for subscribe |
Description |
--topic |
One or more comma-separated topic names to subscribe to |
--consumer-id |
Consumer Id previously obtained by create operation |
--cookie |
Cookie value obtained by create operation |
--domain |
Cookie Domain value obtained by create operation |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
Optional Arguments for subscribe |
Description |
--consumer-prefix |
Consumer prefix URL path. If not present, then its default value will be used instead. Default value: /databus/consumer-service/v1 |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation subscribe \
--topic topic-1,topic-2,topic-3 \
--consumer-id 6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7 \
--cookie ee92c43b218c \
--domain domain \
--url http://<0.0.0.0>:<PORT>/consumer-service/v1 \
--token <TOKEN> \
--consumer-prefix /databus/consumer-service/v1
{
"code":"204",
"result":"",
"options":{
"cookie":["ee92c43b218c"],
"consumer-prefix":["/databus/consumer-service/v1"],
"domain":["domain"],
"consumer-id":["6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7"],
"verify-cert-bundle":[""],
"topic":["topic-1,topic-2,topic-3"],
"http-proxy":[""],
"url":["http://<0.0.0.0>:<PORT>/databus/consumer-service/v1"],
"token":[<TOKEN>]
}
}
subscriptions
It is an operation argument to get the topics that the channel is
already subscribed to. If operation is successful, then its response is
HTTP 200 with a body showing the subscribed topics.
Mandatory Arguments for subscriptions |
Description |
--consumer-id |
Consumer Id previously obtained by create operation |
--cookie |
Cookie value obtained by create operation |
--domain |
Cookie Domain value obtained by create operation |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
Optional Arguments for subscriptions |
Description |
--consumer-prefix |
Consumer prefix URL path. If not present, then its default value will be used instead. Default value: /databus/consumer-service/v1 |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation subscriptions \
--url http://<0.0.0.0>:<PORT>/ \
--token <TOKEN> \
--consumer-id 6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7 \
--cookie ee92c43b218c \
--domain domain
{
"code":"200",
"result":[
"topic-2-5ca969eb-2757-46ed-bc3f-f9266ccccea7-group0",
"topic-3-5ca969eb-2757-46ed-bc3f-f9266ccccea7-group0",
"topic-1-5ca969eb-2757-46ed-bc3f-f9266ccccea7-group0"
],
"options":{
"cookie":["ee92c43b218c"],
"consumer-prefix":["/databus/consumer-service/v1"],
"verify-cert-bundle":[""],
"consumer-id":["6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7"],
"domain":["domain"],
"http-proxy":[""],
"url":["http://<0.0.0.0>:<PORT>/"],
"token":[<TOKEN>]
}
}
consume
It is an operation argument to get the new records received by the
subscribed topics since last call to commit. If operation is successful,
then its response is HTTP 200 with a body showing the consumed records.
Mandatory Arguments for consume |
Description |
--consumer-id |
Consumer Id previously obtained by create operation |
--cookie |
Cookie value obtained by create operation |
--domain |
Cookie Domain value obtained by create operation |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
Optional Arguments for consume |
Description |
--consumer-timeout |
Time limit to wait for consumer records. If no records are available when the request begins, then request will wait up to this time for new records to arrive. |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation consume \
--url http://<0.0.0.0>:<PORT>/ \
--token <TOKEN> \
--consumer-id 6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7 \
--cookie ee92c43b218c \
--domain domain
{
"code":"200",
"result":[
{
"routingData":{
"topic":"topic1-5ca969eb-2757-46ed-bc3f-f9266ccccea7",
"shardingKey":"pool-1-thread-1-0-1"},
"message":{
"headers":{
"sourceId":"abc",
"scope":"algo",
"tenantId":"5ca969eb-2757-46ed-bc3f-f9266ccccea7",
"zoneId":"TMP.Identity.TRUCHATOR"
},
"payload":"SGVsbG8gV29ybGQgYXQ6MjAxOS0wNS0xNFQxNjoyNDoxMC4xNDAgRXh0cmE6IA=="
},
"partition":1,"offset":3
},
{
"routingData":{
"topic":"topic1-5ca969eb-2757-46ed-bc3f-f9266ccccea7",
"shardingKey":"pool-1-thread-1-0-2"},
"message":{
"headers":{
"sourceId":"abc",
"scope":"algo",
"tenantId":"5ca969eb-2757-46ed-bc3f-f9266ccccea7",
"zoneId":"TMP.Identity.TRUCHATOR"
},
"payload":"SGVsbG8gV29ybGQgYXQ6MjAxOS0wNS0xNFQxNjoyNDoxMC4xNDEgRXh0cmE6IA=="
},
"partition":3,"offset":0
}
],
"options":{
"cookie":["ee92c43b218c"],
"consumer-prefix":["/databus/consumer-service/v1"],
"verify-cert-bundle":[""],
"consumer-id":["6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7"],
"domain":["domain"],
"consume-timeout":[""],
"http-proxy":[""],
"url":["http://<0.0.0.0>:<PORT>/"],
"token":[<TOKEN>]
}
}
commit
It is an operation argument to set the last consumed records to
successfully processed by this consumer group. Consequently, these last
consumed records will not be present in the response to next consume
operation execution.
Note that consume and commit are operations done on a consumer group
basis. If a consumer of a given consumer group consumes records and
commits afterwards, then next consume operations from any consumer of
such consumer group will no longer contain those records. However, those
records are still available to consumers of other consumer groups as
long as they have not consumed them yet and their retention period has
not expired.
Mandatory Arguments for commit |
Description |
--consumer-id |
Consumer Id previously obtained by create operation |
--cookie |
Cookie value obtained by create operation |
--domain |
Cookie Domain value obtained by create operation |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
Optional Arguments for commit |
Description |
--consumer-prefix |
Consumer prefix URL path. If not present, then its default value will be used instead. Default value: /databus/consumer-service/v1 |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation commit \
--url http://<0.0.0.0>:<PORT>/ \
--token <TOKEN> \
--consumer-id 6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7 \
--cookie ee92c43b218c \
--domain domain
{
"code":"204",
"result":"",
"options":{
"cookie":["ee92c43b218c"],
"consumer-prefix":["/databus/consumer-service/v1"],
"verify-cert-bundle":[""],
"consumer-id":["6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7"],
"domain":["domain"],
"http-proxy":[""],
"url":["http://<0.0.0.0>:<PORT>/"],
"token":[<TOKEN>]
}
}
delete
It is an operation argument to delete the consumer identified by its
consumer-id. Once deleted, the consumer will fail in all subsequent
operations, e.g.: such consumer-id can no longer subscribe, get
subscriptions, consume or commit.
Mandatory Arguments for delete |
Description |
--consumer-id |
Consumer Id previously obtained by create operation |
--cookie |
Cookie value obtained by create operation |
--domain |
Cookie Domain value obtained by create operation |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
Optional Arguments for delete |
Description |
--consumer-prefix |
Consumer prefix URL path. If not present, then its default value will be used instead. Default value: /databus/consumer-service/v1 |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation delete \
--url http://<0.0.0.0>:<PORT>/ \
--token <TOKEN> \
--consumer-id 6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7 \
--cookie ee92c43b218c \
--domain domain
{
"code":"204",
"result":"",
"options":{
"cookie":["ee92c43b218c"],
"consumer-prefix":["/databus/consumer-service/v1"],
"verify-cert-bundle":[""],
"consumer-id":["6d1dfd66-61f2-4525-ae70-e00ca6a9ffd78e9c79e2-26e6-4cd7-8e2d-75ce2a868cd7"],
"domain":["domain"],
"http-proxy":[""],
"url":["http://<0.0.0.0>:<PORT>/"],
"token":[<TOKEN>]
}
}
produce
It is an operation argument to send records to the Streaming Service.
Successfully sent records will be available at the Streaming Service for
consumption.
A record consists of four attributes: topic, payload, shardingKey and
headers. Since the produce
operation can send many records in a
single call, its --records
parameter takes an array of records. CLI
expects these records in a simplified JSON format. Then, the produce
operation reshapes them into the suitable data type expected by the
Streaming Service. Topic and payload are mandatory parameters for
Streaming Service, while shardingKey and headers are optional.
Mandatory Arguments for produce |
Description |
--url |
Streaming Service base URL |
--token |
Identity token gotten by login operation |
--records |
JSON array of simplified Producer Records |
Optional Arguments for produce |
Description |
--producer-prefix |
Producer prefix URL path. If not present, then its default value will be used instead. Default value: /databus/cloud proxy/v1 |
--verify-cert-bundle |
Certificate file name or certificate in String format to be able to hit Streaming Service |
--http-proxy |
Http Proxy data: whether enabled, FQDN or IP Address, TCP Port. Optional username and password |
example
java -jar opendxlstreamingclient-java-sdk-<VERSION>.jar \
--operation produce \
--url http://<0.0.0.0>:<PORT>/ \
--token <TOKEN> \
--records '[{"topic":"topic1","payload":"HelloOpenDXL-1"},{"topic":"my-topic","payload":"HelloOpenDXL-2","shardingKey":"101418986","headers":{"sourceId":"D5452543-E2FB-4585-8BE5-A61C3636819C","anotherId":"valueOfAnotherId"}}]'
{
"code":"204",
"result":"",
"options":{
"records":["[{\"topic\":\"topic1\",\"payload\":\"HelloOpenDXL-1\"},{\"topic\":\"my-topic\",\"payload\":\"HelloOpenDXL-2\",\"shardingKey\":\"101418986\",\"headers\":{\"sourceId\":\"D5452543-E2FB-4585-8BE5-A61C3636819C\",\"anotherId\":\"valueOfAnotherId\"}}]"],
"verify-cert-bundle":[""],
"producer-prefix": ["/databus/cloudproxy/v1"],
"http-proxy":[""],
"url":["http://<0.0.0.0>:<PORT>/"],
"token":[<TOKEN>]
}
}