Basic Produce Example¶
This sample demonstrates how to produce records to the DXL streaming service.
Prerequisites¶
- A DXL streaming service is available for the sample to connect to.
- Credentials for the service available for use with the sample.
Setup¶
Modify the example to include the appropriate settings for the streaming service channel:
CHANNEL_URL = "http://127.0.0.1:50080" CHANNEL_USERNAME = "me" CHANNEL_PASSWORD = "secret" CHANNEL_TOPIC = "my-topic" # Path to a CA bundle file containing certificates of trusted CAs. The CA # bundle is used to validate that the certificate of the server being connected # to was signed by a valid authority. If set to an empty string, the server # certificate is not validated. VERIFY_CERTIFICATE_BUNDLE = ""
For testing purposes, you can use the fake_streaming_service
Python tool
embedded in the OpenDXL Streaming Client SDK to start up a local
streaming service. The initial settings in the example above include the URL
and credentials used by the fake_streaming_service
.
To launch the fake_streaming_service
tool, run the following command in
a command window:
python sample/fake_streaming_service.py
Messages like the following should appear in the command window:
INFO:__main__:Starting service INFO:__main__:Started service on http://mycaseserver:50080
Running¶
To run this sample execute the sample/basic/basic_produce_example.py
script
as follows:
python sample/basic/basic_produce_example.py
If the records are successfully produced to the streaming service, the following line should appear in the output window:
Succeeded.
To validate that the records were produced to the streaming service with
the expected content, you can execute the
sample/basic/basic_consume_example.py
script as follows:
python sample/basic/basic_consume_example.py
One of the records received by the sample should appear similar to the following:
2018-05-30 17:35:36,754 __main__ - INFO - Received payloads: [ ... { "message": "Hello from OpenDXL" } ... ]
Details¶
The majority of the sample code is shown below:
CHANNEL_TOPIC = "my-topic" # Create the message payload to be included in a record message_payload = { "message": "Hello from OpenDXL" } # Create the full payload with records to produce to the channel channel_payload = { "records": [ { "routingData": { "topic": CHANNEL_TOPIC, "shardingKey": "" }, "message": { "headers": {}, # Convert the message payload from a dictionary to a # base64-encoded string. "payload": base64.b64encode( json.dumps(message_payload).encode()).decode() } } ] } # Create a new channel object with Channel(CHANNEL_URL, auth=ChannelAuth(CHANNEL_URL, CHANNEL_USERNAME, CHANNEL_PASSWORD, verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE), verify_cert_bundle=VERIFY_CERTIFICATE_BUNDLE) as channel: # Produce the payload records to the channel channel.produce(channel_payload) print("Succeeded.")
The first step is to create a payload dictionary which includes an array of
records to be sent to the channel. The message.payload item in each record
is flattened from a dictionary into a string and encoded using the base64
algorithm.
The next step is to create a dxlstreamingclient.channel.Channel
instance, which establishes a channel to the streaming service. The channel
parameters include the URL to the streaming service, CHANNEL_URL
, and
credentials that the client uses to authenticate itself to the service,
CHANNEL_USERNAME
and CHANNEL_PASSWORD
.
The final step is to call the
dxlstreamingclient.channel.Channel.produce()
method with the payload of
records to be produced to the channel. Assuming the records can be produced
successfully, the text "Succeeded." should appear in the console output.