Class: Channel

Channel(base, optionsopt)

The Channel class is responsible for all communication with the streaming service.

The example below demonstrates the creation of a Channel instance and creating a consumer for the consumer group.

Constructor

new Channel(base, optionsopt)

Parameters:
Name Type Attributes Description
base String

Base URL at which the streaming service resides.

options Object <optional>

Options to use for the channel.

Properties
Name Type Attributes Default Description
auth BaseChannelAuth <optional>

Authentication object to use for channel requests.

consumerGroup String <optional>

Consumer group to subscribe the channel consumer to.

pathPrefix String <optional>

Path to append to streaming service requests.

consumerPathPrefix String <optional>
/databus/consumer-service/v1

Path to append to consumer-related requests made to the streaming service. Note that if options.pathPrefix is set to a non-empty value, the options.pathPrefix value will be appended to consumer-related requests instead of the options.consumerPathPrefix value.

producerPathPrefix String <optional>
/databus/cloudproxy/v1

Path to append to producer-related requests made to the streaming service. Note that if the options.pathPrefix parameter is set to a non-empty value, the options.pathPrefix value will be appended to producer-related requests instead of the options.producerPathPrefix value.

offset String <optional>
latest

Offset for the next record to retrieve from the streaming service for the new Channel#consume call. Must be one of 'latest', 'earliest', or 'none'.

requestTimeout Number <optional>

The configuration controls the maximum amount of time the client (consumer) will wait for the broker response of a request. If the response is not received before the request timeout elapses the client may resend the request or fail the request if retries are exhausted. If set to null or undefined (the default), the request timeout is determined automatically by the streaming service. Note that if a value is set for the request timeout, the value should exceed the options.sessionTimeout. Otherwise, the streaming service may fail to create new consumers properly. To ensure that the request timeout is greater than the options.sessionTimeout, values for both (or neither) of the options.requestTimeout and options.sessionTimeout parameters should be specified.

sessionTimeout Number <optional>

The timeout (in seconds) used to detect channel consumer failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker may remove this consumer from the group. If set to null or undefined (the default), the session timeout is determined automatically by the streaming service. Note that if a value is set for the session timeout, the value should be less than the options.requestTimeout. Otherwise, the streaming service may fail to create new consumers properly. To ensure that the session timeout is less than the options.requestTimeout, values for both (or neither) of the options.requestTimeout and options.sessionTimeout parameters should be specified.

retryOnFail Boolean <optional>
true

Whether or not the channel will automatically retry a call which failed due to a temporary error.

extraConfigs Object <optional>

Object with properties containing any custom configuration settings which should be sent to the streaming service when a consumer is created. Note that any values specified for the options.offset, options.requestTimeout, and/or options.sessionTimeout parameters will override the corresponding values, if specified, in the options.extraConfigs parameter.

key String <optional>

Optional client private keys in PEM format. See https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options.

cert String <optional>

Optional client cert chains in PEM format. See https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options.

ca String <optional>

Optionally override the trusted CA certificates used to validate the streaming service. Any string can contain multiple PEM CAs concatenated together. See https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options.

passphrase String <optional>

Optional shared passphrase used for a single private key. See https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options.

rejectUnauthorized Boolean <optional>
true

If not false, the server certificate is verified against the list of supplied CAs. See https://nodejs.org/api/tls.html#tls_tls_connect_options_callback.

checkServerIdentity function <optional>

A callback function to be used when checking the server's hostname against the certificate. See https://nodejs.org/api/tls.html#tls_tls_connect_options_callback.

Source:
Example
// Create the channel
var channel = new Channel('http://channel-server',
  {
    auth: new ChannelAuth('http://channel-server', 'user', 'password'),
    consumerGroup: 'thegroup'
  })

// Create a new consumer on the consumer group
channel.create()

Members

retryOnFail

Whether or not the channel will automatically retry a call which failed due to a temporary error.

Source:

Methods

commit(callbackopt)

Commits the record offsets to the channel.

Parameters:
Name Type Attributes Description
callback function <optional>

Callback function invoked when the commit attempt is complete.

The first parameter supplied to the callback is an Error object, if an error occurred during the commit attempt, else null. Possible Error types include:

Source:

consume(callbackopt)

Consumes records from all the subscribed topics.

Parameters:
Name Type Attributes Description
callback function <optional>

Callback function invoked when the consume attempt is complete.

The first parameter supplied to the callback is an Error object, if an error occurred during the consume attempt, else null. Possible Error types include:

The second parameter supplied to the callback is, for a successful consume, an array of payloads (decoded as objects) from records returned from the server. For a failed consume, the second parameter is null.

Source:
Throws:

If the channel has not been subscribed to any topics.

Type
PermanentError

create(callbackopt)

Creates a new consumer on the consumer group.

Parameters:
Name Type Attributes Description
callback function <optional>

Callback function to invoke when the creation attempt has completed. The first parameter in the call to the results callback is an Error object, if an error occurred during the send attempt, else null. Possible Error types include:

Source:

delete(callbackopt)

Deletes the consumer from the consumer group.

Parameters:
Name Type Attributes Description
callback function <optional>

Callback to invoke after the consumer has been deleted. The first parameter supplied to the callback is an Error object, if an error occurred during the delete, else null.

Source:

destroy(callbackopt)

Destroys the channel (releases all associated resources).

NOTE: Once the method has been invoked, no other calls should be made to the channel.

Parameters:
Name Type Attributes Description
callback function <optional>

Function to invoke when the channel has been destroyed. The first parameter supplied to the callback is an Error object, if an error occurred during the destroy, else null. The parameter will be of type TemporaryError if a consumer has previously been created for the channel but an attempt to delete the consumer from the channel fails.

Source:

produce(payload, callbackopt)

Produces records to the channel.

Parameters:
Name Type Attributes Description
payload Object

Payload containing the records to be posted to the channel.

callback function <optional>

Function to invoke when the produce has been stopped. The first parameter supplied to the callback is an Error object, if an error occurred during the produce, else null. The parameter will be of type PermanentError if an unsuccessful response is received from the streaming service for the produce attempt.

Source:

reset()

Resets local consumer data stored for the channel.

Source:

run(processCallback, optionsopt)

Repeatedly consume records from the subscribed topics. The supplied processCallback is an array of payloads (decoded as objects) from records returned from the server.

The processCallback should return a value of true in order for this function to continue consuming additional records. For a return value of false or no return value, no additional records will be consumed and this function will return.

The Channel#stop method can also be called to halt an execution of this method.

Parameters:
Name Type Attributes Description
processCallback function

Callback to invoke with an array of payloads from records which have been consumed.

options Object <optional>

Options to use for the channel.

Properties
Name Type Attributes Default Description
doneCallback function <optional>

Callback to invoke when the run is complete. The first parameter supplied to the callback is an Error object, if an error occurred during the run, else null.

waitBetweenQueries Number <optional>
30

Number of seconds to wait between calls to consume records.

topics String | Array.<String> <optional>

Topic or array of topics. If set to a non-empty value, the channel will be subscribed to the specified topics. If set to an empty value, the channel will use topics previously subscribed via a call to the Channel#subscribe method.

Source:
Throws:

If a previous run is already in progress.

Type
PermanentError

stop(callbackopt)

Stop an active execution of a Channel#run. If no run is active, the supplied callback is invoked immediately. If a run is active, the supplied callback is invoked after the run has been completed.

Parameters:
Name Type Attributes Description
callback function <optional>

Function to invoke when the run has been stopped.

Source:

subscribe(topics, callbackopt)

Subscribes the consumer to an array of topics.

Parameters:
Name Type Attributes Description
topics String | Array.<String>

Topic or array of topics.

callback function <optional>

Callback function invoked when the subscription attempt is complete. The first parameter in the call to the results callback is an Error object, if an error occurred during the send attempt, else null. Possible Error types include:

Source: