dxlstreamingclient.channel module

Contains the Channel class, which is used to connect to the consumer service.

class dxlstreamingclient.channel.Channel(base, auth, consumer_group=None, path_prefix=None, consumer_path_prefix='/databus/consumer-service/v1', producer_path_prefix='/databus/cloudproxy/v1', offset='latest', request_timeout=None, session_timeout=None, retry_on_fail=True, verify_cert_bundle='', extra_configs=None)

Bases: object

The Channel class is responsible for all communication with the streaming service.

The following example demonstrates creating a Channel instance and then creating a consumer for the consumer group:

# Create the channel
with Channel("http://channel-server",
             auth=ChannelAuth("http://channel-server",
                "user", "password"),
             consumer_group="thegroup") as channel:
    # Create a new consumer on the consumer group
    channel.create()

NOTE: The preferred way to construct the channel is via the Python "with" statement as shown above. The "with" statement ensures that resources associated with the channel are properly cleaned up when the block is exited.

Constructor parameters:

Parameters:
  • base (str) -- Base URL at which the streaming service resides.
  • auth (requests.auth.AuthBase) -- Authentication object to use for channel requests.
  • consumer_group (str) -- Consumer group to subscribe the channel consumer to.
  • path_prefix (str) -- Path to append to streaming service requests.
  • consumer_path_prefix (str) -- Path to append to consumer-related requests made to the streaming service. Note that if the path_prefix parameter is set to a non-empty value, the path_prefix value will be appended to consumer-related requests instead of the consumer_path_prefix value.
  • producer_path_prefix (str) -- Path to append to producer-related requests made to the streaming service. Note that if the path_prefix parameter is set to a non-empty value, the path_prefix value will be appended to producer-related requests instead of the producer_path_prefix value.
  • offset (str) -- Offset for the next record to retrieve from the streaming service for the new consume() call. Must be one of 'latest', 'earliest', or 'none'.
  • request_timeout (int) -- Maximum amount of time the client (consumer) will wait for the broker's response to a request. If the response is not received before the request timeout elapses, the client may resend the request, or fail the request if retries are exhausted. If set to None (the default), the request timeout is determined automatically by the streaming service. Note that if a value is set for the request timeout, the value should exceed the session_timeout. Otherwise, the streaming service may fail to create new consumers properly. To ensure that the request timeout is greater than the session_timeout, specify values for both the request_timeout and session_timeout parameters, or for neither.
  • session_timeout (int) -- The timeout (in seconds) used to detect channel consumer failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker may remove this consumer from the group. If set to None (the default), the session timeout is determined automatically by the streaming service. Note that if a value is set for the session timeout, the value should be less than the request_timeout. Otherwise, the streaming service may fail to create new consumers properly. To ensure that the session timeout is less than the request_timeout, specify values for both the request_timeout and session_timeout parameters, or for neither.
  • retry_on_fail (bool) -- Whether or not the channel will automatically retry a call which failed due to a temporary error.
  • verify_cert_bundle (str) -- Path to a CA bundle file containing certificates of trusted CAs. The CA bundle is used to validate that the certificate of the authentication server being connected to was signed by a valid authority. If set to an empty string, the server certificate is not validated.
  • extra_configs (dict) -- Dictionary of key/value pairs containing any custom configuration settings which should be sent to the streaming service when a consumer is created. Note that any values specified for the offset, request_timeout, and/or session_timeout parameters will override the corresponding values, if specified, in the extra_configs parameter.
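
The pairing rule described for request_timeout and session_timeout can be checked before a Channel is constructed. The following is a minimal sketch only: check_timeouts is a hypothetical helper that is not part of dxlstreamingclient, and it treats the "both or neither" recommendation as a hard requirement, which is stricter than the documentation demands.

```python
def check_timeouts(request_timeout=None, session_timeout=None):
    """Hypothetical helper (not part of dxlstreamingclient).

    Returns keyword arguments that could be passed on to Channel(...),
    or raises ValueError if the values break the pairing rule.
    """
    if request_timeout is None and session_timeout is None:
        # Neither value set: the streaming service determines both timeouts.
        return {}
    if request_timeout is None or session_timeout is None:
        raise ValueError(
            "specify both request_timeout and session_timeout, or neither")
    if request_timeout <= session_timeout:
        raise ValueError("request_timeout must exceed session_timeout")
    return {"request_timeout": request_timeout,
            "session_timeout": session_timeout}


# Example values chosen only for illustration.
timeout_kwargs = check_timeouts(request_timeout=310, session_timeout=300)
```

The resulting dictionary could then be expanded into the constructor call, for example Channel(base, auth, consumer_group="thegroup", **timeout_kwargs).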
commit(**kwargs)

Commits the record offsets to the channel

Raises:
consume(**kwargs)

Consumes records from all the subscribed topics

Raises:
Returns:

A list of the payloads (decoded as dictionaries) from the records returned from the server.

Return type:

list(dict)

create(**kwargs)

Creates a new consumer on the consumer group

Raises:
delete()

Deletes the consumer from the consumer group

Raises: TemporaryError -- if the delete attempt fails.
destroy()

Destroys the channel (releases all associated resources).

NOTE: Once the method has been invoked, no other calls should be made to the channel.

Also note that this method should rarely be called directly. Instead, the preferred usage of the channel is via a Python "with" statement as shown below:

# Create the channel
with Channel("http://channel-server",
             auth=ChannelAuth("http://channel-server",
                 "user", "password"),
             consumer_group="thegroup") as channel:
    # Create a new consumer on the consumer group
    channel.create()

The "with" statement ensures that resources associated with the channel are properly cleaned up when the block is exited (the destroy() method is invoked).

Raises: TemporaryError -- if a consumer has previously been created for the channel but an attempt to delete the consumer from the channel fails.
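
To make the cleanup guarantee concrete, the sketch below compares the "with" form to an explicit try/finally that calls destroy(). FakeChannel is a stand-in written for this illustration so that the snippet runs without a streaming service; it is not the real Channel class, and how the real class implements its context manager methods is assumed here.

```python
class FakeChannel(object):
    """Stand-in for Channel, used only for this illustration."""

    def __init__(self):
        self.destroyed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Leaving the "with" block always triggers destroy().
        self.destroy()

    def create(self):
        if self.destroyed:
            raise RuntimeError("channel has been destroyed")

    def destroy(self):
        self.destroyed = True


# Preferred form: destroy() is invoked automatically on exit.
with FakeChannel() as channel:
    channel.create()

# Equivalent explicit form: destroy() must be called by hand,
# even if create() raises.
explicit_channel = FakeChannel()
try:
    explicit_channel.create()
finally:
    explicit_channel.destroy()
```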
produce(payload)

Produces records to the channel.

Parameters: payload -- Payload containing the records to be posted to the channel.
Raises: PermanentError -- if an unsuccessful response is received from the streaming service.
reset()

Resets local consumer data stored for the channel.

retry_on_fail

Whether or not the channel will automatically retry a call which failed due to a temporary error.

run(process_callback, wait_between_queries=30, topics=None)

Repeatedly consume records from the subscribed topics. The supplied process_callback is invoked with a list containing each payload (as a dictionary) extracted from its corresponding record.

The process_callback should return a value of True in order for this function to continue consuming additional records. For a return value of False or no return value, no additional records will be consumed and this function will return.

The stop() method can also be called to halt an execution of this method.

Parameters:
  • process_callback -- Callable which is invoked with a list of payloads from records which have been consumed.
  • wait_between_queries (int) -- Number of seconds to wait between calls to consume records.
  • topics (str or list(str)) -- Topic list. If set to a non-empty value, the channel will be subscribed to the specified topics. If set to an empty value, the channel will use topics previously subscribed via a call to the subscribe() method.
Raises:

PermanentError -- if the channel has been destroyed or a prior run is already in progress.
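
The return-value contract of process_callback can be illustrated with a callback that stops run() once a fixed number of payloads has been seen. This is a minimal sketch: make_callback and the limit parameter are hypothetical names introduced for illustration, and the callback is exercised here with hand-written payload lists rather than a live channel.

```python
def make_callback(limit):
    """Build a process_callback that asks run() to stop after `limit` payloads.

    Hypothetical helper, not part of dxlstreamingclient.
    """
    collected = []

    def process_callback(payloads):
        # run() passes a list of payload dictionaries on each invocation.
        collected.extend(payloads)
        # True keeps run() consuming; False (or no return value) ends it.
        return len(collected) < limit

    return process_callback, collected


process_callback, collected = make_callback(limit=3)

# Simulate two invocations as run() would make them.
keep_going_first = process_callback([{"id": 1}])
keep_going_second = process_callback([{"id": 2}, {"id": 3}])
```

With a real channel, the callback would be passed as channel.run(process_callback, wait_between_queries=5, topics="topic1"), where the topic name is a placeholder.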

stop()

Stop an active execution of the run() call. If no run() call is active, this function returns immediately. If a run() call is active, this function blocks until the run has been completed.
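
Because run() blocks its caller, stop() is typically useful from a different thread. The sketch below shows that pattern under stated assumptions: FakeChannel is a stand-in that only mimics the run()/stop() behavior described above so the snippet is runnable without a streaming service, and its internals say nothing about how the real Channel is implemented.

```python
import threading


class FakeChannel(object):
    """Stand-in mimicking the documented run()/stop() behavior."""

    def __init__(self):
        self._stop_requested = threading.Event()
        self._idle = threading.Event()
        self._idle.set()  # no run() is active yet

    def run(self, process_callback, wait_between_queries=30):
        self._stop_requested.clear()
        self._idle.clear()
        try:
            while not self._stop_requested.is_set():
                # No real records in this sketch, so pass an empty list.
                if not process_callback([]):
                    break
                # Wait between queries, waking early if stop() is called.
                self._stop_requested.wait(wait_between_queries)
        finally:
            self._idle.set()

    def stop(self):
        self._stop_requested.set()
        # Returns immediately when no run() is active.
        self._idle.wait()


channel = FakeChannel()
started = threading.Event()


def process_callback(payloads):
    started.set()
    return True  # keep consuming until stop() is called


worker = threading.Thread(target=channel.run, args=(process_callback, 0.05))
worker.start()
started.wait()   # make sure run() is active before stopping it
channel.stop()   # blocks until the run has completed
worker.join()
```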

subscribe(**kwargs)

Subscribes the consumer to a list of topics

Parameters:

topics (str or list(str)) -- Topic list.

Raises:
class dxlstreamingclient.channel.ChannelAuth(base, username, password, verify_cert_bundle='')

Bases: requests.auth.AuthBase

Authentication class for use with channel requests.

Constructor parameters:

Parameters:
  • base (str) -- Base URL to forward authentication requests to.
  • username (str) -- User name to supply for request authentication.
  • password (str) -- Password to supply for request authentication.
  • verify_cert_bundle (str) -- Path to a CA bundle file containing certificates of trusted CAs. The CA bundle is used to validate that the certificate of the authentication server being connected to was signed by a valid authority. If set to an empty string, the server certificate is not validated.
reset()

Purge any credentials cached from a previous authentication.

exception dxlstreamingclient.channel.ConsumerError

Bases: dxlstreamingclient.error.TemporaryError

Error raised when a channel operation fails due to the associated consumer not being recognized by the streaming service.