src/network/communication/broadcast/broadcast.js
/*
This broadcast implementation is clearly inspired from https://github.com/Chat-Wane/CausalBroadcastDefinition
This is a causal broadcast customizable, if you want to specifiy
*/
'use strict'
const AbstractBroadcast = require('./../abstract/abstract-broadcast.js')
const VVwE = require('version-vector-with-exceptions') // Version-Vector With Exceptions
const messages = require('./messages.js')
const uuid = require('uuid/v4')
const sortedIndexBy = require('lodash.sortedindexby')
const debug = (require('debug'))('foglet-core:broadcast')
/**
* Format the IDs of messages in string format
* @param {Obbject} message - The message to format
* @return {string} The formatted message's id in string format
*/
function formatID (message) {
return `e=${message.id.e}&c=${message.id.c}`
}
/**
* Broadcast represent the base implementation of a broadcast protocol for the foglet library.
* Based on the CausalBrodacastDefinition Package: see: https://github.com/Chat-Wane/CausalBroadcastDefinition
* @extends AbstractBroadcast
* @author Arnaud Grall (Folkvir)
*/
class Broadcast extends AbstractBroadcast {
/**
* Constructor
* @param {AbstractNetwork} source - The source RPS/overlay
* @param {string} protocol - The name of the broadcast protocol
*/
constructor (source, protocol) {
super(source, protocol)
if (source && protocol) {
this.options = {
id: source._options.peer,
delta: 1000 * 30
}
// the id is your id, base on the .PEER id in the RPS options
this._causality = new VVwE(this.options.id)
// buffer of received messages
this._buffer = []
// buffer of anti-entropy messages (chunkified because of large size)
this._bufferAntiEntropy = messages.MAntiEntropyResponse('init')
} else {
return new Error('Not enough parameters', 'fbroadcast.js')
}
}
/**
* Send a message to all neighbours
* @private
* @param {Object} message - The message to send
* @return {void}
*/
_sendAll (message) {
const n = this._source.getNeighbours(Infinity)
if (n.length > 0) {
n.forEach(p => {
this._unicast.send(p, message).catch(e => {
debug(e)
})
})
}
}
/**
* Send a message in broadcast
* @param {Object} message - The message to send
* @param {Object} [id] {e: <stringId>, c: <Integer>} this uniquely represents the id of the operation
* @param {Object} [isReady] {e: <stringId>, c: <Integer>} this uniquely represents the id of the operation that we must wait before delivering the message
* @return {boolean}
*/
send (message, id, isReady = undefined) {
const a = id || this._causality.increment()
const broadcastMessage = messages.BroadcastMessage(this._protocol, a, isReady, message)
// #2 register the message in the structure
this._causality.incrementFrom(a)
// #3 send the message to the neighborhood
this._sendAll(broadcastMessage)
return a
}
/**
* We started Antientropy mechanism in order to retreive old missed files
*/
startAntiEntropy (delta = this.options.delta) {
this._intervalAntiEntropy = setInterval(() => {
this._source.getNeighbours().forEach(peer => this._unicast.send(peer, messages.MAntiEntropyRequest(this._causality)))
}, delta)
this.on('antiEntropy', (id, messageCausality, ourCausality) => this._defaultBehaviorAntiEntropy(id, messageCausality, ourCausality))
}
/**
* This callback depends on the type of the applications, this is the default behavior when you receive old missed files
*/
_defaultBehaviorAntiEntropy (id, messageCausality, ourCausality) {
debug('(Warning) You should modify this, AntiEntropy default behavior: ', id, messageCausality, ourCausality)
}
/**
* Clear the AntiEntropy mechanism
*/
clearAntiEntropy () {
if (this._intervalAntiEntropy) clearInterval(this._intervalAntiEntropy)
}
/**
* Send entropy response
* @deprecated
* @param {[type]} origin [description]
* @param {[type]} causalityAtReceipt [description]
* @param {[type]} elements [description]
* @return {[type]} [description]
*/
sendAntiEntropyResponse (origin, causalityAtReceipt, elements) {
let id = uuid()
// #1 metadata of the antientropy response
let sent = this._unicast.send(origin, messages.MAntiEntropyResponse(id, causalityAtReceipt, elements.length))
let i = 0
while (sent && i < elements.length) {
sent = this._unicast.send(origin, messages.MAntiEntropyResponse(id, null, elements.length, elements[i]))
++i
}
}
/**
* Handler executed when a message is recevied
* @param {string} id - Message issuer's ID
* @param {Object} message - The message received
* @return {void}
*/
_receive (id, message) {
// if not present, add the issuer of the message in the message
if (!('issuer' in message)) { message.issuer = id }
switch (message.type) {
case 'MAntiEntropyRequest': {
debug(id, message)
this.emit('antiEntropy', id, message.causality, this._causality.clone())
break
}
case 'MAntiEntropyResponse': {
// #A replace the buffered message
if (this._bufferAntiEntropy.id !== message.id) {
this._bufferAntiEntropy = message
}
// #B add the new element to the buffer
if (message.element) {
this._bufferAntiEntropy.elements.push(message.element)
}
// #C add causality metadata
if (message.causality) {
this._bufferAntiEntropy.causality = message.causality
}
// #D the buffered message is fully arrived, deliver
if (this._bufferAntiEntropy.elements.length ===
this._bufferAntiEntropy.nbElements) {
// #1 considere each message in the response independantly
for (let i = 0; i < this._bufferAntiEntropy.elements.length; ++i) {
let element = this._bufferAntiEntropy.elements[i]
// #2 only check if the message has not been received yet
if (!this._shouldStopPropagation(element)) {
this._causality.incrementFrom(element.id)
this.emit('receive', message.issuer, element.payload)
}
}
// #3 merge causality structures
this._causality.merge(this._bufferAntiEntropy.causality)
}
break
}
default: {
if (!this._shouldStopPropagation(message)) {
// #1 register the operation
// maintain `this._buffer` sorted to search in O(log n)
const index = sortedIndexBy(this._buffer, message, formatID)
this._buffer.splice(index, 0, message)
// #2 deliver
this._reviewBuffer()
// #3 rebroadcast
this._sendAll(message)
}
break
}
}
}
/**
* Check if a message should be propagated or not
* @private
* @param {Object} message - The message to check
* @return {boolean} True if the message should not be propagated, False if it should be.
*/
_shouldStopPropagation (message) {
return this._causality.isLower(message.id) || (this._findInBuffer(formatID(message)) >= 0)
}
/**
* Try to find the index of a message in the internal buffer
* @private
* @param {string} id - Message's ID
* @return {int} The index of the message in the buffer, or -1 if not found
*/
_findInBuffer (id) {
// use a binary search algorithm since `this._buffer` is sorted by IDs
let minIndex = 0
let maxIndex = this._buffer.length - 1
let currentIndex, currentElement
while (minIndex <= maxIndex) {
currentIndex = (minIndex + maxIndex) / 2 | 0
currentElement = formatID(this._buffer[currentIndex])
if (currentElement < id) {
minIndex = currentIndex + 1
} else if (currentElement > id) {
maxIndex = currentIndex - 1
} else {
return currentIndex
}
}
return -1
}
/**
* Scan internal buffer to deliver waiting messages
* @private
* @return {void}
*/
_reviewBuffer () {
let message
let found = false
for (let index = this._buffer.length - 1; index >= 0; --index) {
message = this._buffer[index]
if (this._causality.isLower(message.id)) {
this._buffer.splice(index, 1)
} else {
// console.log(message, this._causality.isReady(message.isReady), this._causality);
if (this._causality.isReady(message.isReady)) {
found = true
this._causality.incrementFrom(message.id)
this._buffer.splice(index, 1)
this.emit('receive', message.issuer, message.payload)
}
}
}
if (found) {
this._reviewBuffer()
}
}
}
module.exports = Broadcast