src/utils/media.js
const CommunicationProtocol = require('../network/communication/abstract/communication-protocol')
const Communication = require('../network/communication/communication')
const debug = (require('debug'))('foglet-core:media')
const uuid = require('uuid/v4')
const lmerge = require('lodash.merge')
const MediaRecorderStream = require('media-recorder-stream')
const MediaSource = require('mediasource')
const Stream = require('stream')
class ReadableFromStream extends Stream.Readable {
constructor (source, parent, options) {
super(options)
this.source = source
this.parent = parent
this.objectMode = true
this.count = 0
let stack = ''
this.source.on('data', (data) => {
if (this.count === 0) {
if (!this.parent._activeStream.has(data.id)) {
debug('Setting options for %s', data.id, data)
this.parent._activeStream.set(data.id, {source: this, options: data})
}
this.parent.emit('receive', data.id)
this.count++
} else {
if (data.type === 'full') {
this.push(data.payload)
this.count++
} else if (data.type === 'end') {
stack += data.payload
this.push(new Uint8Array(JSON.parse(stack).data))
this.count++
stack = ''
} else {
stack += data.payload
}
}
})
this.source.on('end', () => {
this.end()
})
}
_read (size) {}
}
/**
* Media Stream Manager
* If using Video/audio stream for all users: use the broadcast primitive (Data Channel)
* If using Video/audio stream for only one user, use the unicast primitive (Streaming)
* But pay attention that using unicast method, when a shuffling occur the connection might diseapear.
* For this usage, create an overlay network with only this peer connected to you.
* Or shut down the shuffle mechanism but this is not recommended.
* @extends CommunicationProtocol
*/
class Media extends CommunicationProtocol {
constructor (source, protocol, options) {
super(source, `foglet-media-internal-${protocol}`)
this.options = {
chunkSize: 16 * 1000 // pay attention to the maximum, or it will not work. see (http://viblast.com/blog/2015/2/5/webrtc-data-channel-message-size/)
}
this._activeMedia = new Map()
this._activeStream = new Map()
this._communication = new Communication(source, `foglet-media-internal-${protocol}`)
this.NI = this._source.rps.NI
this.NO = this._source.rps.NO
this.i = this._source.rps.i
this.o = this._source.rps.o
this._source.rps.on('stream', (id, stream) => {
this._receive(id, stream)
})
this._communication.onStreamBroadcast((id, stream) => {
debug('Receive a media stream: ', id, stream)
this._reconstruct(stream)
})
}
get pid () {
return this._source.rps._pid()
}
/**
* Send a message to only one neighbor...
* @param {Object} id - The id to send the stream (media) to
* @param {Object} media - The stream to send
* @return {boolean}
*/
sendUnicast (id, media) {
if (!media.id) media.id = uuid()
if (!this._activeMedia.has(media.id)) {
this._activeMedia.set(media.id, media)
this._setListeners(media)
}
return this._source.rps.stream(id, media)
}
/**
* Send a MediaStream using our broadcast primitives using Data Channel.
* @param {Object} media - The stream to send
* @param {Object} options - MediaRecorder options (see MediaRecorder API)
* @return {boolean}
*/
sendBroadcastOverDataChannel (media, options = {}) {
// https://developer.mozilla.org/en-US/docs/Web/API/MediaRecorder/MediaRecorder
options = lmerge({
mimeType: 'video/webm; codecs="vp8"', // You MUST set the MIME type
interval: 100, // A short interval is recommended to keep buffer sizes low
bitsPerSecond: 128 * 1024
}, options)
if (!media.id) media.id = uuid()
if (!this._activeMedia.has(media.id)) {
this._activeMedia.set(media.id, media)
this._setListeners(media)
}
console.log(media, options)
let ms
try {
ms = new MediaRecorderStream(media, options)
} catch (e) {
throw new Error('Error when recording the media: ', e)
}
const stream = this._communication.streamBroadcast()
options.id = media.id
stream.write(options)
ms.on('data', (data) => {
const chunkified = this.chunkify(JSON.stringify(data))
if (chunkified.length === 0) {
stream.write({
type: 'full',
id: 0,
payload: chunkified[0]
})
} else {
for (let i = 0; i < chunkified.length; i++) {
if (i === chunkified.length - 1) {
stream.write({
type: 'end',
id: i,
payload: chunkified[i]
})
} else {
stream.write({
type: 'chunk',
id: i,
payload: chunkified[i]
})
}
}
}
})
ms.on('end', () => {
stream.end()
})
}
_reconstruct (stream, options = null) {
const readable = new ReadableFromStream(stream, this)
readable.on('error', (err) => {
console.error(err)
})
}
getStreamMedia (id, elem) {
if (!this._activeStream.has(id)) return undefined
const wrapper = new MediaSource(elem)
const writable = wrapper.createWriteStream(this._activeStream.get(id).options.mimeType)
elem.addEventListener('error', function () {
// listen for errors on the video/audio element directly
var errorCode = elem.error
var detailedError = wrapper.detailedError
console.error(errorCode, detailedError)
// wrapper.detailedError will often have a more detailed error message
})
writable.on('error', function (err) {
// listening to the stream 'error' event is optional
console.error(err)
})
this._activeStream.get(id).source.pipe(writable)
}
/**
* Handler executed when a message is recevied
* @param {string} id - Message issuer's ID
* @param {Object} stream - The stream received
* @return {void}
*/
_receive (id, stream) {
debug('Receive a media stream: ', id, stream)
if (!stream.id) stream.id = uuid()
if (!this._activeMedia.has(stream.id)) {
this._activeMedia.set(stream.id, {peer: id, stream})
this._setListeners(stream)
}
this.emit('receive', id, stream)
}
_setListeners (media) {
media.onactive = () => {
console.log('Media %s is active...', media.id)
}
media.oninactive = () => {
console.log('Media %s is inactive... (Disconnection or a Shuffling occured.)', media.id)
// this._sendRequest(media.id)
}
media.onended = () => {
console.log('Media %s is finished...', media.id)
}
}
/**
* Chunk a string into n message of size 'chunkSize'
* @param {string} string
* @param {Number=this.options.chunkSize} chunkSize
*/
chunkify (string, chunkSize = this.options.chunkSize) {
// https://stackoverflow.com/questions/7033639/split-large-string-in-n-size-chunks-in-javascript
return string.match(new RegExp('.{1,' + chunkSize + '}', 'g'))
}
}
module.exports = Media