00001
00035 #include <stdlib.h>
00036 #include "config.h"
00037 #include "agent.h"
00038 #include "random.h"
00039 #include "rtp.h"
/*
 * Delay handed to join_timer_.resched() when the agent is started
 * (see RTPAgent::start()).
 * NOTE(review): presumably seconds of simulated time -- confirm against
 * the timer implementation.
 */
00040 #define INITIAL_TIMEOUT 10
/*
 * Delay handed to join_timer_.resched() each time the join timer fires
 * while the agent is running (see RTPAgent::join_timeout()).
 */
00041 #define TIMEOUT 10
00042
00043
00044 int hdr_rtp::offset_;
00046 class RTPHeaderClass : public PacketHeaderClass {
00047 public:
00048 RTPHeaderClass() : PacketHeaderClass("PacketHeader/RTP",
00049 sizeof(hdr_rtp)) {
00050 bind_offset(&hdr_rtp::offset_);
00051 }
00052 } class_rtphdr;
00056 static class RTPAgentClass : public TclClass {
00057 public:
00058 RTPAgentClass() : TclClass("Agent/RTP") {}
00059 TclObject* create(int, const char*const*) {
00060 return (new RTPAgent());
00061
00062 }
00063 } class_rtp_agent;
00064
00068 RTPAgent::RTPAgent() : Agent(PT_RTP), session_(0), lastpkttime_(-1e6),
00069 running_(0), rtp_timer_(this),join_timer_(this)
00070 {
00071 bind("seqno_", &seqno_);
00072 bind_time("interval_", &interval_);
00073 bind("packetSize_", &size_);
00074 bind("maxpkts_", &maxpkts_);
00075 bind("random_", &random_);
00076 timestamp_ = 0;
00077 join_flag_ = 0;
00078
00079
00080 }
00082 void RTPAgent::start()
00083 {
00084 running_ = 1;
00085 join_timer_.resched(INITIAL_TIMEOUT);
00086 sendpkt();
00087 rtp_timer_.resched(interval_);
00088 if (session_) {
00089 session_->start_timer();
00090 }
00091 }
00092
00094 void RTPAgent::stop()
00095 {
00096 session_->stop_timer();
00097 rtp_timer_.force_cancel();
00098
00099 finish();
00100 }
00101
00103 void RTPAgent::sendmsg(int nbytes, const char* )
00104 {
00105 Packet *p;
00106 int n;
00107
00108 assert (size_ > 0);
00109
00110 if (++seqno_ < maxpkts_) {
00111 n = nbytes / size_;
00112
00113 if (nbytes == -1) {
00114 start();
00115 return;
00116 }
00117 while (n-- > 0) {
00118 p = allocpkt();
00119 hdr_rtp* rh = hdr_rtp::access(p);
00120 rh->seqno() = seqno_;
00121 target_->recv(p);
00122 }
00123 n = nbytes % size_;
00124 if (n > 0) {
00125 p = allocpkt();
00126 hdr_rtp* rh = hdr_rtp::access(p);
00127 rh->seqno() = seqno_;
00128 target_->recv(p);
00129 }
00130 idle();
00131 } else {
00132 finish();
00133
00134 };
00135 }
00136
00141 void RTPAgent::timeout(int)
00142 {
00143 if (running_) {
00144 sendpkt();
00145 if (session_)
00146 session_->localsrc_update(size_);
00147 session_->localsrc_update_nbytes(size_);
00148
00149
00150 double t = interval_;
00151 if (random_)
00152
00153 t += interval_ * Random::uniform(-0.5, 0.5);
00154 rtp_timer_.resched(t);
00155 }
00156 }
00157
00158 void RTPAgent::join_timeout(int)
00159 {
00160 if (running_) {
00161 if(join_flag_ == 0) join_flag_ = 1;
00162 join_timer_.resched(TIMEOUT);
00163 }
00164 }
00165
00166
00171 void RTPAgent::finish()
00172 {
00173 running_ = 0;
00174 Tcl::instance().evalf("%s done", this->name());
00175 }
00176
00181 void RTPAgent::advanceby(int delta)
00182 {
00183 maxpkts_ += delta;
00184 if (seqno_ < maxpkts_ && !running_)
00185 start();
00186 }
00187
00192 void RTPAgent::recv(Packet* p, Handler*)
00193 {
00194 if (session_)
00195 session_->recv(p, 0);
00196 else
00197 Packet::free(p);
00198 }
00199
00200 int RTPAgent::command(int argc, const char*const* argv)
00201 {
00202 if (argc == 2) {
00203 if (strcmp(argv[1], "rate-change") == 0) {
00204 rate_change();
00205 return (TCL_OK);
00206 } else if (strcmp(argv[1], "start") == 0) {
00207 start();
00208 return (TCL_OK);
00209 } else if (strcmp(argv[1], "stop") == 0) {
00210 stop();
00211 return (TCL_OK);
00212 }
00213 } else if (argc == 3) {
00214 if (strcmp(argv[1], "session") == 0) {
00215 session_ = (RTPSession*)TclObject::lookup(argv[2]);
00216 return (TCL_OK);
00217 } else if (strcmp(argv[1], "advance") == 0) {
00218 int newseq = atoi(argv[2]);
00219 advanceby(newseq - seqno_);
00220 return (TCL_OK);
00221 } else if (strcmp(argv[1], "advanceby") == 0) {
00222 advanceby(atoi(argv[2]));
00223 return (TCL_OK);
00224 }
00225 }
00226 return (Agent::command(argc, argv));
00227 }
00228
00235 void RTPAgent::rate_change()
00236 {
00237 rtp_timer_.force_cancel();
00238
00239 double t = lastpkttime_ + interval_;
00240
00241 double now = Scheduler::instance().clock();
00242 if ( t > now)
00243 rtp_timer_.resched(t - now);
00244 else {
00245 sendpkt();
00246 rtp_timer_.resched(interval_);
00247 }
00248 }
00250 void RTPAgent::sendpkt()
00251 {
00252 Packet* p = allocpkt();
00253 lastpkttime_ = Scheduler::instance().clock();
00254 timestamp_ = lastpkttime_;
00255 makepkt(p);
00256 target_->recv(p, (Handler*)0);
00257 }
00259 void RTPAgent::makepkt(Packet* p)
00260 {
00261
00262 hdr_rtp *rh = hdr_rtp::access(p);
00263
00264 rh->seqno() = seqno_++;
00265 rh->srcid() = session_ ? session_->srcid() : 0;
00266 rh->timestamp()= timestamp_;
00267 if(join_flag_ == 1 ) {
00268 rh->T_epoch() = 1;
00269 join_flag_ = 0;
00270 } else rh->T_epoch() = 0;
00271
00272
00273 }
00274
00275 void RTPTimer::expire(Event* ) {
00276 a_->timeout(0);
00277 }
00278
00279 void JoinTimer::expire(Event* ) {
00280 a_->join_timeout(0);
00281 }
00282
00283
00284
00285
00286
00287
00288