wechat wss demo

client.js 25KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091
  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. var events = require('events')
  6. var Store = require('./store')
  7. var eos = require('end-of-stream')
  8. var mqttPacket = require('mqtt-packet')
  9. var Writable = require('readable-stream').Writable
  10. var inherits = require('inherits')
  11. var reInterval = require('reinterval')
  12. var validations = require('./validations')
  13. var xtend = require('xtend')
  // Prefer the native setImmediate. Where it is missing (node v0.8) emulate it
  // with process.nextTick, forwarding any extra arguments to the callback the
  // way the native function does; subscribe() relies on that to hand its
  // "Invalid topic" error to the caller's callback.
  var setImmediate = global.setImmediate || function (callback) {
    var args = Array.prototype.slice.call(arguments, 1)
    process.nextTick(function () {
      callback.apply(null, args)
    })
  }
  // Connection options applied by the MqttClient constructor for every key the
  // caller leaves undefined.
  var defaultConnectOptions = {
    // multiplied by 1000 to get the ping timer period (see _setupPingTimer)
    keepalive: 60,
    // when true, every packet sent while connected reschedules the ping timer
    reschedulePings: true,
    protocolId: 'MQTT',
    protocolVersion: 4,
    // period of the reconnect interval in ms; reconnect is only set up when > 0
    reconnectPeriod: 1000,
    // ms to wait for a connack before the stream is forcibly cleaned up
    connectTimeout: 30 * 1000,
    clean: true,
    // remember subscribed topics and subscribe to them again after a reconnect
    resubscribe: true
  }
  /**
   * Build a default client id: 'mqttjs_' followed by up to eight hexadecimal
   * digits taken from the fractional part of Math.random().
   *
   * @returns {String} generated client id
   */
  function defaultId () {
    var randomHex = Math.random().toString(16)
    // skip the leading "0." and keep at most eight digits
    return 'mqttjs_' + randomHex.slice(2, 10)
  }
  /**
   * Write a packet straight to the client's stream, bypassing the queue.
   *
   * Emits 'packetsend' on the client first. When a callback is given it is
   * invoked immediately if writeToStream returned a truthy value, otherwise
   * once the stream emits 'drain'.
   *
   * @param {MqttClient} client - client owning the stream
   * @param {Object} packet - packet to serialise
   * @param {Function} [cb] - called without arguments once the write went out
   */
  function sendPacket (client, packet, cb) {
    client.emit('packetsend', packet)
    var flushed = mqttPacket.writeToStream(packet, client.stream)
    if (!cb) {
      return
    }
    if (flushed) {
      cb()
    } else {
      client.stream.once('drain', cb)
    }
  }
  /**
   * Fail every pending callback in a messageId -> callback map with a
   * 'Connection closed' error and remove it from the map. Entries that are
   * not functions are left untouched; a falsy queue is ignored.
   *
   * @param {Object} queue - map of messageId to callback
   */
  function flush (queue) {
    if (!queue) {
      return
    }
    var ids = Object.keys(queue)
    for (var i = 0; i < ids.length; i += 1) {
      var messageId = ids[i]
      if (typeof queue[messageId] !== 'function') {
        continue
      }
      queue[messageId](new Error('Connection closed'))
      delete queue[messageId]
    }
  }
  /**
   * Persist a packet in the client's outgoing store and, once stored, write
   * it to the stream. A store error is passed to cb (when given) and the
   * packet is not sent.
   *
   * @param {MqttClient} client - client owning the store and the stream
   * @param {Object} packet - packet to store and send
   * @param {Function} [cb] - forwarded to sendPacket, or called with the error
   */
  function storeAndSend (client, packet, cb) {
    function onStored (err) {
      if (!err) {
        sendPacket(client, packet, cb)
        return
      }
      return cb && cb(err)
    }
    client.outgoingStore.put(packet, onStored)
  }
  // Shared no-op used wherever a callback is optional.
  function nop () {
    // intentionally empty
  }
  /**
   * MqttClient constructor
   *
   * Registers the client's own 'connect' / 'close' handlers (stored-message
   * replay, queued-packet delivery, resubscription, ping and reconnect timers)
   * and then opens the stream through _setupStream().
   *
   * @param {Function} streamBuilder - called with the client; its return value
   *   is used as the connection stream
   * @param {Object} [options] - connection options (see Connection#connect);
   *   keys left undefined are filled in from defaultConnectOptions. The object
   *   itself is kept (and modified) as this.options.
   */
  function MqttClient (streamBuilder, options) {
    var k
    var that = this

    if (!(this instanceof MqttClient)) {
      return new MqttClient(streamBuilder, options)
    }

    this.options = options || {}

    // Defaults
    for (k in defaultConnectOptions) {
      if (typeof this.options[k] === 'undefined') {
        this.options[k] = defaultConnectOptions[k]
      }
    }

    this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId()

    this.streamBuilder = streamBuilder

    // Inflight message storages
    this.outgoingStore = this.options.outgoingStore || new Store()
    this.incomingStore = this.options.incomingStore || new Store()

    // Should QoS zero messages be queued when the connection is broken?
    this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero

    // map of subscribed topics to support reconnection
    this._resubscribeTopics = {}

    // map of a subscribe messageId and a topic
    this.messageIdToTopic = {}

    // Ping timer, setup in _setupPingTimer
    this.pingTimer = null
    // Is the client connected?
    this.connected = false
    // Are we disconnecting?
    this.disconnecting = false
    // Packet queue
    this.queue = []
    // connack timer
    this.connackTimer = null
    // Reconnect timer
    this.reconnectTimer = null
    // MessageIDs start at a random value of at least 1: 0 is not a valid
    // MQTT packet identifier, and Math.random() can floor to 0.
    this.nextId = Math.max(1, Math.floor(Math.random() * 65535))

    // Inflight callbacks
    this.outgoing = {}

    // Mark connected on connect
    this.on('connect', function () {
      if (this.disconnected) {
        return
      }

      this.connected = true
      var outStore = null
      outStore = this.outgoingStore.createStream()

      // Control of stored messages
      outStore.once('readable', function () {
        function storeDeliver () {
          var packet = outStore.read(1)
          var cb

          if (!packet) {
            return
          }

          // Avoid unnecessary stream read operations when disconnected
          if (!that.disconnecting && !that.reconnectTimer && that.options.reconnectPeriod > 0) {
            outStore.read(0)
            cb = that.outgoing[packet.messageId]
            that.outgoing[packet.messageId] = function (err, status) {
              // Ensure that the original callback passed in to publish gets invoked
              if (cb) {
                cb(err, status)
              }

              // deliver the next stored packet only after this one completed
              storeDeliver()
            }
            that._sendPacket(packet)
          } else if (outStore.destroy) {
            outStore.destroy()
          }
        }
        storeDeliver()
      }).on('error', this.emit.bind(this, 'error'))
    })

    // Mark disconnected on stream close
    this.on('close', function () {
      this.connected = false
      clearTimeout(this.connackTimer)
    })

    // Setup ping timer
    this.on('connect', this._setupPingTimer)

    // Send queued packets, one at a time, in the order they were queued
    this.on('connect', function () {
      var queue = this.queue

      function deliver () {
        var entry = queue.shift()
        var packet = null

        if (!entry) {
          return
        }

        packet = entry.packet

        that._sendPacket(
          packet,
          function (err) {
            if (entry.cb) {
              entry.cb(err)
            }
            deliver()
          }
        )
      }

      deliver()
    })

    var firstConnection = true
    // resubscribe on every connect after the first one
    this.on('connect', function () {
      if (!firstConnection &&
          this.options.clean &&
          Object.keys(this._resubscribeTopics).length > 0) {
        if (this.options.resubscribe) {
          // marker read (and removed again) by subscribe()
          this._resubscribeTopics.resubscribe = true
          this.subscribe(this._resubscribeTopics)
        } else {
          this._resubscribeTopics = {}
        }
      }

      firstConnection = false
    })

    // Clear ping timer
    this.on('close', function () {
      if (that.pingTimer !== null) {
        that.pingTimer.clear()
        that.pingTimer = null
      }
    })

    // Setup reconnect timer on disconnect
    this.on('close', this._setupReconnect)

    events.EventEmitter.call(this)

    this._setupStream()
  }
  inherits(MqttClient, events.EventEmitter)
  /**
   * setup the event handlers in the inner stream.
   *
   * Builds a new stream with streamBuilder, pipes it into a Writable that
   * feeds the packet parser, sends the connect packet directly (not through
   * the queue) and arms the connack timeout.
   *
   * @api private
   */
  MqttClient.prototype._setupStream = function () {
    var that = this
    var sink = new Writable()
    var parser = mqttPacket.parser(this.options)
    var completeParse = null
    var pendingPackets = []
    var connectPacket

    this._clearReconnect()

    this.stream = this.streamBuilder(this)

    parser.on('packet', function (packet) {
      pendingPackets.push(packet)
    })

    function scheduleNext () {
      process.nextTick(processNext)
    }

    // Handle parsed packets one at a time; once none are left, complete the
    // pending write so the Writable accepts the next chunk.
    function processNext () {
      var packet = pendingPackets.shift()
      var finishWrite = completeParse

      if (!packet) {
        completeParse = null
        finishWrite()
        return
      }
      that._handlePacket(packet, scheduleNext)
    }

    sink._write = function (buf, enc, done) {
      completeParse = done
      parser.parse(buf)
      processNext()
    }

    this.stream.pipe(sink)

    // Suppress connection errors
    this.stream.on('error', nop)

    // Echo stream close
    eos(this.stream, this.emit.bind(this, 'close'))

    // Send a connect packet: the options with cmd set on top of them
    connectPacket = Object.create(this.options)
    connectPacket.cmd = 'connect'
    // avoid message queue
    sendPacket(this, connectPacket)

    // Echo connection errors
    parser.on('error', this.emit.bind(this, 'error'))

    // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
    this.stream.setMaxListeners(1000)

    // Force a clean-up if no connack arrives within connectTimeout
    clearTimeout(this.connackTimer)
    this.connackTimer = setTimeout(function () {
      that._cleanUp(true)
    }, this.options.connectTimeout)
  }
  /**
   * _handlePacket - dispatch a parsed packet to its handler
   *
   * Emits 'packetreceive' and routes the packet by its cmd. done is always
   * invoked, either by the handler or here, because the caller only moves on
   * to the next packet (and the next chunk of stream data) once it has run.
   *
   * @param {Object} packet - parsed packet
   * @param {Function} done - called when the packet has been handled
   * @api private
   */
  MqttClient.prototype._handlePacket = function (packet, done) {
    this.emit('packetreceive', packet)

    switch (packet.cmd) {
      case 'publish':
        this._handlePublish(packet, done)
        break
      case 'puback':
      case 'pubrec':
      case 'pubcomp':
      case 'suback':
      case 'unsuback':
        this._handleAck(packet)
        done()
        break
      case 'pubrel':
        this._handlePubrel(packet, done)
        break
      case 'connack':
        this._handleConnack(packet)
        done()
        break
      case 'pingresp':
        this._handlePingresp(packet)
        done()
        break
      default:
        // Packet types with no handler are ignored, but done must still be
        // called: skipping it would stall the parse pipeline for good.
        done()
        break
    }
  }
  /**
   * _checkDisconnecting - report a 'client disconnecting' error while the
   * client is disconnecting
   *
   * The error goes to callback when one is given, otherwise it is emitted as
   * an 'error' event.
   *
   * @param {Function} [callback] - receives the error
   * @returns {Boolean} the client's disconnecting flag
   * @api private
   */
  MqttClient.prototype._checkDisconnecting = function (callback) {
    if (this.disconnecting) {
      var err = new Error('client disconnecting')
      if (callback) {
        callback(err)
      } else {
        this.emit('error', err)
      }
    }
    return this.disconnecting
  }
  /**
   * publish - publish <message> to <topic>
   *
   * @param {String} topic - topic to publish to
   * @param {String, Buffer} message - message to publish
   * @param {Object} [opts] - publish options, includes:
   *    {Number} qos - qos level to publish on (default 0)
   *    {Boolean} retain - whether or not to retain the message (default false)
   *    {Boolean} dup - whether or not mark a message as duplicate (default false)
   * @param {Function} [callback] - function(err){}
   *    called when publish succeeds or fails
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.publish('topic', 'message');
   * @example
   *     client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
   * @example client.publish('topic', 'message', console.log);
   */
  MqttClient.prototype.publish = function (topic, message, opts, callback) {
    // .publish(topic, payload, cb);
    if (typeof opts === 'function') {
      callback = opts
      opts = null
    }

    // caller options win over the defaults
    opts = xtend({qos: 0, retain: false, dup: false}, opts)

    if (this._checkDisconnecting(callback)) {
      return this
    }

    var packet = {
      cmd: 'publish',
      topic: topic,
      payload: message,
      qos: opts.qos,
      retain: opts.retain,
      messageId: this._nextId(),
      dup: opts.dup
    }

    if (opts.qos === 1 || opts.qos === 2) {
      // Acknowledged delivery: park the callback until the ack arrives
      this.outgoing[packet.messageId] = callback || nop
      this._sendPacket(packet)
    } else {
      this._sendPacket(packet, callback)
    }

    return this
  }
  /**
   * subscribe - subscribe to <topic>
   *
   * Topics already recorded for resubscription with an equal or higher qos
   * are skipped. When nothing is left to subscribe to, the callback is called
   * synchronously with an empty granted list and no packet is sent.
   *
   * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
   * @param {Object} [opts] - optional subscription options, includes:
   *    {Number} qos - subscribe qos level (default 0)
   * @param {Function} [callback] - function(err, granted){} where:
   *    {Error} err - subscription error (an invalid topic, or a disconnecting client)
   *    {Array} granted - array of {topic: 't', qos: 0}
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.subscribe('topic');
   * @example client.subscribe('topic', {qos: 1});
   * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
   * @example client.subscribe('topic', console.log);
   */
  MqttClient.prototype.subscribe = function () {
    var packet
    var args = Array.prototype.slice.call(arguments)
    var subs = []
    var obj = args.shift()
    // internal marker set by the constructor's reconnect handler: forces every
    // remembered topic to be subscribed again
    var resubscribe = obj.resubscribe
    var callback = args.pop() || nop
    var opts = args.pop()
    var invalidTopic
    var that = this

    delete obj.resubscribe

    if (typeof obj === 'string') {
      obj = [obj]
    }

    // .subscribe(topic, opts) without a callback
    if (typeof callback !== 'function') {
      opts = callback
      callback = nop
    }

    invalidTopic = validations.validateTopics(obj)
    if (invalidTopic !== null) {
      setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
      return this
    }

    if (this._checkDisconnecting(callback)) {
      return this
    }

    opts = xtend({ qos: 0 }, opts)

    // Request a topic only if it is new, asks for a higher qos than the one
    // remembered, or a resubscribe was forced.
    function addSubscription (topic, qos) {
      if (that._resubscribeTopics[topic] < qos ||
          !that._resubscribeTopics.hasOwnProperty(topic) ||
          resubscribe) {
        subs.push({
          topic: topic,
          qos: qos
        })
      }
    }

    if (Array.isArray(obj)) {
      obj.forEach(function (topic) {
        addSubscription(topic, opts.qos)
      })
    } else {
      Object.keys(obj).forEach(function (topic) {
        addSubscription(topic, obj[topic])
      })
    }

    packet = {
      cmd: 'subscribe',
      subscriptions: subs,
      qos: 1,
      retain: false,
      dup: false,
      messageId: this._nextId()
    }

    if (!subs.length) {
      callback(null, [])
      // keep the documented chaining contract on this path too
      return this
    }

    // subscriptions to resubscribe to in case of disconnect
    if (this.options.resubscribe) {
      var topics = []
      subs.forEach(function (sub) {
        if (that.options.reconnectPeriod > 0) {
          that._resubscribeTopics[sub.topic] = sub.qos
          topics.push(sub.topic)
        }
      })
      that.messageIdToTopic[packet.messageId] = topics
    }

    this.outgoing[packet.messageId] = function (err, packet) {
      if (!err) {
        // report the qos values granted by the suback
        var granted = packet.granted
        for (var i = 0; i < granted.length; i += 1) {
          subs[i].qos = granted[i]
        }
      }

      callback(err, subs)
    }

    this._sendPacket(packet)

    return this
  }
  /**
   * unsubscribe - unsubscribe from topic(s)
   *
   * With the resubscribe option on, the topics are also dropped from the set
   * that is subscribed to again after a reconnect.
   *
   * @param {String, Array} topic - topics to unsubscribe from
   * @param {Function} [callback] - callback fired on unsuback
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.unsubscribe('topic');
   * @example client.unsubscribe('topic', console.log);
   */
  MqttClient.prototype.unsubscribe = function (topic, callback) {
    var that = this
    var packet = {
      cmd: 'unsubscribe',
      qos: 1,
      messageId: this._nextId()
    }

    callback = callback || nop

    if (this._checkDisconnecting(callback)) {
      return this
    }

    if (typeof topic === 'string') {
      packet.unsubscriptions = [topic]
    } else if (typeof topic === 'object' && topic.length) {
      packet.unsubscriptions = topic
    }

    if (this.options.resubscribe) {
      packet.unsubscriptions.forEach(function (name) {
        delete that._resubscribeTopics[name]
      })
    }

    // invoked by _handleAck when the unsuback arrives
    this.outgoing[packet.messageId] = callback

    this._sendPacket(packet)

    return this
  }
  /**
   * end - close connection
   *
   * Does nothing when the client is already disconnecting. Unless forced, the
   * shutdown waits for 'outgoingEmpty' while callbacks are still in flight.
   *
   * @returns {MqttClient} this - for chaining
   * @param {Boolean} force - do not wait for all in-flight messages to be acked
   * @param {Function} cb - called when the client has been closed
   *
   * @api public
   */
  MqttClient.prototype.end = function (force, cb) {
    var that = this

    // .end(cb)
    if (typeof force === 'function') {
      cb = force
      force = false
    }

    function onOutgoingStoreClosed () {
      if (cb) {
        cb.apply(null, arguments)
      }
      that.emit('end')
    }

    function closeStores () {
      that.disconnected = true
      that.incomingStore.close(function () {
        that.outgoingStore.close(onOutgoingStoreClosed)
      })
      // run a reconnect() that was requested while we were still closing
      if (that._deferredReconnect) {
        that._deferredReconnect()
      }
    }

    function finish () {
      // defer closesStores of an I/O cycle,
      // just to make sure things are
      // ok for websockets
      that._cleanUp(force, setImmediate.bind(null, closeStores))
    }

    if (this.disconnecting) {
      return this
    }

    this._clearReconnect()

    this.disconnecting = true

    if (force || Object.keys(this.outgoing).length === 0) {
      finish()
    } else {
      // wait 10ms, just to be sure we received all of it
      this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
    }

    return this
  }
  /**
   * removeOutgoingMessage - remove a message in outgoing store
   * the outgoing callback will be called with Error('Message removed') if the message is removed
   *
   * When no callback is registered for the messageId, the store entry is
   * still deleted and nothing is called.
   *
   * @param {Number} mid - messageId to remove message
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.removeOutgoingMessage(client.getLastMessageId());
   */
  MqttClient.prototype.removeOutgoingMessage = function (mid) {
    var cb = this.outgoing[mid]
    delete this.outgoing[mid]
    this.outgoingStore.del({messageId: mid}, function () {
      // guard: an unknown or already completed messageId has no callback
      if (typeof cb === 'function') {
        cb(new Error('Message removed'))
      }
    })
    return this
  }
  /**
   * reconnect - connect again using the same options as connect()
   *
   * While an end() is still in progress the reconnect is parked in
   * _deferredReconnect (end() runs it once it has finished); otherwise it
   * runs immediately.
   *
   * @param {Object} [opts] - optional reconnect options, includes:
   *    {Store} incomingStore - a store for the incoming packets
   *    {Store} outgoingStore - a store for the outgoing packets
   *    stores that are not supplied are replaced by new Store instances
   * @returns {MqttClient} this - for chaining
   *
   * @api public
   */
  MqttClient.prototype.reconnect = function (opts) {
    var that = this

    function doReconnect () {
      that.options.incomingStore = opts ? opts.incomingStore : null
      that.options.outgoingStore = opts ? opts.outgoingStore : null
      that.incomingStore = that.options.incomingStore || new Store()
      that.outgoingStore = that.options.outgoingStore || new Store()
      that.disconnecting = false
      that.disconnected = false
      that._deferredReconnect = null
      that._reconnect()
    }

    if (!this.disconnecting || this.disconnected) {
      doReconnect()
    } else {
      this._deferredReconnect = doReconnect
    }
    return this
  }
  /**
   * _reconnect - implement reconnection: announce it with a 'reconnect'
   * event, then build a fresh stream
   * @api privateish
   */
  MqttClient.prototype._reconnect = function () {
    var client = this
    client.emit('reconnect')
    client._setupStream()
  }
  /**
   * _setupReconnect - setup reconnect timer
   *
   * Nothing happens while disconnecting, while a reconnect timer already
   * exists, or when reconnectPeriod is not greater than 0. 'offline' is
   * emitted once per outage, when the reconnecting flag is first raised.
   */
  MqttClient.prototype._setupReconnect = function () {
    var that = this

    if (that.disconnecting || that.reconnectTimer || !(that.options.reconnectPeriod > 0)) {
      return
    }

    if (!this.reconnecting) {
      this.emit('offline')
      this.reconnecting = true
    }

    that.reconnectTimer = setInterval(function () {
      that._reconnect()
    }, that.options.reconnectPeriod)
  }
  /**
   * _clearReconnect - clear the reconnect timer, if one is set
   */
  MqttClient.prototype._clearReconnect = function () {
    if (!this.reconnectTimer) {
      return
    }
    clearInterval(this.reconnectTimer)
    this.reconnectTimer = null
  }
  /**
   * _cleanUp - clean up on connection end
   *
   * @param {Boolean} forced - destroy the stream right away instead of
   *    sending a disconnect packet and ending it
   * @param {Function} [done] - called when the stream closes, or immediately
   *    when the client is not connected
   * @api private
   */
  MqttClient.prototype._cleanUp = function (forced, done) {
    if (done) {
      this.stream.on('close', done)
    }

    if (!forced) {
      // graceful: end the stream one tick after the disconnect packet went out
      this._sendPacket(
        { cmd: 'disconnect' },
        setImmediate.bind(
          null,
          this.stream.end.bind(this.stream)
        )
      )
    } else {
      // with no reconnect and a clean session the pending callbacks can
      // never complete, so fail them now
      if ((this.options.reconnectPeriod === 0) && this.options.clean) {
        flush(this.outgoing)
      }
      this.stream.destroy()
    }

    if (!this.disconnecting) {
      this._clearReconnect()
      this._setupReconnect()
    }

    if (this.pingTimer !== null) {
      this.pingTimer.clear()
      this.pingTimer = null
    }

    if (done && !this.connected) {
      // call done directly and unhook it so it cannot also run on 'close'
      this.stream.removeListener('close', done)
      done()
    }
  }
  /**
   * _sendPacket - send or queue a packet
   *
   * While not connected: non-publish packets, and QoS 0 publishes when
   * queueQoSZero is on, are queued in memory; publishes with qos > 0 are put
   * in the outgoing store; anything else fails with 'No connection to broker'.
   * While connected: QoS 1 and 2 publishes are stored and sent, everything
   * else is written straight to the stream.
   *
   * @param {Object} packet - packet options
   * @param {Function} cb - callback when the packet is sent
   * @api private
   */
  MqttClient.prototype._sendPacket = function (packet, cb) {
    if (!this.connected) {
      var isPublish = packet.cmd === 'publish'
      var isQoSZero = (packet.qos || 0) === 0

      if (!isPublish || (isQoSZero && this.queueQoSZero)) {
        this.queue.push({ packet: packet, cb: cb })
      } else if (packet.qos > 0) {
        // the in-flight callback registered for this messageId takes over
        cb = this.outgoing[packet.messageId]
        this.outgoingStore.put(packet, function (err) {
          if (err) {
            return cb && cb(err)
          }
        })
      } else if (cb) {
        cb(new Error('No connection to broker'))
      }

      return
    }

    // When sending a packet, reschedule the ping timer
    this._shiftPingInterval()

    if (isStoredPublish(packet)) {
      storeAndSend(this, packet, cb)
    } else {
      sendPacket(this, packet, cb)
    }

    // only QoS 1 and QoS 2 publishes go through the outgoing store
    function isStoredPublish (candidate) {
      return candidate.cmd === 'publish' && (candidate.qos === 1 || candidate.qos === 2)
    }
  }
  /**
   * _setupPingTimer - setup the ping timer
   *
   * Skipped when a timer already exists or keepalive is falsy. The timer
   * calls _checkPing every keepalive seconds.
   *
   * @api private
   */
  MqttClient.prototype._setupPingTimer = function () {
    var that = this

    if (this.pingTimer || !this.options.keepalive) {
      return
    }

    // start out as if the last ping had been answered
    this.pingResp = true
    this.pingTimer = reInterval(function () {
      that._checkPing()
    }, this.options.keepalive * 1000)
  }
  /**
   * _shiftPingInterval - reschedule the ping interval
   *
   * Only acts when a ping timer exists and both the keepalive and the
   * reschedulePings options are truthy.
   *
   * @api private
   */
  MqttClient.prototype._shiftPingInterval = function () {
    var opts = this.options
    var shouldReschedule = this.pingTimer && opts.keepalive && opts.reschedulePings

    if (shouldReschedule) {
      this.pingTimer.reschedule(opts.keepalive * 1000)
    }
  }
  /**
   * _checkPing - check if a pingresp has come back, and ping the server again
   *
   * @api private
   */
  MqttClient.prototype._checkPing = function () {
    if (!this.pingResp) {
      // do a forced cleanup since socket will be in bad shape
      this._cleanUp(true)
      return
    }

    // lower the flag; _handlePingresp raises it again when the reply arrives
    this.pingResp = false
    this._sendPacket({ cmd: 'pingreq' })
  }
  /**
   * _handlePingresp - handle a pingresp by recording that the last ping was
   * answered (the flag is read by _checkPing)
   *
   * @api private
   */
  MqttClient.prototype._handlePingresp = function () {
    var answered = true
    this.pingResp = answered
  }
  /**
   * _handleConnack
   *
   * Stops the connack timer, then emits 'connect' for return code 0 or an
   * 'error' (with err.code set to the return code) for a positive one.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleConnack = function (packet) {
    // refusal reasons, indexed by connack return code
    var reasons = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ]
    var returnCode = packet.returnCode

    clearTimeout(this.connackTimer)

    if (returnCode === 0) {
      this.reconnecting = false
      this.emit('connect', packet)
      return
    }

    if (returnCode > 0) {
      var refusal = new Error('Connection refused: ' + reasons[returnCode])
      refusal.code = returnCode
      this.emit('error', refusal)
    }
  }
  /**
   * _handlePublish
   *
   * QoS 2: store the packet and answer with a pubrec; the 'message' event is
   *        emitted later, from _handlePubrel.
   * QoS 1: emit 'message', run handleMessage, then send the puback unless
   *        handleMessage reported an error.
   * QoS 0: emit 'message' and run handleMessage.
   * Any other qos value is ignored.
   *
   * @param {Object} packet
   * @param {Function} done - called when the packet has been handled
   * @api private
   */
  MqttClient.prototype._handlePublish = function (packet, done) {
    var that = this
    var topic = packet.topic.toString()
    var message = packet.payload
    var qos = packet.qos
    var mid = packet.messageId

    if (qos === 2) {
      this.incomingStore.put(packet, function () {
        that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
      })
    } else if (qos === 1) {
      // emit the message event
      this.emit('message', topic, message, packet)
      this.handleMessage(packet, function (err) {
        if (err) {
          return done && done(err)
        }
        // send 'puback' if the above 'handleMessage' method executed
        // successfully.
        that._sendPacket({cmd: 'puback', messageId: mid}, done)
      })
    } else if (qos === 0) {
      // emit the message event
      this.emit('message', topic, message, packet)
      this.handleMessage(packet, done)
    }
    // unknown qos: do nothing
  }
  /**
   * Handle messages with backpressure support, one at a time.
   * Override at will. The default implementation acknowledges immediately.
   *
   * @param Packet packet the packet
   * @param Function callback call when finished
   * @api public
   */
  MqttClient.prototype.handleMessage = function (packet, callback) {
    var finished = callback
    finished()
  }
  /**
   * _handleAck - handle puback, pubcomp, pubrec, suback and unsuback
   *
   * Acks with no registered in-flight callback are ignored. While the client
   * is disconnecting, 'outgoingEmpty' is emitted once no in-flight callbacks
   * remain.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleAck = function (packet) {
    /* eslint no-fallthrough: "off" */
    var mid = packet.messageId
    var type = packet.cmd
    var response = null
    var cb = this.outgoing[mid]
    var that = this

    if (!cb) {
      // Server sent an ack in error, ignore it.
      return
    }

    // Process
    switch (type) {
      case 'pubcomp':
        // same thing as puback for QoS 2
      case 'puback':
        // Callback - we're done
        delete this.outgoing[mid]
        this.outgoingStore.del(packet, cb)
        break
      case 'pubrec':
        // QoS 2, step two: the callback stays registered until the pubcomp
        response = {
          cmd: 'pubrel',
          qos: 2,
          messageId: mid
        }

        this._sendPacket(response)
        break
      case 'suback':
        delete this.outgoing[mid]
        // The topic list recorded by subscribe() is only needed here; drop
        // it so the map does not gain an entry per subscribe forever.
        var topics = this.messageIdToTopic[mid]
        delete this.messageIdToTopic[mid]
        if (topics) {
          // granted[i] answers the i-th requested topic; a value with the
          // 0x80 bit set marks a failure, so that topic must not be
          // subscribed to again after a reconnect
          packet.granted.forEach(function (grantedQos, i) {
            if ((grantedQos & 0x80) !== 0 && topics[i] !== undefined) {
              delete that._resubscribeTopics[topics[i]]
            }
          })
        }
        cb(null, packet)
        break
      case 'unsuback':
        delete this.outgoing[mid]
        cb(null)
        break
      default:
        that.emit('error', new Error('unrecognized packet type'))
    }

    if (this.disconnecting &&
        Object.keys(this.outgoing).length === 0) {
      this.emit('outgoingEmpty')
    }
  }
  /**
   * _handlePubrel
   *
   * Looks up the stored QoS 2 publish for this messageId. When it is found
   * and is not itself a pubrel, 'message' is emitted, the pubrel takes its
   * place in the incoming store and the pubcomp is sent after handleMessage
   * succeeds. Otherwise the pubcomp is sent straight away.
   *
   * @param {Object} packet
   * @param {Function} callback - called once the pubcomp was handed to _sendPacket's callback, or with handleMessage's error
   * @api private
   */
  MqttClient.prototype._handlePubrel = function (packet, callback) {
    var that = this
    var mid = packet.messageId
    var comp = {cmd: 'pubcomp', messageId: mid}

    that.incomingStore.get(packet, function (err, pub) {
      if (err || pub.cmd === 'pubrel') {
        that._sendPacket(comp, callback)
        return
      }

      that.emit('message', pub.topic, pub.payload, pub)
      that.incomingStore.put(packet)
      that.handleMessage(pub, function (err) {
        if (err) {
          return callback && callback(err)
        }
        that._sendPacket(comp, callback)
      })
    })
  }
  /**
   * _nextId - hand out the current message id and advance the counter,
   * wrapping from 65535 back to 1
   *
   * @returns {Number} message id to use
   */
  MqttClient.prototype._nextId = function () {
    var id = this.nextId
    // Ensure 16 bit unsigned int:
    this.nextId = (id === 65535) ? 1 : id + 1
    return id
  }
  /**
   * getLastMessageId - the id most recently handed out by _nextId
   *
   * @returns {Number} last message id (65535 when the counter has just wrapped to 1)
   */
  MqttClient.prototype.getLastMessageId = function () {
    if (this.nextId === 1) {
      return 65535
    }
    return this.nextId - 1
  }
  950. module.exports = MqttClient