wechat wss demo

store.js 2.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. var xtend = require('xtend')
  6. var Readable = require('readable-stream').Readable
  7. var streamsOpts = { objectMode: true }
  8. var defaultStoreOptions = {
  9. clean: true
  10. }
  11. /**
  12. * In-memory implementation of the message store
  13. * This can actually be saved into files.
  14. *
  15. * @param {Object} [options] - store options
  16. */
  17. function Store (options) {
  18. if (!(this instanceof Store)) {
  19. return new Store(options)
  20. }
  21. this.options = options || {}
  22. // Defaults
  23. this.options = xtend(defaultStoreOptions, options)
  24. this._inflights = {}
  25. }
  26. /**
  27. * Adds a packet to the store, a packet is
  28. * anything that has a messageId property.
  29. *
  30. */
  31. Store.prototype.put = function (packet, cb) {
  32. this._inflights[packet.messageId] = packet
  33. if (cb) {
  34. cb()
  35. }
  36. return this
  37. }
  38. /**
  39. * Creates a stream with all the packets in the store
  40. *
  41. */
  42. Store.prototype.createStream = function () {
  43. var stream = new Readable(streamsOpts)
  44. var inflights = this._inflights
  45. var ids = Object.keys(this._inflights)
  46. var destroyed = false
  47. var i = 0
  48. stream._read = function () {
  49. if (!destroyed && i < ids.length) {
  50. this.push(inflights[ids[i++]])
  51. } else {
  52. this.push(null)
  53. }
  54. }
  55. stream.destroy = function () {
  56. if (destroyed) {
  57. return
  58. }
  59. var self = this
  60. destroyed = true
  61. process.nextTick(function () {
  62. self.emit('close')
  63. })
  64. }
  65. return stream
  66. }
  67. /**
  68. * deletes a packet from the store.
  69. */
  70. Store.prototype.del = function (packet, cb) {
  71. packet = this._inflights[packet.messageId]
  72. if (packet) {
  73. delete this._inflights[packet.messageId]
  74. cb(null, packet)
  75. } else if (cb) {
  76. cb(new Error('missing packet'))
  77. }
  78. return this
  79. }
  80. /**
  81. * get a packet from the store.
  82. */
  83. Store.prototype.get = function (packet, cb) {
  84. packet = this._inflights[packet.messageId]
  85. if (packet) {
  86. cb(null, packet)
  87. } else if (cb) {
  88. cb(new Error('missing packet'))
  89. }
  90. return this
  91. }
  92. /**
  93. * Close the store
  94. */
  95. Store.prototype.close = function (cb) {
  96. if (this.options.clean) {
  97. this._inflights = null
  98. }
  99. if (cb) {
  100. cb()
  101. }
  102. }
  103. module.exports = Store