Source: channel.js

'use strict'

var request = require('request')
var retry = require('retry')
var Buffer = require('safe-buffer').Buffer
var util = require('./util')
var ConsumerError = require('./consumer-error')
var PermanentError = require('./permanent-error')
var StopError = require('./stop-error')
var TemporaryError = require('./temporary-error')

var DEFAULT_CONSUMER_PATH_PREFIX = '/databus/consumer-service/v1'
var DEFAULT_PRODUCER_PATH_PREFIX = '/databus/cloudproxy/v1'

// Constants for consumer config settings
var AUTO_OFFSET_RESET_CONFIG_SETTING = 'auto.offset.reset'
var ENABLE_AUTO_COMMIT_CONFIG_SETTING = 'enable.auto.commit'
var REQUEST_TIMEOUT_CONFIG_SETTING = 'request.timeout.ms'
var SESSION_TIMEOUT_CONFIG_SETTING = 'session.timeout.ms'

// Default number of seconds to wait between consume queries made to the
// streaming service
var DEFAULT_WAIT_BETWEEN_QUERIES = 30

var RETRY_FACTOR = 2
var MIN_RETRY_TIMEOUT = 1000
var MAX_RETRY_TIMEOUT = 10000

/**
 * Asynchronously invoke the supplied callback.
 * @param {Function} callback - Function to invoke.
 * @private
 */
function callbackAsync (callback) {
  if (callback) {
    setImmediate(function () {
      callback(null)
    })
  }
}

/**
 * @classdesc The `Channel` class is responsible for all communication with the
 * streaming service.
 *
 * The example below demonstrates the creation of a `Channel` instance and
 * creating a consumer for the consumer group.
 * @example
 * // Create the channel
 * var channel = new Channel('http://channel-server',
 *   {
 *     auth: new ChannelAuth('http://channel-server', 'user', 'password'),
 *     consumerGroup: 'thegroup'
 *   })
 *
 * // Create a new consumer on the consumer group
 * channel.create()
 * @param {String} base - Base URL at which the streaming service resides.
 * @param {Object} [options] - Options to use for the channel.
 * @param {BaseChannelAuth} [options.auth] - Authentication object to use for
 *   channel requests.
 * @param {String} [options.consumerGroup] - Consumer group to subscribe the
 *   channel consumer to.
 * @param {String} [options.pathPrefix] - Path to append to streaming service
 *   requests.
 * @param {String} [options.consumerPathPrefix=/databus/consumer-service/v1] -
 *   Path to append to consumer-related requests made to the streaming service.
 *   Note that if `options.pathPrefix` is set to a non-empty value, the
 *   `options.pathPrefix` value will be appended to consumer-related requests
 *   instead of the `options.consumerPathPrefix` value.
 * @param {String} [options.producerPathPrefix=/databus/cloudproxy/v1] - Path
 *   to append to producer-related requests made to the streaming service. Note
 *   that if the `options.pathPrefix` parameter is set to a non-empty value, the
 *   `options.pathPrefix` value will be appended to producer-related requests
 *   instead of the `options.producerPathPrefix` value.
 * @param {String} [options.offset=latest] - Offset for the next record to
 *   retrieve from the streaming service for the new {@link Channel#consume}
 *   call. Must be one of 'latest', 'earliest', or 'none'.
 * @param {Number} [options.requestTimeout] - The configuration controls the
 *   maximum amount of time the client (consumer) will wait for the broker
 *   response of a request. If the response is not received before the
 *   request timeout elapses the client may resend the request or fail
 *   the request if retries are exhausted. If set to `null` or `undefined` (the
 *   default), the request timeout is determined automatically by the streaming
 *   service. Note that if a value is set for the request
 *   timeout, the value should exceed the `options.sessionTimeout`. Otherwise,
 *   the streaming service may fail to create new consumers properly. To
 *   ensure that the request timeout is greater than the
 *   `options.sessionTimeout`, values for both (or neither) of the
 *   `options.requestTimeout` and `options.sessionTimeout` parameters should be
 *   specified.
 * @param {Number} [options.sessionTimeout] - The timeout (in seconds) used to
 *   detect channel consumer failures. The consumer sends periodic heartbeats
 *   to indicate its liveness to the broker. If no heartbeats are
 *   received by the broker before the expiration of this session
 *   timeout, then the broker may remove this consumer from the group.
 *   If set to `null` or `undefined` (the default), the session timeout is
 *   determined automatically by the streaming service. Note that if a value is
 *   set for the session timeout, the value should be less than the
 *   `options.requestTimeout`. Otherwise, the streaming service may fail to
 *   create new consumers properly. To ensure that the session timeout is
 *   less than the `options.requestTimeout`, values for both (or neither)
 *   of the `options.requestTimeout` and `options.sessionTimeout` parameters
 *   should be specified.
 * @param {Boolean} [options.retryOnFail=true] - Whether or not the channel
 *   will automatically retry a call which failed due to a temporary error.
 * @param {Object} [options.extraConfigs] - Object with properties containing
 *   any custom configuration settings which should be sent to the streaming
 *   service when a consumer is created. Note that any values specified for the
 *   `options.offset`, `options.requestTimeout`, and/or `options.sessionTimeout`
 *   parameters will override the corresponding values, if specified, in the
 *   `options.extraConfigs` parameter.
 * @param {String} [options.key] - Optional client private keys in PEM format.
 *   See
 *   {@link https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options}.
 * @param {String} [options.cert] - Optional client cert chains in PEM format.
 *   See
 *   {@link https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options}.
 * @param {String} [options.ca] - Optionally override the trusted CA
 *   certificates used to validate the streaming service. Any string can
 *   contain multiple PEM CAs concatenated together.
 *   See
 *   {@link https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options}.
 * @param {String} [options.passphrase] - Optional shared passphrase used for a
 *   single private key. See
 *   {@link https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options}.
 * @param {Boolean} [options.rejectUnauthorized=true] - If not false, the server
 *   certificate is verified against the list of supplied CAs. See
 *   {@link https://nodejs.org/api/tls.html#tls_tls_connect_options_callback}.
 * @param {Function} [options.checkServerIdentity] - A callback function to
 *   be used when checking the server's hostname against the certificate.
 *   See
 *   {@link https://nodejs.org/api/tls.html#tls_tls_connect_options_callback}.
 * @constructor
 */
function Channel (base, options) {
  var channel = this

  if (!base) {
    throw new PermanentError('Value must be specified for base')
  }
  this._base = base

  options = options || {}

  this._auth = options.auth
  this._consumerGroup = options.consumerGroup

  var offsetValues = ['latest', 'earliest', 'none']
  if (options.offset && offsetValues.indexOf(options.offset) < 0) {
    throw new PermanentError('Value for \'offset\' must be one of ' +
      offsetValues.join(', '))
  }

  var pathPrefix = options.pathPrefix
  if (pathPrefix) {
    this._consumerPathPrefix = pathPrefix
    this._producerPathPrefix = pathPrefix
  } else {
    this._consumerPathPrefix =
      options.consumerPathPrefix || DEFAULT_CONSUMER_PATH_PREFIX
    this._producerPathPrefix =
      options.producerPathPrefix || DEFAULT_PRODUCER_PATH_PREFIX
  }

  this._configs = {}
  var extraConfigs = options.extraConfigs || {}
  Object.keys(extraConfigs).forEach(function (setting) {
    channel._configs[setting] = extraConfigs[setting]
  })

  this._configs[AUTO_OFFSET_RESET_CONFIG_SETTING] = options.offset || 'latest'

  if (extraConfigs[ENABLE_AUTO_COMMIT_CONFIG_SETTING]) {
    this._configs[ENABLE_AUTO_COMMIT_CONFIG_SETTING] =
      extraConfigs[ENABLE_AUTO_COMMIT_CONFIG_SETTING]
  } else {
    // This has to be false for now
    this._configs[ENABLE_AUTO_COMMIT_CONFIG_SETTING] = 'false'
  }

  if ((typeof options.sessionTimeout !== 'undefined') &&
    (options.sessionTimeout !== null)) {
    var sessionTimeout = Number(options.sessionTimeout)
    if (isNaN(sessionTimeout)) {
      throw new TypeError('sessionTimeout must be a number')
    }
    // Convert from seconds to milliseconds
    this._configs[SESSION_TIMEOUT_CONFIG_SETTING] =
      (sessionTimeout * 1000).toString()
  }

  if ((typeof options.requestTimeout !== 'undefined') &&
    (options.requestTimeout !== null)) {
    var requestTimeout = Number(options.requestTimeout)
    if (isNaN(requestTimeout)) {
      throw new TypeError('requestTimeout must be a number')
    }
    // Convert from seconds to milliseconds
    this._configs[REQUEST_TIMEOUT_CONFIG_SETTING] =
      (requestTimeout * 1000).toString()
  }

  /**
   * Whether or not the channel will automatically retry a call which
   * failed due to a temporary error.
   */
  this.retryOnFail = options.hasOwnProperty(
    'retryOnFail') ? options.retryOnFail : true

  /**
   * Setup the defaults to be used in each HTTP request to be made to the
   * channel. A `jar` is defined in order to manage cookies which may be
   * provided from the server when establishing a channel consumer.
   * @private
   */
  this._request = request.defaults(
    util.addTlsOptions({
      baseUrl: this._base,
      jar: request.jar()
    }, options)
  )

  /**
   * Most recent consumer id returned from the streaming service for a
   * channel consumer.
   * @type {String}
   * @private
   */
  this._consumerId = null

  /**
   * Array of strings containing the topics for which a channel consumer is
   * actively subscribed.
   * @type {Array<String>}
   * @private
   */
  this._activeSubscriptions = []

  /**
   * Array of strings containing the topics that the channel consumer has
   * requested to be subscribed to. Once the server has provided a successful
   * response to a subscription request, this array is copied to the
   * {@link Channel#_activeSubscriptions}.
   * @type {Array<String>}
   * @private
   */
  this._requestedSubscriptions = []

  /**
   * Array of records which have been consumed from the channel but whose
   * offsets have not yet been committed back to the channel.
   * @type {Array<Object>}
   * @private
   */
  this._recordsCommitLog = []

  /**
   * Whether or not the channel is currently active. Once the channel has
   * been destroyed, this is set to `false`.
   * @type {boolean}
   * @private
   */
  this._active = true

  /**
   * Whether or not a {@link Channel#run} call is active.
   * @type {boolean}
   * @private
   */
  this._running = false

  /**
   * Whether or not a {@link Channel#stop} call has been made to terminate
   * an active {@link Channel#run} call.
   * @type {boolean}
   * @private
   */
  this._stopRequested = false

  /**
   * An array of callbacks which should be invoked when a channel is stopped.
   * @type {Array<Function>}
   * @private
   */
  this._stopCallbacks = []

  /**
   * Function invoked while a {@link Channel#run} is in progress to resume
   * consumption of records after waiting for the configured
   * `waitBetweenQueries`.
   * @type {Function}
   * @private
   */
  this._runLoopFunc = null

  /**
   * Timeout object related to the `_runLoopFunc`.
   * @private
   */
  this._runLoopTimeout = null

  /**
   * Returns whether or not the channel is still active.
   * @param {Function} [callback] - Callback to invoke with an error if the
   *   channel is not active.
   * @returns {Boolean} true if the channel is still active, false if not.
   * @private
   */
  this._stillActive = function (callback) {
    if (!this._active && callback) {
      callback(new PermanentError('Channel has been destroyed'))
    }
    return this._active
  }

  /**
   * Send an HTTP request to the channel. The request options should include
   * info for the user, if authenticated -- for example, a channel token.
   * @param {Function} sendFunc - Function to invoke in order to send the
   *   request.
   *
   *   The first parameter in the function call is the same as the `options`
   *   parameter. The second parameter is a callback that the function should
   *   invoke with the results from sending the request.
   *
   *   The first parameter in the call to the results callback is an `Error`
   *   object, if an error occurred during the send attempt, else `null`. The
   *   second parameter is an HTTP response object, if the an HTTP response
   *   could be returned for the request, else `null`.
   * @param {Object} [options] - Options to use for the request.
   * @param {Function} [successCallback] - Callback to invoke if the request
   *   is "successful", with an HTTP response code greater than or equal to
   *   200 and less than or equal to 204. The first parameter to the callback
   *   is the HTTP response object.
   * @param {Function} [errorCallback] - Callback to invoke if the request
   *   "fails". A failed connection to the server or HTTP status code in the
   *   response outside of the range from 200 to 204 is considered to be a
   *   failure. The first parameter to the callback is an `Error` object
   *   containing failure details.
   * @param {Function} [notFoundCallback] - Callback to invoke if the response
   *   to the request includes an HTTP 404 (Not Found) status code. If a `null`
   *   value is provided for this callback and the request results in an HTTP
   *   404 response, the `errorCallback` function, if specified, is invoked.
   *   The first parameter to the callback is the HTTP response object.
   * @private
   */
  this._sendRequestWithAuthInfo = function (sendFunc, options,
                                            successCallback, errorCallback,
                                            notFoundCallback) {
    sendFunc(options, function (error, response) {
      if (error) {
        if (errorCallback) {
          errorCallback(error)
        }
      } else if ([200, 201, 202, 204].indexOf(response.statusCode) >= 0) {
        if (successCallback) {
          successCallback(response)
        }
      } else if ([401, 403].indexOf(response.statusCode) >= 0) {
        if (errorCallback) {
          errorCallback(new TemporaryError(
            'Token potentially expired (' + response.statusCode + '): ' +
            JSON.stringify(response.body)
          ))
        }
        // Call reset on the channel authenticator in order to allow it to
        // clear out any cached credentials (for example, a token) before
        // attempting to retry a request.
        if (channel._auth && channel._auth.reset) {
          channel._auth.reset()
        }
      } else if ((response.statusCode === 404) && notFoundCallback) {
        notFoundCallback(response)
      } else {
        if (errorCallback) {
          errorCallback(new TemporaryError(
            'Unexpected temporary error ' + response.statusCode + ': ' +
            JSON.stringify(response.body)
          ))
        }
      }
    })
  }

  /**
   * Send an HTTP request to the channel.
   * @param {Function} sendFunc - Function to invoke in order to send the
   *   request.
   *
   *   The first parameter in the function call is the same as the `options`
   *   parameter. The second parameter is a callback that the function should
   *   invoke with the results from sending the request.
   *
   *   The first parameter in the call to the results callback is an `Error`
   *   object, if an error occurred during the send attempt, else `null`. The
   *   second parameter is an HTTP response object, if the an HTTP response
   *   could be returned for the request, else `null`.
   * @param {Object} [options] - Options to use for the request.
   * @param {Function} [successCallback] - Callback to invoke if the request
   *   is "successful", with an HTTP response code greater than or equal to
   *   200 and less than or equal to 204. The first parameter to the callback
   *   is the HTTP response object.
   * @param {Function} [errorCallback] - Callback to invoke if the request
   *   "fails". A failed connection to the server or HTTP status code in the
   *   response outside of the range from 200 to 204 is considered to be a
   *   failure. The first parameter to the callback is an `Error` object
   *   containing failure details.
   * @param {Function} [notFoundCallback] - Callback to invoke if the response
   *   to the request includes an HTTP 404 (Not Found) status code. If a `null`
   *   value is provided for this callback and the request results in an HTTP
   *   404 response, the `errorCallback` function, if specified, is invoked.
   *   The first parameter to the callback is the HTTP response object.
   * @private
   */
  this._sendRequest = function (sendFunc, options,
                                successCallback, errorCallback,
                                notFoundCallback) {
    if (typeof options === 'string') {
      options = {uri: options}
    }
    if (this._auth) {
      this._auth.authenticate(
        options,
        function (error, optionsWithAuth) {
          if (error) {
            if (errorCallback) {
              errorCallback(error)
            }
          } else {
            channel._sendRequestWithAuthInfo(sendFunc, optionsWithAuth,
              successCallback, errorCallback, notFoundCallback)
          }
        }
      )
    } else {
      this._sendRequestWithAuthInfo(sendFunc, options,
        successCallback, errorCallback, notFoundCallback)
    }
  }

  /**
   * Perform an operation, retrying the operation in the event that it fails.
   *
   * The `operationCallback` will be called with a `completionCallback`
   * repeatedly until at least one of the following occurs:
   *
   * 1. No error is delivered to the first parameter in the `completionCallback`.
   * 2. A {@link ConsumerError} instance is delivered to the
   *   `completionCallback`.
   * 3. The {@link Channel#retryOnFail} property is set to `false`.
   * 4. An {@link Channel#run} call is active but a stop has been requested,
   *    for example, due to a call to {@link Channel#stop}.
   *
   * @param {Function} [operationCallback] - Function to invoke for each
   *   operation attempt. The first parameter delivered to the
   *   `operationCallback` is a callback which is invoked when the operation
   *   attempt is complete. The first parameter sent to the completion callback
   *   is an `Error` object, if an error occurred when performing the operation,
   *   else `null`. The second parameter sent to the completion callback is an
   *   object representing the `result` of the operation attempt.
   * @param {Function} [completeCallback] - Callback function to invoke after
   *   the final attempt to perform the `operationCallback` is made. The first
   *   parameter sent to the `completeCallback` is an `Error` object, if an
   *   `Error` occurred, else `null`. The second parameter sent to the
   *   `operationCallback` is an object containing the `result` from the last
   *   operation attempt (or `null` if the operation was stopped prior to a
   *   final attempt).
   * @private
   */
  this._retryOnFailure = function (operationCallback, completeCallback) {
    if (!operationCallback) {
      if (completeCallback) {
        callbackAsync(completeCallback)
      }
    } else if (this._stillActive(completeCallback)) {
      var operation = retry.operation({
        factor: RETRY_FACTOR,
        forever: true,
        minTimeout: MIN_RETRY_TIMEOUT,
        maxTimeout: MAX_RETRY_TIMEOUT
      })

      operation.attempt(function () {
        if (channel._stillActive(completeCallback)) {
          if (channel._running && channel._stopRequested) {
            if (completeCallback) {
              completeCallback(new StopError())
            }
          } else {
            operationCallback(function (error, result) {
              if (error && (!(error instanceof ConsumerError)) &&
                channel.retryOnFail && operation.retry(error)) {
                console.log('Retrying due to: ' + error.message)
              } else {
                if (error) {
                  console.log('Will not retry due to: ' + error.message +
                    (channel.retryOnFail ? '' : ' (retries disabled)')
                  )
                }
                if (completeCallback) {
                  completeCallback(error, result)
                }
              }
            })
          }
        }
      })
    }
  }

  /**
   * Determine if the channel is already subscribed to consume an array
   * of `topics`.
   * @param {Array<String>} topics - Array of topics.
   * @returns {boolean} true if the channel is subscribed to the supplied
   *   list of topics, false if there are any differences between the
   *   supplied topics and the array of active topics.
   * @private
   */
  this._alreadySubscribed = function (topics) {
    var returnValue = true
    if (topics.length === this._activeSubscriptions.length) {
      for (var i = 0; i < topics.length; i++) {
        if (topics[i] !== this._activeSubscriptions[i]) {
          returnValue = false
          break
        }
      }
    } else {
      returnValue = false
    }
    return returnValue
  }

  /**
   * Internal helper for subscribing the consumer to an array of topics.
   * @param {Array<String>} topics - Topic array.
   * @param {Function} [callback] - Callback function invoked when the
   *   subscription attempt is complete. The first parameter in the call to the
   *   results callback is an `Error` object, if an error occurred during the
   *   send attempt, else `null`.
   * @private
   */
  this._subscribe = function (topics, callback) {
    this._requestedSubscriptions = topics
    this._retryOnFailure(
      function (retryCallback) {
        var consumerId = channel._consumerId
        channel._sendRequest(
          channel._request.post,
          {
            uri: util.appendUrlSubpath(channel._consumerPathPrefix,
              'consumers/' + consumerId + '/subscription'),
            json: true,
            body: {topics: topics}
          },
          function () {
            channel._activeSubscriptions = topics
            retryCallback(null)
          },
          retryCallback,
          function () {
            retryCallback(new ConsumerError("Consumer '" + consumerId +
              "' does not exist"
            ))
          }
        )
      },
      callback
    )
  }

  /**
   * Function which processes the response from the `processCallback`
   * invoked during a run/consume operation. This function attempts to
   * perform a {@link Channel#commit} for outstanding records which have
   * been consumed but not previously committed and waits before calling
   * the `doneCallback` to continue until the supplied `waitBetweenQueries`
   * has elapsed.
   * @param {Boolean} continueRunning - Whether or not to try to consume
   *   more records.
   * @param {Function} doneCallback - Callback function to invoke when
   *   done processing the response. The first parameter delivered to the
   *   `doneCallback` is an `Error` object, if an error occurred during
   *   processing the response, else `null`. The second parameter
   *   delivered to the `doneCallback` is a `boolean` indicating whether
   *   or not to try to consume more records.
   * @param {Number} waitBetweenQueries - Number of seconds to wait between
   *   calls to consume records.
   * @private
   */
  this._handleProcessCallbackResponse = function (
    continueRunning, doneCallback, waitBetweenQueries) {
    if (this._stopRequested) {
      continueRunning = false
    }
    this.commit(function (commitError) {
      if (commitError || !continueRunning) {
        doneCallback(commitError, continueRunning)
      } else {
        channel._runLoopFunc = function () {
          channel._runLoopFunc = null
          channel._runLoopTimeout = null
          doneCallback(null, !channel._stopRequested)
        }
        channel._runLoopTimeout = setTimeout(channel._runLoopFunc,
          waitBetweenQueries * 1000
        )
      }
    })
  }

  /**
   * Repeatedly consume records from subscribed topics during an
   * active {@link Channel#run}.
   * @param {Function} processCallback - Callback to invoke with an array
   *   of payloads from records which have been consumed.
   * @param {Function} doneCallback - Callback function to invoke when
   *   done performing the consume attempt. The first parameter delivered to the
   *   `doneCallback` is an `Error` object, if an error occurred during the
   *   consume attempt, else `null`. The second parameter delivered to the
   *   `doneCallback` is a `boolean` indicating whether or not to try to consume
   *   more records.
   * @param {Number} waitBetweenQueries - Number of seconds to wait between
   *   calls to consume records.
   * @private
   */
  this._consumeForRun = function (processCallback, doneCallback,
                                  waitBetweenQueries) {
    this.consume(function (consumeError, payloads) {
      if (consumeError) {
        doneCallback(consumeError, false)
      } else {
        try {
          var continueRunning = processCallback(payloads,
            function (processError, continueRunning) {
              if (processError) {
                doneCallback(processError, false)
              } else {
                channel._handleProcessCallbackResponse(continueRunning,
                  doneCallback, waitBetweenQueries)
              }
            })
          if (typeof continueRunning !== 'undefined') {
            channel._handleProcessCallbackResponse(
              continueRunning, doneCallback, waitBetweenQueries
            )
          }
        } catch (processError) {
          doneCallback(processError, false)
        }
      }
    })
  }

  /**
   * Handle `Error` objects delivered during a {@link Channel#run}.
   * @param {Error} error - The error.
   * @param {Function} runLoop - The run loop function to invoke in the event
   *   of a recoverable error.
   * @param {Function} [doneCallback] - Function to invoke if the run has been
   *   stopped.
   * @private
   */
  this._handleRunError = function (error, runLoop, doneCallback) {
    // If a `ConsumerError` occurred, reset the stored consumer info and
    // continue running. As the run is continued, a new consumer should be
    // established.
    if (error instanceof ConsumerError) {
      this.reset()
      runLoop()
    } else {
      this._running = false
      var stopCallbacks = this._stopCallbacks
      this._stopCallbacks = []
      this._stopRequested = false
      // Invoke registered stop callbacks to let them know that the current
      // run has been stopped.
      stopCallbacks.forEach(function (stopCallback) {
        try {
          stopCallback()
        } catch (stopError) {
          console.log('Error thrown from stop callback: ' + stopError)
        }
      })
      if (doneCallback) {
        // If the error was due to the stop being requested, don't bother
        // passing that on to the `doneCallback` since this is was a
        // requested shutdown (as opposed to a more critical channel-related
        // error).
        if (error instanceof StopError) {
          error = null
        }
        doneCallback(error)
      }
    }
  }
}

/**
 * Resets local consumer data stored for the channel.
 */
Channel.prototype.reset = function () {
  this._consumerId = null
  this._activeSubscriptions = []
  this._requestedSubscriptions = []
  this._recordsCommitLog = []
}

/**
 * Creates a new consumer on the consumer group.
 * @param {Function} [callback] - Callback function to invoke when the creation
 *   attempt has completed. The first parameter in the call to the
 *   results callback is an `Error` object, if an error occurred during the
 *   send attempt, else `null`. Possible `Error` types include:
 *
 *   * {@link TemporaryError} - If the creation attempt fails and
 *     {@link Channel#retryOnFail} is set to False.
 *   * {@link PermanentError} - If the channel has been destroyed.
 */
Channel.prototype.create = function (callback) {
  if (!this._consumerGroup) {
    throw new PermanentError(
      "No value specified for 'consumerGroup' option during channel init")
  }

  this.reset()
  var channel = this
  this._retryOnFailure(
    function (retryCallback) {
      channel._sendRequest(
        channel._request.post,
        {
          uri: util.appendUrlSubpath(channel._consumerPathPrefix, 'consumers'),
          json: true,
          body: {
            consumerGroup: channel._consumerGroup,
            configs: channel._configs
          }
        },
        function (response) {
          var consumerInstanceId = response.body.consumerInstanceId
          if (consumerInstanceId) {
            channel._consumerId = consumerInstanceId
            retryCallback(null)
          } else {
            retryCallback(new PermanentError(
              'Unable to locate consumerInstanceId in create consumer response'
            ))
          }
        },
        retryCallback
      )
    },
    callback
  )
}

/**
 * Subscribes the consumer to an array of topics.
 * @param {(String|Array<String>)} topics - Topic or array of topics.
 * @param {Function} [callback] - Callback function invoked when the
 *   subscription attempt is complete. The first parameter in the call to the
 *   results callback is an `Error` object, if an error occurred during the
 *   send attempt, else `null`. Possible `Error` types include:
 *
 *   * {@link ConsumerError} - If the consumer associated with the channel
 *     does not exist on the server and {@link Channel#retryOnFail} is set to
 *     `false`.
 *   * {@link TemporaryError} - If the subscription attempt fails and
 *     {@link Channel#retryOnFail} is set to False.
 *   * {@link PermanentError} - If the channel has been destroyed.
 */
Channel.prototype.subscribe = function (topics, callback) {
  if (!topics) {
    throw new PermanentError('Value must be specified for topics')
  }
  if (typeof topics === 'string') {
    topics = [topics]
  } else if (!topics.length) {
    throw new PermanentError('At least one topic must be specified')
  }

  // As an optimization, avoid re-subscribing if the array of topics
  // to subcribe for is identical to what the consumer is already subscribed to.
  if (this._alreadySubscribed(topics)) {
    callbackAsync(callback)
  } else {
    if (this._consumerId) {
      this._subscribe(topics, callback)
    } else {
      var channel = this
      this.create(function (error) {
        if (error) {
          if (callback) {
            callback(error)
          }
        } else {
          channel._subscribe(topics, callback)
        }
      })
    }
  }
}

/**
 * Consumes records from all the subscribed topics.
 * @param {Function} [callback] - Callback function invoked when the
 *   consume attempt is complete.
 *
 *   The first parameter supplied to the callback is an `Error` object, if an
 *   error occurred during the consume attempt, else `null`. Possible `Error`
 *   types include:
 *
 *   * {@link ConsumerError} - If the consumer associated with the channel
 *     does not exist on the server and {@link Channel#retryOnFail} is set to
 *     `false`.
 *   * {@link TemporaryError} - If the consume attempt fails
 *     and {@link Channel#retryOnFail} is set to False.
 *   * {@link PermanentError} - If the channel has been destroyed.
 *
 * The second parameter supplied to the callback is, for a successful consume,
 * an array of payloads (decoded as objects) from records returned from the
 * server. For a failed consume, the second parameter is `null`.
 * @throws {PermanentError} If the channel has not been subscribed to any
 *   topics.
 */
Channel.prototype.consume = function (callback) {
  if (!this._activeSubscriptions.length) {
    throw new PermanentError('Channel is not subscribed to any topic')
  }

  var channel = this
  this._retryOnFailure(
    function (retryCallback) {
      var consumerId = channel._consumerId
      channel._sendRequest(
        channel._request.get,
        {
          uri: util.appendUrlSubpath(channel._consumerPathPrefix, 'consumers/' +
            consumerId + '/records'),
          json: true
        },
        function (response) {
          var payloads = []
          var records = response.body.records
          records.forEach(function (record) {
            channel._recordsCommitLog.push({
              topic: record.routingData.topic,
              partition: record.partition,
              offset: record.offset
            })
            payloads.push(JSON.parse(Buffer.from(record.message.payload,
              'base64')))
          })
          retryCallback(null, payloads)
        },
        retryCallback,
        function () {
          retryCallback(new ConsumerError("Consumer '" + consumerId +
            "' does not exist"
          ))
        }
      )
    },
    callback
  )
}

/**
 * Commits the record offsets to the channel.
 * @param {Function} [callback] - Callback function invoked when the
 *   commit attempt is complete.
 *
 *   The first parameter supplied to the callback is an `Error` object, if an
 *   error occurred during the commit attempt, else `null`. Possible `Error`
 *   types include:
 *
 *   * {@link ConsumerError} - If the consumer associated with the channel
 *     does not exist on the server and {@link Channel#retryOnFail} is set to
 *     `false`.
 *   * {@link TemporaryError} - If the commit attempt fails
 *     and {@link Channel#retryOnFail} is set to False.
 *   * {@link PermanentError} - If the channel has been destroyed.
 */
Channel.prototype.commit = function (callback) {
  if (this._recordsCommitLog.length) {
    var channel = this
    this._retryOnFailure(
      function (retryCallback) {
        var consumerId = channel._consumerId
        channel._sendRequest(
          channel._request.post,
          {
            uri: util.appendUrlSubpath(channel._consumerPathPrefix,
              'consumers/' + consumerId + '/offsets'),
            json: true,
            body: {offsets: channel._recordsCommitLog}
          },
          function () {
            channel._recordsCommitLog = []
            retryCallback(null)
          },
          retryCallback,
          function () {
            retryCallback(new ConsumerError("Consumer '" + consumerId +
              "' does not exist"
            ))
          }
        )
      },
      callback
    )
  } else {
    callbackAsync(callback)
  }
}

/**
 * Repeatedly consume records from the subscribed topics. The supplied
 * `processCallback` is an array of payloads (decoded as objects) from records
 * returned from the server.
 *
 * The `processCallback` should return a value of `true` in order for this
 * function to continue consuming additional records. For a return value of
 * `false` or no return value, no additional records will be consumed and this
 * function will return.
 *
 * The {@link Channel#stop} method can also be called to halt an execution of
 * this method.
 * @param {Function} processCallback - Callback to invoke with an array
 *   of payloads from records which have been consumed.
 * @param {Object} [options] - Options to use for the channel.
 * @param {Function} [options.doneCallback] - Callback to invoke when the run is
 *   complete. The first parameter supplied to the callback is an `Error`
 *   object, if an error occurred during the run, else `null`.
 * @param {Number} [options.waitBetweenQueries=30] - Number of seconds to wait
 *   between calls to consume records.
 * @param {(String|Array<String>)} [options.topics] - Topic or array of topics.
 *   If set to a non-empty value, the channel will be subscribed to the
 *   specified topics. If set to an empty value, the channel will use topics
 *   previously subscribed via a call to the {@link Channel#subscribe} method.
 * @throws {PermanentError} If a previous run is already in progress.
 */
Channel.prototype.run = function (processCallback, options) {
  if (!this._consumerGroup) {
    throw new PermanentError(
      "No value specified for 'consumerGroup' option during channel init")
  }

  options = options || {}

  if (!processCallback) {
    throw new PermanentError('Value must be specified for processCallback')
  }
  if (this._running) {
    throw new PermanentError('Previous run already in progress')
  }

  var doneCallback = options.doneCallback

  var waitBetweenQueries =
    options.waitBetweenQueries || DEFAULT_WAIT_BETWEEN_QUERIES

  var topics = options.topics
  if (typeof topics === 'string') {
    topics = [topics]
  }
  if (topics) {
    if (topics.length) {
      this._requestedSubscriptions = topics
    } else {
      throw new PermanentError('At least one topic must be specified')
    }
  } else if (!this._activeSubscriptions.length) {
    throw new PermanentError('Channel is not subscribed to any topic')
  }

  this._running = true
  var channel = this

  var doConsumeLoop = function (processCallback, doneCallback) {
    channel._consumeForRun(processCallback,
      function (consumeLoopError, continueRunning) {
        if (!consumeLoopError && continueRunning) {
          doConsumeLoop(processCallback, doneCallback)
        } else {
          if (doneCallback) {
            doneCallback(consumeLoopError)
          }
        }
      },
      waitBetweenQueries
    )
  }

  var currentSubscriptions = channel._requestedSubscriptions.slice()
  var doRun = function () {
    channel.subscribe(currentSubscriptions, function (subscribeError) {
      if (subscribeError) {
        channel._handleRunError(subscribeError, doRun, doneCallback)
      } else {
        doConsumeLoop(processCallback,
          function (consumeLoopError) {
            currentSubscriptions = channel._requestedSubscriptions.slice()
            channel._handleRunError(consumeLoopError, doRun, doneCallback)
          })
      }
    })
  }

  doRun()
}

/**
 * Stop an active execution of a {@link Channel#run}. If no run is active, the
 * supplied callback is invoked immediately. If a run is active, the supplied
 * callback is invoked after the run has been completed.
 *
 * @param {Function} [callback] - Function to invoke when the run has been
 *   stopped.
 */
Channel.prototype.stop = function (callback) {
  if (this._running) {
    this._stopRequested = true
    if (this._runLoopTimeout) {
      clearTimeout(this._runLoopTimeout)
      this._runLoopTimeout = null
      if (this._runLoopFunc) {
        callbackAsync(this._runLoopFunc)
        this._runLoopFunc = null
      }
    }
    if (callback) {
      this._stopCallbacks.push(callback)
    }
  } else {
    callbackAsync(callback)
  }
}

/**
 * Deletes the consumer from the consumer group.
 * @param {Function} [callback] - Callback to invoke after the consumer has
 *   been deleted. The first parameter supplied to the callback is an `Error`
 *   object, if an error occurred during the delete, else `null`.
 */
Channel.prototype.delete = function (callback) {
  var consumerId = this._consumerId
  if (consumerId) {
    var channel = this
    channel._sendRequest(
      channel._request.delete,
      util.appendUrlSubpath(channel._consumerPathPrefix, 'consumers/' +
        consumerId),
      function () {
        channel.reset()
        if (callback) {
          callback(null)
        }
      },
      callback,
      function () {
        channel.reset()
        if (callback) {
          callback(new ConsumerError("Consumer with ID '" +
            consumerId +
            "' not found. Resetting consumer anyways."))
        }
      }
    )
  } else {
    callbackAsync(callback)
  }
}

/**
 * Produces records to the channel.
 * @param {Object} payload - Payload containing the records to be posted to the
 *   channel.
 * @param {Function} [callback] - Function to invoke when the produce has been
 *   stopped. The first parameter supplied to the callback is an `Error`
 *   object, if an error occurred during the produce, else `null`. The parameter
 *   will be of type {@link PermanentError} if an unsuccessful response is
 *   received from the streaming service for the produce attempt.
 */
Channel.prototype.produce = function (payload, callback) {
  this._sendRequest(
    this._request.post,
    {
      uri: util.appendUrlSubpath(this._producerPathPrefix, 'produce'),
      json: true,
      body: payload,
      headers: {
        'Content-Type': 'application/vnd.dxl.intel.records.v1+json'
      }
    },
    function () {
      if (callback) {
        callback(null)
      }
    },
    callback
  )
}

/**
 * Destroys the channel (releases all associated resources).
 *
 * **NOTE:** Once the method has been invoked, no other calls should be
 * made to the channel.
 *
 * @param {Function} [callback] - Function to invoke when the channel has been
 *   destroyed. The first parameter supplied to the callback is an `Error`
 *   object, if an error occurred during the destroy, else `null`. The parameter
 *   will be of type {@link TemporaryError} if a consumer has previously been
 *   created for the channel but an attempt to delete the consumer from the
 *   channel fails.
 */
Channel.prototype.destroy = function (callback) {
  if (this._active) {
    var channel = this
    this.stop(function (stopError) {
      if (stopError) {
        callback(stopError)
      } else {
        channel.delete(function (deleteError) {
          if (deleteError) {
            if (callback) {
              callback(deleteError)
            }
          } else {
            channel._active = false
            if (callback) {
              callback(null)
            }
          }
        })
      }
    })
  } else {
    callbackAsync(callback)
  }
}

module.exports = Channel