Basic Consume Example
This sample demonstrates how to establish a channel connection to the DXL streaming service. Once the connection is established, the sample repeatedly consumes and displays available records for the consumer group.
Prerequisites
- A DXL streaming service is available for the sample to connect to.
- Credentials for a consumer are available for use with the sample.
Setup
Modify the example to include the appropriate settings for the streaming service channel:
CHANNEL_URL = "http://127.0.0.1:50080"
CHANNEL_USERNAME = "me"
CHANNEL_PASSWORD = "secret"
CHANNEL_CONSUMER_GROUP = "sample_consumer_group"
CHANNEL_TOPIC_SUBSCRIPTIONS = ["case-mgmt-events"]
# Path to a CA bundle file containing certificates of trusted CAs. The CA
# bundle is used to validate that the certificate of the server being connected
# to was signed by a valid authority. If set to an empty string, the server
# certificate is not validated.
VERIFY_CERTIFICATE_BUNDLE = ""
For testing purposes, you can use the fake_streaming_service Python tool
embedded in the OpenDXL Streaming Client SDK to start up a local
streaming service which includes some fake data for a single preconfigured
consumer group. The initial settings in the example above include the URL,
credentials, and consumer group used by the fake_streaming_service.
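When the settings need to differ between the local fake service and a real deployment, one option is to read overrides from environment variables instead of editing the script each time. The following is a minimal sketch of that idea, not part of the SDK: the `load_settings` helper, the `DXL_` variable prefix, and the comma-separated topic format are all assumptions made for illustration.

```python
import os

# Defaults mirror the sample settings used with the fake_streaming_service.
DEFAULTS = {
    "CHANNEL_URL": "http://127.0.0.1:50080",
    "CHANNEL_USERNAME": "me",
    "CHANNEL_PASSWORD": "secret",
    "CHANNEL_CONSUMER_GROUP": "sample_consumer_group",
    "CHANNEL_TOPIC_SUBSCRIPTIONS": "case-mgmt-events",
    "VERIFY_CERTIFICATE_BUNDLE": "",
}


def load_settings(environ=None):
    """Return the sample settings, overridden by DXL_-prefixed variables.

    The DXL_ prefix and the comma-separated topic format are conventions
    invented for this sketch, not something the SDK defines.
    """
    environ = os.environ if environ is None else environ
    settings = {
        name: environ.get("DXL_" + name, default)
        for name, default in DEFAULTS.items()
    }
    # Topics are stored as a comma-separated string; split them into a list.
    settings["CHANNEL_TOPIC_SUBSCRIPTIONS"] = [
        topic.strip()
        for topic in settings["CHANNEL_TOPIC_SUBSCRIPTIONS"].split(",")
        if topic.strip()
    ]
    return settings
```

With no overrides set, `load_settings()` returns the same values as the sample settings, so the script would still connect to the fake_streaming_service by default.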
To launch the fake_streaming_service tool, run the following command in
a command window:
python sample/fake_streaming_service.py
Messages like the following should appear in the command window:
INFO:__main__:Starting service
INFO:__main__:Started service on http://mycaseserver:50080
Running
To run this sample, execute the sample/basic/basic_consume_example.py script
as follows:
python sample/basic/basic_consume_example.py
The initial line in the output window should be similar to the following:
2018-05-30 17:35:36,743 __main__ - INFO - Starting event loop
As records are received by the sample, the contents of the message payloads
should be displayed in the output window. Using the fake_streaming_service,
for example, initial payloads similar to the following should appear:
2018-05-30 17:35:36,754 __main__ - INFO - Received payloads:
[
    {
        "case": {
            "id": "9ab2cebb-6b5f-418b-a15f-df1a9ee213f2",
            "name": "A great case full of malware",
            "priority": "Low",
            "url": "https://mycaseserver.com/#/cases/9ab2cebb-6b5f-418b-a15f-df1a9ee213f2"
        },
        "entity": "case",
        "id": "a45a03de-5c3d-452a-8a37-f68be954e784",
        "nature": "",
        "origin": "",
        "tenant-id": "7af4746a-63be-45d8-9fb5-5f58bf909c25",
        "timestamp": "",
        "transaction-id": "",
        "type": "creation",
        "user": "johndoe"
    },
    {
        "case": {
            "id": "9ab2cebb-6b5f-418b-a15f-df1a9ee213f2",
            "name": "A great case full of malware",
            "priority": "Low",
            "url": "https://mycaseserver.com/#/cases/9ab2cebb-6b5f-418b-a15f-df1a9ee213f2"
        },
        "entity": "case",
        "id": "a45a03de-5c3d-452a-8a37-f68be954e784",
        "nature": "",
        "origin": "",
        "tenant-id": "7af4746a-63be-45d8-9fb5-5f58bf909c25",
        "timestamp": "",
        "transaction-id": "",
        "type": "priority-update",
        "user": "other"
    }
]
When no new records are available from the service, the sample should output a line similar to the following:
2018-05-30 17:39:27,895 __main__ - INFO - Received payloads:
[]
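Rather than dumping each payload as JSON, a callback could condense the payloads into one line per event. The sketch below assumes the payload shape shown in the fake_streaming_service output above (the "type", "user", and nested "case" fields); the `summarize_payloads` helper is hypothetical, and payloads from a real service may carry different fields, which is why every lookup falls back to a default.

```python
def summarize_payloads(payloads):
    """Return one summary line per payload.

    Field names ("type", "user", "case") are taken from the sample output
    of the fake_streaming_service; .get() is used because a real service
    may omit or rename them.
    """
    lines = []
    for payload in payloads:
        case = payload.get("case", {})
        lines.append("{}: case {} (priority {}) by {}".format(
            payload.get("type", "unknown"),
            case.get("id", "?"),
            case.get("priority", "?"),
            payload.get("user", "unknown")))
    return lines


def process_callback(payloads):
    # An empty payload list simply produces no output.
    for line in summarize_payloads(payloads):
        print(line)
    # Return True so that polling continues.
    return True
```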
Details
The majority of the sample code is shown below:
# Create a new channel object
with Channel(CHANNEL_URL,
             auth=ChannelAuth(CHANNEL_URL,
                              CHANNEL_USERNAME,
                              CHANNEL_PASSWORD,
                              verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE),
             consumer_group=CHANNEL_CONSUMER_GROUP,
             verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE) as channel:

    # Create a function which will be called back upon by the 'run' method (see
    # below) when records are received from the channel.
    def process_callback(payloads):
        # Print the payloads which were received. 'payloads' is a list of
        # dictionary objects extracted from the records received from the
        # channel.
        logger.info("Received payloads: \n%s",
                    json.dumps(payloads, indent=4, sort_keys=True))
        # Return 'True' in order for the 'run' call to continue attempting to
        # consume records.
        return True

    # Consume records indefinitely
    channel.run(process_callback, wait_between_queries=WAIT_BETWEEN_QUERIES,
                topics=CHANNEL_TOPIC_SUBSCRIPTIONS)
The first step is to create a dxlstreamingclient.channel.Channel
instance, which establishes a channel to the streaming service. The channel
is constructed with the URL of the streaming service, CHANNEL_URL, the
credentials that the client uses to authenticate itself to the service,
CHANNEL_USERNAME and CHANNEL_PASSWORD, and the name of the consumer group,
CHANNEL_CONSUMER_GROUP.
The example defines a process_callback function which is invoked with the
payloads (a list of dictionary objects) extracted from records consumed from the
channel. The process_callback function outputs the contents of the
payloads parameter and returns True to indicate that the channel should
continue consuming records. Note that if the process_callback function were
to instead return False, the run method would stop polling the service
for new records and would instead return.
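The return value can therefore be used to bound how much work a run performs. As a minimal sketch, the hypothetical `make_limited_callback` helper below (not part of the SDK) builds a callback that returns True until a given number of payloads has been seen and False afterwards.

```python
def make_limited_callback(max_payloads):
    """Build a callback that asks 'run' to stop after max_payloads payloads."""
    state = {"seen": 0}

    def process_callback(payloads):
        # Empty polls add nothing to the count, so polling continues.
        state["seen"] += len(payloads)
        # Returning False tells 'run' to stop polling and return.
        return state["seen"] < max_payloads

    return process_callback
```

Such a callback would be passed to run in place of the sample's process_callback, for example `channel.run(make_limited_callback(10), wait_between_queries=WAIT_BETWEEN_QUERIES, topics=CHANNEL_TOPIC_SUBSCRIPTIONS)`.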
The final step is to call the dxlstreamingclient.channel.Channel.run()
method. The run method establishes a consumer instance with the service,
subscribes the consumer instance to the topics listed in the
CHANNEL_TOPIC_SUBSCRIPTIONS variable, and continuously
polls the streaming service for available records. The payloads from any
records which are received from the streaming service are passed in a call to
the process_callback function. Note that if no records are received from a
poll attempt, an empty list of payloads is passed into the process_callback
function.
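The cycle described above can be modeled in a few lines. The sketch below is a simplified stand-in written for illustration, not the library's implementation: `FakeChannel` and `run_model` are hypothetical names, the fake channel returns canned batches rather than contacting a service, and steps such as creating the consumer instance and handling errors are left out.

```python
import time


class FakeChannel(object):
    """Stand-in for a channel; returns canned batches of payloads."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.subscribed_topics = None

    def subscribe(self, topics):
        self.subscribed_topics = topics

    def consume(self):
        # An empty list models a poll that found no new records.
        return self._batches.pop(0) if self._batches else []


def run_model(channel, process_callback, topics, wait_between_queries=0,
              sleep=time.sleep):
    """Simplified model of the subscribe / poll / callback cycle."""
    channel.subscribe(topics)
    while True:
        payloads = channel.consume()
        # The callback is invoked even when 'payloads' is empty.
        if not process_callback(payloads):
            return
        sleep(wait_between_queries)
```

Because the callback is invoked on every poll, including empty ones, it is the callback's return value, not the absence of records, that ends the loop in this model.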