db.js

/**
 * @file Helper methods for dealing with IndexedDB cache of messages, users, and topics.
 *
 * @copyright 2015-2022 Tinode LLC.
 */
'use strict';

// NOTE TO DEVELOPERS:
// Localizable strings should be double quoted "строка на другом языке",
// non-localizable strings should be single quoted 'non-localized'.

const DB_VERSION = 1;
const DB_NAME = 'tinode-web';

let IDBProvider;

export default class DB {
  #onError = _ => {};
  #logger = _ => {};

  // Instance of IndexDB.
  db = null;
  // Indicator that the cache is disabled.
  disabled = false;

  constructor(onError, logger) {
    this.#onError = onError || this.#onError;
    this.#logger = logger || this.#logger;
  }

  #mapObjects(source, callback, context) {
    if (!this.db) {
      return disabled ?
        Promise.resolve([]) :
        Promise.reject(new Error("not initialized"));
    }

    return new Promise((resolve, reject) => {
      const trx = this.db.transaction([source]);
      trx.onerror = event => {
        this.#logger('PCache', 'mapObjects', source, event.target.error);
        reject(event.target.error);
      };
      trx.objectStore(source).getAll().onsuccess = event => {
        if (callback) {
          event.target.result.forEach(topic => {
            callback.call(context, topic);
          });
        }
        resolve(event.target.result);
      };
    });
  }

  /**
   * Initialize persistent cache: open or create/upgrade if needed.
   * @returns {Promise} promise to be resolved/rejected when the DB is initialized.
   */
  initDatabase() {
    return new Promise((resolve, reject) => {
      // Open the database and initialize callbacks.
      const req = IDBProvider.open(DB_NAME, DB_VERSION);
      req.onsuccess = event => {
        this.db = event.target.result;
        this.disabled = false;
        resolve(this.db);
      };
      req.onerror = event => {
        this.#logger('PCache', "failed to initialize", event);
        reject(event.target.error);
        this.#onError(event.target.error);
      };
      req.onupgradeneeded = event => {
        this.db = event.target.result;

        this.db.onerror = event => {
          this.#logger('PCache', "failed to create storage", event);
          this.#onError(event.target.error);
        };

        // Individual object stores.
        // Object store (table) for topics. The primary key is topic name.
        this.db.createObjectStore('topic', {
          keyPath: 'name'
        });

        // Users object store. UID is the primary key.
        this.db.createObjectStore('user', {
          keyPath: 'uid'
        });

        // Subscriptions object store topic <-> user. Topic name + UID is the primary key.
        this.db.createObjectStore('subscription', {
          keyPath: ['topic', 'uid']
        });

        // Messages object store. The primary key is topic name + seq.
        this.db.createObjectStore('message', {
          keyPath: ['topic', 'seq']
        });
      };
    });
  }

  /**
   * Delete persistent cache.
   */
  deleteDatabase() {
    // Close connection, otherwise operations will fail with 'onblocked'.
    if (this.db) {
      this.db.close();
      this.db = null;
    }
    return new Promise((resolve, reject) => {
      const req = IDBProvider.deleteDatabase(DB_NAME);
      req.onblocked = _ => {
        if (this.db) {
          this.db.close();
        }
        const err = new Error("blocked");
        this.#logger('PCache', 'deleteDatabase', err);
        reject(err);
      };
      req.onsuccess = _ => {
        this.db = null;
        this.disabled = true;
        resolve(true);
      };
      req.onerror = event => {
        this.#logger('PCache', 'deleteDatabase', event.target.error);
        reject(event.target.error);
      };
    });
  }

  /**
   * Check if persistent cache is ready for use.
   * @memberOf DB
   * @returns {boolean} <code>true</code> if cache is ready, <code>false</code> otherwise.
   */
  isReady() {
    return !!this.db;
  }

  // Topics.

  /**
   * Save to cache or update topic in persistent cache.
   * @memberOf DB
   * @param {Topic} topic - topic to be added or updated.
   * @returns {Promise} promise resolved/rejected on operation completion.
   */
  updTopic(topic) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['topic'], 'readwrite');
      trx.oncomplete = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'updTopic', event.target.error);
        reject(event.target.error);
      };
      const req = trx.objectStore('topic').get(topic.name);
      req.onsuccess = _ => {
        trx.objectStore('topic').put(DB.#serializeTopic(req.result, topic));
        trx.commit();
      };
    });
  }

  /**
   * Mark or unmark topic as deleted.
   * @memberOf DB
   * @param {string} name - name of the topic to mark or unmark.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  markTopicAsDeleted(name) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['topic'], 'readwrite');
      trx.oncomplete = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'markTopicAsDeleted', event.target.error);
        reject(event.target.error);
      };
      const req = trx.objectStore('topic').get(name);
      req.onsuccess = event => {
        const topic = event.target.result;
        topic._deleted = true;
        trx.objectStore('topic').put(topic);
        trx.commit();
      };
    });
  }

  /**
   * Remove topic from persistent cache.
   * @memberOf DB
   * @param {string} name - name of the topic to remove from database.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  remTopic(name) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['topic', 'subscription', 'message'], 'readwrite');
      trx.oncomplete = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'remTopic', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('topic').delete(IDBKeyRange.only(name));
      trx.objectStore('subscription').delete(IDBKeyRange.bound([name, '-'], [name, '~']));
      trx.objectStore('message').delete(IDBKeyRange.bound([name, 0], [name, Number.MAX_SAFE_INTEGER]));
      trx.commit();
    });
  }

  /**
   * Execute a callback for each stored topic.
   * @memberOf DB
   * @param {function} callback - function to call for each topic.
   * @param {Object} context - the value or <code>this</code> inside the callback.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  mapTopics(callback, context) {
    return this.#mapObjects('topic', callback, context);
  }

  /**
   * Copy data from serialized object to topic.
   * @memberOf DB
   * @param {Topic} topic - target to deserialize to.
   * @param {Object} src - serialized data to copy from.
   */
  deserializeTopic(topic, src) {
    DB.#deserializeTopic(topic, src);
  }

  // Users.
  /**
   * Add or update user object in the persistent cache.
   * @memberOf DB
   * @param {string} uid - ID of the user to save or update.
   * @param {Object} pub - user's <code>public</code> information.
   * @returns {Promise} promise resolved/rejected on operation completion.
   */
  updUser(uid, pub) {
    if (arguments.length < 2 || pub === undefined) {
      // No point inupdating user with invalid data.
      return;
    }
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['user'], 'readwrite');
      trx.oncomplete = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'updUser', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('user').put({
        uid: uid,
        public: pub
      });
      trx.commit();
    });
  }

  /**
   * Remove user from persistent cache.
   * @memberOf DB
   * @param {string} uid - ID of the user to remove from the cache.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  remUser(uid) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['user'], 'readwrite');
      trx.oncomplete = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'remUser', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('user').delete(IDBKeyRange.only(uid));
      trx.commit();
    });
  }

  /**
   * Execute a callback for each stored user.
   * @memberOf DB
   * @param {function} callback - function to call for each topic.
   * @param {Object} context - the value or <code>this</code> inside the callback.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  mapUsers(callback, context) {
    return this.#mapObjects('user', callback, context);
  }

  /**
   * Read a single user from persistent cache.
   * @memberOf DB
   * @param {string} uid - ID of the user to fetch from cache.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  getUser(uid) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['user']);
      trx.oncomplete = event => {
        const user = event.target.result;
        resolve({
          user: user.uid,
          public: user.public
        });
      };
      trx.onerror = event => {
        this.#logger('PCache', 'getUser', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('user').get(uid);
    });
  }

  // Subscriptions.
  /**
   * Add or update subscription in persistent cache.
   * @memberOf DB
   * @param {string} topicName - name of the topic which owns the message.
   * @param {string} uid - ID of the subscribed user.
   * @param {Object} sub - subscription to save.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  updSubscription(topicName, uid, sub) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['subscription'], 'readwrite');
      trx.oncomplete = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'updSubscription', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('subscription').get([topicName, uid]).onsuccess = (event) => {
        trx.objectStore('subscription').put(DB.#serializeSubscription(event.target.result, topicName, uid, sub));
        trx.commit();
      };
    });
  }

  /**
   * Execute a callback for each cached subscription in a given topic.
   * @memberOf DB
   * @param {string} topicName - name of the topic which owns the subscriptions.
   * @param {function} callback - function to call for each subscription.
   * @param {Object} context - the value or <code>this</code> inside the callback.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  mapSubscriptions(topicName, callback, context) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve([]) :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['subscription']);
      trx.onerror = (event) => {
        this.#logger('PCache', 'mapSubscriptions', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('subscription').getAll(IDBKeyRange.bound([topicName, '-'], [topicName, '~'])).onsuccess = (event) => {
        if (callback) {
          event.target.result.forEach((topic) => {
            callback.call(context, topic);
          });
        }
        resolve(event.target.result);
      };
    });
  }

  // Messages.

  /**
   * Save message to persistent cache.
   * @memberOf DB
   * @param {string} topicName - name of the topic which owns the message.
   * @param {Object} msg - message to save.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  addMessage(msg) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['message'], 'readwrite');
      trx.onsuccess = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'addMessage', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('message').add(DB.#serializeMessage(null, msg));
      trx.commit();
    });
  }

  /**
   * Update delivery status of a message stored in persistent cache.
   * @memberOf DB
   * @param {string} topicName - name of the topic which owns the message.
   * @param {number} seq - ID of the message to update
   * @param {number} status - new delivery status of the message.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  updMessageStatus(topicName, seq, status) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      const trx = this.db.transaction(['message'], 'readwrite');
      trx.onsuccess = event => {
        resolve(event.target.result);
      };
      trx.onerror = event => {
        this.#logger('PCache', 'updMessageStatus', event.target.error);
        reject(event.target.error);
      };
      const req = trx.objectStore('message').get(IDBKeyRange.only([topicName, seq]));
      req.onsuccess = event => {
        const src = req.result || event.target.result;
        if (!src || src._status == status) {
          trx.commit();
          return;
        }
        trx.objectStore('message').put(DB.#serializeMessage(src, {
          topic: topicName,
          seq: seq,
          _status: status
        }));
        trx.commit();
      };
    });
  }

  /**
   * Remove one or more messages from persistent cache.
   * @memberOf DB
   * @param {string} topicName - name of the topic which owns the message.
   * @param {number} from - id of the message to remove or lower boundary when removing range (inclusive).
   * @param {number=} to - upper boundary (exclusive) when removing a range of messages.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  remMessages(topicName, from, to) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve() :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      if (!from && !to) {
        from = 0;
        to = Number.MAX_SAFE_INTEGER;
      }
      const range = to > 0 ? IDBKeyRange.bound([topicName, from], [topicName, to], false, true) :
        IDBKeyRange.only([topicName, from]);
      const trx = this.db.transaction(['message'], 'readwrite');
      trx.onsuccess = (event) => {
        resolve(event.target.result);
      };
      trx.onerror = (event) => {
        this.#logger('PCache', 'remMessages', event.target.error);
        reject(event.target.error);
      };
      trx.objectStore('message').delete(range);
      trx.commit();
    });
  }

  /**
   * Retrieve messages from persistent store.
   * @memberOf DB
   * @param {string} topicName - name of the topic to retrieve messages from.
   * @param {function} callback to call for each retrieved message.
   * @param {Object} query - parameters of the message range to retrieve.
   * @param {number=} query.since - the least message ID to retrieve (inclusive).
   * @param {number=} query.before - the greatest message ID to retrieve (exclusive).
   * @param {number=} query.limit - the maximum number of messages to retrieve.
   * @return {Promise} promise resolved/rejected on operation completion.
   */
  readMessages(topicName, query, callback, context) {
    if (!this.isReady()) {
      return this.disabled ?
        Promise.resolve([]) :
        Promise.reject(new Error("not initialized"));
    }
    return new Promise((resolve, reject) => {
      query = query || {};
      const since = query.since > 0 ? query.since : 0;
      const before = query.before > 0 ? query.before : Number.MAX_SAFE_INTEGER;
      const limit = query.limit | 0;

      const result = [];
      const range = IDBKeyRange.bound([topicName, since], [topicName, before], false, true);
      const trx = this.db.transaction(['message']);
      trx.onerror = (event) => {
        this.#logger('PCache', 'readMessages', event.target.error);
        reject(event.target.error);
      };
      // Iterate in descending order.
      trx.objectStore('message').openCursor(range, 'prev').onsuccess = (event) => {
        const cursor = event.target.result;
        if (cursor) {
          if (callback) {
            callback.call(context, cursor.value);
          }
          result.push(cursor.value);
          if (limit <= 0 || result.length < limit) {
            cursor.continue();
          } else {
            resolve(result);
          }
        } else {
          resolve(result);
        }
      };
    });
  }

  // Private methods.

  // Serializable topic fields.
  static #topic_fields = ['created', 'updated', 'deleted', 'read', 'recv', 'seq', 'clear', 'defacs',
    'creds', 'public', 'trusted', 'private', 'touched', '_deleted'
  ];

  // Copy data from src to Topic object.
  static #deserializeTopic(topic, src) {
    DB.#topic_fields.forEach((f) => {
      if (src.hasOwnProperty(f)) {
        topic[f] = src[f];
      }
    });
    if (Array.isArray(src.tags)) {
      topic._tags = src.tags;
    }
    if (src.acs) {
      topic.setAccessMode(src.acs);
    }
    topic.seq |= 0;
    topic.read |= 0;
    topic.unread = Math.max(0, topic.seq - topic.read);
  }

  // Copy values from 'src' to 'dst'. Allocate dst if it's null or undefined.
  static #serializeTopic(dst, src) {
    const res = dst || {
      name: src.name
    };
    DB.#topic_fields.forEach((f) => {
      if (src.hasOwnProperty(f)) {
        res[f] = src[f];
      }
    });
    if (Array.isArray(src._tags)) {
      res.tags = src._tags;
    }
    if (src.acs) {
      res.acs = src.getAccessMode().jsonHelper();
    }
    return res;
  }

  static #serializeSubscription(dst, topicName, uid, sub) {
    const fields = ['updated', 'mode', 'read', 'recv', 'clear', 'lastSeen', 'userAgent'];
    const res = dst || {
      topic: topicName,
      uid: uid
    };

    fields.forEach((f) => {
      if (sub.hasOwnProperty(f)) {
        res[f] = sub[f];
      }
    });

    return res;
  }

  static #serializeMessage(dst, msg) {
    // Serializable fields.
    const fields = ['topic', 'seq', 'ts', '_status', 'from', 'head', 'content'];
    const res = dst || {};
    fields.forEach((f) => {
      if (msg.hasOwnProperty(f)) {
        res[f] = msg[f];
      }
    });
    return res;
  }

  /**
   * To use DB in a non browser context, supply indexedDB provider.
   * @static
   * @memberof DB
   * @param idbProvider indexedDB provider, e.g. for node <code>require('fake-indexeddb')</code>.
   */
  static setDatabaseProvider(idbProvider) {
    IDBProvider = idbProvider;
  }
}