ns-allinone-2.30/ns-2.30/rtp/session-rtp.cc

00001 
00035 #include <stdlib.h>
00036 #include "packet.h"
00037 #include "ip.h"
00038 #include "rtp.h"
00039 #include <math.h>
00040 
00041 #define NOW Scheduler::instance().clock()
00042 
00046 static class RTPSourceClass : public TclClass {
00047 public:
00048         RTPSourceClass() : TclClass("RTPSource") {}
00049         TclObject* create(int argc, const char*const* argv) {
00050                 if (argc >= 5)
00051                         return (new RTPSource(atoi(argv[4])));
00052 
00053                 return 0;
00054         }
00055 } class_rtp_source;
00056 
00062 static class RTPSessionClass : public TclClass {
00063 public:
00064         RTPSessionClass() : TclClass("Session/RTP") {}
00065         TclObject* create(int, const char*const*) {
00066                 return (new RTPSession());
00067         }
00068 } class_rtp_session;
00069 
00073 RTPSession::RTPSession()
00074         : allsrcs_(0), localsrc_(0), receivers_(0),last_np_(0), T_one_way_(0),rh_(0),
00075         we_sent(0),last_pkts_lost_(0),last_ehsr_(-1),tx_rate_(0), time_elapsed_(0)
00076 {
00077         bind("enableFlowControl_", &enableFlowControl_);
00078         bind("rx_recv_",&rx_recv_);
00079         bind("jitter_",&jitter_);
00080         bind("RTT_",&RTT_);
00081         bind("smooth_loss_",&smooth_loss_);
00082         alpha = 1;  //we set alpha=1 in initialization 
00083         last_time_report_ = NOW;
00084 
00085         weight[0] = 1;
00086         weight[1] = 1;
00087         weight[2] = 1;
00088         weight[3] = 1;
00089         weight[4] = 0.8;
00090         weight[5] = 0.6;
00091         weight[6] = 0.4;
00092         weight[7] = 0.2;
00093 }
00094 
00098 RTPSession::~RTPSession()
00099 {
00100         while (allsrcs_ != 0) {
00101                 RTPSource* p = allsrcs_;
00102                 allsrcs_ = allsrcs_->next;
00103                 delete p;
00104         }
00105         delete localsrc_;
00106         while (receivers_ != 0) {
00107                 RTPReceiver* p = receivers_;
00108                 receivers_ = receivers_->next;
00109                 delete p;
00110         }
00111         lst.~list<double>();
00112         
00113 }
00114 
00121 void RTPSession::localsrc_update(int)
00122 {
00123         localsrc_->np(1);
00124         localsrc_->is_sender(true);
00125 }
00126 
00133 void RTPSession::localsrc_update_nbytes(int n)
00134 {
00135         localsrc_->nbytes(n);
00136         
00137 }
00138 
00139 #define RTCP_HDRSIZE 8
00140 #define RTCP_SR_SIZE 20
00141 #define RTCP_RR_SIZE 48
00142 
00149 int RTPSession::build_report(int bye)
00150 {
00151         rh_ = new hdr_rtp;
00152         rh_->rr_ = 0;
00153         rh_->sr_ = 0;
00154         int nsrc = 0;
00155         int nrr = 0;
00156         int len = RTCP_HDRSIZE;
00157         we_sent = 0;
00158         double fraction = 0.0;
00159         time_elapsed_ = NOW - last_time_report_;
00160         last_time_report_ = NOW;
00161         if (localsrc_->np() != last_np_) {
00162                 last_np_ = localsrc_->np();
00163                 we_sent = 1;
00164                 len += RTCP_SR_SIZE;
00165                 //add sender report
00166                 sender_report* sr;
00167                 //fill the report
00168                 sr = new sender_report;
00169                 sr->sender_srcid()= localsrc_->srcid();
00170                 sr->pkts_sent() = localsrc_->np();
00171                 sr->octets_sent() = localsrc_->nbytes();
00172                 sr->rcvrs_ = receivers_;
00173                 //store the report
00174                 rh_->sr_ = sr;
00175         }
00176         for (RTPSource* sp = allsrcs_; sp != 0; sp = sp->next) {
00177                 ++nsrc;
00178                 int received = sp->np() - sp->snp();
00179 
00180                 if (received == 0) {
00181 
00182                         continue;
00183                 }
00184                 if(localsrc_->srcid() != sp->srcid()) {
00185                 int bytes = received * (sp->ps());
00186                 rx_recv_ = ( 8 * (double)bytes)/time_elapsed_;
00187                 sp->snp(sp->np());
00188                 len += RTCP_RR_SIZE;
00189                 // calculate loss fraction since previous report
00190                 int expected_interval  = sp->ehsr() - last_ehsr_;
00191                 last_ehsr_ = sp->ehsr();
00192                 int lost_interval = expected_interval -  received;
00193 
00194 
00195                 if (lost_interval <= 0 || expected_interval == 0 ) {
00196                          fraction = 0;
00197 
00198                 } else fraction = ((double)lost_interval  / (double)expected_interval);
00199                 calculate_RTT();
00200                 // Update the R_tcp according to fraction value
00201                 if( sp->np() >= 1) {// I have already received RTP packets from the source
00202                         if(fraction == 0) {
00203                                 increase_rate(sp->ps());
00204 
00205                         }       else /*if(fraction != 0)*/ {
00206                                         measure_smooth_loss(fraction);
00207                                         calculateR_tcp(sp->ps());
00208                                 }
00209                 }
00210                 //add receiver report
00211                 receiver_report* rr;
00212                 rr = new receiver_report;
00213                 //fill the report
00214                 rr->cum_pkts_lost() = sp->cum_pkts_lost();
00215                 rr->LSR() = sp->LSR();
00216                 rr->DLSR()= NOW - sp->SRT();
00217                 rr->R_tcp() = tx_rate_;
00218                 rr->jitter() = sp->jitter();
00219                 if(bye) {
00220                 printf ("TIME for BYE %f \n", NOW);
00221                 rr->bye()= 1;
00222                 } else rr->bye() = 0;
00223                 rh_->rr_ = rr;
00224 
00225                 if (++nrr >= 31)
00226                         break;
00227                 } // end if
00228 
00229         }// end for loop
00230 
00231 
00232         if (bye) {
00233                 len += build_bye();
00234                 }
00235         else
00236                 len += build_sdes();
00237 
00238         Tcl::instance().evalf("%s adapt-timer %d %d %d", name(),
00239                               nsrc, nrr, we_sent);
00240         Tcl::instance().evalf("%s sample-size %d", name(), len);
00241 
00242         return (len);
00243 }
00244 
00248 int RTPSession::build_bye()
00249 {
00250         
00251         return (8);
00252 }
00256 int RTPSession::build_sdes()
00257 {
00258 
00259         /* XXX We'll get to this later... */
00260         return (20);
00261 }
00271 void RTPSession::recv(Packet* p, Handler*)
00272 {
00273 //      printf("time %f localsrc_ %d \n", Scheduler::instance().clock(),localsrc_->srcid());
00274 //      printf("time %f localsrc_ %d \n", NOW,localsrc_->srcid());
00275         hdr_cmn* mh = hdr_cmn::access(p);
00276         hdr_rtp* rh = hdr_rtp::access(p);
00277     
00278         RTPSource* s = lookup(rh->srcid());
00279 
00280         if (s == 0) {
00281                 if(rh->srcid()!=localsrc_->srcid()){
00282                         Tcl& tcl = Tcl::instance();
00283                         tcl.evalf("%s new-source %d", name(),rh->srcid());
00284                         s = (RTPSource*)TclObject::lookup(tcl.result());
00285                 }
00286         }
00287         
00288         if(rh->srcid()!=localsrc_->srcid())
00289         {
00290                 // calculate one-way trip time
00291 
00292                 T_one_way_ = NOW - rh->timestamp();
00293                 // count packet losses
00294                 int pkts_lost = 0;
00295                 int difference =  rh->seqno() - s->ehsr();  
00296                 if (difference <= 1) { // to deal with the first RTP packet in which s->ehrs() = -1
00297                         pkts_lost = 0;
00298                         
00299                 }
00300                 
00301                 if (difference > 1) {
00302                         pkts_lost = difference -1;
00303                         
00304                 }
00305                 // calculate delay jitter, see RFC 3550 Appendix A8
00306                 double transit = NOW - rh->timestamp();
00307                 double d = transit - s->transit();
00308                 s->transit(transit);
00309                 if (d < 0) d = -d;
00310                 s->jitter( s->jitter() + (1./16.) * (d - s->jitter()));
00311                 jitter_= s->jitter();
00312                 s->np(1); // counts the packets that I have received from sender s
00313                 s->cum_pkts_lost(pkts_lost);// counts the packets that I have lost from sender s
00314                 s->ehsr(rh->seqno());// the first ehsr = -1
00315                 s->nbytes(mh->size()); // counts the number of bytes that I have lost from sender s
00316                 s->ps(mh->size());// the packet size in bytes
00317                 
00318         }
00319 
00320         Packet::free(p);
00321 }
00322 
00328 void RTPSession::calculate_alpha(double value)
00329 {
00330         if( value == 0) {
00331                 alpha = 1;
00332         } else
00333         alpha = (value/T_one_way_) -1;
00334 }
00335 
00336 
00340 void RTPSession::calculate_RTT()
00341 {
00342         double temp=(1+alpha) * T_one_way_;
00343         RTT_ = temp * 0.1 + 0.9 * RTT_;   //taken from RFC 3448
00344 }
00345 
00352 void RTPSession::increase_rate(int ps)
00353 {
00354         tx_rate_ = tx_rate_ +  (double)ps/RTT_;
00355 
00356 }
00357 
00364  void RTPSession::measure_smooth_loss (double fraction)
00365 {
00366         
00367         double I_tot0 = 0;
00368         double I_tot1 = 0;
00369         double W_tot = 0;
00370         for (int i=0; i<7; i++) {
00371         pkt_loss_history[i+1]= pkt_loss_history[i];
00372         }
00373         pkt_loss_history[0] = fraction;
00374 
00375         for (int i=0; i<7; i++) {
00376         I_tot0 = I_tot0+ (pkt_loss_history[i] * weight[i]);
00377         W_tot = W_tot +  weight[i];
00378         }
00379 
00380         for (int i=1; i<8; i++) {
00381         I_tot1 = I_tot1 + (pkt_loss_history[i] * weight[i-1]);
00382         }
00383         double I_tot =0;
00384         if( I_tot0 > I_tot1) {
00385         I_tot = I_tot0;
00386         } else {
00387         I_tot = I_tot1;
00388         }
00389         double  I_mean = I_tot/W_tot;
00390 
00391         smooth_loss_= I_mean;
00392         
00393         
00394 
00395 }           
00396 
00401 void RTPSession::calculateR_tcp(int ps)
00402 {
00403 
00404         double min =0;
00405         double fraction = 3 * sqrt((3 * smooth_loss_) / 8);
00406         if(fraction <  1 ){
00407                 min = fraction;
00408         }       else min =1;
00409         // TCP friendly BW share equation
00410          tx_rate_ = (double)ps / (RTT_ *  (sqrt(2 * smooth_loss_ /3)) + 4 * (RTT_ * min * smooth_loss_) * (1 + (32 * pow(smooth_loss_,2))));
00411          printf("Time:%f Receiver:%d smooth_loss_:%f \n", NOW,localsrc_->srcid(),smooth_loss_);
00412 
00413         
00414 }
00415 
00425 void RTPSession::recv_ctrl(Packet* p)
00426 {
00427         hdr_cmn* mh = hdr_cmn::access(p);
00428         hdr_rtp* rh = hdr_rtp::access(p);
00429         u_int32_t local_src = localsrc_->srcid();
00430         
00431         if(rh->srcid() != local_src) {
00432         
00433                 if (rh->sr_ != 0) { // get the Sender report
00434                         double temp = 0.0;
00435                         RTPSource* source = lookup(rh->sr_->sender_srcid());
00436                         if (source != 0 && source->srcid()!= local_src) {
00437                                 source->LSR(rh->timestamp());
00438                                 source->SRT(Scheduler::instance().clock());
00439                                 for (RTPReceiver* p = rh->sr_->rcvrs_;   p != 0; p = p->next) {
00440  
00441                                         if(p->srcid() == localsrc_->srcid()) {
00442                                                 temp = p->eff_rtt();
00443                                                 calculate_alpha(temp);
00444                                         }
00445                                 }
00446                         }
00447                 }
00448 
00449                 if(localsrc_->is_sender()){ // get the Receiver report
00450                                  
00451                                 if(rh->rr_!= 0){
00452                                         
00453                                         RTPReceiver* s = lookup_rcv(rh->srcid());
00454                                                 if (s == 0 ) {
00455                                                         s = new RTPReceiver(rh->srcid());
00456                                                         enter_rcv(s);
00457                                                         return;
00458                                                         }
00459                                                  
00460                                                 if (s != 0) {
00461                                                         for (s = receivers_; s!= 0; s = s->next) {
00462                 
00463                                                                 if(rh->srcid() == s->srcid()) {
00464                                                                 double eff_rtt = 0.0;
00465                                                                 double alpha = Scheduler::instance().clock();
00466                                                                         if(rh->rr_->LSR()!=0) {
00467                                                                         eff_rtt = alpha - rh->rr_->LSR() - rh->rr_->DLSR();
00468                                                                         }
00469                                                                         s->eff_rtt(eff_rtt);
00470                                                                         s->rate(rh->rr_->R_tcp());
00471                                                                         if(rh->rr_->bye() == 1) remove_receiver(s);
00472                                                                 }
00473 
00474                                                         }// end for loop
00475 
00476                                                 } //end if
00477                                                 if(enableFlowControl_ == 1) {
00478                                                 update_rate();
00479                                                 }
00480                                         }
00481                                 }
00482                         }
00483 
00484         Tcl::instance().evalf("%s sample-size %d", name(), mh->size());
00485         Packet::free(p);
00486 }
00487 
00488 
00493 void RTPSession::update_rate()
00494 {
00495         RTPReceiver* s;
00496         for (s = receivers_; s!= 0; s = s->next) {
00497                 if(s->rate() !=0) {
00498                         lst.push_front(s->rate());
00499                 }
00500         }
00501         lst.sort();
00502         double inst_tx =lst.front();
00503         lst.clear();
00504         if(inst_tx != tx_rate_) {
00505                 if(inst_tx < (tx_rate_/2)) {
00506                         tx_rate_ = tx_rate_/2;
00507                 } else tx_rate_ = inst_tx;
00508         printf("Time:%f Sender:%d ...Rate change to:%f \n", NOW,localsrc_->srcid(),8*tx_rate_);
00509         Tcl::instance().evalf("%s session_bw %fb/s",name(),8*tx_rate_);
00510         Tcl::instance().evalf("%s transmit %fb/s",name(),8*tx_rate_);
00511         }
00512         
00513 }
00514 
00519 hdr_rtp* RTPSession::access_hdr_rtp()
00520 {
00521         return rh_;
00522 }
00523 
00530 RTPSource* RTPSession::lookup(u_int32_t srcid)
00531 {
00532         RTPSource *p;
00533         for (p = allsrcs_; p != 0; p = p->next)
00534         {       
00535                 if (p->srcid() == srcid)
00536                         return (p);
00537         }
00538         return (0);
00539 }
00540 
00547 RTPReceiver* RTPSession::lookup_rcv(u_int32_t srcid)
00548 {
00549 
00550         RTPReceiver *p;
00551         for (p = receivers_; p != 0; p = p->next)
00552         {
00553                 if (p->srcid() == srcid)
00554                         return (p);
00555         }
00556         return (0);
00557 }
00558 
00562 void RTPSession::print_rcv( )
00563 {
00564 
00565         RTPReceiver *p;
00566         for (p = receivers_; p != 0; p = p->next)
00567         {
00568             printf("Receiver:%d\n", p->srcid());
00569         }
00570 }
00571 
00576 void RTPSession::enter(RTPSource* s)
00577 {
00578         if(s->srcid() != localsrc_->srcid() ) {
00579         printf("Time:%f Receiver:%d ...I add sender %d \n", NOW,localsrc_->srcid(), s->srcid());
00580         s->next = allsrcs_;
00581         allsrcs_ = s;
00582         }
00583 }
00584 
00589 void RTPSession::enter_rcv(RTPReceiver* s)
00590 {
00591         printf("Time:%f Sender:%d ...I add receiver %d \n", NOW,localsrc_->srcid(), s->srcid());
00592         s->next = receivers_;
00593         receivers_ = s;
00594 }
00595 
00600 void RTPSession::remove_receiver(RTPReceiver* p)
00601 {
00602 
00603         printf("Time:%f Sender:%d ...I remove receiver %d \n", NOW,localsrc_->srcid(),p->srcid());
00604         RTPReceiver* s;
00605         RTPReceiver* q;
00606         if (receivers_==p)
00607         {
00608                 q = receivers_;
00609                 receivers_=receivers_->next;
00610                 delete q;
00611         }
00612         else
00613         {
00614                 for (s = receivers_; s!=0; s=s->next)
00615                 {
00616                         if (s->next==p)
00617                         {
00618                                 q = s->next;
00619                                 s->next=q->next;
00620                                 delete q; 
00621                         }       
00622                 }
00623         }
00624 
00625 }
00626 
00632 void RTPSession::initial_rate(double a)
00633 {
00634         tx_rate_ =(double)a/8;
00635         
00636         
00637 }
00638 
00643 int RTPSession::command(int argc, const char*const* argv)
00644 {
00645         if (argc == 3) {
00646                 if (strcmp(argv[1], "enter") == 0) {
00647                         RTPSource* s = (RTPSource*)TclObject::lookup(argv[2]);
00648                         enter(s);
00649                         return (TCL_OK);
00650                 }
00651                 if (strcmp(argv[1], "localsrc") == 0) {
00652                         localsrc_ = (RTPSource*)TclObject::lookup(argv[2]);
00653                         enter(localsrc_);
00654                         return (TCL_OK);
00655                 }
00656                 if (strcmp(argv[1], "rate") == 0) {
00657                         initial_rate(atoi(argv[2]));
00658                         return(TCL_OK);
00659                 }
00660         }
00661 
00662         return (TclObject::command(argc, argv));
00663 }
00664 
00669 RTPReceiver::RTPReceiver(u_int32_t srcid)
00670         : next(0),cum_pkts_lost_(0),eff_rtt_(0),rate_(0)
00671 {
00672         srcid_ = srcid;
00673 }
00674 
00679 RTPSource::RTPSource(u_int32_t srcid)
00680         : next(0),np_(0), snp_(0), ehsr_(-1), nbytes_(0),cum_pkts_lost_(0),LSR_(0),SRT_(0),is_sender_(false),ps_(0),rate_(0),transit_(0),jitter_(0)
00681 {
00682         bind("srcid_", (int*)&srcid_);
00683         srcid_ = srcid;
00684 }
00685 
00686 
00687 
00688 
00689 

Generated on Sat Mar 8 18:04:09 2008 for RTP by  doxygen 1.5.1