From bdfac44e217d17559c3200b8aabc5137df790497 Mon Sep 17 00:00:00 2001 From: knotteye Date: Fri, 21 Aug 2020 06:50:40 -0500 Subject: [PATCH] Add the beginnings of the ability to cluster RTMP servers. It looks like there won't be a way to reliably play RTMP streams like this without digging into node-media-server code. For now that means clustering will have the drawback of being able to do DASH only. Still need to add a config option and reliable recording. --- src/cluster.ts | 272 ++++++++++++++++++++++++++++++++++++++++++++++++ src/database.ts | 9 +- 2 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 src/cluster.ts diff --git a/src/cluster.ts b/src/cluster.ts new file mode 100644 index 0000000..5de1356 --- /dev/null +++ b/src/cluster.ts @@ -0,0 +1,272 @@ +import * as cluster from 'cluster'; +import * as net from 'net'; +import * as NodeRtmpSession from '../node_modules/node-media-server/node_rtmp_session'; +import * as logger from '../node_modules/node-media-server/node_core_logger'; +import * as dirty from "dirty"; +import { mkdir, fstat, access } from "fs"; +import * as strf from "strftime"; +import * as ctx from '../node_modules/node-media-server/node_core_ctx'; +import * as db from "./database"; +import {config} from "./config"; +import { messageRateLimitPresets } from 'dank-twitch-irc'; + +const {readValue, writeValue} = require('@mediafish/amf0'); + +const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); +const { exec, execFile } = require('child_process'); + +const keystore = dirty(); +const num_processes = require('os').cpus().length; +const workerMap = {}; + +if (cluster.isMaster) { + //master logic + + //store workers in here + var workers = []; + + // Helper function for spawning worker at index 'i'. + var spawn = function(i) { + workers[i] = cluster.fork(); + workers[i].on('message', (msg) => { + handleMsgMaster(msg, i) + }); + + // Restart worker on exit + workers[i].on('exit', function(code, signal) { + console.log('[RTMP Cluster MASTER] Respawning Worker', i); + spawn(i); + }); + }; + + // Spawn initial workers + for (var i = 0; i < num_processes; i++) { + spawn(i); + } + + var nextWorker: number = 0; + + //TODO assign incoming connections correctly + + var server = net.createServer({ pauseOnConnect: true }, function(connection) { + if(nextWorker >= workers.length) nextWorker = 0; + var worker = workers[nextWorker]; + /*connection.on('data', (chunk) => { + const buff = Buffer.from(chunk); + let offset, value, array = []; + while(true) { + [offset, value] = readValue(buff, offset); + console.log(JSON.stringify(value, null, 4)); + array.push(value); + } + });*/ + worker.send('rtmp-session:connection', connection); //send connection to worker + }).listen(config['rtmp']['port']); + + console.log('[RTMP Cluster MASTER] Master Ready.'); +} else { + + //worker logic + + //we need our own database pool since we can't share memory anyone else + db.initRTMPCluster(); + + const rtmpcfg = { + logType: 0, + rtmp: Object.assign({port: 1936}, config['rtmp']) + }; + + // creating the rtmp server + var serv = net.createServer((socket) => { + let session = new NodeRtmpSession(rtmpcfg, socket); + session.run(); + }).listen(1936); + logger.setLogType(0); + + // RTMP Server Logic + newRTMPListener('postPublish', (id, StreamPath, args) =>{ + console.log(`[RTMP Cluster WORKER ${process.pid}] Publish Hook for stream: ${id}`); + let session = getRTMPSession(id); + let app: string = StreamPath.split("/")[1]; + let key: string = StreamPath.split("/")[2]; + //disallow urls not formatted exactly right + if (StreamPath.split("/").length !== 3 || key.includes(' ')){ + console.log(`[RTMP Cluster WORKER ${process.pid}] Malformed URL, closing connection for stream: ${id}`); + session.reject(); + return false; + } + if(app !== config['media']['privateEndpoint']){ + //app isn't at public endpoint if we've reached this point + console.log(`[RTMP Cluster WORKER ${process.pid}] Wrong endpoint, rejecting stream: ${id}`); + session.reject(); + return false; + } + //if the url is formatted correctly and the user is streaming to the correct private endpoint + //grab the username from the database and redirect the stream there if the key is valid + //otherwise kill the session + db.query('select username,record_flag from users where stream_key='+db.raw.escape(key)+' limit 1').then(async (results) => { + if(results[0]){ + //transcode to mpd after making sure directory exists + keystore[results[0].username] = key; + mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, ()=>{;}); + while(true){ + if(session.audioCodec !== 0 && session.videoCodec !== 0){ + transCommand(results[0].username, key).then((r) => { + execFile(config['media']['ffmpeg'], r, {maxBuffer: Infinity}, (err, stdout, stderr) => { + /*console.log(err); + console.log(stdout); + console.log(stderr);*/ + }); + }); + break; + } + await sleep(300); + } + if(results[0].record_flag && config['media']['record']){ + console.log(`[RTMP Cluster WORKER ${process.pid}] Initiating recording for stream: ${id}`); + mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, (err) => { + if (err) throw err; + execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username+'/'+strf('%d%b%Y-%H%M')+'.mp4'], { + detached : true, + stdio : 'inherit', + maxBuffer: Infinity + }).unref(); + //spawn an ffmpeg process to record the stream, then detach it completely + //ffmpeg can then (probably) finalize the recording if satyr crashes mid-stream + }); + } + else { + console.log(`[RTMP Cluster WORKER ${process.pid}] Skipping recording for stream: ${id}`); + } + db.query('update user_meta set live=true where username=\''+results[0].username+'\' limit 1'); + console.log(`[RTMP Cluster WORKER ${process.pid}] Stream key ok for stream: ${id}`); + //notify master process that we're handling the stream for this user + process.send({type: 'handle-publish', name:results[0].username}); + } + else{ + console.log(`[RTMP Cluster WORKER ${process.pid}] Invalid stream key for stream: ${id}`); + session.reject(); + } + }); + }); + + newRTMPListener('donePublish', (id, StreamPath, args) => { + let app: string = StreamPath.split("/")[1]; + let key: string = StreamPath.split("/")[2]; + if(app === config['media']['privateEndpoint']) { + db.query('update user_meta,users set user_meta.live=false where users.stream_key='+db.raw.escape(key)); + db.query('select username from users where stream_key='+db.raw.escape(key)+' limit 1').then(async (results) => { + if(results[0]) keystore.rm(results[0].username); + //notify master process that we're no longer handling the stream for this user + process.send({type: 'handle-publish-done', name:results[0].username}); + }); + } + }); + + newRTMPListener('prePlay', (id, StreamPath, args) => { + let session = getRTMPSession(id); + let app: string = StreamPath.split("/")[1]; + let key: string = StreamPath.split("/")[2]; + //correctly formatted urls again + if (StreamPath.split("/").length !== 3){ + console.log("[NodeMediaServer] Malformed URL, closing connection for stream:",id); + session.reject(); + return false; + } + //localhost can play from whatever endpoint + //other clients must use private endpoint + if(app !== config['media']['publicEndpoint'] && !session.isLocal) { + console.log("[NodeMediaServer] Non-local Play from private endpoint, rejecting client:",id); + session.reject(); + return false; + } + //rewrite playpath to private endpoint serverside + //(hopefully) + if(app === config['media']['publicEndpoint']) { + if(keystore[key]){ + session.playStreamPath = '/'+config['media']['privateEndpoint']+'/'+keystore[key]; + return true; + } + //here the client is asking for a valid stream that we don't have + //so we are going to ask the master process for it + else session.reject(); + } + }); + + //recieve messages from master + process.on('message', function(message, connection) { + if (message === 'rtmp-session:connection') { + // Emulate a connection event on the server by emitting the + // event with the connection the master sent us. + serv.emit('connection', connection); + connection.resume(); + return; + } + if(message['type'] === 'stream-request:h') { + if(!message['available']) + getRTMPSession(message['id']).reject(); + } + }); + console.log(`[RTMP Cluster WORKER ${process.pid}] Worker Ready.`); +} + +function newRTMPListener(eventName, listener) { + ctx.nodeEvent.on(eventName, listener); +} + +function getRTMPSession(id) { + return ctx.sessions.get(id); +} + +async function transCommand(user: string, key: string): Promise{ + let args: string[] = ['-loglevel', 'fatal', '-y']; + if(config['transcode']['inputflags'] !== null && config['transcode']['inputflags'] !== "") args = args.concat(config['transcode']['inputflags'].split(" ")); + args = args.concat(['-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-movflags', '+faststart']); + if(config['transcode']['adaptive']===true && config['transcode']['variants'] > 1) { + for(let i=0;i 51 ? 51 : Math.floor(18 + (i * 7)); + args = args.concat(['-crf:'+i, ''+crf]); + } + for(let i=1;i= workers.length) nextWorker = 0; + } + if(msg['type'] === 'handle-publish-done'){ + workerMap[msg['name']] = undefined; + } + if(msg['type'] === 'stream-request:h'){ + if(workerMap[msg['key']] !== undefined){ + workers[index].send({type: 'stream-request:h', id: msg['id'], key: msg['key'], available: true}); + } + else { + workers[index].send({type: 'stream-request:h', id: msg['id'], key: msg['key'], available: false}); + } + } +} \ No newline at end of file diff --git a/src/database.ts b/src/database.ts index 1ee8a17..0d733e7 100644 --- a/src/database.ts +++ b/src/database.ts @@ -12,6 +12,13 @@ function init (){ console.log('Connected to database.'); } +function initRTMPCluster(){ + let cfg = config['database']; + cfg['connectionLimit'] = Math.floor(config['database']['connectionLimit'] / require('os').cpus().length); + raw = mysql.createPool(cfg); + cryptoconfig = config['crypto']; +} + async function addUser(name: string, password: string){ //does not respect registration setting in config if(password === '') return false; @@ -63,4 +70,4 @@ async function hash(pwd){ return await bcrypt.hash(pwd, cryptoconfig['saltRounds']); } -export { query, raw, init, addUser, rmUser, validatePassword, hash, genKey }; \ No newline at end of file +export { query, raw, init, addUser, rmUser, validatePassword, hash, genKey, initRTMPCluster }; \ No newline at end of file