Advanced Transform Example¶
This sample, similar to the Basic Event Example, sends an event to a DXL
broker. The DXL Elasticsearch service receives a notification for the event and
stores the payload into documents on an Elasticsearch server. The sample then
retrieves the contents of the stored documents via a call to the Elasticsearch
Get
API. The sample displays the results of the Get
call.
The most significant difference between the behavior for this sample and the Basic Event Example is that this sample configures a custom Python script which the Elasticsearch service invokes to transform the content of the event payload. The transform script defines two separate Elasticsearch documents which are stored for the event.
For more information on the Elasticsearch Get
API, see the
Elasticsearch Python Get API
and Elasticsearch REST Get API
documentation.
Prerequisites¶
- The samples configuration step has been completed (see Samples Configuration).
- The Elasticsearch API DXL service is running, using the
sample
configuration (see Running).
Running¶
To run this sample execute the sample/basic/advanced_transform_example.py
script as follows:
python sample/basic/advanced_transform_example.py
The output should appear similar to the following:
Waiting for event payloads to be stored in Elasticsearch... Response to the get request for id 'advanced-event-example-id-1': { "_id": "advanced-event-example-id-1", "_index": "opendxl-elasticsearch-service-examples", "_source": { "id": "advanced-event-example-id-1", "message": "Hello from OpenDXL", "source": "Advanced Transform Example" }, "_type": "advanced-transform-example-doc", "_version": 1, "found": true } Response to the get request for id 'advanced-event-example-id-2': { "_id": "advanced-event-example-id-2", "_index": "opendxl-elasticsearch-service-examples", "_source": { "id": "advanced-event-example-id-2", "message": "Hello from OpenDXL", "source": "Advanced Transform Example" }, "_type": "advanced-transform-example-doc", "_version": 1, "found": true }
Details¶
In order to enable the use of the get
API, the API name is listed in the
apiNames
setting under the [General]
section in the sample
"dxlelasticsearchservice.config" file that the service uses:
[General] apiNames=...,get,...
The "dxlelasticsearchservice.config" file also includes some settings which instruct the service to store the event payload into an Elasticsearch document:
[General] eventGroupNames=...,advanced_transform_example [advanced_transform_example] topics=/sample/elasticsearch/advancedtransform documentIndex=opendxl-elasticsearch-service-examples documentType=advanced-transform-example-doc transformScript=advanced_transform_example_script.py
The advanced_transform_example
event group section lists the name of the
event topic which the sample sends,
/sample/elasticsearch/advancedtransform
. The payload for each matching
event received is passed into an on_event
function defined by the
advanced_transform_example_script.py
script. The on_event
function
transforms the event payload into parameters for two corresponding documents
which are stored to the Elasticsearch server. See the code snippets below for
more information on the transform script.
For more information on the configuration, see the Service Configuration File section.
The majority of the sample code is shown below:
# Create the client with DxlClient(config) as client: # Connect to the fabric client.connect() logger.info("Connected to DXL fabric.") # Create the event event = Event(EVENT_TOPIC) # Set the payload MessageUtils.encode_payload(event, "Hello from OpenDXL") # Send the event client.send_event(event) print("Waiting for event payloads to be stored in Elasticsearch...") time.sleep(5) for document_id in DOCUMENT_IDS: # Create the get request request_topic = "/opendxl-elasticsearch/service/elasticsearch-api/get" req = Request(request_topic) # Set the payload for the get request MessageUtils.dict_to_json_payload(req, { "index": DOCUMENT_INDEX, "doc_type": DOCUMENT_TYPE, "id": document_id}) # Send a request to the elasticsearch DXL service to try to retrieve the # document that should be stored for the event. res = client.sync_request(req, timeout=30) if res.message_type != Message.MESSAGE_TYPE_ERROR: # Display results for the get request res_dict = MessageUtils.json_payload_to_dict(res) print("Response to the get request for id '{}':\n{}".format( document_id, MessageUtils.dict_to_json(res_dict, pretty_print=True))) else: print("Error invoking service with topic '{}' for id '{}': {} ({})".format( request_topic, document_id, res.error_message, res.error_code)) if res.payload: # Display the payload in the error response res_dict = MessageUtils.json_payload_to_dict(res) print("Error payload:\n{}".format( MessageUtils.dict_to_json(res_dict, pretty_print=True)))
After connecting to the DXL fabric, an event is sent to the fabric.
Upon receipt of a notification for the event, the DXL Elasticsearch service
passes the event information along to the on_event
function in the
transform script configured for the event group. The script is named
advanced_transform_example_script.py
and resides the sample
directory.
The majority of the code for the advanced_transform_example_script.py
script is below:
def on_event(event, index_operation): """ Callback invoked with content received for a DXL event. The callback should return a dictionary (or list of dictionaries) with parameters for the Elasticsearch index operation(s) that the service should perform. The elements for each dictionary returned should correspond to parameters in the `Elasticsearch Python Index API <https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.index>`__. For example, the value for the "doc_type" element in the dictionary will be supplied as the "doc_type" parameter for the Elasticsearch "index" operation. :param dxlclient.message.Event event: The event which was received. :param dict index_operation: A dict with a set of parameters configured for storing the event payload into Elasticsearch. The dictionary should include the following: .. code-block::python { "index": "<documentIndex value from the application config>", "doc_type": "<documentType value from the application config>", "body": "<payload from the event parameter>", "id": "<id from event payload>" } If the event payload could be converted into a dict from JSON, the value for the "body" element will be a dict. Otherwise, the payload will be a str. The value for the "id" element is pulled from the value for the key in the event payload which corresponds to the "idFieldName" value in the application configuration. If no value was set for "idFieldName" in the application configuration, the value for the "id" element is None. :return: A dictionary (or list of dictionaries) with parameters for an Elasticsearch "index" operation to perform. If None is returned, no "index" operations will be performed. :rtype: dict or list(dict) """ logger.info("Event payload received for transform: %s", event.payload) logger.info("Index operation received for transform: %s", index_operation) # Modify the "id" and "body" elements in the index operation dictionary. event_payload = MessageUtils.decode_payload(event) index_operation["id"] = "advanced-event-example-id-1" # Store the event payload string in a dictionary. This allows Elasticsearch # to serialize the document into JSON for storage. index_operation["body"] = MessageUtils.dict_to_json({ "id": index_operation["id"], "message": event_payload, "source": "Advanced Transform Example"}) # Create a second index operation dictionary, using some of the values # from the first dictionary. another_index_operation = { "index": index_operation["index"], "doc_type": index_operation["doc_type"], "id": "advanced-event-example-id-2", "body": MessageUtils.dict_to_json({ "id": "advanced-event-example-id-2", "message": event_payload, "source": "Advanced Transform Example"}) } # Return info for two documents to store in Elasticsearch. return [index_operation, another_index_operation]
The first parameter to the on_event
function (event
) contains the
dxlclient.message.Event
object which was received for the event
notification. The second parameter to the function (index_operation
)
contains a dict
with elements which map to parameters in the Elasticsearch
Python Index API.
The elements in the dict
are preconfigured with information from the event
received from the DXL fabric:
{ "index": "opendxl-elasticsearch-service-examples", "doc_type": "advanced-transform-example-doc", "id": None, "body": "Hello from OpenDXL" }
The on_event
function fills out and returns parameters for two different
documents to be stored to Elasticsearch:
- For the first index operation
dict
, the function modifies theindex_operation
input parameter with a value for the documentid
andbody
. - For the second index operation
dict
, the function copies over some of the values from theindex_operation
input parameter.
The DXL Elasticsearch service uses the Elasticsearch Index
API to store the
index operation dictionaries to Elasticsearch. For more information on the
parameters which can be set for an index operation, see the
Elasticsearch Python Index API
documentation.
To confirm that that the two documents were stored properly, the sample creates a request message with a topic that targets the "get" method of the Elasticsearch API DXL service.
The next step is to set the payload
of the request message. The contents of
the payload include the index
, type (doc_type
), and id
of the
document to retrieve.
From the Elasticsearch Python Get API documentation:
"Get a typed JSON document from the index based on its id."
The next step is to perform a synchronous request via the DXL fabric. Since the process of storing the documents to Elasticsearch is asynchronous to sending the event, the "get" requests are repeated up to 5 times, with a delay of 2 seconds between requests, to allow some time for the documents to be stored before they can be retrieved. If the result after retries in getting each stored document is not an error, the response from the successful "get" request is displayed. The request process is performed twice, once for each document defined by the transform script which is expected to be stored to Elasticsearch.