diff --git a/install/config.example.yml b/install/config.example.yml index be1a5a0..8925c03 100644 --- a/install/config.example.yml +++ b/install/config.example.yml @@ -9,6 +9,9 @@ media: ffmpeg: '' rtmp: + # enable cluster mode this will pretty much entirely + # break the ability to play rtmp for clients + cluster: false port: 1935 http: diff --git a/package-lock.json b/package-lock.json index e6e1e5a..c5e1872 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "satyr", - "version": "0.9.4", + "version": "0.10.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -950,6 +950,11 @@ "integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==", "optional": true }, + "is-port-available": { + "version": "0.1.5", + "resolved": "https://registry.npmjs.org/is-port-available/-/is-port-available-0.1.5.tgz", + "integrity": "sha512-/r7UZAQtfgDFdhxzM71jG0mkC4oSRA513cImMILdRe/+UOIe0Se/D/Z7XCua4AFg5k4Zt3ALMGaC1W3FzlrR2w==" + }, "isarray": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", @@ -963,11 +968,6 @@ "asn1.js": "^5.2.0" } }, - "jwt-decode": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-3.0.0.tgz", - "integrity": "sha512-RBQv2MTm3FNKQkdzhEyQwh5MbdNgMa+FyIJIK5RMWEn6hRgRHr7j55cRxGhRe6vGJDElyi6f6u/yfkP7AoXddA==" - }, "lodash": { "version": "4.17.20", "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz", diff --git a/package.json b/package.json index e921a3e..bee4cbf 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "express": "^4.17.1", "flags": "^0.1.3", "irc": "^0.5.2", + "is-port-available": "^0.1.5", "jose": "^1.15.1", "mysql": "^2.17.1", "node-media-server": "^2.2.4", diff --git a/src/cluster.ts b/src/cluster.ts new file mode 100644 index 0000000..4a5181e --- /dev/null +++ b/src/cluster.ts @@ -0,0 +1,289 @@ +import * as cluster from 'cluster'; +import * as net from 'net'; +import * as NodeRtmpSession from '../node_modules/node-media-server/node_rtmp_session'; +import * as logger from '../node_modules/node-media-server/node_core_logger'; +import * as dirty from "dirty"; +import { mkdir, fstat, access } from "fs"; +import * as strf from "strftime"; +import * as ctx from '../node_modules/node-media-server/node_core_ctx'; +import * as db from "./database"; +import {config} from "./config"; +import * as isPortAvailable from "is-port-available"; + +const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); +const { exec, execFile } = require('child_process'); + +const keystore = dirty(); +const num_processes = require('os').cpus().length; +const workerMap = {}; + +if (cluster.isMaster) { + //master logic + + //store workers in here + var workers = []; + + // Helper function for spawning worker at index 'i'. + var spawn = function(i) { + workers[i] = cluster.fork(); + workers[i].on('message', (msg) => { + handleMsgMaster(msg, i) + }); + + // Restart worker on exit + workers[i].on('exit', function(code, signal) { + console.log('[RTMP Cluster MASTER] Respawning Worker', i); + spawn(i); + }); + }; + + // Spawn initial workers + for (var i = 0; i < num_processes; i++) { + spawn(i); + } + + var nextWorker: number = 0; + + //TODO assign incoming connections correctly + + var server = net.createServer({ pauseOnConnect: true }, function(connection) { + if(nextWorker >= workers.length) nextWorker = 0; + var worker = workers[nextWorker]; + worker.send('rtmp-session:connection', connection); //send connection to worker + }).listen(config['rtmp']['port']); + + console.log('[RTMP Cluster MASTER] Master Ready.'); +} else { + + //worker logic + + //we need our own database pool since we can't share memory anyone else + db.initRTMPCluster(); + + const rtmpcfg = { + logType: 0, + rtmp: Object.assign({port: 1936}, config['rtmp']) + }; + + //find a unique port to listen on + getPort().then((wPort) => { + + // creating the rtmp server + var serv = net.createServer((socket) => { + let session = new NodeRtmpSession(rtmpcfg, socket); + session.run(); + }).listen(wPort); + logger.setLogType(0); + + // RTMP Server Logic + newRTMPListener('postPublish', (id, StreamPath, args) =>{ + console.log(`[RTMP Cluster WORKER ${process.pid}] Publish Hook for stream: ${id}`); + let session = getRTMPSession(id); + let app: string = StreamPath.split("/")[1]; + let key: string = StreamPath.split("/")[2]; + //disallow urls not formatted exactly right + if (StreamPath.split("/").length !== 3 || key.includes(' ')){ + console.log(`[RTMP Cluster WORKER ${process.pid}] Malformed URL, closing connection for stream: ${id}`); + session.reject(); + return false; + } + if(app !== config['media']['privateEndpoint']){ + //app isn't at public endpoint if we've reached this point + console.log(`[RTMP Cluster WORKER ${process.pid}] Wrong endpoint, rejecting stream: ${id}`); + session.reject(); + return false; + } + //if the url is formatted correctly and the user is streaming to the correct private endpoint + //grab the username from the database and redirect the stream there if the key is valid + //otherwise kill the session + db.query('select username,record_flag from users where stream_key='+db.raw.escape(key)+' limit 1').then(async (results) => { + if(results[0]){ + //transcode to mpd after making sure directory exists + keystore[results[0].username] = key; + mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, ()=>{;}); + while(true){ + if(session.audioCodec !== 0 && session.videoCodec !== 0){ + transCommand(results[0].username, key, wPort).then((r) => { + execFile(config['media']['ffmpeg'], r, {maxBuffer: Infinity}, (err, stdout, stderr) => { + /*console.log(err); + console.log(stdout); + console.log(stderr);*/ + }); + }); + break; + } + await sleep(300); + } + if(results[0].record_flag && config['media']['record']){ + console.log(`[RTMP Cluster WORKER ${process.pid}] Initiating recording for stream: ${id}`); + mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, (err) => { + if (err) throw err; + execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username+'/'+strf('%d%b%Y-%H%M')+'.mp4'], { + detached : true, + stdio : 'inherit', + maxBuffer: Infinity + }).unref(); + //spawn an ffmpeg process to record the stream, then detach it completely + //ffmpeg can then (probably) finalize the recording if satyr crashes mid-stream + }); + } + else { + console.log(`[RTMP Cluster WORKER ${process.pid}] Skipping recording for stream: ${id}`); + } + db.query('update user_meta set live=true where username=\''+results[0].username+'\' limit 1'); + db.query('SELECT twitch_key,enabled from twitch_mirror where username='+db.raw.escape(results[0].username)+' limit 1').then(async (tm) => { + if(!tm[0]['enabled'] || !config['twitch_mirror']['enabled'] || !config['twitch_mirror']['ingest']) return; + console.log('[NodeMediaServer] Mirroring to twitch for stream:',id) + execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', '-f', 'flv', config['twitch_mirror']['ingest']+tm[0]['twitch_key']], { + detached: true, + stdio : 'inherit', + maxBuffer: Infinity + }).unref(); + }); + console.log('[NodeMediaServer] Stream key ok for stream:',id); + console.log(`[RTMP Cluster WORKER ${process.pid}] Stream key ok for stream: ${id}`); + //notify master process that we're handling the stream for this user + process.send({type: 'handle-publish', name:results[0].username}); + } + else{ + console.log(`[RTMP Cluster WORKER ${process.pid}] Invalid stream key for stream: ${id}`); + session.reject(); + } + }); + }); + + newRTMPListener('donePublish', (id, StreamPath, args) => { + let app: string = StreamPath.split("/")[1]; + let key: string = StreamPath.split("/")[2]; + if(app === config['media']['privateEndpoint']) { + db.query('update user_meta,users set user_meta.live=false where users.stream_key='+db.raw.escape(key)); + db.query('select username from users where stream_key='+db.raw.escape(key)+' limit 1').then(async (results) => { + if(results[0]) keystore.rm(results[0].username); + //notify master process that we're no longer handling the stream for this user + process.send({type: 'handle-publish-done', name:results[0].username}); + }); + } + }); + + newRTMPListener('prePlay', (id, StreamPath, args) => { + let session = getRTMPSession(id); + let app: string = StreamPath.split("/")[1]; + let key: string = StreamPath.split("/")[2]; + //correctly formatted urls again + if (StreamPath.split("/").length !== 3){ + console.log("[NodeMediaServer] Malformed URL, closing connection for stream:",id); + session.reject(); + return false; + } + //localhost can play from whatever endpoint + //other clients must use private endpoint + if(app !== config['media']['publicEndpoint'] && !session.isLocal) { + console.log("[NodeMediaServer] Non-local Play from private endpoint, rejecting client:",id); + session.reject(); + return false; + } + //rewrite playpath to private endpoint serverside + //(hopefully) + if(app === config['media']['publicEndpoint']) { + if(keystore[key]){ + session.playStreamPath = '/'+config['media']['privateEndpoint']+'/'+keystore[key]; + return true; + } + //here the client is asking for a valid stream that we don't have + //so we are going to ask the master process for it + else session.reject(); + } + }); + + //recieve messages from master + process.on('message', function(message, connection) { + if (message === 'rtmp-session:connection') { + // Emulate a connection event on the server by emitting the + // event with the connection the master sent us. + serv.emit('connection', connection); + connection.resume(); + return; + } + if(message['type'] === 'stream-request:h') { + if(!message['available']) + getRTMPSession(message['id']).reject(); + } + }); + console.log(`[RTMP Cluster WORKER ${process.pid}] Worker Ready.`); + + }); +} + +function newRTMPListener(eventName, listener) { + ctx.nodeEvent.on(eventName, listener); +} + +function getRTMPSession(id) { + return ctx.sessions.get(id); +} + +async function getPort(): Promise{ + let port = 1936+process.pid; + while(true){ + let i=0; + if(await isPortAvailable(port+i)){ + port += i; + break; + } + i++; + } + return port; +} + +async function transCommand(user: string, key: string, wPort): Promise{ + let args: string[] = ['-loglevel', 'fatal', '-y']; + if(config['transcode']['inputflags'] !== null && config['transcode']['inputflags'] !== "") args = args.concat(config['transcode']['inputflags'].split(" ")); + args = args.concat(['-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-movflags', '+faststart']); + if(config['transcode']['adaptive']===true && config['transcode']['variants'] > 1) { + for(let i=0;i 51 ? 51 : Math.floor(18 + (i * 7)); + args = args.concat(['-crf:'+i, ''+crf]); + } + for(let i=1;i= workers.length) nextWorker = 0; + } + if(msg['type'] === 'handle-publish-done'){ + workerMap[msg['name']] = undefined; + } + if(msg['type'] === 'stream-request:h'){ + if(workerMap[msg['key']] !== undefined){ + workers[index].send({type: 'stream-request:h', id: msg['id'], key: msg['key'], available: true}); + } + else { + workers[index].send({type: 'stream-request:h', id: msg['id'], key: msg['key'], available: false}); + } + } +} \ No newline at end of file diff --git a/src/database.ts b/src/database.ts index 1ee8a17..0d733e7 100644 --- a/src/database.ts +++ b/src/database.ts @@ -12,6 +12,13 @@ function init (){ console.log('Connected to database.'); } +function initRTMPCluster(){ + let cfg = config['database']; + cfg['connectionLimit'] = Math.floor(config['database']['connectionLimit'] / require('os').cpus().length); + raw = mysql.createPool(cfg); + cryptoconfig = config['crypto']; +} + async function addUser(name: string, password: string){ //does not respect registration setting in config if(password === '') return false; @@ -63,4 +70,4 @@ async function hash(pwd){ return await bcrypt.hash(pwd, cryptoconfig['saltRounds']); } -export { query, raw, init, addUser, rmUser, validatePassword, hash, genKey }; \ No newline at end of file +export { query, raw, init, addUser, rmUser, validatePassword, hash, genKey, initRTMPCluster }; \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 2573948..d2c3f15 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,12 +4,13 @@ import {init as initHTTP} from "./http"; import {init as clean} from "./cleanup"; import {init as initChat} from "./chat"; import { config } from "./config"; +import { execFile } from "child_process"; async function run() { await initDB(); await clean(); await initHTTP(); - await initRTMP(); + config['rtmp']['cluster'] ? execFile(process.cwd()+'/node_modules/.bin/ts-node' [process.cwd()+'src/cluster.ts']) : await initRTMP(); await initChat(); console.log(`Satyr v${config['satyr']['version']} ready`); }