Home Reference Source

lib/tman.js

/* eslint no-unused-vars: 0 */
'use strict'

const debug = (require('debug'))('tman-wrtc')
const N2N = require('n2n-overlay-wrtc')
const U = require('unicast-definition')
const merge = require('lodash.merge')
const isEmpty = require('lodash.isempty')

const PartialView = require('./partialview.js')
const Cache = require('./cache.js')

const MJoin = require('./messages/mjoin.js')
const MSuggest = require('./messages/msuggest.js')
const MSuggestBack = require('./messages/msuggestback.js')
const MRequire = require('./messages/mrequire.js')
const MGiveDescriptor = require('./messages/mgivedescriptor.js')
const MRequestDescriptor = require('./messages/mrequestdescriptor.js')

const ExMessage = require('./exceptions/exmessage.js')
const ExJoin = require('./exceptions/exjoin.js')

/**
 * Peer-sampling protocol running on top of WebRTC that builds network
 * topologies using ranking functions.
 */
class TMan extends N2N {
  /**
     * @param {object} [options] Options given to TMan to build the desired
     * topology.
     * @param {string} [options.pid = 'tman-wrtc'] The identifier of this
     * protocol.
     * @param {number} [options.delta = 60000] Every delta millisecond, exchange
     * neighbors of the partial view.
     * @param {number} [options.timeout = 120000] WebRTC connections are
     * expensive to establish, may fail, etc. Instead of immediately removing
     * them, the protocol keep them warm during options.timeout
     * milliseconds. Consequently, messages transiting through them can still be
     * transmitted, and if the protocol requires such an arc, it can be
     * reestablished at no cost.
     * @param {number} [options.descriptorTimeout = 10000] Peers regularly ask
     * for descriptor and await an answer. If this answer does not come up in
     * time, it throws an exception.
     * @param {IPSP} [parent] This module can depend of another peer-sampling
     * protocol. If set, it will share the neighbors populating its inview and
     * outview. Thus, the above options will be of no use. See neigbhorhood-wrtc
     * module for more informations on the sharing process.
     */
  constructor (options = {}, parent) {
    // #0 initialize our N2N-parent
    super(merge({ pid: 'tman-wrtc',
      delta: 1 * 60 * 1000,
      timeout: 2 * 60 * 1000,
      descriptorTimeout: 10 * 1000,
      retry: 5,
      inview: parent && parent.NI,
      outview: parent && parent.NO,
      descriptor: {x: Math.random()},
      ranking: (neighbor) => (a, b) => {
        const db = b.descriptor.x - neighbor.descriptor.x
        const da = a.descriptor.x - neighbor.descriptor.x
        return Math.abs(da) - Math.abs(db)
      }}, options))
    // #1 constants (from N2N): this.PID, this.PEER
    debug('[%s] Initalized with ==> %s ==>', this.PID, this.PEER)
    // #2 initialize the partial view
    this.partialView = new PartialView()
    this.cache = new Cache(this.options.timeout)
    // #3 connectedness state of this protocol
    this.state = 'disconnected'
    // #4 periodic shuffling
    this.periodic = null
    // #5 events
    this.on('receive', (peerId, message) => this._receive(peerId, message))
    // this.on('stream', (peerId, message) =>  ); // (TODO) ?
    this.on('open', (peerId) => {
      this._open(peerId)
      this._updateState()
    })
    this.on('close', (peerId) => {
      this._close(peerId)
      this._updateState()
    })
    this.on('fail', (peerId) => {
      this._onArcDown(peerId)
      this._updateState()
    })
    // #6 if has parent, register events to get descriptors
    this.parent = parent || null
    if (this.parent) {
      this.unicast = new U(this.parent,
        {pid: 'tman-wrtc-unicast',
          retry: this.options.retry})

      this.parent.on('open', (peerId) => !this.cache.has(peerId) &&
        this.unicast.emit('requestDescriptor',
          peerId, this.getOutviewId())
          .catch((e) => {
            console.error(e)
          }))
      this.unicast.on('tman-exchange', (id, message) => this._receive(id, message))
      this.unicast.on('requestDescriptor', (requester) => this.unicast.emit('giveDescriptor', requester,
        this.getInviewId(),
        this.options.descriptor)
        .catch((e) => {
          console.error(e)
        }))
      this.unicast.on('giveDescriptor', (peerId, descriptor) => {
        if (!this.cache.has(peerId)) {
          debug("[%s] get %s's descriptor from %s.",
            this.PID, peerId, this.parent.PID)
          this.cache.add(peerId, descriptor)
        }
      })
    }
  }

  /**
     * Joining a network.
     * @param {callback} sender Function that will be called each time an offer
     * arrives to this peer. It is the responsability of the caller to send
     * these offer (using sender) to the contact inside the network.
     * @returns {Promise} A promise that is resolved when the peer joins the
     * network; rejected after a timeout, or already connected state.
     */
  join (sender) {
    this.parent && this.parent.join(sender)
    let result = new Promise((resolve, reject) => {
      // #0 connectedness state check
      (this.state !== 'disconnected') &&
      reject(new ExJoin('join', 'Already connected.'))
      // #1 set timeout before reject
      let to = setTimeout(() => {
        reject(new ExJoin('join', 'Timeout exceeded.'))
      }, this.options.timeout)
      // #2 very first call, only done once
      this.once('open', (peerId) => {
        this.send(peerId,
          new MJoin(this.getInviewId(), this.options.descriptor),
          this.options.retry).then(() => {
          clearTimeout(to)
          this._start()
          resolve(peerId)
        }).catch(() => {
          reject(new ExJoin('join',
            'Could not notify contact.'))
        })
      })
    })
    // #3 start the very first connection of this peer
    this.connect(sender)
    return result
  }

  /**
     * @private Behavior when a peer just joined the network through this peer.
     * @param {string} peerId The identifier of the newcomer.
     * @param {MJoin} message The message sent by the newcomer.
     */
  _onJoin (peerId, message) {
    if (this.partialView.size > 0) {
      // #1 all neigbors -> peerId
      debug('[%s] %s ===> join %s; %s neigbhors ===> %s',
        this.PID, peerId, this.PEER, this.partialView.size, peerId)
      let neighbors = []
      this.partialView.forEach((epv) => {
        neighbors.push({peer: epv.peer, descriptor: epv.descriptor})
      })
      neighbors.push({peer: this.getInviewId(),
        descriptor: this.options.descriptor})
      this.send(peerId, new MSuggestBack(neighbors), this.options.retry).catch((e) => {
        console.error(e)
      })
    } else {
      // #2 this -> peerId
      debug('[%s] %s ===> join %s ===> %s',
        this.PID, peerId, this.PEER, peerId)
      this.cache.add(message.peer, message.descriptor)
      this.connect(null, peerId)
      this.send(peerId,
        new MSuggestBack([{peer: this.getInviewId(),
          descriptor: this.options.descriptor}]),
        this.options.retry).catch((e) => {
        console.error(e)
      })
      this._start()
    }
  }

  /**
     * @private Behavior when a WebRTC connection failed to establish properly
     * @param {string|null} peerId The identifier of the peer with which we
     * failed to create a connection. Null if it was yet to be known.
     */
  _onArcDown (peerId) {
    // Do nothing.
  }

  /**
     * @private Behavior when a peer seems down. This is called when, during an
     * exchange, this protocol chooses a down peer to exchange with.
     * @param {string} peerId The identifier of the peer that seems
     * left/crashed.
     */
  _onPeerDown (peerId) {
    this.partialView.removeAllNeighbor(peerId)
  }

  /**
     * @private Getter for the size of the sample to send to the remote chosen
     * peer.
     * @param {string[]} flatten Array of identifiers.
     * @return {number} The size of the sample to create.
     */
  _sampleSize (flatten) {
    return Math.min(flatten.length, 2)
  }

  /**
     * @private Getter for the maximum partial view size.
     */
  _partialViewSize () {
    return 4
  }

  /**
     * @private Get a sample of the partial view.
     * @param {object} neighbor Object containing .peer for identifier of the
     * peer to exchange with and .descriptor for the peer's descriptor.
     * @returns {object[]} Array of object containing .peer and .descriptor.
     */
  _getSample (neighbor) {
    // #1 create a flatten version of the partial view
    let flatten = []
    // #A extract the partial view of tman
    this.partialView.forEach((epv, peerId) => {
      epv.ages.forEach((age) => {
        !isEmpty(epv.descriptor) && flatten.push(peerId)
      })
    })
    // #B add random peers from parent
    this.parent && this.parent.partialView.forEach((ages, peerId) => {
      if (this.cache.has(peerId) && flatten.indexOf(peerId) < 0) {
        flatten.push(peerId)
      }
    })
    // #2 replace all peerId occurrences by ours
    flatten = flatten.map((peerId) => {
      let d = {descriptor: this.options.descriptor}
      if (peerId === neighbor.peer) {
        d.peer = this.getInviewId()
      } else {
        d.descriptor = (this.cache.has(peerId) && this.cache.get(peerId)) ||
        this.partialView.getDescriptor(peerId)
        d.peer = peerId
      }
      return d
    })
    // #3 process the size of the sample
    const sampleSize = this._sampleSize(flatten)
    // #4 rank according to PeerId
    flatten.sort(this.options.ranking(neighbor))
    return flatten.slice(0, sampleSize)
  }

  /**
     * @private Periodically called function that aims to distribute links among
     * peers according to the desired property.
     */
  _exchange () {
    // #0 if the partial view is empty --- could be due to disconnections,
    // failure, or _onExchange started with other peers --- skip this round.
    if (this.partialView.size <= 0 &&
      this.parent && this.parent.partialView.size <= 0) {
      return
    }
    this.partialView.increment()
    // #1 get the oldest peer in our partial view. If the partial view is
    // empty, fall back to parent's partial view.
    let chosen
    let sample
    let fromOurOwn = true
    if (this.partialView.size > 0) {
      // #A use our own partial view
      chosen = this.partialView.oldest
      sample = this._getSample(this.partialView.get(chosen))
      // #2 propose the sample to the chosen one
      chosen && this.send(chosen, new MSuggest(this.getInviewId(),
        this.options.descriptor,
        sample))
        .then(() => {
          // #A it seems the message has been sent correctly
          debug('[%s] %s ==> suggest %s ==> %s',
            this.PID, this.PEER, sample.length, chosen)
        })
        .catch((e) => {
          // #B the peer cannot be reached, he is supposedly dead
          debug('[%s] %s =X> suggest =X> %s',
            this.PID, this.PEER, chosen)
          fromOurOwn && this._onPeerDown(chosen)
        })
    } else if (this.parent && this.parent.partialView.size > 0) {
      // #B use the partial view of our parent
      let rnNeighbors = this.parent.getPeers()
      chosen = rnNeighbors[Math.floor(Math.random() * rnNeighbors.length)]
      sample = []
      let found = false
      fromOurOwn = false
      while (!found && rnNeighbors.length > 0) {
        const rn = Math.floor(Math.random() * rnNeighbors.length)
        if (this.cache.has(rnNeighbors[rn])) {
          found = true
          chosen = rnNeighbors[rn]
          sample = this._getSample({peer: chosen,
            descriptor: this.cache.get(chosen)
          })
        } else {
          rnNeighbors.splice(rn, 1)
        }
      }
      // #2 propose the sample to the chosen one
      console.log(chosen, sample)
      chosen && this.unicast.emit('tman-exchange', chosen, this.getInviewId(), new MSuggest(this.getInviewId(),
        this.options.descriptor,
        sample))
        .then(() => {
          // #A it seems the message has been sent correctly
          debug('[%s] %s ==> suggest %s ==> %s',
            this.PID, this.PEER, sample.length, chosen)
        })
        .catch((e) => {
          // #B the peer cannot be reached, he is supposedly dead
          debug('[%s] %s =X> suggest =X> %s',
            this.PID, this.PEER, chosen)
          fromOurOwn && this._onPeerDown(chosen)
        })
    }
  }

  /**
     * @private Behavior when this peer receives suggestions from another peer
     * running its periodic protocol.
     * @param {string} peerId The identifier of the initiating peer.
     * @param {ISuggest} message The message containing the suggestions.
     */
  _onExchange (peerId, message) {
    // #1 prepare the sample to send back
    const sample = this._getSample(message)
    this.send(peerId, new MSuggestBack(sample), this.options.retry)
      .then(() => {
        debug('[%s] %s ==> suggest back %s ==> %s',
          this.PID, this.PEER, sample.length, peerId)
      })
      .catch((e) => {
      })
    // #2 analyze the received sample and keep the elements if they are
    // better than the current ones
    this._onExchangeBack(peerId, message)
  }

  /**
     * @private Determines which peers are tokeep and which are toreject.
     * @param {string} peerId The identifier of the peer that sent the message.
     * @param {MSuggest|MSuggestBack} message The message received.
     */
  _onExchangeBack (peerId, message) {
    console.log(peerId, message)
    // #1 keep the best elements from the received sample
    let ranked = []
    // -- begin hot fix, remove duplicates
    const a = new Map()
    message.sample.forEach((s) => {
      a.set(s.peer, s)
    })
    this.partialView.forEach((epv, neighbor) => {
      if (!a.has(neighbor)) a.set(neighbor, epv)
    })
    ranked = [...a.values()]
    // -- end hot fix

    ranked.sort(this.options.ranking(this.options))
    // #2 require the elements
    let sliced = ranked.slice(0, this._partialViewSize())
    let request = []
    sliced.forEach((e) => {
      if (!this.partialView.has(e.peer)) {
        request.push(e.peer)
        this.cache.add(e.peer, e.descriptor)
      }
    })
    if (request.length > 0) {
      debug('[%s] %s wants to keep %s peers.',
        this.PID, this.PEER, request.length)
      this.send(peerId, new MRequire(request), this.options.retry)
        .catch((e) => {
          debug('[%s] %s =X> request descriptors %s =X> %s',
            this.PID, this.PEER, request.length, peerId)
        })
    }

    let rest = ranked.slice(this._partialViewSize(), ranked.length)
    if (rest.length > 0 && this.partialView.size > this._partialViewSize()) {
      rest.forEach((p) => {
        this.partialView.has(p.peer) && this.disconnect(p.peer)
      })
    }
  }

  /**
     * @private A peer requested to be connected with a set of neighbors.
     * @param {string} peerId The identifier of the peer that requests
     * connections.
     * @param {MRequire} message The request message.
     */
  _onRequire (peerId, message) {
    // #1 bridge the requester and the requested peers
    debug('[%s] %s requested to be bridged with %s peers',
      this.PID, peerId, message.peers.length)
    message.peers.forEach((neighbor) => {
      this.connect(peerId, neighbor)
    })
  }

  /**
     * @private Called each time this protocol receives a message. It processes
     * its own and the rest is redirected to the appropriate registered
     * protocol.
     * @param {string} peerId The identifier of the peer that sent the message.
     * @param {object|MExchange} message The message received.
     */
  _receive (peerId, message) {
    if (message.type && message.type === 'MSuggest') {
      this._onExchange(peerId, message)
    } else if (message.type && message.type === 'MSuggestBack') {
      this._onExchangeBack(peerId, message)
    } else if (message.type && message.type === 'MRequire') {
      this._onRequire(peerId, message)
    } else if (message.type && message.type === 'MJoin') {
      this._onJoin(peerId, message)
    } else if (message.type && message.type === 'MGiveDescriptor') {
      this.emit(this.PID + '-' + peerId, message)
    } else if (message.type && message.type === 'MRequestDescriptor') {
      this._onRequestDescriptor(peerId, message)
    } else {
      throw new ExMessage('_receive', message, 'unhandled')
    }
  }

  /**
     * @private Start periodic shuffling.
     */
  _start (delay = this.options.delta) {
    this.periodic = setInterval(() => this._exchange(), delay)
  }

  /**
     * @private Stop periodic shuffling.
     */
  _stop () {
    this.periodic && clearInterval(this.periodic)
  }

  /**
     * @private Get the descriptor from a remote Peer
     * @param {string} peerId The identifier of the peer.
     * @returns {Promise} Resolved when the descriptor has been added to our
     * cache; Rejected after a timeout or an error when sending the message.
     */
  _requestDescriptor (peerId) {
    return new Promise((resolve, reject) => {
      let to = null
      this.send(peerId, new MRequestDescriptor(), this.options.retry)
        .then(() => {
          to = setTimeout(() => {
            this.removeAllListeners(this.PID + '-' + peerId)
            reject(new Error('timeout')) // (TODO) throw exception
          }, this.options.descriptorTimeout)
        }).catch((e) => {
          reject(e)
        })

      this.once(this.PID + '-' + peerId, (message) => {
        clearTimeout(to)
        this.cache.add(message.peer, message.descriptor)
        resolve()
      })
    })
  }

  /**
     * @private Behavior when this peer receives a request of descriptor.
     * @param {string} peerId The identifier of the requester.
     * @param {MRequestDescriptor} message The message received.
     */
  _onRequestDescriptor (peerId, message) {
    this.send(peerId, new MGiveDescriptor(this.getInviewId(),
      this.options.descriptor),
    this.options.retry)
      .catch((e) => debug('[%s] %s =X> give descriptor =X> %s',
        this.PID, this.PEER, peerId))
  }

  /**
     * @private Behavior when a connection is ready to be added in the partial
     * view.
     * @param {string} peerId The identifier of the new neighbor.
     */
  _open (peerId) {
    debug('[%s] %s ===> %s', this.PID, this.PEER, peerId)
    // #1 Check if it is already in the view. We do not want duplicate. Such
    // cases happen due to concurrency. Check if the descriptor is still in
    // the cache.
    if (this.partialView.has(peerId)) {
      this.disconnect(peerId)
    } else if (!this.cache.has(peerId)) {
      this._requestDescriptor(peerId)
        .then(() => {
          // #2 re-check for it may have been added in the meantime
          if (this.partialView.has(peerId)) {
            this.disconnect(peerId)
          } else {
            // #3 rank peers to check which is to throw, if there is
            this._keep(peerId)
          }
        })
        .catch((e) => {
          this.disconnect(peerId)
        })
    } else {
      // #3 rank peers to check which is to throw, if there is
      this._keep(peerId)
    }
  }

  /**
     * @private Check if the new peer should be added to our partial view or
     * rejected
     * @param {string} peerId The identifier of the peer to check.
     */
  _keep (peerId) {
    let ranked = []
    this.partialView.forEach((epv, neighbor) => ranked.push(epv))
    ranked.push({
      peer: peerId,
      descriptor: this.cache.get(peerId)
    })
    ranked.sort(this.options.ranking(this.options))
    let sliced = ranked.slice(0, this._partialViewSize())
    ranked.splice(0, this._partialViewSize())
    // ranked becomes the rest: the lowest graded
    if (ranked.length === 0 || ranked.indexOf(peerId) < 0) {
      this.partialView.addNeighbor(peerId, this.cache.get(peerId))
    }
    ranked.forEach((neighbor) => this.disconnect(neighbor.peer))
  }

  /**
     * @private Behavior when a connection is closed.
     * @param {string} peerId The identifier of the removed arc.
     */
  _close (peerId) {
    debug('[%s] %s =†=> %s', this.PID, this.PEER, peerId)
    this.partialView.has(peerId) && this.partialView.removeOldest(peerId)
  }

  /**
     * @private Update the connectedness state of the peer.
     */
  _updateState () {
    const remember = this.state
    if (this.i.size > 0 && this.o.size > 0 && remember !== 'connected') {
      this.state = 'connected'
    } else if (((this.i.size > 0) && (this.o.size <= 0)) ||
      ((this.o.size > 0) && (this.i.size <= 0) &&
      remember !== 'partially connected')) {
      this.state = 'partially connected'
    } else if (this.i.size <= 0 && this.o.size <= 0 &&
      remember !== 'disconnected') {
      this.state = 'disconnected'
    // this._stop()
    }
    (remember !== this.state) && this.emit('statechange', this.state)
  }

  /**
     * Get k neighbors from the partial view. If k is not reached, it tries to
     * fill the gap with neighbors from the inview.  It is worth noting that
     * each peer controls its outview but not its inview. The more the neigbhors
     * from the outview the better.
     * @param {number} k The number of neighbors requested. If k is not defined,
     * it returns every known identifiers of the partial view.
     * @return {string[]} Array of identifiers.
     */
  getPeers (k) {
    let peers = []
    if (typeof k === 'undefined') {
      // #1 get all the partial view
      this.partialView.forEach((epv, peerId) => peers.push(peerId))
    } else {
      // #2 get random identifier from outview
      let out = []
      this.partialView.forEach((epv, peerId) => out.push(peerId))
      while (peers.length < k && out.length > 0) {
        let rn = Math.floor(Math.random() * out.length)
        peers.push(out[rn])
        out.splice(rn, 1)
      }
      // #3 get random identifier from the inview to fill k-entries
      let inView = []
      this.i.forEach((occ, peerId) => inView.push(peerId))
      while (peers.length < k && inView.length > 0) {
        let rn = Math.floor(Math.random() * inView.length)
        peers.push(inView[rn])
        inView.splice(rn, 1)
      }
    }
    debug('[%s] %s provides %s peers', this.PID, this.PEER, peers.length)
    return peers
  }
}

module.exports = TMan