wechat wss demo

writeToStream.js 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. 'use strict'
  2. var protocol = require('./constants')
  3. var Buffer = require('safe-buffer').Buffer
  4. var empty = Buffer.allocUnsafe(0)
  5. var zeroBuf = Buffer.from([0])
  6. var numbers = require('./numbers')
  7. var nextTick = require('process-nextick-args').nextTick
  8. var numCache = numbers.cache
  9. var generateNumber = numbers.generateNumber
  10. var generateCache = numbers.generateCache
  11. var writeNumber = writeNumberCached
  12. var toGenerate = true
  13. function generate (packet, stream) {
  14. if (stream.cork) {
  15. stream.cork()
  16. nextTick(uncork, stream)
  17. }
  18. if (toGenerate) {
  19. toGenerate = false
  20. generateCache()
  21. }
  22. switch (packet.cmd) {
  23. case 'connect':
  24. return connect(packet, stream)
  25. case 'connack':
  26. return connack(packet, stream)
  27. case 'publish':
  28. return publish(packet, stream)
  29. case 'puback':
  30. case 'pubrec':
  31. case 'pubrel':
  32. case 'pubcomp':
  33. case 'unsuback':
  34. return confirmation(packet, stream)
  35. case 'subscribe':
  36. return subscribe(packet, stream)
  37. case 'suback':
  38. return suback(packet, stream)
  39. case 'unsubscribe':
  40. return unsubscribe(packet, stream)
  41. case 'pingreq':
  42. case 'pingresp':
  43. case 'disconnect':
  44. return emptyPacket(packet, stream)
  45. default:
  46. stream.emit('error', new Error('Unknown command'))
  47. return false
  48. }
  49. }
  50. /**
  51. * Controls numbers cache.
  52. * Set to "false" to allocate buffers on-the-flight instead of pre-generated cache
  53. */
  54. Object.defineProperty(generate, 'cacheNumbers', {
  55. get: function () {
  56. return writeNumber === writeNumberCached
  57. },
  58. set: function (value) {
  59. if (value) {
  60. if (!numCache || Object.keys(numCache).length === 0) toGenerate = true
  61. writeNumber = writeNumberCached
  62. } else {
  63. toGenerate = false
  64. writeNumber = writeNumberGenerated
  65. }
  66. }
  67. })
  68. function uncork (stream) {
  69. stream.uncork()
  70. }
  71. function connect (opts, stream) {
  72. var settings = opts || {}
  73. var protocolId = settings.protocolId || 'MQTT'
  74. var protocolVersion = settings.protocolVersion || 4
  75. var will = settings.will
  76. var clean = settings.clean
  77. var keepalive = settings.keepalive || 0
  78. var clientId = settings.clientId || ''
  79. var username = settings.username
  80. var password = settings.password
  81. if (clean === undefined) clean = true
  82. var length = 0
  83. // Must be a string and non-falsy
  84. if (!protocolId ||
  85. (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
  86. stream.emit('error', new Error('Invalid protocolId'))
  87. return false
  88. } else length += protocolId.length + 2
  89. // Must be 3 or 4
  90. if (protocolVersion !== 3 && protocolVersion !== 4) {
  91. stream.emit('error', new Error('Invalid protocol version'))
  92. return false
  93. } else length += 1
  94. // ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1
  95. if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&
  96. (clientId || protocolVersion === 4) && (clientId || clean)) {
  97. length += clientId.length + 2
  98. } else {
  99. if (protocolVersion < 4) {
  100. stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
  101. return false
  102. }
  103. if ((clean * 1) === 0) {
  104. stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
  105. return false
  106. }
  107. }
  108. // Must be a two byte number
  109. if (typeof keepalive !== 'number' ||
  110. keepalive < 0 ||
  111. keepalive > 65535 ||
  112. keepalive % 1 !== 0) {
  113. stream.emit('error', new Error('Invalid keepalive'))
  114. return false
  115. } else length += 2
  116. // Connect flags
  117. length += 1
  118. // If will exists...
  119. if (will) {
  120. // It must be an object
  121. if (typeof will !== 'object') {
  122. stream.emit('error', new Error('Invalid will'))
  123. return false
  124. }
  125. // It must have topic typeof string
  126. if (!will.topic || typeof will.topic !== 'string') {
  127. stream.emit('error', new Error('Invalid will topic'))
  128. return false
  129. } else {
  130. length += Buffer.byteLength(will.topic) + 2
  131. }
  132. // Payload
  133. if (will.payload && will.payload) {
  134. if (will.payload.length >= 0) {
  135. if (typeof will.payload === 'string') {
  136. length += Buffer.byteLength(will.payload) + 2
  137. } else {
  138. length += will.payload.length + 2
  139. }
  140. } else {
  141. stream.emit('error', new Error('Invalid will payload'))
  142. return false
  143. }
  144. } else {
  145. length += 2
  146. }
  147. }
  148. // Username
  149. var providedUsername = false
  150. if (username != null) {
  151. if (isStringOrBuffer(username)) {
  152. providedUsername = true
  153. length += Buffer.byteLength(username) + 2
  154. } else {
  155. stream.emit('error', new Error('Invalid username'))
  156. return false
  157. }
  158. }
  159. // Password
  160. if (password != null) {
  161. if (!providedUsername) {
  162. stream.emit('error', new Error('Username is required to use password'))
  163. return false
  164. }
  165. if (isStringOrBuffer(password)) {
  166. length += byteLength(password) + 2
  167. } else {
  168. stream.emit('error', new Error('Invalid password'))
  169. return false
  170. }
  171. }
  172. // Generate header
  173. stream.write(protocol.CONNECT_HEADER)
  174. // Generate length
  175. writeLength(stream, length)
  176. // Generate protocol ID
  177. writeStringOrBuffer(stream, protocolId)
  178. stream.write(
  179. protocolVersion === 4 ? protocol.VERSION4 : protocol.VERSION3
  180. )
  181. // Connect flags
  182. var flags = 0
  183. flags |= (username != null) ? protocol.USERNAME_MASK : 0
  184. flags |= (password != null) ? protocol.PASSWORD_MASK : 0
  185. flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
  186. flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
  187. flags |= will ? protocol.WILL_FLAG_MASK : 0
  188. flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
  189. stream.write(Buffer.from([flags]))
  190. // Keepalive
  191. writeNumber(stream, keepalive)
  192. // Client ID
  193. writeStringOrBuffer(stream, clientId)
  194. // Will
  195. if (will) {
  196. writeString(stream, will.topic)
  197. writeStringOrBuffer(stream, will.payload)
  198. }
  199. // Username and password
  200. if (username != null) {
  201. writeStringOrBuffer(stream, username)
  202. }
  203. if (password != null) {
  204. writeStringOrBuffer(stream, password)
  205. }
  206. // This is a small packet that happens only once on a stream
  207. // We assume the stream is always free to receive more data after this
  208. return true
  209. }
  210. function connack (opts, stream) {
  211. var settings = opts || {}
  212. var rc = settings.returnCode
  213. // Check return code
  214. if (typeof rc !== 'number') {
  215. stream.emit('error', new Error('Invalid return code'))
  216. return false
  217. }
  218. stream.write(protocol.CONNACK_HEADER)
  219. writeLength(stream, 2)
  220. stream.write(opts.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf)
  221. return stream.write(Buffer.from([rc]))
  222. }
  223. function publish (opts, stream) {
  224. var settings = opts || {}
  225. var qos = settings.qos || 0
  226. var retain = settings.retain ? protocol.RETAIN_MASK : 0
  227. var topic = settings.topic
  228. var payload = settings.payload || empty
  229. var id = settings.messageId
  230. var length = 0
  231. // Topic must be a non-empty string or Buffer
  232. if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2
  233. else if (Buffer.isBuffer(topic)) length += topic.length + 2
  234. else {
  235. stream.emit('error', new Error('Invalid topic'))
  236. return false
  237. }
  238. // Get the payload length
  239. if (!Buffer.isBuffer(payload)) length += Buffer.byteLength(payload)
  240. else length += payload.length
  241. // Message ID must a number if qos > 0
  242. if (qos && typeof id !== 'number') {
  243. stream.emit('error', new Error('Invalid messageId'))
  244. return false
  245. } else if (qos) length += 2
  246. // Header
  247. stream.write(protocol.PUBLISH_HEADER[qos][opts.dup ? 1 : 0][retain ? 1 : 0])
  248. // Remaining length
  249. writeLength(stream, length)
  250. // Topic
  251. writeNumber(stream, byteLength(topic))
  252. stream.write(topic)
  253. // Message ID
  254. if (qos > 0) writeNumber(stream, id)
  255. // Payload
  256. return stream.write(payload)
  257. }
  258. /* Puback, pubrec, pubrel and pubcomp */
  259. function confirmation (opts, stream) {
  260. var settings = opts || {}
  261. var type = settings.cmd || 'puback'
  262. var id = settings.messageId
  263. var dup = (settings.dup && type === 'pubrel') ? protocol.DUP_MASK : 0
  264. var qos = 0
  265. if (type === 'pubrel') qos = 1
  266. // Check message ID
  267. if (typeof id !== 'number') {
  268. stream.emit('error', new Error('Invalid messageId'))
  269. return false
  270. }
  271. // Header
  272. stream.write(protocol.ACKS[type][qos][dup][0])
  273. // Length
  274. writeLength(stream, 2)
  275. // Message ID
  276. return writeNumber(stream, id)
  277. }
  278. function subscribe (opts, stream) {
  279. var settings = opts || {}
  280. var dup = settings.dup ? protocol.DUP_MASK : 0
  281. var id = settings.messageId
  282. var subs = settings.subscriptions
  283. var length = 0
  284. // Check message ID
  285. if (typeof id !== 'number') {
  286. stream.emit('error', new Error('Invalid messageId'))
  287. return false
  288. } else length += 2
  289. // Check subscriptions
  290. if (typeof subs === 'object' && subs.length) {
  291. for (var i = 0; i < subs.length; i += 1) {
  292. var itopic = subs[i].topic
  293. var iqos = subs[i].qos
  294. if (typeof itopic !== 'string') {
  295. stream.emit('error', new Error('Invalid subscriptions - invalid topic'))
  296. return false
  297. }
  298. if (typeof iqos !== 'number') {
  299. stream.emit('error', new Error('Invalid subscriptions - invalid qos'))
  300. return false
  301. }
  302. length += Buffer.byteLength(itopic) + 2 + 1
  303. }
  304. } else {
  305. stream.emit('error', new Error('Invalid subscriptions'))
  306. return false
  307. }
  308. // Generate header
  309. stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])
  310. // Generate length
  311. writeLength(stream, length)
  312. // Generate message ID
  313. writeNumber(stream, id)
  314. var result = true
  315. // Generate subs
  316. for (var j = 0; j < subs.length; j++) {
  317. var sub = subs[j]
  318. var jtopic = sub.topic
  319. var jqos = sub.qos
  320. // Write topic string
  321. writeString(stream, jtopic)
  322. // Write qos
  323. result = stream.write(protocol.QOS[jqos])
  324. }
  325. return result
  326. }
  327. function suback (opts, stream) {
  328. var settings = opts || {}
  329. var id = settings.messageId
  330. var granted = settings.granted
  331. var length = 0
  332. // Check message ID
  333. if (typeof id !== 'number') {
  334. stream.emit('error', new Error('Invalid messageId'))
  335. return false
  336. } else length += 2
  337. // Check granted qos vector
  338. if (typeof granted === 'object' && granted.length) {
  339. for (var i = 0; i < granted.length; i += 1) {
  340. if (typeof granted[i] !== 'number') {
  341. stream.emit('error', new Error('Invalid qos vector'))
  342. return false
  343. }
  344. length += 1
  345. }
  346. } else {
  347. stream.emit('error', new Error('Invalid qos vector'))
  348. return false
  349. }
  350. // header
  351. stream.write(protocol.SUBACK_HEADER)
  352. // Length
  353. writeLength(stream, length)
  354. // Message ID
  355. writeNumber(stream, id)
  356. return stream.write(Buffer.from(granted))
  357. }
  358. function unsubscribe (opts, stream) {
  359. var settings = opts || {}
  360. var id = settings.messageId
  361. var dup = settings.dup ? protocol.DUP_MASK : 0
  362. var unsubs = settings.unsubscriptions
  363. var length = 0
  364. // Check message ID
  365. if (typeof id !== 'number') {
  366. stream.emit('error', new Error('Invalid messageId'))
  367. return false
  368. } else {
  369. length += 2
  370. }
  371. // Check unsubs
  372. if (typeof unsubs === 'object' && unsubs.length) {
  373. for (var i = 0; i < unsubs.length; i += 1) {
  374. if (typeof unsubs[i] !== 'string') {
  375. stream.emit('error', new Error('Invalid unsubscriptions'))
  376. return false
  377. }
  378. length += Buffer.byteLength(unsubs[i]) + 2
  379. }
  380. } else {
  381. stream.emit('error', new Error('Invalid unsubscriptions'))
  382. return false
  383. }
  384. // Header
  385. stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])
  386. // Length
  387. writeLength(stream, length)
  388. // Message ID
  389. writeNumber(stream, id)
  390. // Unsubs
  391. var result = true
  392. for (var j = 0; j < unsubs.length; j++) {
  393. result = writeString(stream, unsubs[j])
  394. }
  395. return result
  396. }
  397. function emptyPacket (opts, stream) {
  398. return stream.write(protocol.EMPTY[opts.cmd])
  399. }
  400. /**
  401. * calcLengthLength - calculate the length of the remaining
  402. * length field
  403. *
  404. * @api private
  405. */
  406. function calcLengthLength (length) {
  407. if (length >= 0 && length < 128) return 1
  408. else if (length >= 128 && length < 16384) return 2
  409. else if (length >= 16384 && length < 2097152) return 3
  410. else if (length >= 2097152 && length < 268435456) return 4
  411. else return 0
  412. }
  413. function genBufLength (length) {
  414. var digit = 0
  415. var pos = 0
  416. var buffer = Buffer.allocUnsafe(calcLengthLength(length))
  417. do {
  418. digit = length % 128 | 0
  419. length = length / 128 | 0
  420. if (length > 0) digit = digit | 0x80
  421. buffer.writeUInt8(digit, pos++)
  422. } while (length > 0)
  423. return buffer
  424. }
  425. /**
  426. * writeLength - write an MQTT style length field to the buffer
  427. *
  428. * @param <Buffer> buffer - destination
  429. * @param <Number> pos - offset
  430. * @param <Number> length - length (>0)
  431. * @returns <Number> number of bytes written
  432. *
  433. * @api private
  434. */
  435. var lengthCache = {}
  436. function writeLength (stream, length) {
  437. var buffer = lengthCache[length]
  438. if (!buffer) {
  439. buffer = genBufLength(length)
  440. if (length < 16384) lengthCache[length] = buffer
  441. }
  442. stream.write(buffer)
  443. }
  444. /**
  445. * writeString - write a utf8 string to the buffer
  446. *
  447. * @param <Buffer> buffer - destination
  448. * @param <Number> pos - offset
  449. * @param <String> string - string to write
  450. * @return <Number> number of bytes written
  451. *
  452. * @api private
  453. */
  454. function writeString (stream, string) {
  455. var strlen = Buffer.byteLength(string)
  456. writeNumber(stream, strlen)
  457. stream.write(string, 'utf8')
  458. }
  459. /**
  460. * writeNumber - write a two byte number to the buffer
  461. *
  462. * @param <Buffer> buffer - destination
  463. * @param <Number> pos - offset
  464. * @param <String> number - number to write
  465. * @return <Number> number of bytes written
  466. *
  467. * @api private
  468. */
  469. function writeNumberCached (stream, number) {
  470. return stream.write(numCache[number])
  471. }
  472. function writeNumberGenerated (stream, number) {
  473. return stream.write(generateNumber(number))
  474. }
  475. /**
  476. * writeStringOrBuffer - write a String or Buffer with the its length prefix
  477. *
  478. * @param <Buffer> buffer - destination
  479. * @param <Number> pos - offset
  480. * @param <String> toWrite - String or Buffer
  481. * @return <Number> number of bytes written
  482. */
  483. function writeStringOrBuffer (stream, toWrite) {
  484. if (typeof toWrite === 'string') {
  485. writeString(stream, toWrite)
  486. } else if (toWrite) {
  487. writeNumber(stream, toWrite.length)
  488. stream.write(toWrite)
  489. } else writeNumber(stream, 0)
  490. }
  491. function byteLength (bufOrString) {
  492. if (!bufOrString) return 0
  493. else if (bufOrString instanceof Buffer) return bufOrString.length
  494. else return Buffer.byteLength(bufOrString)
  495. }
  496. function isStringOrBuffer (field) {
  497. return typeof field === 'string' || field instanceof Buffer
  498. }
  499. module.exports = generate