@ -1,10 +1,13 @@
import * as NodeMediaServer from "node-media-server" ;
import * as dirty from "dirty" ;
import { mkdir , fstat , access } from "fs" ;
import * as strf from "strftime" ;
import * as db from "./database" ;
const sleep = ms = > new Promise ( resolve = > setTimeout ( resolve , ms ) ) ;
const { exec , execFile } = require ( 'child_process' ) ;
const keystore = dirty ( ) ;
function init ( mediaconfig : any , satyrconfig : any ) {
const nms = new NodeMediaServer ( mediaconfig ) ;
nms . run ( ) ;
@ -15,12 +18,12 @@ function init (mediaconfig: any, satyrconfig: any) {
let app : string = StreamPath . split ( "/" ) [ 1 ] ;
let key : string = StreamPath . split ( "/" ) [ 2 ] ;
//disallow urls not formatted exactly right
if ( StreamPath . split ( "/" ) . length !== 3 ) {
if ( StreamPath . split ( "/" ) . length !== 3 || key . includes ( ' ' ) ) {
console . log ( "[NodeMediaServer] Malformed URL, closing connection for stream:" , id ) ;
session . reject ( ) ;
return false ;
}
if ( app === satyrconfig . publicEndpoint ) {
/ * i f ( a p p = = = s a t y r c o n f i g . p u b l i c E n d p o i n t ) {
if ( session . isLocal ) {
//only allow publish to public endpoint from localhost
console . log ( "[NodeMediaServer] Local publish, stream:" , ` ${ id } ok. ` ) ;
@ -53,7 +56,7 @@ function init (mediaconfig: any, satyrconfig: any) {
}
return true ;
} ) ;
}
} * /
if ( app !== satyrconfig . privateEndpoint ) {
//app isn't at public endpoint if we've reached this point
console . log ( "[NodeMediaServer] Wrong endpoint, rejecting stream:" , id ) ;
@ -63,16 +66,12 @@ function init (mediaconfig: any, satyrconfig: any) {
//if the url is formatted correctly and the user is streaming to the correct private endpoint
//grab the username from the database and redirect the stream there if the key is valid
//otherwise kill the session
if ( key . includes ( ' ' ) ) {
session . reject ( ) ;
return false ;
}
db . query ( 'select username from users where stream_key=' + db . raw . escape ( key ) + ' limit 1' ) . then ( async ( results ) = > {
db . query ( 'select username,record_flag from users where stream_key=' + db . raw . escape ( key ) + ' limit 1' ) . then ( async ( results ) = > {
if ( results [ 0 ] ) {
//push to rtmp
execFile ( satyrconfig . ffmpeg , [ '-loglevel' , 'fatal' , '-analyzeduration' , '0' , '-i' , 'rtmp://127.0.0.1:' + mediaconfig . rtmp . port + '/' + satyrconfig . privateEndpoint + '/' + key , '-vcodec' , 'copy' , '-acodec' , 'copy' , '-crf' , '18' , '-f' , 'flv' , 'rtmp://127.0.0.1:' + mediaconfig . rtmp . port + '/' + satyrconfig . publicEndpoint + '/' + results [ 0 ] . username ] , { maxBuffer : Infinity } ) ;
//exec('ffmpeg -analyzeduration 0 -i rtmp://127.0.0.1:'+mediaconfig.rtmp.port+'/'+satyrconfig.privateEndpoint+'/'+key+' -vcodec copy -acodec copy -crf 18 -f flv rtmp://127.0.0.1:'+mediaconfig.rtmp.port+'/'+satyrconfig.publicEndpoint+'/'+results[0].username);
//execFile(satyrconfig.ffmpeg, ['-loglevel', 'fatal', '-analyzeduration', '0', '-i', 'rtmp://127.0.0.1:'+mediaconfig.rtmp.port+'/'+satyrconfig.privateEndpoint+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', '-crf', '18', '-f', 'flv', 'rtmp://127.0.0.1:'+mediaconfig.rtmp.port+'/'+satyrconfig.publicEndpoint+'/'+results[0].username], {maxBuffer: Infinity});
//push to mpd after making sure directory exists
keystore [ results [ 0 ] . username ] = key ;
mkdir ( satyrconfig . directory + '/' + satyrconfig . publicEndpoint + '/' + results [ 0 ] . username , { recursive : true } , async ( err ) = > {
if ( err ) throw err ;
while ( true ) {
@ -83,6 +82,24 @@ function init (mediaconfig: any, satyrconfig: any) {
await sleep ( 300 ) ;
}
} ) ;
if ( results [ 0 ] . record_flag && satyrconfig . record ) {
console . log ( '[NodeMediaServer] Initiating recording for stream:' , id ) ;
mkdir ( satyrconfig . directory + '/' + satyrconfig . publicEndpoint + '/' + results [ 0 ] . username , { recursive : true } , ( err ) = > {
if ( err ) throw err ;
execFile ( satyrconfig . ffmpeg , [ '-loglevel' , 'fatal' , '-i' , 'rtmp://127.0.0.1:' + mediaconfig . rtmp . port + '/' + satyrconfig . prviateEndpoint + '/' + key , '-vcodec' , 'copy' , '-acodec' , 'copy' , satyrconfig . directory + '/' + satyrconfig . publicEndpoint + '/' + results [ 0 ] . username + '/' + strf ( '%d%b%Y-%H%M' ) + '.mp4' ] , {
detached : true ,
stdio : 'inherit' ,
maxBuffer : Infinity
} ) . unref ( ) ;
//spawn an ffmpeg process to record the stream, then detach it completely
//ffmpeg can then (probably) finalize the recording if satyr crashes mid-stream
} ) ;
}
else {
console . log ( '[NodeMediaServer] Skipping recording for stream:' , id ) ;
}
db . query ( 'update user_meta set live=true where username=\'' + results [ 0 ] . username + '\' limit 1' ) ;
console . log ( '[NodeMediaServer] Stream key ok for stream:' , id ) ;
}
else {
console . log ( '[NodeMediaServer] Invalid stream key for stream:' , id ) ;
@ -93,8 +110,12 @@ function init (mediaconfig: any, satyrconfig: any) {
nms . on ( 'donePublish' , ( id , StreamPath , args ) = > {
let app : string = StreamPath . split ( "/" ) [ 1 ] ;
let key : string = StreamPath . split ( "/" ) [ 2 ] ;
if ( app === satyrconfig . publicEndpoint ) {
db . query ( 'update user_meta set live=false where username=\'' + key + '\' limit 1' ) ;
if ( app === satyrconfig . privateEndpoint ) {
db . query ( 'update user_meta,users set user_meta.live=false where users.stream_key=' + db . raw . escape ( key ) ) ;
db . query ( 'select username from users where stream_key=' + db . raw . escape ( key ) + ' limit 1' ) . then ( async ( results ) = > {
keystore . rm ( results [ 0 ] . username ) ;
} ) ;
}
} ) ;
nms . on ( 'prePlay' , ( id , StreamPath , args ) = > {
@ -112,7 +133,15 @@ function init (mediaconfig: any, satyrconfig: any) {
if ( app === satyrconfig . privateEndpoint && ! session . isLocal ) {
console . log ( "[NodeMediaServer] Non-local Play from private endpoint, rejecting client:" , id ) ;
session . reject ( ) ;
return false ;
}
if ( app === satyrconfig . publicEndpoint ) {
if ( keystore [ key ] ) {
session . playStreamPath = '/' + satyrconfig . privateEndpoint + '/' + keystore [ key ] ;
return true ;
}
}
session . reject ( ) ;
} ) ;
}
export { init } ;