| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596 |
- 'use strict'
-
- var protocol = require('./constants')
- var Buffer = require('safe-buffer').Buffer
- var empty = Buffer.allocUnsafe(0)
- var zeroBuf = Buffer.from([0])
- var numbers = require('./numbers')
- var nextTick = require('process-nextick-args').nextTick
-
- var numCache = numbers.cache
- var generateNumber = numbers.generateNumber
- var generateCache = numbers.generateCache
- var writeNumber = writeNumberCached
- var toGenerate = true
-
/**
 * generate - serialise one MQTT packet onto a stream
 *
 * If the stream supports corking, it is corked for the duration of the
 * current tick so the many small writes issued below are flushed together.
 * The number cache is generated on the first call that needs it.
 *
 * @param <Object> packet - packet description; `packet.cmd` selects the writer
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> whatever the selected writer returns, or `false` after
 *   emitting an 'error' event for an unknown command
 */
function generate (packet, stream) {
  if (stream.cork) {
    stream.cork()
    nextTick(uncork, stream)
  }

  if (toGenerate) {
    toGenerate = false
    generateCache()
  }

  var cmd = packet.cmd

  if (cmd === 'connect') return connect(packet, stream)
  if (cmd === 'connack') return connack(packet, stream)
  if (cmd === 'publish') return publish(packet, stream)

  // All acknowledgement-style packets share one writer
  if (cmd === 'puback' || cmd === 'pubrec' || cmd === 'pubrel' ||
    cmd === 'pubcomp' || cmd === 'unsuback') {
    return confirmation(packet, stream)
  }

  if (cmd === 'subscribe') return subscribe(packet, stream)
  if (cmd === 'suback') return suback(packet, stream)
  if (cmd === 'unsubscribe') return unsubscribe(packet, stream)

  // Packets that consist of a fixed header only
  if (cmd === 'pingreq' || cmd === 'pingresp' || cmd === 'disconnect') {
    return emptyPacket(packet, stream)
  }

  stream.emit('error', new Error('Unknown command'))
  return false
}
/**
 * Controls the numbers cache.
 * Reads `true` while the cached number writer is active.
 * Set to a falsy value to allocate number buffers on the fly instead of
 * using the pre-generated cache; set to a truthy value to switch back
 * (the cache is scheduled for generation if it is missing or empty).
 */
Object.defineProperty(generate, 'cacheNumbers', {
  get: function () {
    return writeNumber === writeNumberCached
  },
  set: function (value) {
    if (!value) {
      toGenerate = false
      writeNumber = writeNumberGenerated
      return
    }

    var cacheIsEmpty = !numCache || Object.keys(numCache).length === 0
    if (cacheIsEmpty) toGenerate = true
    writeNumber = writeNumberCached
  }
})
-
/**
 * uncork - next-tick callback that releases a previously corked stream
 *
 * @param <Stream> stream - stream to uncork
 */
function uncork (stream) {
  var target = stream
  target.uncork()
}
-
/**
 * connect - validate and write a CONNECT packet
 *
 * Every field is validated before anything is written. On the first invalid
 * field an 'error' event is emitted on the stream and `false` is returned.
 *
 * The remaining length is computed from UTF-8 byte lengths, because that is
 * what `writeStringOrBuffer` puts on the wire; counting characters would
 * produce a too-short remaining length for non-ASCII identifiers.
 *
 * @param <Object> opts - packet fields: protocolId, protocolVersion, clean,
 *   keepalive, clientId, will, username, password
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` when validation failed, `true` otherwise
 *
 * @api private
 */
function connect (opts, stream) {
  var settings = opts || {}
  var protocolId = settings.protocolId || 'MQTT'
  var protocolVersion = settings.protocolVersion || 4
  var will = settings.will
  var clean = settings.clean
  var keepalive = settings.keepalive || 0
  var clientId = settings.clientId || ''
  var username = settings.username
  var password = settings.password

  if (clean === undefined) clean = true

  var length = 0

  // Must be a string or Buffer and non-falsy
  if (!protocolId ||
    (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
    stream.emit('error', new Error('Invalid protocolId'))
    return false
  }
  length += Buffer.byteLength(protocolId) + 2

  // Must be 3 or 4
  if (protocolVersion !== 3 && protocolVersion !== 4) {
    stream.emit('error', new Error('Invalid protocol version'))
    return false
  }
  length += 1

  // ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1
  if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&
    (clientId || protocolVersion === 4) && (clientId || clean)) {
    // Bytes, not characters: a multi-byte clientId is longer on the wire
    length += Buffer.byteLength(clientId) + 2
  } else {
    if (protocolVersion < 4) {
      stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
      return false
    }
    if ((clean * 1) === 0) {
      stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
      return false
    }
  }

  // Must be a two byte number
  if (typeof keepalive !== 'number' ||
    keepalive < 0 ||
    keepalive > 65535 ||
    keepalive % 1 !== 0) {
    stream.emit('error', new Error('Invalid keepalive'))
    return false
  }
  length += 2

  // Connect flags
  length += 1

  // If will exists...
  if (will) {
    // It must be an object
    if (typeof will !== 'object') {
      stream.emit('error', new Error('Invalid will'))
      return false
    }
    // It must have a non-empty string topic
    if (!will.topic || typeof will.topic !== 'string') {
      stream.emit('error', new Error('Invalid will topic'))
      return false
    }
    length += Buffer.byteLength(will.topic) + 2

    // Payload: a missing/empty payload still costs its two length bytes
    if (will.payload) {
      if (will.payload.length >= 0) {
        if (typeof will.payload === 'string') {
          length += Buffer.byteLength(will.payload) + 2
        } else {
          length += will.payload.length + 2
        }
      } else {
        stream.emit('error', new Error('Invalid will payload'))
        return false
      }
    } else {
      length += 2
    }
  }

  // Username
  var providedUsername = false
  if (username != null) {
    if (isStringOrBuffer(username)) {
      providedUsername = true
      length += Buffer.byteLength(username) + 2
    } else {
      stream.emit('error', new Error('Invalid username'))
      return false
    }
  }

  // Password
  if (password != null) {
    if (!providedUsername) {
      stream.emit('error', new Error('Username is required to use password'))
      return false
    }

    if (isStringOrBuffer(password)) {
      length += Buffer.byteLength(password) + 2
    } else {
      stream.emit('error', new Error('Invalid password'))
      return false
    }
  }

  // Generate header
  stream.write(protocol.CONNECT_HEADER)

  // Generate length
  writeLength(stream, length)

  // Generate protocol ID
  writeStringOrBuffer(stream, protocolId)
  stream.write(
    protocolVersion === 4 ? protocol.VERSION4 : protocol.VERSION3
  )

  // Connect flags
  var flags = 0
  flags |= (username != null) ? protocol.USERNAME_MASK : 0
  flags |= (password != null) ? protocol.PASSWORD_MASK : 0
  flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
  flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
  flags |= will ? protocol.WILL_FLAG_MASK : 0
  flags |= clean ? protocol.CLEAN_SESSION_MASK : 0

  stream.write(Buffer.from([flags]))

  // Keepalive
  writeNumber(stream, keepalive)

  // Client ID
  writeStringOrBuffer(stream, clientId)

  // Will
  if (will) {
    writeString(stream, will.topic)
    writeStringOrBuffer(stream, will.payload)
  }

  // Username and password
  if (username != null) {
    writeStringOrBuffer(stream, username)
  }
  if (password != null) {
    writeStringOrBuffer(stream, password)
  }
  // This is a small packet that happens only once on a stream
  // We assume the stream is always free to receive more data after this
  return true
}
-
/**
 * connack - validate and write a CONNACK packet
 *
 * @param <Object> opts - packet fields: returnCode (number, required) and
 *   sessionPresent (truthy selects the session-present flag byte)
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` after emitting 'error' for a non-numeric return
 *   code, otherwise the result of the final `stream.write`
 *
 * @api private
 */
function connack (opts, stream) {
  var settings = opts || {}
  var returnCode = settings.returnCode

  // Check return code
  if (typeof returnCode !== 'number') {
    stream.emit('error', new Error('Invalid return code'))
    return false
  }

  var sessionFlag = settings.sessionPresent
    ? protocol.SESSIONPRESENT_HEADER
    : zeroBuf

  stream.write(protocol.CONNACK_HEADER)
  // The variable header is always two bytes: flags + return code
  writeLength(stream, 2)
  stream.write(sessionFlag)

  return stream.write(Buffer.from([returnCode]))
}
-
/**
 * publish - validate and write a PUBLISH packet
 *
 * @param <Object> opts - packet fields: topic (string or Buffer), payload
 *   (string or Buffer, defaults to an empty Buffer), qos, retain, dup and
 *   messageId (must be a number when qos is truthy)
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` after emitting 'error' on invalid input,
 *   otherwise the result of writing the payload
 *
 * @api private
 */
function publish (opts, stream) {
  var settings = opts || {}
  var qos = settings.qos || 0
  var retain = settings.retain ? protocol.RETAIN_MASK : 0
  var topic = settings.topic
  var payload = settings.payload || empty
  var id = settings.messageId

  var remaining

  // Topic must be a string or Buffer; it is prefixed by two length bytes
  if (typeof topic === 'string') {
    remaining = Buffer.byteLength(topic) + 2
  } else if (Buffer.isBuffer(topic)) {
    remaining = topic.length + 2
  } else {
    stream.emit('error', new Error('Invalid topic'))
    return false
  }

  // The payload carries no length prefix of its own
  remaining += Buffer.isBuffer(payload)
    ? payload.length
    : Buffer.byteLength(payload)

  // A message ID is required (and takes two bytes) whenever qos is set
  if (qos) {
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }
    remaining += 2
  }

  // Fixed header byte, selected by qos / dup / retain
  var dupIndex = settings.dup ? 1 : 0
  var retainIndex = retain ? 1 : 0
  stream.write(protocol.PUBLISH_HEADER[qos][dupIndex][retainIndex])

  // Remaining length
  writeLength(stream, remaining)

  // Topic
  writeNumber(stream, byteLength(topic))
  stream.write(topic)

  // Message ID
  if (qos > 0) writeNumber(stream, id)

  // Payload
  return stream.write(payload)
}
-
/**
 * confirmation - write an acknowledgement-style packet
 *
 * Shared writer for puback, pubrec, pubrel, pubcomp and unsuback. These all
 * consist of a fixed header followed by a two byte message ID.
 *
 * @param <Object> opts - packet fields: cmd (defaults to 'puback'),
 *   messageId (number, required) and dup (only honoured for pubrel)
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` after emitting 'error' for a non-numeric
 *   message ID, otherwise the result of writing the message ID
 *
 * @api private
 */
function confirmation (opts, stream) {
  var settings = opts || {}
  var type = settings.cmd || 'puback'
  var id = settings.messageId
  var isPubrel = type === 'pubrel'
  var dup = (settings.dup && isPubrel) ? protocol.DUP_MASK : 0
  // pubrel is the only packet here whose header table is indexed with qos 1
  var qos = isPubrel ? 1 : 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }

  // Header
  var header = protocol.ACKS[type][qos][dup][0]
  stream.write(header)

  // Length
  writeLength(stream, 2)

  // Message ID
  return writeNumber(stream, id)
}
-
/**
 * subscribe - validate and write a SUBSCRIBE packet
 *
 * All subscriptions are validated before anything is written. Invalid input
 * (including `null` subscriptions or `null` entries, which previously threw
 * a TypeError) results in an 'error' event on the stream and `false`.
 *
 * @param <Object> opts - packet fields: messageId (number), dup and
 *   subscriptions (non-empty array of `{ topic: String, qos: Number }`)
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` when validation failed, otherwise the result of
 *   the last `stream.write`
 *
 * @api private
 */
function subscribe (opts, stream) {
  var settings = opts || {}
  var dup = settings.dup ? protocol.DUP_MASK : 0
  var id = settings.messageId
  var subs = settings.subscriptions

  var length = 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  length += 2

  // Check subscriptions. `typeof null` is 'object', so null is excluded
  // explicitly before `.length` is read.
  if (subs === null || typeof subs !== 'object' || !subs.length) {
    stream.emit('error', new Error('Invalid subscriptions'))
    return false
  }

  for (var i = 0; i < subs.length; i += 1) {
    // A null/undefined entry is reported as an invalid topic
    var entry = subs[i]
    var itopic = entry ? entry.topic : undefined
    var iqos = entry ? entry.qos : undefined

    if (typeof itopic !== 'string') {
      stream.emit('error', new Error('Invalid subscriptions - invalid topic'))
      return false
    }
    if (typeof iqos !== 'number') {
      stream.emit('error', new Error('Invalid subscriptions - invalid qos'))
      return false
    }

    // Two length bytes + topic bytes + one qos byte
    length += Buffer.byteLength(itopic) + 2 + 1
  }

  // Generate header
  stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Generate length
  writeLength(stream, length)

  // Generate message ID
  writeNumber(stream, id)

  var result = true

  // Generate subs
  for (var j = 0; j < subs.length; j++) {
    var sub = subs[j]

    // Write topic string
    writeString(stream, sub.topic)

    // Write qos
    result = stream.write(protocol.QOS[sub.qos])
  }

  return result
}
-
/**
 * suback - validate and write a SUBACK packet
 *
 * Invalid input (including a `null` granted vector, which previously threw
 * a TypeError) results in an 'error' event on the stream and `false`.
 *
 * @param <Object> opts - packet fields: messageId (number) and granted
 *   (non-empty array of numbers, one per subscription)
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` when validation failed, otherwise the result of
 *   writing the granted vector
 *
 * @api private
 */
function suback (opts, stream) {
  var settings = opts || {}
  var id = settings.messageId
  var granted = settings.granted

  var length = 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  length += 2

  // Check granted qos vector. `typeof null` is 'object', so null is
  // excluded explicitly before `.length` is read.
  if (granted === null || typeof granted !== 'object' || !granted.length) {
    stream.emit('error', new Error('Invalid qos vector'))
    return false
  }

  for (var i = 0; i < granted.length; i += 1) {
    if (typeof granted[i] !== 'number') {
      stream.emit('error', new Error('Invalid qos vector'))
      return false
    }
    // One byte per granted entry
    length += 1
  }

  // header
  stream.write(protocol.SUBACK_HEADER)

  // Length
  writeLength(stream, length)

  // Message ID
  writeNumber(stream, id)

  return stream.write(Buffer.from(granted))
}
-
/**
 * unsubscribe - validate and write an UNSUBSCRIBE packet
 *
 * Invalid input (including `null` unsubscriptions, which previously threw a
 * TypeError) results in an 'error' event on the stream and `false`.
 *
 * @param <Object> opts - packet fields: messageId (number), dup and
 *   unsubscriptions (non-empty array of topic strings)
 * @param <Stream> stream - destination; must provide `write` and `emit`
 * @return <Boolean> `false` when validation failed, otherwise the result of
 *   the last `stream.write`
 *
 * @api private
 */
function unsubscribe (opts, stream) {
  var settings = opts || {}
  var id = settings.messageId
  var dup = settings.dup ? protocol.DUP_MASK : 0
  var unsubs = settings.unsubscriptions

  var length = 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  length += 2

  // Check unsubs. `typeof null` is 'object', so null is excluded explicitly
  // before `.length` is read.
  if (unsubs === null || typeof unsubs !== 'object' || !unsubs.length) {
    stream.emit('error', new Error('Invalid unsubscriptions'))
    return false
  }

  for (var i = 0; i < unsubs.length; i += 1) {
    if (typeof unsubs[i] !== 'string') {
      stream.emit('error', new Error('Invalid unsubscriptions'))
      return false
    }
    // Two length bytes + topic bytes
    length += Buffer.byteLength(unsubs[i]) + 2
  }

  // Header
  stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Length
  writeLength(stream, length)

  // Message ID
  writeNumber(stream, id)

  // Unsubs: each topic is written as a length-prefixed utf8 string. The
  // topic write is issued here directly so its status can be returned.
  var result = true
  for (var j = 0; j < unsubs.length; j++) {
    var topic = unsubs[j]
    writeNumber(stream, Buffer.byteLength(topic))
    result = stream.write(topic, 'utf8')
  }

  return result
}
-
/**
 * emptyPacket - write a packet that has no variable header or payload
 *
 * @param <Object> opts - packet; `opts.cmd` selects the pre-built bytes
 * @param <Stream> stream - destination
 * @return <Boolean> result of `stream.write`
 *
 * @api private
 */
function emptyPacket (opts, stream) {
  var bytes = protocol.EMPTY[opts.cmd]
  return stream.write(bytes)
}
-
/**
 * calcLengthLength - number of bytes needed to encode a remaining-length
 * value
 *
 * @param <Number> length - value to encode
 * @return <Number> 1 to 4, or 0 when the value is negative, not a number,
 *   or 268435456 and above
 *
 * @api private
 */
function calcLengthLength (length) {
  // Written as a negation so that NaN falls through to 0 as well
  if (!(length >= 0)) return 0
  if (length < 128) return 1
  if (length < 16384) return 2
  if (length < 2097152) return 3
  if (length < 268435456) return 4
  return 0
}
-
/**
 * genBufLength - encode a remaining-length value into a new Buffer
 *
 * The value is emitted seven bits at a time, least significant group first;
 * bit 0x80 is set on every byte that is followed by another one.
 *
 * @param <Number> length - value to encode
 * @return <Buffer> buffer sized by `calcLengthLength(length)`
 *
 * @api private
 */
function genBufLength (length) {
  var out = Buffer.allocUnsafe(calcLengthLength(length))
  var remaining = length
  var offset = 0
  var digit

  do {
    digit = remaining % 128 | 0
    remaining = remaining / 128 | 0
    // Continuation bit: more groups follow
    if (remaining > 0) digit |= 0x80

    out.writeUInt8(digit, offset)
    offset += 1
  } while (remaining > 0)

  return out
}
-
/**
 * writeLength - write an MQTT style remaining-length field to the stream
 *
 * Encoded buffers for lengths below 16384 are memoised in `lengthCache`;
 * larger lengths are encoded on every call.
 *
 * @param <Stream> stream - destination
 * @param <Number> length - value to encode
 *
 * @api private
 */

var lengthCache = {}
function writeLength (stream, length) {
  var encoded = lengthCache[length]

  if (encoded) {
    stream.write(encoded)
    return
  }

  encoded = genBufLength(length)
  if (length < 16384) lengthCache[length] = encoded

  stream.write(encoded)
}
-
/**
 * writeString - write a length-prefixed utf8 string to the stream
 *
 * The two byte prefix holds the string's UTF-8 byte length.
 *
 * @param <Stream> stream - destination
 * @param <String> string - string to write
 * @return <Boolean> result of the final `stream.write`, so callers that
 *   use this as their last write can propagate the write status
 *
 * @api private
 */

function writeString (stream, string) {
  var strlen = Buffer.byteLength(string)
  writeNumber(stream, strlen)

  return stream.write(string, 'utf8')
}
-
/**
 * writeNumberCached - write a number to the stream using the buffer
 * stored for it in the numbers cache
 *
 * @param <Stream> stream - destination
 * @param <Number> number - number to write; used as the cache key
 * @return <Boolean> result of `stream.write`
 *
 * @api private
 */
function writeNumberCached (stream, number) {
  var cached = numCache[number]
  return stream.write(cached)
}
/**
 * writeNumberGenerated - write a number to the stream, building its buffer
 * with `generateNumber` on every call instead of reading the cache
 *
 * @param <Stream> stream - destination
 * @param <Number> number - number to write
 * @return <Boolean> result of `stream.write`
 *
 * @api private
 */
function writeNumberGenerated (stream, number) {
  var fresh = generateNumber(number)
  return stream.write(fresh)
}
-
/**
 * writeStringOrBuffer - write a String or Buffer with its length prefix
 *
 * Strings are delegated to `writeString`. Any other truthy value is written
 * as-is, prefixed by its `.length`. A falsy non-string value results in a
 * zero length prefix and no data.
 *
 * @param <Stream> stream - destination
 * @param <String|Buffer> toWrite - value to write
 */
function writeStringOrBuffer (stream, toWrite) {
  if (typeof toWrite === 'string') {
    writeString(stream, toWrite)
    return
  }

  if (!toWrite) {
    writeNumber(stream, 0)
    return
  }

  writeNumber(stream, toWrite.length)
  stream.write(toWrite)
}
-
/**
 * byteLength - byte length of a String or Buffer
 *
 * @param <String|Buffer> bufOrString - value to measure
 * @return <Number> 0 for any falsy value, `.length` for a Buffer, otherwise
 *   `Buffer.byteLength` of the value
 */
function byteLength (bufOrString) {
  if (!bufOrString) return 0

  return bufOrString instanceof Buffer
    ? bufOrString.length
    : Buffer.byteLength(bufOrString)
}
-
/**
 * isStringOrBuffer - type guard used when validating packet fields
 *
 * @param <*> field - value to test
 * @return <Boolean> `true` for string primitives and Buffer instances
 */
function isStringOrBuffer (field) {
  if (typeof field === 'string') return true
  return field instanceof Buffer
}
-
- module.exports = generate
|