加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ice.c 206.00 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574457545764577457845794580458145824583458445854586458745884589459045914592459345944595459645974598459946004601460246034604460546064607460846094610461146124613461446154616461746184619462046214622462346244625462646274628462946304631463246334634463546364637463846394640464146424643464446454646464746484649465046514652465346544655465646574658465946604661466246634664466546664667466846694670467146724673467446754676467746784679468046814682468346844685468646874688468946904691469246934694469546964697469846994700470147024703470447054706470747084709471047114712471347144715471647174718471947204721472247234724472547264727472847294730473147324733473447354736473747384739474047414742474347444745474647474748474947504751475247534754475547564757475847594760476147624763476447654766476747684769477047714772477347744775477647774778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913
/*! \file ice.c
* \author Lorenzo Miniero <lorenzo@meetecho.com>
* \copyright GNU General Public License v3
* \brief ICE/STUN/TURN processing
* \details Implementation (based on libnice) of the ICE process. The
* code handles the whole ICE process, from the gathering of candidates
* to the final setup of a virtual channel RTP and RTCP can be transported
* on. Incoming RTP and RTCP packets from peers are relayed to the associated
* plugins by means of the incoming_rtp and incoming_rtcp callbacks. Packets
* to be sent to peers are relayed by peers invoking the relay_rtp and
* relay_rtcp core callbacks instead.
*
* \ingroup protocols
* \ref protocols
*/
#include <ifaddrs.h>
#include <poll.h>
#include <net/if.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netdb.h>
#include <fcntl.h>
#include <stun/usages/bind.h>
#include <nice/debug.h>
#include "janus.h"
#include "debug.h"
#include "ice.h"
#include "turnrest.h"
#include "sdp.h"
#include "rtpsrtp.h"
#include "rtcp.h"
#include "apierror.h"
#include "ip-utils.h"
#include "events.h"
/* STUN server/port, if any */
static char *janus_stun_server = NULL;
static uint16_t janus_stun_port = 0;
char *janus_ice_get_stun_server(void) {
return janus_stun_server;
}
uint16_t janus_ice_get_stun_port(void) {
return janus_stun_port;
}
/* TURN server/port and credentials, if any */
static char *janus_turn_server = NULL;
static uint16_t janus_turn_port = 0;
static char *janus_turn_user = NULL, *janus_turn_pwd = NULL;
static NiceRelayType janus_turn_type = NICE_RELAY_TYPE_TURN_UDP;
char *janus_ice_get_turn_server(void) {
return janus_turn_server;
}
uint16_t janus_ice_get_turn_port(void) {
return janus_turn_port;
}
/* TURN REST API support, if any */
char *janus_ice_get_turn_rest_api(void) {
#ifndef HAVE_TURNRESTAPI
return NULL;
#else
return (char *)janus_turnrest_get_backend();
#endif
}
/* ICE-Lite status */
static gboolean janus_ice_lite_enabled;
gboolean janus_ice_is_ice_lite_enabled(void) {
return janus_ice_lite_enabled;
}
/* ICE-TCP support (only libnice >= 0.1.8, currently broken) */
static gboolean janus_ice_tcp_enabled;
gboolean janus_ice_is_ice_tcp_enabled(void) {
return janus_ice_tcp_enabled;
}
/* Full-trickle support */
static gboolean janus_full_trickle_enabled;
gboolean janus_ice_is_full_trickle_enabled(void) {
return janus_full_trickle_enabled;
}
/* mDNS resolution support */
static gboolean janus_mdns_enabled;
gboolean janus_ice_is_mdns_enabled(void) {
return janus_mdns_enabled;
}
/* IPv6 support (still mostly WIP) */
static gboolean janus_ipv6_enabled;
gboolean janus_ice_is_ipv6_enabled(void) {
return janus_ipv6_enabled;
}
/* Opaque IDs set by applications are by default only passed to event handlers
* for correlation purposes, but not sent back to the user or application in
* the related Janus API responses or events, unless configured otherwise */
static gboolean opaqueid_in_api = FALSE;
void janus_enable_opaqueid_in_api(void) {
opaqueid_in_api = TRUE;
}
gboolean janus_is_opaqueid_in_api_enabled(void) {
return opaqueid_in_api;
}
/* Only needed in case we're using static event loops spawned at startup (disabled by default) */
typedef struct janus_ice_static_event_loop {
int id;
GMainContext *mainctx;
GMainLoop *mainloop;
GThread *thread;
} janus_ice_static_event_loop;
static int static_event_loops = 0;
static GSList *event_loops = NULL, *current_loop = NULL;
static janus_mutex event_loops_mutex = JANUS_MUTEX_INITIALIZER;
static void *janus_ice_static_event_loop_thread(void *data) {
janus_ice_static_event_loop *loop = data;
JANUS_LOG(LOG_VERB, "[loop#%d] Event loop thread started\n", loop->id);
if(loop->mainloop == NULL) {
JANUS_LOG(LOG_ERR, "[loop#%d] Invalid loop...\n", loop->id);
g_thread_unref(g_thread_self());
return NULL;
}
JANUS_LOG(LOG_DBG, "[loop#%d] Looping...\n", loop->id);
g_main_loop_run(loop->mainloop);
/* When the loop quits, we can unref it */
g_main_loop_unref(loop->mainloop);
g_main_context_unref(loop->mainctx);
JANUS_LOG(LOG_VERB, "[loop#%d] Event loop thread ended!\n", loop->id);
return NULL;
}
int janus_ice_get_static_event_loops(void) {
return static_event_loops;
}
void janus_ice_set_static_event_loops(int loops) {
if(loops == 0)
return;
else if(loops < 1) {
JANUS_LOG(LOG_WARN, "Invalid number of static event loops (%d), disabling\n", loops);
return;
}
/* Create a pool of new event loops */
int i = 0;
for(i=0; i<loops; i++) {
janus_ice_static_event_loop *loop = g_malloc0(sizeof(janus_ice_static_event_loop));
loop->id = static_event_loops;
loop->mainctx = g_main_context_new();
loop->mainloop = g_main_loop_new(loop->mainctx, FALSE);
/* Now spawn a thread for this loop */
GError *error = NULL;
char tname[16];
g_snprintf(tname, sizeof(tname), "hloop %d", loop->id);
loop->thread = g_thread_try_new(tname, &janus_ice_static_event_loop_thread, loop, &error);
if(error != NULL) {
g_main_loop_unref(loop->mainloop);
g_main_context_unref(loop->mainctx);
g_free(loop);
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch a new event loop thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
} else {
event_loops = g_slist_append(event_loops, loop);
static_event_loops++;
}
}
current_loop = event_loops;
JANUS_LOG(LOG_INFO, "Spawned %d static event loops (handles won't have a dedicated loop)\n", static_event_loops);
return;
}
void janus_ice_stop_static_event_loops(void) {
if(static_event_loops < 1)
return;
/* Quit all the static loops and wait for the threads to leave */
janus_mutex_lock(&event_loops_mutex);
GSList *l = event_loops;
while(l) {
janus_ice_static_event_loop *loop = (janus_ice_static_event_loop *)l->data;
if(loop->mainloop != NULL && g_main_loop_is_running(loop->mainloop))
g_main_loop_quit(loop->mainloop);
g_thread_join(loop->thread);
l = l->next;
}
g_slist_free_full(event_loops, (GDestroyNotify)g_free);
janus_mutex_unlock(&event_loops_mutex);
}
/* libnice debugging */
static gboolean janus_ice_debugging_enabled;
gboolean janus_ice_is_ice_debugging_enabled(void) {
return janus_ice_debugging_enabled;
}
void janus_ice_debugging_enable(void) {
JANUS_LOG(LOG_VERB, "Enabling libnice debugging...\n");
if(g_getenv("NICE_DEBUG") == NULL) {
JANUS_LOG(LOG_WARN, "No NICE_DEBUG environment variable set, setting maximum debug\n");
g_setenv("NICE_DEBUG", "all", TRUE);
}
if(g_getenv("G_MESSAGES_DEBUG") == NULL) {
JANUS_LOG(LOG_WARN, "No G_MESSAGES_DEBUG environment variable set, setting maximum debug\n");
g_setenv("G_MESSAGES_DEBUG", "all", TRUE);
}
JANUS_LOG(LOG_VERB, "Debugging NICE_DEBUG=%s G_MESSAGES_DEBUG=%s\n",
g_getenv("NICE_DEBUG"), g_getenv("G_MESSAGES_DEBUG"));
janus_ice_debugging_enabled = TRUE;
nice_debug_enable(strstr(g_getenv("NICE_DEBUG"), "all") || strstr(g_getenv("NICE_DEBUG"), "stun"));
}
void janus_ice_debugging_disable(void) {
JANUS_LOG(LOG_VERB, "Disabling libnice debugging...\n");
janus_ice_debugging_enabled = FALSE;
nice_debug_disable(TRUE);
}
/* NAT 1:1 stuff */
static gboolean nat_1_1_enabled = FALSE;
static gboolean keep_private_host = FALSE;
void janus_ice_enable_nat_1_1(gboolean kph) {
nat_1_1_enabled = TRUE;
keep_private_host = kph;
}
/* Interface/IP enforce/ignore lists */
GList *janus_ice_enforce_list = NULL, *janus_ice_ignore_list = NULL;
janus_mutex ice_list_mutex;
void janus_ice_enforce_interface(const char *ip) {
if(ip == NULL)
return;
/* Is this an IP or an interface? */
janus_mutex_lock(&ice_list_mutex);
janus_ice_enforce_list = g_list_append(janus_ice_enforce_list, (gpointer)ip);
janus_mutex_unlock(&ice_list_mutex);
}
gboolean janus_ice_is_enforced(const char *ip) {
if(ip == NULL || janus_ice_enforce_list == NULL)
return false;
janus_mutex_lock(&ice_list_mutex);
GList *temp = janus_ice_enforce_list;
while(temp) {
const char *enforced = (const char *)temp->data;
if(enforced != NULL && strstr(ip, enforced) == ip) {
janus_mutex_unlock(&ice_list_mutex);
return true;
}
temp = temp->next;
}
janus_mutex_unlock(&ice_list_mutex);
return false;
}
void janus_ice_ignore_interface(const char *ip) {
if(ip == NULL)
return;
/* Is this an IP or an interface? */
janus_mutex_lock(&ice_list_mutex);
janus_ice_ignore_list = g_list_append(janus_ice_ignore_list, (gpointer)ip);
if(janus_ice_enforce_list != NULL) {
JANUS_LOG(LOG_WARN, "Added %s to the ICE ignore list, but the ICE enforce list is not empty: the ICE ignore list will not be used\n", ip);
}
janus_mutex_unlock(&ice_list_mutex);
}
gboolean janus_ice_is_ignored(const char *ip) {
if(ip == NULL || janus_ice_ignore_list == NULL)
return false;
janus_mutex_lock(&ice_list_mutex);
GList *temp = janus_ice_ignore_list;
while(temp) {
const char *ignored = (const char *)temp->data;
if(ignored != NULL && strstr(ip, ignored) == ip) {
janus_mutex_unlock(&ice_list_mutex);
return true;
}
temp = temp->next;
}
janus_mutex_unlock(&ice_list_mutex);
return false;
}
/* Frequency of statistics via event handlers (one second by default) */
static int janus_ice_event_stats_period = 1;
void janus_ice_set_event_stats_period(int period) {
janus_ice_event_stats_period = period;
}
int janus_ice_get_event_stats_period(void) {
return janus_ice_event_stats_period;
}
/* RTP/RTCP port range */
uint16_t rtp_range_min = 0;
uint16_t rtp_range_max = 0;
#define JANUS_ICE_PACKET_AUDIO 0
#define JANUS_ICE_PACKET_VIDEO 1
#define JANUS_ICE_PACKET_TEXT 2
#define JANUS_ICE_PACKET_BINARY 3
#define JANUS_ICE_PACKET_SCTP 4
/* Janus enqueued (S)RTP/(S)RTCP packet to send */
typedef struct janus_ice_queued_packet {
char *data;
char *label;
char *protocol;
gint length;
gint type;
gboolean control;
gboolean retransmission;
gboolean encrypted;
gint64 added;
} janus_ice_queued_packet;
/* A few static, fake, messages we use as a trigger: e.g., to start a
* new DTLS handshake, hangup a PeerConnection or close a handle */
static janus_ice_queued_packet
janus_ice_start_gathering,
janus_ice_add_candidates,
janus_ice_dtls_handshake,
janus_ice_hangup_peerconnection,
janus_ice_detach_handle,
janus_ice_data_ready;
/* Janus NACKed packet we're tracking (to avoid duplicates) */
typedef struct janus_ice_nacked_packet {
janus_ice_handle *handle;
int vindex;
guint16 seq_number;
guint source_id;
} janus_ice_nacked_packet;
static gboolean janus_ice_nacked_packet_cleanup(gpointer user_data) {
janus_ice_nacked_packet *pkt = (janus_ice_nacked_packet *)user_data;
if(pkt->handle->stream){
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Cleaning up NACKed packet %"SCNu16" (SSRC %"SCNu32", vindex %d)...\n",
pkt->handle->handle_id, pkt->seq_number, pkt->handle->stream->video_ssrc_peer[pkt->vindex], pkt->vindex);
g_hash_table_remove(pkt->handle->stream->rtx_nacked[pkt->vindex], GUINT_TO_POINTER(pkt->seq_number));
g_hash_table_remove(pkt->handle->stream->pending_nacked_cleanup, GUINT_TO_POINTER(pkt->source_id));
}
return G_SOURCE_REMOVE;
}
/* Deallocation helpers for handles and related structs */
static void janus_ice_handle_free(const janus_refcount *handle_ref);
static void janus_ice_webrtc_free(janus_ice_handle *handle);
static void janus_ice_plugin_session_free(const janus_refcount *app_handle_ref);
static void janus_ice_stream_free(const janus_refcount *handle_ref);
static void janus_ice_component_free(const janus_refcount *handle_ref);
/* Custom GSource for outgoing traffic */
typedef struct janus_ice_outgoing_traffic {
GSource parent;
janus_ice_handle *handle;
GDestroyNotify destroy;
} janus_ice_outgoing_traffic;
static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data);
static gboolean janus_ice_outgoing_stats_handle(gpointer user_data);
static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt);
static gboolean janus_ice_outgoing_traffic_prepare(GSource *source, gint *timeout) {
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
return (g_async_queue_length(t->handle->queued_packets) > 0);
}
static gboolean janus_ice_outgoing_traffic_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) {
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
int ret = G_SOURCE_CONTINUE;
janus_ice_queued_packet *pkt = NULL;
while((pkt = g_async_queue_try_pop(t->handle->queued_packets)) != NULL) {
if(janus_ice_outgoing_traffic_handle(t->handle, pkt) == G_SOURCE_REMOVE)
ret = G_SOURCE_REMOVE;
}
return ret;
}
static void janus_ice_outgoing_traffic_finalize(GSource *source) {
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Finalizing loop source\n", t->handle->handle_id);
if(static_event_loops > 0) {
/* This handle was sharing an event loop with others */
janus_ice_webrtc_free(t->handle);
janus_refcount_decrease(&t->handle->ref);
} else if(t->handle->mainloop != NULL && g_main_loop_is_running(t->handle->mainloop)) {
/* This handle had a dedicated event loop, quit it */
g_main_loop_quit(t->handle->mainloop);
}
janus_refcount_decrease(&t->handle->ref);
}
static GSourceFuncs janus_ice_outgoing_traffic_funcs = {
janus_ice_outgoing_traffic_prepare,
NULL, /* We don't need check */
janus_ice_outgoing_traffic_dispatch,
janus_ice_outgoing_traffic_finalize,
NULL, NULL
};
static GSource *janus_ice_outgoing_traffic_create(janus_ice_handle *handle, GDestroyNotify destroy) {
GSource *source = g_source_new(&janus_ice_outgoing_traffic_funcs, sizeof(janus_ice_outgoing_traffic));
janus_ice_outgoing_traffic *t = (janus_ice_outgoing_traffic *)source;
char name[255];
g_snprintf(name, sizeof(name), "source-%"SCNu64, handle->handle_id);
g_source_set_name(source, name);
janus_refcount_increase(&handle->ref);
t->handle = handle;
t->destroy = destroy;
return source;
}
/* Time, in seconds, that should pass with no media (audio or video) being
* received before Janus notifies you about this with a receiving=false */
#define DEFAULT_NO_MEDIA_TIMER 1
static uint no_media_timer = DEFAULT_NO_MEDIA_TIMER;
void janus_set_no_media_timer(uint timer) {
no_media_timer = timer;
if(no_media_timer == 0)
JANUS_LOG(LOG_VERB, "Disabling no-media timer\n");
else
JANUS_LOG(LOG_VERB, "Setting no-media timer to %us\n", no_media_timer);
}
uint janus_get_no_media_timer(void) {
return no_media_timer;
}
/* Number of lost packets per seconds on a media stream (uplink or downlink,
* audio or video), that should result in a slow-link event to the iser */
#define DEFAULT_SLOWLINK_THRESHOLD 4
static uint slowlink_threshold = DEFAULT_SLOWLINK_THRESHOLD;
void janus_set_slowlink_threshold(uint packets) {
slowlink_threshold = packets;
if(slowlink_threshold == 0)
JANUS_LOG(LOG_VERB, "Disabling slow-link events\n");
else
JANUS_LOG(LOG_VERB, "Setting slowlink-threshold to %u packets\n", slowlink_threshold);
}
uint janus_get_slowlink_threshold(void) {
return slowlink_threshold;
}
/* Period, in milliseconds, to refer to for sending TWCC feedback */
#define DEFAULT_TWCC_PERIOD 200
static uint twcc_period = DEFAULT_TWCC_PERIOD;
void janus_set_twcc_period(uint period) {
twcc_period = period;
if(twcc_period == 0) {
JANUS_LOG(LOG_WARN, "Invalid TWCC period, falling back to default\n");
twcc_period = DEFAULT_TWCC_PERIOD;
} else {
JANUS_LOG(LOG_VERB, "Setting TWCC period to %ds\n", twcc_period);
}
}
uint janus_get_twcc_period(void) {
return twcc_period;
}
/* DSCP value, which we can set via libnice: it's disabled by default */
static int dscp_ef = 0;
void janus_set_dscp(int dscp) {
dscp_ef = dscp;
if(dscp_ef > 0) {
JANUS_LOG(LOG_VERB, "Setting DSCP EF to %d\n", dscp_ef);
}
}
int janus_get_dscp(void) {
return dscp_ef;
}
static inline void janus_ice_free_rtp_packet(janus_rtp_packet *pkt) {
if(pkt == NULL) {
return;
}
g_free(pkt->data);
g_free(pkt);
}
static void janus_ice_free_queued_packet(janus_ice_queued_packet *pkt) {
if(pkt == NULL || pkt == &janus_ice_start_gathering ||
pkt == &janus_ice_add_candidates ||
pkt == &janus_ice_dtls_handshake ||
pkt == &janus_ice_hangup_peerconnection ||
pkt == &janus_ice_detach_handle ||
pkt == &janus_ice_data_ready) {
return;
}
g_free(pkt->data);
g_free(pkt->label);
g_free(pkt->protocol);
g_free(pkt);
}
/* Minimum and maximum value, in milliseconds, for the NACK queue/retransmissions (default=200ms/1000ms) */
#define DEFAULT_MIN_NACK_QUEUE 200
#define DEFAULT_MAX_NACK_QUEUE 1000
/* Maximum ignore count after retransmission (200ms) */
#define MAX_NACK_IGNORE 200000
static uint16_t min_nack_queue = DEFAULT_MIN_NACK_QUEUE;
void janus_set_min_nack_queue(uint16_t mnq) {
min_nack_queue = mnq < DEFAULT_MAX_NACK_QUEUE ? mnq : DEFAULT_MAX_NACK_QUEUE;
if(min_nack_queue == 0)
JANUS_LOG(LOG_VERB, "Disabling NACK queue\n");
else
JANUS_LOG(LOG_VERB, "Setting min NACK queue to %dms\n", min_nack_queue);
}
uint16_t janus_get_min_nack_queue(void) {
return min_nack_queue;
}
/* Helper to clean old NACK packets in the buffer when they exceed the queue time limit */
static void janus_cleanup_nack_buffer(gint64 now, janus_ice_stream *stream, gboolean audio, gboolean video) {
if(stream && stream->component) {
janus_ice_component *component = stream->component;
if(audio && component->audio_retransmit_buffer) {
janus_rtp_packet *p = (janus_rtp_packet *)g_queue_peek_head(component->audio_retransmit_buffer);
while(p && (!now || (now - p->created >= (gint64)stream->nack_queue_ms*1000))) {
/* Packet is too old, get rid of it */
g_queue_pop_head(component->audio_retransmit_buffer);
/* Remove from hashtable too */
janus_rtp_header *header = (janus_rtp_header *)p->data;
guint16 seq = ntohs(header->seq_number);
g_hash_table_remove(component->audio_retransmit_seqs, GUINT_TO_POINTER(seq));
/* Free the packet */
janus_ice_free_rtp_packet(p);
p = (janus_rtp_packet *)g_queue_peek_head(component->audio_retransmit_buffer);
}
}
if(video && component->video_retransmit_buffer) {
janus_rtp_packet *p = (janus_rtp_packet *)g_queue_peek_head(component->video_retransmit_buffer);
while(p && (!now || (now - p->created >= (gint64)stream->nack_queue_ms*1000))) {
/* Packet is too old, get rid of it */
g_queue_pop_head(component->video_retransmit_buffer);
/* Remove from hashtable too */
janus_rtp_header *header = (janus_rtp_header *)p->data;
guint16 seq = ntohs(header->seq_number);
g_hash_table_remove(component->video_retransmit_seqs, GUINT_TO_POINTER(seq));
/* Free the packet */
janus_ice_free_rtp_packet(p);
p = (janus_rtp_packet *)g_queue_peek_head(component->video_retransmit_buffer);
}
}
}
}
#define SEQ_MISSING_WAIT 12000 /* 12ms */
#define SEQ_NACKED_WAIT 155000 /* 155ms */
/* janus_seq_info list functions */
static void janus_seq_append(janus_seq_info **head, janus_seq_info *new_seq) {
if(*head == NULL) {
new_seq->prev = new_seq;
new_seq->next = new_seq;
*head = new_seq;
} else {
janus_seq_info *last_seq = (*head)->prev;
new_seq->prev = last_seq;
new_seq->next = *head;
(*head)->prev = new_seq;
last_seq->next = new_seq;
}
}
static janus_seq_info *janus_seq_pop_head(janus_seq_info **head) {
janus_seq_info *pop_seq = *head;
if(pop_seq) {
janus_seq_info *new_head = pop_seq->next;
if(pop_seq == new_head || new_head == NULL) {
*head = NULL;
} else {
*head = new_head;
new_head->prev = pop_seq->prev;
new_head->prev->next = new_head;
}
}
return pop_seq;
}
void janus_seq_list_free(janus_seq_info **head) {
if(!*head)
return;
janus_seq_info *cur = *head;
do {
janus_seq_info *next = cur->next;
g_free(cur);
cur = next;
} while(cur != *head);
*head = NULL;
}
static int janus_seq_in_range(guint16 seqn, guint16 start, guint16 len) {
/* Supports wrapping sequence (easier with int range) */
int n = seqn;
int nh = (1<<16) + n;
int s = start;
int e = s + len;
return (s <= n && n < e) || (s <= nh && nh < e);
}
/* Internal method for relaying RTCP messages, optionally filtering them in case they come from plugins */
void janus_ice_relay_rtcp_internal(janus_ice_handle *handle, janus_plugin_rtcp *packet, gboolean filter_rtcp);
/* Map of active plugin sessions */
static GHashTable *plugin_sessions;
static janus_mutex plugin_sessions_mutex;
gboolean janus_plugin_session_is_alive(janus_plugin_session *plugin_session) {
if(plugin_session == NULL || plugin_session < (janus_plugin_session *)0x1000 ||
g_atomic_int_get(&plugin_session->stopped))
return FALSE;
/* Make sure this plugin session is still alive */
janus_mutex_lock_nodebug(&plugin_sessions_mutex);
janus_plugin_session *result = g_hash_table_lookup(plugin_sessions, plugin_session);
janus_mutex_unlock_nodebug(&plugin_sessions_mutex);
if(result == NULL) {
JANUS_LOG(LOG_ERR, "Invalid plugin session (%p)\n", plugin_session);
}
return (result != NULL);
}
static void janus_plugin_session_dereference(janus_plugin_session *plugin_session) {
if(plugin_session)
janus_refcount_decrease(&plugin_session->ref);
}
static void janus_ice_clear_queued_candidates(janus_ice_handle *handle) {
if(handle == NULL || handle->queued_candidates == NULL) {
return;
}
while(g_async_queue_length(handle->queued_candidates) > 0) {
(void)g_async_queue_try_pop(handle->queued_candidates);
}
}
static void janus_ice_clear_queued_packets(janus_ice_handle *handle) {
if(handle == NULL || handle->queued_packets == NULL) {
return;
}
janus_ice_queued_packet *pkt = NULL;
while(g_async_queue_length(handle->queued_packets) > 0) {
pkt = g_async_queue_try_pop(handle->queued_packets);
janus_ice_free_queued_packet(pkt);
}
}
static void janus_ice_notify_trickle(janus_ice_handle *handle, char *buffer) {
if(handle == NULL)
return;
char cbuffer[200];
if(buffer != NULL)
g_snprintf(cbuffer, sizeof(cbuffer), "candidate:%s", buffer);
/* Send a "trickle" event to the browser */
janus_session *session = (janus_session *)handle->session;
if(session == NULL)
return;
json_t *event = json_object();
json_object_set_new(event, "janus", json_string("trickle"));
json_object_set_new(event, "session_id", json_integer(session->session_id));
json_object_set_new(event, "sender", json_integer(handle->handle_id));
if(opaqueid_in_api && handle->opaque_id != NULL)
json_object_set_new(event, "opaque_id", json_string(handle->opaque_id));
json_t *candidate = json_object();
if(buffer != NULL) {
json_object_set_new(candidate, "sdpMid", json_string(handle->stream_mid));
json_object_set_new(candidate, "sdpMLineIndex", json_integer(0));
json_object_set_new(candidate, "candidate", json_string(cbuffer));
} else {
json_object_set_new(candidate, "completed", json_true());
}
json_object_set_new(event, "candidate", candidate);
/* Send the event */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sending trickle event (%s) to transport...\n",
handle->handle_id, buffer ? "candidate" : "end-of-candidates");
janus_session_notify_event(session, event);
}
static void janus_ice_notify_media(janus_ice_handle *handle, gboolean video, gboolean up) {
if(handle == NULL)
return;
/* Prepare JSON event to notify user/application */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Notifying that we %s receiving %s\n",
handle->handle_id, up ? "are" : "are NOT", video ? "video" : "audio");
janus_session *session = (janus_session *)handle->session;
if(session == NULL)
return;
json_t *event = json_object();
json_object_set_new(event, "janus", json_string("media"));
json_object_set_new(event, "session_id", json_integer(session->session_id));
json_object_set_new(event, "sender", json_integer(handle->handle_id));
if(opaqueid_in_api && handle->opaque_id != NULL)
json_object_set_new(event, "opaque_id", json_string(handle->opaque_id));
json_object_set_new(event, "type", json_string(video ? "video" : "audio"));
json_object_set_new(event, "receiving", up ? json_true() : json_false());
if(!up && no_media_timer > 1)
json_object_set_new(event, "seconds", json_integer(no_media_timer));
/* Send the event */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sending event to transport...\n", handle->handle_id);
janus_session_notify_event(session, event);
/* Notify event handlers as well */
if(janus_events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "media", json_string(video ? "video" : "audio"));
json_object_set_new(info, "receiving", up ? json_true() : json_false());
if(!up && no_media_timer > 1)
json_object_set_new(info, "seconds", json_integer(no_media_timer));
janus_events_notify_handlers(JANUS_EVENT_TYPE_MEDIA, JANUS_EVENT_SUBTYPE_MEDIA_STATE,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
void janus_ice_notify_hangup(janus_ice_handle *handle, const char *reason) {
if(handle == NULL)
return;
/* Prepare JSON event to notify user/application */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Notifying WebRTC hangup; %p\n", handle->handle_id, handle);
janus_session *session = (janus_session *)handle->session;
if(session == NULL)
return;
json_t *event = json_object();
json_object_set_new(event, "janus", json_string("hangup"));
json_object_set_new(event, "session_id", json_integer(session->session_id));
json_object_set_new(event, "sender", json_integer(handle->handle_id));
if(opaqueid_in_api && handle->opaque_id != NULL)
json_object_set_new(event, "opaque_id", json_string(handle->opaque_id));
if(reason != NULL)
json_object_set_new(event, "reason", json_string(reason));
/* Send the event */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sending event to transport...; %p\n", handle->handle_id, handle);
janus_session_notify_event(session, event);
/* Notify event handlers as well */
if(janus_events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "connection", json_string("hangup"));
if(reason != NULL)
json_object_set_new(info, "reason", json_string(reason));
janus_events_notify_handlers(JANUS_EVENT_TYPE_WEBRTC, JANUS_EVENT_SUBTYPE_WEBRTC_STATE,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
/* Trickle helpers */
janus_ice_trickle *janus_ice_trickle_new(const char *transaction, json_t *candidate) {
if(transaction == NULL || candidate == NULL)
return NULL;
janus_ice_trickle *trickle = g_malloc(sizeof(janus_ice_trickle));
trickle->handle = NULL;
trickle->received = janus_get_monotonic_time();
trickle->transaction = g_strdup(transaction);
trickle->candidate = json_deep_copy(candidate);
return trickle;
}
gint janus_ice_trickle_parse(janus_ice_handle *handle, json_t *candidate, const char **error) {
const char *ignore_error = NULL;
if(error == NULL) {
error = &ignore_error;
}
if(handle == NULL) {
*error = "Invalid handle";
return JANUS_ERROR_HANDLE_NOT_FOUND;
}
/* Parse trickle candidate */
if(!json_is_object(candidate) || json_object_get(candidate, "completed") != NULL) {
JANUS_LOG(LOG_VERB, "No more remote candidates for handle %"SCNu64"!\n", handle->handle_id);
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES);
} else {
/* Handle remote candidate */
json_t *mid = json_object_get(candidate, "sdpMid");
if(mid && !json_is_string(mid)) {
*error = "Trickle error: invalid element type (sdpMid should be a string)";
return JANUS_ERROR_INVALID_ELEMENT_TYPE;
}
json_t *mline = json_object_get(candidate, "sdpMLineIndex");
if(mline && (!json_is_integer(mline) || json_integer_value(mline) < 0)) {
*error = "Trickle error: invalid element type (sdpMLineIndex should be a positive integer)";
return JANUS_ERROR_INVALID_ELEMENT_TYPE;
}
if(!mid && !mline) {
*error = "Trickle error: missing mandatory element (sdpMid or sdpMLineIndex)";
return JANUS_ERROR_MISSING_MANDATORY_ELEMENT;
}
json_t *rc = json_object_get(candidate, "candidate");
if(!rc) {
*error = "Trickle error: missing mandatory element (candidate)";
return JANUS_ERROR_MISSING_MANDATORY_ELEMENT;
}
if(!json_is_string(rc)) {
*error = "Trickle error: invalid element type (candidate should be a string)";
return JANUS_ERROR_INVALID_ELEMENT_TYPE;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Trickle candidate (%s): %s\n", handle->handle_id, json_string_value(mid), json_string_value(rc));
/* Parse it */
int sdpMLineIndex = mline ? json_integer_value(mline) : -1;
const char *sdpMid = json_string_value(mid);
if(sdpMLineIndex > 0 || (handle->stream_mid && sdpMid && strcmp(handle->stream_mid, sdpMid))) {
/* FIXME We bundle everything, so we ignore candidates for anything beyond the first m-line */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Got a mid='%s' candidate (index %d) but we're bundling, ignoring...\n",
handle->handle_id, json_string_value(mid), sdpMLineIndex);
return 0;
}
janus_ice_stream *stream = handle->stream;
if(stream == NULL) {
*error = "Trickle error: invalid element type (no such stream)";
return JANUS_ERROR_TRICKE_INVALID_STREAM;
}
int res = janus_sdp_parse_candidate(stream, json_string_value(rc), 1);
if(res != 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Failed to parse candidate... (%d)\n", handle->handle_id, res);
/* FIXME Should we return an error? */
}
}
return 0;
}
void janus_ice_trickle_destroy(janus_ice_trickle *trickle) {
if(trickle == NULL)
return;
g_free(trickle->transaction);
trickle->transaction = NULL;
if(trickle->candidate)
json_decref(trickle->candidate);
trickle->candidate = NULL;
g_free(trickle);
}
/* libnice initialization */
void janus_ice_init(gboolean ice_lite, gboolean ice_tcp, gboolean full_trickle, gboolean ignore_mdns,
gboolean ipv6, uint16_t rtp_min_port, uint16_t rtp_max_port) {
janus_ice_lite_enabled = ice_lite;
janus_ice_tcp_enabled = ice_tcp;
janus_full_trickle_enabled = full_trickle;
janus_mdns_enabled = !ignore_mdns;
janus_ipv6_enabled = ipv6;
JANUS_LOG(LOG_INFO, "Initializing ICE stuff (%s mode, ICE-TCP candidates %s, %s-trickle, IPv6 support %s)\n",
janus_ice_lite_enabled ? "Lite" : "Full",
janus_ice_tcp_enabled ? "enabled" : "disabled",
janus_full_trickle_enabled ? "full" : "half",
janus_ipv6_enabled ? "enabled" : "disabled");
if(janus_ice_tcp_enabled) {
#ifndef HAVE_LIBNICE_TCP
JANUS_LOG(LOG_WARN, "libnice version < 0.1.8, disabling ICE-TCP support\n");
janus_ice_tcp_enabled = FALSE;
#else
if(!janus_ice_lite_enabled) {
JANUS_LOG(LOG_WARN, "You may experience problems when having ICE-TCP enabled without having ICE Lite enabled too in libnice\n");
}
#endif
}
/* libnice debugging is disabled unless explicitly stated */
nice_debug_disable(TRUE);
/*! \note The RTP/RTCP port range configuration may be just a placeholder: for
* instance, libnice supports this since 0.1.0, but the 0.1.3 on Fedora fails
* when linking with an undefined reference to \c nice_agent_set_port_range
* so this is checked by the install.sh script in advance. */
rtp_range_min = rtp_min_port;
rtp_range_max = rtp_max_port;
if(rtp_range_max < rtp_range_min) {
JANUS_LOG(LOG_WARN, "Invalid ICE port range: %"SCNu16" > %"SCNu16"\n", rtp_range_min, rtp_range_max);
} else if(rtp_range_min > 0 || rtp_range_max > 0) {
#ifndef HAVE_PORTRANGE
JANUS_LOG(LOG_WARN, "nice_agent_set_port_range unavailable, port range disabled\n");
#else
JANUS_LOG(LOG_INFO, "ICE port range: %"SCNu16"-%"SCNu16"\n", rtp_range_min, rtp_range_max);
#endif
}
if(!janus_mdns_enabled)
JANUS_LOG(LOG_WARN, "mDNS resolution disabled, .local candidates will be ignored\n");
/* We keep track of plugin sessions to avoid problems */
plugin_sessions = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_plugin_session_dereference);
janus_mutex_init(&plugin_sessions_mutex);
#ifdef HAVE_TURNRESTAPI
/* Initialize the TURN REST API client stack, whether we're going to use it or not */
janus_turnrest_init();
#endif
}
void janus_ice_deinit(void) {
#ifdef HAVE_TURNRESTAPI
janus_turnrest_deinit();
#endif
}
int janus_ice_test_stun_server(janus_network_address *addr, uint16_t port,
uint16_t local_port, janus_network_address *public_addr, uint16_t *public_port) {
if(!addr || !public_addr)
return -1;
/* Test the STUN server */
StunAgent stun;
stun_agent_init (&stun, STUN_ALL_KNOWN_ATTRIBUTES, STUN_COMPATIBILITY_RFC5389, 0);
StunMessage msg;
uint8_t buf[1500];
size_t len = stun_usage_bind_create(&stun, &msg, buf, 1500);
JANUS_LOG(LOG_INFO, "Testing STUN server: message is of %zu bytes\n", len);
/* Use the janus_network_address info to drive the socket creation */
int fd = socket(addr->family, SOCK_DGRAM, 0);
if(fd < 0) {
JANUS_LOG(LOG_FATAL, "Error creating socket for STUN BINDING test\n");
return -1;
}
struct sockaddr *address = NULL, *remote = NULL;
struct sockaddr_in address4 = { 0 }, remote4 = { 0 };
struct sockaddr_in6 address6 = { 0 }, remote6 = { 0 };
socklen_t addrlen = 0;
if(addr->family == AF_INET) {
memset(&address4, 0, sizeof(address4));
address4.sin_family = AF_INET;
address4.sin_port = htons(local_port);
address4.sin_addr.s_addr = INADDR_ANY;
memset(&remote4, 0, sizeof(remote4));
remote4.sin_family = AF_INET;
remote4.sin_port = htons(port);
memcpy(&remote4.sin_addr, &addr->ipv4, sizeof(addr->ipv4));
address = (struct sockaddr *)(&address4);
remote = (struct sockaddr *)(&remote4);
addrlen = sizeof(remote4);
} else if(addr->family == AF_INET6) {
memset(&address6, 0, sizeof(address6));
address6.sin6_family = AF_INET6;
address6.sin6_port = htons(local_port);
address6.sin6_addr = in6addr_any;
memset(&remote6, 0, sizeof(remote6));
remote6.sin6_family = AF_INET6;
remote6.sin6_port = htons(port);
memcpy(&remote6.sin6_addr, &addr->ipv6, sizeof(addr->ipv6));
remote6.sin6_addr = addr->ipv6;
address = (struct sockaddr *)(&address6);
remote = (struct sockaddr *)(&remote6);
addrlen = sizeof(remote6);
}
if(bind(fd, address, addrlen) < 0) {
JANUS_LOG(LOG_FATAL, "Bind failed for STUN BINDING test: %d (%s)\n", errno, strerror(errno));
close(fd);
return -1;
}
int bytes = sendto(fd, buf, len, 0, remote, addrlen);
if(bytes < 0) {
JANUS_LOG(LOG_FATAL, "Error sending STUN BINDING test\n");
close(fd);
return -1;
}
JANUS_LOG(LOG_VERB, " >> Sent %d bytes, waiting for reply...\n", bytes);
struct timeval timeout;
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(fd, &readfds);
timeout.tv_sec = 5; /* FIXME Don't wait forever */
timeout.tv_usec = 0;
int err = select(fd+1, &readfds, NULL, NULL, &timeout);
if(err < 0) {
JANUS_LOG(LOG_FATAL, "Error waiting for a response to our STUN BINDING test: %d (%s)\n", errno, strerror(errno));
close(fd);
return -1;
}
if(!FD_ISSET(fd, &readfds)) {
JANUS_LOG(LOG_FATAL, "No response to our STUN BINDING test\n");
close(fd);
return -1;
}
bytes = recvfrom(fd, buf, 1500, 0, remote, &addrlen);
JANUS_LOG(LOG_VERB, " >> Got %d bytes...\n", bytes);
close(fd);
if(bytes < 0) {
JANUS_LOG(LOG_FATAL, "Failed to receive STUN\n");
return -1;
}
if(stun_agent_validate (&stun, &msg, buf, bytes, NULL, NULL) != STUN_VALIDATION_SUCCESS) {
JANUS_LOG(LOG_FATAL, "Failed to validate STUN BINDING response\n");
return -1;
}
StunClass class = stun_message_get_class(&msg);
StunMethod method = stun_message_get_method(&msg);
if(class != STUN_RESPONSE || method != STUN_BINDING) {
JANUS_LOG(LOG_FATAL, "Unexpected STUN response: %d/%d\n", class, method);
return -1;
}
StunMessageReturn ret = stun_message_find_xor_addr(&msg, STUN_ATTRIBUTE_XOR_MAPPED_ADDRESS, (struct sockaddr_storage *)address, &addrlen);
JANUS_LOG(LOG_VERB, " >> XOR-MAPPED-ADDRESS: %d\n", ret);
if(ret == STUN_MESSAGE_RETURN_SUCCESS) {
if(janus_network_address_from_sockaddr(address, public_addr) != 0) {
JANUS_LOG(LOG_ERR, "Could not resolve XOR-MAPPED-ADDRESS...\n");
return -1;
}
if(public_port != NULL) {
if(address->sa_family == AF_INET) {
struct sockaddr_in *addr = (struct sockaddr_in *)address;
*public_port = ntohs(addr->sin_port);
} else if(address->sa_family == AF_INET6) {
struct sockaddr_in6 *addr = (struct sockaddr_in6 *)address;
*public_port = ntohs(addr->sin6_port);
}
}
return 0;
}
ret = stun_message_find_addr(&msg, STUN_ATTRIBUTE_MAPPED_ADDRESS, (struct sockaddr_storage *)address, &addrlen);
JANUS_LOG(LOG_VERB, " >> MAPPED-ADDRESS: %d\n", ret);
if(ret == STUN_MESSAGE_RETURN_SUCCESS) {
if(janus_network_address_from_sockaddr(address, public_addr) != 0) {
JANUS_LOG(LOG_ERR, "Could not resolve MAPPED-ADDRESS...\n");
return -1;
}
if(public_port != NULL) {
if(address->sa_family == AF_INET) {
struct sockaddr_in *addr = (struct sockaddr_in *)address;
*public_port = ntohs(addr->sin_port);
} else if(address->sa_family == AF_INET6) {
struct sockaddr_in6 *addr = (struct sockaddr_in6 *)address;
*public_port = ntohs(addr->sin6_port);
}
}
return 0;
}
/* No usable attribute? */
JANUS_LOG(LOG_ERR, "No XOR-MAPPED-ADDRESS or MAPPED-ADDRESS...\n");
return -1;
}
int janus_ice_set_stun_server(gchar *stun_server, uint16_t stun_port) {
if(stun_server == NULL)
return 0; /* No initialization needed */
if(stun_port == 0)
stun_port = 3478;
JANUS_LOG(LOG_INFO, "STUN server to use: %s:%u\n", stun_server, stun_port);
/* Resolve address to get an IP */
struct addrinfo *res = NULL;
janus_network_address addr;
janus_network_address_string_buffer addr_buf;
if(getaddrinfo(stun_server, NULL, NULL, &res) != 0 ||
janus_network_address_from_sockaddr(res->ai_addr, &addr) != 0 ||
janus_network_address_to_string_buffer(&addr, &addr_buf) != 0) {
JANUS_LOG(LOG_ERR, "Could not resolve %s...\n", stun_server);
if(res)
freeaddrinfo(res);
return -1;
}
freeaddrinfo(res);
janus_stun_server = g_strdup(janus_network_address_string_from_buffer(&addr_buf));
if(janus_stun_server == NULL) {
JANUS_LOG(LOG_ERR, "Could not resolve %s...\n", stun_server);
return -1;
}
janus_stun_port = stun_port;
JANUS_LOG(LOG_INFO, " >> %s:%u (%s)\n", janus_stun_server, janus_stun_port, addr.family == AF_INET ? "IPv4" : "IPv6");
/* Test the STUN server */
janus_network_address public_addr = { 0 };
if(janus_ice_test_stun_server(&addr, janus_stun_port, 0, &public_addr, NULL) < 0) {
g_free(janus_stun_server);
janus_stun_server = NULL;
return -1;
}
if(janus_network_address_to_string_buffer(&public_addr, &addr_buf) != 0) {
JANUS_LOG(LOG_ERR, "Could not resolve public address...\n");
g_free(janus_stun_server);
janus_stun_server = NULL;
return -1;
}
const char *public_ip = janus_network_address_string_from_buffer(&addr_buf);
JANUS_LOG(LOG_INFO, " >> Our public address is %s\n", public_ip);
janus_add_public_ip(public_ip);
return 0;
}
int janus_ice_set_turn_server(gchar *turn_server, uint16_t turn_port, gchar *turn_type, gchar *turn_user, gchar *turn_pwd) {
if(turn_server == NULL)
return 0; /* No initialization needed */
if(turn_type == NULL)
turn_type = (char *)"udp";
if(turn_port == 0)
turn_port = 3478;
JANUS_LOG(LOG_INFO, "TURN server to use: %s:%u (%s)\n", turn_server, turn_port, turn_type);
if(!strcasecmp(turn_type, "udp")) {
janus_turn_type = NICE_RELAY_TYPE_TURN_UDP;
} else if(!strcasecmp(turn_type, "tcp")) {
janus_turn_type = NICE_RELAY_TYPE_TURN_TCP;
} else if(!strcasecmp(turn_type, "tls")) {
janus_turn_type = NICE_RELAY_TYPE_TURN_TLS;
} else {
JANUS_LOG(LOG_ERR, "Unsupported relay type '%s'...\n", turn_type);
return -1;
}
/* Resolve address to get an IP */
struct addrinfo *res = NULL;
janus_network_address addr;
janus_network_address_string_buffer addr_buf;
if(getaddrinfo(turn_server, NULL, NULL, &res) != 0 ||
janus_network_address_from_sockaddr(res->ai_addr, &addr) != 0 ||
janus_network_address_to_string_buffer(&addr, &addr_buf) != 0) {
JANUS_LOG(LOG_ERR, "Could not resolve %s...\n", turn_server);
if(res)
freeaddrinfo(res);
return -1;
}
freeaddrinfo(res);
g_free(janus_turn_server);
janus_turn_server = g_strdup(janus_network_address_string_from_buffer(&addr_buf));
if(janus_turn_server == NULL) {
JANUS_LOG(LOG_ERR, "Could not resolve %s...\n", turn_server);
return -1;
}
janus_turn_port = turn_port;
JANUS_LOG(LOG_VERB, " >> %s:%u\n", janus_turn_server, janus_turn_port);
g_free(janus_turn_user);
janus_turn_user = NULL;
if(turn_user)
janus_turn_user = g_strdup(turn_user);
g_free(janus_turn_pwd);
janus_turn_pwd = NULL;
if(turn_pwd)
janus_turn_pwd = g_strdup(turn_pwd);
return 0;
}
int janus_ice_set_turn_rest_api(gchar *api_server, gchar *api_key, gchar *api_method) {
#ifndef HAVE_TURNRESTAPI
JANUS_LOG(LOG_ERR, "Janus has been built with no libcurl support, TURN REST API unavailable\n");
return -1;
#else
if(api_server != NULL &&
(strstr(api_server, "http://") != api_server && strstr(api_server, "https://") != api_server)) {
JANUS_LOG(LOG_ERR, "Invalid TURN REST API backend: not an HTTP address\n");
return -1;
}
janus_turnrest_set_backend(api_server, api_key, api_method);
JANUS_LOG(LOG_INFO, "TURN REST API backend: %s\n", api_server ? api_server : "(disabled)");
#endif
return 0;
}
/* ICE stuff */
static const gchar *janus_ice_state_name[] =
{
"disconnected",
"gathering",
"connecting",
"connected",
"ready",
"failed"
};
const gchar *janus_get_ice_state_name(gint state) {
if(state < 0 || state > 5)
return NULL;
return janus_ice_state_name[state];
}
/* Thread to take care of the handle loop */
static void *janus_ice_handle_thread(void *data) {
janus_ice_handle *handle = data;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle thread started; %p\n", handle->handle_id, handle);
if(handle->mainloop == NULL) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Invalid loop...\n", handle->handle_id);
janus_refcount_decrease(&handle->ref);
g_thread_unref(g_thread_self());
return NULL;
}
JANUS_LOG(LOG_DBG, "[%"SCNu64"] Looping...\n", handle->handle_id);
g_main_loop_run(handle->mainloop);
janus_ice_webrtc_free(handle);
handle->thread = NULL;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle thread ended! %p\n", handle->handle_id, handle);
/* Unref the handle */
janus_refcount_decrease(&handle->ref);
g_thread_unref(g_thread_self());
return NULL;
}
janus_ice_handle *janus_ice_handle_create(void *core_session, const char *opaque_id, const char *token) {
if(core_session == NULL)
return NULL;
janus_session *session = (janus_session *)core_session;
janus_ice_handle *handle = NULL;
guint64 handle_id = 0;
while(handle_id == 0) {
handle_id = janus_random_uint64();
handle = janus_session_handles_find(session, handle_id);
if(handle != NULL) {
/* Handle ID already taken, try another one */
janus_refcount_decrease(&handle->ref); /* janus_session_handles_find increases it */
handle_id = 0;
}
}
handle = (janus_ice_handle *)g_malloc0(sizeof(janus_ice_handle));
JANUS_LOG(LOG_INFO, "Creating new handle in session %"SCNu64": %"SCNu64"; %p %p\n", session->session_id, handle_id, core_session, handle);
janus_refcount_init(&handle->ref, janus_ice_handle_free);
janus_refcount_increase(&session->ref);
handle->session = core_session;
if(opaque_id)
handle->opaque_id = g_strdup(opaque_id);
if(token)
handle->token = g_strdup(token);
handle->created = janus_get_monotonic_time();
handle->handle_id = handle_id;
handle->app = NULL;
handle->app_handle = NULL;
handle->queued_candidates = g_async_queue_new();
handle->queued_packets = g_async_queue_new();
janus_mutex_init(&handle->mutex);
janus_session_handles_insert(session, handle);
return handle;
}
gint janus_ice_handle_attach_plugin(void *core_session, janus_ice_handle *handle, janus_plugin *plugin) {
if(core_session == NULL)
return JANUS_ERROR_SESSION_NOT_FOUND;
janus_session *session = (janus_session *)core_session;
if(plugin == NULL)
return JANUS_ERROR_PLUGIN_NOT_FOUND;
if(handle == NULL)
return JANUS_ERROR_HANDLE_NOT_FOUND;
if(handle->app != NULL) {
/* This handle is already attached to a plugin */
return JANUS_ERROR_PLUGIN_ATTACH;
}
int error = 0;
janus_plugin_session *session_handle = g_malloc(sizeof(janus_plugin_session));
session_handle->gateway_handle = handle;
session_handle->plugin_handle = NULL;
g_atomic_int_set(&session_handle->stopped, 0);
plugin->create_session(session_handle, &error);
if(error) {
/* TODO Make error struct to pass verbose information */
g_free(session_handle);
return error;
}
janus_refcount_init(&session_handle->ref, janus_ice_plugin_session_free);
/* Handle and plugin session reference each other */
janus_refcount_increase(&session_handle->ref);
janus_refcount_increase(&handle->ref);
handle->app = plugin;
handle->app_handle = session_handle;
/* Add this plugin session to active sessions map */
janus_mutex_lock(&plugin_sessions_mutex);
g_hash_table_insert(plugin_sessions, session_handle, session_handle);
janus_mutex_unlock(&plugin_sessions_mutex);
/* Create a new context, loop, and source */
if(static_event_loops == 0) {
handle->mainctx = g_main_context_new();
handle->mainloop = g_main_loop_new(handle->mainctx, FALSE);
} else {
/* We're actually using static event loops, pick one from the list */
janus_refcount_increase(&handle->ref);
janus_mutex_lock(&event_loops_mutex);
janus_ice_static_event_loop *loop = (janus_ice_static_event_loop *)current_loop->data;
handle->mainctx = loop->mainctx;
handle->mainloop = loop->mainloop;
current_loop = current_loop->next;
if(current_loop == NULL)
current_loop = event_loops;
janus_mutex_unlock(&event_loops_mutex);
}
handle->rtp_source = janus_ice_outgoing_traffic_create(handle, (GDestroyNotify)g_free);
g_source_set_priority(handle->rtp_source, G_PRIORITY_DEFAULT);
g_source_attach(handle->rtp_source, handle->mainctx);
if(static_event_loops == 0) {
/* Now spawn a thread for this loop */
GError *terror = NULL;
char tname[16];
g_snprintf(tname, sizeof(tname), "hloop %"SCNu64, handle->handle_id);
janus_refcount_increase(&handle->ref);
handle->thread = g_thread_try_new(tname, &janus_ice_handle_thread, handle, &terror);
if(terror != NULL) {
/* FIXME We should clear some resources... */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Got error %d (%s) trying to launch the handle thread...\n",
handle->handle_id, terror->code, terror->message ? terror->message : "??");
g_error_free(terror);
janus_refcount_decrease(&handle->ref); /* This is for the thread reference we just added */
janus_ice_handle_destroy(session, handle);
return -1;
}
}
/* Notify event handlers */
if(janus_events_is_enabled())
janus_events_notify_handlers(JANUS_EVENT_TYPE_HANDLE, JANUS_EVENT_SUBTYPE_NONE,
session->session_id, handle->handle_id, "attached", plugin->get_package(), handle->opaque_id, handle->token);
return 0;
}
gint janus_ice_handle_destroy(void *core_session, janus_ice_handle *handle) {
/* session->mutex has to be locked when calling this function */
janus_session *session = (janus_session *)core_session;
if(session == NULL)
return JANUS_ERROR_SESSION_NOT_FOUND;
if(handle == NULL)
return JANUS_ERROR_HANDLE_NOT_FOUND;
if(!g_atomic_int_compare_and_exchange(&handle->destroyed, 0, 1))
return 0;
/* First of all, hangup the PeerConnection, if any */
janus_ice_webrtc_hangup(handle, "Detach");
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP);
/* Remove the session from active sessions map */
janus_mutex_lock(&plugin_sessions_mutex);
gboolean found = g_hash_table_remove(plugin_sessions, handle->app_handle);
if(!found) {
janus_mutex_unlock(&plugin_sessions_mutex);
return JANUS_ERROR_HANDLE_NOT_FOUND;
}
janus_mutex_unlock(&plugin_sessions_mutex);
janus_plugin *plugin_t = (janus_plugin *)handle->app;
if(plugin_t == NULL) {
/* There was no plugin attached, probably something went wrong there */
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT);
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP);
if(handle->mainloop != NULL) {
if(static_event_loops == 0 && handle->mainloop != NULL && g_main_loop_is_running(handle->mainloop)) {
g_main_loop_quit(handle->mainloop);
}
}
janus_refcount_decrease(&handle->ref);
return 0;
}
JANUS_LOG(LOG_INFO, "Detaching handle from %s; %p %p %p %p\n", plugin_t->get_name(), handle, handle->app_handle, handle->app_handle->gateway_handle, handle->app_handle->plugin_handle);
/* Actually detach handle... */
if(g_atomic_int_compare_and_exchange(&handle->app_handle->stopped, 0, 1)) {
/* Notify the plugin that the session's over (the plugin will
* remove the other reference to the plugin session handle) */
g_async_queue_push(handle->queued_packets, &janus_ice_detach_handle);
g_main_context_wakeup(handle->mainctx);
}
/* Get rid of the handle now */
if(g_atomic_int_compare_and_exchange(&handle->dump_packets, 1, 0)) {
janus_text2pcap_close(handle->text2pcap);
g_clear_pointer(&handle->text2pcap, janus_text2pcap_free);
}
/* We only actually destroy the handle later */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Handle detached, scheduling destruction\n", handle->handle_id);
/* Unref the handle: we only unref the session too when actually freeing the handle, so that it is freed before that */
janus_refcount_decrease(&handle->ref);
return 0;
}
static void janus_ice_handle_free(const janus_refcount *handle_ref) {
janus_ice_handle *handle = janus_refcount_containerof(handle_ref, janus_ice_handle, ref);
/* This stack can be destroyed, free all the resources */
janus_mutex_lock(&handle->mutex);
if(handle->queued_candidates != NULL) {
janus_ice_clear_queued_candidates(handle);
g_async_queue_unref(handle->queued_candidates);
}
if(handle->queued_packets != NULL) {
janus_ice_clear_queued_packets(handle);
g_async_queue_unref(handle->queued_packets);
}
if(static_event_loops == 0 && handle->mainloop != NULL) {
g_main_loop_unref(handle->mainloop);
handle->mainloop = NULL;
}
if(static_event_loops == 0 && handle->mainctx != NULL) {
g_main_context_unref(handle->mainctx);
handle->mainctx = NULL;
}
janus_mutex_unlock(&handle->mutex);
janus_ice_webrtc_free(handle);
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Handle and related resources freed; %p %p\n", handle->handle_id, handle, handle->session);
/* Finally, unref the session and free the handle */
if(handle->session != NULL) {
janus_session *session = (janus_session *)handle->session;
janus_refcount_decrease(&session->ref);
}
g_free(handle->opaque_id);
g_free(handle->token);
g_free(handle);
}
static void janus_ice_plugin_session_free(const janus_refcount *app_handle_ref) {
janus_plugin_session *app_handle = janus_refcount_containerof(app_handle_ref, janus_plugin_session, ref);
/* This app handle can be destroyed, free all the resources */
if(app_handle->gateway_handle != NULL) {
janus_ice_handle *handle = (janus_ice_handle *)app_handle->gateway_handle;
app_handle->gateway_handle = NULL;
handle->app_handle = NULL;
janus_refcount_decrease(&handle->ref);
}
g_free(app_handle);
}
void janus_ice_webrtc_hangup(janus_ice_handle *handle, const char *reason) {
if(handle == NULL)
return;
g_atomic_int_set(&handle->closepc, 0);
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT))
return;
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT);
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_OFFER);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_NEGOTIATED);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO);
/* User will be notified only after the actual hangup */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Hanging up PeerConnection because of a %s\n",
handle->handle_id, reason);
handle->hangup_reason = reason;
/* Let's message the loop, we'll notify the plugin from there */
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_hangup_peerconnection);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_hangup_peerconnection);
#endif
g_main_context_wakeup(handle->mainctx);
}
}
static void janus_ice_webrtc_free(janus_ice_handle *handle) {
if(handle == NULL)
return;
janus_mutex_lock(&handle->mutex);
if(!handle->agent_created) {
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_NEW_DATACHAN_SDP);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AGENT);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_E2EE);
janus_mutex_unlock(&handle->mutex);
return;
}
handle->agent_created = 0;
if(handle->stream != NULL) {
janus_ice_stream_destroy(handle->stream);
handle->stream = NULL;
}
if(handle->agent != NULL) {
if(G_IS_OBJECT(handle->agent))
g_object_unref(handle->agent);
handle->agent = NULL;
}
if(handle->pending_trickles) {
while(handle->pending_trickles) {
GList *temp = g_list_first(handle->pending_trickles);
handle->pending_trickles = g_list_remove_link(handle->pending_trickles, temp);
janus_ice_trickle *trickle = (janus_ice_trickle *)temp->data;
g_list_free(temp);
janus_ice_trickle_destroy(trickle);
}
}
handle->pending_trickles = NULL;
janus_ice_clear_queued_candidates(handle);
g_free(handle->rtp_profile);
handle->rtp_profile = NULL;
g_free(handle->local_sdp);
handle->local_sdp = NULL;
g_free(handle->remote_sdp);
handle->remote_sdp = NULL;
handle->stream_mid = NULL;
g_free(handle->audio_mid);
handle->audio_mid = NULL;
g_free(handle->video_mid);
handle->video_mid = NULL;
g_free(handle->data_mid);
handle->data_mid = NULL;
handle->thread = NULL;
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_NEW_DATACHAN_SDP);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AGENT);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_E2EE);
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) && handle->hangup_reason) {
janus_ice_notify_hangup(handle, handle->hangup_reason);
}
handle->hangup_reason = NULL;
janus_mutex_unlock(&handle->mutex);
JANUS_LOG(LOG_INFO, "[%"SCNu64"] WebRTC resources freed; %p %p\n", handle->handle_id, handle, handle->session);
}
void janus_ice_stream_destroy(janus_ice_stream *stream) {
if(stream == NULL)
return;
if(stream->component != NULL) {
janus_ice_component_destroy(stream->component);
stream->component = NULL;
}
if(stream->pending_nacked_cleanup && g_hash_table_size(stream->pending_nacked_cleanup) > 0) {
GHashTableIter iter;
gpointer val;
g_hash_table_iter_init(&iter, stream->pending_nacked_cleanup);
while(g_hash_table_iter_next(&iter, NULL, &val)) {
GSource *source = val;
g_source_destroy(source);
}
g_hash_table_destroy(stream->pending_nacked_cleanup);
}
stream->pending_nacked_cleanup = NULL;
janus_ice_handle *handle = stream->handle;
if(handle != NULL) {
janus_refcount_decrease(&handle->ref);
stream->handle = NULL;
}
janus_refcount_decrease(&stream->ref);
}
static void janus_ice_stream_free(const janus_refcount *stream_ref) {
janus_ice_stream *stream = janus_refcount_containerof(stream_ref, janus_ice_stream, ref);
/* This stream can be destroyed, free all the resources */
stream->handle = NULL;
g_free(stream->remote_hashing);
stream->remote_hashing = NULL;
g_free(stream->remote_fingerprint);
stream->remote_fingerprint = NULL;
g_free(stream->ruser);
stream->ruser = NULL;
g_free(stream->rpass);
stream->rpass = NULL;
g_free(stream->rid[0]);
stream->rid[0] = NULL;
g_free(stream->rid[1]);
stream->rid[1] = NULL;
g_free(stream->rid[2]);
stream->rid[2] = NULL;
g_list_free(stream->audio_payload_types);
stream->audio_payload_types = NULL;
g_list_free(stream->video_payload_types);
stream->video_payload_types = NULL;
if(stream->rtx_payload_types != NULL)
g_hash_table_destroy(stream->rtx_payload_types);
stream->rtx_payload_types = NULL;
if(stream->clock_rates != NULL)
g_hash_table_destroy(stream->clock_rates);
stream->clock_rates = NULL;
g_free(stream->audio_codec);
stream->audio_codec = NULL;
g_free(stream->video_codec);
stream->video_codec = NULL;
g_free(stream->audio_rtcp_ctx);
stream->audio_rtcp_ctx = NULL;
g_free(stream->video_rtcp_ctx[0]);
stream->video_rtcp_ctx[0] = NULL;
g_free(stream->video_rtcp_ctx[1]);
stream->video_rtcp_ctx[1] = NULL;
g_free(stream->video_rtcp_ctx[2]);
stream->video_rtcp_ctx[2] = NULL;
if(stream->rtx_nacked[0])
g_hash_table_destroy(stream->rtx_nacked[0]);
stream->rtx_nacked[0] = NULL;
if(stream->rtx_nacked[1])
g_hash_table_destroy(stream->rtx_nacked[1]);
stream->rtx_nacked[1] = NULL;
if(stream->rtx_nacked[2])
g_hash_table_destroy(stream->rtx_nacked[2]);
stream->rtx_nacked[2] = NULL;
g_slist_free_full(stream->transport_wide_received_seq_nums, (GDestroyNotify)g_free);
stream->transport_wide_received_seq_nums = NULL;
stream->audio_first_ntp_ts = 0;
stream->audio_first_rtp_ts = 0;
stream->video_first_ntp_ts[0] = 0;
stream->video_first_ntp_ts[1] = 0;
stream->video_first_ntp_ts[2] = 0;
stream->video_first_rtp_ts[0] = 0;
stream->video_first_rtp_ts[1] = 0;
stream->video_first_rtp_ts[2] = 0;
stream->audio_last_rtp_ts = 0;
stream->audio_last_ntp_ts = 0;
stream->video_last_rtp_ts = 0;
stream->video_last_ntp_ts = 0;
g_free(stream);
stream = NULL;
}
void janus_ice_component_destroy(janus_ice_component *component) {
if(component == NULL)
return;
janus_ice_stream *stream = component->stream;
if(stream != NULL) {
janus_refcount_decrease(&stream->ref);
component->stream = NULL;
}
janus_dtls_srtp_destroy(component->dtls);
janus_refcount_decrease(&component->ref);
}
static void janus_ice_component_free(const janus_refcount *component_ref) {
janus_ice_component *component = janus_refcount_containerof(component_ref, janus_ice_component, ref);
if(component->icestate_source != NULL) {
g_source_destroy(component->icestate_source);
g_source_unref(component->icestate_source);
component->icestate_source = NULL;
}
if(component->dtlsrt_source != NULL) {
g_source_destroy(component->dtlsrt_source);
g_source_unref(component->dtlsrt_source);
component->dtlsrt_source = NULL;
}
if(component->dtls != NULL) {
janus_dtls_srtp_destroy(component->dtls);
janus_refcount_decrease(&component->dtls->ref);
component->dtls = NULL;
}
if(component->audio_retransmit_buffer != NULL) {
janus_rtp_packet *p = NULL;
while((p = (janus_rtp_packet *)g_queue_pop_head(component->audio_retransmit_buffer)) != NULL) {
/* Remove from hashtable too */
janus_rtp_header *header = (janus_rtp_header *)p->data;
guint16 seq = ntohs(header->seq_number);
g_hash_table_remove(component->audio_retransmit_seqs, GUINT_TO_POINTER(seq));
/* Free the packet */
janus_ice_free_rtp_packet(p);
}
g_queue_free(component->audio_retransmit_buffer);
g_hash_table_destroy(component->audio_retransmit_seqs);
}
if(component->video_retransmit_buffer != NULL) {
janus_rtp_packet *p = NULL;
while((p = (janus_rtp_packet *)g_queue_pop_head(component->video_retransmit_buffer)) != NULL) {
/* Remove from hashtable too */
janus_rtp_header *header = (janus_rtp_header *)p->data;
guint16 seq = ntohs(header->seq_number);
g_hash_table_remove(component->video_retransmit_seqs, GUINT_TO_POINTER(seq));
/* Free the packet */
janus_ice_free_rtp_packet(p);
}
g_queue_free(component->video_retransmit_buffer);
g_hash_table_destroy(component->video_retransmit_seqs);
}
if(component->candidates != NULL) {
GSList *i = NULL, *candidates = component->candidates;
for(i = candidates; i; i = i->next) {
NiceCandidate *c = (NiceCandidate *) i->data;
if(c != NULL) {
nice_candidate_free(c);
c = NULL;
}
}
g_slist_free(candidates);
candidates = NULL;
}
component->candidates = NULL;
if(component->local_candidates != NULL) {
GSList *i = NULL, *candidates = component->local_candidates;
for(i = candidates; i; i = i->next) {
gchar *c = (gchar *) i->data;
g_free(c);
}
g_slist_free(candidates);
candidates = NULL;
}
component->local_candidates = NULL;
if(component->remote_candidates != NULL) {
GSList *i = NULL, *candidates = component->remote_candidates;
for(i = candidates; i; i = i->next) {
gchar *c = (gchar *) i->data;
g_free(c);
}
g_slist_free(candidates);
candidates = NULL;
}
component->remote_candidates = NULL;
g_free(component->selected_pair);
component->selected_pair = NULL;
if(component->last_seqs_audio)
janus_seq_list_free(&component->last_seqs_audio);
if(component->last_seqs_video[0])
janus_seq_list_free(&component->last_seqs_video[0]);
if(component->last_seqs_video[1])
janus_seq_list_free(&component->last_seqs_video[1]);
if(component->last_seqs_video[2])
janus_seq_list_free(&component->last_seqs_video[2]);
g_free(component);
//~ janus_mutex_unlock(&handle->mutex);
}
/* Call plugin slow_link callback if a minimum of lost packets are detected within a second */
static void
janus_slow_link_update(janus_ice_component *component, janus_ice_handle *handle,
gboolean video, gboolean uplink, guint lost) {
/* We keep the counters in different janus_ice_stats objects, depending on the direction */
guint sl_lost_last_count = uplink ?
(video ? component->in_stats.sl_lost_count_video : component->in_stats.sl_lost_count_audio) :
(video ? component->out_stats.sl_lost_count_video : component->out_stats.sl_lost_count_audio);
guint sl_lost_recently = (lost >= sl_lost_last_count) ? (lost - sl_lost_last_count) : 0;
if(slowlink_threshold > 0 && sl_lost_recently >= slowlink_threshold) {
/* Tell the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin && plugin->slow_link && janus_plugin_session_is_alive(handle->app_handle) &&
!g_atomic_int_get(&handle->destroyed))
plugin->slow_link(handle->app_handle, uplink, video);
/* Notify the user/application too */
janus_session *session = (janus_session *)handle->session;
if(session != NULL) {
json_t *event = json_object();
json_object_set_new(event, "janus", json_string("slowlink"));
json_object_set_new(event, "session_id", json_integer(session->session_id));
json_object_set_new(event, "sender", json_integer(handle->handle_id));
if(opaqueid_in_api && handle->opaque_id != NULL)
json_object_set_new(event, "opaque_id", json_string(handle->opaque_id));
json_object_set_new(event, "media", json_string(video ? "video" : "audio"));
json_object_set_new(event, "uplink", uplink ? json_true() : json_false());
json_object_set_new(event, "lost", json_integer(sl_lost_recently));
/* Send the event */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sending event to transport...; %p\n", handle->handle_id, handle);
janus_session_notify_event(session, event);
/* Finally, notify event handlers */
if(janus_events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "media", json_string(video ? "video" : "audio"));
json_object_set_new(info, "slow_link", json_string(uplink ? "uplink" : "downlink"));
json_object_set_new(info, "lost_lastsec", json_integer(sl_lost_recently));
janus_events_notify_handlers(JANUS_EVENT_TYPE_MEDIA, JANUS_EVENT_SUBTYPE_MEDIA_SLOWLINK,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
}
/* Update the counter */
if(uplink) {
if(video)
component->in_stats.sl_lost_count_video = lost;
else
component->in_stats.sl_lost_count_audio = lost;
} else {
if(video)
component->out_stats.sl_lost_count_video = lost;
else
component->out_stats.sl_lost_count_audio = lost;
}
}
/* ICE state check timer (needed to check if a failed really is definitive or if things can still improve) */
static gboolean janus_ice_check_failed(gpointer data) {
janus_ice_component *component = (janus_ice_component *)data;
if(component == NULL)
return FALSE;
janus_ice_stream *stream = component->stream;
if(!stream)
goto stoptimer;
janus_ice_handle *handle = stream->handle;
if(!handle)
goto stoptimer;
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) ||
janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT))
goto stoptimer;
if(component->state == NICE_COMPONENT_STATE_CONNECTED || component->state == NICE_COMPONENT_STATE_READY) {
/* ICE succeeded in the meanwhile, get rid of this timer */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] ICE succeeded, disabling ICE state check timer!\n", handle->handle_id);
goto stoptimer;
}
/* Still in the failed state, how much time passed since we first detected it? */
if(janus_get_monotonic_time() - component->icefailed_detected < 5*G_USEC_PER_SEC) {
/* Let's wait a little longer */
return TRUE;
}
/* If we got here it means the timer expired, and we should check if this is a failure */
gboolean trickle_recv = (!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE) || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES));
gboolean answer_recv = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER);
gboolean alert_set = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT);
/* We may still be waiting for something... but we don't wait forever */
gboolean do_wait = TRUE;
if(janus_get_monotonic_time() - component->icefailed_detected >= 15*G_USEC_PER_SEC) {
do_wait = FALSE;
}
if(!do_wait || (handle && trickle_recv && answer_recv && !alert_set)) {
/* FIXME Should we really give up for what may be a failure in only one of the media? */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ICE failed for component %d in stream %d...\n",
handle->handle_id, component->component_id, stream->stream_id);
janus_ice_webrtc_hangup(handle, "ICE failed");
goto stoptimer;
}
/* Let's wait a little longer */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] ICE failed for component %d in stream %d, but we're still waiting for some info so we don't care... (trickle %s, answer %s, alert %s)\n",
handle->handle_id, component->component_id, stream->stream_id,
trickle_recv ? "received" : "pending",
answer_recv ? "received" : "pending",
alert_set ? "set" : "not set");
return TRUE;
stoptimer:
if(component->icestate_source != NULL) {
g_source_destroy(component->icestate_source);
g_source_unref(component->icestate_source);
component->icestate_source = NULL;
}
return FALSE;
}
/* Callbacks */
static void janus_ice_cb_candidate_gathering_done(NiceAgent *agent, guint stream_id, gpointer user_data) {
janus_ice_handle *handle = (janus_ice_handle *)user_data;
if(!handle)
return;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Gathering done for stream %d\n", handle->handle_id, stream_id);
handle->cdone++;
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No stream %d??\n", handle->handle_id, stream_id);
return;
}
stream->cdone = 1;
/* If we're doing full-trickle, send an event to the user too */
if(janus_full_trickle_enabled) {
/* Send a "trickle" event with completed:true to the browser */
janus_ice_notify_trickle(handle, NULL);
}
}
static void janus_ice_cb_component_state_changed(NiceAgent *agent, guint stream_id, guint component_id, guint state, gpointer ice) {
janus_ice_handle *handle = (janus_ice_handle *)ice;
if(!handle)
return;
if(component_id > 1) {
/* State changed for a component we don't need anymore (rtcp-mux) */
return;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Component state changed for component %d in stream %d: %d (%s)\n",
handle->handle_id, component_id, stream_id, state, janus_get_ice_state_name(state));
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No stream %d??\n", handle->handle_id, stream_id);
return;
}
janus_ice_component *component = stream->component;
if(!component || component->component_id != component_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No component %d in stream %d??\n", handle->handle_id, component_id, stream_id);
return;
}
component->state = state;
/* Notify event handlers */
if(janus_events_is_enabled()) {
janus_session *session = (janus_session *)handle->session;
json_t *info = json_object();
json_object_set_new(info, "ice", json_string(janus_get_ice_state_name(state)));
json_object_set_new(info, "stream_id", json_integer(stream_id));
json_object_set_new(info, "component_id", json_integer(component_id));
janus_events_notify_handlers(JANUS_EVENT_TYPE_WEBRTC, JANUS_EVENT_SUBTYPE_WEBRTC_ICE,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
/* FIXME Even in case the state is 'connected', we wait for the 'new-selected-pair' callback to do anything */
if(state == NICE_COMPONENT_STATE_FAILED) {
/* Failed doesn't mean necessarily we need to give up: we may be trickling */
gboolean alert_set = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT);
if(alert_set)
return;
gboolean trickle_recv = (!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE) || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES));
gboolean answer_recv = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER);
JANUS_LOG(LOG_WARN, "[%"SCNu64"] ICE failed for component %d in stream %d, but let's give it some time... (trickle %s, answer %s, alert %s)\n",
handle->handle_id, component_id, stream_id,
trickle_recv ? "received" : "pending",
answer_recv ? "received" : "pending",
alert_set ? "set" : "not set");
/* In case we haven't started a timer yet, let's do it now */
if(component->icestate_source == NULL && component->icefailed_detected == 0) {
component->icefailed_detected = janus_get_monotonic_time();
component->icestate_source = g_timeout_source_new(500);
g_source_set_callback(component->icestate_source, janus_ice_check_failed, component, NULL);
guint id = g_source_attach(component->icestate_source, handle->mainctx);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Creating ICE state check timer with ID %u\n", handle->handle_id, id);
}
}
}
#ifndef HAVE_LIBNICE_TCP
static void janus_ice_cb_new_selected_pair (NiceAgent *agent, guint stream_id, guint component_id, gchar *local, gchar *remote, gpointer ice) {
#else
static void janus_ice_cb_new_selected_pair (NiceAgent *agent, guint stream_id, guint component_id, NiceCandidate *local, NiceCandidate *remote, gpointer ice) {
#endif
janus_ice_handle *handle = (janus_ice_handle *)ice;
if(!handle)
return;
if(component_id > 1) {
/* New selected pair for a component we don't need anymore (rtcp-mux) */
return;
}
#ifndef HAVE_LIBNICE_TCP
JANUS_LOG(LOG_VERB, "[%"SCNu64"] New selected pair for component %d in stream %d: %s <-> %s\n", handle ? handle->handle_id : 0, component_id, stream_id, local, remote);
#else
JANUS_LOG(LOG_VERB, "[%"SCNu64"] New selected pair for component %d in stream %d: %s <-> %s\n", handle ? handle->handle_id : 0, component_id, stream_id, local->foundation, remote->foundation);
#endif
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No stream %d??\n", handle->handle_id, stream_id);
return;
}
janus_ice_component *component = stream->component;
if(!component || component->component_id != component_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No component %d in stream %d??\n", handle->handle_id, component_id, stream_id);
return;
}
char sp[200];
#ifndef HAVE_LIBNICE_TCP
g_snprintf(sp, 200, "%s <-> %s", local, remote);
#else
gchar laddress[NICE_ADDRESS_STRING_LEN], raddress[NICE_ADDRESS_STRING_LEN];
gint lport = 0, rport = 0;
nice_address_to_string(&(local->addr), (gchar *)&laddress);
nice_address_to_string(&(remote->addr), (gchar *)&raddress);
lport = nice_address_get_port(&(local->addr));
rport = nice_address_get_port(&(remote->addr));
const char *ltype = NULL, *rtype = NULL;
switch(local->type) {
case NICE_CANDIDATE_TYPE_HOST:
ltype = "host";
break;
case NICE_CANDIDATE_TYPE_SERVER_REFLEXIVE:
ltype = "srflx";
break;
case NICE_CANDIDATE_TYPE_PEER_REFLEXIVE:
ltype = "prflx";
break;
case NICE_CANDIDATE_TYPE_RELAYED:
ltype = "relay";
break;
default:
break;
}
switch(remote->type) {
case NICE_CANDIDATE_TYPE_HOST:
rtype = "host";
break;
case NICE_CANDIDATE_TYPE_SERVER_REFLEXIVE:
rtype = "srflx";
break;
case NICE_CANDIDATE_TYPE_PEER_REFLEXIVE:
rtype = "prflx";
break;
case NICE_CANDIDATE_TYPE_RELAYED:
rtype = "relay";
break;
default:
break;
}
g_snprintf(sp, sizeof(sp), "%s:%d [%s,%s] <-> %s:%d [%s,%s]",
laddress, lport, ltype, local->transport == NICE_CANDIDATE_TRANSPORT_UDP ? "udp" : "tcp",
raddress, rport, rtype, remote->transport == NICE_CANDIDATE_TRANSPORT_UDP ? "udp" : "tcp");
#endif
gboolean newpair = FALSE;
if(component->selected_pair == NULL || strcmp(sp, component->selected_pair)) {
newpair = TRUE;
gchar *prev_selected_pair = component->selected_pair;
component->selected_pair = g_strdup(sp);
g_clear_pointer(&prev_selected_pair, g_free);
}
/* Notify event handlers */
if(newpair && janus_events_is_enabled()) {
janus_session *session = (janus_session *)handle->session;
json_t *info = json_object();
json_object_set_new(info, "selected-pair", json_string(sp));
#ifdef HAVE_LIBNICE_TCP
json_t *candidates = json_object();
json_t *lcand = json_object();
json_object_set_new(lcand, "address", json_string(laddress));
json_object_set_new(lcand, "port", json_integer(lport));
json_object_set_new(lcand, "type", json_string(ltype));
json_object_set_new(lcand, "transport", json_string(local->transport == NICE_CANDIDATE_TRANSPORT_UDP ? "udp" : "tcp"));
json_object_set_new(lcand, "family", json_integer(nice_address_ip_version(&local->addr)));
json_object_set_new(candidates, "local", lcand);
json_t *rcand = json_object();
json_object_set_new(rcand, "address", json_string(raddress));
json_object_set_new(rcand, "port", json_integer(rport));
json_object_set_new(rcand, "type", json_string(rtype));
json_object_set_new(rcand, "transport", json_string(remote->transport == NICE_CANDIDATE_TRANSPORT_UDP ? "udp" : "tcp"));
json_object_set_new(rcand, "family", json_integer(nice_address_ip_version(&remote->addr)));
json_object_set_new(candidates, "remote", rcand);
json_object_set_new(info, "candidates", candidates);
#endif
json_object_set_new(info, "stream_id", json_integer(stream_id));
json_object_set_new(info, "component_id", json_integer(component_id));
janus_events_notify_handlers(JANUS_EVENT_TYPE_WEBRTC, JANUS_EVENT_SUBTYPE_WEBRTC_PAIR,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
/* Have we been here before? (might happen, when trickling) */
if(component->component_connected > 0)
return;
/* FIXME Clear the queue */
janus_ice_clear_queued_packets(handle);
/* Now we can start the DTLS handshake (FIXME This was on the 'connected' state notification, before) */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Component is ready enough, starting DTLS handshake...\n", handle->handle_id);
component->component_connected = janus_get_monotonic_time();
/* Start the DTLS handshake, at last */
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_dtls_handshake);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_dtls_handshake);
#endif
g_main_context_wakeup(handle->mainctx);
}
/* Candidates management */
static int janus_ice_candidate_to_string(janus_ice_handle *handle, NiceCandidate *c, char *buffer, int buflen, gboolean log_candidate, gboolean force_private, guint public_ip_index);
#ifndef HAVE_LIBNICE_TCP
static void janus_ice_cb_new_local_candidate (NiceAgent *agent, guint stream_id, guint component_id, gchar *foundation, gpointer ice) {
#else
static void janus_ice_cb_new_local_candidate (NiceAgent *agent, NiceCandidate *candidate, gpointer ice) {
#endif
if(!janus_full_trickle_enabled) {
/* Ignore if we're not full-trickling: for half-trickle
* janus_ice_candidates_to_sdp() is used instead */
return;
}
janus_ice_handle *handle = (janus_ice_handle *)ice;
if(!handle)
return;
#ifndef HAVE_LIBNICE_TCP
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Discovered new local candidate for component %d in stream %d: foundation=%s\n", handle ? handle->handle_id : 0, component_id, stream_id, foundation);
#else
const char *ctype = NULL;
switch(candidate->type) {
case NICE_CANDIDATE_TYPE_HOST:
ctype = "host";
break;
case NICE_CANDIDATE_TYPE_SERVER_REFLEXIVE:
ctype = "srflx";
break;
case NICE_CANDIDATE_TYPE_PEER_REFLEXIVE:
ctype = "prflx";
break;
case NICE_CANDIDATE_TYPE_RELAYED:
ctype = "relay";
break;
default:
break;
}
guint stream_id = candidate->stream_id;
guint component_id = candidate->component_id;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Discovered new local candidate for component %d in stream %d: type=%s\n", handle ? handle->handle_id : 0, component_id, stream_id, ctype);
#endif
if(component_id > 1) {
/* New remote candidate for a component we don't need anymore (rtcp-mux) */
return;
}
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No stream %d??\n", handle->handle_id, stream_id);
return;
}
janus_ice_component *component = stream->component;
if(!component || component->component_id != component_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No component %d in stream %d??\n", handle->handle_id, component_id, stream_id);
return;
}
#ifndef HAVE_LIBNICE_TCP
/* Get local candidates and look for the related foundation */
NiceCandidate *candidate = NULL;
GSList *candidates = nice_agent_get_local_candidates(agent, component_id, stream_id), *tmp = candidates;
while(tmp) {
NiceCandidate *c = (NiceCandidate *)tmp->data;
/* Check if this is what we're looking for */
if(!candidate && !strcasecmp(c->foundation, foundation)) {
/* It is! */
candidate = c;
} else {
nice_candidate_free(c);
}
tmp = tmp->next;
}
g_slist_free(candidates);
if(candidate == NULL) {
JANUS_LOG(LOG_WARN, "Candidate with foundation %s not found?\n", foundation);
return;
}
#endif
char buffer[200];
guint public_ip_index = 0;
do {
if(janus_ice_candidate_to_string(handle, candidate, buffer, sizeof(buffer), TRUE, FALSE, public_ip_index) == 0) {
/* Candidate encoded, send a "trickle" event to the browser (but only if it's not a 'prflx') */
if(candidate->type == NICE_CANDIDATE_TYPE_PEER_REFLEXIVE) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping prflx candidate...\n", handle->handle_id);
} else {
janus_ice_notify_trickle(handle, buffer);
/* If nat-1-1 is enabled but we want to keep the private host, add another candidate */
if(nat_1_1_enabled && keep_private_host && public_ip_index == 0 &&
janus_ice_candidate_to_string(handle, candidate, buffer, sizeof(buffer), TRUE, TRUE, public_ip_index) == 0) {
if(candidate->type == NICE_CANDIDATE_TYPE_PEER_REFLEXIVE) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping prflx candidate...\n", handle->handle_id);
} else {
janus_ice_notify_trickle(handle, buffer);
}
}
}
}
public_ip_index++;
} while (public_ip_index < janus_get_public_ip_count());
#ifndef HAVE_LIBNICE_TCP
nice_candidate_free(candidate);
#endif
}
#ifndef HAVE_LIBNICE_TCP
static void janus_ice_cb_new_remote_candidate (NiceAgent *agent, guint stream_id, guint component_id, gchar *foundation, gpointer ice) {
#else
static void janus_ice_cb_new_remote_candidate (NiceAgent *agent, NiceCandidate *candidate, gpointer ice) {
#endif
janus_ice_handle *handle = (janus_ice_handle *)ice;
if(!handle)
return;
#ifndef HAVE_LIBNICE_TCP
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Discovered new remote candidate for component %d in stream %d: foundation=%s\n", handle ? handle->handle_id : 0, component_id, stream_id, foundation);
#else
const char *ctype = NULL;
switch(candidate->type) {
case NICE_CANDIDATE_TYPE_HOST:
ctype = "host";
break;
case NICE_CANDIDATE_TYPE_SERVER_REFLEXIVE:
ctype = "srflx";
break;
case NICE_CANDIDATE_TYPE_PEER_REFLEXIVE:
ctype = "prflx";
break;
case NICE_CANDIDATE_TYPE_RELAYED:
ctype = "relay";
break;
default:
break;
}
guint stream_id = candidate->stream_id;
guint component_id = candidate->component_id;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Discovered new remote candidate for component %d in stream %d: type=%s\n", handle ? handle->handle_id : 0, component_id, stream_id, ctype);
#endif
if(component_id > 1) {
/* New remote candidate for a component we don't need anymore (rtcp-mux) */
return;
}
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No stream %d??\n", handle->handle_id, stream_id);
return;
}
janus_ice_component *component = stream->component;
if(!component || component->component_id != component_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No component %d in stream %d??\n", handle->handle_id, component_id, stream_id);
return;
}
#ifndef HAVE_LIBNICE_TCP
/* Get remote candidates and look for the related foundation */
NiceCandidate *candidate = NULL;
GSList *candidates = nice_agent_get_remote_candidates(agent, component_id, stream_id), *tmp = candidates;
while(tmp) {
NiceCandidate *c = (NiceCandidate *)tmp->data;
if(candidate == NULL) {
/* Check if this is what we're looking for */
if(!strcasecmp(c->foundation, foundation)) {
/* It is! */
candidate = c;
tmp = tmp->next;
continue;
}
}
nice_candidate_free(c);
tmp = tmp->next;
}
g_slist_free(candidates);
if(candidate == NULL) {
JANUS_LOG(LOG_WARN, "Candidate with foundation %s not found?\n", foundation);
return;
}
#endif
/* Render the candidate and add it to the remote_candidates cache for the admin API */
if(candidate->type != NICE_CANDIDATE_TYPE_PEER_REFLEXIVE) {
/* ... but only if it's 'prflx', the others we add ourselves */
goto candidatedone;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Stream #%d, Component #%d\n", handle->handle_id, candidate->stream_id, candidate->component_id);
gchar address[NICE_ADDRESS_STRING_LEN], base_address[NICE_ADDRESS_STRING_LEN];
gint port = 0, base_port = 0;
nice_address_to_string(&(candidate->addr), (gchar *)&address);
port = nice_address_get_port(&(candidate->addr));
nice_address_to_string(&(candidate->base_addr), (gchar *)&base_address);
base_port = nice_address_get_port(&(candidate->base_addr));
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Address: %s:%d\n", handle->handle_id, address, port);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Priority: %d\n", handle->handle_id, candidate->priority);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Foundation: %s\n", handle->handle_id, candidate->foundation);
char buffer[100];
if(candidate->transport == NICE_CANDIDATE_TRANSPORT_UDP) {
g_snprintf(buffer, 100,
"%s %d %s %d %s %d typ prflx raddr %s rport %d\r\n",
candidate->foundation,
candidate->component_id,
"udp",
candidate->priority,
address,
port,
base_address,
base_port);
} else {
if(!janus_ice_tcp_enabled) {
/* ICETCP support disabled */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Skipping prflx TCP candidate, ICETCP support disabled...\n", handle->handle_id);
goto candidatedone;
}
#ifndef HAVE_LIBNICE_TCP
/* TCP candidates are only supported since libnice 0.1.8 */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Skipping prflx TCP candidate, the libnice version doesn't support it...\n", handle->handle_id);
goto candidatedone;
#else
const char *type = NULL;
switch(candidate->transport) {
case NICE_CANDIDATE_TRANSPORT_TCP_ACTIVE:
type = "active";
break;
case NICE_CANDIDATE_TRANSPORT_TCP_PASSIVE:
type = "passive";
break;
case NICE_CANDIDATE_TRANSPORT_TCP_SO:
type = "so";
break;
default:
break;
}
if(type == NULL) {
/* FIXME Unsupported transport */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Unsupported transport, skipping nonUDP/TCP prflx candidate...\n", handle->handle_id);
goto candidatedone;
} else {
g_snprintf(buffer, 100,
"%s %d %s %d %s %d typ prflx raddr %s rport %d tcptype %s\r\n",
candidate->foundation,
candidate->component_id,
"tcp",
candidate->priority,
address,
port,
base_address,
base_port,
type);
}
#endif
}
/* Now parse the candidate as if we received it from the Janus API */
int res = janus_sdp_parse_candidate(stream, buffer, 1);
if(res != 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Failed to parse prflx candidate... (%d)\n", handle->handle_id, res);
}
candidatedone:
#ifndef HAVE_LIBNICE_TCP
nice_candidate_free(candidate);
#endif
return;
}
static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint component_id, guint len, gchar *buf, gpointer ice) {
janus_ice_component *component = (janus_ice_component *)ice;
if(!component) {
JANUS_LOG(LOG_ERR, "No component %d in stream %d??\n", component_id, stream_id);
return;
}
janus_ice_stream *stream = component->stream;
if(!stream) {
JANUS_LOG(LOG_ERR, "No stream %d??\n", stream_id);
return;
}
janus_ice_handle *handle = stream->handle;
if(!handle) {
JANUS_LOG(LOG_ERR, "No handle for stream %d??\n", stream_id);
return;
}
janus_session *session = (janus_session *)handle->session;
if(!component->dtls) { /* Still waiting for the DTLS stack */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still waiting for the DTLS stack for component %d in stream %d...\n", handle->handle_id, component_id, stream_id);
return;
}
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP) || janus_is_stopping()) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Forced to stop it here...\n", handle->handle_id);
return;
}
/* What is this? */
if(janus_is_dtls(buf) || (!janus_is_rtp(buf, len) && !janus_is_rtcp(buf, len))) {
/* This is DTLS: either handshake stuff, or data coming from SCTP DataChannels */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Looks like DTLS!\n", handle->handle_id);
janus_dtls_srtp_incoming_msg(component->dtls, buf, len);
/* Update stats (TODO Do the same for the last second window as well) */
component->in_stats.data.packets++;
component->in_stats.data.bytes += len;
return;
}
/* Not DTLS... RTP or RTCP? (http://tools.ietf.org/html/rfc5761#section-4) */
if(janus_is_rtp(buf, len)) {
/* This is RTP */
if(janus_is_webrtc_encryption_enabled() && (!component->dtls || !component->dtls->srtp_valid || !component->dtls->srtp_in)) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Missing valid SRTP session (packet arrived too early?), skipping...\n", handle->handle_id);
} else {
janus_rtp_header *header = (janus_rtp_header *)buf;
guint32 packet_ssrc = ntohl(header->ssrc);
/* Is this audio or video? */
int video = 0, vindex = 0, rtx = 0;
/* Bundled streams, check SSRC */
video = ((stream->video_ssrc_peer[0] == packet_ssrc
|| stream->video_ssrc_peer_rtx[0] == packet_ssrc
|| stream->video_ssrc_peer[1] == packet_ssrc
|| stream->video_ssrc_peer_rtx[1] == packet_ssrc
|| stream->video_ssrc_peer[2] == packet_ssrc
|| stream->video_ssrc_peer_rtx[2] == packet_ssrc) ? 1 : 0);
if(!video && stream->audio_ssrc_peer != packet_ssrc) {
/* Apparently we were not told the peer SSRCs, try the RTP mid extension (or payload types) */
gboolean found = FALSE;
if(handle->stream->mid_ext_id > 0) {
char sdes_item[16];
if(janus_rtp_header_extension_parse_mid(buf, len, handle->stream->mid_ext_id, sdes_item, sizeof(sdes_item)) == 0) {
if(handle->audio_mid && !strcmp(handle->audio_mid, sdes_item)) {
/* It's audio */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Unadvertized SSRC (%"SCNu32") is audio! (mid %s)\n", handle->handle_id, packet_ssrc, sdes_item);
video = 0;
stream->audio_ssrc_peer = packet_ssrc;
found = TRUE;
} else if(handle->video_mid && !strcmp(handle->video_mid, sdes_item)) {
/* It's video */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Unadvertized SSRC (%"SCNu32") is video! (mid %s)\n", handle->handle_id, packet_ssrc, sdes_item);
video = 1;
/* Check if simulcasting is involved */
if(stream->rid[0] == NULL || stream->rid_ext_id < 1) {
stream->video_ssrc_peer[0] = packet_ssrc;
found = TRUE;
} else {
if(janus_rtp_header_extension_parse_rid(buf, len, stream->rid_ext_id, sdes_item, sizeof(sdes_item)) == 0) {
/* Try the RTP stream ID */
if(stream->rid[2] != NULL && !strcmp(stream->rid[2], sdes_item)) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Simulcasting: rid=%s\n", handle->handle_id, sdes_item);
stream->video_ssrc_peer[0] = packet_ssrc;
vindex = 0;
found = TRUE;
} else if(stream->rid[1] != NULL && !strcmp(stream->rid[1], sdes_item)) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Simulcasting #1: rid=%s\n", handle->handle_id, sdes_item);
stream->video_ssrc_peer[1] = packet_ssrc;
vindex = 1;
found = TRUE;
} else if(stream->rid[0] != NULL && !strcmp(stream->rid[0], sdes_item)) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Simulcasting #2: rid=%s\n", handle->handle_id, sdes_item);
stream->video_ssrc_peer[2] = packet_ssrc;
vindex = 2;
found = TRUE;
} else {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] -- Simulcasting: unknown rid %s..?\n", handle->handle_id, sdes_item);
}
} else if(stream->ridrtx_ext_id > 0 &&
janus_rtp_header_extension_parse_rid(buf, len, stream->ridrtx_ext_id, sdes_item, sizeof(sdes_item)) == 0) {
/* Try the repaired RTP stream ID */
if(stream->rid[2] != NULL && !strcmp(stream->rid[2], sdes_item)) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Simulcasting: rid=%s (rtx)\n", handle->handle_id, sdes_item);
stream->video_ssrc_peer_rtx[0] = packet_ssrc;
vindex = 0;
rtx = 1;
found = TRUE;
} else if(stream->rid[1] != NULL && !strcmp(stream->rid[1], sdes_item)) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Simulcasting #1: rid=%s (rtx)\n", handle->handle_id, sdes_item);
stream->video_ssrc_peer_rtx[1] = packet_ssrc;
vindex = 1;
rtx = 1;
found = TRUE;
} else if(stream->rid[0] != NULL && !strcmp(stream->rid[0], sdes_item)) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Simulcasting #2: rid=%s (rtx)\n", handle->handle_id, sdes_item);
stream->video_ssrc_peer_rtx[2] = packet_ssrc;
vindex = 2;
rtx = 1;
found = TRUE;
} else {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] -- Simulcasting: unknown rid %s..?\n", handle->handle_id, sdes_item);
}
}
}
}
}
}
if(!found) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Not video and not audio? dropping (SSRC %"SCNu32")...\n", handle->handle_id, packet_ssrc);
return;
}
}
/* Make sure we're prepared to receive this media packet */
if((!video && !stream->audio_recv) || (video && !stream->video_recv))
return;
/* If this is video, check if this is simulcast and/or a retransmission using RFC4588 */
if(video) {
if(stream->video_ssrc_peer[1] == packet_ssrc) {
/* FIXME Simulcast (1) */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Simulcast #1 (SSRC %"SCNu32")...\n", handle->handle_id, packet_ssrc);
vindex = 1;
} else if(stream->video_ssrc_peer[2] == packet_ssrc) {
/* FIXME Simulcast (2) */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Simulcast #2 (SSRC %"SCNu32")...\n", handle->handle_id, packet_ssrc);
vindex = 2;
} else {
/* Maybe a video retransmission using RFC4588? */
if(stream->video_ssrc_peer_rtx[0] == packet_ssrc) {
rtx = 1;
vindex = 0;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] RFC4588 rtx packet on video (SSRC %"SCNu32")...\n",
handle->handle_id, packet_ssrc);
} else if(stream->video_ssrc_peer_rtx[1] == packet_ssrc) {
rtx = 1;
vindex = 1;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] RFC4588 rtx packet on video #%d (SSRC %"SCNu32")...\n",
handle->handle_id, vindex, packet_ssrc);
} else if(stream->video_ssrc_peer_rtx[2] == packet_ssrc) {
rtx = 1;
vindex = 2;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] RFC4588 rtx packet on video #%d (SSRC %"SCNu32")...\n",
handle->handle_id, vindex, packet_ssrc);
}
}
}
int buflen = len;
srtp_err_status_t res = janus_is_webrtc_encryption_enabled() ?
srtp_unprotect(component->dtls->srtp_in, buf, &buflen) : srtp_err_status_ok;
if(res != srtp_err_status_ok) {
if(res != srtp_err_status_replay_fail && res != srtp_err_status_replay_old) {
/* Only print the error if it's not a 'replay fail' or 'replay old' (which is probably just the result of us NACKing a packet) */
guint32 timestamp = ntohl(header->timestamp);
guint16 seq = ntohs(header->seq_number);
JANUS_LOG(LOG_ERR, "[%"SCNu64"] SRTP unprotect error: %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")\n", handle->handle_id, janus_srtp_error_str(res), len, buflen, timestamp, seq);
}
} else {
if(video) {
if(stream->video_ssrc_peer[0] == 0) {
stream->video_ssrc_peer[0] = ntohl(header->ssrc);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Peer video SSRC: %u\n", handle->handle_id, stream->video_ssrc_peer[0]);
}
} else {
if(stream->audio_ssrc_peer == 0) {
stream->audio_ssrc_peer = ntohl(header->ssrc);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Peer audio SSRC: %u\n", handle->handle_id, stream->audio_ssrc_peer);
}
}
/* Do we need to dump this packet for debugging? */
if(g_atomic_int_get(&handle->dump_packets))
janus_text2pcap_dump(handle->text2pcap, JANUS_TEXT2PCAP_RTP, TRUE, buf, buflen,
"[session=%"SCNu64"][handle=%"SCNu64"]", session->session_id, handle->handle_id);
/* If this is a retransmission using RFC4588, we have to do something first to get the original packet */
janus_rtp_header *header = (janus_rtp_header *)buf;
int plen = 0;
char *payload = janus_rtp_payload(buf, buflen, &plen);
if (!payload) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error accessing the RTP payload len=%d\n", handle->handle_id, buflen);
}
if(rtx) {
/* The original sequence number is in the first two bytes of the payload */
/* Rewrite the header with the info from the original packet (payload type, SSRC, sequence number) */
header->type = stream->video_payload_type;
packet_ssrc = stream->video_ssrc_peer[vindex];
header->ssrc = htonl(packet_ssrc);
if(plen > 0) {
memcpy(&header->seq_number, payload, 2);
/* Finally, remove the original sequence number from the payload: move the whole
* payload back two bytes rather than shifting the header forward (avoid misaligned access) */
buflen -= 2;
plen -= 2;
memmove(payload, payload+2, plen);
header = (janus_rtp_header *)buf;
if(stream->rid_ext_id > 1 && stream->ridrtx_ext_id > 1) {
/* Replace the 'repaired' extension ID as well with the 'regular' one */
janus_rtp_header_extension_replace_id(buf, buflen, stream->ridrtx_ext_id, stream->rid_ext_id);
}
}
}
/* Check if we need to handle transport wide cc */
if(stream->do_transport_wide_cc) {
guint16 transport_seq_num;
/* Get transport wide seq num */
if(janus_rtp_header_extension_parse_transport_wide_cc(buf, buflen, stream->transport_wide_cc_ext_id, &transport_seq_num)==0) {
/* Get current timestamp */
struct timeval now;
gettimeofday(&now,0);
/* Create <seq num, time> pair */
janus_rtcp_transport_wide_cc_stats *stats = g_malloc0(sizeof(janus_rtcp_transport_wide_cc_stats));
/* Check if we have a sequence wrap */
if(transport_seq_num<0x0FFF && (stream->transport_wide_cc_last_seq_num&0xFFFF)>0xF000) {
/* Increase cycles */
stream->transport_wide_cc_cycles++;
}
/* Get extended value */
guint32 transport_ext_seq_num = stream->transport_wide_cc_cycles<<16 | transport_seq_num;
/* Store last received transport seq num */
stream->transport_wide_cc_last_seq_num = transport_seq_num;
/* Set stats values */
stats->transport_seq_num = transport_ext_seq_num;
stats->timestamp = (((guint64)now.tv_sec)*1E6+now.tv_usec);
/* Lock and append to received list */
janus_mutex_lock(&stream->mutex);
stream->transport_wide_received_seq_nums = g_slist_prepend(stream->transport_wide_received_seq_nums, stats);
janus_mutex_unlock(&stream->mutex);
}
}
if(video) {
/* Check if this packet is a duplicate: can happen with RFC4588 */
guint16 seqno = ntohs(header->seq_number);
int nstate = stream->rtx_nacked[vindex] ?
GPOINTER_TO_INT(g_hash_table_lookup(stream->rtx_nacked[vindex], GUINT_TO_POINTER(seqno))) : 0;
if(nstate == 1) {
/* Packet was NACKed and this is the first time we receive it: change state to received */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Received NACKed packet %"SCNu16" (SSRC %"SCNu32", vindex %d)...\n",
handle->handle_id, seqno, packet_ssrc, vindex);
g_hash_table_insert(stream->rtx_nacked[vindex], GUINT_TO_POINTER(seqno), GUINT_TO_POINTER(2));
} else if(nstate == 2) {
/* We already received this packet: drop it */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Detected duplicate packet %"SCNu16" (SSRC %"SCNu32", vindex %d)...\n",
handle->handle_id, seqno, packet_ssrc, vindex);
return;
} else if(rtx && nstate == 0) {
/* We received a retransmission for a packet we didn't NACK: drop it
* FIXME This seems to happen with Chrome when RFC4588 is enabled: in that case,
* Chrome sends the first packet ~8 times as a retransmission, probably to ensure
* we receive it, since the first packet cannot be NACKed (NACKs are triggered
* when there's a gap in between two packets, and the first doesn't have a reference)
* Rather than dropping, we should add a better check in the future */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Got a retransmission for non-NACKed packet %"SCNu16" (SSRC %"SCNu32", vindex %d)...\n",
handle->handle_id, seqno, packet_ssrc, vindex);
return;
}
}
/* Backup the RTP header before passing it to the proper RTP switching context */
janus_rtp_header backup = *header;
if(!video) {
if(stream->audio_ssrc_peer_orig == 0)
stream->audio_ssrc_peer_orig = packet_ssrc;
janus_rtp_header_update(header, &stream->rtp_ctx[0], FALSE, 0);
header->ssrc = htonl(stream->audio_ssrc_peer_orig);
} else {
if(stream->video_ssrc_peer_orig[vindex] == 0)
stream->video_ssrc_peer_orig[vindex] = packet_ssrc;
janus_rtp_header_update(header, &stream->rtp_ctx[vindex], TRUE, 0);
header->ssrc = htonl(stream->video_ssrc_peer_orig[vindex]);
}
/* Keep track of payload types too */
if(!video && stream->audio_payload_type < 0) {
stream->audio_payload_type = header->type;
if(stream->audio_codec == NULL) {
const char *codec = janus_get_codec_from_pt(handle->local_sdp, stream->audio_payload_type);
if(codec != NULL)
stream->audio_codec = g_strdup(codec);
}
} else if(video && stream->video_payload_type < 0) {
stream->video_payload_type = header->type;
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX) &&
stream->rtx_payload_types && g_hash_table_size(stream->rtx_payload_types) > 0) {
stream->video_rtx_payload_type = GPOINTER_TO_INT(g_hash_table_lookup(stream->rtx_payload_types, GINT_TO_POINTER(stream->video_payload_type)));
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Retransmissions will have payload type %d\n",
handle->handle_id, stream->video_rtx_payload_type);
}
if(stream->video_codec == NULL) {
const char *codec = janus_get_codec_from_pt(handle->local_sdp, stream->video_payload_type);
if(codec != NULL)
stream->video_codec = g_strdup(codec);
}
if(stream->video_is_keyframe == NULL && stream->video_codec != NULL) {
if(!strcasecmp(stream->video_codec, "vp8"))
stream->video_is_keyframe = &janus_vp8_is_keyframe;
else if(!strcasecmp(stream->video_codec, "vp9"))
stream->video_is_keyframe = &janus_vp9_is_keyframe;
else if(!strcasecmp(stream->video_codec, "h264"))
stream->video_is_keyframe = &janus_h264_is_keyframe;
else if(!strcasecmp(stream->video_codec, "av1"))
stream->video_is_keyframe = &janus_av1_is_keyframe;
else if(!strcasecmp(stream->video_codec, "h265"))
stream->video_is_keyframe = &janus_h265_is_keyframe;
}
}
/* Prepare the data to pass to the responsible plugin */
janus_plugin_rtp rtp = { .video = video, .buffer = buf, .length = buflen };
janus_plugin_rtp_extensions_reset(&rtp.extensions);
/* Parse RTP extensions before involving the plugin */
if(stream->audiolevel_ext_id != -1) {
gboolean vad = FALSE;
int level = -1;
if(janus_rtp_header_extension_parse_audio_level(buf, buflen,
stream->audiolevel_ext_id, &vad, &level) == 0) {
rtp.extensions.audio_level = level;
rtp.extensions.audio_level_vad = vad;
}
}
if(stream->videoorientation_ext_id != -1) {
gboolean c = FALSE, f = FALSE, r1 = FALSE, r0 = FALSE;
if(janus_rtp_header_extension_parse_video_orientation(buf, buflen,
stream->videoorientation_ext_id, &c, &f, &r1, &r0) == 0) {
rtp.extensions.video_rotation = 0;
if(r1 && r0)
rtp.extensions.video_rotation = 270;
else if(r1)
rtp.extensions.video_rotation = 180;
else if(r0)
rtp.extensions.video_rotation = 90;
rtp.extensions.video_back_camera = c;
rtp.extensions.video_flipped = f;
}
}
/* Pass the packet to the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin && plugin->incoming_rtp && handle->app_handle &&
!g_atomic_int_get(&handle->app_handle->stopped) &&
!g_atomic_int_get(&handle->destroyed))
plugin->incoming_rtp(handle->app_handle, &rtp);
/* Restore the header for the stats (plugins may have messed with it) */
*header = backup;
/* Update stats (overall data received, and data received in the last second) */
if(buflen > 0) {
gint64 now = janus_get_monotonic_time();
if(!video) {
if(component->in_stats.audio.bytes == 0 || component->in_stats.audio.notified_lastsec) {
/* We either received our first audio packet, or we started receiving it again after missing more than a second */
component->in_stats.audio.notified_lastsec = FALSE;
janus_ice_notify_media(handle, FALSE, TRUE);
}
/* Overall audio data */
component->in_stats.audio.packets++;
component->in_stats.audio.bytes += buflen;
/* Last second audio data */
if(component->in_stats.audio.updated == 0)
component->in_stats.audio.updated = now;
if(now > component->in_stats.audio.updated &&
now - component->in_stats.audio.updated >= G_USEC_PER_SEC) {
component->in_stats.audio.bytes_lastsec = component->in_stats.audio.bytes_lastsec_temp;
component->in_stats.audio.bytes_lastsec_temp = 0;
component->in_stats.audio.updated = now;
}
component->in_stats.audio.bytes_lastsec_temp += buflen;
} else {
if(component->in_stats.video[vindex].bytes == 0 || component->in_stats.video[vindex].notified_lastsec) {
/* We either received our first video packet, or we started receiving it again after missing more than a second */
component->in_stats.video[vindex].notified_lastsec = FALSE;
janus_ice_notify_media(handle, TRUE, TRUE);
}
/* Overall video data for this SSRC */
component->in_stats.video[vindex].packets++;
component->in_stats.video[vindex].bytes += buflen;
/* Last second video data for this SSRC */
if(component->in_stats.video[vindex].updated == 0)
component->in_stats.video[vindex].updated = now;
if(now > component->in_stats.video[vindex].updated &&
now - component->in_stats.video[vindex].updated >= G_USEC_PER_SEC) {
component->in_stats.video[vindex].bytes_lastsec = component->in_stats.video[vindex].bytes_lastsec_temp;
component->in_stats.video[vindex].bytes_lastsec_temp = 0;
component->in_stats.video[vindex].updated = now;
}
component->in_stats.video[vindex].bytes_lastsec_temp += buflen;
}
}
/* Update the RTCP context as well */
rtcp_context *rtcp_ctx = video ? stream->video_rtcp_ctx[vindex] : stream->audio_rtcp_ctx;
gboolean retransmissions_disabled = (!video && !component->do_audio_nacks) || (video && !component->do_video_nacks);
janus_rtcp_process_incoming_rtp(rtcp_ctx, buf, buflen,
(video && rtx) ? TRUE : FALSE,
(video && janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)),
retransmissions_disabled, stream->clock_rates
);
/* Keep track of RTP sequence numbers, in case we need to NACK them */
/* Note: unsigned int overflow/underflow wraps (defined behavior) */
if(retransmissions_disabled) {
/* ... unless NACKs are disabled for this medium */
return;
}
guint16 new_seqn = ntohs(header->seq_number);
/* If this is video, check if this is a keyframe: if so, we empty our NACK queue */
if(video && stream->video_is_keyframe) {
if(stream->video_is_keyframe(payload, plen)) {
if(rtcp_ctx && (int16_t)(new_seqn - rtcp_ctx->max_seq_nr) > 0) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Keyframe received with a highest sequence number, resetting NACK queue\n", handle->handle_id);
janus_seq_list_free(&component->last_seqs_video[vindex]);
}
}
}
guint16 cur_seqn;
int last_seqs_len = 0;
janus_mutex_lock(&component->mutex);
janus_seq_info **last_seqs = video ? &component->last_seqs_video[vindex] : &component->last_seqs_audio;
janus_seq_info *cur_seq = *last_seqs;
if(cur_seq) {
cur_seq = cur_seq->prev;
cur_seqn = cur_seq->seq;
} else {
/* First seq, set up to add one seq */
cur_seqn = new_seqn - (guint16)1; /* Can wrap */
}
if(!janus_seq_in_range(new_seqn, cur_seqn, LAST_SEQS_MAX_LEN) &&
!janus_seq_in_range(cur_seqn, new_seqn, 1000)) {
/* Jump too big, start fresh */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Big sequence number jump %hu -> %hu (%s stream #%d)\n",
handle->handle_id, cur_seqn, new_seqn, video ? "video" : "audio", vindex);
janus_seq_list_free(last_seqs);
cur_seq = NULL;
cur_seqn = new_seqn - (guint16)1;
}
GSList *nacks = NULL;
gint64 now = janus_get_monotonic_time();
if(janus_seq_in_range(new_seqn, cur_seqn, LAST_SEQS_MAX_LEN)) {
/* Add new seq objs forward */
while(cur_seqn != new_seqn) {
cur_seqn += (guint16)1; /* can wrap */
janus_seq_info *seq_obj = g_malloc0(sizeof(janus_seq_info));
seq_obj->seq = cur_seqn;
seq_obj->ts = now;
seq_obj->state = (cur_seqn == new_seqn) ? SEQ_RECVED : SEQ_MISSING;
janus_seq_append(last_seqs, seq_obj);
last_seqs_len++;
}
}
if(cur_seq) {
/* Scan old seq objs backwards */
while(cur_seq != NULL) {
last_seqs_len++;
if(cur_seq->seq == new_seqn) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Received missed sequence number %"SCNu16" (%s stream #%d)\n",
handle->handle_id, cur_seq->seq, video ? "video" : "audio", vindex);
cur_seq->state = SEQ_RECVED;
} else if(cur_seq->state == SEQ_MISSING && now - cur_seq->ts > SEQ_MISSING_WAIT) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Missed sequence number %"SCNu16" (%s stream #%d), sending 1st NACK\n",
handle->handle_id, cur_seq->seq, video ? "video" : "audio", vindex);
nacks = g_slist_prepend(nacks, GUINT_TO_POINTER(cur_seq->seq));
cur_seq->state = SEQ_NACKED;
if(video && janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) {
/* Keep track of this sequence number, we need to avoid duplicates */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Tracking NACKed packet %"SCNu16" (SSRC %"SCNu32", vindex %d)...\n",
handle->handle_id, cur_seq->seq, packet_ssrc, vindex);
if(stream->rtx_nacked[vindex] == NULL)
stream->rtx_nacked[vindex] = g_hash_table_new(NULL, NULL);
g_hash_table_insert(stream->rtx_nacked[vindex], GUINT_TO_POINTER(cur_seq->seq), GINT_TO_POINTER(1));
/* We don't track it forever, though: add a timed source to remove it in a few seconds */
janus_ice_nacked_packet *np = g_malloc(sizeof(janus_ice_nacked_packet));
np->handle = handle;
np->seq_number = cur_seq->seq;
np->vindex = vindex;
if(stream->pending_nacked_cleanup == NULL)
stream->pending_nacked_cleanup = g_hash_table_new(NULL, NULL);
GSource *timeout_source = g_timeout_source_new_seconds(5);
g_source_set_callback(timeout_source, janus_ice_nacked_packet_cleanup, np, (GDestroyNotify)g_free);
np->source_id = g_source_attach(timeout_source, handle->mainctx);
g_source_unref(timeout_source);
g_hash_table_insert(stream->pending_nacked_cleanup, GUINT_TO_POINTER(np->source_id), timeout_source);
}
} else if(cur_seq->state == SEQ_NACKED && now - cur_seq->ts > SEQ_NACKED_WAIT) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Missed sequence number %"SCNu16" (%s stream #%d), sending 2nd NACK\n",
handle->handle_id, cur_seq->seq, video ? "video" : "audio", vindex);
nacks = g_slist_prepend(nacks, GUINT_TO_POINTER(cur_seq->seq));
cur_seq->state = SEQ_GIVEUP;
}
if(cur_seq == *last_seqs) {
/* Just processed head */
break;
}
cur_seq = cur_seq->prev;
}
}
while(last_seqs_len > LAST_SEQS_MAX_LEN) {
janus_seq_info *node = janus_seq_pop_head(last_seqs);
g_free(node);
last_seqs_len--;
}
guint nacks_count = g_slist_length(nacks);
if(nacks_count) {
/* Generate a NACK and send it */
JANUS_LOG(LOG_DBG, "[%"SCNu64"] Now sending NACK for %u missed packets (%s stream #%d)\n",
handle->handle_id, nacks_count, video ? "video" : "audio", vindex);
char nackbuf[120];
int res = janus_rtcp_nacks(nackbuf, sizeof(nackbuf), nacks);
if(res > 0) {
/* Set the right local and remote SSRC in the RTCP packet */
janus_rtcp_fix_ssrc(NULL, nackbuf, res, 1,
video ? stream->video_ssrc : stream->audio_ssrc,
video ? stream->video_ssrc_peer[vindex] : stream->audio_ssrc_peer);
janus_plugin_rtcp rtcp = { .video = video, .buffer = nackbuf, .length = res };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
}
/* Update stats */
component->nack_sent_recent_cnt += nacks_count;
if(video) {
component->out_stats.video[vindex].nacks += nacks_count;
} else {
component->out_stats.audio.nacks += nacks_count;
}
}
if(component->nack_sent_recent_cnt &&
(now - component->nack_sent_log_ts) > 5*G_USEC_PER_SEC) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sent NACKs for %u missing packets (%s stream #%d)\n",
handle->handle_id, component->nack_sent_recent_cnt, video ? "video" : "audio", vindex);
component->nack_sent_recent_cnt = 0;
component->nack_sent_log_ts = now;
}
janus_mutex_unlock(&component->mutex);
g_slist_free(nacks);
nacks = NULL;
}
}
return;
} else if(janus_is_rtcp(buf, len)) {
/* This is RTCP */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Got an RTCP packet\n", handle->handle_id);
if(janus_is_webrtc_encryption_enabled() && (!component->dtls || !component->dtls->srtp_valid || !component->dtls->srtp_in)) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Missing valid SRTP session (packet arrived too early?), skipping...\n", handle->handle_id);
} else {
int buflen = len;
srtp_err_status_t res = janus_is_webrtc_encryption_enabled() ?
srtp_unprotect_rtcp(component->dtls->srtp_in, buf, &buflen) : srtp_err_status_ok;
if(res != srtp_err_status_ok) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] SRTCP unprotect error: %s (len=%d-->%d)\n", handle->handle_id, janus_srtp_error_str(res), len, buflen);
} else {
/* Do we need to dump this packet for debugging? */
if(g_atomic_int_get(&handle->dump_packets))
janus_text2pcap_dump(handle->text2pcap, JANUS_TEXT2PCAP_RTCP, TRUE, buf, buflen,
"[session=%"SCNu64"][handle=%"SCNu64"]", session->session_id, handle->handle_id);
/* Check if there's an RTCP BYE: in case, let's log it */
if(janus_rtcp_has_bye(buf, buflen)) {
/* Note: we used to use this as a trigger to close the PeerConnection, but not anymore
* Discussion here, https://groups.google.com/forum/#!topic/meetecho-janus/4XtfbYB7Jvc */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Got RTCP BYE on stream %u (component %u)\n", handle->handle_id, stream->stream_id, component->component_id);
}
/* Is this audio or video? */
int video = 0, vindex = 0;
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) {
janus_rtcp_swap_report_blocks(buf, buflen, stream->video_ssrc_rtx);
}
/* Bundled streams, should we check the SSRCs? */
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO)) {
/* No audio has been negotiated, definitely video */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Incoming RTCP, bundling: this is video (no audio has been negotiated)\n", handle->handle_id);
video = 1;
} else if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO)) {
/* No video has been negotiated, definitely audio */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Incoming RTCP, bundling: this is audio (no video has been negotiated)\n", handle->handle_id);
video = 0;
} else {
if(stream->audio_ssrc_peer == 0 || stream->video_ssrc_peer[0] == 0) {
/* We don't know the remote SSRC: this can happen for recvonly clients
* (see https://groups.google.com/forum/#!topic/discuss-webrtc/5yuZjV7lkNc)
* Check the local SSRC, compare it to what we have */
guint32 rtcp_ssrc = janus_rtcp_get_receiver_ssrc(buf, buflen);
if(rtcp_ssrc == 0) {
/* No SSRC, maybe an empty RR? */
return;
}
if(rtcp_ssrc == stream->audio_ssrc) {
video = 0;
} else if(rtcp_ssrc == stream->video_ssrc) {
video = 1;
} else if(rtcp_ssrc == stream->video_ssrc_rtx) {
/* rtx SSRC, we don't care */
return;
} else if(janus_rtcp_has_fir(buf, buflen) || janus_rtcp_has_pli(buf, buflen) || janus_rtcp_get_remb(buf, buflen)) {
/* Mh, no SR or RR? Try checking if there's any FIR, PLI or REMB */
video = 1;
} else {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Dropping RTCP packet with unknown SSRC (%"SCNu32")\n", handle->handle_id, rtcp_ssrc);
return;
}
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Incoming RTCP, bundling: this is %s (local SSRC: video=%"SCNu32", audio=%"SCNu32", got %"SCNu32")\n",
handle->handle_id, video ? "video" : "audio", stream->video_ssrc, stream->audio_ssrc, rtcp_ssrc);
} else {
/* Check the remote SSRC, compare it to what we have: in case
* we're simulcasting, let's compare to the other SSRCs too */
guint32 rtcp_ssrc = janus_rtcp_get_sender_ssrc(buf, buflen);
if(rtcp_ssrc == 0) {
/* No SSRC, maybe an empty RR? */
return;
}
if(rtcp_ssrc == stream->audio_ssrc_peer) {
video = 0;
} else if(rtcp_ssrc == stream->video_ssrc_peer[0]) {
video = 1;
} else if(stream->video_ssrc_peer[1] && rtcp_ssrc == stream->video_ssrc_peer[1]) {
video = 1;
vindex = 1;
} else if(stream->video_ssrc_peer[2] && rtcp_ssrc == stream->video_ssrc_peer[2]) {
video = 1;
vindex = 2;
} else {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Dropping RTCP packet with unknown SSRC (%"SCNu32")\n", handle->handle_id, rtcp_ssrc);
return;
}
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Incoming RTCP, bundling: this is %s (remote SSRC: video=%"SCNu32" #%d, audio=%"SCNu32", got %"SCNu32")\n",
handle->handle_id, video ? "video" : "audio", stream->video_ssrc_peer[vindex], vindex, stream->audio_ssrc_peer, rtcp_ssrc);
}
}
/* Let's process this RTCP (compound?) packet, and update the RTCP context for this stream in case */
rtcp_context *rtcp_ctx = video ? stream->video_rtcp_ctx[vindex] : stream->audio_rtcp_ctx;
uint32_t rtt = rtcp_ctx ? rtcp_ctx->rtt : 0;
if(janus_rtcp_parse(rtcp_ctx, buf, buflen) < 0) {
/* Drop the packet if the parsing function returns with an error */
return;
}
if(rtcp_ctx && rtcp_ctx->rtt != rtt) {
/* Check the current RTT, to see if we need to update the size of the queue: we take
* the highest RTT (audio or video) and add 100ms just to be conservative */
uint32_t audio_rtt = janus_rtcp_context_get_rtt(stream->audio_rtcp_ctx),
video_rtt = janus_rtcp_context_get_rtt(stream->video_rtcp_ctx[0]);
uint16_t nack_queue_ms = (audio_rtt > video_rtt ? audio_rtt : video_rtt) + 100;
if(nack_queue_ms > DEFAULT_MAX_NACK_QUEUE)
nack_queue_ms = DEFAULT_MAX_NACK_QUEUE;
else if(nack_queue_ms < min_nack_queue)
nack_queue_ms = min_nack_queue;
uint16_t mavg = rtt ? ((7*stream->nack_queue_ms + nack_queue_ms)/8) : nack_queue_ms;
if(mavg > DEFAULT_MAX_NACK_QUEUE)
mavg = DEFAULT_MAX_NACK_QUEUE;
else if(mavg < min_nack_queue)
mavg = min_nack_queue;
stream->nack_queue_ms = mavg;
}
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Got %s RTCP (%d bytes)\n", handle->handle_id, video ? "video" : "audio", buflen);
/* Now let's see if there are any NACKs to handle */
gint64 now = janus_get_monotonic_time();
GSList *nacks = janus_rtcp_get_nacks(buf, buflen);
guint nacks_count = g_slist_length(nacks);
if(nacks_count && ((!video && component->do_audio_nacks) || (video && component->do_video_nacks))) {
/* Handle NACK */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Just got some NACKS (%d) we should handle...\n", handle->handle_id, nacks_count);
GHashTable *retransmit_seqs = (video ? component->video_retransmit_seqs : component->audio_retransmit_seqs);
GSList *list = (retransmit_seqs != NULL ? nacks : NULL);
int retransmits_cnt = 0;
janus_mutex_lock(&component->mutex);
while(list) {
unsigned int seqnr = GPOINTER_TO_UINT(list->data);
JANUS_LOG(LOG_DBG, "[%"SCNu64"] >> %u\n", handle->handle_id, seqnr);
int in_rb = 0;
/* Check if we have the packet */
janus_rtp_packet *p = g_hash_table_lookup(retransmit_seqs, GUINT_TO_POINTER(seqnr));
if(p == NULL) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Can't retransmit packet %u, we don't have it...\n", handle->handle_id, seqnr);
} else {
/* Should we retransmit this packet? */
if((p->last_retransmit > 0) && (now-p->last_retransmit < MAX_NACK_IGNORE)) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Packet %u was retransmitted just %"SCNi64"ms ago, skipping\n", handle->handle_id, seqnr, now-p->last_retransmit);
list = list->next;
continue;
}
in_rb = 1;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] >> >> Scheduling %u for retransmission due to NACK\n", handle->handle_id, seqnr);
p->last_retransmit = now;
retransmits_cnt++;
/* Enqueue it */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
pkt->data = g_malloc(p->length+SRTP_MAX_TAG_LEN);
memcpy(pkt->data, p->data, p->length);
pkt->length = p->length;
pkt->type = video ? JANUS_ICE_PACKET_VIDEO : JANUS_ICE_PACKET_AUDIO;
pkt->control = FALSE;
pkt->retransmission = TRUE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
/* What to send and how depends on whether we're doing RFC4588 or not */
if(!video || !janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) {
/* We're not: just clarify the packet was already encrypted before */
pkt->encrypted = TRUE;
} else {
/* We are: overwrite the RTP header (which means we'll need a new SRTP encrypt) */
pkt->encrypted = FALSE;
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
header->type = stream->video_rtx_payload_type;
header->ssrc = htonl(stream->video_ssrc_rtx);
component->rtx_seq_number++;
header->seq_number = htons(component->rtx_seq_number);
}
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, pkt);
#else
g_async_queue_push(handle->queued_packets, pkt);
#endif
g_main_context_wakeup(handle->mainctx);
}
}
if(rtcp_ctx != NULL && in_rb) {
g_atomic_int_inc(&rtcp_ctx->nack_count);
}
list = list->next;
}
component->retransmit_recent_cnt += retransmits_cnt;
/* FIXME Remove the NACK compound packet, we've handled it */
buflen = janus_rtcp_remove_nacks(buf, buflen);
/* Update stats */
if(video) {
component->in_stats.video[vindex].nacks += nacks_count;
} else {
component->in_stats.audio.nacks += nacks_count;
}
janus_mutex_unlock(&component->mutex);
g_slist_free(nacks);
nacks = NULL;
}
if(component->retransmit_recent_cnt &&
now - component->retransmit_log_ts > 5*G_USEC_PER_SEC) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Retransmitted %u packets due to NACK (%s stream #%d)\n",
handle->handle_id, component->retransmit_recent_cnt, video ? "video" : "audio", vindex);
component->retransmit_recent_cnt = 0;
component->retransmit_log_ts = now;
}
/* Fix packet data for RTCP SR and RTCP RR */
janus_rtp_switching_context *rtp_ctx = video ? &stream->rtp_ctx[vindex] : &stream->rtp_ctx[0];
uint32_t base_ts = video ? rtp_ctx->v_base_ts : rtp_ctx->a_base_ts;
uint32_t base_ts_prev = video ? rtp_ctx->v_base_ts_prev : rtp_ctx->a_base_ts_prev;
uint32_t ssrc_peer = video ? stream->video_ssrc_peer_orig[vindex] : stream->audio_ssrc_peer_orig;
uint32_t ssrc_local = video ? stream->video_ssrc : stream->audio_ssrc;
uint32_t ssrc_expected = video ? rtp_ctx->v_last_ssrc : rtp_ctx->a_last_ssrc;
if (janus_rtcp_fix_report_data(buf, buflen, base_ts, base_ts_prev, ssrc_peer, ssrc_local, ssrc_expected, video) < 0) {
/* Drop packet in case of parsing error or SSRC different from the one expected. */
/* This might happen at the very beginning of the communication or early after */
/* a re-negotation has been concluded. */
return;
}
janus_plugin_rtcp rtcp = { .video = video, .buffer = buf, .length = buflen };
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin && plugin->incoming_rtcp && handle->app_handle &&
!g_atomic_int_get(&handle->app_handle->stopped) &&
!g_atomic_int_get(&handle->destroyed))
plugin->incoming_rtcp(handle->app_handle, &rtcp);
}
}
return;
} else {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Not RTP and not RTCP... may these be data channels?\n", handle->handle_id);
janus_dtls_srtp_incoming_msg(component->dtls, buf, len);
/* Update stats (only overall data received) */
if(len > 0) {
component->in_stats.data.packets++;
component->in_stats.data.bytes += len;
}
return;
}
}
void janus_ice_incoming_data(janus_ice_handle *handle, char *label, char *protocol, gboolean textdata, char *buffer, int length) {
if(handle == NULL || buffer == NULL || length <= 0)
return;
janus_plugin_data data = { .label = label, .binary = !textdata, .buffer = buffer, .length = length };
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin && plugin->incoming_data && handle->app_handle &&
!g_atomic_int_get(&handle->app_handle->stopped) &&
!g_atomic_int_get(&handle->destroyed))
plugin->incoming_data(handle->app_handle, &data);
}
/* Helper: encoding local candidates to string/SDP */
static int janus_ice_candidate_to_string(janus_ice_handle *handle, NiceCandidate *c, char *buffer, int buflen, gboolean log_candidate, gboolean force_private, guint public_ip_index) {
if(!handle || !handle->agent || !c || !buffer || buflen < 1)
return -1;
janus_ice_stream *stream = handle->stream;
if(!stream)
return -2;
janus_ice_component *component = stream->component;
if(!component)
return -3;
char *host_ip = NULL;
if(nat_1_1_enabled && !force_private) {
/* A 1:1 NAT mapping was specified, either overwrite all the host addresses with the public IP, or add new candidates */
host_ip = janus_get_public_ip(public_ip_index);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Public IP specified and 1:1 NAT mapping enabled (%s), using that as host address in the candidates\n", handle->handle_id, host_ip);
}
/* Encode the candidate to a string */
gchar address[NICE_ADDRESS_STRING_LEN], base_address[NICE_ADDRESS_STRING_LEN];
gint port = 0, base_port = 0;
nice_address_to_string(&(c->addr), (gchar *)&address);
port = nice_address_get_port(&(c->addr));
nice_address_to_string(&(c->base_addr), (gchar *)&base_address);
base_port = nice_address_get_port(&(c->base_addr));
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Address: %s:%d\n", handle->handle_id, address, port);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Priority: %d\n", handle->handle_id, c->priority);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Foundation: %s\n", handle->handle_id, c->foundation);
/* Start */
if(c->type == NICE_CANDIDATE_TYPE_HOST) {
/* 'host' candidate */
if(c->transport == NICE_CANDIDATE_TRANSPORT_UDP) {
g_snprintf(buffer, buflen,
"%s %d %s %d %s %d typ host",
c->foundation, c->component_id,
"udp", c->priority,
host_ip ? host_ip : address, port);
} else {
if(!janus_ice_tcp_enabled) {
/* ICE-TCP support disabled */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping host TCP candidate, ICE-TCP support disabled...\n", handle->handle_id);
return -4;
}
#ifndef HAVE_LIBNICE_TCP
/* TCP candidates are only supported since libnice 0.1.8 */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping host TCP candidate, the libnice version doesn't support it...\n", handle->handle_id);
return -4;
#else
const char *type = NULL;
switch(c->transport) {
case NICE_CANDIDATE_TRANSPORT_TCP_ACTIVE:
type = "active";
break;
case NICE_CANDIDATE_TRANSPORT_TCP_PASSIVE:
type = "passive";
break;
case NICE_CANDIDATE_TRANSPORT_TCP_SO:
type = "so";
break;
default:
break;
}
if(type == NULL) {
/* FIXME Unsupported transport */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Unsupported transport, skipping non-UDP/TCP host candidate...\n", handle->handle_id);
return -5;
}
g_snprintf(buffer, buflen,
"%s %d %s %d %s %d typ host tcptype %s",
c->foundation, c->component_id,
"tcp", c->priority,
host_ip ? host_ip : address, port, type);
#endif
}
} else if(c->type == NICE_CANDIDATE_TYPE_SERVER_REFLEXIVE ||
c->type == NICE_CANDIDATE_TYPE_PEER_REFLEXIVE ||
c->type == NICE_CANDIDATE_TYPE_RELAYED) {
/* 'srflx', 'prflx', or 'relay' candidate: what is this, exactly? */
const char *ltype = NULL;
switch(c->type) {
case NICE_CANDIDATE_TYPE_SERVER_REFLEXIVE:
ltype = "srflx";
break;
case NICE_CANDIDATE_TYPE_PEER_REFLEXIVE:
ltype = "prflx";
break;
case NICE_CANDIDATE_TYPE_RELAYED:
ltype = "relay";
break;
default:
break;
}
if(ltype == NULL)
return -5;
if(c->transport == NICE_CANDIDATE_TRANSPORT_UDP) {
nice_address_to_string(&(c->base_addr), (gchar *)&base_address);
gint base_port = nice_address_get_port(&(c->base_addr));
g_snprintf(buffer, buflen,
"%s %d %s %d %s %d typ %s raddr %s rport %d",
c->foundation, c->component_id,
"udp", c->priority,
address, port, ltype,
base_address, base_port);
} else {
if(!janus_ice_tcp_enabled) {
/* ICE-TCP support disabled */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping srflx TCP candidate, ICE-TCP support disabled...\n", handle->handle_id);
return -4;
}
#ifndef HAVE_LIBNICE_TCP
/* TCP candidates are only supported since libnice 0.1.8 */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping srflx TCP candidate, the libnice version doesn't support it...\n", handle->handle_id);
return -4;
#else
const char *type = NULL;
switch(c->transport) {
case NICE_CANDIDATE_TRANSPORT_TCP_ACTIVE:
type = "active";
break;
case NICE_CANDIDATE_TRANSPORT_TCP_PASSIVE:
type = "passive";
break;
case NICE_CANDIDATE_TRANSPORT_TCP_SO:
type = "so";
break;
default:
break;
}
if(type == NULL) {
/* FIXME Unsupported transport */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Unsupported transport, skipping non-UDP/TCP srflx candidate...\n", handle->handle_id);
return -5;
} else {
g_snprintf(buffer, buflen,
"%s %d %s %d %s %d typ %s raddr %s rport %d tcptype %s",
c->foundation, c->component_id,
"tcp", c->priority,
address, port, ltype,
base_address, base_port, type);
}
#endif
}
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] %s\n", handle->handle_id, buffer);
if(log_candidate) {
/* Save for the summary, in case we need it */
component->local_candidates = g_slist_append(component->local_candidates, g_strdup(buffer));
/* Notify event handlers */
if(janus_events_is_enabled()) {
janus_session *session = (janus_session *)handle->session;
json_t *info = json_object();
json_object_set_new(info, "local-candidate", json_string(buffer));
json_object_set_new(info, "stream_id", json_integer(stream->stream_id));
json_object_set_new(info, "component_id", json_integer(component->component_id));
janus_events_notify_handlers(JANUS_EVENT_TYPE_WEBRTC, JANUS_EVENT_SUBTYPE_WEBRTC_LCAND,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
return 0;
}
void janus_ice_candidates_to_sdp(janus_ice_handle *handle, janus_sdp_mline *mline, guint stream_id, guint component_id) {
if(!handle || !handle->agent || !mline)
return;
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No stream %d??\n", handle->handle_id, stream_id);
return;
}
janus_ice_component *component = stream->component;
if(!component || component->component_id != component_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No component %d in stream %d??\n", handle->handle_id, component_id, stream_id);
return;
}
NiceAgent *agent = handle->agent;
/* Iterate on all */
gchar buffer[200];
GSList *candidates, *i;
candidates = nice_agent_get_local_candidates (agent, stream_id, component_id);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] We have %d candidates for Stream #%d, Component #%d\n", handle->handle_id, g_slist_length(candidates), stream_id, component_id);
gboolean log_candidates = (component->local_candidates == NULL);
for(i = candidates; i; i = i->next) {
NiceCandidate *c = (NiceCandidate *) i->data;
guint public_ip_index = 0;
do {
if(janus_ice_candidate_to_string(handle, c, buffer, sizeof(buffer), log_candidates, FALSE, public_ip_index) == 0) {
/* Candidate encoded, add to the SDP (but only if it's not a 'prflx') */
if(c->type == NICE_CANDIDATE_TYPE_PEER_REFLEXIVE) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping prflx candidate...\n", handle->handle_id);
} else {
janus_sdp_attribute *a = janus_sdp_attribute_create("candidate", "%s", buffer);
mline->attributes = g_list_append(mline->attributes, a);
if(nat_1_1_enabled && keep_private_host && public_ip_index == 0 &&
janus_ice_candidate_to_string(handle, c, buffer, sizeof(buffer), log_candidates, TRUE, public_ip_index) == 0) {
/* Candidate with private host encoded, add to the SDP (but only if it's not a 'prflx') */
if(c->type == NICE_CANDIDATE_TYPE_PEER_REFLEXIVE) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Skipping prflx candidate...\n", handle->handle_id);
} else {
janus_sdp_attribute *a = janus_sdp_attribute_create("candidate", "%s", buffer);
mline->attributes = g_list_append(mline->attributes, a);
}
}
}
}
public_ip_index++;
} while (public_ip_index < janus_get_public_ip_count());
nice_candidate_free(c);
}
/* Done */
g_slist_free(candidates);
}
void janus_ice_add_remote_candidate(janus_ice_handle *handle, NiceCandidate *c) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Queueing candidate %p\n", handle->handle_id, c);
if(handle->queued_candidates != NULL)
g_async_queue_push(handle->queued_candidates, c);
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_add_candidates);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_add_candidates);
#endif
g_main_context_wakeup(handle->mainctx);
}
}
void janus_ice_setup_remote_candidates(janus_ice_handle *handle, guint stream_id, guint component_id) {
if(!handle || !handle->agent)
return;
janus_ice_stream *stream = handle->stream;
if(!stream || stream->stream_id != stream_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No such stream %d: cannot setup remote candidates for component %d\n", handle->handle_id, stream_id, component_id);
return;
}
janus_ice_component *component = stream->component;
if(!component || component->component_id != component_id) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No such component %d in stream %d: cannot setup remote candidates\n", handle->handle_id, component_id, stream_id);
return;
}
if(component->process_started) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Component %d in stream %d has already been set up\n", handle->handle_id, component_id, stream_id);
return;
}
if(!component->candidates || !component->candidates->data) {
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE)
|| janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES)) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No remote candidates for component %d in stream %d: was the remote SDP parsed?\n", handle->handle_id, component_id, stream_id);
}
return;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] ## Setting remote candidates: stream %d, component %d (%u in the list)\n",
handle->handle_id, stream_id, component_id, g_slist_length(component->candidates));
/* Add all candidates */
NiceCandidate *c = NULL;
GSList *gsc = component->candidates;
while(gsc) {
c = (NiceCandidate *) gsc->data;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Queueing candidate %p (startup)\n", handle->handle_id, c);
if(handle->queued_candidates != NULL)
g_async_queue_push(handle->queued_candidates, c);
gsc = gsc->next;
}
if(handle->queued_packets != NULL) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_add_candidates);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_add_candidates);
#endif
g_main_context_wakeup(handle->mainctx);
}
component->process_started = TRUE;
}
int janus_ice_setup_local(janus_ice_handle *handle, int offer, int audio, int video, int data, int trickle) {
if(!handle)
return -1;
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AGENT)) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Agent already exists?\n", handle->handle_id);
return -2;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Setting ICE locally: got %s (%d audios, %d videos)\n", handle->handle_id, offer ? "OFFER" : "ANSWER", audio, video);
g_atomic_int_set(&handle->closepc, 0);
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AGENT);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_START);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_NEGOTIATED);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_STOP);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ICE_RESTART);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RESEND_TRICKLES);
/* Note: in case this is not an OFFER, we don't know whether any medium are supported on the other side or not yet */
if(audio) {
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO);
} else {
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO);
}
if(video) {
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO);
} else {
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO);
}
if(data) {
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_DATA_CHANNELS);
} else {
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_DATA_CHANNELS);
}
/* Note: in case this is not an OFFER, we don't know whether ICE trickling is supported on the other side or not yet */
if(offer && trickle) {
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);
} else {
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);
}
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE_SYNCED);
/* Note: NICE_COMPATIBILITY_RFC5245 is only available in more recent versions of libnice */
handle->controlling = janus_ice_lite_enabled ? FALSE : !offer;
JANUS_LOG(LOG_INFO, "[%"SCNu64"] Creating ICE agent (ICE %s mode, %s)\n", handle->handle_id,
janus_ice_lite_enabled ? "Lite" : "Full", handle->controlling ? "controlling" : "controlled");
handle->agent = g_object_new(NICE_TYPE_AGENT,
"compatibility", NICE_COMPATIBILITY_DRAFT19,
"main-context", handle->mainctx,
"reliable", FALSE,
"full-mode", janus_ice_lite_enabled ? FALSE : TRUE,
#ifdef HAVE_LIBNICE_TCP
"ice-udp", TRUE,
"ice-tcp", janus_ice_tcp_enabled ? TRUE : FALSE,
#endif
NULL);
handle->agent_created = janus_get_monotonic_time();
handle->srtp_errors_count = 0;
handle->last_srtp_error = 0;
/* Any STUN server to use? */
if(janus_stun_server != NULL && janus_stun_port > 0) {
g_object_set(G_OBJECT(handle->agent),
"stun-server", janus_stun_server,
"stun-server-port", janus_stun_port,
NULL);
}
/* Any dynamic TURN credentials to retrieve via REST API? */
gboolean have_turnrest_credentials = FALSE;
#ifdef HAVE_TURNRESTAPI
/* When using the TURN REST API, we use the handle's opaque_id as a username
* by default, and fall back to the session_id when it's missing. Refer to this
* issue for more context: https://github.com/meetecho/janus-gateway/issues/2199 */
char turnrest_username[20];
if(handle->opaque_id == NULL) {
janus_session *session = (janus_session *)handle->session;
g_snprintf(turnrest_username, sizeof(turnrest_username), "%"SCNu64, session->session_id);
}
janus_turnrest_response *turnrest_credentials = janus_turnrest_request((const char *)(handle->opaque_id ?
handle->opaque_id : turnrest_username));
if(turnrest_credentials != NULL) {
have_turnrest_credentials = TRUE;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Got credentials from the TURN REST API backend!\n", handle->handle_id);
JANUS_LOG(LOG_HUGE, " -- Username: %s\n", turnrest_credentials->username);
JANUS_LOG(LOG_HUGE, " -- Password: %s\n", turnrest_credentials->password);
JANUS_LOG(LOG_HUGE, " -- TTL: %"SCNu32"\n", turnrest_credentials->ttl);
JANUS_LOG(LOG_HUGE, " -- Servers: %d\n", g_list_length(turnrest_credentials->servers));
GList *server = turnrest_credentials->servers;
while(server != NULL) {
janus_turnrest_instance *instance = (janus_turnrest_instance *)server->data;
JANUS_LOG(LOG_HUGE, " -- -- URI: %s:%"SCNu16" (%d)\n", instance->server, instance->port, instance->transport);
server = server->next;
}
}
#endif
g_object_set(G_OBJECT(handle->agent), "upnp", FALSE, NULL);
g_object_set(G_OBJECT(handle->agent), "controlling-mode", handle->controlling, NULL);
g_signal_connect (G_OBJECT (handle->agent), "candidate-gathering-done",
G_CALLBACK (janus_ice_cb_candidate_gathering_done), handle);
g_signal_connect (G_OBJECT (handle->agent), "component-state-changed",
G_CALLBACK (janus_ice_cb_component_state_changed), handle);
#ifndef HAVE_LIBNICE_TCP
g_signal_connect (G_OBJECT (handle->agent), "new-selected-pair",
#else
g_signal_connect (G_OBJECT (handle->agent), "new-selected-pair-full",
#endif
G_CALLBACK (janus_ice_cb_new_selected_pair), handle);
if(janus_full_trickle_enabled) {
#ifndef HAVE_LIBNICE_TCP
g_signal_connect (G_OBJECT (handle->agent), "new-candidate",
#else
g_signal_connect (G_OBJECT (handle->agent), "new-candidate-full",
#endif
G_CALLBACK (janus_ice_cb_new_local_candidate), handle);
}
#ifndef HAVE_LIBNICE_TCP
g_signal_connect (G_OBJECT (handle->agent), "new-remote-candidate",
#else
g_signal_connect (G_OBJECT (handle->agent), "new-remote-candidate-full",
#endif
G_CALLBACK (janus_ice_cb_new_remote_candidate), handle);
/* Add all local addresses, except those in the ignore list */
struct ifaddrs *ifaddr, *ifa;
int family, s, n;
char host[NI_MAXHOST];
if(getifaddrs(&ifaddr) == -1) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error getting list of interfaces... %d (%s)\n",
handle->handle_id, errno, strerror(errno));
} else {
for(ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) {
if(ifa->ifa_addr == NULL)
continue;
/* Skip interfaces which are not up and running */
if(!((ifa->ifa_flags & IFF_UP) && (ifa->ifa_flags & IFF_RUNNING)))
continue;
/* Skip loopback interfaces */
if(ifa->ifa_flags & IFF_LOOPBACK)
continue;
family = ifa->ifa_addr->sa_family;
if(family != AF_INET && family != AF_INET6)
continue;
/* We only add IPv6 addresses if support for them has been explicitly enabled (still WIP, mostly) */
if(family == AF_INET6 && !janus_ipv6_enabled)
continue;
/* Check the interface name first, we can ignore that as well: enforce list would be checked later */
if(janus_ice_enforce_list == NULL && ifa->ifa_name != NULL && janus_ice_is_ignored(ifa->ifa_name))
continue;
s = getnameinfo(ifa->ifa_addr,
(family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6),
host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if(s != 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] getnameinfo() failed: %s\n", handle->handle_id, gai_strerror(s));
continue;
}
/* Skip 0.0.0.0, :: and local scoped addresses */
if(!strcmp(host, "0.0.0.0") || !strcmp(host, "::") || !strncmp(host, "fe80:", 5))
continue;
/* Check if this IP address is in the ignore/enforce list, now: the enforce list has the precedence */
if(janus_ice_enforce_list != NULL) {
if(ifa->ifa_name != NULL && !janus_ice_is_enforced(ifa->ifa_name) && !janus_ice_is_enforced(host))
continue;
} else {
if(janus_ice_is_ignored(host))
continue;
}
/* Ok, add interface to the ICE agent */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Adding %s to the addresses to gather candidates for\n", handle->handle_id, host);
NiceAddress addr_local;
nice_address_init (&addr_local);
if(!nice_address_set_from_string (&addr_local, host)) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Skipping invalid address %s\n", handle->handle_id, host);
continue;
}
nice_agent_add_local_address (handle->agent, &addr_local);
}
freeifaddrs(ifaddr);
}
handle->cdone = 0;
handle->stream_id = 0;
/* If this is our first offer, let's generate some mids */
if(!offer) {
if(audio) {
if(handle->audio_mid == NULL)
handle->audio_mid = g_strdup("audio");
if(handle->stream_mid == NULL)
handle->stream_mid = handle->audio_mid;
}
if(video) {
if(handle->video_mid == NULL)
handle->video_mid = g_strdup("video");
if(handle->stream_mid == NULL)
handle->stream_mid = handle->video_mid;
}
#ifdef HAVE_SCTP
if(data) {
if(handle->data_mid == NULL)
handle->data_mid = g_strdup("data");
if(handle->stream_mid == NULL)
handle->stream_mid = handle->data_mid;
}
#endif
}
/* Now create an ICE stream for all the media we'll handle */
handle->stream_id = nice_agent_add_stream(handle->agent, 1);
if(dscp_ef > 0) {
/* A DSCP value was configured, shift it and pass it to libnice as a TOS */
nice_agent_set_stream_tos(handle->agent, handle->stream_id, dscp_ef << 2);
}
janus_ice_stream *stream = g_malloc0(sizeof(janus_ice_stream));
janus_refcount_init(&stream->ref, janus_ice_stream_free);
janus_refcount_increase(&handle->ref);
stream->stream_id = handle->stream_id;
stream->handle = handle;
stream->audio_payload_type = -1;
stream->video_payload_type = -1;
stream->video_rtx_payload_type = -1;
stream->nack_queue_ms = min_nack_queue;
/* FIXME By default, if we're being called we're DTLS clients, but this may be changed by ICE... */
stream->dtls_role = offer ? JANUS_DTLS_ROLE_CLIENT : JANUS_DTLS_ROLE_ACTPASS;
if(audio) {
stream->audio_ssrc = janus_random_uint32(); /* FIXME Should we look for conflicts? */
stream->audio_rtcp_ctx = g_malloc0(sizeof(janus_rtcp_context));
stream->audio_rtcp_ctx->tb = 48000; /* May change later */
}
if(video) {
stream->video_ssrc = janus_random_uint32(); /* FIXME Should we look for conflicts? */
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) {
/* Create an SSRC for RFC4588 as well */
stream->video_ssrc_rtx = janus_random_uint32(); /* FIXME Should we look for conflicts? */
}
stream->video_rtcp_ctx[0] = g_malloc0(sizeof(janus_rtcp_context));
stream->video_rtcp_ctx[0]->tb = 90000;
}
janus_mutex_init(&stream->mutex);
if(!have_turnrest_credentials) {
/* No TURN REST API server and credentials, any static ones? */
if(janus_turn_server != NULL) {
/* We need relay candidates as well */
gboolean ok = nice_agent_set_relay_info(handle->agent, handle->stream_id, 1,
janus_turn_server, janus_turn_port, janus_turn_user, janus_turn_pwd, janus_turn_type);
if(!ok) {
JANUS_LOG(LOG_WARN, "Could not set TURN server, is the address correct? (%s:%"SCNu16")\n",
janus_turn_server, janus_turn_port);
}
}
#ifdef HAVE_TURNRESTAPI
} else {
/* We need relay candidates as well: add all those we got */
GList *server = turnrest_credentials->servers;
while(server != NULL) {
janus_turnrest_instance *instance = (janus_turnrest_instance *)server->data;
gboolean ok = nice_agent_set_relay_info(handle->agent, handle->stream_id, 1,
instance->server, instance->port,
turnrest_credentials->username, turnrest_credentials->password,
instance->transport);
if(!ok) {
JANUS_LOG(LOG_WARN, "Could not set TURN server, is the address correct? (%s:%"SCNu16")\n",
instance->server, instance->port);
}
server = server->next;
}
#endif
}
handle->stream = stream;
janus_ice_component *component = g_malloc0(sizeof(janus_ice_component));
janus_refcount_init(&component->ref, janus_ice_component_free);
component->stream = stream;
janus_refcount_increase(&stream->ref);
component->stream_id = stream->stream_id;
component->component_id = 1;
janus_mutex_init(&component->mutex);
stream->component = component;
#ifdef HAVE_PORTRANGE
/* FIXME: libnice supports this since 0.1.0, but the 0.1.3 on Fedora fails with an undefined reference! */
nice_agent_set_port_range(handle->agent, handle->stream_id, 1, rtp_range_min, rtp_range_max);
#endif
/* Gather now only if we're doing hanf-trickle */
if(!janus_full_trickle_enabled && !nice_agent_gather_candidates(handle->agent, handle->stream_id)) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error gathering candidates...\n", handle->handle_id);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AGENT);
janus_ice_webrtc_hangup(handle, "Gathering error");
return -1;
}
nice_agent_attach_recv(handle->agent, handle->stream_id, 1, g_main_loop_get_context(handle->mainloop),
janus_ice_cb_nice_recv, component);
#ifdef HAVE_TURNRESTAPI
if(turnrest_credentials != NULL) {
janus_turnrest_response_destroy(turnrest_credentials);
turnrest_credentials = NULL;
}
#endif
/* Create DTLS-SRTP context, at last */
component->dtls = janus_dtls_srtp_create(component, stream->dtls_role);
if(!component->dtls) {
/* FIXME We should clear some resources... */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error creating DTLS-SRTP stack...\n", handle->handle_id);
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AGENT);
janus_ice_webrtc_hangup(handle, "DTLS-SRTP stack error");
return -1;
}
janus_refcount_increase(&component->dtls->ref);
/* If we're doing full-tricke, start gathering asynchronously */
if(janus_full_trickle_enabled) {
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_start_gathering);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_start_gathering);
#endif
g_main_context_wakeup(handle->mainctx);
}
return 0;
}
void janus_ice_restart(janus_ice_handle *handle) {
if(!handle || !handle->agent || !handle->stream)
return;
/* Restart ICE */
if(nice_agent_restart(handle->agent) == FALSE) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] ICE restart failed...\n", handle->handle_id);
}
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ICE_RESTART);
}
void janus_ice_resend_trickles(janus_ice_handle *handle) {
if(!handle || !handle->agent)
return;
janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RESEND_TRICKLES);
janus_ice_stream *stream = handle->stream;
if(!stream)
return;
janus_ice_component *component = stream->component;
if(!component)
return;
NiceAgent *agent = handle->agent;
/* Iterate on all existing local candidates */
gchar buffer[200];
GSList *candidates, *i;
candidates = nice_agent_get_local_candidates (agent, stream->stream_id, component->component_id);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] We have %d candidates for Stream #%d, Component #%d\n",
handle->handle_id, g_slist_length(candidates), stream->stream_id, component->component_id);
for(i = candidates; i; i = i->next) {
NiceCandidate *c = (NiceCandidate *) i->data;
if(c->type == NICE_CANDIDATE_TYPE_PEER_REFLEXIVE) {
nice_candidate_free(c);
continue;
}
guint public_ip_index = 0;
do {
if(janus_ice_candidate_to_string(handle, c, buffer, sizeof(buffer), FALSE, FALSE, public_ip_index) == 0) {
/* Candidate encoded, send a "trickle" event to the browser */
janus_ice_notify_trickle(handle, buffer);
/* If nat-1-1 is enabled but we want to keep the private host, add another candidate */
if(nat_1_1_enabled && keep_private_host && public_ip_index == 0 &&
janus_ice_candidate_to_string(handle, c, buffer, sizeof(buffer), FALSE, TRUE, public_ip_index) == 0) {
/* Candidate encoded, send a "trickle" event to the browser */
janus_ice_notify_trickle(handle, buffer);
}
}
public_ip_index++;
} while (public_ip_index < janus_get_public_ip_count());
nice_candidate_free(c);
}
/* Send a "completed" trickle at the end */
janus_ice_notify_trickle(handle, NULL);
}
static gint rtcp_transport_wide_cc_stats_comparator(gconstpointer item1, gconstpointer item2) {
return ((rtcp_transport_wide_cc_stats*)item1)->transport_seq_num - ((rtcp_transport_wide_cc_stats*)item2)->transport_seq_num;
}
static gboolean janus_ice_outgoing_transport_wide_cc_feedback(gpointer user_data) {
janus_ice_handle *handle = (janus_ice_handle *)user_data;
janus_ice_stream *stream = handle->stream;
if(stream && stream->video_recv && stream->do_transport_wide_cc) {
/* Create a transport wide feedback message */
size_t size = 1300;
char rtcpbuf[1300];
/* Order packet list */
stream->transport_wide_received_seq_nums = g_slist_sort(stream->transport_wide_received_seq_nums,
rtcp_transport_wide_cc_stats_comparator);
/* Create full stats queue */
GQueue *packets = g_queue_new();
/* For all packets */
GSList *it = NULL;
for(it = stream->transport_wide_received_seq_nums; it; it = it->next) {
/* Get stat */
janus_rtcp_transport_wide_cc_stats *stats = (janus_rtcp_transport_wide_cc_stats *)it->data;
/* Get transport seq */
guint32 transport_seq_num = stats->transport_seq_num;
/* Check if it is an out of order */
if(transport_seq_num < stream->transport_wide_cc_last_feedback_seq_num) {
/* Skip, it was already reported as lost */
g_free(stats);
continue;
}
/* If not first */
if(stream->transport_wide_cc_last_feedback_seq_num) {
/* For each lost */
guint32 i = 0;
for(i = stream->transport_wide_cc_last_feedback_seq_num+1; i<transport_seq_num; ++i) {
/* Create new stat */
janus_rtcp_transport_wide_cc_stats *missing = g_malloc(sizeof(janus_rtcp_transport_wide_cc_stats));
/* Add missing packet */
missing->transport_seq_num = i;
missing->timestamp = 0;
/* Add it */
g_queue_push_tail(packets, missing);
}
}
/* Store last */
stream->transport_wide_cc_last_feedback_seq_num = transport_seq_num;
/* Add this one */
g_queue_push_tail(packets, stats);
}
/* Free and reset stats list */
g_slist_free(stream->transport_wide_received_seq_nums);
stream->transport_wide_received_seq_nums = NULL;
/* Create and enqueue RTCP packets */
guint packets_len = 0;
while((packets_len = g_queue_get_length(packets)) > 0) {
GQueue *packets_to_process;
/* If we have more than 400 packets to acknowledge, let's send more than one message */
if(packets_len > 400) {
/* Split the queue into two */
GList *new_head = g_queue_peek_nth_link(packets, 400);
GList *new_tail = new_head->prev;
new_head->prev = NULL;
new_tail->next = NULL;
packets_to_process = g_queue_new();
packets_to_process->head = packets->head;
packets_to_process->tail = new_tail;
packets_to_process->length = 400;
packets->head = new_head;
/* packets->tail is unchanged */
packets->length = packets_len - 400;
} else {
packets_to_process = packets;
}
/* Get feedback packet count and increase it for next one */
guint8 feedback_packet_count = stream->transport_wide_cc_feedback_count++;
/* Create RTCP packet */
int len = janus_rtcp_transport_wide_cc_feedback(rtcpbuf, size,
stream->video_ssrc, stream->video_ssrc_peer[0], feedback_packet_count, packets_to_process);
/* Enqueue it, we'll send it later */
if(len > 0) {
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = rtcpbuf, .length = len };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
}
if(packets_to_process != packets) {
g_queue_free(packets_to_process);
}
}
/* Free mem */
g_queue_free(packets);
}
return G_SOURCE_CONTINUE;
}
static gboolean janus_ice_outgoing_rtcp_handle(gpointer user_data) {
janus_ice_handle *handle = (janus_ice_handle *)user_data;
janus_ice_stream *stream = handle->stream;
/* Audio */
if(stream && stream->audio_send && stream->component && stream->component->out_stats.audio.packets > 0) {
/* Create a SR/SDES compound */
int srlen = 28;
int sdeslen = 16;
char rtcpbuf[srlen+sdeslen];
memset(rtcpbuf, 0, sizeof(rtcpbuf));
rtcp_sr *sr = (rtcp_sr *)&rtcpbuf;
sr->header.version = 2;
sr->header.type = RTCP_SR;
sr->header.rc = 0;
sr->header.length = htons((srlen/4)-1);
sr->ssrc = htonl(stream->audio_ssrc);
struct timeval tv;
gettimeofday(&tv, NULL);
uint32_t s = tv.tv_sec + 2208988800u;
uint32_t u = tv.tv_usec;
uint32_t f = (u << 12) + (u << 8) - ((u * 3650) >> 6);
sr->si.ntp_ts_msw = htonl(s);
sr->si.ntp_ts_lsw = htonl(f);
/* Compute an RTP timestamp coherent with the NTP one */
rtcp_context *rtcp_ctx = stream->audio_rtcp_ctx;
if(rtcp_ctx == NULL) {
sr->si.rtp_ts = htonl(stream->audio_last_rtp_ts); /* FIXME */
} else {
int64_t ntp = tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec;
uint32_t rtp_ts = ((ntp-stream->audio_last_ntp_ts)*(rtcp_ctx->tb))/1000000 + stream->audio_last_rtp_ts;
sr->si.rtp_ts = htonl(rtp_ts);
}
sr->si.s_packets = htonl(stream->component->out_stats.audio.packets);
sr->si.s_octets = htonl(stream->component->out_stats.audio.bytes);
rtcp_sdes *sdes = (rtcp_sdes *)&rtcpbuf[28];
janus_rtcp_sdes_cname((char *)sdes, sdeslen, "janus", 5);
sdes->chunk.ssrc = htonl(stream->audio_ssrc);
/* Enqueue it, we'll send it later */
janus_plugin_rtcp rtcp = { .video = FALSE, .buffer = rtcpbuf, .length = srlen+sdeslen };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
/* Check if we detected too many losses, and send a slowlink event in case */
guint lost = janus_rtcp_context_get_lost_all(rtcp_ctx, TRUE);
janus_slow_link_update(stream->component, handle, FALSE, TRUE, lost);
}
if(stream && stream->audio_recv) {
/* Create a RR too */
int rrlen = 32;
char rtcpbuf[32];
memset(rtcpbuf, 0, sizeof(rtcpbuf));
rtcp_rr *rr = (rtcp_rr *)&rtcpbuf;
rr->header.version = 2;
rr->header.type = RTCP_RR;
rr->header.rc = 1;
rr->header.length = htons((rrlen/4)-1);
rr->ssrc = htonl(stream->audio_ssrc);
janus_rtcp_report_block(stream->audio_rtcp_ctx, &rr->rb[0]);
rr->rb[0].ssrc = htonl(stream->audio_ssrc_peer);
/* Enqueue it, we'll send it later */
janus_plugin_rtcp rtcp = { .video = FALSE, .buffer = rtcpbuf, .length = 32 };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
/* Check if we detected too many losses, and send a slowlink event in case */
guint lost = janus_rtcp_context_get_lost_all(stream->audio_rtcp_ctx, FALSE);
janus_slow_link_update(stream->component, handle, FALSE, FALSE, lost);
}
/* Now do the same for video */
if(stream && stream->video_send && stream->component && stream->component->out_stats.video[0].packets > 0) {
/* Create a SR/SDES compound */
int srlen = 28;
int sdeslen = 16;
char rtcpbuf[srlen+sdeslen];
memset(rtcpbuf, 0, sizeof(rtcpbuf));
rtcp_sr *sr = (rtcp_sr *)&rtcpbuf;
sr->header.version = 2;
sr->header.type = RTCP_SR;
sr->header.rc = 0;
sr->header.length = htons((srlen/4)-1);
sr->ssrc = htonl(stream->video_ssrc);
struct timeval tv;
gettimeofday(&tv, NULL);
uint32_t s = tv.tv_sec + 2208988800u;
uint32_t u = tv.tv_usec;
uint32_t f = (u << 12) + (u << 8) - ((u * 3650) >> 6);
sr->si.ntp_ts_msw = htonl(s);
sr->si.ntp_ts_lsw = htonl(f);
/* Compute an RTP timestamp coherent with the NTP one */
rtcp_context *rtcp_ctx = stream->video_rtcp_ctx[0];
if(rtcp_ctx == NULL) {
sr->si.rtp_ts = htonl(stream->video_last_rtp_ts); /* FIXME */
} else {
int64_t ntp = tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec;
uint32_t rtp_ts = ((ntp-stream->video_last_ntp_ts)*(rtcp_ctx->tb))/1000000 + stream->video_last_rtp_ts;
sr->si.rtp_ts = htonl(rtp_ts);
}
sr->si.s_packets = htonl(stream->component->out_stats.video[0].packets);
sr->si.s_octets = htonl(stream->component->out_stats.video[0].bytes);
rtcp_sdes *sdes = (rtcp_sdes *)&rtcpbuf[28];
janus_rtcp_sdes_cname((char *)sdes, sdeslen, "janus", 5);
sdes->chunk.ssrc = htonl(stream->video_ssrc);
/* Enqueue it, we'll send it later */
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = rtcpbuf, .length = srlen+sdeslen };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
/* Check if we detected too many losses, and send a slowlink event in case */
guint lost = janus_rtcp_context_get_lost_all(rtcp_ctx, TRUE);
janus_slow_link_update(stream->component, handle, TRUE, TRUE, lost);
}
if(stream && stream->video_recv) {
/* Create a RR too (for each SSRC, if we're simulcasting) */
int vindex=0;
for(vindex=0; vindex<3; vindex++) {
if(stream->video_rtcp_ctx[vindex] && stream->video_rtcp_ctx[vindex]->rtp_recvd) {
/* Create a RR */
int rrlen = 32;
char rtcpbuf[32];
memset(rtcpbuf, 0, sizeof(rtcpbuf));
rtcp_rr *rr = (rtcp_rr *)&rtcpbuf;
rr->header.version = 2;
rr->header.type = RTCP_RR;
rr->header.rc = 1;
rr->header.length = htons((rrlen/4)-1);
rr->ssrc = htonl(stream->video_ssrc);
janus_rtcp_report_block(stream->video_rtcp_ctx[vindex], &rr->rb[0]);
rr->rb[0].ssrc = htonl(stream->video_ssrc_peer[vindex]);
/* Enqueue it, we'll send it later */
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = rtcpbuf, .length = 32 };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
}
}
/* Check if we detected too many losses, and send a slowlink event in case */
guint lost = janus_rtcp_context_get_lost_all(stream->video_rtcp_ctx[0], FALSE);
janus_slow_link_update(stream->component, handle, TRUE, FALSE, lost);
}
if(twcc_period == 1000) {
/* The Transport Wide CC feedback period is 1s as well, send it here */
janus_ice_outgoing_transport_wide_cc_feedback(handle);
}
return G_SOURCE_CONTINUE;
}
static gboolean janus_ice_outgoing_stats_handle(gpointer user_data) {
janus_ice_handle *handle = (janus_ice_handle *)user_data;
/* This callback is for stats and other things we need to do on a regular basis (typically called once per second) */
janus_session *session = (janus_session *)handle->session;
gint64 now = janus_get_monotonic_time();
/* Reset the last second counters if too much time passed with no data in or out */
janus_ice_stream *stream = handle->stream;
if(stream == NULL || stream->component == NULL)
return G_SOURCE_CONTINUE;
janus_ice_component *component = stream->component;
/* Audio */
gint64 last = component->in_stats.audio.updated;
if(last && now > last && now-last >= 2*G_USEC_PER_SEC && component->in_stats.audio.bytes_lastsec_temp > 0) {
component->in_stats.audio.bytes_lastsec = 0;
component->in_stats.audio.bytes_lastsec_temp = 0;
}
last = component->out_stats.audio.updated;
if(last && now > last && now-last >= 2*G_USEC_PER_SEC && component->out_stats.audio.bytes_lastsec_temp > 0) {
component->out_stats.audio.bytes_lastsec = 0;
component->out_stats.audio.bytes_lastsec_temp = 0;
}
/* Video */
int vindex = 0;
for(vindex=0; vindex < 3; vindex++) {
gint64 last = component->in_stats.video[vindex].updated;
if(last && now > last && now-last >= 2*G_USEC_PER_SEC && component->in_stats.video[vindex].bytes_lastsec_temp > 0) {
component->in_stats.video[vindex].bytes_lastsec = 0;
component->in_stats.video[vindex].bytes_lastsec_temp = 0;
}
last = component->out_stats.video[vindex].updated;
if(last && now > last && now-last >= 2*G_USEC_PER_SEC && component->out_stats.video[vindex].bytes_lastsec_temp > 0) {
component->out_stats.video[vindex].bytes_lastsec = 0;
component->out_stats.video[vindex].bytes_lastsec_temp = 0;
}
}
/* Now let's see if we need to notify the user about no incoming audio or video */
if(no_media_timer > 0 && component->dtls && component->dtls->dtls_connected > 0 && (now - component->dtls->dtls_connected >= G_USEC_PER_SEC)) {
/* Audio */
gint64 last = component->in_stats.audio.updated;
if(!component->in_stats.audio.notified_lastsec && last &&
!component->in_stats.audio.bytes_lastsec && !component->in_stats.audio.bytes_lastsec_temp &&
now-last >= (gint64)no_media_timer*G_USEC_PER_SEC) {
/* We missed more than no_second_timer seconds of audio! */
component->in_stats.audio.notified_lastsec = TRUE;
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Didn't receive audio for more than %d seconds...\n", handle->handle_id, no_media_timer);
janus_ice_notify_media(handle, FALSE, FALSE);
}
/* Video */
last = component->in_stats.video[0].updated;
if(!component->in_stats.video[0].notified_lastsec && last &&
!component->in_stats.video[0].bytes_lastsec && !component->in_stats.video[0].bytes_lastsec_temp &&
now-last >= (gint64)no_media_timer*G_USEC_PER_SEC) {
/* We missed more than no_second_timer seconds of video! */
component->in_stats.video[0].notified_lastsec = TRUE;
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Didn't receive video for more than a second...\n", handle->handle_id);
janus_ice_notify_media(handle, TRUE, FALSE);
}
}
/* We also send live stats to event handlers every tot-seconds (configurable) */
handle->last_event_stats++;
if(janus_ice_event_stats_period > 0 && handle->last_event_stats >= janus_ice_event_stats_period) {
handle->last_event_stats = 0;
/* Audio */
if(janus_events_is_enabled() && janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO)) {
if(stream && stream->audio_rtcp_ctx) {
json_t *info = json_object();
json_object_set_new(info, "media", json_string("audio"));
json_object_set_new(info, "base", json_integer(stream->audio_rtcp_ctx->tb));
json_object_set_new(info, "rtt", json_integer(janus_rtcp_context_get_rtt(stream->audio_rtcp_ctx)));
json_object_set_new(info, "lost", json_integer(janus_rtcp_context_get_lost_all(stream->audio_rtcp_ctx, FALSE)));
json_object_set_new(info, "lost-by-remote", json_integer(janus_rtcp_context_get_lost_all(stream->audio_rtcp_ctx, TRUE)));
json_object_set_new(info, "jitter-local", json_integer(janus_rtcp_context_get_jitter(stream->audio_rtcp_ctx, FALSE)));
json_object_set_new(info, "jitter-remote", json_integer(janus_rtcp_context_get_jitter(stream->audio_rtcp_ctx, TRUE)));
json_object_set_new(info, "in-link-quality", json_integer(janus_rtcp_context_get_in_link_quality(stream->audio_rtcp_ctx)));
json_object_set_new(info, "in-media-link-quality", json_integer(janus_rtcp_context_get_in_media_link_quality(stream->audio_rtcp_ctx)));
json_object_set_new(info, "out-link-quality", json_integer(janus_rtcp_context_get_out_link_quality(stream->audio_rtcp_ctx)));
json_object_set_new(info, "out-media-link-quality", json_integer(janus_rtcp_context_get_out_media_link_quality(stream->audio_rtcp_ctx)));
if(stream->component) {
json_object_set_new(info, "packets-received", json_integer(stream->component->in_stats.audio.packets));
json_object_set_new(info, "packets-sent", json_integer(stream->component->out_stats.audio.packets));
json_object_set_new(info, "bytes-received", json_integer(stream->component->in_stats.audio.bytes));
json_object_set_new(info, "bytes-sent", json_integer(stream->component->out_stats.audio.bytes));
json_object_set_new(info, "bytes-received-lastsec", json_integer(stream->component->in_stats.audio.bytes_lastsec));
json_object_set_new(info, "bytes-sent-lastsec", json_integer(stream->component->out_stats.audio.bytes_lastsec));
json_object_set_new(info, "nacks-received", json_integer(stream->component->in_stats.audio.nacks));
json_object_set_new(info, "nacks-sent", json_integer(stream->component->out_stats.audio.nacks));
json_object_set_new(info, "retransmissions-received", json_integer(stream->audio_rtcp_ctx->retransmitted));
}
janus_events_notify_handlers(JANUS_EVENT_TYPE_MEDIA, JANUS_EVENT_SUBTYPE_MEDIA_STATS,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
/* Do the same for video */
if(janus_events_is_enabled() && janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO)) {
int vindex=0;
for(vindex=0; vindex<3; vindex++) {
if(stream && stream->video_rtcp_ctx[vindex]) {
json_t *info = json_object();
if(vindex == 0)
json_object_set_new(info, "media", json_string("video"));
else if(vindex == 1)
json_object_set_new(info, "media", json_string("video-sim1"));
else
json_object_set_new(info, "media", json_string("video-sim2"));
json_object_set_new(info, "base", json_integer(stream->video_rtcp_ctx[vindex]->tb));
if(vindex == 0)
json_object_set_new(info, "rtt", json_integer(janus_rtcp_context_get_rtt(stream->video_rtcp_ctx[vindex])));
json_object_set_new(info, "lost", json_integer(janus_rtcp_context_get_lost_all(stream->video_rtcp_ctx[vindex], FALSE)));
json_object_set_new(info, "lost-by-remote", json_integer(janus_rtcp_context_get_lost_all(stream->video_rtcp_ctx[vindex], TRUE)));
json_object_set_new(info, "jitter-local", json_integer(janus_rtcp_context_get_jitter(stream->video_rtcp_ctx[vindex], FALSE)));
json_object_set_new(info, "jitter-remote", json_integer(janus_rtcp_context_get_jitter(stream->video_rtcp_ctx[vindex], TRUE)));
json_object_set_new(info, "in-link-quality", json_integer(janus_rtcp_context_get_in_link_quality(stream->video_rtcp_ctx[vindex])));
json_object_set_new(info, "in-media-link-quality", json_integer(janus_rtcp_context_get_in_media_link_quality(stream->video_rtcp_ctx[vindex])));
json_object_set_new(info, "out-link-quality", json_integer(janus_rtcp_context_get_out_link_quality(stream->video_rtcp_ctx[vindex])));
json_object_set_new(info, "out-media-link-quality", json_integer(janus_rtcp_context_get_out_media_link_quality(stream->video_rtcp_ctx[vindex])));
if(stream->component) {
json_object_set_new(info, "packets-received", json_integer(stream->component->in_stats.video[vindex].packets));
json_object_set_new(info, "packets-sent", json_integer(stream->component->out_stats.video[vindex].packets));
json_object_set_new(info, "bytes-received", json_integer(stream->component->in_stats.video[vindex].bytes));
json_object_set_new(info, "bytes-sent", json_integer(stream->component->out_stats.video[vindex].bytes));
json_object_set_new(info, "bytes-received-lastsec", json_integer(stream->component->in_stats.video[vindex].bytes_lastsec));
json_object_set_new(info, "bytes-sent-lastsec", json_integer(stream->component->out_stats.video[vindex].bytes_lastsec));
json_object_set_new(info, "nacks-received", json_integer(stream->component->in_stats.video[vindex].nacks));
json_object_set_new(info, "nacks-sent", json_integer(stream->component->out_stats.video[vindex].nacks));
json_object_set_new(info, "retransmissions-received", json_integer(stream->video_rtcp_ctx[vindex]->retransmitted));
}
janus_events_notify_handlers(JANUS_EVENT_TYPE_MEDIA, JANUS_EVENT_SUBTYPE_MEDIA_STATS,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
}
}
/* Should we clean up old NACK buffers for any of the streams? */
janus_cleanup_nack_buffer(now, handle->stream, TRUE, TRUE);
/* Check if we should also print a summary of SRTP-related errors */
handle->last_srtp_summary++;
if(handle->last_srtp_summary == 0 || handle->last_srtp_summary == 2) {
if(handle->srtp_errors_count > 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Got %d SRTP/SRTCP errors in the last few seconds (last error: %s)\n",
handle->handle_id, handle->srtp_errors_count, janus_srtp_error_str(handle->last_srtp_error));
handle->srtp_errors_count = 0;
handle->last_srtp_error = 0;
}
handle->last_srtp_summary = 0;
}
return G_SOURCE_CONTINUE;
}
static gboolean janus_ice_outgoing_traffic_handle(janus_ice_handle *handle, janus_ice_queued_packet *pkt) {
janus_session *session = (janus_session *)handle->session;
janus_ice_stream *stream = handle->stream;
janus_ice_component *component = stream ? stream->component : NULL;
if(pkt == &janus_ice_start_gathering) {
/* Start gathering candidates */
if(handle->agent == NULL) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] No ICE agent, not going to gather candidates...\n", handle->handle_id);
} else if(!nice_agent_gather_candidates(handle->agent, handle->stream_id)) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error gathering candidates...\n", handle->handle_id);
janus_ice_webrtc_hangup(handle, "ICE gathering error");
}
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_add_candidates) {
/* There are remote candidates pending, add them now */
GSList *candidates = NULL;
NiceCandidate *c = NULL;
while((c = g_async_queue_try_pop(handle->queued_candidates)) != NULL) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Processing candidate %p\n", handle->handle_id, c);
candidates = g_slist_append(candidates, c);
}
guint count = g_slist_length(candidates);
if(stream != NULL && component != NULL && count > 0) {
int added = nice_agent_set_remote_candidates(handle->agent, stream->stream_id, component->component_id, candidates);
if(added < 0 || (guint)added != count) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Failed to add some remote candidates (added %u, expected %u)\n",
handle->handle_id, added, count);
} else {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] %d remote %s added\n", handle->handle_id,
count, (count > 1 ? "candidates" : "candidate"));
}
}
g_slist_free(candidates);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_dtls_handshake) {
if(!janus_is_webrtc_encryption_enabled()) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] WebRTC encryption disabled, skipping DTLS handshake\n", handle->handle_id);
janus_ice_dtls_handshake_done(handle, component);
return G_SOURCE_CONTINUE;
} else if(!component) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] ICE component not initialized, aborting DTLS handshake\n", handle->handle_id);
return G_SOURCE_CONTINUE;
}
/* Start the DTLS handshake */
janus_dtls_srtp_handshake(component->dtls);
/* Create retransmission timer */
component->dtlsrt_source = g_timeout_source_new(50);
g_source_set_callback(component->dtlsrt_source, janus_dtls_retry, component->dtls, NULL);
guint id = g_source_attach(component->dtlsrt_source, handle->mainctx);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Creating retransmission timer with ID %u\n", handle->handle_id, id);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_hangup_peerconnection) {
/* The media session is over, send an alert on all streams and components */
if(handle->stream && handle->stream->component && janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) {
janus_dtls_srtp_send_alert(handle->stream->component->dtls);
}
/* Notify the plugin about the fact this PeerConnection has just gone */
janus_plugin *plugin = (janus_plugin *)handle->app;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Telling the plugin about the hangup (%s)\n",
handle->handle_id, plugin ? plugin->get_name() : "??");
if(plugin != NULL && handle->app_handle != NULL) {
plugin->hangup_media(handle->app_handle);
}
/* Get rid of the attached sources */
if(handle->rtcp_source) {
g_source_destroy(handle->rtcp_source);
g_source_unref(handle->rtcp_source);
handle->rtcp_source = NULL;
}
if(handle->twcc_source) {
g_source_destroy(handle->twcc_source);
g_source_unref(handle->twcc_source);
handle->twcc_source = NULL;
}
if(handle->stats_source) {
g_source_destroy(handle->stats_source);
g_source_unref(handle->stats_source);
handle->stats_source = NULL;
}
/* If event handlers are active, send stats one last time */
if(janus_events_is_enabled()) {
handle->last_event_stats = janus_ice_event_stats_period;
(void)janus_ice_outgoing_stats_handle(handle);
}
janus_ice_webrtc_free(handle);
return G_SOURCE_CONTINUE;
} else if(pkt == &janus_ice_detach_handle) {
/* This handle has just been detached, notify the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Telling the plugin about the handle detach (%s)\n",
handle->handle_id, plugin ? plugin->get_name() : "??");
if(plugin != NULL && handle->app_handle != NULL) {
int error = 0;
plugin->destroy_session(handle->app_handle, &error);
}
handle->app_handle = NULL;
/* TODO Get rid of the loop by removing the source */
if(handle->rtp_source) {
g_source_destroy(handle->rtp_source);
g_source_unref(handle->rtp_source);
handle->rtp_source = NULL;
}
/* Prepare JSON event to notify user/application */
json_t *event = json_object();
json_object_set_new(event, "janus", json_string("detached"));
json_object_set_new(event, "session_id", json_integer(session->session_id));
json_object_set_new(event, "sender", json_integer(handle->handle_id));
if(opaqueid_in_api && handle->opaque_id != NULL)
json_object_set_new(event, "opaque_id", json_string(handle->opaque_id));
/* Send the event */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sending event to transport...; %p\n", handle->handle_id, handle);
janus_session_notify_event(session, event);
/* Notify event handlers as well */
if(janus_events_is_enabled())
janus_events_notify_handlers(JANUS_EVENT_TYPE_HANDLE, JANUS_EVENT_SUBTYPE_NONE,
session->session_id, handle->handle_id, "detached",
plugin ? plugin->get_package() : NULL, handle->opaque_id, handle->token);
return G_SOURCE_REMOVE;
} else if(pkt == &janus_ice_data_ready) {
/* Data is writable on this PeerConnection, notify the plugin */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin != NULL && plugin->data_ready != NULL && handle->app_handle != NULL) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Telling the plugin about the data channel being ready (%s)\n",
handle->handle_id, plugin ? plugin->get_name() : "??");
plugin->data_ready(handle->app_handle);
}
}
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) {
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
/* Now let's get on with the packet */
if(pkt == NULL)
return G_SOURCE_CONTINUE;
if(pkt->data == NULL || stream == NULL) {
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
gint64 age = (janus_get_monotonic_time() - pkt->added);
if(age > G_USEC_PER_SEC) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Discarding too old outgoing packet (age=%"SCNi64"us)\n", handle->handle_id, age);
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
if(!stream->cdone) {
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT) && !stream->noerrorlog) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No candidates not gathered yet for stream??\n", handle->handle_id);
stream->noerrorlog = TRUE; /* Don't flood with the same error all over again */
}
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
if(pkt->control) {
/* RTCP */
int video = (pkt->type == JANUS_ICE_PACKET_VIDEO);
stream->noerrorlog = FALSE;
if(janus_is_webrtc_encryption_enabled() && (!component->dtls || !component->dtls->srtp_valid || !component->dtls->srtp_out)) {
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT) && !component->noerrorlog) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] %s stream (#%u) component has no valid SRTP session (yet?)\n",
handle->handle_id, video ? "video" : "audio", stream->stream_id);
component->noerrorlog = TRUE; /* Don't flood with the same error all over again */
}
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
component->noerrorlog = FALSE;
if(pkt->encrypted) {
/* Already SRTCP */
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, pkt->length, (const gchar *)pkt->data);
if(sent < pkt->length) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, pkt->length);
}
} else {
/* Check if there's anything we need to do before sending */
uint32_t bitrate = janus_rtcp_get_remb(pkt->data, pkt->length);
if(bitrate > 0) {
/* There's a REMB, prepend a RR as it won't work otherwise */
int rrlen = 8;
char *rtcpbuf = g_malloc0(rrlen+pkt->length+SRTP_MAX_TAG_LEN+4);
rtcp_rr *rr = (rtcp_rr *)rtcpbuf;
rr->header.version = 2;
rr->header.type = RTCP_RR;
rr->header.rc = 0;
rr->header.length = htons((rrlen/4)-1);
janus_ice_stream *stream = handle->stream;
/* Append REMB */
memcpy(rtcpbuf+rrlen, pkt->data, pkt->length);
/* If we're simulcasting, set the extra SSRCs (the first one will be set by janus_rtcp_fix_ssrc) */
if(stream->video_ssrc_peer[1] && pkt->length >= 28) {
rtcp_fb *rtcpfb = (rtcp_fb *)(rtcpbuf+rrlen);
rtcp_remb *remb = (rtcp_remb *)rtcpfb->fci;
remb->ssrc[1] = htonl(stream->video_ssrc_peer[1]);
if(stream->video_ssrc_peer[2] && pkt->length >= 32) {
remb->ssrc[2] = htonl(stream->video_ssrc_peer[2]);
}
}
/* Free old packet and update */
char *prev_data = pkt->data;
pkt->data = rtcpbuf;
pkt->length = rrlen+pkt->length;
g_clear_pointer(&prev_data, g_free);
}
/* Do we need to dump this packet for debugging? */
if(g_atomic_int_get(&handle->dump_packets))
janus_text2pcap_dump(handle->text2pcap, JANUS_TEXT2PCAP_RTCP, FALSE, pkt->data, pkt->length,
"[session=%"SCNu64"][handle=%"SCNu64"]", session->session_id, handle->handle_id);
/* Encrypt SRTCP */
int protected = pkt->length;
int res = janus_is_webrtc_encryption_enabled() ?
srtp_protect_rtcp(component->dtls->srtp_out, pkt->data, &protected) : srtp_err_status_ok;
if(res != srtp_err_status_ok) {
/* We don't spam the logs for every SRTP error: just take note of this, and print a summary later */
handle->srtp_errors_count++;
handle->last_srtp_error = res;
/* If we're debugging, though, print every occurrence */
JANUS_LOG(LOG_DBG, "[%"SCNu64"] ... SRTCP protect error... %s (len=%d-->%d)...\n", handle->handle_id, janus_srtp_error_str(res), pkt->length, protected);
} else {
/* Shoot! */
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, protected, pkt->data);
if(sent < protected) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, protected);
}
}
}
janus_ice_free_queued_packet(pkt);
} else {
/* RTP or data */
if(pkt->type == JANUS_ICE_PACKET_AUDIO || pkt->type == JANUS_ICE_PACKET_VIDEO) {
/* RTP */
int video = (pkt->type == JANUS_ICE_PACKET_VIDEO);
if((!video && !stream->audio_send) || (video && !stream->video_send)) {
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
if(janus_is_webrtc_encryption_enabled() && (!component->dtls || !component->dtls->srtp_valid || !component->dtls->srtp_out)) {
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT) && !component->noerrorlog) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] %s stream component has no valid SRTP session (yet?)\n",
handle->handle_id, video ? "video" : "audio");
component->noerrorlog = TRUE; /* Don't flood with the same error all over again */
}
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
component->noerrorlog = FALSE;
if(pkt->encrypted) {
/* Already RTP (probably a retransmission?) */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] ... Retransmitting seq.nr %"SCNu16"\n\n", handle->handle_id, ntohs(header->seq_number));
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, pkt->length, (const gchar *)pkt->data);
if(sent < pkt->length) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, pkt->length);
}
} else {
/* Overwrite SSRC */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
if(!pkt->retransmission) {
/* ... but only if this isn't a retransmission (for those we already set it before) */
header->ssrc = htonl(video ? stream->video_ssrc : stream->audio_ssrc);
}
/* Set the transport-wide sequence number, if needed */
if(video && stream->transport_wide_cc_ext_id > 0) {
stream->transport_wide_cc_out_seq_num++;
if(janus_rtp_header_extension_set_transport_wide_cc(pkt->data, pkt->length,
stream->transport_wide_cc_ext_id, stream->transport_wide_cc_out_seq_num) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error setting transport wide CC sequence number...\n", handle->handle_id);
}
}
/* Keep track of payload types too */
if(!video && stream->audio_payload_type < 0) {
stream->audio_payload_type = header->type;
if(stream->audio_codec == NULL) {
const char *codec = janus_get_codec_from_pt(handle->local_sdp, stream->audio_payload_type);
if(codec != NULL)
stream->audio_codec = g_strdup(codec);
}
} else if(video && stream->video_payload_type < 0) {
stream->video_payload_type = header->type;
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX) &&
stream->rtx_payload_types && g_hash_table_size(stream->rtx_payload_types) > 0) {
stream->video_rtx_payload_type = GPOINTER_TO_INT(g_hash_table_lookup(stream->rtx_payload_types, GINT_TO_POINTER(stream->video_payload_type)));
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Retransmissions will have payload type %d\n",
handle->handle_id, stream->video_rtx_payload_type);
}
if(stream->video_codec == NULL) {
const char *codec = janus_get_codec_from_pt(handle->local_sdp, stream->video_payload_type);
if(codec != NULL)
stream->video_codec = g_strdup(codec);
}
if(stream->video_is_keyframe == NULL && stream->video_codec != NULL) {
if(!strcasecmp(stream->video_codec, "vp8"))
stream->video_is_keyframe = &janus_vp8_is_keyframe;
else if(!strcasecmp(stream->video_codec, "vp9"))
stream->video_is_keyframe = &janus_vp9_is_keyframe;
else if(!strcasecmp(stream->video_codec, "h264"))
stream->video_is_keyframe = &janus_h264_is_keyframe;
else if(!strcasecmp(stream->video_codec, "av1"))
stream->video_is_keyframe = &janus_av1_is_keyframe;
else if(!strcasecmp(stream->video_codec, "h265"))
stream->video_is_keyframe = &janus_h265_is_keyframe;
}
}
/* Do we need to dump this packet for debugging? */
if(g_atomic_int_get(&handle->dump_packets))
janus_text2pcap_dump(handle->text2pcap, JANUS_TEXT2PCAP_RTP, FALSE, pkt->data, pkt->length,
"[session=%"SCNu64"][handle=%"SCNu64"]", session->session_id, handle->handle_id);
/* If this is video, check if this is a keyframe: if so, we empty our retransmit buffer for incoming NACKs */
if(video && stream->video_is_keyframe) {
int plen = 0;
char *payload = janus_rtp_payload(pkt->data, pkt->length, &plen);
if(stream->video_is_keyframe(payload, plen)) {
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Keyframe sent, cleaning retransmit buffer\n", handle->handle_id);
janus_cleanup_nack_buffer(0, stream, FALSE, TRUE);
}
}
/* Before encrypting, check if we need to copy the unencrypted payload (e.g., for rtx/90000) */
janus_rtp_packet *p = NULL;
if(stream->nack_queue_ms > 0 && !pkt->retransmission && pkt->type == JANUS_ICE_PACKET_VIDEO && component->do_video_nacks &&
janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RFC4588_RTX)) {
/* Save the packet for retransmissions that may be needed later: start by
* making room for two more bytes to store the original sequence number */
p = g_malloc(sizeof(janus_rtp_packet));
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
guint16 original_seq = header->seq_number;
p->data = g_malloc(pkt->length+2);
p->length = pkt->length+2;
/* Check where the payload starts */
int plen = 0;
char *payload = janus_rtp_payload(pkt->data, pkt->length, &plen);
if(plen == 0) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Discarding outgoing empty RTP packet\n", handle->handle_id);
janus_ice_free_rtp_packet(p);
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
size_t hsize = payload - pkt->data;
/* Copy the header first */
memcpy(p->data, pkt->data, hsize);
/* Copy the original sequence number */
memcpy(p->data+hsize, &original_seq, 2);
/* Copy the payload */
memcpy(p->data+hsize+2, payload, pkt->length - hsize);
}
/* Encrypt SRTP */
int protected = pkt->length;
int res = janus_is_webrtc_encryption_enabled() ?
srtp_protect(component->dtls->srtp_out, pkt->data, &protected) : srtp_err_status_ok;
if(res != srtp_err_status_ok) {
/* We don't spam the logs for every SRTP error: just take note of this, and print a summary later */
handle->srtp_errors_count++;
handle->last_srtp_error = res;
/* If we're debugging, though, print every occurrence */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
guint32 timestamp = ntohl(header->timestamp);
guint16 seq = ntohs(header->seq_number);
JANUS_LOG(LOG_DBG, "[%"SCNu64"] ... SRTP protect error... %s (len=%d-->%d, ts=%"SCNu32", seq=%"SCNu16")...\n",
handle->handle_id, janus_srtp_error_str(res), pkt->length, protected, timestamp, seq);
janus_ice_free_rtp_packet(p);
} else {
/* Shoot! */
int sent = nice_agent_send(handle->agent, stream->stream_id, component->component_id, protected, pkt->data);
if(sent < protected) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] ... only sent %d bytes? (was %d)\n", handle->handle_id, sent, protected);
}
/* Update stats */
if(sent > 0) {
/* Update the RTCP context as well */
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
guint32 timestamp = ntohl(header->timestamp);
if(pkt->type == JANUS_ICE_PACKET_AUDIO) {
component->out_stats.audio.packets++;
component->out_stats.audio.bytes += pkt->length;
/* Last second outgoing audio */
gint64 now = janus_get_monotonic_time();
if(component->out_stats.audio.updated == 0)
component->out_stats.audio.updated = now;
if(now > component->out_stats.audio.updated &&
now - component->out_stats.audio.updated >= G_USEC_PER_SEC) {
component->out_stats.audio.bytes_lastsec = component->out_stats.audio.bytes_lastsec_temp;
component->out_stats.audio.bytes_lastsec_temp = 0;
component->out_stats.audio.updated = now;
}
component->out_stats.audio.bytes_lastsec_temp += pkt->length;
struct timeval tv;
gettimeofday(&tv, NULL);
if(stream->audio_last_ntp_ts == 0 || (gint32)(timestamp - stream->audio_last_rtp_ts) > 0) {
stream->audio_last_ntp_ts = (gint64)tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec;
stream->audio_last_rtp_ts = timestamp;
}
if(stream->audio_first_ntp_ts == 0) {
stream->audio_first_ntp_ts = (gint64)tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec;
stream->audio_first_rtp_ts = timestamp;
}
/* Let's check if this is not Opus: in case we may need to change the timestamp base */
rtcp_context *rtcp_ctx = stream->audio_rtcp_ctx;
int pt = header->type;
uint32_t clock_rate = stream->clock_rates ?
GPOINTER_TO_UINT(g_hash_table_lookup(stream->clock_rates, GINT_TO_POINTER(pt))) : 48000;
if(rtcp_ctx->tb != clock_rate)
rtcp_ctx->tb = clock_rate;
} else if(pkt->type == JANUS_ICE_PACKET_VIDEO) {
component->out_stats.video[0].packets++;
component->out_stats.video[0].bytes += pkt->length;
/* Last second outgoing video */
gint64 now = janus_get_monotonic_time();
if(component->out_stats.video[0].updated == 0)
component->out_stats.video[0].updated = now;
if(now > component->out_stats.video[0].updated &&
now - component->out_stats.video[0].updated >= G_USEC_PER_SEC) {
component->out_stats.video[0].bytes_lastsec = component->out_stats.video[0].bytes_lastsec_temp;
component->out_stats.video[0].bytes_lastsec_temp = 0;
component->out_stats.video[0].updated = now;
}
component->out_stats.video[0].bytes_lastsec_temp += pkt->length;
struct timeval tv;
gettimeofday(&tv, NULL);
if(stream->video_last_ntp_ts == 0 || (gint32)(timestamp - stream->video_last_rtp_ts) > 0) {
stream->video_last_ntp_ts = (gint64)tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec;
stream->video_last_rtp_ts = timestamp;
}
if(stream->video_first_ntp_ts[0] == 0) {
stream->video_first_ntp_ts[0] = (gint64)tv.tv_sec*G_USEC_PER_SEC + tv.tv_usec;
stream->video_first_rtp_ts[0] = timestamp;
}
}
/* Update sent packets counter */
rtcp_context *rtcp_ctx = video ? stream->video_rtcp_ctx[0] : stream->audio_rtcp_ctx;
if(rtcp_ctx)
g_atomic_int_inc(&rtcp_ctx->sent_packets_since_last_rr);
}
if(stream->nack_queue_ms > 0 && !pkt->retransmission) {
/* Save the packet for retransmissions that may be needed later */
if((pkt->type == JANUS_ICE_PACKET_AUDIO && !component->do_audio_nacks) ||
(pkt->type == JANUS_ICE_PACKET_VIDEO && !component->do_video_nacks)) {
/* ... unless NACKs are disabled for this medium */
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
if(p == NULL) {
/* If we're not doing RFC4588, we're saving the SRTP packet as it is */
p = g_malloc(sizeof(janus_rtp_packet));
p->data = g_malloc(protected);
memcpy(p->data, pkt->data, protected);
p->length = protected;
}
p->created = janus_get_monotonic_time();
p->last_retransmit = 0;
janus_rtp_header *header = (janus_rtp_header *)pkt->data;
guint16 seq = ntohs(header->seq_number);
if(!video) {
if(component->audio_retransmit_buffer == NULL) {
component->audio_retransmit_buffer = g_queue_new();
component->audio_retransmit_seqs = g_hash_table_new(NULL, NULL);
}
g_queue_push_tail(component->audio_retransmit_buffer, p);
/* Insert in the table too, for quick lookup */
g_hash_table_insert(component->audio_retransmit_seqs, GUINT_TO_POINTER(seq), p);
} else {
if(component->video_retransmit_buffer == NULL) {
component->video_retransmit_buffer = g_queue_new();
component->video_retransmit_seqs = g_hash_table_new(NULL, NULL);
}
g_queue_push_tail(component->video_retransmit_buffer, p);
/* Insert in the table too, for quick lookup */
g_hash_table_insert(component->video_retransmit_seqs, GUINT_TO_POINTER(seq), p);
}
} else {
janus_ice_free_rtp_packet(p);
}
}
}
} else if(pkt->type == JANUS_ICE_PACKET_TEXT || pkt->type == JANUS_ICE_PACKET_BINARY) {
/* Data */
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_DATA_CHANNELS)) {
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
#ifdef HAVE_SCTP
if(!component->dtls) {
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT) && !component->noerrorlog) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] SCTP stream component has no valid DTLS session (yet?)\n", handle->handle_id);
component->noerrorlog = TRUE; /* Don't flood with the same error all over again */
}
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
component->noerrorlog = FALSE;
/* TODO Support binary data */
janus_dtls_wrap_sctp_data(component->dtls, pkt->label, pkt->protocol,
pkt->type == JANUS_ICE_PACKET_TEXT, pkt->data, pkt->length);
#endif
} else if(pkt->type == JANUS_ICE_PACKET_SCTP) {
/* SCTP data to push */
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_DATA_CHANNELS)) {
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
#ifdef HAVE_SCTP
/* Encapsulate this data in DTLS and send it */
if(!component->dtls) {
if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALERT) && !component->noerrorlog) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] SCTP stream component has no valid DTLS session (yet?)\n", handle->handle_id);
component->noerrorlog = TRUE; /* Don't flood with the same error all over again */
}
janus_ice_free_queued_packet(pkt);
return G_SOURCE_CONTINUE;
}
component->noerrorlog = FALSE;
janus_dtls_send_sctp_data(component->dtls, pkt->data, pkt->length);
#endif
} else {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Unsupported packet type %d\n", handle->handle_id, pkt->type);
}
janus_ice_free_queued_packet(pkt);
}
return G_SOURCE_CONTINUE;
}
static void janus_ice_queue_packet(janus_ice_handle *handle, janus_ice_queued_packet *pkt) {
/* TODO: There is a potential race condition where the "queued_packets"
* could get released between the condition and pushing the packet. */
if(handle->queued_packets != NULL) {
g_async_queue_push(handle->queued_packets, pkt);
g_main_context_wakeup(handle->mainctx);
} else {
janus_ice_free_queued_packet(pkt);
}
}
void janus_ice_relay_rtp(janus_ice_handle *handle, janus_plugin_rtp *packet) {
if(!handle || !handle->stream || handle->queued_packets == NULL || packet == NULL || packet->buffer == NULL ||
!janus_is_rtp(packet->buffer, packet->length))
return;
if((!packet->video && !janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_AUDIO))
|| (packet->video && !janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO)))
return;
uint16_t totlen = RTP_HEADER_SIZE;
/* Check how large the payload is */
int plen = 0;
char *payload = janus_rtp_payload(packet->buffer, packet->length, &plen);
if(payload != NULL)
totlen += plen;
/* We need to strip extensions, here, and add those that need to be there manually */
uint16_t extlen = 0;
char extensions[50];
janus_rtp_header *header = (janus_rtp_header *)packet->buffer;
int origext = header->extension;
header->extension = 0;
/* Add core and plugin extensions, if any */
if((packet->video && handle->stream->transport_wide_cc_ext_id > 0) || handle->stream->mid_ext_id > 0 ||
(!packet->video && packet->extensions.audio_level != -1 && handle->stream->audiolevel_ext_id > 0) ||
(packet->video && packet->extensions.video_rotation != -1 && handle->stream->videoorientation_ext_id > 0)) {
header->extension = 1;
memset(extensions, 0, sizeof(extensions));
janus_rtp_header_extension *extheader = (janus_rtp_header_extension *)extensions;
extheader->type = htons(0xBEDE);
extheader->length = 0;
/* Iterate on all extensions we need */
char *index = extensions + 4;
/* Check if we need to add the transport-wide CC extension */
if(packet->video && handle->stream->transport_wide_cc_ext_id > 0) {
*index = (handle->stream->transport_wide_cc_ext_id << 4) + 1;
/* We'll actually set the sequence number later, when sending the packet */
memset(index+1, 0, 2);
index += 3;
extlen += 3;
}
/* Check if we need to add the mid extension */
if(handle->stream->mid_ext_id > 0) {
char *mid = packet->video ? handle->video_mid : handle->audio_mid;
if(mid != NULL) {
size_t midlen = strlen(mid) & 0x0F;
*index = (handle->stream->mid_ext_id << 4) + (midlen ? midlen-1 : 0);
memcpy(index+1, mid, midlen);
index += (midlen + 1);
extlen += (midlen + 1);
}
}
/* Check if the plugin (or source) included other extensions */
if(!packet->video && packet->extensions.audio_level != -1 && handle->stream->audiolevel_ext_id > 0) {
/* Add audio-level extension */
*index = (handle->stream->audiolevel_ext_id << 4);
*(index+1) = (packet->extensions.audio_level_vad << 7) + (packet->extensions.audio_level & 0x7F);
index += 2;
extlen += 2;
}
if(packet->video && packet->extensions.video_rotation != -1 && handle->stream->videoorientation_ext_id > 0) {
/* Add video-orientation extension */
*index = (handle->stream->videoorientation_ext_id << 4);
gboolean c = packet->extensions.video_back_camera,
f = packet->extensions.video_flipped, r1 = FALSE, r0 = FALSE;
switch(packet->extensions.video_rotation) {
case 270:
r1 = TRUE;
r0 = TRUE;
break;
case 180:
r1 = TRUE;
r0 = FALSE;
break;
case 90:
r1 = FALSE;
r0 = TRUE;
break;
case 0:
default:
r1 = FALSE;
r0 = FALSE;
break;
}
*(index+1) = (c<<3) + (f<<2) + (r1<<1) + r0;
index += 2;
extlen += 2;
}
/* Calculate the whole length */
uint16_t words = extlen/4;
if(extlen%4 != 0)
words++;
extheader->length = htons(words);
/* Update lengths (taking into account the RFC5285 header) */
extlen = 4 + (words*4);
totlen += extlen;
}
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
pkt->data = g_malloc(totlen + SRTP_MAX_TAG_LEN);
/* RTP header first */
memcpy(pkt->data, packet->buffer, RTP_HEADER_SIZE);
/* Then RTP extensions, if any */
if(extlen > 0)
memcpy(pkt->data + RTP_HEADER_SIZE, extensions, extlen);
/* Finally the RTP payload, if available */
if(payload != NULL && plen > 0)
memcpy(pkt->data + RTP_HEADER_SIZE + extlen, payload, plen);
pkt->length = totlen;
pkt->type = packet->video ? JANUS_ICE_PACKET_VIDEO : JANUS_ICE_PACKET_AUDIO;
pkt->control = FALSE;
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
/* Restore the extension flag to what the plugin set it to */
header->extension = origext;
}
void janus_ice_relay_rtcp_internal(janus_ice_handle *handle, janus_plugin_rtcp *packet, gboolean filter_rtcp) {
if(!handle || handle->queued_packets == NULL || packet == NULL || packet->buffer == NULL ||
!janus_is_rtcp(packet->buffer, packet->length))
return;
/* We use this internal method to check whether we need to filter RTCP (e.g., to make
* sure we don't just forward any SR/RR from peers/plugins, but use our own) or it has
* already been done, and so this is actually a packet added by the ICE send thread */
char *rtcp_buf = packet->buffer;
int rtcp_len = packet->length;
if(filter_rtcp) {
/* FIXME Strip RR/SR/SDES/NACKs/etc. */
janus_ice_stream *stream = handle->stream;
if(stream == NULL)
return;
rtcp_buf = janus_rtcp_filter(packet->buffer, packet->length, &rtcp_len);
if(rtcp_buf == NULL || rtcp_len < 1)
return;
/* Fix all SSRCs before enqueueing, as we need to use the ones for this media
* leg. Note that this is only needed for RTCP packets coming from plugins: the
* ones created by the core already have the right SSRCs in the right place */
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Fixing SSRCs (local %u, peer %u)\n", handle->handle_id,
packet->video ? stream->video_ssrc : stream->audio_ssrc,
packet->video ? stream->video_ssrc_peer[0] : stream->audio_ssrc_peer);
janus_rtcp_fix_ssrc(NULL, rtcp_buf, rtcp_len, 1,
packet->video ? stream->video_ssrc : stream->audio_ssrc,
packet->video ? stream->video_ssrc_peer[0] : stream->audio_ssrc_peer);
}
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
pkt->data = g_malloc(rtcp_len+SRTP_MAX_TAG_LEN+4);
memcpy(pkt->data, rtcp_buf, rtcp_len);
pkt->length = rtcp_len;
pkt->type = packet->video ? JANUS_ICE_PACKET_VIDEO : JANUS_ICE_PACKET_AUDIO;
pkt->control = TRUE;
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
if(rtcp_buf != packet->buffer) {
/* We filtered the original packet, deallocate it */
g_free(rtcp_buf);
}
}
void janus_ice_relay_rtcp(janus_ice_handle *handle, janus_plugin_rtcp *packet) {
janus_ice_relay_rtcp_internal(handle, packet, TRUE);
/* If this is a PLI and we're simulcasting, send a PLI on other layers as well */
if(janus_rtcp_has_pli(packet->buffer, packet->length)) {
janus_ice_stream *stream = handle->stream;
if(stream == NULL)
return;
if(stream->video_ssrc_peer[1]) {
char plibuf[12];
memset(plibuf, 0, 12);
janus_rtcp_pli((char *)&plibuf, 12);
janus_rtcp_fix_ssrc(NULL, plibuf, sizeof(plibuf), 1,
stream->video_ssrc, stream->video_ssrc_peer[1]);
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = plibuf, .length = sizeof(plibuf) };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
}
if(stream->video_ssrc_peer[2]) {
char plibuf[12];
memset(plibuf, 0, 12);
janus_rtcp_pli((char *)&plibuf, 12);
janus_rtcp_fix_ssrc(NULL, plibuf, sizeof(plibuf), 1,
stream->video_ssrc, stream->video_ssrc_peer[2]);
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = plibuf, .length = sizeof(plibuf) };
janus_ice_relay_rtcp_internal(handle, &rtcp, FALSE);
}
}
}
void janus_ice_send_pli(janus_ice_handle *handle) {
char rtcpbuf[12];
memset(rtcpbuf, 0, 12);
janus_rtcp_pli((char *)&rtcpbuf, 12);
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = rtcpbuf, .length = 12 };
janus_ice_relay_rtcp(handle, &rtcp);
}
void janus_ice_send_remb(janus_ice_handle *handle, uint32_t bitrate) {
char rtcpbuf[24];
janus_rtcp_remb((char *)&rtcpbuf, 24, bitrate);
janus_plugin_rtcp rtcp = { .video = TRUE, .buffer = rtcpbuf, .length = 24 };
janus_ice_relay_rtcp(handle, &rtcp);
}
#ifdef HAVE_SCTP
void janus_ice_relay_data(janus_ice_handle *handle, janus_plugin_data *packet) {
if(!handle || handle->queued_packets == NULL || packet == NULL || packet->buffer == NULL || packet->length < 1)
return;
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
pkt->data = g_malloc(packet->length);
memcpy(pkt->data, packet->buffer, packet->length);
pkt->length = packet->length;
pkt->type = packet->binary ? JANUS_ICE_PACKET_BINARY : JANUS_ICE_PACKET_TEXT;
pkt->control = FALSE;
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = packet->label ? g_strdup(packet->label) : NULL;
pkt->protocol = packet->protocol ? g_strdup(packet->protocol) : NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
}
#endif
void janus_ice_relay_sctp(janus_ice_handle *handle, char *buffer, int length) {
#ifdef HAVE_SCTP
if(!handle || handle->queued_packets == NULL || buffer == NULL || length < 1)
return;
/* Queue this packet */
janus_ice_queued_packet *pkt = g_malloc(sizeof(janus_ice_queued_packet));
pkt->data = g_malloc(length);
memcpy(pkt->data, buffer, length);
pkt->length = length;
pkt->type = JANUS_ICE_PACKET_SCTP;
pkt->control = FALSE;
pkt->encrypted = FALSE;
pkt->retransmission = FALSE;
pkt->label = NULL;
pkt->protocol = NULL;
pkt->added = janus_get_monotonic_time();
janus_ice_queue_packet(handle, pkt);
#endif
}
void janus_ice_notify_data_ready(janus_ice_handle *handle) {
#ifdef HAVE_SCTP
if(!handle || handle->queued_packets == NULL)
return;
/* Queue this event */
#if GLIB_CHECK_VERSION(2, 46, 0)
g_async_queue_push_front(handle->queued_packets, &janus_ice_data_ready);
#else
g_async_queue_push(handle->queued_packets, &janus_ice_data_ready);
#endif
g_main_context_wakeup(handle->mainctx);
#endif
}
void janus_ice_dtls_handshake_done(janus_ice_handle *handle, janus_ice_component *component) {
if(!handle || !component)
return;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] The DTLS handshake for the component %d in stream %d has been completed\n",
handle->handle_id, component->component_id, component->stream_id);
/* Check if all components are ready */
janus_mutex_lock(&handle->mutex);
if(handle->stream && janus_is_webrtc_encryption_enabled()) {
if(handle->stream->component && (!handle->stream->component->dtls ||
!handle->stream->component->dtls->srtp_valid)) {
/* Still waiting for this component to become ready */
janus_mutex_unlock(&handle->mutex);
return;
}
}
if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY)) {
/* Already notified */
janus_mutex_unlock(&handle->mutex);
return;
}
janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_READY);
/* Create a source for RTCP and one for stats */
handle->rtcp_source = g_timeout_source_new_seconds(1);
g_source_set_priority(handle->rtcp_source, G_PRIORITY_DEFAULT);
g_source_set_callback(handle->rtcp_source, janus_ice_outgoing_rtcp_handle, handle, NULL);
g_source_attach(handle->rtcp_source, handle->mainctx);
if(twcc_period != 1000) {
/* The Transport Wide CC feedback period is different, create another source */
handle->twcc_source = g_timeout_source_new(twcc_period);
g_source_set_priority(handle->twcc_source, G_PRIORITY_DEFAULT);
g_source_set_callback(handle->twcc_source, janus_ice_outgoing_transport_wide_cc_feedback, handle, NULL);
g_source_attach(handle->twcc_source, handle->mainctx);
}
handle->last_event_stats = 0;
handle->last_srtp_summary = -1;
handle->stats_source = g_timeout_source_new_seconds(1);
g_source_set_callback(handle->stats_source, janus_ice_outgoing_stats_handle, handle, NULL);
g_source_set_priority(handle->stats_source, G_PRIORITY_DEFAULT);
g_source_attach(handle->stats_source, handle->mainctx);
janus_mutex_unlock(&handle->mutex);
JANUS_LOG(LOG_INFO, "[%"SCNu64"] The DTLS handshake has been completed\n", handle->handle_id);
/* Notify the plugin that the WebRTC PeerConnection is ready to be used */
janus_plugin *plugin = (janus_plugin *)handle->app;
if(plugin != NULL) {
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Telling the plugin about it (%s)\n", handle->handle_id, plugin->get_name());
if(plugin && plugin->setup_media && janus_plugin_session_is_alive(handle->app_handle))
plugin->setup_media(handle->app_handle);
}
/* Also prepare JSON event to notify user/application */
janus_session *session = (janus_session *)handle->session;
if(session == NULL)
return;
json_t *event = json_object();
json_object_set_new(event, "janus", json_string("webrtcup"));
json_object_set_new(event, "session_id", json_integer(session->session_id));
json_object_set_new(event, "sender", json_integer(handle->handle_id));
if(opaqueid_in_api && handle->opaque_id != NULL)
json_object_set_new(event, "opaque_id", json_string(handle->opaque_id));
/* Send the event */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Sending event to transport...; %p\n", handle->handle_id, handle);
janus_session_notify_event(session, event);
/* Notify event handlers as well */
if(janus_events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "connection", json_string("webrtcup"));
janus_events_notify_handlers(JANUS_EVENT_TYPE_WEBRTC, JANUS_EVENT_SUBTYPE_WEBRTC_STATE,
session->session_id, handle->handle_id, handle->opaque_id, info);
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化