// connection.js

/**
 * @file Abstraction layer for websocket and long polling connections.
 *
 * @copyright 2015-2022 Tinode LLC.
 */
'use strict';

import CommError from './comm-error.js';
import {
  jsonParseHelper
} from './utils.js';

// Constructors used to create the underlying transports: a WebSocket-compatible class and
// an XMLHttpRequest-compatible class. Both are assigned by Connection.setNetworkProviders().
let WebSocketProvider;
let XHRProvider;

// Error code reported to onDisconnect when the connection is lost due to a network problem.
const NETWORK_ERROR = 503;
const NETWORK_ERROR_TEXT = "Connection failed";

// Error code reported to onDisconnect when the connection was closed by the client itself.
const NETWORK_USER = 418;
const NETWORK_USER_TEXT = "Disconnected by client";

// Settings for exponential backoff: delay = _BOFF_BASE * 2^iteration * (1 + _BOFF_JITTER * random).
const _BOFF_BASE = 2000; // 2000 milliseconds, minimum delay between reconnects
const _BOFF_MAX_ITER = 10; // Cap on the exponent: maximum delay between reconnects 2^10 * 2000 ms ~ 34 minutes (before jitter)
const _BOFF_JITTER = 0.3; // Add a random delay of up to 30% of the computed timeout

/**
 * Helper function for creating an endpoint URL.
 *
 * Websocket endpoints look like <code>ws://host/v0/channels?apikey=KEY</code>, long polling
 * endpoints get an extra <code>/lp</code> path segment: <code>http://host/v0/channels/lp?apikey=KEY</code>.
 *
 * @param {string} host - Host name and optional port number; a trailing slash is tolerated.
 * @param {string} protocol - One of <code>'http'</code>, <code>'https'</code>, <code>'ws'</code>, <code>'wss'</code>.
 * @param {string} version - Major value of the protocol version, e.g. <code>'0'</code>.
 * @param {string} apiKey - API key; it is percent-encoded before being placed into the query string.
 * @returns {string|null} The endpoint URL or <code>null</code> if the protocol is not supported.
 */
function makeBaseUrl(host, protocol, version, apiKey) {
  const isLongPolling = protocol === 'http' || protocol === 'https';
  const isWebSocket = protocol === 'ws' || protocol === 'wss';
  if (!isLongPolling && !isWebSocket) {
    return null;
  }

  let url = `${protocol}://${host}`;
  if (!url.endsWith('/')) {
    url += '/';
  }
  url += `v${version}/channels`;
  if (isLongPolling) {
    // Long polling endpoint ends with "lp", i.e.
    // '/v0/channels/lp' vs just '/v0/channels' for ws
    url += '/lp';
  }
  // Escape the key so reserved characters ('&', '=', '#', spaces) cannot corrupt the query string.
  return `${url}?apikey=${encodeURIComponent(apiKey)}`;
}

/**
 * An abstraction for a websocket or a long polling connection.
 *
 * @class Connection
 * @memberof Tinode

 * @param {Object} config - configuration parameters.
 * @param {string} config.host - Host name and optional port number to connect to.
 * @param {string} config.apiKey - API key generated by <code>keygen</code>.
 * @param {string} config.transport - Network transport to use, either <code>"ws"</code> for websocket or
 *      <code>"lp"</code> for long polling.
 * @param {boolean} config.secure - Use Secure WebSocket if <code>true</code>.
 * @param {string} version_ - Major value of the protocol version, e.g. '0' in '0.17.1'.
 * @param {boolean} autoreconnect_ - If connection is lost, try to reconnect automatically.
 */
export default class Connection {
  // Logger, does nothing by default.
  static #log = _ => {};

  // Handle of the pending auto-reconnect timer, null when nothing is scheduled.
  #boffTimer = null;
  // Exponent used for computing the next backoff delay.
  #boffIteration = 0;
  #boffClosed = false; // Indicator if the socket was manually closed - don't autoreconnect if true.

  // Websocket.
  #socket = null;

  host;
  secure;
  apiKey;

  version;
  autoreconnect;

  // Name of the transport which was set up by the constructor: 'ws' or 'lp'.
  initialized;

  // (config.host, config.apiKey, config.transport, config.secure), PROTOCOL_VERSION, true
  constructor(config, version_, autoreconnect_) {
    this.host = config.host;
    this.secure = config.secure;
    this.apiKey = config.apiKey;

    this.version = version_;
    this.autoreconnect = autoreconnect_;

    if (config.transport === 'lp') {
      // explicit request to use long polling
      this.#init_lp();
      this.initialized = 'lp';
    } else if (config.transport === 'ws') {
      // explicit request to use web socket
      // if websockets are not available, horrible things will happen
      this.#init_ws();
      this.initialized = 'ws';
    }

    if (!this.initialized) {
      // Invalid or undefined network transport.
      const reason = "Unknown or invalid network transport. Running under Node? Call 'Tinode.setNetworkProviders()'.";
      Connection.#log(reason);
      throw new Error(reason);
    }
  }

  /**
   * To use Connection in a non browser context, supply WebSocket and XMLHttpRequest providers.
   * @static
   * @memberof Connection
   * @param wsProvider WebSocket provider, e.g. for nodeJS , <code>require('ws')</code>.
   * @param xhrProvider XMLHttpRequest provider, e.g. for node <code>require('xhr')</code>.
   */
  static setNetworkProviders(wsProvider, xhrProvider) {
    WebSocketProvider = wsProvider;
    XHRProvider = xhrProvider;
  }

  /**
   * Assign a non-default logger.
   * @static
   * @memberof Connection
   * @param {function} l variadic logging function.
   */
  static set logger(l) {
    Connection.#log = l;
  }

  // The methods connect, reconnect, disconnect, sendText and isConnected below are placeholders:
  // the constructor replaces them on the instance with transport-specific implementations
  // (see #init_lp and #init_ws).

  /**
   * Initiate a new connection
   * @memberof Tinode.Connection#
   * @param {string} host_ Host name to connect to; if <code>null</code> the old host name will be used.
   * @param {boolean} force Force new connection even if one already exists.
   * @return {Promise} Promise resolved/rejected when the connection call completes, resolution is called without
   *  parameters, rejection passes the {Error} as parameter.
   */
  connect(host_, force) {
    return Promise.reject(null);
  }

  /**
   * Try to restore a network connection, also reset backoff.
   * @memberof Tinode.Connection#
   *
   * @param {boolean} force - reconnect even if there is a live connection already.
   */
  reconnect(force) {}

  /**
   * Terminate the network connection
   * @memberof Tinode.Connection#
   */
  disconnect() {}

  /**
   * Send a string to the server.
   * @memberof Tinode.Connection#
   *
   * @param {string} msg - String to send.
   * @throws Throws an exception if the underlying connection is not live.
   */
  sendText(msg) {}

  /**
   * Check if connection is alive.
   * @memberof Tinode.Connection#
   * @returns {boolean} <code>true</code> if connection is live, <code>false</code> otherwise.
   */
  isConnected() {
    return false;
  }

  /**
   * Get the name of the current network transport.
   * @memberof Tinode.Connection#
   * @returns {string} name of the transport such as <code>"ws"</code> or <code>"lp"</code>.
   */
  transport() {
    return this.initialized;
  }

  /**
   * Send network probe to check if connection is indeed live.
   * @memberof Tinode.Connection#
   */
  probe() {
    this.sendText('1');
  }

  /**
   * Reset autoreconnect counter to zero.
   * @memberof Tinode.Connection#
   */
  backoffReset() {
    this.#boffReset();
  }

  // Backoff implementation - reconnect after a timeout.
  // Reports the chosen delay through onAutoreconnectIteration(timeout), then on timer expiry either
  // calls connect() and reports onAutoreconnectIteration(0, promise), or, if the connection was closed
  // manually in the meantime, reports onAutoreconnectIteration(-1).
  #boffReconnect() {
    // Clear timer
    clearTimeout(this.#boffTimer);
    // Calculate when to fire the reconnect attempt
    const timeout = _BOFF_BASE * (Math.pow(2, this.#boffIteration) * (1.0 + _BOFF_JITTER * Math.random()));
    // Update iteration counter for future use, capped at _BOFF_MAX_ITER.
    this.#boffIteration = (this.#boffIteration >= _BOFF_MAX_ITER ? this.#boffIteration : this.#boffIteration + 1);
    if (this.onAutoreconnectIteration) {
      this.onAutoreconnectIteration(timeout);
    }

    this.#boffTimer = setTimeout(_ => {
      Connection.#log(`Reconnecting, iter=${this.#boffIteration}, timeout=${timeout}`);
      // Maybe the socket was closed while we waited for the timer?
      if (!this.#boffClosed) {
        const prom = this.connect();
        if (this.onAutoreconnectIteration) {
          this.onAutoreconnectIteration(0, prom);
        } else {
          // Suppress error if it's not used.
          prom.catch(_ => {
            /* do nothing */
          });
        }
      } else if (this.onAutoreconnectIteration) {
        this.onAutoreconnectIteration(-1);
      }
    }, timeout);
  }

  // Terminate auto-reconnect process.
  #boffStop() {
    clearTimeout(this.#boffTimer);
    this.#boffTimer = null;
  }

  // Reset auto-reconnect iteration counter.
  #boffReset() {
    this.#boffIteration = 0;
  }

  // Initialization for long polling: installs connect, reconnect, disconnect, sendText and
  // isConnected implementations backed by XHRProvider requests.
  #init_lp() {
    // XMLHttpRequest.readyState values used below.
    const XDR_OPENED = 1; // open() has been called.
    const XDR_DONE = 4; // The operation is complete.

    // Fully composed endpoint URL, with API key & SID
    let _lpURL = null;

    // The request which is currently waiting for server data, null when polling is stopped.
    let _poller = null;
    // The most recent request used for sending data to the server.
    let _sender = null;

    // Create a request for posting a message to the server. The request is opened but not sent.
    const lp_sender = (url_) => {
      const sender = new XHRProvider();
      sender.onreadystatechange = _ => {
        if (sender.readyState === XDR_DONE && sender.status >= 400) {
          // Some sort of error response
          throw new CommError("LP sender failed", sender.status);
        }
      };

      sender.open('POST', url_, true);
      return sender;
    };

    // Create a request waiting for data from the server. The request is opened but not sent.
    // 'resolve' and 'reject' are provided for the very first request of a session only: they settle
    // the promise returned by connect().
    const lp_poller = (url_, resolve, reject) => {
      const poller = new XHRProvider();
      let promiseCompleted = false;

      poller.onreadystatechange = _ => {
        if (poller.readyState !== XDR_DONE) {
          return;
        }

        if (poller.status === 201) { // 201 == HTTP.Created, get SID
          const pkt = JSON.parse(poller.responseText, jsonParseHelper);
          _lpURL = url_ + '&sid=' + pkt.ctrl.params.sid;
          // Keep the shared reference pointing at the live request, otherwise disconnect() and
          // connect(force) would abort this already completed request instead.
          _poller = lp_poller(_lpURL);
          _poller.send(null);
          if (this.onOpen) {
            this.onOpen();
          }

          if (resolve) {
            promiseCompleted = true;
            resolve();
          }

          if (this.autoreconnect) {
            this.#boffStop();
          }
        } else if (poller.status > 0 && poller.status < 400) { // 0 = network error; 400 = HTTP.BadRequest
          if (this.onMessage) {
            this.onMessage(poller.responseText);
          }
          // Continue polling unless the callback has closed or replaced the connection.
          if (_poller === poller) {
            _poller = lp_poller(_lpURL);
            _poller.send(null);
          }
        } else {
          // Polling has stopped. Indicate it by clearing the shared reference before any callback
          // runs, so isConnected() reports false and a subsequent connect() opens a new request.
          if (_poller === poller) {
            _poller = null;
          }

          // Don't throw an error here, gracefully handle server errors
          if (reject && !promiseCompleted) {
            promiseCompleted = true;
            reject(poller.responseText);
          }
          if (this.onMessage && poller.responseText) {
            this.onMessage(poller.responseText);
          }
          if (this.onDisconnect) {
            const code = poller.status || (this.#boffClosed ? NETWORK_USER : NETWORK_ERROR);
            const text = poller.responseText || (this.#boffClosed ? NETWORK_USER_TEXT : NETWORK_ERROR_TEXT);
            this.onDisconnect(new CommError(text, code), code);
          }

          if (!this.#boffClosed && this.autoreconnect) {
            this.#boffReconnect();
          }
        }
      };
      // Using POST to avoid caching response by service worker.
      poller.open('POST', url_, true);
      return poller;
    };

    this.connect = (host_, force) => {
      this.#boffClosed = false;

      if (_poller) {
        if (!force) {
          return Promise.resolve();
        }
        // Detach the handler first: the aborted request must not be treated as a failure.
        _poller.onreadystatechange = undefined;
        _poller.abort();
        _poller = null;
      }

      if (host_) {
        this.host = host_;
      }

      // NOTE(review): a failed attempt is logged and the returned promise is then fulfilled rather
      // than rejected; the failure is reported through onDisconnect. Kept as is because callers may
      // rely on it - confirm before changing.
      return new Promise((resolve, reject) => {
        const url = makeBaseUrl(this.host, this.secure ? 'https' : 'http', this.version, this.apiKey);
        Connection.#log("LP connecting to:", url);
        _poller = lp_poller(url, resolve, reject);
        _poller.send(null);
      }).catch(err => {
        Connection.#log("LP connection failed:", err);
      });
    };

    this.reconnect = force => {
      this.#boffStop();
      this.connect(null, force);
    };

    this.disconnect = _ => {
      this.#boffClosed = true;
      this.#boffStop();

      if (_sender) {
        _sender.onreadystatechange = undefined;
        _sender.abort();
        _sender = null;
      }
      if (_poller) {
        _poller.onreadystatechange = undefined;
        _poller.abort();
        _poller = null;
      }

      if (this.onDisconnect) {
        this.onDisconnect(new CommError(NETWORK_USER_TEXT, NETWORK_USER), NETWORK_USER);
      }
      // Ensure it's reconstructed
      _lpURL = null;
    };

    this.sendText = (msg) => {
      if (!_lpURL) {
        // No session has been established (or it was closed): there is nowhere to post to.
        throw new Error("Long poller failed to connect");
      }
      _sender = lp_sender(_lpURL);
      if (_sender && (_sender.readyState === XDR_OPENED)) { // 1 == OPENED
        _sender.send(msg);
      } else {
        throw new Error("Long poller failed to connect");
      }
    };

    this.isConnected = _ => {
      return Boolean(_poller);
    };
  }

  // Initialization for Websocket: installs connect, reconnect, disconnect, sendText and
  // isConnected implementations backed by a WebSocketProvider instance.
  #init_ws() {
    this.connect = (host_, force) => {
      this.#boffClosed = false;

      if (this.#socket) {
        if (!force && this.#socket.readyState === this.#socket.OPEN) {
          return Promise.resolve();
        }
        this.#socket.close();
        this.#socket = null;
      }

      if (host_) {
        this.host = host_;
      }

      return new Promise((resolve, reject) => {
        const url = makeBaseUrl(this.host, this.secure ? 'wss' : 'ws', this.version, this.apiKey);

        Connection.#log("WS connecting to: ", url);

        // It throws when the server is not accessible but the exception cannot be caught:
        // https://stackoverflow.com/questions/31002592/javascript-doesnt-catch-error-in-websocket-instantiation/31003057
        const conn = new WebSocketProvider(url);

        conn.onerror = err => {
          reject(err);
        };

        conn.onopen = _ => {
          if (this.autoreconnect) {
            this.#boffStop();
          }

          if (this.onOpen) {
            this.onOpen();
          }

          resolve();
        };

        conn.onclose = _ => {
          // The close event of a socket replaced by a newer connect() call may arrive late.
          // It must neither drop the reference to the replacement socket nor schedule a reconnect.
          const superseded = Boolean(this.#socket) && this.#socket !== conn;
          if (!superseded) {
            this.#socket = null;
          }

          if (this.onDisconnect) {
            const code = this.#boffClosed ? NETWORK_USER : NETWORK_ERROR;
            this.onDisconnect(new CommError(this.#boffClosed ? NETWORK_USER_TEXT : NETWORK_ERROR_TEXT, code), code);
          }

          if (!superseded && !this.#boffClosed && this.autoreconnect) {
            this.#boffReconnect();
          }
        };

        conn.onmessage = evt => {
          if (this.onMessage) {
            this.onMessage(evt.data);
          }
        };

        this.#socket = conn;
      });
    };

    this.reconnect = force => {
      this.#boffStop();
      // The failure is also reported through onDisconnect; handle the rejection here
      // so it does not surface as an unhandled promise rejection.
      this.connect(null, force).catch(err => {
        Connection.#log("WS reconnect failed:", err);
      });
    };

    this.disconnect = _ => {
      this.#boffClosed = true;
      this.#boffStop();

      if (!this.#socket) {
        return;
      }
      // onDisconnect is invoked from the socket's onclose handler.
      this.#socket.close();
      this.#socket = null;
    };

    this.sendText = msg => {
      if (this.#socket && (this.#socket.readyState === this.#socket.OPEN)) {
        this.#socket.send(msg);
      } else {
        throw new Error("Websocket is not connected");
      }
    };

    this.isConnected = _ => {
      return Boolean(this.#socket && (this.#socket.readyState === this.#socket.OPEN));
    };
  }

  // Callbacks:

  /**
   * A callback to pass incoming messages to. See {@link Tinode.Connection#onMessage}.
   * @callback Tinode.Connection.OnMessage
   * @memberof Tinode.Connection
   * @param {string} message - Message to process.
   */
  onMessage = undefined;

  /**
   * A callback for reporting a dropped connection. It is called with two parameters:
   * the error object and the numeric error code.
   * @type {function}
   * @memberof Tinode.Connection#
   */
  onDisconnect = undefined;

  /**
   * A callback called when the connection is ready to be used for sending. For websockets it's socket open,
   * for long polling it's <code>readyState=1</code> (OPENED)
   * @type {function}
   * @memberof Tinode.Connection#
   */
  onOpen = undefined;

  /**
   * A callback to notify of reconnection attempts. See {@link Tinode.Connection#onAutoreconnectIteration}.
   * @memberof Tinode.Connection
   * @callback AutoreconnectIterationType
   * @param {number} timeout - time till the next reconnect attempt in milliseconds; <code>0</code> means the attempt
   *    is being made right now, <code>-1</code> means reconnect was skipped.
   * @param {Promise} promise resolved or rejected when the reconnect attempt completes.
   *
   */
  /**
   * A callback to inform when the next attempt to reconnect will happen and to receive connection promise.
   * @memberof Tinode.Connection#
   * @type {Tinode.Connection.AutoreconnectIterationType}
   */
  onAutoreconnectIteration = undefined;
}

// Expose the error codes and the matching texts as static properties of the class,
// so code outside this module can compare them against the values passed to onDisconnect.
Connection.NETWORK_ERROR = NETWORK_ERROR;
Connection.NETWORK_ERROR_TEXT = NETWORK_ERROR_TEXT;
Connection.NETWORK_USER = NETWORK_USER;
Connection.NETWORK_USER_TEXT = NETWORK_USER_TEXT;