Basic Consume Example¶
This sample demonstrates how to establish a channel connection to the DXL streaming service. Once the connection is established, the sample repeatedly consumes and displays available records for the consumer group.
Prerequisites¶
- A DXL streaming service is available for the sample to connect to.
- Credentials for a consumer are available for use with the sample.
Setup¶
Modify the example to include the appropriate settings for the streaming service channel:
CHANNEL_URL = "http://127.0.0.1:50080" CHANNEL_USERNAME = "me" CHANNEL_PASSWORD = "secret" CHANNEL_CONSUMER_GROUP = "sample_consumer_group" CHANNEL_TOPIC_SUBSCRIPTIONS = ["case-mgmt-events"] # Path to a CA bundle file containing certificates of trusted CAs. The CA # bundle is used to validate that the certificate of the server being connected # to was signed by a valid authority. If set to an empty string, the server # certificate is not validated. VERIFY_CERTIFICATE_BUNDLE = ""
For testing purposes, you can use the fake_streaming_service
Python tool
embedded in the OpenDXL Streaming Client SDK to start up a local
streaming service which includes some fake data for a single preconfigured
consumer group. The initial settings in the example above include the URL,
credentials, and consumer group used by the fake_streaming_service
.
To launch the fake_streaming_service
tool, run the following command in
a command window:
python sample/fake_streaming_service.py
Messages like the following should appear in the command window:
INFO:__main__:Starting service INFO:__main__:Started service on http://mycaseserver:50080
Running¶
To run this sample execute the sample/basic/basic_consume_example.py
script
as follows:
python sample/basic/basic_consume_example.py
The initial line in the output window should be similar to the following:
2018-05-30 17:35:36,743 __main__ - INFO - Starting event loop
As records are received by the sample, the contents of the message payloads
should be displayed to the output window. Using the fake_streaming_service
,
for example, initial payloads similar to the following should appear:
2018-05-30 17:35:36,754 __main__ - INFO - Received payloads: [ { "case": { "id": "9ab2cebb-6b5f-418b-a15f-df1a9ee213f2", "name": "A great case full of malware", "priority": "Low", "url": "https://mycaseserver.com/#/cases/9ab2cebb-6b5f-418b-a15f-df1a9ee213f2" }, "entity": "case", "id": "a45a03de-5c3d-452a-8a37-f68be954e784", "nature": "", "origin": "", "tenant-id": "7af4746a-63be-45d8-9fb5-5f58bf909c25", "timestamp": "", "transaction-id": "", "type": "creation", "user": "johndoe" }, { "case": { "id": "9ab2cebb-6b5f-418b-a15f-df1a9ee213f2", "name": "A great case full of malware", "priority": "Low", "url": "https://mycaseserver.com/#/cases/9ab2cebb-6b5f-418b-a15f-df1a9ee213f2" }, "entity": "case", "id": "a45a03de-5c3d-452a-8a37-f68be954e784", "nature": "", "origin": "", "tenant-id": "7af4746a-63be-45d8-9fb5-5f58bf909c25", "timestamp": "", "transaction-id": "", "type": "priority-update", "user": "other" } ]
When no new records are available from the service, the sample should output a line similar to the following:
2018-05-30 17:39:27,895 __main__ - INFO - Received records: []
Details¶
The majority of the sample code is shown below:
# Create a new channel object with Channel(CHANNEL_URL, auth=ChannelAuth(CHANNEL_URL, CHANNEL_USERNAME, CHANNEL_PASSWORD, verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE), consumer_group=CHANNEL_CONSUMER_GROUP, verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE) as channel: # Create a function which will be called back upon by the 'run' method (see # below) when records are received from the channel. def process_callback(payloads): # Print the payloads which were received. 'payloads' is a list of # dictionary objects extracted from the records received from the # channel. logger.info("Received payloads: \n%s", json.dumps(payloads, indent=4, sort_keys=True)) # Return 'True' in order for the 'run' call to continue attempting to # consume records. return True # Consume records indefinitely channel.run(process_callback, wait_between_queries=WAIT_BETWEEN_QUERIES, topics=CHANNEL_TOPIC_SUBSCRIPTIONS)
The first step is to create a dxlstreamingclient.channel.Channel
instance, which establishes a channel to the streaming service. The channel
includes the URL to the streaming service, CHANNEL_URL
, and credentials
that the client uses to authenticate itself to the service, CHANNEL_USERNAME
and CHANNEL_PASSWORD
.
The example defines a process_callback
function which is invoked with the
payloads (a list of dictionary objects) extracted from records consumed from the
channel. The process_callback
function outputs the contents of the
payloads parameter and returns True
to indicate that the channel should
continue consuming records. Note that if the process_callback
function were
to instead return False
, the run
method would stop polling the service
for new records and would instead return.
The final step is to call the dxlstreamingclient.channel.Channel.run()
method. The run
method establishes a consumer instance with the service,
subscribes the consumer instance for events delivered to the topics
included in the CHANNEL_TOPIC_SUBSCRIPTIONS
variable, and continuously
polls the streaming service for available records. The payloads from any
records which are received from the streaming service are passed in a call to
the process_callback
function. Note that if no records are received from a
poll attempt, an empty list of payloads is passed into the process_callback
function.