| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091 |
- 'use strict'
-
- /**
- * Module dependencies
- */
- var events = require('events')
- var Store = require('./store')
- var eos = require('end-of-stream')
- var mqttPacket = require('mqtt-packet')
- var Writable = require('readable-stream').Writable
- var inherits = require('inherits')
- var reInterval = require('reinterval')
- var validations = require('./validations')
- var xtend = require('xtend')
- var setImmediate = global.setImmediate || function (callback) {
- // works in node v0.8
- process.nextTick(callback)
- }
- var defaultConnectOptions = {
- keepalive: 60,
- reschedulePings: true,
- protocolId: 'MQTT',
- protocolVersion: 4,
- reconnectPeriod: 1000,
- connectTimeout: 30 * 1000,
- clean: true,
- resubscribe: true
- }
-
- function defaultId () {
- return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
- }
-
- function sendPacket (client, packet, cb) {
- client.emit('packetsend', packet)
-
- var result = mqttPacket.writeToStream(packet, client.stream)
-
- if (!result && cb) {
- client.stream.once('drain', cb)
- } else if (cb) {
- cb()
- }
- }
-
- function flush (queue) {
- if (queue) {
- Object.keys(queue).forEach(function (messageId) {
- if (typeof queue[messageId] === 'function') {
- queue[messageId](new Error('Connection closed'))
- delete queue[messageId]
- }
- })
- }
- }
-
- function storeAndSend (client, packet, cb) {
- client.outgoingStore.put(packet, function storedPacket (err) {
- if (err) {
- return cb && cb(err)
- }
- sendPacket(client, packet, cb)
- })
- }
-
- function nop () {}
-
- /**
- * MqttClient constructor
- *
- * @param {Stream} stream - stream
- * @param {Object} [options] - connection options
- * (see Connection#connect)
- */
- function MqttClient (streamBuilder, options) {
- var k
- var that = this
-
- if (!(this instanceof MqttClient)) {
- return new MqttClient(streamBuilder, options)
- }
-
- this.options = options || {}
-
- // Defaults
- for (k in defaultConnectOptions) {
- if (typeof this.options[k] === 'undefined') {
- this.options[k] = defaultConnectOptions[k]
- } else {
- this.options[k] = options[k]
- }
- }
-
- this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId()
-
- this.streamBuilder = streamBuilder
-
- // Inflight message storages
- this.outgoingStore = this.options.outgoingStore || new Store()
- this.incomingStore = this.options.incomingStore || new Store()
-
- // Should QoS zero messages be queued when the connection is broken?
- this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero
-
- // map of subscribed topics to support reconnection
- this._resubscribeTopics = {}
-
- // map of a subscribe messageId and a topic
- this.messageIdToTopic = {}
-
- // Ping timer, setup in _setupPingTimer
- this.pingTimer = null
- // Is the client connected?
- this.connected = false
- // Are we disconnecting?
- this.disconnecting = false
- // Packet queue
- this.queue = []
- // connack timer
- this.connackTimer = null
- // Reconnect timer
- this.reconnectTimer = null
- // MessageIDs starting with 1
- this.nextId = Math.floor(Math.random() * 65535)
-
- // Inflight callbacks
- this.outgoing = {}
-
- // Mark connected on connect
- this.on('connect', function () {
- if (this.disconnected) {
- return
- }
-
- this.connected = true
- var outStore = null
- outStore = this.outgoingStore.createStream()
-
- // Control of stored messages
- outStore.once('readable', function () {
- function storeDeliver () {
- var packet = outStore.read(1)
- var cb
-
- if (!packet) {
- return
- }
-
- // Avoid unnecessary stream read operations when disconnected
- if (!that.disconnecting && !that.reconnectTimer && that.options.reconnectPeriod > 0) {
- outStore.read(0)
- cb = that.outgoing[packet.messageId]
- that.outgoing[packet.messageId] = function (err, status) {
- // Ensure that the original callback passed in to publish gets invoked
- if (cb) {
- cb(err, status)
- }
-
- storeDeliver()
- }
- that._sendPacket(packet)
- } else if (outStore.destroy) {
- outStore.destroy()
- }
- }
- storeDeliver()
- }).on('error', this.emit.bind(this, 'error'))
- })
-
- // Mark disconnected on stream close
- this.on('close', function () {
- this.connected = false
- clearTimeout(this.connackTimer)
- })
-
- // Setup ping timer
- this.on('connect', this._setupPingTimer)
-
- // Send queued packets
- this.on('connect', function () {
- var queue = this.queue
-
- function deliver () {
- var entry = queue.shift()
- var packet = null
-
- if (!entry) {
- return
- }
-
- packet = entry.packet
-
- that._sendPacket(
- packet,
- function (err) {
- if (entry.cb) {
- entry.cb(err)
- }
- deliver()
- }
- )
- }
-
- deliver()
- })
-
- var firstConnection = true
- // resubscribe
- this.on('connect', function () {
- if (!firstConnection &&
- this.options.clean &&
- Object.keys(this._resubscribeTopics).length > 0) {
- if (this.options.resubscribe) {
- this._resubscribeTopics.resubscribe = true
- this.subscribe(this._resubscribeTopics)
- } else {
- this._resubscribeTopics = {}
- }
- }
-
- firstConnection = false
- })
-
- // Clear ping timer
- this.on('close', function () {
- if (that.pingTimer !== null) {
- that.pingTimer.clear()
- that.pingTimer = null
- }
- })
-
- // Setup reconnect timer on disconnect
- this.on('close', this._setupReconnect)
-
- events.EventEmitter.call(this)
-
- this._setupStream()
- }
- inherits(MqttClient, events.EventEmitter)
-
- /**
- * setup the event handlers in the inner stream.
- *
- * @api private
- */
- MqttClient.prototype._setupStream = function () {
- var connectPacket
- var that = this
- var writable = new Writable()
- var parser = mqttPacket.parser(this.options)
- var completeParse = null
- var packets = []
-
- this._clearReconnect()
-
- this.stream = this.streamBuilder(this)
-
- parser.on('packet', function (packet) {
- packets.push(packet)
- })
-
- function nextTickWork () {
- process.nextTick(work)
- }
-
- function work () {
- var packet = packets.shift()
- var done = completeParse
-
- if (packet) {
- that._handlePacket(packet, nextTickWork)
- } else {
- completeParse = null
- done()
- }
- }
-
- writable._write = function (buf, enc, done) {
- completeParse = done
- parser.parse(buf)
- work()
- }
-
- this.stream.pipe(writable)
-
- // Suppress connection errors
- this.stream.on('error', nop)
-
- // Echo stream close
- eos(this.stream, this.emit.bind(this, 'close'))
-
- // Send a connect packet
- connectPacket = Object.create(this.options)
- connectPacket.cmd = 'connect'
- // avoid message queue
- sendPacket(this, connectPacket)
-
- // Echo connection errors
- parser.on('error', this.emit.bind(this, 'error'))
-
- // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
- this.stream.setMaxListeners(1000)
-
- clearTimeout(this.connackTimer)
- this.connackTimer = setTimeout(function () {
- that._cleanUp(true)
- }, this.options.connectTimeout)
- }
-
- MqttClient.prototype._handlePacket = function (packet, done) {
- this.emit('packetreceive', packet)
-
- switch (packet.cmd) {
- case 'publish':
- this._handlePublish(packet, done)
- break
- case 'puback':
- case 'pubrec':
- case 'pubcomp':
- case 'suback':
- case 'unsuback':
- this._handleAck(packet)
- done()
- break
- case 'pubrel':
- this._handlePubrel(packet, done)
- break
- case 'connack':
- this._handleConnack(packet)
- done()
- break
- case 'pingresp':
- this._handlePingresp(packet)
- done()
- break
- default:
- // do nothing
- // maybe we should do an error handling
- // or just log it
- break
- }
- }
-
- MqttClient.prototype._checkDisconnecting = function (callback) {
- if (this.disconnecting) {
- if (callback) {
- callback(new Error('client disconnecting'))
- } else {
- this.emit('error', new Error('client disconnecting'))
- }
- }
- return this.disconnecting
- }
-
- /**
- * publish - publish <message> to <topic>
- *
- * @param {String} topic - topic to publish to
- * @param {String, Buffer} message - message to publish
- * @param {Object} [opts] - publish options, includes:
- * {Number} qos - qos level to publish on
- * {Boolean} retain - whether or not to retain the message
- * {Boolean} dup - whether or not mark a message as duplicate
- * @param {Function} [callback] - function(err){}
- * called when publish succeeds or fails
- * @returns {MqttClient} this - for chaining
- * @api public
- *
- * @example client.publish('topic', 'message');
- * @example
- * client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
- * @example client.publish('topic', 'message', console.log);
- */
- MqttClient.prototype.publish = function (topic, message, opts, callback) {
- var packet
-
- // .publish(topic, payload, cb);
- if (typeof opts === 'function') {
- callback = opts
- opts = null
- }
-
- // default opts
- var defaultOpts = {qos: 0, retain: false, dup: false}
- opts = xtend(defaultOpts, opts)
-
- if (this._checkDisconnecting(callback)) {
- return this
- }
-
- packet = {
- cmd: 'publish',
- topic: topic,
- payload: message,
- qos: opts.qos,
- retain: opts.retain,
- messageId: this._nextId(),
- dup: opts.dup
- }
-
- switch (opts.qos) {
- case 1:
- case 2:
-
- // Add to callbacks
- this.outgoing[packet.messageId] = callback || nop
- this._sendPacket(packet)
- break
- default:
- this._sendPacket(packet, callback)
- break
- }
-
- return this
- }
-
- /**
- * subscribe - subscribe to <topic>
- *
- * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
- * @param {Object} [opts] - optional subscription options, includes:
- * {Number} qos - subscribe qos level
- * @param {Function} [callback] - function(err, granted){} where:
- * {Error} err - subscription error (none at the moment!)
- * {Array} granted - array of {topic: 't', qos: 0}
- * @returns {MqttClient} this - for chaining
- * @api public
- * @example client.subscribe('topic');
- * @example client.subscribe('topic', {qos: 1});
- * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
- * @example client.subscribe('topic', console.log);
- */
- MqttClient.prototype.subscribe = function () {
- var packet
- var args = Array.prototype.slice.call(arguments)
- var subs = []
- var obj = args.shift()
- var resubscribe = obj.resubscribe
- var callback = args.pop() || nop
- var opts = args.pop()
- var invalidTopic
- var that = this
-
- delete obj.resubscribe
-
- if (typeof obj === 'string') {
- obj = [obj]
- }
-
- if (typeof callback !== 'function') {
- opts = callback
- callback = nop
- }
-
- invalidTopic = validations.validateTopics(obj)
- if (invalidTopic !== null) {
- setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
- return this
- }
-
- if (this._checkDisconnecting(callback)) {
- return this
- }
-
- var defaultOpts = { qos: 0 }
- opts = xtend(defaultOpts, opts)
-
- if (Array.isArray(obj)) {
- obj.forEach(function (topic) {
- if (that._resubscribeTopics[topic] < opts.qos ||
- !that._resubscribeTopics.hasOwnProperty(topic) ||
- resubscribe) {
- subs.push({
- topic: topic,
- qos: opts.qos
- })
- }
- })
- } else {
- Object
- .keys(obj)
- .forEach(function (k) {
- if (that._resubscribeTopics[k] < obj[k] ||
- !that._resubscribeTopics.hasOwnProperty(k) ||
- resubscribe) {
- subs.push({
- topic: k,
- qos: obj[k]
- })
- }
- })
- }
-
- packet = {
- cmd: 'subscribe',
- subscriptions: subs,
- qos: 1,
- retain: false,
- dup: false,
- messageId: this._nextId()
- }
-
- if (!subs.length) {
- callback(null, [])
- return
- }
-
- // subscriptions to resubscribe to in case of disconnect
- if (this.options.resubscribe) {
- var topics = []
- subs.forEach(function (sub) {
- if (that.options.reconnectPeriod > 0) {
- that._resubscribeTopics[sub.topic] = sub.qos
- topics.push(sub.topic)
- }
- })
- that.messageIdToTopic[packet.messageId] = topics
- }
-
- this.outgoing[packet.messageId] = function (err, packet) {
- if (!err) {
- var granted = packet.granted
- for (var i = 0; i < granted.length; i += 1) {
- subs[i].qos = granted[i]
- }
- }
-
- callback(err, subs)
- }
-
- this._sendPacket(packet)
-
- return this
- }
-
- /**
- * unsubscribe - unsubscribe from topic(s)
- *
- * @param {String, Array} topic - topics to unsubscribe from
- * @param {Function} [callback] - callback fired on unsuback
- * @returns {MqttClient} this - for chaining
- * @api public
- * @example client.unsubscribe('topic');
- * @example client.unsubscribe('topic', console.log);
- */
- MqttClient.prototype.unsubscribe = function (topic, callback) {
- var packet = {
- cmd: 'unsubscribe',
- qos: 1,
- messageId: this._nextId()
- }
- var that = this
-
- callback = callback || nop
-
- if (this._checkDisconnecting(callback)) {
- return this
- }
-
- if (typeof topic === 'string') {
- packet.unsubscriptions = [topic]
- } else if (typeof topic === 'object' && topic.length) {
- packet.unsubscriptions = topic
- }
-
- if (this.options.resubscribe) {
- packet.unsubscriptions.forEach(function (topic) {
- delete that._resubscribeTopics[topic]
- })
- }
-
- this.outgoing[packet.messageId] = callback
-
- this._sendPacket(packet)
-
- return this
- }
-
- /**
- * end - close connection
- *
- * @returns {MqttClient} this - for chaining
- * @param {Boolean} force - do not wait for all in-flight messages to be acked
- * @param {Function} cb - called when the client has been closed
- *
- * @api public
- */
- MqttClient.prototype.end = function (force, cb) {
- var that = this
-
- if (typeof force === 'function') {
- cb = force
- force = false
- }
-
- function closeStores () {
- that.disconnected = true
- that.incomingStore.close(function () {
- that.outgoingStore.close(function () {
- if (cb) {
- cb.apply(null, arguments)
- }
- that.emit('end')
- })
- })
- if (that._deferredReconnect) {
- that._deferredReconnect()
- }
- }
-
- function finish () {
- // defer closesStores of an I/O cycle,
- // just to make sure things are
- // ok for websockets
- that._cleanUp(force, setImmediate.bind(null, closeStores))
- }
-
- if (this.disconnecting) {
- return this
- }
-
- this._clearReconnect()
-
- this.disconnecting = true
-
- if (!force && Object.keys(this.outgoing).length > 0) {
- // wait 10ms, just to be sure we received all of it
- this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
- } else {
- finish()
- }
-
- return this
- }
-
- /**
- * removeOutgoingMessage - remove a message in outgoing store
- * the outgoing callback will be called withe Error('Message removed') if the message is removed
- *
- * @param {Number} mid - messageId to remove message
- * @returns {MqttClient} this - for chaining
- * @api public
- *
- * @example client.removeOutgoingMessage(client.getLastMessageId());
- */
- MqttClient.prototype.removeOutgoingMessage = function (mid) {
- var cb = this.outgoing[mid]
- delete this.outgoing[mid]
- this.outgoingStore.del({messageId: mid}, function () {
- cb(new Error('Message removed'))
- })
- return this
- }
-
- /**
- * reconnect - connect again using the same options as connect()
- *
- * @param {Object} [opts] - optional reconnect options, includes:
- * {Store} incomingStore - a store for the incoming packets
- * {Store} outgoingStore - a store for the outgoing packets
- * if opts is not given, current stores are used
- * @returns {MqttClient} this - for chaining
- *
- * @api public
- */
- MqttClient.prototype.reconnect = function (opts) {
- var that = this
- var f = function () {
- if (opts) {
- that.options.incomingStore = opts.incomingStore
- that.options.outgoingStore = opts.outgoingStore
- } else {
- that.options.incomingStore = null
- that.options.outgoingStore = null
- }
- that.incomingStore = that.options.incomingStore || new Store()
- that.outgoingStore = that.options.outgoingStore || new Store()
- that.disconnecting = false
- that.disconnected = false
- that._deferredReconnect = null
- that._reconnect()
- }
-
- if (this.disconnecting && !this.disconnected) {
- this._deferredReconnect = f
- } else {
- f()
- }
- return this
- }
-
- /**
- * _reconnect - implement reconnection
- * @api privateish
- */
- MqttClient.prototype._reconnect = function () {
- this.emit('reconnect')
- this._setupStream()
- }
-
- /**
- * _setupReconnect - setup reconnect timer
- */
- MqttClient.prototype._setupReconnect = function () {
- var that = this
-
- if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
- if (!this.reconnecting) {
- this.emit('offline')
- this.reconnecting = true
- }
- that.reconnectTimer = setInterval(function () {
- that._reconnect()
- }, that.options.reconnectPeriod)
- }
- }
-
- /**
- * _clearReconnect - clear the reconnect timer
- */
- MqttClient.prototype._clearReconnect = function () {
- if (this.reconnectTimer) {
- clearInterval(this.reconnectTimer)
- this.reconnectTimer = null
- }
- }
-
- /**
- * _cleanUp - clean up on connection end
- * @api private
- */
- MqttClient.prototype._cleanUp = function (forced, done) {
- if (done) {
- this.stream.on('close', done)
- }
-
- if (forced) {
- if ((this.options.reconnectPeriod === 0) && this.options.clean) {
- flush(this.outgoing)
- }
- this.stream.destroy()
- } else {
- this._sendPacket(
- { cmd: 'disconnect' },
- setImmediate.bind(
- null,
- this.stream.end.bind(this.stream)
- )
- )
- }
-
- if (!this.disconnecting) {
- this._clearReconnect()
- this._setupReconnect()
- }
-
- if (this.pingTimer !== null) {
- this.pingTimer.clear()
- this.pingTimer = null
- }
-
- if (done && !this.connected) {
- this.stream.removeListener('close', done)
- done()
- }
- }
-
- /**
- * _sendPacket - send or queue a packet
- * @param {String} type - packet type (see `protocol`)
- * @param {Object} packet - packet options
- * @param {Function} cb - callback when the packet is sent
- * @api private
- */
- MqttClient.prototype._sendPacket = function (packet, cb) {
- if (!this.connected) {
- if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
- this.queue.push({ packet: packet, cb: cb })
- } else if (packet.qos > 0) {
- cb = this.outgoing[packet.messageId]
- this.outgoingStore.put(packet, function (err) {
- if (err) {
- return cb && cb(err)
- }
- })
- } else if (cb) {
- cb(new Error('No connection to broker'))
- }
-
- return
- }
-
- // When sending a packet, reschedule the ping timer
- this._shiftPingInterval()
-
- if (packet.cmd !== 'publish') {
- sendPacket(this, packet, cb)
- return
- }
-
- switch (packet.qos) {
- case 2:
- case 1:
- storeAndSend(this, packet, cb)
- break
- /**
- * no need of case here since it will be caught by default
- * and jshint comply that before default it must be a break
- * anyway it will result in -1 evaluation
- */
- case 0:
- /* falls through */
- default:
- sendPacket(this, packet, cb)
- break
- }
- }
-
- /**
- * _setupPingTimer - setup the ping timer
- *
- * @api private
- */
- MqttClient.prototype._setupPingTimer = function () {
- var that = this
-
- if (!this.pingTimer && this.options.keepalive) {
- this.pingResp = true
- this.pingTimer = reInterval(function () {
- that._checkPing()
- }, this.options.keepalive * 1000)
- }
- }
-
- /**
- * _shiftPingInterval - reschedule the ping interval
- *
- * @api private
- */
- MqttClient.prototype._shiftPingInterval = function () {
- if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
- this.pingTimer.reschedule(this.options.keepalive * 1000)
- }
- }
- /**
- * _checkPing - check if a pingresp has come back, and ping the server again
- *
- * @api private
- */
- MqttClient.prototype._checkPing = function () {
- if (this.pingResp) {
- this.pingResp = false
- this._sendPacket({ cmd: 'pingreq' })
- } else {
- // do a forced cleanup since socket will be in bad shape
- this._cleanUp(true)
- }
- }
-
- /**
- * _handlePingresp - handle a pingresp
- *
- * @api private
- */
- MqttClient.prototype._handlePingresp = function () {
- this.pingResp = true
- }
-
- /**
- * _handleConnack
- *
- * @param {Object} packet
- * @api private
- */
-
- MqttClient.prototype._handleConnack = function (packet) {
- var rc = packet.returnCode
- var errors = [
- '',
- 'Unacceptable protocol version',
- 'Identifier rejected',
- 'Server unavailable',
- 'Bad username or password',
- 'Not authorized'
- ]
-
- clearTimeout(this.connackTimer)
-
- if (rc === 0) {
- this.reconnecting = false
- this.emit('connect', packet)
- } else if (rc > 0) {
- var err = new Error('Connection refused: ' + errors[rc])
- err.code = rc
- this.emit('error', err)
- }
- }
-
- /**
- * _handlePublish
- *
- * @param {Object} packet
- * @api private
- */
- /*
- those late 2 case should be rewrite to comply with coding style:
-
- case 1:
- case 0:
- // do not wait sending a puback
- // no callback passed
- if (1 === qos) {
- this._sendPacket({
- cmd: 'puback',
- messageId: mid
- });
- }
- // emit the message event for both qos 1 and 0
- this.emit('message', topic, message, packet);
- this.handleMessage(packet, done);
- break;
- default:
- // do nothing but every switch mus have a default
- // log or throw an error about unknown qos
- break;
-
- for now i just suppressed the warnings
- */
- MqttClient.prototype._handlePublish = function (packet, done) {
- var topic = packet.topic.toString()
- var message = packet.payload
- var qos = packet.qos
- var mid = packet.messageId
- var that = this
-
- switch (qos) {
- case 2:
- this.incomingStore.put(packet, function () {
- that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
- })
- break
- case 1:
- // emit the message event
- this.emit('message', topic, message, packet)
- this.handleMessage(packet, function (err) {
- if (err) {
- return done && done(err)
- }
- // send 'puback' if the above 'handleMessage' method executed
- // successfully.
- that._sendPacket({cmd: 'puback', messageId: mid}, done)
- })
- break
- case 0:
- // emit the message event
- this.emit('message', topic, message, packet)
- this.handleMessage(packet, done)
- break
- default:
- // do nothing
- // log or throw an error about unknown qos
- break
- }
- }
-
- /**
- * Handle messages with backpressure support, one at a time.
- * Override at will.
- *
- * @param Packet packet the packet
- * @param Function callback call when finished
- * @api public
- */
- MqttClient.prototype.handleMessage = function (packet, callback) {
- callback()
- }
-
- /**
- * _handleAck
- *
- * @param {Object} packet
- * @api private
- */
-
- MqttClient.prototype._handleAck = function (packet) {
- /* eslint no-fallthrough: "off" */
- var mid = packet.messageId
- var type = packet.cmd
- var response = null
- var cb = this.outgoing[mid]
- var that = this
-
- if (!cb) {
- // Server sent an ack in error, ignore it.
- return
- }
-
- // Process
- switch (type) {
- case 'pubcomp':
- // same thing as puback for QoS 2
- case 'puback':
- // Callback - we're done
- delete this.outgoing[mid]
- this.outgoingStore.del(packet, cb)
- break
- case 'pubrec':
- response = {
- cmd: 'pubrel',
- qos: 2,
- messageId: mid
- }
-
- this._sendPacket(response)
- break
- case 'suback':
- delete this.outgoing[mid]
- if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) {
- // suback with Failure status
- var topics = this.messageIdToTopic[mid]
- if (topics) {
- topics.forEach(function (topic) {
- delete that._resubscribeTopics[topic]
- })
- }
- }
- cb(null, packet)
- break
- case 'unsuback':
- delete this.outgoing[mid]
- cb(null)
- break
- default:
- that.emit('error', new Error('unrecognized packet type'))
- }
-
- if (this.disconnecting &&
- Object.keys(this.outgoing).length === 0) {
- this.emit('outgoingEmpty')
- }
- }
-
- /**
- * _handlePubrel
- *
- * @param {Object} packet
- * @api private
- */
- MqttClient.prototype._handlePubrel = function (packet, callback) {
- var mid = packet.messageId
- var that = this
-
- var comp = {cmd: 'pubcomp', messageId: mid}
-
- that.incomingStore.get(packet, function (err, pub) {
- if (!err && pub.cmd !== 'pubrel') {
- that.emit('message', pub.topic, pub.payload, pub)
- that.incomingStore.put(packet)
- that.handleMessage(pub, function (err) {
- if (err) {
- return callback && callback(err)
- }
- that._sendPacket(comp, callback)
- })
- } else {
- that._sendPacket(comp, callback)
- }
- })
- }
-
- /**
- * _nextId
- */
- MqttClient.prototype._nextId = function () {
- var id = this.nextId++
- // Ensure 16 bit unsigned int:
- if (id === 65535) {
- this.nextId = 1
- }
- return id
- }
-
- /**
- * getLastMessageId
- */
- MqttClient.prototype.getLastMessageId = function () {
- return (this.nextId === 1) ? 65535 : (this.nextId - 1)
- }
-
- module.exports = MqttClient
|