Home Reference Source

lib/neighborhood.js

'use strict'

const debug = (require('debug'))('neighborhood-wrtc')

const merge = require('lodash.merge')
const uuid = require('uuid/v4')
const Socket = require('simple-peer')
const Events = require('events')

const ArcStore = require('./arcstore.js')
const EPending = require('./entries/epending.js')
const EDying = require('./entries/edying.js')

const MResponse = require('./messages/mresponse.js')
const MRequest = require('./messages/mrequest.js')
const MSend = require('./messages/msend.js')
const MInternalSend = require('./messages/minternalsend.js')

// const ExIncompleteMessage = require('./exceptions/exincompletemessage.js')

/**
 * Easy-to-use interface to establish multiple WebRTC connections using
 * SimplePeer (npm: simple-peer)
 */
class Neighborhood extends Events {
  /**
     * @param {object} [options] the options available to the connections, e.g.
     * timeout before
     * @param {object} [options.socketClass] simple-peer default socket class (usefull if you need to change the type of socket)
     * @param {object} [options.config] simple-peer options
     * @param {number} [options.timeout = 60000] Time to wait (in milliseconds) for dying socket
     * @param {number} [options.pendingTimeout = 10000] Time to wait (in milliseconds) for pending socket
     * before neighborhood-wrtc assumes that a connection establishment failed,
     * or before an unused connection is removed.
     * @param {function} [options.encoding] Method to customize message sent,
     * default: return JSON.stringify(data);
     * @param {function} [options.decoding] Method to decode a received message,
     * default: return JSON.parse(data);
     */
  constructor (options) {
    super()
    // #1 save options
    this.options = {
      socketClass: Socket,
      peer: uuid(),
      config: { trickle: true, initiator: false },
      timeout: 1 * 60 * 1000,
      pendingTimeout: 10 * 1000,
      encoding: (d) => { return JSON.stringify(d) },
      decoding: (d) => { return JSON.parse(d) }
    }
    this.options = merge(this.options, options)
    this.encode = this.options.encoding // not sure it should stay that
    this.decode = this.options.decoding // way

    // #2 unmutable values
    this.PEER = this.options.peer
    debug('[%s] initialized.', this.PEER)

    // #3 initialize tables
    this.pending = new Map() // not finalized yet
    this.living = new ArcStore() // live and usable
    this.dying = new Map() // being removed
  }

  /**
   * Create a WebRTC connection.
   * @param {function|object} arg1 Either a callback function to send the
   * message to the remote peer (for instance, it can use a signaling server
   * or the already created WebRTC connexions), or a message received from the
   * remote peer.
   * @param {object} arg2 The message received from a peer that initialized a
   * WebRTC connection.
   */
  connect (arg1, arg2) {
    return new Promise((resolve, reject) => {
      const id = uuid()
      if (typeof arg1 === 'function' && typeof arg2 === 'undefined') {
        this._initiate(arg1, id) // arg1: callback for offers
      } else if (typeof arg1 === 'function' && typeof arg2 !== 'undefined') {
        this._accept(arg1, arg2, id) // arg1:callback, arg2:request
      } else {
        this._finalize(arg1, id) // arg1: response
      }
      this.once(id, (connectedWith, timeout = false, message) => {
        if (timeout) reject(new Error('timeout exceeded.', message))
        resolve(connectedWith)
      })
    })
  }

  /**
   * @private
   * Initiate the creation of a WebRTC connection. At this point, the identity
   * of the remote peer is unknown.
   * @param {function} sender A function called at each offer
   * @param {String} jobId The jobId that initiate the connection
   */
  _initiate (sender, jobId) {
    // #1 create an initiator
    this.options.config.initiator = true
    let SocketClass = this.options.socketClass
    let socket
    // handle DOMException: Failed to construct 'RTCPeerConnection': Cannot create so many PeerConnections
    try {
      socket = new SocketClass(this.options.config)
    } catch (e) {
      this.emit(jobId, null, true, e.message)
      return
    }
    // #2 insert the new entry in the pending table
    let entry = new EPending(uuid(), null, socket)
    entry.jobId = jobId
    // entry.tid = peerIdToConnectWith || entry.tid
    this.pending.set(entry.tid, entry)

    // #3 define events
    socket.once('connect', () => {
      entry.successful = true
      if (this.living.contains(entry.peer)) {
        entry.alreadyExists = true
        entry.successful = true
        debug('[_initiate(connect/living)] insert/increment')
        const inserting = this.living.insert(entry.peer, undefined, entry.tid)
        if (inserting) {
          debug('[init] emit connect event: ', entry.jobId, entry.peer, false)
          this.emit(entry.jobId, entry.peer, false)
          // notify
          this._connected(entry.peer, true)
        }
        entry.peer = null // becomes the unknown soldier
      } else {
        debug('[_initiate(connect)] insert/increment')
        const inserting = this.living.insert(entry.peer, socket, entry.tid)
        if (inserting) {
          debug('[init] emit connect event: ', entry.jobId, entry.peer, false)
          this.emit(entry.jobId, entry.peer, false)
          // notify
          this._connected(entry.peer, true)
        }
      }

      this._checkPendingEntry(entry)
    })
    socket.once('close', () => {
      if (entry.peer !== null) { // if not the unknown soldier
        if (this.living.contains(entry.peer)) {
          // #A remove the socket from the table of living connections
          this.living.removePeer(entry.peer)
          // #B notify
          this._disconnected(entry.peer)
        } else if (this.dying.has(entry.peer)) {
          let d = this.dying.get(entry.peer)
          clearTimeout(d.timeout)
          this.dying.delete(entry.peer)
        }
        debug('[init] emit close event: ', entry.jobId, entry.peer, true)
        this._checkPendingEntry(entry)
        this.emit(entry.jobId, entry.peer, true, 'closed')
      } else {
        debug('[%s] -‡- WebRTC -‡> %s', this.PEER, 'unknown')
      }
    })

    socket.on('data', (d) => {
      let msg = this.decode(d)
      if (msg.type === 'MInternalSend') {
        this._receiveInternalMessage(msg)
      } else {
        this._received(msg.peer, msg.payload)
      }
    })
    socket.on('stream', (s) => {
      this._streamed(entry.peer, s)
    })
    socket.on('error', (e) => {
      // Nothing here, for the failure are detected and handled after
      // this.options.timeout milliseconds.
      debug(e)
      socket.destroy()
      debug('[init] emit error event: ', entry.jobId, entry.peer, true)
      this.emit(entry.jobId, entry.peer, true, e.message)
    })
    // #4 send offer message using sender
    socket.on('signal', (offer) => {
      if (socket.connected && !socket._isNegociating) {
        this._sendRenegociateRequest(new MRequest(entry.tid, this.PEER, offer, 'renegociate'), entry.peer)
      } else {
        sender(new MRequest(entry.tid, this.PEER, offer))
      }
    })

    // #5 check if the socket has been established correctly
    setTimeout(() => {
      if ((!entry.successful || entry.alreadyExists) && (entry.socket !== null)) {
        entry.socket.destroy()
      }
      if (!entry.successful) {
        this._failed(entry.peer, true)
        debug('[init] emit timeout event: ', entry.jobId, entry.peer, true)
        this.emit(entry.jobId, entry.peer, true)
      }
      this.pending.delete(entry.tid)
    }, this.options.pendingTimeout)
  }

  /**
     * @private
     * Try to finalize the WebRTC connection using the remote offers.
     * @param {MResponse} msg The message containing an offer, a peerId etc.
     */
  _finalize (msg) {
    if (msg.offerType === 'renegociate') {
      debug(`[%s] _finalize regenociation:`, msg)
      if (this.living.store.has(msg.peer)) {
        const socket = this.living.get(msg.peer).socket
        socket.connected && !socket._isNegociating && socket.signal(msg.offer)
      }
      return
    }
    if (!this.pending.has(msg.tid)) {
      // debug(new ExLateMessage('_finalize', msg))
      return
    }

    let entry = this.pending.get(msg.tid)
    if (entry) {
      if (entry.alreadyExists || entry.successful) {
        this._checkPendingEntry(entry)
        debug('The socket already exists: ', entry.peer)
        return
      }
    }
    // #A check if the connection already exists
    if (this.living.contains(msg.peer)) {
      entry.alreadyExists = true
      entry.successful = true
      debug('[_finalize(living exists)] insert/increment')
      const inserting = this.living.insert(msg.peer, undefined, msg.tid)
      if (inserting) {
        debug('[%s]finalize --- arc --> %s', this.PEER, msg.peer)
        this._connected(msg.peer, true)
        this.emit(entry.jobId, msg.peer, false)
      }
      this._checkPendingEntry(entry)
    } else if (this.dying.has(msg.peer)) {
      debug('[_finalize(dying exists)] insert/increment')
      // #B rise from the dead
      entry.alreadyExists = true
      entry.successful = true
      let rise = this.dying.get(msg.peer)
      clearTimeout(rise.timeout)
      this.dying.delete(msg.peer)
      const inserting = this.living.insert(msg.peer, rise.socket, msg.tid)
      if (inserting) {
        debug('[%s]finalize -¡- arc -¡> %s', this.PEER, msg.peer)
        this._connected(msg.peer, true)
        this.emit(entry.jobId, msg.peer, false)
      }
      this._checkPendingEntry(entry)
    } else {
      // #C just signal the offer
      entry.peer = msg.peer
      if (!msg.offer) {
        // throw new ExIncompleteMessage('_finalize', entry, msg)
        // do not do anything
        // if the connection is not done, it will timeout
        // otherwise it will open a channel
      } else {
        debug('[finalize] signaling: ', msg)
        entry.socket.signal(msg.offer)
      }
    }
  }

  /**
     * @private
     * Establish a connection in response to the request of remote peer.
     * @param {function} sender The function that send the offer to the remote
     * initiating peer.
     * @param {MRequest} msg The request message containing offers, peerId, etc.
     **/
  _accept (sender, msg) {
    if (msg.offerType === 'renegociate') {
      debug(`[%s] _accept regenociation:`, msg)
      if (this.living.store.has(msg.peer)) {
        this.living.get(msg.peer).socket.signal(msg.offer)
      }
      return
    }
    // #1 initialize the entry if it does not exist
    let firstCall = false
    const tid = msg.tid
    const peer = msg.peer
    if (!this.pending.has(tid)) {
      firstCall = true
      let entry = new EPending(tid, peer)
      this.pending.set(tid, entry)

      setTimeout(() => {
        (!entry.successful || entry.alreadyExists) && entry.socket && entry.socket.destroy()
        !entry.successful && this._failed(peer, false)
        this.pending.delete(tid)
      }, this.options.pendingTimeout)
    }

    // #2 check if a WebRTC connection to peerId already exists
    let entry = this.pending.get(msg.tid)
    // let entry = this.pending.get(peer)
    if (entry.alreadyExists || entry.successful) { return }

    // #A check if the connection already exists
    if (this.living.contains(msg.peer)) {
      entry.alreadyExists = true
      entry.successful = true
      debug('[_accept(living exists)] insert/increment', msg)
      const inserting = this.living.insert(msg.peer, undefined, msg.tid)
      if (inserting) {
        debug('[%s] <-- arc --- %s', this.PEER, entry.peer)
        this._connected(msg.peer, false)
      }
      firstCall && sender(new MResponse(entry.tid, this.PEER, null))

      this._checkPendingEntry(entry)
    } else if (this.dying.has(msg.peer)) {
      debug('[_accept(dying exists)] insert/increment', msg)
      // #B rise from the dead
      entry.alreadyExists = true
      entry.successful = true
      let rise = this.dying.get(msg.peer)
      clearTimeout(rise.timeout)
      this.dying.delete(msg.peer)
      const inserting = this.living.insert(msg.peer, rise.socket, msg.tid)
      if (inserting) {
        debug('[%s] <¡- arc -¡- %s', this.PEER, msg.peer)
        this._connected(msg.peer, false)
      }
      firstCall && sender(new MResponse(entry.tid, this.PEER, null))

      // delete the pending entry cause we do not use the created one if exists
      this._checkPendingEntry(entry)
    } else {
      // #3 create the events and signal the offer
      if (firstCall && !entry.socket) {
        // #A create a socket
        this.options.config.initiator = false
        let SocketClass = this.options.socketClass
        let socket
        // handle DOMException: Failed to construct 'RTCPeerConnection': Cannot create so many PeerConnections
        try {
          socket = new SocketClass(this.options.config)
        } catch (e) {
          this.emit(entry.jobId, entry.peer, true, e.message)
          return
        }
        // #B update the entry
        entry.socket = socket
        // #C define events
        socket.once('connect', () => {
          entry.successful = true
          if (this.living.contains(entry.peer)) {
            entry.alreadyExists = true
            entry.successful = true
            debug('[_accept(connect/living)] insert/increment')
            const inserting = this.living.insert(entry.peer, undefined, msg.tid)
            if (inserting) {
              debug('[%s] <-- arc --- %s', this.PEER, entry.peer)
              this._connected(entry.peer, false)
            }
            entry.peer = null // becomes the unknown soldier
          } else {
            debug('[_accept(connect/dying)] insert/increment')
            const inserting = this.living.insert(entry.peer, socket, msg.tid)
            if (inserting) {
              debug('[%s] <-- WebRTC --- %s', this.PEER, entry.peer)
              this._connected(entry.peer, false)
            }
          }

          this._checkPendingEntry(entry)
        })
        socket.once('close', () => {
          if (entry.peer !== null) { // if not the unknown soldier
            if (this.living.contains(entry.peer)) {
              // #A remove the socket from the table of
              // living connections
              this.living.removePeer(entry.peer)
              this._disconnected(entry.peer)
            } else if (this.dying.has(entry.peer)) {
              let d = this.dying.get(entry.peer)
              clearTimeout(d.timeout)
              this.dying.delete(entry.peer)
            }
            debug('[%s] <‡- WebRTC -‡- %s', this.PEER, entry.peer)
          } else {
            debug('[%s] <‡- WebRTC -‡- %s', this.PEER, 'unknown')
          }
          this._checkPendingEntry(entry)
        })

        socket.on('data', (d) => {
          let msg = this.decode(d)
          if (msg.type === 'MInternalSend') {
            this._receiveInternalMessage(msg)
          } else {
            this._received(msg.peer, msg.payload)
          }
        })
        socket.on('stream', (s) => {
          this._streamed(entry.peer, s)
        })
        socket.on('error', (e) => {
          // Nothing here, for the failure are detected and handled
          // after this.options.timeout milliseconds.
          debug(e)
          socket.destroy()
        })
        // #4 send offer message using sender
        socket.on('signal', (offer) => {
          if (socket.connected && !socket._isNegotiating) {
            this._sendRenegociateResponse(new MResponse(entry.tid, this.PEER, offer, 'renegociate'), entry.peer)
          } else {
            sender(new MResponse(entry.tid, this.PEER, offer))
          }
        })
      }
      entry.socket.signal(msg.offer)
    }
  }

  /**
     * Remove an arc. If it was the last arc,
     * the WebRTC connexion is downgraded to the dying table. In this table, the
     * connexion will be closed if none create it.
     * @param {string|undefined} peerId The identifier of the peer. If no arg,
     * remove all arcs.
     */
  disconnect (peerId) {
    return new Promise((resolve, reject) => {
      if (typeof peerId === 'undefined') {
        // #1 remove all arcs
        let entries
        try {
          entries = this.living.removeAll()
        } catch (e) {
          return reject(e)
        }
        entries.forEach((entry) => {
          if (entry.socket !== null) {
            let dying = new EDying(entry.peer, entry.socket,
              setTimeout(() => {
                entry.socket && entry.socket.destroy()
              }, this.options.timeout))
            this.dying.set(dying.peer, dying)
          }

          for (let i = 0; i < entry.occ; ++i) {
            if (entry.socket === null || (entry.socket !== null && i < entry.occ - 1)) {
              debug('DISCONNECT-ALL [%s] ††† arc ††† %s', this.PEER, peerId)
            } else {
              debug('DISCONNECT-ALL [%s] ††† WebRTC ††† %s', this.PEER, peerId)
            }
            this._disconnected(entry.peer)
          }
        })
        resolve()
      } else {
        let entry = null
        // #2 remove one arc
        try {
          entry = this.living.remove(peerId)
        } catch (e) {
          return reject(e)
        }
        if (entry) {
          let dying = new EDying(entry.peer, entry.socket, setTimeout(() => {
            entry.socket && entry.socket.destroy()
          }, this.options.timeout))
          this.dying.set(dying.peer, dying)
          debug('DISCONNECT-ONE [%s] ††† WebRTC ††† %s', this.PEER, peerId)
        } else {
          debug('DISCONNECT-ONE [%s] ††† arc ††† %s', this.PEER, peerId)
        }
        this._disconnected(peerId)
        resolve()
      }
    })
  }

  /**
     * Send a message to a remote peer.
     * @param {string} peerId The remote peer to send the message to.
     * @param {object} message The message to send.
     * @param {number} [retry=0] Retry few times to send the message before
     * giving up.
     * @returns {promise} Resolved when the message is sent, reject
     * otherwise. Note that loss of messages is not handled by default.
     */
  send (peerId, message, retry = 0) {
    return new Promise((resolve, reject) => {
      // #1 get the proper entry in the tables
      let entry = null
      if (this.living.contains(peerId)) {
        entry = this.living.get(peerId)
      } else if (this.dying.has(peerId)) {
        entry = this.dying.get(peerId) // (TODO) warn: not safe
      }
      if (entry === null) {
        return reject(new Error('peer not found: ' + peerId))
      }
      // #2 define the recursive sending function
      let __send = (r) => {
        try {
          entry.socket.send(this.encode(new MSend(this.PEER, message)))
          debug('[%s] --- msg --> %s:%s', this.PEER, peerId)
          resolve()
        } catch (e) {
          debug('[%s] -X- msg -X> %s:%s', this.PEER, peerId)
          if (r < retry) {
            setTimeout(() => { __send(r + 1) }, 1000)
          } else {
            return reject(e)
          }
        }
      }
      // #3 start to send
      __send(0)
    })
  }

  stream (peerId, media, retry = 0) {
    return new Promise((resolve, reject) => {
      // #1 get the proper entry in the tables
      let entry = null
      if (this.living.contains(peerId)) {
        entry = this.living.get(peerId)
      } else if (this.dying.has(peerId)) {
        entry = this.dying.get(peerId) // (TODO) warn: not safe
      }
      if (entry === null) {
        this.living.store.forEach(elem => {
          debug(elem.peer)
        })
        reject(new Error('peer not found: ' + peerId))
      }
      // #2 define the recursive sending function
      let __send = (r) => {
        try {
          entry.socket.addStream(media)
          debug('[%s] --- MEDIA msg --> %s:%s', this.PEER, peerId)
          resolve()
        } catch (e) {
          debug('[%s] -X- MEDIA msg -X> %s:%s', this.PEER, peerId)
          if (r < retry) {
            setTimeout(() => { __send(r + 1) }, 1000)
          } else {
            reject(e)
          }
        }
      }
      // #3 start to send
      __send(0)
    })
  }

  _sendRenegociateRequest (request, to, retry = 0) {
    return new Promise((resolve, reject) => {
      // #1 get the proper entry in the tables
      let entry = null
      if (this.living.contains(to)) {
        entry = this.living.get(to)
      } else if (this.dying.has(to)) {
        entry = this.dying.get(to) // (TODO) warn: not safe
      }
      if (entry === null) {
        this.living.store.forEach(elem => {
          debug(elem.peer)
        })
        return reject(new Error('peer not found: ' + to))
      }
      // #2 define the recursive sending function
      let __send = (r) => {
        try {
          entry.socket.send(this.encode(new MInternalSend(this.PEER, request)))
          debug('[%s] --- MEDIA Internal Renegociate msg --> %s:%s',
            this.PEER, to)
          resolve()
        } catch (e) {
          debug('[%s] -X- MEDIA Internal Renegociate msg -X> %s:%s',
            this.PEER, to)
          if (r < retry) {
            setTimeout(() => { __send(r + 1) }, 1000)
          } else {
            return reject(e)
          }
        }
      }
      // #3 start to send
      __send(0)
    })
  }

  _sendRenegociateResponse (response, to, retry = 0) {
    return new Promise((resolve, reject) => {
      // #1 get the proper entry in the tables
      let entry = null
      if (this.living.contains(to)) {
        entry = this.living.get(to)
      } else if (this.dying.has(to)) {
        entry = this.dying.get(to) // (TODO) warn: not safe
      }
      if (entry === null) {
        this.living.store.forEach(elem => {
          debug(elem.peer)
        })
        return reject(new Error('peer not found: ' + to))
      }
      // #2 define the recursive sending function
      let __send = (r) => {
        try {
          entry.socket.send(this.encode(new MInternalSend(this.PEER, response)))
          debug('[%s] --- MEDIA Internal Renegociate msg --> %s:%s',
            this.PEER, to)
          resolve()
        } catch (e) {
          debug('[%s] -X- MEDIA Internal Renegociate msg -X> %s:%s',
            this.PEER, to)
          if (r < retry) {
            setTimeout(() => { __send(r + 1) }, 1000)
          } else {
            return reject(e)
          }
        }
      }
      // #3 start to send
      __send(0)
    })
  }

  _receiveInternalMessage (msg) {
    debug('Receive internal message: ', msg)
    this.living.get(msg.peer).socket.signal(msg.payload.offer)
  }

  /**
   * return an array of living sockets
   * @return {[ELiving]} a living entry with socket, peer id and number of occurences (arcs)
   */
  neighbours () {
    const neigh = []
    this.living.store.forEach(elem => {
      neigh.push(elem)
    })
    return neigh
  }

  _checkPendingEntry (entry) {
    if (this.pending.has(entry.tid)) {
      if (entry.peer === null) {
        if (entry.socket) {
          entry.socket.destroy()
          entry.socket = null
        }
      }
      this.pending.delete(entry.tid)
    }
  }
}

module.exports = Neighborhood