Basic Event Example
This sample sends an event to a DXL broker. The DXL Elasticsearch service receives a notification for the event and stores the payload in a document on an Elasticsearch server. The sample then retrieves the contents of the stored document via a call to the Elasticsearch Get API and displays the results of the Get call.
For more information on the Elasticsearch Get API, see the Elasticsearch Python Get API and Elasticsearch REST Get API documentation.
Prerequisites
- The samples configuration step has been completed (see Samples Configuration).
- The Elasticsearch API DXL service is running, using the sample configuration (see Running).
Running
To run this sample, execute the sample/basic/basic_event_example.py script as follows:
    python sample/basic/basic_event_example.py
The output should appear similar to the following:
    Waiting for event payload to be stored in Elasticsearch...
    Response to the get request:
    {
        "_id": "basic-event-example-id",
        "_index": "opendxl-elasticsearch-service-examples",
        "_source": {
            "event_id": "basic-event-example-id",
            "message": "Hello from OpenDXL",
            "source": "Basic Event Example"
        },
        "_type": "basic-event-example-doc",
        "_version": 1,
        "found": true
    }
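The response is a JSON object in which the stored event payload appears under the `_source` key. As a small illustration using only the standard library, the sketch below parses the sample output shown above and extracts that payload. The helper name `extract_source` is hypothetical and is not part of the sample.

```python
import json

# Response text copied from the sample output above.
RESPONSE_TEXT = """
{
    "_id": "basic-event-example-id",
    "_index": "opendxl-elasticsearch-service-examples",
    "_source": {
        "event_id": "basic-event-example-id",
        "message": "Hello from OpenDXL",
        "source": "Basic Event Example"
    },
    "_type": "basic-event-example-doc",
    "_version": 1,
    "found": true
}
"""


def extract_source(response_text):
    """Return the stored payload, or None if the document was not found.

    Hypothetical helper for illustration; it only inspects the "found"
    and "_source" keys that appear in the sample output.
    """
    response = json.loads(response_text)
    if not response.get("found"):
        return None
    return response.get("_source")


stored_payload = extract_source(RESPONSE_TEXT)
print(stored_payload["message"])
```

Checking `found` before reading `_source` keeps the caller from treating a missing document as an empty payload.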
Details
To enable the use of the get API, the API name is listed in the apiNames setting under the [General] section of the sample "dxlelasticsearchservice.config" file that the service uses:
    [General]
    apiNames=...,get,...
The "dxlelasticsearchservice.config" file also includes some settings which instruct the service to store the event payload into an Elasticsearch document:
    [General]
    eventGroupNames=basic_event_example,...

    [basic_event_example]
    topics=/sample/elasticsearch/basicevent
    documentIndex=opendxl-elasticsearch-service-examples
    documentType=basic-event-example-doc
    idFieldName=event_id
The basic_event_example event group section lists the name of the event topic that the sample sends, /sample/elasticsearch/basicevent. The payload of each matching event received is stored in Elasticsearch as a document with the index (documentIndex) and type (documentType) listed in the configuration. The ID for the Elasticsearch document is taken from the payload field whose name matches the value of the idFieldName setting, event_id. The sample includes the following payload for the event:
    {
        "event_id": "basic-event-example-id",
        "message": "Hello from OpenDXL",
        "source": "Basic Event Example"
    }
Since the value in the payload for the event_id field is basic-event-example-id, the ID of the document stored to Elasticsearch for the event is also basic-event-example-id.
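The mapping from event group settings and event payload to a stored document can be sketched in a few lines. The example below is only an illustration of the relationship described above, not the service's implementation. It reads the settings with Python's standard `configparser` module, the `resolve_document_target` helper is hypothetical, and the `eventGroupNames` value is trimmed to the single group used by this sample.

```python
import configparser
import json

# Settings copied from the sample configuration, trimmed to one event group.
CONFIG_TEXT = """
[General]
eventGroupNames=basic_event_example

[basic_event_example]
topics=/sample/elasticsearch/basicevent
documentIndex=opendxl-elasticsearch-service-examples
documentType=basic-event-example-doc
idFieldName=event_id
"""

# Event payload used by the sample, encoded as JSON bytes.
EVENT_PAYLOAD = json.dumps({
    "event_id": "basic-event-example-id",
    "message": "Hello from OpenDXL",
    "source": "Basic Event Example",
}).encode("utf-8")


def resolve_document_target(config_text, group_name, payload_bytes):
    """Return the index, type, and ID a payload would be stored under.

    Hypothetical helper: raising KeyError for a missing ID field is a
    choice made for this sketch, not documented service behavior.
    """
    parser = configparser.ConfigParser()
    parser.optionxform = str  # preserve camelCase option names
    parser.read_string(config_text)
    section = parser[group_name]

    payload = json.loads(payload_bytes.decode("utf-8"))
    id_field = section["idFieldName"]
    if id_field not in payload:
        raise KeyError("payload has no '{}' field".format(id_field))

    return {
        "index": section["documentIndex"],
        "doc_type": section["documentType"],
        "id": payload[id_field],
    }


target = resolve_document_target(
    CONFIG_TEXT, "basic_event_example", EVENT_PAYLOAD)
print(target)
```

The returned keys mirror the `index`, `doc_type`, and `id` fields that the sample later sends in its "get" request, which is why the request can locate the stored document.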
For more information on the configuration, see the Service Configuration File section.
For more information on the Elasticsearch document storage process, see the Elasticsearch REST Index API.
The majority of the sample code is shown below:
    # Create the client
    with DxlClient(config) as client:

        # Connect to the fabric
        client.connect()

        logger.info("Connected to DXL fabric.")

        # Create the event
        event = Event(EVENT_TOPIC)

        # Set the payload
        MessageUtils.dict_to_json_payload(event, {
            "event_id": DOCUMENT_ID,
            "message": "Hello from OpenDXL",
            "source": "Basic Event Example"})

        # Send the event
        client.send_event(event)

        # Create the get request
        request_topic = "/opendxl-elasticsearch/service/elasticsearch-api/get"
        req = Request(request_topic)

        # Set the payload for the get request
        MessageUtils.dict_to_json_payload(req, {
            "index": DOCUMENT_INDEX,
            "doc_type": DOCUMENT_TYPE,
            "id": DOCUMENT_ID})

        print("Waiting for event payload to be stored in Elasticsearch...")
        time.sleep(5)

        # Send a request to the elasticsearch DXL service to retrieve the document
        # that should be stored for the event.
        res = client.sync_request(req, timeout=30)

        if res.message_type != Message.MESSAGE_TYPE_ERROR:
            # Display results for the get request
            res_dict = MessageUtils.json_payload_to_dict(res)
            print("Response to the get request:\n{}".format(
                MessageUtils.dict_to_json(res_dict, pretty_print=True)))
        else:
            print("Error invoking service with topic '{}': {} ({})".format(
                request_topic, res.error_message, res.error_code))
            if res.payload:
                # Display the payload in the error response
                res_dict = MessageUtils.json_payload_to_dict(res)
                print("Error payload:\n{}".format(
                    MessageUtils.dict_to_json(res_dict, pretty_print=True)))
After connecting to the DXL fabric, an event is sent to the fabric.
Upon receipt of a notification for the event, the DXL Elasticsearch service stores a corresponding document to the Elasticsearch server.
To confirm that the document was stored properly, a request message is created with a topic that targets the "get" method of the Elasticsearch API DXL service.
The next step is to set the payload of the request message. The contents of the payload include the index, type (doc_type), and id of the document to retrieve.
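For reference, the request payload is a small JSON object with three keys. The sketch below builds an equivalent payload with the standard `json` module. The constant values are assumed to match the index, type, and ID shown in the configuration and sample output, and `build_get_payload` is a hypothetical helper (the sample itself uses `MessageUtils.dict_to_json_payload`).

```python
import json

# Assumed values, taken from the sample configuration and output.
DOCUMENT_INDEX = "opendxl-elasticsearch-service-examples"
DOCUMENT_TYPE = "basic-event-example-doc"
DOCUMENT_ID = "basic-event-example-id"


def build_get_payload(index, doc_type, doc_id):
    """Encode the three "get" parameters as UTF-8 JSON bytes.

    Hypothetical helper for illustration only.
    """
    body = {"index": index, "doc_type": doc_type, "id": doc_id}
    return json.dumps(body).encode("utf-8")


payload = build_get_payload(DOCUMENT_INDEX, DOCUMENT_TYPE, DOCUMENT_ID)
print(payload.decode("utf-8"))
```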
From the Elasticsearch Python Get API documentation:
"Get a typed JSON document from the index based on its id."
The next step is to perform a synchronous request via the DXL fabric. Since the process of storing the document to Elasticsearch is asynchronous to sending the event, the code shown above waits 5 seconds before sending the "get" request, to allow some time for the document to be stored before it is retrieved. If the response is not an error, the contents of the response are displayed. Otherwise, the error message and error code are displayed, along with the error payload if one is present.
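A bounded retry loop is one way to wait for an asynchronously stored document. The sketch below is illustrative only: `fetch` and `is_ready` are hypothetical callables supplied by the caller (for example, `fetch` could wrap the `client.sync_request` call), and the defaults of 5 attempts and a 2-second delay are assumptions rather than values required by the service.

```python
import time


def get_with_retries(fetch, is_ready, max_attempts=5, delay_seconds=2.0,
                     sleep=time.sleep):
    """Call fetch() until is_ready(result) is true or attempts run out.

    The sleep function is injectable so the loop can be exercised without
    real delays. Returns the last result, whether or not it was ready.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    result = None
    for attempt in range(1, max_attempts + 1):
        result = fetch()
        if is_ready(result):
            break
        # Pause only if another attempt will follow.
        if attempt < max_attempts:
            sleep(delay_seconds)
    return result


# Usage with stand-in responses: the third attempt succeeds.
responses = iter([{"found": False}, {"found": False}, {"found": True}])
final = get_with_retries(lambda: next(responses),
                         lambda r: r["found"],
                         sleep=lambda seconds: None)
print(final)
```

Returning the last result rather than raising lets the caller apply the same error handling to a final failed attempt as to any other response.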