dxlstreamingclient.channel module¶
Contains the Channel
class, which is used to connect to the consumer
service.
-
class
dxlstreamingclient.channel.
Channel
(base, auth, consumer_group=None, path_prefix=None, consumer_path_prefix='/databus/consumer-service/v1', producer_path_prefix='/databus/cloudproxy/v1', offset='latest', request_timeout=None, session_timeout=None, retry_on_fail=True, verify_cert_bundle='', extra_configs=None)¶ Bases:
object
The
Channel
class is responsible for all communication with the streaming service.The following example demonstrates the creation of a
Channel
instance and creating a consumer for the consumer group:# Create the channel with Channel("http://channel-server", auth=ChannelAuth("http://channel-server, "user", "password"), consumer_group="thegroup") as channel: # Create a new consumer on the consumer group channel.create()
NOTE: The preferred way to construct the channel is via the Python "with" statement as shown above. The "with" statement ensures that resources associated with the channel are properly cleaned up when the block is exited.
Constructor parameters:
Parameters: - base (str) -- Base URL at which the streaming service resides.
- auth (requests.auth.AuthBase) -- Authentication object to use for channel requests.
- consumer_group (str) -- Consumer group to subscribe the channel consumer to.
- path_prefix (str) -- Path to append to streaming service requests.
- consumer_path_prefix (str) -- Path to append to consumer-related requests made to the streaming service. Note that if the path_prefix parameter is set to a non-empty value, the path_prefix value will be appended to consumer-related requests instead of the consumer_path_prefix value.
- producer_path_prefix (str) -- Path to append to producer-related requests made to the streaming service. Note that if the path_prefix parameter is set to a non-empty value, the path_prefix value will be appended to producer-related requests instead of the producer_path_prefix value.
- offset (str) -- Offset for the next record to retrieve from the
streaming service for the new
consume()
call. Must be one of 'latest', 'earliest', or 'none'. - request_timeout (int) -- The configuration controls the maximum amount of time the client (consumer) will wait for the broker response of a request. If the response is not received before the request timeout elapses the client may resend the request or fail the request if retries are exhausted. If set to None (the default), the request timeout is determined automatically by the streaming service. Note that if a value is set for the request timeout, the value should exceed the session_timeout. Otherwise, the streaming service may fail to create new consumers properly. To ensure that the request timeout is greater than the session_timeout, values for either both (or neither) of the request_timeout and session_timeout parameters should be specified.
- session_timeout (int) -- The timeout (in seconds) used to detect channel consumer failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker may remove this consumer from the group. If set to None (the default), the session timeout is determined automatically by the streaming service. Note that if a value is set for the session timeout, the value should be less than the request_timeout. Otherwise, the streaming service may fail to create new consumers properly. To ensure that the session timeout is less than the request_timeout, values for either both (or neither) of the request_timeout and session_timeout parameters should be specified.
- retry_on_fail (bool) -- Whether or not the channel will automatically retry a call which failed due to a temporary error.
- verify_cert_bundle (str) -- Path to a CA bundle file containing certificates of trusted CAs. The CA bundle is used to validate that the certificate of the authentication server being connected to was signed by a valid authority. If set to an empty string, the server certificate is not validated.
- extra_configs (dict) -- Dictionary of key/value pairs containing any custom configuration settings which should be sent to the streaming service when a consumer is created. Note that any values specified for the offset, request_timeout, and/or session_timeout parameters will override the corresponding values, if specified, in the extra_configs parameter.
-
commit
(**kwargs)¶ Commits the record offsets to the channel
Raises: - ConsumerError -- if the consumer associated with the channel
does not exist on the server and
retry_on_fail
is set to False. - TemporaryError -- if the commit attempt fails and
retry_on_fail
is set to False. - PermanentError -- if the channel has been destroyed.
- ConsumerError -- if the consumer associated with the channel
does not exist on the server and
-
consume
(**kwargs)¶ Consumes records from all the subscribed topics
Raises: - ConsumerError -- if the consumer associated with the channel
does not exist on the server and
retry_on_fail
is set to False. - TemporaryError -- if the consume attempt fails and
retry_on_fail
is set to False. - PermanentError -- if the channel has been destroyed or the channel has not been subscribed to any topics.
Returns: A list of the payloads (decoded as dictionaries) from the records returned from the server.
Return type: list(dict)
- ConsumerError -- if the consumer associated with the channel
does not exist on the server and
-
create
(**kwargs)¶ Creates a new consumer on the consumer group
Raises: - TemporaryError -- if the creation attempt fails and
retry_on_fail
is set to False. - PermanentError -- if the channel has been destroyed.
- TemporaryError -- if the creation attempt fails and
-
delete
()¶ Deletes the consumer from the consumer group
Raises: TemporaryError -- if the delete attempt fails.
-
destroy
()¶ Destroys the channel (releases all associated resources).
NOTE: Once the method has been invoked, no other calls should be made to the channel.
Also note that this method should rarely be called directly. Instead, the preferred usage of the channel is via a Python "with" statement as shown below:
# Create the channel with Channel("http://channel-server", auth=ChannelAuth("http://channel-server, "user", "password"), consumer_group="thegroup") as channel: # Create a new consumer on the consumer group channel.create()
The "with" statement ensures that resources associated with the channel are properly cleaned up when the block is exited (the
destroy()
method is invoked).Raises: TemporaryError -- if a consumer has previously been created for the channel but an attempt to delete the consumer from the channel fails.
-
produce
(payload)¶ Produces records to the channel.
Parameters: payload -- Payload containing the records to be posted to the channel. Raises: PermanentError -- if an unsuccessful response is received from the streaming service.
-
reset
()¶ Resets local consumer data stored for the channel.
-
retry_on_fail
¶ Whether or not the channel will automatically retry a call which failed due to a temporary error.
-
run
(process_callback, wait_between_queries=30, topics=None)¶ Repeatedly consume records from the subscribed topics. The supplied
process_callback
is invoked with alist
containing each payload (as a dictionary) extracted from its corresponding record.The
process_callback
should return a value ofTrue
in order for this function to continue consuming additional records. For a return value ofFalse
or no return value, no additional records will be consumed and this function will return.The
stop()
method can also be called to halt an execution of this method.Parameters: - process_callback -- Callable which is invoked with a list of payloads from records which have been consumed.
- wait_between_queries (int) -- Number of seconds to wait between calls to consume records.
- topics (str or list(str)) -- Topic list. If set to a non-empty value, the channel
will be subscribed to the specified topics. If set to an empty
value, the channel will use topics previously subscribed via a
call to the
subscribe()
method.
Raises: PermanentError -- if the channel has been destroyed or a prior run is already in progress.
-
stop
()¶ Stop an active execution of the
run()
call. If norun()
call is active, this function returns immediately. If arun()
call is active, this function blocks until the run has been completed.
-
subscribe
(**kwargs)¶ Subscribes the consumer to a list of topics
Parameters: topics (str or list(str)) -- Topic list.
Raises: - ConsumerError -- if the consumer associated with the channel
does not exist on the server and
retry_on_fail
is set to False. - TemporaryError -- if the subscription attempt fails and
retry_on_fail
is set to False. - PermanentError -- if the channel has been destroyed.
- ConsumerError -- if the consumer associated with the channel
does not exist on the server and
-
class
dxlstreamingclient.channel.
ChannelAuth
(base, username, password, verify_cert_bundle='')¶ Bases:
requests.auth.AuthBase
Authentication class for use with channel requests.
Constructor parameters:
Parameters: - base (str) -- Base URL to forward authentication requests to.
- username (str) -- User name to supply for request authentication.
- password (str) -- Password to supply for request authentication.
- verify_cert_bundle (str) -- Path to a CA bundle file containing certificates of trusted CAs. The CA bundle is used to validate that the certificate of the authentication server being connected to was signed by a valid authority. If set to an empty string, the server certificate is not validated.
-
reset
()¶ Purge any credentials cached from a previous authentication.
-
exception
dxlstreamingclient.channel.
ConsumerError
¶ Bases:
dxlstreamingclient.error.TemporaryError
Error raised when a channel operation fails due to the associated consumer not being recognized by the streaming service.