| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- 'use strict'
-
- var bl = require('bl')
- var inherits = require('inherits')
- var EE = require('events').EventEmitter
- var Packet = require('./packet')
- var constants = require('./constants')
-
// Incremental packet parser.
//
// Bytes are fed in through parse(); every completed packet is announced with
// a 'packet' event and every failure with an 'error' event. Can be invoked
// with or without `new`.
function Parser () {
  var calledWithNew = this instanceof Parser
  if (!calledWithNew) return new Parser()

  // Names of the methods that make up the parsing state machine, in the
  // order parse() cycles through them.
  this._states = ['_parseHeader', '_parseLength', '_parsePayload', '_newPacket']

  this._resetState()
}

// Parser instances are event emitters
inherits(Parser, EE)
-
// Discards all parsing progress: a fresh packet, a fresh (empty) buffer
// list, no pending error, and the state machine rewound to its first state.
Parser.prototype._resetState = function () {
  this._stateCounter = 0
  this.error = null
  this.packet = new Packet()
  this._list = bl()
}
-
// Feeds a chunk of bytes to the parser and runs the state machine for as
// long as it can make progress.
//
// buf: chunk appended to the internal buffer list.
// Returns the number of bytes still buffered when parsing stops.
Parser.prototype.parse = function (buf) {
  // State left behind by a failed parse is thrown away before continuing
  if (this.error) this._resetState()

  this._list.append(buf)

  while (true) {
    // Nothing to do: no packet is in flight (length is still -1) and no
    // bytes are buffered
    if (this.packet.length === -1 && !(this._list.length > 0)) break

    // A state returning false needs more data before it can finish
    var stateName = this._states[this._stateCounter]
    if (!this[stateName]()) break

    // A state that raised an error must not advance the machine
    if (this.error) break

    // Move on, wrapping around after the last state
    var next = this._stateCounter + 1
    this._stateCounter = next >= this._states.length ? 0 : next
  }

  return this._list.length
}
-
// State 1: decodes the first byte of a packet into cmd, retain, qos and dup,
// then drops that byte from the buffer.
//
// parse() only calls this when at least one byte is buffered.
// Always returns true.
Parser.prototype._parseHeader = function () {
  var pkt = this.packet
  var headerByte = this._list.readUInt8(0)

  // The command name is looked up from the bits above CMD_SHIFT
  pkt.cmd = constants.types[headerByte >> constants.CMD_SHIFT]
  pkt.retain = (headerByte & constants.RETAIN_MASK) !== 0
  pkt.qos = (headerByte >> constants.QOS_SHIFT) & constants.QOS_MASK
  pkt.dup = (headerByte & constants.DUP_MASK) !== 0

  this._list.consume(1)

  return true
}
-
// State 2: decodes the variable-length "remaining length" field that follows
// the header byte and stores it in packet.length.
//
// Each byte contributes the bits selected by LENGTH_MASK, least significant
// group first; a byte with LENGTH_FIN_MASK set means another byte follows.
//
// parse() only calls this when at least one byte is buffered.
// Returns true once the length is decoded and its bytes are consumed.
// Returns false when more data is needed (nothing is consumed), or when the
// field is malformed, in which case an 'Invalid length' error is emitted.
Parser.prototype._parseLength = function () {
  // MQTT limits the remaining length field to four bytes; a continuation
  // bit on the fourth byte is a protocol violation, not a reason to read on.
  var maxBytes = 4
  var bytes = 0
  var mul = 1
  var length = 0
  var current

  while (true) {
    current = this._list.readUInt8(bytes++)
    length += mul * (current & constants.LENGTH_MASK)
    mul *= 0x80

    // No continuation bit: this was the last length byte
    if ((current & constants.LENGTH_FIN_MASK) === 0) break

    if (bytes >= maxBytes) {
      this._emitError(new Error('Invalid length'))
      return false
    }

    // Another byte is announced but has not arrived yet; retry on next chunk
    if (this._list.length <= bytes) return false
  }

  this.packet.length = length
  this._list.consume(bytes)

  return true
}
-
// State 3: once the complete payload is buffered, hands it to the parser
// that matches packet.cmd.
//
// Returns false while the payload is still incomplete, true otherwise. True
// is returned even when the command is not supported or a sub-parser emitted
// an error; parse() checks this.error afterwards.
Parser.prototype._parsePayload = function () {
  var packet = this.packet

  // Packets without a payload (e.g. PINGs) never have to wait for data
  if (packet.length !== 0 && !(this._list.length >= packet.length)) return false

  // Sub-parsers read relative to the start of the payload
  this._pos = 0

  // Command name -> method that parses its payload; null marks commands
  // whose payload is empty, so there is nothing to do.
  var handlers = {
    connect: '_parseConnect',
    connack: '_parseConnack',
    publish: '_parsePublish',
    puback: '_parseMessageId',
    pubrec: '_parseMessageId',
    pubrel: '_parseMessageId',
    pubcomp: '_parseMessageId',
    subscribe: '_parseSubscribe',
    suback: '_parseSuback',
    unsubscribe: '_parseUnsubscribe',
    unsuback: '_parseUnsuback',
    pingreq: null,
    pingresp: null,
    disconnect: null
  }
  var cmd = packet.cmd
  var isKnown = typeof cmd === 'string' &&
    Object.prototype.hasOwnProperty.call(handlers, cmd)

  if (!isKnown) {
    this._emitError(new Error('Not supported'))
  } else if (handlers[cmd] !== null) {
    this[handlers[cmd]]()
  }

  return true
}
-
// Parses a CONNECT payload into this.packet: protocolId, protocolVersion,
// the optional will ({ retain, qos, topic, payload }), clean, keepalive,
// clientId and, when flagged, username and password.
//
// Returns the packet on success. On failure an error is emitted and the
// result of _emitError is returned; fields parsed before the failure stay
// set on the packet.
Parser.prototype._parseConnect = function () {
  var list = this._list
  var packet = this.packet

  // Protocol name: only the two known identifiers are accepted
  var protocolId = this._parseString()
  if (protocolId === null) return this._emitError(new Error('Cannot parse protocolId'))
  if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') {
    return this._emitError(new Error('Invalid protocolId'))
  }
  packet.protocolId = protocolId

  // Protocol version: a single byte, must be 3 or 4
  if (this._pos >= list.length) return this._emitError(new Error('Packet too short'))
  packet.protocolVersion = list.readUInt8(this._pos)
  if (packet.protocolVersion !== 3 && packet.protocolVersion !== 4) {
    return this._emitError(new Error('Invalid protocol version'))
  }
  this._pos++

  // Connect flags: one byte, every flag below is a bit test on it
  if (this._pos >= list.length) {
    return this._emitError(new Error('Packet too short'))
  }
  var flagByte = list.readUInt8(this._pos)
  var hasUsername = (flagByte & constants.USERNAME_MASK) !== 0
  var hasPassword = (flagByte & constants.PASSWORD_MASK) !== 0
  var hasWill = (flagByte & constants.WILL_FLAG_MASK) !== 0

  if (hasWill) {
    packet.will = {}
    packet.will.retain = (flagByte & constants.WILL_RETAIN_MASK) !== 0
    packet.will.qos = (flagByte & constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT
  }

  packet.clean = (flagByte & constants.CLEAN_SESSION_MASK) !== 0
  this._pos++

  // Keepalive: 16-bit number
  packet.keepalive = this._parseNum()
  if (packet.keepalive === -1) return this._emitError(new Error('Packet too short'))

  // Client identifier
  var clientId = this._parseString()
  if (clientId === null) return this._emitError(new Error('Packet too short'))
  packet.clientId = clientId

  // Will topic and payload are only present when the will flag is set
  if (hasWill) {
    var willTopic = this._parseString()
    if (willTopic === null) return this._emitError(new Error('Cannot parse will topic'))
    packet.will.topic = willTopic

    var willPayload = this._parseBuffer()
    if (willPayload === null) return this._emitError(new Error('Cannot parse will payload'))
    packet.will.payload = willPayload
  }

  // Optional credentials: username is a string, password stays a buffer
  if (hasUsername) {
    var username = this._parseString()
    if (username === null) return this._emitError(new Error('Cannot parse username'))
    packet.username = username
  }

  if (hasPassword) {
    var password = this._parseBuffer()
    if (password === null) return this._emitError(new Error('Cannot parse password'))
    packet.password = password
  }

  return packet
}
-
// Parses a CONNACK payload: one flags byte (sessionPresent) followed by the
// one-byte return code.
//
// A payload shorter than those two bytes is reported through _emitError
// instead of being silently skipped, so a half-filled packet never looks
// like a successful parse. Returns whatever _emitError returns in that
// case, undefined otherwise.
Parser.prototype._parseConnack = function () {
  var packet = this.packet

  // Both bytes must be buffered and belong to this packet
  if (packet.length < 2 || this._list.length - this._pos < 2) {
    return this._emitError(new Error('Cannot parse return code'))
  }

  packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)
  packet.returnCode = this._list.readUInt8(this._pos)
}
-
// Parses a PUBLISH payload: topic, then a message id when qos is above 0,
// and finally everything up to packet.length as the raw payload.
//
// Stops early, leaving packet.payload unset, when the topic or the message
// id cannot be parsed.
Parser.prototype._parsePublish = function () {
  var packet = this.packet
  var topic = this._parseString()

  packet.topic = topic
  if (topic === null) return this._emitError(new Error('Cannot parse topic'))

  // Only QoS 1 and 2 publishes carry a message id; _parseMessageId reports
  // its own failure.
  var needsMessageId = packet.qos > 0
  if (needsMessageId && !this._parseMessageId()) return

  packet.payload = this._list.slice(this._pos, packet.length)
}
-
// Parses a SUBSCRIBE payload: a message id followed by (topic, qos) pairs
// until packet.length is reached. The pairs are collected into
// packet.subscriptions as { topic, qos } objects.
//
// Errors are reported through _emitError when the header qos is not 1, when
// a topic cannot be parsed, or when a topic is not followed by its QoS byte.
Parser.prototype._parseSubscribe = function () {
  var packet = this.packet
  var topic
  var qos

  if (packet.qos !== 1) {
    return this._emitError(new Error('Wrong subscribe header'))
  }

  packet.subscriptions = []

  // _parseMessageId reports its own failure
  if (!this._parseMessageId()) { return }

  while (this._pos < packet.length) {
    // Parse topic
    topic = this._parseString()
    if (topic === null) return this._emitError(new Error('Cannot parse topic'))

    // The requested QoS byte must still be inside this packet; without this
    // check the read would spill into the next packet or past the buffer.
    if (this._pos >= packet.length) {
      return this._emitError(new Error('Malformed Subscribe Payload'))
    }

    qos = this._list.readUInt8(this._pos++)

    // Push pair to subscriptions
    packet.subscriptions.push({ topic: topic, qos: qos })
  }
}
-
// Parses a SUBACK payload: a message id followed by one byte per granted
// QoS, collected into packet.granted until packet.length is reached.
Parser.prototype._parseSuback = function () {
  var packet = this.packet
  var granted = []

  packet.granted = granted

  // _parseMessageId reports its own failure
  if (!this._parseMessageId()) return

  for (; this._pos < packet.length;) {
    granted.push(this._list.readUInt8(this._pos++))
  }
}
-
// Parses an UNSUBSCRIBE payload: a message id followed by topic strings,
// collected into packet.unsubscriptions until packet.length is reached.
Parser.prototype._parseUnsubscribe = function () {
  var packet = this.packet
  var topics = []
  var current

  packet.unsubscriptions = topics

  // _parseMessageId reports its own failure
  if (!this._parseMessageId()) return

  while (this._pos < packet.length) {
    current = this._parseString()
    if (current === null) return this._emitError(new Error('Cannot parse topic'))

    topics.push(current)
  }
}
-
// Parses an UNSUBACK payload, which consists of the message id only.
//
// _parseMessageId already emits 'Cannot parse messageId' when it fails, so
// the error is not emitted a second time here.
Parser.prototype._parseUnsuback = function () {
  this._parseMessageId()
}
-
// Reads the 16-bit message id at the current position into
// packet.messageId.
//
// Returns true on success. When fewer than two bytes are available,
// _parseNum yields -1; in that case a 'Cannot parse messageId' error is
// emitted and false is returned.
Parser.prototype._parseMessageId = function () {
  var packet = this.packet

  packet.messageId = this._parseNum()

  // -1 (not null) is the failure marker used by _parseNum
  if (packet.messageId === -1) {
    this._emitError(new Error('Cannot parse messageId'))
    return false
  }

  return true
}
-
// Reads a length-prefixed UTF-8 string at the current position and moves
// the position past it.
//
// maybeBuffer: accepted but not used.
// Returns the decoded string, or null when the 16-bit length prefix is
// missing or the string would run past the buffered data or past
// packet.length.
Parser.prototype._parseString = function (maybeBuffer) {
  var size = this._parseNum()
  var start = this._pos
  var stop = size + start

  if (size === -1) return null
  if (stop > this._list.length || stop > this.packet.length) return null

  var text = this._list.toString('utf8', start, stop)
  this._pos = stop

  return text
}
-
// Reads a length-prefixed chunk of raw bytes at the current position and
// moves the position past it.
//
// Returns the slice of the buffer list, or null when the 16-bit length
// prefix is missing or the chunk would run past the buffered data or past
// packet.length.
Parser.prototype._parseBuffer = function () {
  var size = this._parseNum()
  var start = this._pos
  var stop = size + start

  if (size === -1) return null
  if (stop > this._list.length || stop > this.packet.length) return null

  var chunk = this._list.slice(start, stop)
  this._pos = stop

  return chunk
}
-
// Reads a big-endian unsigned 16-bit number at the current position and
// moves the position two bytes forward.
//
// Returns the number, or -1 (position untouched) when fewer than two bytes
// are buffered after the current position.
Parser.prototype._parseNum = function () {
  var offset = this._pos
  var available = this._list.length - offset

  if (available < 2) return -1

  var value = this._list.readUInt16BE(offset)
  this._pos = offset + 2

  return value
}
-
// State 4: finishes the current packet and prepares for the next one.
//
// The payload bytes of the finished packet are dropped from the buffer
// list, the packet is announced through a 'packet' event, and a fresh
// Packet takes its place. Always returns true.
Parser.prototype._newPacket = function () {
  var finished = this.packet

  if (finished) {
    this._list.consume(finished.length)
    this.emit('packet', finished)
  }

  this.packet = new Packet()

  return true
}
-
// Records a parsing failure and announces it.
//
// err: the error to report. It is stored on this.error, which parse() checks
// to stop the state machine and to reset on the next call, and then emitted
// as an 'error' event.
Parser.prototype._emitError = function (err) {
  var failure = err

  this.error = failure
  this.emit('error', failure)
}
-
- module.exports = Parser
|