wechat wss demo

abstract_client.js 63KB


  1. 'use strict'
  2. /**
  3. * Testing dependencies
  4. */
  5. var should = require('should')
  6. var sinon = require('sinon')
  7. var mqtt = require('../')
  8. var xtend = require('xtend')
  9. var Server = require('./server')
  10. var Store = require('./../lib/store')
  11. var port = 9876
  12. module.exports = function (server, config) {
  /**
   * Creates the MQTT client used by a single test.
   * The suite-level `config` and the per-test `opts` are combined with
   * `xtend(config, opts)` and the result is handed to `mqtt.connect`.
   *
   * @param {Object} [opts] per-test connection options
   * @returns whatever `mqtt.connect` returns for the combined options
   */
  function connect (opts) {
    var combined = xtend(config, opts)
    return mqtt.connect(combined)
  }
  // Shutdown behaviour: which events fire, which flags flip and what the
  // `end` callback receives when the stream closes or `end()` is called.
  describe('closing', function () {
    it('should emit close if stream closes', function (done) {
      var client = connect()

      client.once('connect', function () {
        // End the underlying stream directly instead of calling client.end().
        client.stream.end()
      })
      client.once('close', function () {
        client.end()
        done()
      })
    })

    it('should mark the client as disconnected', function (done) {
      var client = connect()

      client.once('close', function () {
        client.end()
        if (!client.connected) {
          done()
        } else {
          done(new Error('Not marked as disconnected'))
        }
      })
      client.once('connect', function () {
        client.stream.end()
      })
    })

    it('should stop ping timer if stream closes', function (done) {
      var client = connect()

      client.once('close', function () {
        should.not.exist(client.pingTimer)
        client.end()
        done()
      })
      client.once('connect', function () {
        should.exist(client.pingTimer)
        client.stream.end()
      })
    })

    it('should emit close after end called', function (done) {
      var client = connect()

      client.once('close', function () {
        done()
      })
      client.once('connect', function () {
        client.end()
      })
    })

    it('should emit end after end called and client must be disconnected', function (done) {
      var client = connect()

      client.once('end', function () {
        if (client.disconnected) {
          return done()
        }
        done(new Error('client must be disconnected'))
      })
      client.once('connect', function () {
        client.end()
      })
    })

    it('should pass store close error to end callback but not to end listeners', function (done) {
      var store = new Store()
      var client = connect({outgoingStore: store})

      // Make closing the outgoing store fail so `end` has an error to route.
      store.close = function (cb) {
        cb(new Error('test'))
      }
      client.once('end', function () {
        // `arguments` is used on purpose: the event must carry no argument at all.
        if (arguments.length === 0) {
          return done()
        }
        // Report through `done` rather than throwing inside the listener.
        done(new Error('no argument should be passed to event'))
      })
      client.once('connect', function () {
        client.end(function (test) {
          if (test && test.message === 'test') {
            return
          }
          done(new Error('bad argument passed to callback'))
        })
      })
    })

    it('should return `this` if end called twice', function (done) {
      var client = connect()

      client.once('connect', function () {
        client.end()
        var value = client.end()
        if (value === client) {
          done()
        } else {
          done(new Error('Not returning client.'))
        }
      })
    })

    it('should emit end only on first client end', function (done) {
      var client = connect()

      client.once('end', function () {
        // Pass after 200ms unless a second 'end' shows up first.
        var timeout = setTimeout(done.bind(null), 200)
        client.once('end', function () {
          clearTimeout(timeout)
          done(new Error('end was emitted twice'))
        })
        client.end()
      })
      client.once('connect', client.end.bind(client))
    })

    it('should stop ping timer after end called', function (done) {
      var client = connect()

      client.once('connect', function () {
        should.exist(client.pingTimer)
        client.end()
        should.not.exist(client.pingTimer)
        done()
      })
    })

    it('should be able to end even on a failed connection', function (done) {
      var client = connect({host: 'this_hostname_should_not_exist'})

      // Fails the test if the end callback has not run within 500ms.
      var timeout = setTimeout(function () {
        done(new Error('Failed to end a disconnected client'))
      }, 500)

      setTimeout(function () {
        client.end(function () {
          clearTimeout(timeout)
          done()
        })
      }, 200)
    })

    it('should emit end even on a failed connection', function (done) {
      var client = connect({host: 'this_hostname_should_not_exist'})

      // Fails the test if 'end' has not been emitted within 500ms.
      var timeout = setTimeout(function () {
        done(new Error('Disconnected client has failed to emit end'))
      }, 500)

      client.once('end', function () {
        clearTimeout(timeout)
        done()
      })
      setTimeout(client.end.bind(client), 200)
    })

    it('should emit end only once for a reconnecting client', function (done) {
      var client = connect({host: 'this_hostname_should_not_exist', connectTimeout: 10, reconnectPeriod: 10})

      client.once('end', function () {
        // No delay is given, so `done` runs on the next timer turn unless a
        // second 'end' is emitted before that.
        var timeout = setTimeout(done.bind(null))
        client.once('end', function () {
          clearTimeout(timeout)
          done(new Error('end emitted twice'))
        })
      })
      setTimeout(client.end.bind(client), 300)
    })
  })
  // Connection establishment: the CONNECT packet contents seen by the test
  // server, the 'connect'/'error' events and the `connected` flag.
  describe('connecting', function () {
    it('should connect to the broker', function (done) {
      var client = connect()
      client.on('error', done)

      server.once('client', function () {
        client.end()
        done()
      })
    })

    it('should send a default client id', function (done) {
      var client = connect()
      client.on('error', done)

      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/mqttjs.*/)
          // End the client too; nothing else in this test closes it.
          client.end()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should be clean by default', function (done) {
      var client = connect()
      client.on('error', done)

      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clean.should.be.true()
          // End the client too; nothing else in this test closes it.
          client.end()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should connect with the given client id', function (done) {
      var client = connect({clientId: 'testclient'})
      client.on('error', function (err) {
        throw err
      })

      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/testclient/)
          // End the client too; nothing else in this test closes it.
          client.end()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should connect with the client id and unclean state', function (done) {
      var client = connect({clientId: 'testclient', clean: false})
      client.on('error', function (err) {
        throw err
      })

      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/testclient/)
          packet.clean.should.be.false()
          // End the client too; nothing else in this test closes it.
          client.end()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should require a clientId with clean=false', function (done) {
      // The test passes only when connect() throws synchronously. If it
      // returns a client instead, any 'error' it emits is forwarded to `done`
      // and fails the test.
      try {
        var client = connect({ clean: false })
        client.on('error', function (err) {
          done(err)
        })
      } catch (err) {
        done()
      }
    })

    it('should default to localhost', function (done) {
      var client = connect({clientId: 'testclient'})
      client.on('error', function (err) {
        throw err
      })

      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/testclient/)
          // End the client too; nothing else in this test closes it.
          client.end()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should emit connect', function (done) {
      var client = connect()

      client.once('connect', function () {
        client.end()
        done()
      })
      client.once('error', done)
    })

    it('should provide connack packet with connect event', function (done) {
      // The first server-side connection is answered with sessionPresent=true,
      // the following one with sessionPresent=false.
      server.once('client', function (serverClient) {
        serverClient.connack({returnCode: 0, sessionPresent: true})

        server.once('client', function (serverClient) {
          serverClient.connack({returnCode: 0, sessionPresent: false})
        })
      })

      var client = connect()
      client.once('connect', function (packet) {
        should(packet.sessionPresent).be.equal(true)
        client.once('connect', function (packet) {
          should(packet.sessionPresent).be.equal(false)
          client.end()
          done()
        })
      })
    })

    it('should mark the client as connected', function (done) {
      var client = connect()

      client.once('connect', function () {
        // Read the flag before end() so the outcome does not depend on when
        // end() resets it.
        var wasConnected = client.connected
        client.end()
        if (wasConnected) {
          done()
        } else {
          done(new Error('Not marked as connected'))
        }
      })
    })

    it('should emit error', function (done) {
      var client = connect({clientId: 'invalid'})

      client.once('connect', function () {
        done(new Error('Should not emit connect'))
      })
      client.once('error', function (error) {
        should(error.code).be.equal(2) // code for clientID identifier rejected
        client.end()
        done()
      })
    })

    it('should have different client ids', function (done) {
      var client1 = connect()
      var client2 = connect()

      client1.options.clientId.should.not.equal(client2.options.clientId)
      client1.end(true)
      client2.end(true)
      setImmediate(done)
    })
  })
  // Checks that 'offline' is emitted both after losing an established
  // connection and when the first connection attempt cannot succeed.
  describe('handling offline states', function () {
    it('should emit offline events once when the client transitions from connected states to disconnected ones', function (done) {
      var mqttClient = connect({reconnectPeriod: 20})

      // Invoked as a 'connect' listener, so `this` is whatever the emitter
      // binds listeners to; the original handler relied on the same thing.
      function dropStream () {
        this.stream.end()
      }

      function forceEnd () {
        mqttClient.end(true, done)
      }

      mqttClient.on('connect', dropStream)
      mqttClient.on('offline', forceEnd)
    })

    it('should emit offline events once when the client (at first) can NOT connect to servers', function (done) {
      // fake a port
      var unreachable = { reconnectPeriod: 20, port: 4557 }
      var mqttClient = connect(unreachable)

      mqttClient.on('offline', function forceEnd () {
        mqttClient.end(true, done)
      })
    })
  })
  // Subscription topic validation: valid wildcard topics must be accepted and
  // malformed ones must be reported through the subscribe callback.
  describe('topic validations when subscribing', function () {
    it('should be ok for well-formatted topics', function (done) {
      var client = connect()

      client.subscribe(
        [
          '+', '+/event', 'event/+', '#', 'event/#', 'system/event/+',
          'system/+/event', 'system/registry/event/#', 'system/+/event/#',
          'system/registry/event/new_device', 'system/+/+/new_device'
        ],
        function (err) {
          client.end(function () {
            if (err) {
              return done(new Error(err))
            }
            done()
          })
        }
      )
    })

    // Title names all three topics so it no longer collides with the
    // single-topic '#/event' test below.
    it('should return an error (via callbacks) for topics #/event, event# and event+', function (done) {
      var client = connect()

      client.subscribe(['#/event', 'event#', 'event+'], function (err) {
        client.end(false, function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an empty array for duplicate subs', function (done) {
      var client = connect()

      // Ends the client before reporting, so the connection is not left open
      // on either the success or the error path.
      function finish (err) {
        client.end()
        done(err)
      }

      client.subscribe('event', function (err, granted1) {
        if (err) {
          return finish(err)
        }
        client.subscribe('event', function (err, granted2) {
          if (err) {
            return finish(err)
          }
          granted2.should.Array()
          granted2.should.be.empty()
          finish()
        })
      })
    })

    it('should return an error (via callbacks) for topic #/event', function (done) {
      var client = connect()

      client.subscribe('#/event', function (err) {
        client.end(function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an error (via callbacks) for topic event#', function (done) {
      var client = connect()

      client.subscribe('event#', function (err) {
        client.end(function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an error (via callbacks) for topic system/#/event', function (done) {
      var client = connect()

      client.subscribe('system/#/event', function (err) {
        client.end(function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an error (via callbacks) for empty topic list', function (done) {
      var client = connect()

      client.subscribe([], function (err) {
        client.end()
        if (err) {
          return done()
        }
        done(new Error('Validations do NOT work'))
      })
    })

    it('should return an error (via callbacks) for topic system/+/#/event', function (done) {
      var client = connect()

      client.subscribe('system/+/#/event', function (err) {
        client.end(true, function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })
  })
  // Behaviour of publish/subscribe/unsubscribe issued before the client is
  // connected, and of `end()` while messages are still in flight.
  describe('offline messages', function () {
    it('should queue message until connected', function (done) {
      var client = connect()

      client.publish('test', 'test')
      client.subscribe('test')
      client.unsubscribe('test')
      client.queue.length.should.equal(3)

      client.once('connect', function () {
        client.queue.length.should.equal(0)
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should not queue qos 0 messages if queueQoSZero is false', function (done) {
      var client = connect({queueQoSZero: false})

      client.publish('test', 'test', {qos: 0})
      client.queue.length.should.equal(0)
      client.on('connect', function () {
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should queue qos != 0 messages', function (done) {
      var client = connect({queueQoSZero: false})

      client.publish('test', 'test', {qos: 1})
      client.publish('test', 'test', {qos: 2})
      client.subscribe('test')
      client.unsubscribe('test')
      // Four calls were made but the test expects only two queued entries.
      client.queue.length.should.equal(2)
      client.on('connect', function () {
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should call cb if an outgoing QoS 0 message is not sent', function (done) {
      var client = connect({queueQoSZero: false})
      var called = false

      client.publish('test', 'test', {qos: 0}, function () {
        called = true
      })

      client.on('connect', function () {
        called.should.equal(true)
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should delay ending up until all inflight messages are delivered', function (done) {
      var client = connect()
      var subscribeCalled = false

      client.on('connect', function () {
        client.subscribe('test', function () {
          subscribeCalled = true
        })
        client.publish('test', 'test', function () {
          // Non-forced end: by the time its callback runs the subscribe
          // callback must already have been invoked.
          client.end(false, function () {
            subscribeCalled.should.be.equal(true)
            done()
          })
        })
      })
    })

    it('wait QoS 1 publish messages', function (done) {
      var client = connect()
      var messageReceived = false

      client.on('connect', function () {
        client.subscribe('test')
        client.publish('test', 'test', { qos: 1 }, function () {
          client.end(false, function () {
            messageReceived.should.equal(true)
            done()
          })
        })
        client.on('message', function () {
          messageReceived = true
        })
      })

      // Server side: once subscribed, echo every publish back to the client.
      server.once('client', function (serverClient) {
        serverClient.on('subscribe', function () {
          serverClient.on('publish', function (packet) {
            serverClient.publish(packet)
          })
        })
      })
    })

    it('does not wait acks when force-closing', function (done) {
      // non-running broker
      // NOTE(review): connect() passes this string through
      // xtend(config, opts), which presumably does not parse it as a URL --
      // confirm this test really targets a non-running broker.
      var client = connect('mqtt://localhost:8993')

      client.publish('test', 'test', { qos: 1 })
      client.end(true, done)
    })

    it('should call cb if store.put fails', function (done) {
      var store = new Store()

      // Every put fails asynchronously with the same error.
      store.put = function (packet, cb) {
        process.nextTick(cb, new Error('oops there is an error'))
      }

      var client = connect({ incomingStore: store, outgoingStore: store })
      client.publish('test', 'test', { qos: 2 }, function (err) {
        if (err) {
          return client.end(true, done)
        }
        // Fail explicitly rather than waiting for the mocha timeout when the
        // callback runs without the store error.
        client.end(true, function () {
          done(new Error('publish callback did not receive the store.put error'))
        })
      })
    })
  })
  // Outgoing publishes (options, callbacks, events) and the handling of
  // incoming publishes through `handleMessage` at each QoS level.
  describe('publishing', function () {
    it('should publish a message (offline)', function (done) {
      var client = connect()
      var payload = 'test'
      var topic = 'test'

      client.publish(topic, payload)

      server.on('client', onClient)

      function onClient (serverClient) {
        // Stop listening for further server-side clients once this one has
        // sent its connect packet.
        serverClient.once('connect', function () {
          server.removeListener('client', onClient)
        })

        serverClient.once('publish', function (packet) {
          packet.topic.should.equal(topic)
          packet.payload.toString().should.equal(payload)
          packet.qos.should.equal(0)
          packet.retain.should.equal(false)
          client.end(true, done)
        })
      }
    })

    it('should publish a message (online)', function (done) {
      var client = connect()
      var payload = 'test'
      var topic = 'test'

      client.on('connect', function () {
        client.publish(topic, payload)
      })

      server.once('client', function (serverClient) {
        serverClient.once('publish', function (packet) {
          packet.topic.should.equal(topic)
          packet.payload.toString().should.equal(payload)
          packet.qos.should.equal(0)
          packet.retain.should.equal(false)
          client.end()
          done()
        })
      })
    })

    it('should publish a message (retain, offline)', function (done) {
      var client = connect({ queueQoSZero: true })
      var payload = 'test'
      var topic = 'test'
      var called = false

      client.publish(topic, payload, { retain: true }, function () {
        called = true
      })

      server.once('client', function (serverClient) {
        serverClient.once('publish', function (packet) {
          packet.topic.should.equal(topic)
          packet.payload.toString().should.equal(payload)
          packet.qos.should.equal(0)
          packet.retain.should.equal(true)
          called.should.equal(true)
          client.end()
          done()
        })
      })
    })

    it('should emit a packetsend event', function (done) {
      var client = connect()
      var payload = 'test_payload'
      var testTopic = 'testTopic'

      client.on('packetsend', function (packet) {
        // Other packet types also pass through this event; only the publish
        // is of interest.
        if (packet.cmd === 'publish') {
          packet.qos.should.equal(0)
          packet.topic.should.equal(testTopic)
          packet.payload.should.equal(payload)
          packet.retain.should.equal(false)
          client.end()
          done()
        }
      })

      client.publish(testTopic, payload)
    })

    it('should accept options', function (done) {
      var client = connect()
      var payload = 'test'
      var topic = 'test'
      var opts = {
        retain: true,
        qos: 1
      }

      client.once('connect', function () {
        client.publish(topic, payload, opts)
      })

      server.once('client', function (serverClient) {
        serverClient.once('publish', function (packet) {
          packet.topic.should.equal(topic)
          packet.payload.toString().should.equal(payload)
          packet.qos.should.equal(opts.qos, 'incorrect qos')
          packet.retain.should.equal(opts.retain, 'incorrect ret')
          packet.dup.should.equal(false, 'incorrect dup')
          client.end()
          done()
        })
      })
    })

    it('should publish with the default options for an empty parameter', function (done) {
      var client = connect()
      var payload = 'test'
      var topic = 'test'
      var defaultOpts = {qos: 0, retain: false, dup: false}

      client.once('connect', function () {
        client.publish(topic, payload, {})
      })

      server.once('client', function (serverClient) {
        serverClient.once('publish', function (packet) {
          packet.topic.should.equal(topic)
          packet.payload.toString().should.equal(payload)
          packet.qos.should.equal(defaultOpts.qos, 'incorrect qos')
          packet.retain.should.equal(defaultOpts.retain, 'incorrect ret')
          packet.dup.should.equal(defaultOpts.dup, 'incorrect dup')
          client.end()
          done()
        })
      })
    })

    it('should mark a message as duplicate when "dup" option is set', function (done) {
      var client = connect()
      var payload = 'duplicated-test'
      var topic = 'test'
      var opts = {
        retain: true,
        qos: 1,
        dup: true
      }

      client.once('connect', function () {
        client.publish(topic, payload, opts)
      })

      server.once('client', function (serverClient) {
        serverClient.once('publish', function (packet) {
          packet.topic.should.equal(topic)
          packet.payload.toString().should.equal(payload)
          packet.dup.should.equal(opts.dup, 'incorrect dup')
          client.end()
          done()
        })
      })
    })

    it('should fire a callback (qos 0)', function (done) {
      var client = connect()

      client.once('connect', function () {
        client.publish('a', 'b', function () {
          client.end()
          done()
        })
      })
    })

    it('should fire a callback (qos 1)', function (done) {
      var client = connect()
      var opts = { qos: 1 }

      client.once('connect', function () {
        client.publish('a', 'b', opts, function () {
          client.end()
          done()
        })
      })
    })

    it('should fire a callback (qos 2)', function (done) {
      var client = connect()
      var opts = { qos: 2 }

      client.once('connect', function () {
        client.publish('a', 'b', opts, function () {
          client.end()
          done()
        })
      })
    })

    it('should support UTF-8 characters in topic', function (done) {
      var client = connect()

      client.once('connect', function () {
        client.publish('中国', 'hello', function () {
          client.end()
          done()
        })
      })
    })

    it('should support UTF-8 characters in payload', function (done) {
      var client = connect()

      client.once('connect', function () {
        client.publish('hello', '中国', function () {
          client.end()
          done()
        })
      })
    })

    it('should publish 10 QoS 2 and receive them', function (done) {
      var client = connect()
      var count = 0 // number of pubrel packets seen by the server

      client.on('connect', function () {
        client.subscribe('test')
        client.publish('test', 'test', { qos: 2 })
      })

      // Each received message triggers the next publish until the server has
      // counted ten pubrels.
      client.on('message', function () {
        if (count >= 10) {
          client.end()
          done()
        } else {
          client.publish('test', 'test', { qos: 2 })
        }
      })

      server.once('client', function (serverClient) {
        serverClient.on('offline', function () {
          client.end()
          // Pass a real Error so mocha reports a proper failure.
          done(new Error('error went offline... didnt see this happen'))
        })

        serverClient.on('subscribe', function () {
          serverClient.on('publish', function (packet) {
            serverClient.publish(packet)
          })
        })

        serverClient.on('pubrel', function () {
          count++
        })
      })
    })

    /**
     * Shared body for the "receive them only when `handleMessage` finishes"
     * tests. The server publishes ten messages at the given QoS; the client's
     * `handleMessage` is replaced by one that completes after 100ms and checks
     * that no further 'message' event was emitted in the meantime.
     *
     * @param {number} qos QoS level of the messages published by the server
     * @param {Function} done mocha completion callback
     */
    function testQosHandleMessage (qos, done) {
      var client = connect()
      var messageEventCount = 0
      var handleMessageCount = 0

      client.handleMessage = function (packet, callback) {
        setTimeout(function () {
          handleMessageCount++
          // next message event should not emit until handleMessage completes
          handleMessageCount.should.equal(messageEventCount)
          if (handleMessageCount === 10) {
            setTimeout(function () {
              client.end()
              done()
            })
          }
          callback()
        }, 100)
      }

      client.on('message', function (topic, message, packet) {
        messageEventCount++
      })

      client.on('connect', function () {
        client.subscribe('test')
      })

      server.once('client', function (serverClient) {
        serverClient.on('offline', function () {
          client.end()
          // Pass a real Error so mocha reports a proper failure.
          done(new Error('error went offline... didnt see this happen'))
        })

        serverClient.on('subscribe', function () {
          for (var i = 0; i < 10; i++) {
            serverClient.publish({
              messageId: i,
              topic: 'test',
              payload: 'test' + i,
              qos: qos
            })
          }
        })
      })
    }

    it('should publish 10 QoS 0 and receive them only when `handleMessage` finishes', function (done) {
      testQosHandleMessage(0, done)
    })

    it('should publish 10 QoS 1 and receive them only when `handleMessage` finishes', function (done) {
      testQosHandleMessage(1, done)
    })

    it('should publish 10 QoS 2 and receive them only when `handleMessage` finishes', function (done) {
      testQosHandleMessage(2, done)
    })

    it('should not send a `puback` if the execution of `handleMessage` fails for messages with QoS `1`', function (done) {
      var client = connect()

      client.handleMessage = function (packet, callback) {
        callback(new Error('Error thrown by the application'))
      }

      // Replace the sender with a spy so any outgoing packet is counted.
      client._sendPacket = sinon.spy()

      client._handlePublish({
        messageId: Math.floor(65535 * Math.random()),
        topic: 'test',
        payload: 'test',
        qos: 1
      }, function (err) {
        should.exist(err)
      })

      client._sendPacket.callCount.should.equal(0)
      client.end()
      client.on('connect', function () { done() })
    })

    it('should silently ignore errors thrown by `handleMessage` and return when no callback is passed ' +
      'into `handlePublish` method', function (done) {
      var client = connect()

      client.handleMessage = function (packet, callback) {
        callback(new Error('Error thrown by the application'))
      }

      try {
        client._handlePublish({
          messageId: Math.floor(65535 * Math.random()),
          topic: 'test',
          payload: 'test',
          qos: 1
        })
        done()
      } catch (err) {
        done(err)
      } finally {
        client.end()
      }
    })

    it('should not send a `pubcomp` if the execution of `handleMessage` fails for messages with QoS `2`', function (done) {
      var store = new Store()
      var client = connect({incomingStore: store})
      var messageId = Math.floor(65535 * Math.random())
      var topic = 'test'
      var payload = 'test'
      var qos = 2

      client.handleMessage = function (packet, callback) {
        callback(new Error('Error thrown by the application'))
      }

      client.once('connect', function () {
        client.subscribe(topic, {qos: 2})

        // Put the publish into the incoming store first, then feed the
        // matching pubrel to the client by hand.
        store.put({
          messageId: messageId,
          topic: topic,
          payload: payload,
          qos: qos,
          cmd: 'publish'
        }, function () {
          // cleans up the client
          client.end()
          client._sendPacket = sinon.spy()
          client._handlePubrel({cmd: 'pubrel', messageId: messageId}, function (err) {
            should.exist(err)
          })
          client._sendPacket.callCount.should.equal(0)
          done()
        })
      })
    })

    it('should silently ignore errors thrown by `handleMessage` and return when no callback is passed ' +
      'into `handlePubrel` method', function (done) {
      var store = new Store()
      var client = connect({incomingStore: store})
      var messageId = Math.floor(65535 * Math.random())
      var topic = 'test'
      var payload = 'test'
      var qos = 2

      client.handleMessage = function (packet, callback) {
        callback(new Error('Error thrown by the application'))
      }

      client.once('connect', function () {
        client.subscribe(topic, {qos: 2})

        store.put({
          messageId: messageId,
          topic: topic,
          payload: payload,
          qos: qos,
          cmd: 'publish'
        }, function () {
          try {
            client._handlePubrel({cmd: 'pubrel', messageId: messageId})
            done()
          } catch (err) {
            done(err)
          } finally {
            client.end()
          }
        })
      })
    })
  })
  // Unsubscribe packets as seen by the test server, the unsuback callback,
  // and the packetsend/packetreceive events around a subscribe.
  describe('unsubscribing', function () {
    it('should send an unsubscribe packet (offline)', function (done) {
      var client = connect()

      client.unsubscribe('test')

      server.once('client', function (serverClient) {
        serverClient.once('unsubscribe', function (packet) {
          packet.unsubscriptions.should.containEql('test')
          client.end()
          done()
        })
      })
    })

    it('should send an unsubscribe packet', function (done) {
      var client = connect()
      var topic = 'topic'

      client.once('connect', function () {
        client.unsubscribe(topic)
      })

      server.once('client', function (serverClient) {
        serverClient.once('unsubscribe', function (packet) {
          packet.unsubscriptions.should.containEql(topic)
          client.end()
          done()
        })
      })
    })

    it('should emit a packetsend event', function (done) {
      var client = connect()
      var testTopic = 'testTopic'

      client.once('connect', function () {
        client.subscribe(testTopic)
      })

      client.on('packetsend', function (packet) {
        // Only the outgoing subscribe completes the test.
        if (packet.cmd === 'subscribe') {
          client.end()
          done()
        }
      })
    })

    it('should emit a packetreceive event', function (done) {
      var client = connect()
      var testTopic = 'testTopic'

      client.once('connect', function () {
        client.subscribe(testTopic)
      })

      client.on('packetreceive', function (packet) {
        // Only the incoming suback completes the test.
        if (packet.cmd === 'suback') {
          client.end()
          done()
        }
      })
    })

    it('should accept an array of unsubs', function (done) {
      var client = connect()
      var topics = ['topic1', 'topic2']

      client.once('connect', function () {
        client.unsubscribe(topics)
      })

      server.once('client', function (serverClient) {
        serverClient.once('unsubscribe', function (packet) {
          packet.unsubscriptions.should.eql(topics)
          // End the client like the sibling tests do; it was left open before.
          client.end()
          done()
        })
      })
    })

    it('should fire a callback on unsuback', function (done) {
      var client = connect()
      var topic = 'topic'

      client.once('connect', function () {
        // `done` is the unsubscribe callback itself.
        client.unsubscribe(topic, done)
      })

      server.once('client', function (serverClient) {
        serverClient.once('unsubscribe', function (packet) {
          serverClient.unsuback(packet)
          client.end()
        })
      })
    })

    it('should unsubscribe from a chinese topic', function (done) {
      var client = connect()
      var topic = '中国'

      client.once('connect', function () {
        client.unsubscribe(topic)
      })

      server.once('client', function (serverClient) {
        serverClient.once('unsubscribe', function (packet) {
          packet.unsubscriptions.should.containEql(topic)
          client.end()
          done()
        })
      })
    })
  })
  // Keepalive scheduling, driven by sinon fake timers so that time only
  // advances when a test calls `tick`.
  describe('keepalive', function () {
    var fakeClock

    beforeEach(function () {
      fakeClock = sinon.useFakeTimers()
    })

    afterEach(function () {
      fakeClock.restore()
    })

    it('should checkPing at keepalive interval', function (done) {
      var keepaliveSeconds = 3
      var mqttClient = connect({ keepalive: keepaliveSeconds })

      mqttClient._checkPing = sinon.spy()

      mqttClient.once('connect', function onConnect () {
        // Advance one keepalive period at a time; each step must add exactly
        // one `_checkPing` call.
        for (var expectedCalls = 1; expectedCalls <= 3; expectedCalls++) {
          fakeClock.tick(keepaliveSeconds * 1000)
          mqttClient._checkPing.callCount.should.equal(expectedCalls)
        }
        mqttClient.end()
        done()
      })
    })

    it('should not checkPing if publishing at a higher rate than keepalive', function (done) {
      var keepaliveMs = 3000
      var mqttClient = connect({keepalive: keepaliveMs / 1000})

      mqttClient._checkPing = sinon.spy()

      mqttClient.once('connect', function onConnect () {
        // Publish just before the keepalive period elapses, then step past it.
        mqttClient.publish('foo', 'bar')
        fakeClock.tick(keepaliveMs - 1)
        mqttClient.publish('foo', 'bar')
        fakeClock.tick(2)

        mqttClient._checkPing.callCount.should.equal(0)
        mqttClient.end()
        done()
      })
    })

    it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function (done) {
      var keepaliveMs = 3000
      var clientOptions = {
        keepalive: keepaliveMs / 1000,
        reschedulePings: false
      }
      var mqttClient = connect(clientOptions)

      mqttClient._checkPing = sinon.spy()

      mqttClient.once('connect', function onConnect () {
        // Same publish pattern as the previous test; here one `_checkPing`
        // call is expected.
        mqttClient.publish('foo', 'bar')
        fakeClock.tick(keepaliveMs - 1)
        mqttClient.publish('foo', 'bar')
        fakeClock.tick(2)

        mqttClient._checkPing.callCount.should.equal(1)
        mqttClient.end()
        done()
      })
    })
  })
  // Ping timer behaviour against the real clock (no fake timers here).
  describe('pinging', function () {
    it('should set a ping timer', function (done) {
      var client = connect({keepalive: 3})
      client.once('connect', function () {
        should.exist(client.pingTimer)
        client.end()
        done()
      })
    })

    it('should not set a ping timer keepalive=0', function (done) {
      var client = connect({keepalive: 0})
      client.on('connect', function () {
        should.not.exist(client.pingTimer)
        client.end()
        done()
      })
    })

    it('should reconnect if pingresp is not sent', function (done) {
      var client = connect({keepalive: 1, reconnectPeriod: 100})

      // Fake no pingresp being sent by stubbing the _handlePingresp function
      client._handlePingresp = function () {}

      client.once('connect', function () {
        // The second 'connect' is the reconnection this test waits for.
        client.once('connect', function () {
          client.end()
          done()
        })
      })
    })

    it('should not reconnect if pingresp is successful', function (done) {
      var client = connect({keepalive: 100})
      var successTimer

      function onUnexpectedClose () {
        // Cancel the success path so `done` is reported only once.
        clearTimeout(successTimer)
        done(new Error('Client closed connection'))
      }

      client.once('close', onUnexpectedClose)

      successTimer = setTimeout(function () {
        // Detach the failure handler first: ending the client emits 'close'
        // itself, which must not be mistaken for a dropped connection.
        client.removeListener('close', onUnexpectedClose)
        client.end()
        done()
      }, 1000)
    })

    it('should defer the next ping when sending a control packet', function (done) {
      var client = connect({keepalive: 1})

      client.once('connect', function () {
        client._checkPing = sinon.spy()

        // Publish every 75 ms; no ping check may run while traffic keeps flowing.
        client.publish('foo', 'bar')
        setTimeout(function () {
          client._checkPing.callCount.should.equal(0)
          client.publish('foo', 'bar')

          setTimeout(function () {
            client._checkPing.callCount.should.equal(0)
            client.publish('foo', 'bar')

            setTimeout(function () {
              client._checkPing.callCount.should.equal(0)
              // Close the connection so the client does not outlive the test.
              client.end()
              done()
            }, 75)
          }, 75)
        }, 75)
      })
    })
  })
  // What the client puts on the wire for subscribe(), and how it reports the
  // outcome back to the caller.
  describe('subscribing', function () {
    /**
     * Calls `handler` with the first subscribe packet sent by the next client
     * that connects to the test server.
     */
    function onNextSubscribe (handler) {
      server.once('client', function (serverClient) {
        serverClient.once('subscribe', handler)
      })
    }

    it('should send a subscribe message (offline)', function (done) {
      var client = connect()

      // Called right away, without waiting for 'connect'.
      client.subscribe('test')

      onNextSubscribe(function () {
        done()
      })
    })

    it('should send a subscribe message', function (done) {
      var client = connect()
      var topic = 'test'

      client.once('connect', function () {
        client.subscribe(topic)
      })

      onNextSubscribe(function (packet) {
        packet.subscriptions.should.containEql({
          topic: topic,
          qos: 0
        })
        done()
      })
    })

    it('should emit a packetsend event', function (done) {
      var client = connect()

      client.on('packetsend', function (sent) {
        if (sent.cmd !== 'subscribe') {
          return
        }
        done()
      })

      client.once('connect', function () {
        client.subscribe('testTopic')
      })
    })

    it('should emit a packetreceive event', function (done) {
      var client = connect()

      client.on('packetreceive', function (received) {
        if (received.cmd !== 'suback') {
          return
        }
        done()
      })

      client.once('connect', function () {
        client.subscribe('testTopic')
      })
    })

    it('should accept an array of subscriptions', function (done) {
      var client = connect()
      var subs = ['test1', 'test2']

      client.once('connect', function () {
        client.subscribe(subs)
      })

      onNextSubscribe(function (packet) {
        // Each listed topic is expected as its own entry with qos 0,
        // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
        var expected = []
        subs.forEach(function (name) {
          expected.push({topic: name, qos: 0})
        })
        packet.subscriptions.should.eql(expected)
        done()
      })
    })

    it('should accept an hash of subscriptions', function (done) {
      var client = connect()
      var topics = {
        test1: 0,
        test2: 1
      }

      client.once('connect', function () {
        client.subscribe(topics)
      })

      onNextSubscribe(function (packet) {
        // One entry per own key of the hash; the value is the requested qos.
        var expected = Object.keys(topics).map(function (name) {
          return {
            topic: name,
            qos: topics[name]
          }
        })
        packet.subscriptions.should.eql(expected)
        done()
      })
    })

    it('should accept an options parameter', function (done) {
      var client = connect()
      var topic = 'test'

      client.once('connect', function () {
        client.subscribe(topic, {qos: 1})
      })

      onNextSubscribe(function (packet) {
        packet.subscriptions.should.eql([{
          topic: topic,
          qos: 1
        }])
        done()
      })
    })

    it('should subscribe with the default options for an empty options parameter', function (done) {
      var client = connect()
      var topic = 'test'
      var defaultQos = 0

      client.once('connect', function () {
        client.subscribe(topic, {})
      })

      onNextSubscribe(function (packet) {
        packet.subscriptions.should.containEql({
          topic: topic,
          qos: defaultQos
        })
        done()
      })
    })

    it('should fire a callback on suback', function (done) {
      var client = connect()
      var topic = 'test'

      client.once('connect', function () {
        client.subscribe(topic, { qos: 2 }, function (err, granted) {
          if (err) {
            done(err)
            return
          }
          should.exist(granted, 'granted not given')
          granted.should.containEql({topic: 'test', qos: 2})
          done()
        })
      })
    })

    it('should fire a callback with error if disconnected (options provided)', function (done) {
      var client = connect()
      var topic = 'test'

      // Subscribing only after the forced end has completed must fail.
      function subscribeAfterEnd () {
        client.subscribe(topic, {qos: 2}, function (err, granted) {
          should.not.exist(granted, 'granted given')
          should.exist(err, 'no error given')
          done()
        })
      }

      client.once('connect', function () {
        client.end(true, subscribeAfterEnd)
      })
    })

    it('should fire a callback with error if disconnected (options not provided)', function (done) {
      var client = connect()
      var topic = 'test'

      // Same as above, but using the two-argument form of subscribe().
      function subscribeAfterEnd () {
        client.subscribe(topic, function (err, granted) {
          should.not.exist(granted, 'granted given')
          should.exist(err, 'no error given')
          done()
        })
      }

      client.once('connect', function () {
        client.end(true, subscribeAfterEnd)
      })
    })

    it('should subscribe with a chinese topic', function (done) {
      var client = connect()
      var topic = '中国'

      client.once('connect', function () {
        client.subscribe(topic)
      })

      onNextSubscribe(function (packet) {
        packet.subscriptions.should.containEql({
          topic: topic,
          qos: 0
        })
        done()
      })
    })
  })
  // Delivery of server-published messages to the client's 'message' and
  // 'packetreceive' listeners.
  describe('receiving messages', function () {
    /**
     * Builds the publish packet used by these tests; `overrides` replaces
     * individual fields of the default packet.
     */
    function buildPacket (overrides) {
      return xtend({
        topic: 'test',
        payload: 'message',
        retain: true,
        qos: 1,
        messageId: 5
      }, overrides || {})
    }

    /**
     * Makes the next client seen by the server receive `packet` once per
     * `times` each time it subscribes.
     */
    function publishOnSubscribe (packet, times) {
      server.once('client', function (serverClient) {
        serverClient.on('subscribe', function () {
          for (var i = 0; i < times; i++) {
            serverClient.publish(packet)
          }
        })
      })
    }

    /**
     * Checks that the packet handed to a 'message' listener carries the
     * header fields of the packet the server published. (The previous
     * `packet.should.equal(packet)` compared the packet with itself and
     * could never fail.)
     */
    function assertSamePublish (received, published) {
      received.qos.should.equal(published.qos)
      received.retain.should.equal(published.retain)
      received.messageId.should.equal(published.messageId)
    }

    it('should fire the message event', function (done) {
      var client = connect()
      var testPacket = buildPacket()

      client.subscribe(testPacket.topic)
      client.once('message', function (topic, message, packet) {
        topic.should.equal(testPacket.topic)
        message.toString().should.equal(testPacket.payload)
        assertSamePublish(packet, testPacket)
        client.end()
        done()
      })

      publishOnSubscribe(testPacket, 1)
    })

    it('should emit a packetreceive event', function (done) {
      var client = connect()
      var testPacket = buildPacket()

      client.subscribe(testPacket.topic)
      client.on('packetreceive', function (packet) {
        // Other packets (connack, suback, ...) also pass through here.
        if (packet.cmd === 'publish') {
          packet.qos.should.equal(1)
          packet.topic.should.equal(testPacket.topic)
          packet.payload.toString().should.equal(testPacket.payload)
          packet.retain.should.equal(true)
          client.end()
          done()
        }
      })

      publishOnSubscribe(testPacket, 1)
    })

    it('should support binary data', function (done) {
      var client = connect({ encoding: 'binary' })
      var testPacket = buildPacket()

      client.subscribe(testPacket.topic)
      client.once('message', function (topic, message, packet) {
        topic.should.equal(testPacket.topic)
        message.should.be.an.instanceOf(Buffer)
        message.toString().should.equal(testPacket.payload)
        assertSamePublish(packet, testPacket)
        done()
      })

      publishOnSubscribe(testPacket, 1)
    })

    it('should emit a message event (qos=2)', function (done) {
      var client = connect()
      var testPacket = buildPacket({qos: 2})

      server.testPublish = testPacket

      client.subscribe(testPacket.topic)
      client.once('message', function (topic, message, packet) {
        topic.should.equal(testPacket.topic)
        message.toString().should.equal(testPacket.payload)
        assertSamePublish(packet, testPacket)
        done()
      })

      publishOnSubscribe(testPacket, 1)
    })

    it('should emit a message event (qos=2) - repeated publish', function (done) {
      var client = connect()
      var testPacket = buildPacket({qos: 2})

      server.testPublish = testPacket

      client.subscribe(testPacket.topic)
      // Deliberately `on`, not `once`: a second delivery would call `done`
      // twice and make mocha fail the test.
      client.on('message', function (topic, message, packet) {
        topic.should.equal(testPacket.topic)
        message.toString().should.equal(testPacket.payload)
        assertSamePublish(packet, testPacket)
        done()
      })

      // Published twice; the duplicate should be ignored by the client.
      publishOnSubscribe(testPacket, 2)
    })

    it('should support chinese topic', function (done) {
      var client = connect({ encoding: 'binary' })
      var testPacket = buildPacket({topic: '国'})

      client.subscribe(testPacket.topic)
      client.once('message', function (topic, message, packet) {
        topic.should.equal(testPacket.topic)
        message.should.be.an.instanceOf(Buffer)
        message.toString().should.equal(testPacket.payload)
        assertSamePublish(packet, testPacket)
        done()
      })

      publishOnSubscribe(testPacket, 1)
    })
  })
  // Acknowledgement flow the client must follow for messages it receives at
  // each QoS level.
  describe('qos handling', function () {
    it('should follow qos 0 semantics (trivial)', function (done) {
      var client = connect()
      var topic = 'test'

      server.once('client', function (peer) {
        peer.once('subscribe', function () {
          // Nothing is acknowledged at qos 0, so sending is all there is to do.
          peer.publish({
            topic: topic,
            payload: 'message',
            qos: 0,
            retain: false
          })
          done()
        })
      })

      client.once('connect', function () {
        client.subscribe(topic, {qos: 0})
      })
    })

    it('should follow qos 1 semantics', function (done) {
      var client = connect()
      var topic = 'test'
      var mid = 50

      server.once('client', function (peer) {
        // The puback must carry the id of the message it acknowledges.
        peer.once('puback', function (ack) {
          ack.messageId.should.equal(mid)
          done()
        })

        peer.once('subscribe', function () {
          peer.publish({
            topic: topic,
            payload: 'message',
            messageId: mid,
            qos: 1
          })
        })
      })

      client.once('connect', function () {
        client.subscribe(topic, {qos: 1})
      })
    })

    it('should follow qos 2 semantics', function (done) {
      var client = connect()
      var topic = 'test'
      var mid = 253

      server.once('client', function (peer) {
        // Receiving pubcomp is what this test treats as success.
        peer.once('pubcomp', function () {
          done()
        })

        peer.once('subscribe', function () {
          peer.publish({
            topic: topic,
            payload: 'message',
            qos: 2,
            messageId: mid
          })
        })
      })

      client.once('connect', function () {
        client.subscribe(topic, {qos: 2})
      })
    })
  })
  1494. describe('auto reconnect', function () {
  // Synchronous test: the flag is read immediately after end() returns.
  it('should mark the client disconnecting if #end called', function () {
    var endedClient = connect()

    endedClient.end()

    endedClient.disconnecting.should.eql(true)
  })
  // Drops the stream on the first 'connect'; a later 'connect' proves that
  // the client came back by itself.
  it('should reconnect after stream disconnect', function (done) {
    var client = connect()
    var streamDropped = false

    client.on('connect', function () {
      if (streamDropped) {
        client.end()
        done()
        return
      }
      client.stream.end()
      streamDropped = true
    })
  })
  // After the stream is dropped once, 'reconnect' must have been emitted by
  // the time the client reports 'connect' again.
  it('should emit \'reconnect\' when reconnecting', function (done) {
    var client = connect()
    var streamDropped = false
    var sawReconnect = false

    client.on('reconnect', function () {
      sawReconnect = true
    })

    client.on('connect', function () {
      if (streamDropped) {
        sawReconnect.should.equal(true)
        client.end()
        done()
        return
      }
      client.stream.end()
      streamDropped = true
    })
  })
  // After the stream is dropped once, 'offline' must have been emitted by
  // the time the client reports 'connect' again.
  it('should emit \'offline\' after going offline', function (done) {
    var client = connect()
    var streamDropped = false
    var sawOffline = false

    client.on('offline', function () {
      sawOffline = true
    })

    client.on('connect', function () {
      if (streamDropped) {
        sawOffline.should.equal(true)
        client.end()
        done()
        return
      }
      client.stream.end()
      streamDropped = true
    })
  })
  // The handler is registered with `on`, so an unwanted reconnection would
  // run it again and call `done` a second time.
  it('should not reconnect if it was ended by the user', function (done) {
    var client = connect()

    function endOnConnect () {
      client.end()
      done() // it will raise an exception if called two times
    }

    client.on('connect', endOnConnect)
  })
  // No reconnect timer while connected; one must exist once 'close' fires
  // after the stream was dropped.
  it('should setup a reconnect timer on disconnect', function (done) {
    var client = connect()

    client.once('close', function () {
      should.exist(client.reconnectTimer)
      client.end()
      done()
    })

    client.once('connect', function () {
      should.not.exist(client.reconnectTimer)
      client.stream.end()
    })
  })
  // Measures the time from creating the client to its second 'connect' and
  // requires it to be at least the configured reconnect period.
  it('should allow specification of a reconnect period', function (done) {
    var period = 200
    var client = connect({reconnectPeriod: period})
    var streamDropped = false
    var startedAt = Date.now()

    client.on('connect', function () {
      if (!streamDropped) {
        client.stream.end()
        streamDropped = true
        return
      }

      client.end()
      var elapsed = Date.now() - startedAt
      if (elapsed >= period) {
        // Reconnected no sooner than the configured period (200 ms),
        // that's good enough
        done()
      } else {
        done(new Error('Strange reconnect period'))
      }
    })
  })
  // Lets the client retry against a host name that is meant not to resolve
  // for 50 ms, then ends it; the test passes when end() calls back.
  it('should always cleanup successfully on reconnection', function (done) {
    var unreachableOpts = {
      host: 'this_hostname_should_not_exist',
      connectTimeout: 0,
      reconnectPeriod: 1
    }
    var client = connect(unreachableOpts)

    setTimeout(function () {
      client.end(done)
    }, 50)
  })
  // The server destroys the first connection right after 'connect'; the
  // publish must then arrive on the second connection and the client's
  // publish callback must fire as well.
  it('should resend in-flight QoS 1 publish messages from the client', function (done) {
    var client = connect({reconnectPeriod: 200})
    var serverPublished = false
    var clientCalledBack = false

    // Finishes the test once both sides have reported.
    var finishWhenBothSeen = function () {
      if (!(serverPublished && clientCalledBack)) {
        return
      }
      client.end()
      done()
    }

    server.once('client', function (firstPeer) {
      firstPeer.on('connect', function () {
        setImmediate(function () {
          firstPeer.stream.destroy()
        })
      })

      server.once('client', function (secondPeer) {
        secondPeer.on('publish', function () {
          serverPublished = true
          finishWhenBothSeen()
        })
      })
    })

    client.publish('hello', 'world', { qos: 1 }, function () {
      clientCalledBack = true
      finishWhenBothSeen()
    })
  })
  // The server destroys the first connection once it has seen the publish;
  // the test then waits for a pubrel on the second connection and for the
  // client's publish callback.
  it('should resend in-flight QoS 2 publish messages from the client', function (done) {
    var client = connect({reconnectPeriod: 200})
    var serverPublished = false
    var clientCalledBack = false

    // Finishes the test once both sides have reported.
    var finishWhenBothSeen = function () {
      if (!(serverPublished && clientCalledBack)) {
        return
      }
      client.end()
      done()
    }

    server.once('client', function (firstPeer) {
      // ignore errors
      firstPeer.on('error', function () {})

      firstPeer.on('publish', function () {
        setImmediate(function () {
          firstPeer.stream.destroy()
        })
      })

      server.once('client', function (secondPeer) {
        secondPeer.on('pubrel', function () {
          serverPublished = true
          finishWhenBothSeen()
        })
      })
    })

    client.publish('hello', 'world', { qos: 2 }, function () {
      clientCalledBack = true
      finishWhenBothSeen()
    })
  })
  // Removing a queued QoS 1 message must empty both the outgoing table and
  // the outgoing store and call the publish callback with 'Message removed'.
  // All of these checks run synchronously, before the test returns.
  it('should not resend in-flight QoS 1 removed publish messages from the client', function (done) {
    var client = connect({reconnectPeriod: 200})
    var clientCalledBack = false

    var outgoingCount = function () {
      return Object.keys(client.outgoing).length
    }
    var inflightCount = function () {
      return Object.keys(client.outgoingStore._inflights).length
    }

    server.once('client', function (firstPeer) {
      firstPeer.on('connect', function () {
        setImmediate(function () {
          firstPeer.stream.destroy()
        })
      })

      server.once('client', function (secondPeer) {
        // A publish reaching the second connection means the message was resent.
        secondPeer.on('publish', function () {
          should.fail()
          done()
        })
      })
    })

    client.publish('hello', 'world', { qos: 1 }, function (err) {
      clientCalledBack = true
      should(err.message).be.equal('Message removed')
    })

    should(outgoingCount()).be.equal(1)
    should(inflightCount()).be.equal(1)

    client.removeOutgoingMessage(client.getLastMessageId())

    should(outgoingCount()).be.equal(0)
    should(inflightCount()).be.equal(0)
    clientCalledBack.should.be.true()

    client.end()
    done()
  })
  // Removing a queued QoS 2 message must empty both the outgoing table and
  // the outgoing store and call the publish callback with 'Message removed'.
  // All of these checks run synchronously, before the test returns.
  it('should not resend in-flight QoS 2 removed publish messages from the client', function (done) {
    var client = connect({reconnectPeriod: 200})
    var clientCalledBack = false

    var outgoingCount = function () {
      return Object.keys(client.outgoing).length
    }
    var inflightCount = function () {
      return Object.keys(client.outgoingStore._inflights).length
    }

    server.once('client', function (firstPeer) {
      firstPeer.on('connect', function () {
        setImmediate(function () {
          firstPeer.stream.destroy()
        })
      })

      server.once('client', function (secondPeer) {
        // A publish reaching the second connection means the message was resent.
        secondPeer.on('publish', function () {
          should.fail()
          done()
        })
      })
    })

    client.publish('hello', 'world', { qos: 2 }, function (err) {
      clientCalledBack = true
      should(err.message).be.equal('Message removed')
    })

    should(outgoingCount()).be.equal(1)
    should(inflightCount()).be.equal(1)

    client.removeOutgoingMessage(client.getLastMessageId())

    should(outgoingCount()).be.equal(0)
    should(inflightCount()).be.equal(0)
    clientCalledBack.should.be.true()

    client.end()
    done()
  })
  // Subscribes, drops the stream, and passes when the server sees a
  // subscribe arrive on the next connection.
  it('should resubscribe when reconnecting', function (done) {
    var client = connect({ reconnectPeriod: 100 })
    var firstConnect = true
    var sawReconnect = false

    client.on('reconnect', function () {
      sawReconnect = true
    })

    client.on('connect', function () {
      if (!firstConnect) {
        sawReconnect.should.equal(true)
        return
      }

      client.subscribe('hello', function () {
        client.stream.end()

        // Registered only now, so it observes the connection that follows
        // the dropped stream.
        server.once('client', function (nextPeer) {
          nextPeer.on('subscribe', function () {
            client.end()
            done()
          })
        })
      })

      firstConnect = false
    })
  })
  // With `resubscribe: false` the client must not replay its subscription
  // on the next connection, and must not have recorded any topics for it.
  it('should not resubscribe when reconnecting if resubscribe is disabled', function (done) {
    var client = connect({ reconnectPeriod: 100, resubscribe: false })
    var firstConnect = true
    var sawReconnect = false

    client.on('reconnect', function () {
      sawReconnect = true
    })

    client.on('connect', function () {
      if (!firstConnect) {
        sawReconnect.should.equal(true)
        should(Object.keys(client._resubscribeTopics).length).be.equal(0)
        done()
        return
      }

      client.subscribe('hello', function () {
        client.stream.end()

        // Any subscribe seen on the following connection is a failure.
        server.once('client', function (nextPeer) {
          nextPeer.on('subscribe', function () {
            should.fail()
          })
        })
      })

      firstConnect = false
    })
  })
  // Uses a private server (server2) that answers every subscribe with a
  // suback whose granted values have bit 0x80 set. After dropping the
  // stream, the client must reconnect without remembering the topic.
  it('should not resubscribe when reconnecting if suback is error', function (done) {
    var tryReconnect = true
    var reconnectEvent = false

    var server2 = new Server(function (c) {
      c.on('connect', function () {
        c.connack({returnCode: 0})
      })

      c.on('subscribe', function (packet) {
        c.suback({
          messageId: packet.messageId,
          // NOTE(review): 0x80 is the SUBACK failure return code in
          // MQTT 3.1.1; confirm against the spec.
          granted: packet.subscriptions.map(function (e) {
            return e.qos | 0x80
          })
        })
        c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
      })
    })

    server2.listen(port + 49, function () {
      var client = mqtt.connect({
        port: port + 49,
        host: 'localhost',
        reconnectPeriod: 100
      })

      client.on('reconnect', function () {
        reconnectEvent = true
      })

      client.on('connect', function () {
        if (tryReconnect) {
          client.subscribe('hello', function () {
            client.stream.end()

            // Watch server2, the server this client actually talks to. The
            // guard used to sit on the shared `server`, where it could never
            // observe this client and stayed registered for later tests.
            server2.once('client', function (serverClient) {
              serverClient.on('subscribe', function () {
                should.fail()
              })
            })
          })

          tryReconnect = false
        } else {
          reconnectEvent.should.equal(true)
          should(Object.keys(client._resubscribeTopics).length).be.equal(0)

          // End the client before closing its server so that it does not
          // keep reconnecting after the test has finished.
          client.end(true, function () {
            server2.close()
            done()
          })
        }
      })
    })
  })
  // Scenario, as driven by the private server below:
  //   1. the client subscribes and is sent a qos 2 publish (messageId 1)
  //   2. on the client's pubrec, the client is ended and reconnected with
  //      the same two stores
  //   3. on the second connect the server sends pubrel for messageId 1
  //   4. the test passes when the server receives the pubcomp
  it('should preserved incomingStore after disconnecting if clean is false', function (done) {
    var reconnect = false
    // Placeholder; replaced by the real client once server2 is listening.
    var client = {}
    var incomingStore = new mqtt.Store({ clean: false })
    var outgoingStore = new mqtt.Store({ clean: false })

    var server2 = new Server(function (peer) {
      peer.on('connect', function () {
        peer.connack({returnCode: 0})
        if (reconnect) {
          peer.pubrel({ messageId: 1 })
        }
      })

      peer.on('subscribe', function (request) {
        var granted = request.subscriptions.map(function (sub) {
          return sub.qos
        })
        peer.suback({
          messageId: request.messageId,
          granted: granted
        })
        peer.publish({ topic: 'topic', payload: 'payload', qos: 2, messageId: 1, retain: false })
      })

      peer.on('pubrec', function () {
        client.end(false, function () {
          client.reconnect({
            incomingStore: incomingStore,
            outgoingStore: outgoingStore
          })
        })
      })

      peer.on('pubcomp', function () {
        client.end()
        server2.close()
        done()
      })
    })

    server2.listen(port + 50, function () {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      client.on('message', function (topic, message) {
        topic.should.equal('topic')
        message.toString().should.equal('payload')
      })

      client.on('connect', function () {
        if (reconnect) {
          return
        }
        // Only the first connect subscribes; afterwards the flag tells the
        // server to send the pubrel on the next connect.
        client.subscribe('test', {qos: 2}, function () {
        })
        reconnect = true
      })
    })
  })
  // Automatic reconnection is off (reconnectPeriod 0). The first 'close'
  // calls reconnect() synchronously; the second 'close' ends the test.
  it('should be able to pub/sub if reconnect() is called at close handler', function (done) {
    var client = connect({ reconnectPeriod: 0 })
    var firstRound = true
    var sawReconnect = false

    client.on('reconnect', function () {
      sawReconnect = true
    })

    client.on('connect', function () {
      if (firstRound) {
        client.end()
        return
      }
      // Second connection: subscribing must still work before closing again.
      client.subscribe('hello', function () {
        client.end()
      })
    })

    client.on('close', function () {
      if (firstRound) {
        firstRound = false
        client.reconnect()
        return
      }
      sawReconnect.should.equal(true)
      done()
    })
  })
  // Automatic reconnection is off (reconnectPeriod 0). The first 'close'
  // schedules reconnect() 100 ms later, outside the handler; the second
  // 'close' ends the test.
  it('should be able to pub/sub if reconnect() is called at out of close handler', function (done) {
    var client = connect({ reconnectPeriod: 0 })
    var firstRound = true
    var sawReconnect = false

    client.on('reconnect', function () {
      sawReconnect = true
    })

    client.on('connect', function () {
      if (firstRound) {
        client.end()
        return
      }
      // Second connection: subscribing must still work before closing again.
      client.subscribe('hello', function () {
        client.end()
      })
    })

    client.on('close', function () {
      if (firstRound) {
        firstRound = false
        setTimeout(function () {
          client.reconnect()
        }, 100)
        return
      }
      sawReconnect.should.equal(true)
      done()
    })
  })
  // These tests install their own 'client' listeners on the shared server.
  // The listeners that were registered before are set aside for each test
  // and put back afterwards.
  context('with alternate server client', function () {
    var savedClientListeners

    beforeEach(function () {
      savedClientListeners = server.listeners('client')
      server.removeAllListeners('client')
    })

    afterEach(function () {
      server.removeAllListeners('client')
      for (var i = 0; i < savedClientListeners.length; i++) {
        server.on('client', savedClientListeners[i])
      }
    })

    it('should resubscribe even if disconnect is before suback', function (done) {
      var client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
      var subscribeCount = 0
      var connectCount = 0

      server.on('client', function (peer) {
        peer.on('connect', function () {
          connectCount++
          peer.connack({returnCode: 0})
        })

        // No suback is ever sent from here.
        peer.on('subscribe', function () {
          subscribeCount++

          // drop the connection on the first subscribe
          if (subscribeCount === 1) {
            client.stream.end()
          }

          // once the second connection exists, exactly two subscribes must
          // have been seen; then clean up and finish
          if (connectCount >= 2) {
            subscribeCount.should.equal(2)
            client.end(true, done)
          }
        })
      })

      client.subscribe('hello')
    })

    it('should resubscribe exactly once', function (done) {
      var client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
      var subscribeCount = 0

      server.on('client', function (peer) {
        peer.on('connect', function () {
          peer.connack({returnCode: 0})
        })

        // No suback is ever sent from here.
        peer.on('subscribe', function () {
          subscribeCount++

          // drop the connection on the first subscribe
          if (subscribeCount === 1) {
            client.stream.end()
          }

          // the second subscribe is the resubscription; clean up and finish
          if (subscribeCount === 2) {
            client.end(true, done)
          }
        })
      })

      client.subscribe('hello')
    })
  })
  1977. })
  1978. }