'use strict'
var events = require('events')
var inherits = require('inherits')
var mqtt = require('mqtt')
var CallbackManager = require('./callback-manager')
var decodeMessage = require('./decode-message')
var DxlError = require('./dxl-error')
var message = require('./message')
var RequestManager = require('./request-manager')
var ServiceManager = require('./service-manager')
var HttpsProxyAgent = require('https-proxy-agent')
var url = require('url')
var nodeUtil = require('util')
var debug = nodeUtil.debuglog('dxlclient')
// Topic prefix which is combined with the client id to form the topic on
// which replies to the client's requests are received (see the assignment
// of Client#_replyToTopic in the Client constructor).
var REPLY_TO_PREFIX = '/mcafee/client/'
/**
 * @classdesc Responsible for all communication with the
 * Data Exchange Layer (DXL) fabric. (It can be thought of as the 'main'
 * class). All other classes exist to support the functionality provided by
 * the client.
 * @example
 * var dxl = require('@opendxl/dxl-client')
 * var fs = require('fs')
 * var config = new dxl.Config(
 *   fs.readFileSync('c:\\certs\\brokercerts.crt'),
 *   fs.readFileSync('c:\\certs\\client.crt'),
 *   fs.readFileSync('c:\\certs\\client.key'),
 *   [dxl.Broker.parse('ssl://192.168.99.100')])
 *
 * var client = new dxl.Client(config)
 * client.connect()
 * @param {Config} config - Object containing the configuration settings for
 *   the client
 * @constructor
 */
function Client (config) {
  /**
   * The {@link Config} instance that was specified when the client was
   * constructed.
   * @type {Config}
   * @name Client#config
   */
  this.config = config

  /**
   * The client id derived from the client configuration.
   * @private
   * @type {String}
   * @name Client#_clientId
   */
  this._clientId = config._clientId

  /**
   * Information for the last broker to which the client connected.
   * @private
   * @type {Broker}
   * @name Client#_lastConnectedBroker
   */
  this._lastConnectedBroker = null

  /**
   * Topic on which replies to request messages are received. Built from a
   * fixed prefix followed by the client id.
   * @private
   * @type {string}
   * @name Client#_replyToTopic
   */
  this._replyToTopic = REPLY_TO_PREFIX + this._clientId

  /**
   * Handle to the underlying MQTT client object. Remains null until
   * {@link Client#connect} has been called.
   * @private
   * @type {MqttClient}
   * @name Client#_mqttClient
   */
  this._mqttClient = null

  /**
   * Mapping of registered subscriptions. Each key in the object represents
   * a topic string. Each value in the object is an object with keys
   * representing one of the type constants in the {@link Message} class - for
   * example, {@link Message.MESSAGE_TYPE_RESPONSE}, and values being an array
   * of callback functions.
   * @private
   * @example
   * var this._subscriptionsByMessageType = {}
   * var callbacksForTopic = {}
   * callbacksForTopic[Message.MESSAGE_TYPE_RESPONSE] =
   *   [function(message) { console.log('my callback') }]
   * this._subscriptionsByMessageType['/topic1'] = callbacksForTopic
   * @default {}
   * @type {Object}
   * @name Client#_subscriptionsByMessageType
   */
  this._subscriptionsByMessageType = {}

  /**
   * Handle to a manager of message callbacks registered with the client.
   * @private
   * @type {CallbackManager}
   * @name Client#_callbackManager
   */
  this._callbackManager = new CallbackManager()

  /**
   * Handle to a manager of requests made to a broker.
   * @private
   * @type {RequestManager}
   * @name Client#_requestManager
   */
  this._requestManager = new RequestManager(this, this._replyToTopic)

  /**
   * Handle to a manager of services registered with the broker.
   * @private
   * @type {ServiceManager}
   * @name Client#_serviceManager
   */
  this._serviceManager = new ServiceManager(this)

  /**
   * Unique host/port combinations distilled from the brokers in the client
   * configuration. See {@link Config#brokers}. This array of broker
   * information is passed along to the MQTT client to establish a connection.
   * @private
   * @example
   * var this._servers = [ { host: 'host1', port: 8883 },
   *                       { host: 'host2', port: 8993 } ]
   * @type {Array<Object>}
   * @name Client#_servers
   */
  var uniqueServers = []
  this.config.brokers.forEach(function (broker) {
    broker.hosts.forEach(function (host) {
      // Skip host/port pairs which an earlier broker entry already provided
      var alreadyListed = uniqueServers.some(function (entry) {
        return (entry.host === host) && (entry.port === broker.port)
      })
      if (!alreadyListed) {
        uniqueServers.push({host: host, port: broker.port})
      }
    })
  })
  this._servers = uniqueServers

  /**
   * Copy of the `useWebSockets` setting from the client configuration,
   * consulted by {@link Client#connect} to choose the connection protocol.
   * @private
   * @name Client#_iswebSocketEnabled
   */
  this._iswebSocketEnabled = this.config.useWebSockets

  events.EventEmitter.call(this)
}
// Set up Client to inherit from EventEmitter (via the 'inherits' module) so
// that client instances can emit events such as 'connect', 'close' and
// 'error' to registered listeners.
inherits(Client, events.EventEmitter)
/**
 * @property {Boolean} - Whether or not the client is currently connected to
 *   the DXL fabric. Always false before an underlying MQTT client has been
 *   created; otherwise mirrors the MQTT client's `connected` value.
 * @name Client#connected
 */
Object.defineProperty(Client.prototype, 'connected', {
  get: function () {
    var mqttClient = this._mqttClient
    if (!mqttClient) {
      return false
    }
    return mqttClient.connected
  }
})
/**
 * @property {Broker} - Broker that the client is currently connected to.
 *   _null_ is returned if the client is not currently connected to a broker.
 * @name Client#currentBroker
 */
Object.defineProperty(Client.prototype, 'currentBroker', {
  get: function () {
    if (!this.connected) {
      return null
    }
    return this._lastConnectedBroker
  }
})
// Message type key under which subscriptions made directly through
// Client#subscribe / Client#unsubscribe are recorded in the client's
// subscription map.
var EXPLICIT_SUBSCRIPTION_MESSAGE_TYPE = ''
// Placeholder callback which marks a subscription made directly through
// Client#subscribe. It is compared by identity in addSubscription() and
// removeSubscription() and is never registered with the callback manager.
function explicitSubscriptionCallback () {}
/**
 * Add a topic subscription.
 *
 * Unless the callback is the explicit-subscription placeholder, it is first
 * registered with the client's callback manager. The topic is then recorded
 * in the client's subscription map and, the first time the topic is seen,
 * subscribed to on the MQTT client (when one exists).
 * @private
 * @param {Client} client - The {@link Client} instance to which the topic
 *   subscription should be added.
 * @param {String} topic - Topic to subscribe to. An empty string or null value
 *   indicates that the callback should receive messages for all topics
 *   (no filtering). No broker subscription is recorded in that case.
 * @param {(Number|String)} messageType - Type of DXL messages for which the
 *   callback should be invoked. Corresponds to one of the message type
 *   constants in the {@link Message} class - for example,
 *   {@link Message.MESSAGE_TYPE_RESPONSE}.
 * @param {Function} callback - Callback function which should be invoked
 *   for a matching message. The first argument passed to the callback
 *   function is the DXL Message object.
 * @param {Boolean} [subscribeToTopic=true] - Whether or not to subscribe for
 *   the topic with the broker.
 */
function addSubscription (client, topic, messageType,
  callback, subscribeToTopic) {
  // Only an omitted (undefined) argument falls back to the default of true
  var shouldSubscribe = (typeof subscribeToTopic === 'undefined')
    ? true : subscribeToTopic

  if (callback !== explicitSubscriptionCallback) {
    client._callbackManager.addCallback(messageType, topic, callback)
  }

  if (!shouldSubscribe || !topic) {
    return
  }

  var subscriptions = client._subscriptionsByMessageType
  var typesForTopic = subscriptions[topic]
  if (!typesForTopic) {
    // First subscription for this topic: subscribe with the broker now if
    // an MQTT client is available.
    if (client._mqttClient) {
      client._mqttClient.subscribe(topic)
    }
    typesForTopic = {}
    subscriptions[topic] = typesForTopic
  }

  var registeredCallbacks = typesForTopic[messageType]
  if (!registeredCallbacks) {
    typesForTopic[messageType] = [callback]
  } else if (registeredCallbacks.indexOf(callback) === -1) {
    // Avoid recording the same callback twice for a topic / message type
    registeredCallbacks.push(callback)
  }
}
/**
 * Removes a topic subscription.
 *
 * Unless the callback is the explicit-subscription placeholder, it is first
 * removed from the client's callback manager. The client's subscription map
 * is then updated and, if no entry remains for the topic, the topic is
 * unsubscribed from on the MQTT client (when one exists).
 * @private
 * @param {Client} client - The {@link Client} instance from which the topic
 *   subscription should be removed.
 * @param {String} topic - Topic to unsubscribe from.
 * @param {(Number|String)} messageType - Type of DXL messages for which the
 *   callback should be invoked. Corresponds to one of the message type
 *   constants in the {@link Message} class - for example,
 *   {@link Message.MESSAGE_TYPE_RESPONSE}.
 * @param {Function} callback - Callback function which should be invoked
 *   for a matching message.
 */
function removeSubscription (client, topic, messageType, callback) {
  var isExplicit = (callback === explicitSubscriptionCallback)
  if (!isExplicit) {
    client._callbackManager.removeCallback(messageType, topic, callback)
  }

  if (!topic) {
    return
  }

  var subscriptions = client._subscriptionsByMessageType
  var typesForTopic = subscriptions[topic]
  if (!typesForTopic) {
    // Nothing recorded for the topic, so there is nothing to unsubscribe
    return
  }

  if (isExplicit) {
    // A call to the client's unsubscribe() function for the topic drops the
    // whole topic entry regardless of any other active callback-based
    // subscriptions.
    delete subscriptions[topic]
  } else {
    var registeredCallbacks = typesForTopic[messageType]
    var position = registeredCallbacks
      ? registeredCallbacks.indexOf(callback) : -1
    if (position > -1) {
      if (registeredCallbacks.length > 1) {
        // Other callbacks remain for the topic and message type
        registeredCallbacks.splice(position, 1)
      } else if (Object.keys(typesForTopic).length > 1) {
        // Last callback for this message type, but other message types
        // are still registered for the topic
        delete typesForTopic[messageType]
      } else {
        // Last callback of the last message type registered for the topic
        delete subscriptions[topic]
      }
    }
  }

  if (client._mqttClient && !subscriptions[topic]) {
    client._mqttClient.unsubscribe(topic)
  }
}
/**
 * Publishes data to a specific topic via the client's MQTT client.
 * @private
 * @param {Client} client - The {@link Client} instance to which the message
 *   should be published.
 * @param {String} topic - Topic to publish message to.
 * @param {(String|Buffer)} message - Message to publish.
 * @throws {DxlError} If the client has no underlying MQTT client (it is
 *   only created by {@link Client#connect}).
 */
function publish (client, topic, message) {
  var mqttClient = client._mqttClient
  if (!mqttClient) {
    throw new DxlError(
      'Client not connected, unable to publish data to: ' + topic)
  }
  mqttClient.publish(topic, message)
}
/**
 * Attempts to connect the client to the DXL fabric. This method returns
 * immediately if a broker has been configured to connect to. The connection
 * is established asynchronously. If provided, the callback function will
 * be invoked the first time a connection has been established to the broker.
 *
 * The connection uses MQTT over TLS ('mqtts') by default. When the
 * `useWebSockets` configuration setting is enabled, secure WebSockets ('wss')
 * are used instead - optionally through the HTTP proxy described by the
 * `proxy` configuration setting.
 * @param {Function} [callback=null] - Callback function to invoke when
 *   the connection is first established. No arguments are passed to the
 *   callback.
 * @throws {DxlError} If no brokers have been specified in the Config passed
 *   to the client constructor.
 */
Client.prototype.connect = function (callback) {
  if (this._servers.length === 0) {
    throw new DxlError(
      'Unable to connect: no brokers specified in the client configuration')
  }

  var self = this
  var settings = this.config
  var isFirstConnection = true

  // MQTT over TLS is the default; WebSockets are used only when the
  // 'useWebSockets' configuration setting is enabled.
  var viaWebSockets = Boolean(this._iswebSocketEnabled)
  if (viaWebSockets) {
    console.log('Setting \'UseWebSockets\' is enabled. Client will connect via WebSockets')
  }

  var connectOptions = {
    servers: this._servers,
    protocol: viaWebSockets ? 'wss' : 'mqtts',
    protocolId: 'MQIsdp',
    protocolVersion: 3,
    clientId: this._clientId,
    key: settings.privateKey,
    cert: settings.cert,
    ca: settings.brokerCaBundle,
    // NOTE(review): always reporting success here skips the TLS host name
    // check against the broker certificate - confirm this is intended.
    checkServerIdentity: function () {
      return undefined
    },
    keepalive: settings.keepAliveInterval,
    // Multiplied by 1000 - presumably converting seconds to milliseconds
    reconnectPeriod: settings.reconnectDelay * 1000,
    // NOTE(review): certificate validation is turned off for WebSocket
    // connections - confirm this is intended.
    rejectUnauthorized: !viaWebSockets,
    requestCert: true
  }

  // Optional proxy configuration (only applied for WebSocket connections)
  var proxySettings = settings.proxy
  if (viaWebSockets && proxySettings) {
    var proxyPort = proxySettings.port
    var proxyHost = proxySettings.address
    var proxyUser = proxySettings.user
    var proxyPassword = proxySettings.password

    // Only plain HTTP proxies are supported
    var proxyUrl = nodeUtil.format('http://%s:%d', proxyHost, proxyPort)
    var agentOptions = url.parse(proxyUrl)
    // The endpoint behind the proxy is secure (wss)
    agentOptions.secureEndpoint = true
    console.log('Connecting via Proxy:', proxyUrl)

    if (proxyUser && proxyPassword) {
      agentOptions.auth = nodeUtil.format('%s:%s', proxyUser, proxyPassword)
      console.log('Proxy connection user:', proxyUser)
    }

    // Hand the proxy agent to the MQTT client via its WebSocket options
    connectOptions.wsOptions = {
      agent: new HttpsProxyAgent(agentOptions)
    }
  }

  debug('Connect options', connectOptions)
  var mqttClient = mqtt.connect(connectOptions)

  // Re-establish any subscriptions which were registered before connecting
  var pendingTopics = Object.keys(this._subscriptionsByMessageType)
  for (var i = 0; i < pendingTopics.length; i++) {
    mqttClient.subscribe(pendingTopics[i])
  }

  mqttClient.on('connect', function () {
    var connectedHost = connectOptions.host
    var connectedPort = connectOptions.port
    self._lastConnectedBroker = null

    if (connectedHost && connectedPort) {
      console.log('Connected to: ' + connectedHost + ':' +
        connectedPort)
      // Record the first configured broker matching the connected host/port
      self.config.brokers.some(function (broker) {
        var isMatch = (broker.hosts.indexOf(connectedHost) > -1) &&
          (broker.port === connectedPort)
        if (isMatch) {
          self._lastConnectedBroker = broker
        }
        return isMatch
      })
    } else {
      console.log('Connected')
    }

    self._serviceManager.onConnected()

    // The caller's callback is only invoked for the first connection, not
    // for subsequent reconnects.
    if (callback && isFirstConnection) {
      isFirstConnection = false
      callback()
    }
    self.emit('connect')
  })

  mqttClient.on('close', function () {
    self.emit('close')
  })

  mqttClient.on('error', function (error) {
    console.log(error.toString())
    // Avoid emitting MQTT client errors if there are no registered listeners.
    // Otherwise, the error would be unhandled and could, in Node.js, shut
    // down the hosting process.
    if (self.listenerCount('error') > 0) {
      self.emit('error', error)
    }
  })

  mqttClient.on('message', function (topic, rawMessage) {
    try {
      var decoded = decodeMessage(rawMessage)
      decoded.destinationTopic = topic
      self._callbackManager.onMessage(decoded)
    } catch (err) {
      // A message which fails to decode or dispatch is logged and dropped
      console.log('Failed to process incoming message: ' + err)
    }
  })

  mqttClient.on('packetreceive', function (packet) {
    self.emit('packetreceive', packet)
  })

  mqttClient.on('packetsend', function (packet) {
    self.emit('packetsend', packet)
  })

  mqttClient.on('reconnect', function () {
    self.emit('reconnect')
  })

  this._mqttClient = mqttClient
}
/**
 * Adds an event callback to the client for the specified topic. The callback
 * will be invoked when {@link Event} messages are received by the client on
 * the specified topic.
 * @param {String} topic - Topic to receive {@link Event} messages on. An empty
 *   string or null value indicates that the callback should receive messages
 *   for all topics (no filtering).
 * @param {Function} eventCallback - Callback function which should be invoked
 *   for a matching message. The first argument passed to the callback
 *   function is the {@link Event} object.
 * @param {Boolean} [subscribeToTopic=true] - Whether or not to subscribe for
 *   the topic with the broker.
 */
Client.prototype.addEventCallback = function (topic,
  eventCallback,
  subscribeToTopic) {
  var eventType = message.MESSAGE_TYPE_EVENT
  addSubscription(this, topic, eventType, eventCallback, subscribeToTopic)
}
/**
 * Removes an event callback from the client for the specified topic. This
 * method must be invoked with the same arguments as when the callback was
 * originally registered via {@link Client#addEventCallback}.
 * @param {String} topic - The topic to remove the callback for.
 * @param {Function} eventCallback - The event callback to be removed for the
 *   specified topic.
 */
Client.prototype.removeEventCallback = function (topic, eventCallback) {
  var eventType = message.MESSAGE_TYPE_EVENT
  removeSubscription(this, topic, eventType, eventCallback)
}
/**
 * Adds a request callback to the client for the specified topic. The callback
 * will be invoked when {@link Request} messages are received by the client on
 * the specified topic. Note that usage of this is quite rare due to the fact
 * that registration of instances with the client occurs automatically when
 * registering a service.
 * @param {String} topic - Topic to receive {@link Request} messages on. An
 *   empty string or null value indicates that the callback should receive
 *   messages for all topics (no filtering).
 * @param {Function} requestCallback - Callback function which should be
 *   invoked for a matching message. The first argument passed to the callback
 *   function is the {@link Request} object.
 * @param {Boolean} [subscribeToTopic=true] - Whether or not to subscribe for
 *   the topic with the broker.
 */
Client.prototype.addRequestCallback = function (topic,
  requestCallback,
  subscribeToTopic) {
  var requestType = message.MESSAGE_TYPE_REQUEST
  addSubscription(this, topic, requestType, requestCallback, subscribeToTopic)
}
/**
 * Removes a request callback from the client for the specified topic. This
 * method must be invoked with the same arguments as when the callback was
 * originally registered via {@link Client#addRequestCallback}.
 * @param {String} topic - The topic to remove the callback for.
 * @param {Function} requestCallback - The request callback to be removed for
 *   the specified topic.
 */
Client.prototype.removeRequestCallback = function (topic, requestCallback) {
  var requestType = message.MESSAGE_TYPE_REQUEST
  removeSubscription(this, topic, requestType, requestCallback)
}
/**
 * Adds a response callback to the client for the specified topic. The callback
 * is registered for both the response and the error message types, so it will
 * be invoked when either kind of message is received by the client on the
 * specified topic. Note that usage of this is quite rare due to the fact
 * that the use of response callbacks are typically limited to invoking a
 * remote DXL service via the {@link Client#asyncRequest} method.
 * @param {String} topic - Topic to receive {@link Response} messages on. An
 *   empty string or null value indicates that the callback should receive
 *   messages for all topics (no filtering).
 * @param {Function} responseCallback - Callback function which should be
 *   invoked for a matching message. The first argument passed to the callback
 *   function is the received response (or error response) message object.
 * @param {Boolean} [subscribeToTopic=true] - Whether or not to subscribe for
 *   the topic with the broker.
 */
Client.prototype.addResponseCallback = function (topic,
  responseCallback,
  subscribeToTopic) {
  var client = this
  var responseTypes = [
    message.MESSAGE_TYPE_RESPONSE,
    message.MESSAGE_TYPE_ERROR
  ]
  responseTypes.forEach(function (messageType) {
    addSubscription(client, topic, messageType,
      responseCallback, subscribeToTopic)
  })
}
/**
 * Removes a response callback from the client for the specified topic. The
 * callback is removed for both the response and the error message types. This
 * method must be invoked with the same arguments as when the callback was
 * originally registered via {@link Client#addResponseCallback}.
 * @param {String} topic - The topic to remove the callback for.
 * @param {Function} responseCallback - The response callback to be removed for
 *   the specified topic.
 */
Client.prototype.removeResponseCallback = function (topic,
  responseCallback) {
  var client = this
  var responseTypes = [
    message.MESSAGE_TYPE_RESPONSE,
    message.MESSAGE_TYPE_ERROR
  ]
  responseTypes.forEach(function (messageType) {
    removeSubscription(client, topic, messageType, responseCallback)
  })
}
/**
 * Subscribes to the specified topic on the DXL fabric. This method is
 * typically used in conjunction with the registration of event callbacks
 * via the {@link Client#addEventCallback} method.
 *
 * **NOTE:** By default when registering an event callback the client will
 * automatically subscribe to the topic. In the example below, the
 * {@link Client#addEventCallback} method is invoked with the
 * _subscribeToTopic_ parameter set to false, preventing the automatic
 * subscription.
 * @example
 * client.addEventCallback('/testeventtopic',
 *   function (event) {
 *     console.log('Received event! ' + event.sourceClientId)
 *   }, false)
 * client.subscribe('/testeventtopic')
 * @param {String} topic - The topic to subscribe to
 */
Client.prototype.subscribe = function (topic) {
  var client = this
  // The placeholder callback marks this as a direct (explicit) subscription
  addSubscription(client, topic, EXPLICIT_SUBSCRIPTION_MESSAGE_TYPE,
    explicitSubscriptionCallback)
}
/**
 * Unsubscribes from the specified topic on the DXL fabric. See the
 * {@link Client#subscribe} method for more information on subscriptions.
 * @param {String} topic - The topic to unsubscribe from.
 */
Client.prototype.unsubscribe = function (topic) {
  var client = this
  // The placeholder callback marks this as a direct (explicit) unsubscribe
  removeSubscription(client, topic, EXPLICIT_SUBSCRIPTION_MESSAGE_TYPE,
    explicitSubscriptionCallback)
}
/**
 * @property {Array<String>} - An array containing the topics that the client
 *   is currently subscribed to (the keys of the client's subscription map).
 *   See {@link Client#subscribe} for more information on adding
 *   subscriptions.
 * @name Client#subscriptions
 */
Object.defineProperty(Client.prototype, 'subscriptions', {
  get: function () {
    var subscriptionsByTopic = this._subscriptionsByMessageType
    return Object.keys(subscriptionsByTopic)
  }
})
/**
 * Sends a {@link Request} message to a remote DXL service asynchronously. An
 * optional response callback can be specified. This callback will be invoked
 * when the corresponding {@link Response} message (or an error) is received by
 * the client. The work is delegated to the client's request manager.
 * @param {Request} request - The request message to send to a remote DXL
 *   service.
 * @param {Function} [responseCallback] - An optional response callback
 *   that will be invoked with the result of the request.
 *
 *   If an error occurs during the request, the first parameter supplied to the
 *   callback contains an {@link Error} with failure details. If the response
 *   from the DXL fabric to the request includes an {@link ErrorResponse}, the
 *   first parameter is a {@link RequestError} (which contains the error
 *   response in its {@link RequestError#dxlErrorResponse} property).
 *
 *   If the request is successful, the second parameter includes a
 *   {@link Response} message.
 * @throws {DxlError} If no prior attempt has been made to connect the client
 *   via a call to {@link Client#connect}.
 */
Client.prototype.asyncRequest = function (request, responseCallback) {
  var requestManager = this._requestManager
  requestManager.asyncRequest(request, responseCallback)
}
/**
 * Attempts to deliver the specified {@link Event} message to the DXL fabric.
 * The event is serialized and published to its destination topic.
 * See {@link Message} for more information on message types, how they are
 * delivered to remote clients, etc.
 * @param {Event} event - The {@link Event} to send.
 * @throws {DxlError} If no prior attempt has been made to connect the client
 *   via a call to {@link Client#connect}.
 */
Client.prototype.sendEvent = function (event) {
  var destination = event.destinationTopic
  var payload = event._toBytes()
  publish(this, destination, payload)
}
/**
 * Attempts to deliver the specified {@link Response} message to the DXL
 * fabric. The response is serialized and published to its destination topic.
 * The fabric will in turn attempt to deliver the response back to the
 * client who sent the corresponding {@link Request}.
 * @param {Response} response - The {@link Response} to send.
 * @throws {DxlError} If no prior attempt has been made to connect the client
 *   via a call to {@link Client#connect}.
 */
Client.prototype.sendResponse = function (response) {
  var destination = response.destinationTopic
  var payload = response._toBytes()
  publish(this, destination, payload)
}
/**
 * Registers a DXL service with the fabric asynchronously. The specified
 * {@link ServiceRegistrationInfo} instance contains information about the
 * service that is to be registered. The work is delegated to the client's
 * service manager.
 * @param {ServiceRegistrationInfo} serviceRegInfo - A
 *   {@link ServiceRegistrationInfo} instance containing information about the
 *   service that is to be registered.
 * @param {Function} [callback=null] - An optional callback that will be
 *   invoked when the registration attempt is complete. If an error occurs
 *   during the registration attempt, the first parameter supplied to the
 *   callback contains an {@link Error} with failure details.
 */
Client.prototype.registerServiceAsync = function (serviceRegInfo, callback) {
  var serviceManager = this._serviceManager
  serviceManager.registerServiceAsync(serviceRegInfo, callback)
}
/**
 * Unregisters (removes) a DXL service from the fabric asynchronously. The
 * specified {@link ServiceRegistrationInfo} instance contains information
 * about the service that is to be removed. The work is delegated to the
 * client's service manager.
 * @param {ServiceRegistrationInfo} serviceRegInfo - A
 *   {@link ServiceRegistrationInfo} instance containing information about the
 *   service that is to be unregistered.
 * @param {Function} [callback=null] - An optional callback that will be
 *   invoked when the unregistration attempt is complete. If an error occurs
 *   during the unregistration attempt, the first parameter supplied to the
 *   callback contains an {@link Error} with failure details.
 */
Client.prototype.unregisterServiceAsync = function (serviceRegInfo, callback) {
  var serviceManager = this._serviceManager
  serviceManager.unregisterServiceAsync(serviceRegInfo, callback)
}
/**
 * Attempts to disconnect the client from the DXL fabric.
 *
 * If the client has no underlying MQTT client, or the MQTT client is already
 * in the process of disconnecting, the callback (if any) is invoked
 * synchronously before this method returns. Otherwise the callback is handed
 * to the MQTT client, which invokes it once the disconnect completes.
 * @param {Function} [callback=null] - An optional callback that will be
 *   invoked when the disconnection attempt is complete. No arguments are
 *   passed to the callback. Values which are not functions (including null
 *   and undefined) are ignored.
 */
Client.prototype.disconnect = function (callback) {
  // Only treat the argument as a callback when it is actually callable.
  // Checking merely for 'undefined' would cause an explicitly passed null
  // (the documented default) to be invoked, raising a TypeError.
  var hasCallback = (typeof callback === 'function')
  var mqttClient = this._mqttClient

  if (!mqttClient) {
    // Never connected: nothing to tear down
    if (hasCallback) {
      callback()
    }
    return
  }

  // If the client is already disconnected, avoid waiting for any in-flight
  // messages to be acked before proceeding with the shutdown of client
  // resources. These messages would not be acked anyway - since no connection
  // exists to deliver the acks. Waiting for acks when the client is already
  // disconnected could cause resources to never be freed - and a callback to
  // not be invoked.
  var doNotWaitForAcksOnInFlightMessages = !this.connected

  // The underlying MQTT client only invokes a callback given to it if
  // it isn't already in the process of disconnecting. The logic below
  // ensures that the callback is only invoked once.
  if (mqttClient.disconnecting) {
    mqttClient.end(doNotWaitForAcksOnInFlightMessages)
    if (hasCallback) {
      callback()
    }
  } else if (hasCallback) {
    mqttClient.end(doNotWaitForAcksOnInFlightMessages, callback)
  } else {
    mqttClient.end(doNotWaitForAcksOnInFlightMessages)
  }
}
/**
 * Destroys the client (releases all associated resources): the service
 * manager is destroyed, all recorded topics are unsubscribed from on the
 * MQTT client (when one exists), the client is disconnected, and the
 * subscription map and callback manager are cleared.
 *
 * **NOTE:** Once the method has been invoked, no other calls should be made to
 * the client.
 *
 * @param {Function} [callback=null] - An optional callback that will be
 *   passed along to {@link Client#disconnect}. No arguments are passed to
 *   the callback.
 */
Client.prototype.destroy = function (callback) {
  this._serviceManager.destroy()

  var mqttClient = this._mqttClient
  if (mqttClient) {
    var subscribedTopics = Object.keys(this._subscriptionsByMessageType)
    if (subscribedTopics.length > 0) {
      mqttClient.unsubscribe(subscribedTopics)
    }
  }

  this.disconnect(callback)

  // Drop all locally tracked subscriptions and registered callbacks
  this._subscriptionsByMessageType = {}
  this._callbackManager.destroy()
}
/**
 * Sends a request. The request's reply-to topic is set to this client's
 * reply topic before the request is serialized and published to its
 * destination topic.
 * @private
 * @param {Request} request - The {@link Request} to send.
 */
Client.prototype._sendRequest = function (request) {
  var replyTopic = this._replyToTopic
  request.replyToTopic = replyTopic
  var destination = request.destinationTopic
  var payload = request._toBytes()
  publish(this, destination, payload)
}
module.exports = Client