| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- 'use strict'
-
- /**
- * Module dependencies
- */
- var xtend = require('xtend')
-
- var Readable = require('readable-stream').Readable
- var streamsOpts = { objectMode: true }
- var defaultStoreOptions = {
- clean: true
- }
-
- /**
- * In-memory implementation of the message store
- * This can actually be saved into files.
- *
- * @param {Object} [options] - store options
- */
- function Store (options) {
- if (!(this instanceof Store)) {
- return new Store(options)
- }
-
- this.options = options || {}
-
- // Defaults
- this.options = xtend(defaultStoreOptions, options)
-
- this._inflights = {}
- }
-
- /**
- * Adds a packet to the store, a packet is
- * anything that has a messageId property.
- *
- */
- Store.prototype.put = function (packet, cb) {
- this._inflights[packet.messageId] = packet
-
- if (cb) {
- cb()
- }
-
- return this
- }
-
- /**
- * Creates a stream with all the packets in the store
- *
- */
- Store.prototype.createStream = function () {
- var stream = new Readable(streamsOpts)
- var inflights = this._inflights
- var ids = Object.keys(this._inflights)
- var destroyed = false
- var i = 0
-
- stream._read = function () {
- if (!destroyed && i < ids.length) {
- this.push(inflights[ids[i++]])
- } else {
- this.push(null)
- }
- }
-
- stream.destroy = function () {
- if (destroyed) {
- return
- }
-
- var self = this
-
- destroyed = true
-
- process.nextTick(function () {
- self.emit('close')
- })
- }
-
- return stream
- }
-
- /**
- * deletes a packet from the store.
- */
- Store.prototype.del = function (packet, cb) {
- packet = this._inflights[packet.messageId]
- if (packet) {
- delete this._inflights[packet.messageId]
- cb(null, packet)
- } else if (cb) {
- cb(new Error('missing packet'))
- }
-
- return this
- }
-
- /**
- * get a packet from the store.
- */
- Store.prototype.get = function (packet, cb) {
- packet = this._inflights[packet.messageId]
- if (packet) {
- cb(null, packet)
- } else if (cb) {
- cb(new Error('missing packet'))
- }
-
- return this
- }
-
- /**
- * Close the store
- */
- Store.prototype.close = function (cb) {
- if (this.options.clean) {
- this._inflights = null
- }
- if (cb) {
- cb()
- }
- }
-
- module.exports = Store
|