wechat wss demo

client.js 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. 'use strict'
  2. var mqtt = require('..')
  3. var should = require('should')
  4. var fork = require('child_process').fork
  5. var path = require('path')
  6. var abstractClientTests = require('./abstract_client')
  7. var net = require('net')
  8. var eos = require('end-of-stream')
  9. var mqttPacket = require('mqtt-packet')
  10. var Buffer = require('safe-buffer').Buffer
  11. var Duplex = require('readable-stream').Duplex
  12. var Connection = require('mqtt-connection')
  13. var Server = require('./server')
  14. var port = 9876
  15. var server
  /**
   * Creates a bare-bones test broker whose only behaviour is to accept
   * incoming CONNECT packets.
   *
   * Each connected client gets a single 'connect' handler that answers with a
   * CONNACK carrying return code 0; no handler for any other packet type is
   * registered here.
   *
   * @returns {Server} a new Server instance (callers in this file invoke
   *   `.listen()` on it themselves)
   */
  function connOnlyServer () {
    var acceptEveryConnect = function (client) {
      client.on('connect', function () {
        client.connack({ returnCode: 0 })
      })
    }

    return new Server(acceptEveryConnect)
  }
  /**
   * Test server.
   *
   * Builds a Server whose per-client listener answers the packets the tests
   * send:
   *   - connect     -> connack (return code 2 for clientId 'invalid', else 0)
   *   - publish     -> puback (qos 1) or pubrec (qos 2), deferred with
   *                    setImmediate; qos 0 gets no reply
   *   - pubrel      -> pubcomp
   *   - pubrec      -> pubrel
   *   - pubcomp     -> ignored
   *   - subscribe   -> suback granting each requested qos
   *   - unsubscribe -> unsuback
   *   - pingreq     -> pingresp
   *
   * @returns {Server} a new Server instance; the caller decides where it listens
   */
  function buildServer () {
    var onClient = function (client) {
      client.on('connect', function (packet) {
        var rejected = packet.clientId === 'invalid'
        client.connack({ returnCode: rejected ? 2 : 0 })
      })

      client.on('publish', function (packet) {
        // The acknowledgement is sent from a later turn of the event loop.
        setImmediate(function () {
          if (packet.qos === 1) {
            client.puback(packet)
          } else if (packet.qos === 2) {
            client.pubrec(packet)
          }
          // qos 0 (or anything else): no acknowledgement
        })
      })

      client.on('pubrel', function (packet) {
        client.pubcomp(packet)
      })

      client.on('pubrec', function (packet) {
        client.pubrel(packet)
      })

      client.on('pubcomp', function () {
        // Nothing to be done
      })

      client.on('subscribe', function (packet) {
        var granted = packet.subscriptions.map(function (subscription) {
          return subscription.qos
        })
        client.suback({ messageId: packet.messageId, granted: granted })
      })

      client.on('unsubscribe', function (packet) {
        client.unsuback(packet)
      })

      client.on('pingreq', function () {
        client.pingresp()
      })
    }

    return new Server(onClient)
  }
  74. server = buildServer().listen(port)
  75. describe('MqttClient', function () {
  describe('creating', function () {
    it('should allow instantiation of MqttClient without the \'new\' operator', function (done) {
      // The stream builder aborts construction on purpose: reaching it at all
      // means MqttClient coped with being called without `new`.
      var abortingStreamBuilder = function () {
        throw Error('break')
      }

      var instantiateWithoutNew = function () {
        try {
          var client = mqtt.MqttClient(abortingStreamBuilder, {})
          client.end()
        } catch (err) {
          // Only the deliberate 'break' counts as success; anything else is
          // rethrown so the surrounding `should(...).not.throw` can see it.
          if (err.message === 'break') {
            done()
            return
          }
          throw err
        }
      }

      should(instantiateWithoutNew).not.throw('Object #<Object> has no method \'_setupStream\'')
    })
  })
  94. var config = { protocol: 'mqtt', port: port }
  95. abstractClientTests(server, config)
  96. describe('message ids', function () {
  it('should increment the message id', function () {
    const client = mqtt.connect(config)

    // Two consecutive ids must differ by exactly one.
    const firstId = client._nextId()
    const secondId = client._nextId()
    secondId.should.equal(firstId + 1)

    client.end()
  })
  it('should return 1 once the internal counter reached limit', function () {
    const client = mqtt.connect(config)
    client.nextId = 65535

    // The first call hands out 65535, the following call must wrap to 1.
    const expectedIds = [65535, 1]
    expectedIds.forEach((expected) => {
      client._nextId().should.equal(expected)
    })

    client.end()
  })
  it('should return 65535 for last message id once the internal counter reached limit', function () {
    const client = mqtt.connect(config)
    client.nextId = 65535

    // After each allocation, getLastMessageId() must report the id that was
    // just handed out: first 65535, then 1.
    const expectedIds = [65535, 1]
    expectedIds.forEach((expected) => {
      client._nextId().should.equal(expected)
      client.getLastMessageId().should.equal(expected)
    })

    client.end()
  })
  it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) {
    const pubrelPort = port + 49

    // Broker that accepts the connection and immediately sends a PUBREL with
    // a random four-digit message id the client never published.
    const server2 = new Server((c) => {
      c.on('connect', () => {
        c.connack({ returnCode: 0 })
        c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
      })
    })

    server2.listen(pubrelPort, () => {
      const client = mqtt.connect({
        port: pubrelPort,
        host: 'localhost'
      })

      // The test passes as soon as the client sends a PUBCOMP.
      client.on('packetsend', (packet) => {
        if (packet.cmd !== 'pubcomp') {
          return
        }
        client.end()
        server2.close()
        done()
      })
    })
  })
  it('should not go overflow if the TCP frame contains a lot of PUBLISH packets', function (done) {
    const max = 1000
    const parser = mqttPacket.parser()
    let received = 0

    // In-memory stand-in for the socket: everything the client writes is fed
    // to the packet parser, and replies are injected with duplex.push().
    const duplex = new Duplex({
      read () {},
      write (chunk, enc, cb) {
        parser.parse(chunk)
        cb() // nothing to do
      }
    })

    const client = new mqtt.MqttClient(() => duplex, {})

    // Finish once every injected PUBLISH has surfaced as a 'message' event.
    client.on('message', () => {
      received += 1
      if (received === max) {
        done()
      }
    })

    parser.on('packet', (packet) => {
      if (packet.cmd !== 'connect') {
        return
      }

      duplex.push(mqttPacket.generate({
        cmd: 'connack',
        sessionPresent: false,
        returnCode: 0
      }))

      // Generate `max` QoS 1 publishes and push them as one single buffer.
      const publishes = []
      for (let messageId = 1; messageId <= max; messageId++) {
        publishes.push(mqttPacket.generate({
          cmd: 'publish',
          topic: Buffer.from('hello'),
          payload: Buffer.from('world'),
          retain: false,
          dup: false,
          messageId: messageId,
          qos: 1
        }))
      }
      duplex.push(Buffer.concat(publishes))
    })
  })
  182. })
  describe('flushing', function () {
    it('should attempt to complete pending unsub and send on ping timeout', function (done) {
      this.timeout(10000)

      // connOnlyServer() registers a handler for CONNECT only, so the PUBLISH
      // and UNSUBSCRIBE sent below are never acknowledged by this broker.
      const flushPort = port + 72
      const server3 = connOnlyServer().listen(flushPort)
      const invoked = { publish: false, unsubscribe: false }

      const client = mqtt.connect({
        port: flushPort,
        host: 'localhost',
        keepalive: 1,
        connectTimeout: 350,
        reconnectPeriod: 0
      })

      client.once('connect', function () {
        // Both callbacks are expected to be invoked with an error.
        client.publish('fakeTopic', 'fakeMessage', { qos: 1 }, function (err) {
          should.exist(err)
          invoked.publish = true
        })
        client.unsubscribe('fakeTopic', function (err) {
          should.exist(err)
          invoked.unsubscribe = true
        })

        // Check after 5 seconds that both callbacks have run.
        setTimeout(function () {
          client.end(function () {
            should.equal(invoked.publish && invoked.unsubscribe, true, 'callbacks not invoked')
            server3.close()
            done()
          })
        }, 5000)
      })
    })
  })
  215. describe('reconnecting', function () {
  it('should attempt to reconnect once server is down', function (done) {
    this.timeout(15000)

    // The broker runs in a child process so it can be killed independently.
    // NOTE(review): port 3000 presumably matches helpers/server_process.js — confirm.
    const brokerScript = path.join(__dirname, 'helpers', 'server_process.js')
    const innerServer = fork(brokerScript)
    const client = mqtt.connect({ port: 3000, host: 'localhost', keepalive: 1 })

    client.once('connect', () => {
      innerServer.kill('SIGINT') // mocks server shutdown

      // Once the connection closes, a reconnect timer must be set.
      client.once('close', () => {
        should.exist(client.reconnectTimer)
        client.end()
        done()
      })
    })
  })
  it('should reconnect to multiple host-ports combination if servers is passed', function (done) {
    this.timeout(15000)

    const firstPort = port + 42
    const server2 = buildServer().listen(firstPort)

    // The first broker drops its client and shuts down as soon as one arrives.
    server2.on('client', (c) => {
      c.stream.destroy()
      server2.close()
    })

    server2.on('listening', () => {
      const client = mqtt.connect({
        servers: [
          { port: firstPort, host: 'localhost' },
          { port: port, host: 'localhost' }
        ],
        keepalive: 50
      })

      // Success: the shared broker on `port` sees a client arrive.
      server.once('client', () => {
        client.end()
        done()
      })

      client.once('connect', () => {
        client.stream.destroy()
      })
    })
  })
  it('should reconnect if a connack is not received in an interval', function (done) {
    this.timeout(2000)

    const silentPort = port + 43

    // Plain TCP listener: it only watches each socket for its end and never
    // writes anything back, so no CONNACK can come from it.
    const server2 = net.createServer().listen(silentPort)

    server2.on('connection', (socket) => {
      eos(socket, () => {
        server2.close()
      })
    })

    server2.on('listening', () => {
      const client = mqtt.connect({
        servers: [
          { port: silentPort, host: 'localhost_fake' },
          { port: port, host: 'localhost' }
        ],
        connectTimeout: 500
      })

      // Success: the shared broker on `port` sees a client arrive.
      server.once('client', () => {
        client.end()
        done()
      })

      client.once('connect', () => {
        client.stream.destroy()
      })
    })
  })
  it('should not be cleared by the connack timer', function (done) {
    this.timeout(4000)

    // Raw TCP server that destroys every incoming socket immediately, so the
    // client never gets a CONNACK from it.
    var server2 = net.createServer().listen(port + 44)
    server2.on('connection', function (c) {
      c.destroy()
    })

    server2.once('listening', function () {
      var reconnects = 0
      var connectTimeout = 1000
      var reconnectPeriod = 100
      // Number of reconnect periods that fit into one connectTimeout window.
      var expectedReconnects = Math.floor(connectTimeout / reconnectPeriod)
      var client = mqtt.connect({
        port: port + 44,
        host: 'localhost',
        connectTimeout: connectTimeout,
        reconnectPeriod: reconnectPeriod
      })

      client.on('reconnect', function () {
        reconnects++
        if (reconnects >= expectedReconnects) {
          client.end()
          // Stop listening so the socket does not outlive the test and keep
          // the port (and the test process) busy.
          server2.close()
          done()
        }
      })
    })
  })
  it('should not keep requeueing the first message when offline', function (done) {
    this.timeout(2500)

    var server2 = buildServer().listen(port + 45)
    var client = mqtt.connect({
      port: port + 45,
      host: 'localhost',
      connectTimeout: 350,
      reconnectPeriod: 300
    })

    server2.on('client', function (c) {
      // After the first publish completes, take the broker away and publish
      // again while the client has no connection.
      client.publish('hello', 'world', { qos: 1 }, function () {
        c.destroy()
        server2.close()
        client.publish('hello', 'world', { qos: 1 })
      })
    })

    setTimeout(function () {
      // Read the queue length before tearing the client down.
      var queuedPackets = client.queue.length
      client.end(true)

      if (queuedPackets === 0) {
        done()
      } else {
        // Fail explicitly instead of leaving the test to hit the Mocha timeout.
        done(new Error('expected an empty client queue but found ' + queuedPackets + ' queued packet(s)'))
      }
    }, 2000)
  })
  it('should not send the same subscribe multiple times on a flaky connection', function (done) {
    this.timeout(3500)

    var KILL_COUNT = 4
    var killedConnections = 0
    // messageId -> number of SUBSCRIBE packets seen with that id after the
    // kill phase
    var subIds = {}
    var client = mqtt.connect({
      port: port + 46,
      host: 'localhost',
      connectTimeout: 350,
      reconnectPeriod: 300
    })

    var server2 = new Server(function (conn) {
      conn.on('error', function () {})
      conn.on('connect', function (packet) {
        if (packet.clientId === 'invalid') {
          conn.connack({ returnCode: 2 })
        } else {
          conn.connack({ returnCode: 0 })
        }
      })
    }).listen(port + 46)

    server2.on('client', function (c) {
      client.subscribe('topic', function () {
        client.end()
        c.destroy()
        server2.close()
        done()
      })

      c.on('subscribe', function (packet) {
        if (killedConnections < KILL_COUNT) {
          // Kill the first few sub attempts to simulate a flaky connection
          killedConnections++
          c.destroy()
          return
        }

        // Keep track of acks
        subIds[packet.messageId] = (subIds[packet.messageId] || 0) + 1

        if (subIds[packet.messageId] > 1) {
          client.end(true)
          c.destroy()
          // Use close() here as well: it is what the success path above
          // relies on, whereas a destroy() method on this server is not
          // something the rest of this file depends on.
          server2.close()
          done(new Error('Multiple duplicate acked subscriptions received for messageId ' + packet.messageId))
          // The connection was just destroyed; do not try to ack on it.
          return
        }

        c.suback({
          messageId: packet.messageId,
          granted: packet.subscriptions.map(function (e) {
            return e.qos
          })
        })
      })
    })
  })
  it('should not fill the queue of subscribes if it cannot connect', function (done) {
    this.timeout(2500)

    var port2 = port + 48

    // Broker that acknowledges each CONNECT and then destroys the connection
    // straight away.
    var server2 = net.createServer(function (stream) {
      var conn = new Connection(stream)

      conn.on('error', function () {})
      conn.on('connect', function (packet) {
        conn.connack({ returnCode: 0 })
        conn.destroy()
      })
    })

    server2.listen(port2, function () {
      var client = mqtt.connect({
        port: port2,
        host: 'localhost',
        connectTimeout: 350,
        reconnectPeriod: 300
      })

      client.subscribe('hello')

      setTimeout(function () {
        // Capture the value first, then release the client and the listening
        // socket, so neither outlives the test even if the assertion throws.
        var queuedPackets = client.queue.length
        client.end()
        server2.close()

        queuedPackets.should.equal(1)
        done()
      }, 1000)
    })
  })
  it('should not send the same publish multiple times on a flaky connection', function (done) {
    this.timeout(3500)

    var KILL_COUNT = 4
    var killedConnections = 0
    // messageId -> number of PUBLISH packets seen with that id after the
    // kill phase
    var pubIds = {}
    var client = mqtt.connect({
      port: port + 47,
      host: 'localhost',
      connectTimeout: 350,
      reconnectPeriod: 300
    })

    // Must stay a `function` (not an arrow): `this` is the net.Server, used
    // to re-emit each wrapped connection as a 'client' event.
    var server2 = net.createServer(function (stream) {
      var conn = new Connection(stream)
      conn.on('error', function () {})
      conn.on('connect', function (packet) {
        if (packet.clientId === 'invalid') {
          conn.connack({ returnCode: 2 })
        } else {
          conn.connack({ returnCode: 0 })
        }
      })

      this.emit('client', conn)
    }).listen(port + 47)

    server2.on('client', function (c) {
      client.publish('topic', 'data', { qos: 1 }, function () {
        client.end()
        c.destroy()
        // net.Server has no destroy() method; close() is the call that stops
        // it from listening.
        server2.close()
        done()
      })

      c.on('publish', function onPublish (packet) {
        if (killedConnections < KILL_COUNT) {
          // Kill the first few pub attempts to simulate a flaky connection
          killedConnections++
          c.destroy()

          // to avoid receiving inflight messages
          c.removeListener('publish', onPublish)
          return
        }

        // Keep track of acks
        pubIds[packet.messageId] = (pubIds[packet.messageId] || 0) + 1

        if (pubIds[packet.messageId] > 1) {
          client.end(true)
          c.destroy()
          server2.close()
          done(new Error('Multiple duplicate acked publishes received for messageId ' + packet.messageId))
          // The connection was just destroyed; do not try to ack on it.
          return
        }

        c.puback(packet)
      })
    })
  })
  464. })
  465. })