wechat wss demo

parser.js 9.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. 'use strict'
  2. var bl = require('bl')
  3. var inherits = require('inherits')
  4. var EE = require('events').EventEmitter
  5. var Packet = require('./packet')
  6. var constants = require('./constants')
  /**
   * Incremental packet parser that emits its results as events.
   *
   * May be called with or without `new`; either way a Parser instance is
   * returned. The instance starts in a freshly reset state.
   */
  function Parser () {
    var calledWithNew = this instanceof Parser
    if (!calledWithNew) {
      return new Parser()
    }

    // Names of the prototype methods that `parse` runs, in order, looping
    // back to the first one after the last.
    this._states = ['_parseHeader', '_parseLength', '_parsePayload', '_newPacket']

    this._resetState()
  }

  // Parser instances are event emitters.
  inherits(Parser, EE)
  /**
   * Puts the parser back into its initial condition: a new packet under
   * construction, no recorded error, an empty buffer list and the state
   * machine pointing at its first step.
   */
  Parser.prototype._resetState = function () {
    var self = this

    self.packet = new Packet()
    self.error = null
    self._list = bl() // buffered input is discarded along with the old list
    self._stateCounter = 0
  }
  /**
   * Feeds a chunk of input to the parser and runs the state machine for as
   * long as it can make progress.
   *
   * @param buf chunk appended to the internal buffer list
   * @returns the number of bytes still buffered after parsing
   */
  Parser.prototype.parse = function (buf) {
    // An error from an earlier call leaves stale state behind; start over.
    if (this.error) this._resetState()

    this._list.append(buf)

    for (;;) {
      // Nothing to do when no packet length is known (-1) and no input is buffered.
      var idle = this.packet.length === -1 && !(this._list.length > 0)
      if (idle) break

      // A step returning a falsy value means it needs more data.
      var stateName = this._states[this._stateCounter]
      if (!this[stateName]()) break

      // A step that reported an error stops the machine without advancing.
      if (this.error) break

      // Move on to the next step, wrapping around after the last one.
      this._stateCounter = (this._stateCounter + 1) % this._states.length
    }

    return this._list.length
  }
  /**
   * Decodes the first buffered byte into the packet's `cmd`, `retain`,
   * `qos` and `dup` fields and consumes that byte.
   *
   * The caller guarantees at least one byte is buffered.
   *
   * @returns {boolean} always true
   */
  Parser.prototype._parseHeader = function () {
    var list = this._list
    var packet = this.packet
    var firstByte = list.readUInt8(0)

    var typeIndex = firstByte >> constants.CMD_SHIFT
    packet.cmd = constants.types[typeIndex]
    packet.retain = (firstByte & constants.RETAIN_MASK) !== 0
    packet.qos = (firstByte >> constants.QOS_SHIFT) & constants.QOS_MASK
    packet.dup = (firstByte & constants.DUP_MASK) !== 0

    list.consume(1)

    return true
  }
  /**
   * Decodes the variable-length "remaining length" field at the front of
   * the buffer list into `this.packet.length` and consumes its bytes.
   *
   * Each byte contributes its low bits (LENGTH_MASK) in base-128,
   * least-significant group first; a set LENGTH_FIN_MASK bit means another
   * byte follows. The field may span at most four bytes, so a fourth byte
   * that still asks for a continuation is reported as an error instead of
   * being accepted with a bogus length.
   *
   * The caller guarantees at least one byte is buffered.
   *
   * @returns {boolean} true when the length was decoded and consumed;
   *   false when more input is needed (nothing consumed) or when the
   *   field is malformed (an 'Invalid length' error is emitted)
   */
  Parser.prototype._parseLength = function () {
    var MAX_LENGTH_BYTES = 4
    var bytes = 0
    var mul = 1
    var length = 0
    var terminated = false
    var current

    while (bytes < MAX_LENGTH_BYTES) {
      current = this._list.readUInt8(bytes++)
      length += mul * (current & constants.LENGTH_MASK)
      mul *= 0x80

      if ((current & constants.LENGTH_FIN_MASK) === 0) {
        terminated = true
        break
      }

      // A continuation was announced but the next byte has not arrived yet.
      if (this._list.length <= bytes) break
    }

    if (!terminated) {
      // All permitted bytes were read and the last one still had the
      // continuation bit set: no amount of extra input can make this valid.
      if (bytes === MAX_LENGTH_BYTES) {
        this._emitError(new Error('Invalid length'))
      }
      return false
    }

    this.packet.length = length
    this._list.consume(bytes)

    return true
  }
  /**
   * Parses the body of the current packet once enough input is buffered,
   * dispatching on `this.packet.cmd`.
   *
   * An unrecognised command emits a 'Not supported' error; the step still
   * reports true in that case.
   *
   * @returns {boolean} false when the payload has not fully arrived yet,
   *   true once the payload step has run
   */
  Parser.prototype._parsePayload = function () {
    var packet = this.packet

    // Zero-length packets are always ready; otherwise wait for the full body.
    var ready = packet.length === 0 || this._list.length >= packet.length
    if (!ready) return false

    // Body parsers track their read offset in `_pos`, starting at the front.
    this._pos = 0

    // Command name -> parsing method; null marks commands with an empty body.
    var bodyParsers = {
      connect: '_parseConnect',
      connack: '_parseConnack',
      publish: '_parsePublish',
      puback: '_parseMessageId',
      pubrec: '_parseMessageId',
      pubrel: '_parseMessageId',
      pubcomp: '_parseMessageId',
      subscribe: '_parseSubscribe',
      suback: '_parseSuback',
      unsubscribe: '_parseUnsubscribe',
      unsuback: '_parseUnsuback',
      pingreq: null,
      pingresp: null,
      disconnect: null
    }

    var cmd = packet.cmd
    var known = typeof cmd === 'string' &&
      Object.prototype.hasOwnProperty.call(bodyParsers, cmd)

    if (!known) {
      this._emitError(new Error('Not supported'))
    } else if (bodyParsers[cmd] !== null) {
      this[bodyParsers[cmd]]()
    }

    return true
  }
  /**
   * Parses the body of a 'connect' packet into `this.packet`.
   *
   * Fields are read in this order: protocol id, protocol version, connect
   * flags, keepalive, client id, then (depending on the flags) will topic
   * and payload, username and password. The first field that cannot be
   * read or validated emits an error and aborts parsing.
   *
   * @returns the packet on success, otherwise whatever `_emitError` returns
   */
  Parser.prototype._parseConnect = function () {
    var packet = this.packet
    var list = this._list

    // Protocol id: only the two known identifiers are accepted.
    var protocolId = this._parseString()
    if (protocolId === null) {
      return this._emitError(new Error('Cannot parse protocolId'))
    }
    if (['MQTT', 'MQIsdp'].indexOf(protocolId) === -1) {
      return this._emitError(new Error('Invalid protocolId'))
    }
    packet.protocolId = protocolId

    // Protocol version: stored on the packet before it is validated.
    if (this._pos >= list.length) {
      return this._emitError(new Error('Packet too short'))
    }
    var version = list.readUInt8(this._pos)
    packet.protocolVersion = version
    if (version !== 3 && version !== 4) {
      return this._emitError(new Error('Invalid protocol version'))
    }
    this._pos++

    // Connect flags: one byte, decoded with the masks from `constants`.
    if (this._pos >= list.length) {
      return this._emitError(new Error('Packet too short'))
    }
    var connectFlags = list.readUInt8(this._pos)
    var hasUsername = connectFlags & constants.USERNAME_MASK
    var hasPassword = connectFlags & constants.PASSWORD_MASK
    var hasWill = connectFlags & constants.WILL_FLAG_MASK

    if (hasWill) {
      packet.will = {
        retain: (connectFlags & constants.WILL_RETAIN_MASK) !== 0,
        qos: (connectFlags & constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT
      }
    }
    packet.clean = (connectFlags & constants.CLEAN_SESSION_MASK) !== 0
    this._pos++

    // Keepalive: a 16-bit number; -1 from `_parseNum` means it was missing.
    packet.keepalive = this._parseNum()
    if (packet.keepalive === -1) {
      return this._emitError(new Error('Packet too short'))
    }

    // Client id
    var clientId = this._parseString()
    if (clientId === null) {
      return this._emitError(new Error('Packet too short'))
    }
    packet.clientId = clientId

    // Will topic and payload, present only when the will flag is set.
    if (hasWill) {
      var willTopic = this._parseString()
      if (willTopic === null) {
        return this._emitError(new Error('Cannot parse will topic'))
      }
      packet.will.topic = willTopic

      var willPayload = this._parseBuffer()
      if (willPayload === null) {
        return this._emitError(new Error('Cannot parse will payload'))
      }
      packet.will.payload = willPayload
    }

    // Username (string)
    if (hasUsername) {
      var username = this._parseString()
      if (username === null) {
        return this._emitError(new Error('Cannot parse username'))
      }
      packet.username = username
    }

    // Password (kept as raw bytes)
    if (hasPassword) {
      var password = this._parseBuffer()
      if (password === null) {
        return this._emitError(new Error('Cannot parse password'))
      }
      packet.password = password
    }

    return packet
  }
  /**
   * Parses the body of a 'connack' packet: one flags byte, from which
   * `sessionPresent` is taken, followed by one return-code byte.
   *
   * When fewer than two bytes are buffered nothing is parsed and null is
   * returned without reporting an error.
   */
  Parser.prototype._parseConnack = function () {
    var list = this._list
    var packet = this.packet

    if (list.length < 2) return null

    var ackFlags = list.readUInt8(this._pos++)
    packet.sessionPresent = !!(ackFlags & constants.SESSIONPRESENT_MASK)

    var code = list.readUInt8(this._pos)
    packet.returnCode = code
    if (code === -1) {
      return this._emitError(new Error('Cannot parse return code'))
    }
  }
  /**
   * Parses the body of a 'publish' packet: the topic, a message id when
   * the packet's qos is above 0, and the rest of the packet as payload.
   */
  Parser.prototype._parsePublish = function () {
    var packet = this.packet

    var topic = this._parseString()
    packet.topic = topic
    if (topic === null) {
      return this._emitError(new Error('Cannot parse topic'))
    }

    // Only packets with qos above 0 carry a message id.
    if (packet.qos > 0 && !this._parseMessageId()) {
      return
    }

    // Everything from the current offset to the end of the packet.
    packet.payload = this._list.slice(this._pos, packet.length)
  }
  /**
   * Parses the body of a 'subscribe' packet into `packet.messageId` and
   * `packet.subscriptions`, a list of `{ topic, qos }` pairs.
   *
   * Errors emitted:
   *  - 'Wrong subscribe header' when the header qos is not 1
   *  - 'Cannot parse topic' when a topic string cannot be read
   *  - 'Malformed subscribe payload' when a topic is not followed by its
   *    qos byte inside the packet
   */
  Parser.prototype._parseSubscribe = function () {
    var packet = this.packet
    var topic
    var qos

    if (packet.qos !== 1) {
      return this._emitError(new Error('Wrong subscribe header'))
    }

    packet.subscriptions = []

    if (!this._parseMessageId()) { return }

    while (this._pos < packet.length) {
      // Parse topic
      topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse topic'))

      // The topic may end exactly at the packet boundary; reading the qos
      // byte then would take data that does not belong to this packet.
      if (this._pos >= packet.length) {
        return this._emitError(new Error('Malformed subscribe payload'))
      }
      qos = this._list.readUInt8(this._pos++)

      // Push pair to subscriptions
      packet.subscriptions.push({ topic: topic, qos: qos })
    }
  }
  /**
   * Parses the body of a 'suback' packet: the message id followed by one
   * byte per granted subscription, collected into `packet.granted`.
   */
  Parser.prototype._parseSuback = function () {
    var packet = this.packet
    var granted = []

    packet.granted = granted

    if (!this._parseMessageId()) return

    // Every remaining byte of the packet is one granted QoS value.
    while (this._pos < packet.length) {
      var grantedQos = this._list.readUInt8(this._pos++)
      granted.push(grantedQos)
    }
  }
  /**
   * Parses the body of an 'unsubscribe' packet: the message id followed
   * by topic strings up to the end of the packet, collected into
   * `packet.unsubscriptions`.
   */
  Parser.prototype._parseUnsubscribe = function () {
    var packet = this.packet
    var topics = []

    packet.unsubscriptions = topics

    if (!this._parseMessageId()) return

    while (this._pos < packet.length) {
      var name = this._parseString()
      if (name === null) {
        return this._emitError(new Error('Cannot parse topic'))
      }
      topics.push(name)
    }
  }
  /**
   * Parses the body of an 'unsuback' packet, which holds only a message id.
   *
   * `_parseMessageId` emits 'Cannot parse messageId' itself when it fails,
   * so no second error is raised here; emitting it again would fire the
   * 'error' event twice for a single fault.
   */
  Parser.prototype._parseUnsuback = function () {
    this._parseMessageId()
  }
  /**
   * Reads the 16-bit message id at the current offset into
   * `packet.messageId`.
   *
   * @returns {boolean} true when the id was read; false when too few bytes
   *   were available, in which case 'Cannot parse messageId' is emitted
   */
  Parser.prototype._parseMessageId = function () {
    var packet = this.packet

    packet.messageId = this._parseNum()

    // `_parseNum` signals "not enough bytes" with -1 (it never returns null).
    if (packet.messageId === -1) {
      this._emitError(new Error('Cannot parse messageId'))
      return false
    }

    return true
  }
  /**
   * Reads a length-prefixed UTF-8 string at the current offset and moves
   * the offset past it.
   *
   * @param maybeBuffer not used by this implementation
   * @returns the decoded string, or null when the length prefix is missing
   *   or the string would run past the buffered data or the packet end
   */
  Parser.prototype._parseString = function (maybeBuffer) {
    var byteCount = this._parseNum()
    var start = this._pos
    var stop = start + byteCount

    var overruns = stop > this._list.length || stop > this.packet.length
    if (byteCount === -1 || overruns) return null

    var text = this._list.toString('utf8', start, stop)
    this._pos += byteCount
    return text
  }
  /**
   * Reads a length-prefixed run of raw bytes at the current offset and
   * moves the offset past it.
   *
   * @returns the sliced bytes, or null when the length prefix is missing
   *   or the run would go past the buffered data or the packet end
   */
  Parser.prototype._parseBuffer = function () {
    var byteCount = this._parseNum()
    var start = this._pos
    var stop = start + byteCount

    var overruns = stop > this._list.length || stop > this.packet.length
    if (byteCount === -1 || overruns) return null

    var chunk = this._list.slice(start, stop)
    this._pos += byteCount
    return chunk
  }
  /**
   * Reads a big-endian unsigned 16-bit integer at the current offset and
   * advances the offset by two.
   *
   * @returns {number} the value read, or -1 when fewer than two bytes are
   *   buffered past the offset (the offset is then left unchanged)
   */
  Parser.prototype._parseNum = function () {
    var available = this._list.length - this._pos
    if (available < 2) return -1

    var value = this._list.readUInt16BE(this._pos)
    this._pos += 2

    return value
  }
  /**
   * Finishes the current packet: drops its bytes from the buffer list,
   * emits it as a 'packet' event, and starts a fresh packet.
   *
   * @returns {boolean} always true
   */
  Parser.prototype._newPacket = function () {
    var finished = this.packet

    if (finished) {
      this._list.consume(finished.length)
      this.emit('packet', finished)
    }

    this.packet = new Packet()

    return true
  }
  /**
   * Records a parsing failure and notifies listeners.
   *
   * The error is kept on `this.error`, which stops the loop in `parse` and
   * makes the next `parse` call reset the parser, then emitted as an
   * 'error' event.
   *
   * @param err the error to record and emit
   */
  Parser.prototype._emitError = function (err) {
    var self = this

    self.error = err
    self.emit('error', err)
  }
  286. module.exports = Parser