wechat wss demo

stream.js 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. 'use strict'
  2. var Transform = require('readable-stream').Transform
  3. var duplexify = require('duplexify')
  4. var WS = require('ws')
  5. var Buffer = require('safe-buffer').Buffer
  6. module.exports = WebSocketStream
  7. function buildProxy (options, socketWrite, socketEnd) {
  8. var proxy = new Transform({
  9. objectMode: options.objectMode
  10. })
  11. proxy._write = socketWrite
  12. proxy._flush = socketEnd
  13. return proxy
  14. }
  15. function WebSocketStream(target, protocols, options) {
  16. var stream, socket
  17. var isBrowser = process.title === 'browser'
  18. var isNative = !!global.WebSocket
  19. var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode
  20. if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
  21. // accept the "options" Object as the 2nd argument
  22. options = protocols
  23. protocols = null
  24. if (typeof options.protocol === 'string' || Array.isArray(options.protocol)) {
  25. protocols = options.protocol;
  26. }
  27. }
  28. if (!options) options = {}
  29. if (options.objectMode === undefined) {
  30. options.objectMode = !(options.binary === true || options.binary === undefined)
  31. }
  32. var proxy = buildProxy(options, socketWrite, socketEnd)
  33. if (!options.objectMode) {
  34. proxy._writev = writev
  35. }
  36. // browser only: sets the maximum socket buffer size before throttling
  37. var bufferSize = options.browserBufferSize || 1024 * 512
  38. // browser only: how long to wait when throttling
  39. var bufferTimeout = options.browserBufferTimeout || 1000
  40. // use existing WebSocket object that was passed in
  41. if (typeof target === 'object') {
  42. socket = target
  43. // otherwise make a new one
  44. } else {
  45. // special constructor treatment for native websockets in browsers, see
  46. // https://github.com/maxogden/websocket-stream/issues/82
  47. if (isNative && isBrowser) {
  48. socket = new WS(target, protocols)
  49. } else {
  50. socket = new WS(target, protocols, options)
  51. }
  52. socket.binaryType = 'arraybuffer'
  53. }
  54. // was already open when passed in
  55. if (socket.readyState === socket.OPEN) {
  56. stream = proxy
  57. } else {
  58. stream = duplexify.obj()
  59. socket.onopen = onopen
  60. }
  61. stream.socket = socket
  62. socket.onclose = onclose
  63. socket.onerror = onerror
  64. socket.onmessage = onmessage
  65. proxy.on('close', destroy)
  66. var coerceToBuffer = !options.objectMode
  67. function socketWriteNode(chunk, enc, next) {
  68. // avoid errors, this never happens unless
  69. // destroy() is called
  70. if (socket.readyState !== socket.OPEN) {
  71. next()
  72. return
  73. }
  74. if (coerceToBuffer && typeof chunk === 'string') {
  75. chunk = Buffer.from(chunk, 'utf8')
  76. }
  77. socket.send(chunk, next)
  78. }
  79. function socketWriteBrowser(chunk, enc, next) {
  80. if (socket.bufferedAmount > bufferSize) {
  81. setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
  82. return
  83. }
  84. if (coerceToBuffer && typeof chunk === 'string') {
  85. chunk = Buffer.from(chunk, 'utf8')
  86. }
  87. try {
  88. socket.send(chunk)
  89. } catch(err) {
  90. return next(err)
  91. }
  92. next()
  93. }
  94. function socketEnd(done) {
  95. socket.close()
  96. done()
  97. }
  98. function onopen() {
  99. stream.setReadable(proxy)
  100. stream.setWritable(proxy)
  101. stream.emit('connect')
  102. }
  103. function onclose() {
  104. stream.end()
  105. stream.destroy()
  106. }
  107. function onerror(err) {
  108. stream.destroy(err)
  109. }
  110. function onmessage(event) {
  111. var data = event.data
  112. if (data instanceof ArrayBuffer) data = Buffer.from(data)
  113. else data = Buffer.from(data, 'utf8')
  114. proxy.push(data)
  115. }
  116. function destroy() {
  117. socket.close()
  118. }
  119. // this is to be enabled only if objectMode is false
  120. function writev (chunks, cb) {
  121. var buffers = new Array(chunks.length)
  122. for (var i = 0; i < chunks.length; i++) {
  123. if (typeof chunks[i].chunk === 'string') {
  124. buffers[i] = Buffer.from(chunks[i], 'utf8')
  125. } else {
  126. buffers[i] = chunks[i].chunk
  127. }
  128. }
  129. this._write(Buffer.concat(buffers), 'binary', cb)
  130. }
  131. return stream
  132. }