websocket.js 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const crypto = require('crypto');
  4. const https = require('https');
  5. const http = require('http');
  6. const net = require('net');
  7. const tls = require('tls');
  8. const url = require('url');
  9. const PerMessageDeflate = require('./permessage-deflate');
  10. const EventTarget = require('./event-target');
  11. const extension = require('./extension');
  12. const Receiver = require('./receiver');
  13. const Sender = require('./sender');
  14. const {
  15. BINARY_TYPES,
  16. EMPTY_BUFFER,
  17. GUID,
  18. kStatusCode,
  19. kWebSocket,
  20. NOOP
  21. } = require('./constants');
  22. const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
  23. const protocolVersions = [8, 13];
  24. const closeTimeout = 30 * 1000;
  25. /**
  26. * Class representing a WebSocket.
  27. *
  28. * @extends EventEmitter
  29. */
  30. class WebSocket extends EventEmitter {
  31. /**
  32. * Create a new `WebSocket`.
  33. *
  34. * @param {(String|url.Url|url.URL)} address The URL to which to connect
  35. * @param {(String|String[])} protocols The subprotocols
  36. * @param {Object} options Connection options
  37. */
  38. constructor(address, protocols, options) {
  39. super();
  40. this.readyState = WebSocket.CONNECTING;
  41. this.protocol = '';
  42. this._binaryType = BINARY_TYPES[0];
  43. this._closeFrameReceived = false;
  44. this._closeFrameSent = false;
  45. this._closeMessage = '';
  46. this._closeTimer = null;
  47. this._closeCode = 1006;
  48. this._extensions = {};
  49. this._receiver = null;
  50. this._sender = null;
  51. this._socket = null;
  52. if (address !== null) {
  53. this._isServer = false;
  54. this._redirects = 0;
  55. if (Array.isArray(protocols)) {
  56. protocols = protocols.join(', ');
  57. } else if (typeof protocols === 'object' && protocols !== null) {
  58. options = protocols;
  59. protocols = undefined;
  60. }
  61. initAsClient(this, address, protocols, options);
  62. } else {
  63. this._isServer = true;
  64. }
  65. }
  66. get CONNECTING() {
  67. return WebSocket.CONNECTING;
  68. }
  69. get CLOSING() {
  70. return WebSocket.CLOSING;
  71. }
  72. get CLOSED() {
  73. return WebSocket.CLOSED;
  74. }
  75. get OPEN() {
  76. return WebSocket.OPEN;
  77. }
  78. /**
  79. * This deviates from the WHATWG interface since ws doesn't support the
  80. * required default "blob" type (instead we define a custom "nodebuffer"
  81. * type).
  82. *
  83. * @type {String}
  84. */
  85. get binaryType() {
  86. return this._binaryType;
  87. }
  88. set binaryType(type) {
  89. if (!BINARY_TYPES.includes(type)) return;
  90. this._binaryType = type;
  91. //
  92. // Allow to change `binaryType` on the fly.
  93. //
  94. if (this._receiver) this._receiver._binaryType = type;
  95. }
  96. /**
  97. * @type {Number}
  98. */
  99. get bufferedAmount() {
  100. if (!this._socket) return 0;
  101. //
  102. // `socket.bufferSize` is `undefined` if the socket is closed.
  103. //
  104. return (this._socket.bufferSize || 0) + this._sender._bufferedBytes;
  105. }
  106. /**
  107. * @type {String}
  108. */
  109. get extensions() {
  110. return Object.keys(this._extensions).join();
  111. }
  112. /**
  113. * Set up the socket and the internal resources.
  114. *
  115. * @param {net.Socket} socket The network socket between the server and client
  116. * @param {Buffer} head The first packet of the upgraded stream
  117. * @param {Number} maxPayload The maximum allowed message size
  118. * @private
  119. */
  120. setSocket(socket, head, maxPayload) {
  121. const receiver = new Receiver(
  122. this._binaryType,
  123. this._extensions,
  124. maxPayload
  125. );
  126. this._sender = new Sender(socket, this._extensions);
  127. this._receiver = receiver;
  128. this._socket = socket;
  129. receiver[kWebSocket] = this;
  130. socket[kWebSocket] = this;
  131. receiver.on('conclude', receiverOnConclude);
  132. receiver.on('drain', receiverOnDrain);
  133. receiver.on('error', receiverOnError);
  134. receiver.on('message', receiverOnMessage);
  135. receiver.on('ping', receiverOnPing);
  136. receiver.on('pong', receiverOnPong);
  137. socket.setTimeout(0);
  138. socket.setNoDelay();
  139. if (head.length > 0) socket.unshift(head);
  140. socket.on('close', socketOnClose);
  141. socket.on('data', socketOnData);
  142. socket.on('end', socketOnEnd);
  143. socket.on('error', socketOnError);
  144. this.readyState = WebSocket.OPEN;
  145. this.emit('open');
  146. }
  147. /**
  148. * Emit the `'close'` event.
  149. *
  150. * @private
  151. */
  152. emitClose() {
  153. this.readyState = WebSocket.CLOSED;
  154. if (!this._socket) {
  155. this.emit('close', this._closeCode, this._closeMessage);
  156. return;
  157. }
  158. if (this._extensions[PerMessageDeflate.extensionName]) {
  159. this._extensions[PerMessageDeflate.extensionName].cleanup();
  160. }
  161. this._receiver.removeAllListeners();
  162. this.emit('close', this._closeCode, this._closeMessage);
  163. }
  164. /**
  165. * Start a closing handshake.
  166. *
  167. * +----------+ +-----------+ +----------+
  168. * - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
  169. * | +----------+ +-----------+ +----------+ |
  170. * +----------+ +-----------+ |
  171. * CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING
  172. * +----------+ +-----------+ |
  173. * | | | +---+ |
  174. * +------------------------+-->|fin| - - - -
  175. * | +---+ | +---+
  176. * - - - - -|fin|<---------------------+
  177. * +---+
  178. *
  179. * @param {Number} code Status code explaining why the connection is closing
  180. * @param {String} data A string explaining why the connection is closing
  181. * @public
  182. */
  183. close(code, data) {
  184. if (this.readyState === WebSocket.CLOSED) return;
  185. if (this.readyState === WebSocket.CONNECTING) {
  186. const msg = 'WebSocket was closed before the connection was established';
  187. return abortHandshake(this, this._req, msg);
  188. }
  189. if (this.readyState === WebSocket.CLOSING) {
  190. if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
  191. return;
  192. }
  193. this.readyState = WebSocket.CLOSING;
  194. this._sender.close(code, data, !this._isServer, (err) => {
  195. //
  196. // This error is handled by the `'error'` listener on the socket. We only
  197. // want to know if the close frame has been sent here.
  198. //
  199. if (err) return;
  200. this._closeFrameSent = true;
  201. if (this._closeFrameReceived) this._socket.end();
  202. });
  203. //
  204. // Specify a timeout for the closing handshake to complete.
  205. //
  206. this._closeTimer = setTimeout(
  207. this._socket.destroy.bind(this._socket),
  208. closeTimeout
  209. );
  210. }
  211. /**
  212. * Send a ping.
  213. *
  214. * @param {*} data The data to send
  215. * @param {Boolean} mask Indicates whether or not to mask `data`
  216. * @param {Function} cb Callback which is executed when the ping is sent
  217. * @public
  218. */
  219. ping(data, mask, cb) {
  220. if (typeof data === 'function') {
  221. cb = data;
  222. data = mask = undefined;
  223. } else if (typeof mask === 'function') {
  224. cb = mask;
  225. mask = undefined;
  226. }
  227. if (this.readyState !== WebSocket.OPEN) {
  228. const err = new Error(
  229. `WebSocket is not open: readyState ${this.readyState} ` +
  230. `(${readyStates[this.readyState]})`
  231. );
  232. if (cb) return cb(err);
  233. throw err;
  234. }
  235. if (typeof data === 'number') data = data.toString();
  236. if (mask === undefined) mask = !this._isServer;
  237. this._sender.ping(data || EMPTY_BUFFER, mask, cb);
  238. }
  239. /**
  240. * Send a pong.
  241. *
  242. * @param {*} data The data to send
  243. * @param {Boolean} mask Indicates whether or not to mask `data`
  244. * @param {Function} cb Callback which is executed when the pong is sent
  245. * @public
  246. */
  247. pong(data, mask, cb) {
  248. if (typeof data === 'function') {
  249. cb = data;
  250. data = mask = undefined;
  251. } else if (typeof mask === 'function') {
  252. cb = mask;
  253. mask = undefined;
  254. }
  255. if (this.readyState !== WebSocket.OPEN) {
  256. const err = new Error(
  257. `WebSocket is not open: readyState ${this.readyState} ` +
  258. `(${readyStates[this.readyState]})`
  259. );
  260. if (cb) return cb(err);
  261. throw err;
  262. }
  263. if (typeof data === 'number') data = data.toString();
  264. if (mask === undefined) mask = !this._isServer;
  265. this._sender.pong(data || EMPTY_BUFFER, mask, cb);
  266. }
  267. /**
  268. * Send a data message.
  269. *
  270. * @param {*} data The message to send
  271. * @param {Object} options Options object
  272. * @param {Boolean} options.compress Specifies whether or not to compress `data`
  273. * @param {Boolean} options.binary Specifies whether `data` is binary or text
  274. * @param {Boolean} options.fin Specifies whether the fragment is the last one
  275. * @param {Boolean} options.mask Specifies whether or not to mask `data`
  276. * @param {Function} cb Callback which is executed when data is written out
  277. * @public
  278. */
  279. send(data, options, cb) {
  280. if (typeof options === 'function') {
  281. cb = options;
  282. options = {};
  283. }
  284. if (this.readyState !== WebSocket.OPEN) {
  285. const err = new Error(
  286. `WebSocket is not open: readyState ${this.readyState} ` +
  287. `(${readyStates[this.readyState]})`
  288. );
  289. if (cb) return cb(err);
  290. throw err;
  291. }
  292. if (typeof data === 'number') data = data.toString();
  293. const opts = Object.assign(
  294. {
  295. binary: typeof data !== 'string',
  296. mask: !this._isServer,
  297. compress: true,
  298. fin: true
  299. },
  300. options
  301. );
  302. if (!this._extensions[PerMessageDeflate.extensionName]) {
  303. opts.compress = false;
  304. }
  305. this._sender.send(data || EMPTY_BUFFER, opts, cb);
  306. }
  307. /**
  308. * Forcibly close the connection.
  309. *
  310. * @public
  311. */
  312. terminate() {
  313. if (this.readyState === WebSocket.CLOSED) return;
  314. if (this.readyState === WebSocket.CONNECTING) {
  315. const msg = 'WebSocket was closed before the connection was established';
  316. return abortHandshake(this, this._req, msg);
  317. }
  318. if (this._socket) {
  319. this.readyState = WebSocket.CLOSING;
  320. this._socket.destroy();
  321. }
  322. }
  323. }
  324. readyStates.forEach((readyState, i) => {
  325. WebSocket[readyState] = i;
  326. });
  327. //
  328. // Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
  329. // See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
  330. //
  331. ['open', 'error', 'close', 'message'].forEach((method) => {
  332. Object.defineProperty(WebSocket.prototype, `on${method}`, {
  333. /**
  334. * Return the listener of the event.
  335. *
  336. * @return {(Function|undefined)} The event listener or `undefined`
  337. * @public
  338. */
  339. get() {
  340. const listeners = this.listeners(method);
  341. for (var i = 0; i < listeners.length; i++) {
  342. if (listeners[i]._listener) return listeners[i]._listener;
  343. }
  344. return undefined;
  345. },
  346. /**
  347. * Add a listener for the event.
  348. *
  349. * @param {Function} listener The listener to add
  350. * @public
  351. */
  352. set(listener) {
  353. const listeners = this.listeners(method);
  354. for (var i = 0; i < listeners.length; i++) {
  355. //
  356. // Remove only the listeners added via `addEventListener`.
  357. //
  358. if (listeners[i]._listener) this.removeListener(method, listeners[i]);
  359. }
  360. this.addEventListener(method, listener);
  361. }
  362. });
  363. });
  364. WebSocket.prototype.addEventListener = EventTarget.addEventListener;
  365. WebSocket.prototype.removeEventListener = EventTarget.removeEventListener;
  366. module.exports = WebSocket;
  367. /**
  368. * Initialize a WebSocket client.
  369. *
  370. * @param {WebSocket} websocket The client to initialize
  371. * @param {(String|url.Url|url.URL)} address The URL to which to connect
  372. * @param {String} protocols The subprotocols
  373. * @param {Object} options Connection options
  374. * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable
  375. * permessage-deflate
  376. * @param {Number} options.handshakeTimeout Timeout in milliseconds for the
  377. * handshake request
  378. * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version`
  379. * header
  380. * @param {String} options.origin Value of the `Origin` or
  381. * `Sec-WebSocket-Origin` header
  382. * @param {Number} options.maxPayload The maximum allowed message size
  383. * @param {Boolean} options.followRedirects Whether or not to follow redirects
  384. * @param {Number} options.maxRedirects The maximum number of redirects allowed
  385. * @private
  386. */
  387. function initAsClient(websocket, address, protocols, options) {
  388. const opts = Object.assign(
  389. {
  390. protocolVersion: protocolVersions[1],
  391. maxPayload: 100 * 1024 * 1024,
  392. perMessageDeflate: true,
  393. followRedirects: false,
  394. maxRedirects: 10
  395. },
  396. options,
  397. {
  398. createConnection: undefined,
  399. socketPath: undefined,
  400. hostname: undefined,
  401. protocol: undefined,
  402. timeout: undefined,
  403. method: undefined,
  404. auth: undefined,
  405. host: undefined,
  406. path: undefined,
  407. port: undefined
  408. }
  409. );
  410. if (!protocolVersions.includes(opts.protocolVersion)) {
  411. throw new RangeError(
  412. `Unsupported protocol version: ${opts.protocolVersion} ` +
  413. `(supported versions: ${protocolVersions.join(', ')})`
  414. );
  415. }
  416. var parsedUrl;
  417. if (typeof address === 'object' && address.href !== undefined) {
  418. parsedUrl = address;
  419. websocket.url = address.href;
  420. } else {
  421. //
  422. // The WHATWG URL constructor is not available on Node.js < 6.13.0
  423. //
  424. parsedUrl = url.URL ? new url.URL(address) : url.parse(address);
  425. websocket.url = address;
  426. }
  427. const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
  428. if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
  429. throw new Error(`Invalid URL: ${websocket.url}`);
  430. }
  431. const isSecure =
  432. parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
  433. const defaultPort = isSecure ? 443 : 80;
  434. const key = crypto.randomBytes(16).toString('base64');
  435. const get = isSecure ? https.get : http.get;
  436. const path = parsedUrl.search
  437. ? `${parsedUrl.pathname || '/'}${parsedUrl.search}`
  438. : parsedUrl.pathname || '/';
  439. var perMessageDeflate;
  440. opts.createConnection = isSecure ? tlsConnect : netConnect;
  441. opts.defaultPort = opts.defaultPort || defaultPort;
  442. opts.port = parsedUrl.port || defaultPort;
  443. opts.host = parsedUrl.hostname.startsWith('[')
  444. ? parsedUrl.hostname.slice(1, -1)
  445. : parsedUrl.hostname;
  446. opts.headers = Object.assign(
  447. {
  448. 'Sec-WebSocket-Version': opts.protocolVersion,
  449. 'Sec-WebSocket-Key': key,
  450. Connection: 'Upgrade',
  451. Upgrade: 'websocket'
  452. },
  453. opts.headers
  454. );
  455. opts.path = path;
  456. opts.timeout = opts.handshakeTimeout;
  457. if (opts.perMessageDeflate) {
  458. perMessageDeflate = new PerMessageDeflate(
  459. opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
  460. false,
  461. opts.maxPayload
  462. );
  463. opts.headers['Sec-WebSocket-Extensions'] = extension.format({
  464. [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
  465. });
  466. }
  467. if (protocols) {
  468. opts.headers['Sec-WebSocket-Protocol'] = protocols;
  469. }
  470. if (opts.origin) {
  471. if (opts.protocolVersion < 13) {
  472. opts.headers['Sec-WebSocket-Origin'] = opts.origin;
  473. } else {
  474. opts.headers.Origin = opts.origin;
  475. }
  476. }
  477. if (parsedUrl.auth) {
  478. opts.auth = parsedUrl.auth;
  479. } else if (parsedUrl.username || parsedUrl.password) {
  480. opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  481. }
  482. if (isUnixSocket) {
  483. const parts = path.split(':');
  484. opts.socketPath = parts[0];
  485. opts.path = parts[1];
  486. }
  487. var req = (websocket._req = get(opts));
  488. if (opts.timeout) {
  489. req.on('timeout', () => {
  490. abortHandshake(websocket, req, 'Opening handshake has timed out');
  491. });
  492. }
  493. req.on('error', (err) => {
  494. if (websocket._req.aborted) return;
  495. req = websocket._req = null;
  496. websocket.readyState = WebSocket.CLOSING;
  497. websocket.emit('error', err);
  498. websocket.emitClose();
  499. });
  500. req.on('response', (res) => {
  501. const location = res.headers.location;
  502. const statusCode = res.statusCode;
  503. if (
  504. location &&
  505. opts.followRedirects &&
  506. statusCode >= 300 &&
  507. statusCode < 400
  508. ) {
  509. if (++websocket._redirects > opts.maxRedirects) {
  510. abortHandshake(websocket, req, 'Maximum redirects exceeded');
  511. return;
  512. }
  513. req.abort();
  514. const addr = url.URL
  515. ? new url.URL(location, address)
  516. : url.resolve(address, location);
  517. initAsClient(websocket, addr, protocols, options);
  518. } else if (!websocket.emit('unexpected-response', req, res)) {
  519. abortHandshake(
  520. websocket,
  521. req,
  522. `Unexpected server response: ${res.statusCode}`
  523. );
  524. }
  525. });
  526. req.on('upgrade', (res, socket, head) => {
  527. websocket.emit('upgrade', res);
  528. //
  529. // The user may have closed the connection from a listener of the `upgrade`
  530. // event.
  531. //
  532. if (websocket.readyState !== WebSocket.CONNECTING) return;
  533. req = websocket._req = null;
  534. const digest = crypto
  535. .createHash('sha1')
  536. .update(key + GUID)
  537. .digest('base64');
  538. if (res.headers['sec-websocket-accept'] !== digest) {
  539. abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
  540. return;
  541. }
  542. const serverProt = res.headers['sec-websocket-protocol'];
  543. const protList = (protocols || '').split(/, */);
  544. var protError;
  545. if (!protocols && serverProt) {
  546. protError = 'Server sent a subprotocol but none was requested';
  547. } else if (protocols && !serverProt) {
  548. protError = 'Server sent no subprotocol';
  549. } else if (serverProt && !protList.includes(serverProt)) {
  550. protError = 'Server sent an invalid subprotocol';
  551. }
  552. if (protError) {
  553. abortHandshake(websocket, socket, protError);
  554. return;
  555. }
  556. if (serverProt) websocket.protocol = serverProt;
  557. if (perMessageDeflate) {
  558. try {
  559. const extensions = extension.parse(
  560. res.headers['sec-websocket-extensions']
  561. );
  562. if (extensions[PerMessageDeflate.extensionName]) {
  563. perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
  564. websocket._extensions[
  565. PerMessageDeflate.extensionName
  566. ] = perMessageDeflate;
  567. }
  568. } catch (err) {
  569. abortHandshake(
  570. websocket,
  571. socket,
  572. 'Invalid Sec-WebSocket-Extensions header'
  573. );
  574. return;
  575. }
  576. }
  577. websocket.setSocket(socket, head, opts.maxPayload);
  578. });
  579. }
  580. /**
  581. * Create a `net.Socket` and initiate a connection.
  582. *
  583. * @param {Object} options Connection options
  584. * @return {net.Socket} The newly created socket used to start the connection
  585. * @private
  586. */
  587. function netConnect(options) {
  588. //
  589. // Override `options.path` only if `options` is a copy of the original options
  590. // object. This is always true on Node.js >= 8 but not on Node.js 6 where
  591. // `options.socketPath` might be `undefined` even if the `socketPath` option
  592. // was originally set.
  593. //
  594. if (options.protocolVersion) options.path = options.socketPath;
  595. return net.connect(options);
  596. }
  597. /**
  598. * Create a `tls.TLSSocket` and initiate a connection.
  599. *
  600. * @param {Object} options Connection options
  601. * @return {tls.TLSSocket} The newly created socket used to start the connection
  602. * @private
  603. */
  604. function tlsConnect(options) {
  605. options.path = undefined;
  606. options.servername = options.servername || options.host;
  607. return tls.connect(options);
  608. }
  609. /**
  610. * Abort the handshake and emit an error.
  611. *
  612. * @param {WebSocket} websocket The WebSocket instance
  613. * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
  614. * socket to destroy
  615. * @param {String} message The error message
  616. * @private
  617. */
  618. function abortHandshake(websocket, stream, message) {
  619. websocket.readyState = WebSocket.CLOSING;
  620. const err = new Error(message);
  621. Error.captureStackTrace(err, abortHandshake);
  622. if (stream.setHeader) {
  623. stream.abort();
  624. stream.once('abort', websocket.emitClose.bind(websocket));
  625. websocket.emit('error', err);
  626. } else {
  627. stream.destroy(err);
  628. stream.once('error', websocket.emit.bind(websocket, 'error'));
  629. stream.once('close', websocket.emitClose.bind(websocket));
  630. }
  631. }
  632. /**
  633. * The listener of the `Receiver` `'conclude'` event.
  634. *
  635. * @param {Number} code The status code
  636. * @param {String} reason The reason for closing
  637. * @private
  638. */
  639. function receiverOnConclude(code, reason) {
  640. const websocket = this[kWebSocket];
  641. websocket._socket.removeListener('data', socketOnData);
  642. websocket._socket.resume();
  643. websocket._closeFrameReceived = true;
  644. websocket._closeMessage = reason;
  645. websocket._closeCode = code;
  646. if (code === 1005) websocket.close();
  647. else websocket.close(code, reason);
  648. }
  649. /**
  650. * The listener of the `Receiver` `'drain'` event.
  651. *
  652. * @private
  653. */
  654. function receiverOnDrain() {
  655. this[kWebSocket]._socket.resume();
  656. }
  657. /**
  658. * The listener of the `Receiver` `'error'` event.
  659. *
  660. * @param {(RangeError|Error)} err The emitted error
  661. * @private
  662. */
  663. function receiverOnError(err) {
  664. const websocket = this[kWebSocket];
  665. websocket._socket.removeListener('data', socketOnData);
  666. websocket.readyState = WebSocket.CLOSING;
  667. websocket._closeCode = err[kStatusCode];
  668. websocket.emit('error', err);
  669. websocket._socket.destroy();
  670. }
  671. /**
  672. * The listener of the `Receiver` `'finish'` event.
  673. *
  674. * @private
  675. */
  676. function receiverOnFinish() {
  677. this[kWebSocket].emitClose();
  678. }
  679. /**
  680. * The listener of the `Receiver` `'message'` event.
  681. *
  682. * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
  683. * @private
  684. */
  685. function receiverOnMessage(data) {
  686. this[kWebSocket].emit('message', data);
  687. }
  688. /**
  689. * The listener of the `Receiver` `'ping'` event.
  690. *
  691. * @param {Buffer} data The data included in the ping frame
  692. * @private
  693. */
  694. function receiverOnPing(data) {
  695. const websocket = this[kWebSocket];
  696. websocket.pong(data, !websocket._isServer, NOOP);
  697. websocket.emit('ping', data);
  698. }
  699. /**
  700. * The listener of the `Receiver` `'pong'` event.
  701. *
  702. * @param {Buffer} data The data included in the pong frame
  703. * @private
  704. */
  705. function receiverOnPong(data) {
  706. this[kWebSocket].emit('pong', data);
  707. }
  708. /**
  709. * The listener of the `net.Socket` `'close'` event.
  710. *
  711. * @private
  712. */
  713. function socketOnClose() {
  714. const websocket = this[kWebSocket];
  715. this.removeListener('close', socketOnClose);
  716. this.removeListener('end', socketOnEnd);
  717. websocket.readyState = WebSocket.CLOSING;
  718. //
  719. // The close frame might not have been received or the `'end'` event emitted,
  720. // for example, if the socket was destroyed due to an error. Ensure that the
  721. // `receiver` stream is closed after writing any remaining buffered data to
  722. // it. If the readable side of the socket is in flowing mode then there is no
  723. // buffered data as everything has been already written and `readable.read()`
  724. // will return `null`. If instead, the socket is paused, any possible buffered
  725. // data will be read as a single chunk and emitted synchronously in a single
  726. // `'data'` event.
  727. //
  728. websocket._socket.read();
  729. websocket._receiver.end();
  730. this.removeListener('data', socketOnData);
  731. this[kWebSocket] = undefined;
  732. clearTimeout(websocket._closeTimer);
  733. if (
  734. websocket._receiver._writableState.finished ||
  735. websocket._receiver._writableState.errorEmitted
  736. ) {
  737. websocket.emitClose();
  738. } else {
  739. websocket._receiver.on('error', receiverOnFinish);
  740. websocket._receiver.on('finish', receiverOnFinish);
  741. }
  742. }
  743. /**
  744. * The listener of the `net.Socket` `'data'` event.
  745. *
  746. * @param {Buffer} chunk A chunk of data
  747. * @private
  748. */
  749. function socketOnData(chunk) {
  750. if (!this[kWebSocket]._receiver.write(chunk)) {
  751. this.pause();
  752. }
  753. }
  754. /**
  755. * The listener of the `net.Socket` `'end'` event.
  756. *
  757. * @private
  758. */
  759. function socketOnEnd() {
  760. const websocket = this[kWebSocket];
  761. websocket.readyState = WebSocket.CLOSING;
  762. websocket._receiver.end();
  763. this.end();
  764. }
  765. /**
  766. * The listener of the `net.Socket` `'error'` event.
  767. *
  768. * @private
  769. */
  770. function socketOnError() {
  771. const websocket = this[kWebSocket];
  772. this.removeListener('error', socketOnError);
  773. this.on('error', NOOP);
  774. websocket.readyState = WebSocket.CLOSING;
  775. this.destroy();
  776. }