RINASim  October 2016
Documentation of framework for OMNeT++
RMT.cc
Go to the documentation of this file.
1 // The MIT License (MIT)
2 //
3 // Copyright (c) 2014-2016 Brno University of Technology, PRISTINE project
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 
30 #include <RMT.h>
31 
32 // shared access to trace logger
33 #ifndef RMT_TRACING
34 #define RMT_TRACING
35 std::ofstream rmtTraceFile;
36 #endif
37 
39 
41 {
42  while (!invalidPDUs.empty())
43  {
44  delete invalidPDUs.front();
45  invalidPDUs.pop_front();
46  }
47 }
48 
49 
51 {
52  // set up features
53  relayOn = false;
54  onWire = false;
55  tracing = getParentModule()->par("pduTracing").boolValue();
56  if (tracing && !rmtTraceFile.is_open())
57  {
58  std::ostringstream filename;
59  filename << "results/" << getEnvir()->getConfigEx()->getActiveConfigName() << "-"
60  << getEnvir()->getConfigEx()->getActiveRunNumber() << ".tr";
61  rmtTraceFile.open(filename.str().c_str());
62 
63  if (!rmtTraceFile.is_open())
64  {
65  EV << "Couldn't create a trace file!" << endl;
66  tracing = false;
67  }
68  }
69 
70  efcpiIn[0] = gateHalf("ribdIo", cGate::INPUT);
71  efcpiOut[0] = gateHalf("ribdIo", cGate::OUTPUT);
72 
73  // get pointers to other components
74  fwd = getRINAModule<IntPDUForwarding*>(this, 1, {MOD_POL_RMT_PDUFWD});
75  rmtAllocator = getRINAModule<RMTModuleAllocator*>(this, 1, {MOD_RMTALLOC});
76  schedPolicy = getRINAModule<RMTSchedulingBase*>(this, 1, {MOD_POL_RMT_SCHEDULER});
77  maxQPolicy = getRINAModule<RMTMaxQBase*>(this, 1, {MOD_POL_RMT_MAXQ});
78  qMonPolicy = getRINAModule<RMTQMonitorBase*>(this, 1, {MOD_POL_RMT_QMONITOR});
79  qAllocPolicy = getRINAModule<QueueAllocBase*>(this, 2, {MOD_RESALLOC, MOD_POL_RA_QUEUEALLOC});
80  queueIdGenerator = getRINAModule<QueueIDGenBase*>(this, 2, {MOD_RESALLOC, MOD_POL_RA_IDGENERATOR});
81  addrComparator = getRINAModule<AddressComparatorBase*>(this, 2, {MOD_RESALLOC, MOD_POL_RA_ADDRCOMPARATOR});
82 
83  // register a signal for notifying others about a missing local EFCP instance
84  sigRMTNoConnID = registerSignal(SIG_RMT_NoConnId);
85 
86  // register a signal for notifying others about a packet bit error
88 
89  // listen for a signal indicating that a new message is to arrive into a queue
91  getParentModule()->subscribe(SIG_RMT_QueuePDUPreRcvd, lisRMTQueuePDUPreRcvd);
92 
93  // listen for a signal indicating that a new message has arrived into a queue
95  getParentModule()->subscribe(SIG_RMT_QueuePDUPostRcvd, lisRMTQueuePDUPostRcvd);
96 
97  // listen for a signal indicating that a message is leaving a queue
99  getParentModule()->subscribe(SIG_RMT_QueuePDUPreSend, lisRMTQueuePDUPreSend);
100 
101  // listen for a signal indicating that a message has left a queue
103  getParentModule()->subscribe(SIG_RMT_QueuePDUSent, lisRMTQueuePDUSent);
104 
105  // listen for a signal indicating that a port is ready to serve
107  getParentModule()->subscribe(SIG_RMT_PortReadyToServe, lisRMTPortReadyToServe);
108 
109  // listen for a signal indicating that a port is ready to be read from
111  getParentModule()->subscribe(SIG_RMT_PortReadyForRead, lisRMTPortReadyForRead);
112 
113  WATCH(relayOn);
114  WATCH(onWire);
115 }
116 
121 {
122  size_t pduCount = invalidPDUs.size();
123  if (pduCount)
124  {
125  EV << "RMT " << this->getFullPath() << " still contains " << pduCount
126  << " unprocessed PDUs!" << endl;
127 
128  for (auto const& m : invalidPDUs)
129  {
130  EV << m->getClassName() << " received at " << m->getArrivalTime()
131  << " from " << m->getSenderModule()->getFullPath() << endl;
132  }
133  }
134 
135  if (rmtTraceFile.is_open())
136  {
137  rmtTraceFile.flush();
138  rmtTraceFile.close();
139  }
140 }
141 
148 void RMT::tracePDUEvent(const cPacket* pkt, TraceEventType eventType)
149 {
150  const PDU* pdu = dynamic_cast<const PDU*>(pkt);
151  if (pdu == nullptr)
152  {
153  return;
154  }
155 
156  std::ostringstream flowID;
157  flowID << pdu->getConnId().getSrcCepId() << pdu->getConnId().getDstCepId()
158  << pdu->getConnId().getQoSId();
159 
160  std::string flags = std::bitset<8>(pdu->getFlags()).to_string().c_str();
161 
162  rmtTraceFile << char(eventType) << " "
163  << simTime() << " "
164  << getModuleByPath("^.^.^.")->getFullName() << " "
165  << getModuleByPath("^.^.")->getFullName() << " "
166  << pdu->getClassName() << " "
167  << pdu->getBitLength() << " "
168  << flags << " "
169  << flowID.str().c_str() << " "
170  << pdu->getDstAddr().getDifName() << " "
171  << pdu->getSrcAddr().getIpcAddress() << " "
172  << pdu->getDstAddr().getIpcAddress() << " "
173  << pdu->getSeqNum() << " "
174  << pdu->getId()
175  << endl;
176 }
177 
183 void RMT::preQueueArrival(cObject* obj)
184 {
185  Enter_Method("preQueueArrival()");
186  RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
187 
188  // invoke monitor policy
189  qMonPolicy->prePDUInsertion(queue);
190 }
191 
197 void RMT::recDel(cPacket * p) {
198  cPacket * enc = p->decapsulate();
199  if(enc != nullptr) { recDel(enc); }
200  delete p;
201 }
202 
203 void RMT::postQueueArrival(cObject* obj)
204 {
205  Enter_Method("onQueueArrival()");
206 
207  RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
209 
210  if (tracing)
211  {
212  if (queue->getType() == RMTQueue::INPUT)
213  {
215  }
217  }
218 
219  // detection of channel-induced bit error
220  if (queue->getLastPDU()->hasBitError())
221  {
222  EV << "PDU arriving on " << port->getParentModule()->getFullName()
223  << " contains one or more bit errors! Dropping." << endl;
224  if (tracing)
225  {
226  tracePDUEvent(queue->getLastPDU(), MSG_DROP);
227  }
228  emit(sigRMTPacketError, obj);
229  recDel( queue->dropLast() );
230  return;
231  }
232 
233  // invoke monitor policy
235 
236  // invoke maxQueue policy if applicable
237  if (queue->getLength() >= queue->getThreshLength())
238  {
239  // if the PDU has to be dropped, finish it here
240  if (maxQPolicy->run(queue))
241  {
242  if (tracing)
243  {
244  tracePDUEvent(queue->getLastPDU(), MSG_DROP);
245  }
246  const auto dropped = queue->dropLast();
247  qMonPolicy->onMessageDrop(queue, dropped);
248  recDel( dropped );
249  return;
250  }
251  }
252 
253  port->addWaiting(queue->getType());
254  schedPolicy->processQueues(port, queue->getType());
255 }
256 
262 void RMT::preQueueDeparture(cObject* obj)
263 {
264  Enter_Method("preQueueDeparture()");
265  RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
266 
267  qMonPolicy->prePDURelease(queue);
268 
269  if (tracing)
270  {
272  if (queue->getType() == RMTQueue::OUTPUT)
273  {
275  }
276  }
277 }
278 
284 void RMT::postQueueDeparture(cObject* obj)
285 {
286  Enter_Method("postQueueDeparture()");
287  RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
288  qMonPolicy->postPDURelease(queue);
289 
291  port->substractWaiting(queue->getType());
292 
293  // notify MaxQ in case the queue length just went back under its threshold
294  if (queue->getLength() == (queue->getThreshLength() - 1))
295  {
297  }
298 
299  // if this is an incoming PDU, take care of scheduler reinvocation
300  if (queue->getType() == RMTQueue::INPUT)
301  {
302  port->scheduleNextRead();
303  }
304  else
305  { // if this is an outgoing PDU, set the port as busy
306  port->setOutputBusy();
307  }
308 }
309 
315 void RMT::writeToPort(cObject* obj)
316 {
317  Enter_Method_Silent("writeToPort()");
318  RMTPort* port = check_and_cast<RMTPort*>(obj);
320 }
321 
327 void RMT::readFromPort(cObject* obj)
328 {
329  Enter_Method_Silent("readFromPort()");
330  RMTPort* port = check_and_cast<RMTPort*>(obj);
332 }
333 
339 void RMT::createEfcpiGate(unsigned int efcpiId)
340 {
341  if (efcpiOut.count(efcpiId))
342  {
343  return;
344  }
345 
346  cModule* rmtModule = getParentModule();
347 
348  std::ostringstream gateName_str;
349  gateName_str << GATE_EFCPIO_ << efcpiId;
350 
351  this->addGate(gateName_str.str().c_str(), cGate::INOUT, false);
352  cGate* rmtIn = this->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
353  cGate* rmtOut = this->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
354 
355  rmtModule->addGate(gateName_str.str().c_str(), cGate::INOUT, false);
356  cGate* rmtModuleIn = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
357  cGate* rmtModuleOut = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
358 
359  rmtModuleIn->connectTo(rmtIn);
360  rmtOut->connectTo(rmtModuleOut);
361 
362  efcpiOut[efcpiId] = rmtOut;
363  efcpiIn[efcpiId] = rmtIn;
364 
365  // RMT<->EFCP interconnection shall be done by the FAI
366 }
367 
373 void RMT::deleteEfcpiGate(unsigned int efcpiId)
374 {
375  if (!efcpiOut.count(efcpiId))
376  {
377  return;
378  }
379 
380  cModule* rmtModule = getParentModule();
381 
382  std::ostringstream gateName_str;
383  gateName_str << GATE_EFCPIO_ << efcpiId;
384 
385  cGate* rmtOut = this->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
386  cGate* rmtModuleIn = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
387 
388  rmtOut->disconnect();
389  rmtModuleIn->disconnect();
390 
391  this->deleteGate(gateName_str.str().c_str());
392  rmtModule->deleteGate(gateName_str.str().c_str());
393 
394  efcpiOut.erase(efcpiId);
395  efcpiIn.erase(efcpiId);
396 }
397 
404 std::vector<RMTPort*> RMT::fwTableLookup(const PDU * pdu)
405 {
406  RMTPorts ports;
407 
408  if (onWire)
409  { // get the interface port
410  ports.push_back(rmtAllocator->getInterfacePort());
411  }
412  else
413  { // get output ports from PDUFT
414  ports = fwd->lookup(pdu);
415  }
416 
417  return ports;
418 }
419 
426 {
427  auto outPorts = fwTableLookup(pdu);
428 
429  if (outPorts.empty())
430  {
431  EV << "!!! Empty PDUForwarding policy lookup result!" << endl
432  << "At " << getParentModule()->getParentModule()->par("ipcAddress").stdstringValue() << endl
433  << "PDU dstAddr = " << pdu->getDstAddr().getApn().getName()
434  << ", qosId = " << pdu->getConnId().getQoSId() << endl
435  << "PDUForwarding contents: " << endl << fwd->toString() << endl;
436 
437  std::cout << "!!! Empty PDUForwarding policy lookup result!" << endl
438  << "At " << getParentModule()->getParentModule()->par("ipcAddress").stdstringValue() << endl
439  << "PDU dstAddr = " << pdu->getDstAddr().getApn().getName()
440  << ", qosId = " << pdu->getConnId().getQoSId() << endl
441  << "PDUForwarding contents: " << endl << fwd->toString() << endl;
442 
443  invalidPDUs.push_back(pdu);
444  }
445 
446  PDU* outPDU = nullptr;
447 
448  for (auto it = outPorts.begin(); it != outPorts.end(); ++it)
449  {
450  RMTPort* port = *it;
451 
452  if (it != (outPorts.end() - 1))
453  { // clone the message if sending via multiple ports
454  outPDU = pdu->dup();
455  }
456  else
457  {
458  outPDU = pdu;
459  }
460 
461  const std::string& id = queueIdGenerator->generateOutputQueueID(pdu);
462  RMTQueue* outQueue = port->getQueueById(RMTQueue::OUTPUT, id.c_str());
463 
464  if (outQueue != nullptr)
465  {
466  cGate* outGate = outQueue->getRMTAccessGate();
467  send(outPDU, outGate);
468  }
469  else
470  {
471  EV << "Queue with ID \"" << id << "\" doesn't exist!" << endl;
472  }
473  }
474 }
475 
482 {
483  int cepId = pdu->getConnId().getDstCepId();
484 
485  if(cepId < 0) { delete pdu; return; }
486 
487  cGate* efcpiGate = efcpiOut[cepId];
488  if (efcpiGate != nullptr)
489  {
490  send(pdu, efcpiGate);
491  }
492  else
493  {
494  std::cout << "WTF " << cepId << endl;
495  EV << this->getFullPath() << ": EFCPI " << cepId
496  << " isn't present on this system! Notifying other modules." << endl;
497  emit(sigRMTNoConnID, pdu);
498  invalidPDUs.push_back(pdu);
499 
500  }
501 }
502 
510 void RMT::processMessage(cMessage* msg)
511 {
512  PDU* pdu = dynamic_cast<PDU*>(msg);
513 
514  if (pdu != nullptr)
515  { // PDU arrival
516  cModule* senderModule = msg->getArrivalGate()->getPathStartGate()->getOwnerModule();
517 
518  if (dynamic_cast<RMTQueue*>(senderModule) != nullptr)
519  { // message from a port
520  if (addrComparator->matchesThisIPC(pdu->getDstAddr(), pdu) )
521  {
522  relayPDUToEFCPI(pdu);
523  }
524  else if (relayOn)
525  {
526  relayPDUToPort(pdu);
527  }
528  else
529  {
530  EV << getFullPath() << " This PDU isn't for me! Holding it here." << endl;
531  }
532  }
533  else
534  { // message from an EFCPI
535  if (addrComparator->matchesThisIPC(pdu->getDstAddr(), pdu) )
536  {
537  relayPDUToEFCPI(pdu);
538  }
539  else
540  {
541  relayPDUToPort(pdu);
542  }
543  }
544  }
545  else
546  {
547  EV << this->getFullPath() << " message type not supported" << endl;
548  invalidPDUs.push_back(msg);
549  }
550 }
551 
552 void RMT::handleMessage(cMessage *msg)
553 {
554  if (msg->isSelfMessage())
555  {
556  // ?
557  invalidPDUs.push_back(msg);
558  }
559  else
560  {
561  processMessage(msg);
562  }
563 }
564 
const DAP & getDifName() const
Getter of common DIF name.
Definition: Address.cc:73
LisRMTPortReadyForRead * lisRMTPortReadyForRead
Definition: RMT.h:126
const char * SIG_RMT_QueuePDUPreRcvd
Definition: RINASignals.cc:131
Definition: RMTListeners.h:49
const char * MOD_RESALLOC
Definition: ExternConsts.cc:58
RMTSchedulingBase * schedPolicy
Definition: RMT.h:97
queueType getType() const
Definition: RMTQueue.cc:241
virtual std::vector< RMTPort * > lookup(const PDU *pdu)=0
void relayPDUToEFCPI(PDU *msg)
Definition: RMT.cc:481
LisRMTQueuePDUPreRcvd * lisRMTQueuePDUPreRcvd
Definition: RMT.h:121
virtual void preQueueArrival(cObject *obj)
Definition: RMT.cc:183
EfcpiMapping efcpiIn
Definition: RMT.h:93
LisRMTPortReadyToServe * lisRMTPortReadyToServe
Definition: RMT.h:125
virtual void prePDURelease(RMTQueue *queue)
virtual bool matchesThisIPC(const Address &addr, PDU *pdu)
void recDel(cPacket *p)
Definition: RMT.cc:197
RMTMaxQBase * maxQPolicy
Definition: RMT.h:96
std::string getQoSId() const
Getter of selected QoS-cube identifier.
Definition: ConnectionId.cc:44
std::ofstream rmtTraceFile
Definition: RMT.cc:35
const char * SIG_RMT_QueuePDUPreSend
Definition: RINASignals.cc:133
const APN & getIpcAddress() const
Getter of IPC Process address which should be unambiguous within DIF.
Definition: Address.cc:83
std::vector< RMTPort * > RMTPorts
Definition: RMTPort.h:234
const cPacket * getFirstPDU() const
Definition: RMTQueue.cc:271
RMTModuleAllocator * rmtAllocator
Definition: RMT.h:87
const char * MOD_POL_RA_QUEUEALLOC
Definition: ExternConsts.cc:71
virtual PDU * dup() const
Definition: PDU.h:51
TraceEventType
Definition: RMT.h:103
virtual void initialize()
Definition: RMT.cc:50
virtual void postPDUInsertion(RMTQueue *queue)
const char * SIG_RMT_PortReadyToServe
Definition: RINASignals.cc:135
void scheduleNextRead()
Definition: RMTPort.cc:308
EfcpiMapping efcpiOut
Definition: RMT.h:92
int getSrcCepId() const
Getter of source Connection-Endpoint identifier.
Definition: ConnectionId.cc:54
virtual ~RMT()
Definition: RMT.cc:40
virtual ConnectionId & getConnId()
Definition: PDU_m.cc:336
IntPDUForwarding * fwd
Definition: RMT.h:86
void deleteEfcpiGate(unsigned int efcpiId)
Definition: RMT.cc:373
virtual void finish()
Definition: RMT.cc:120
const char * MOD_RMTALLOC
Definition: ExternConsts.cc:63
QueueAllocBase * qAllocPolicy
Definition: RMT.h:99
RMTPort * getQueueToPortMapping(RMTQueue *queue)
const char * MOD_POL_RMT_SCHEDULER
Definition: ExternConsts.cc:75
Definition: PDU.h:42
const char * MOD_POL_RMT_QMONITOR
Definition: ExternConsts.cc:74
const char * SIG_RMT_PortReadyForRead
Definition: RINASignals.cc:136
virtual void prePDUInsertion(RMTQueue *queue)
cPacket * dropLast()
Definition: RMTQueue.cc:185
bool tracing
Definition: RMT.h:108
virtual std::string generateOutputQueueID(PDU *pdu)
const char * SIG_RMT_ErrornousPacket
Definition: RINASignals.cc:129
void tracePDUEvent(const cPacket *pkt, TraceEventType eventType)
Definition: RMT.cc:148
const char * SIG_RMT_QueuePDUSent
Definition: RINASignals.cc:134
Define_Module(RMT)
int getLength() const
Definition: RMTQueue.cc:210
const cPacket * getLastPDU() const
Definition: RMTQueue.cc:276
bool onWire
Definition: RMT.h:90
simsignal_t sigRMTPacketError
Definition: RMT.h:120
virtual unsigned int getSeqNum() const
Definition: PDU_m.cc:376
virtual Address & getDstAddr()
Definition: PDU_m.cc:306
virtual void postQueueArrival(cObject *obj)
Definition: RMT.cc:203
virtual void handleMessage(cMessage *msg)
Definition: RMT.cc:552
Relaying and Multiplexing Task .
simsignal_t sigRMTNoConnID
Definition: RMT.h:119
int getDstCepId() const
Getter of destination Connection-Endpoint identifier.
Definition: ConnectionId.cc:34
virtual Address & getSrcAddr()
Definition: PDU_m.cc:296
virtual void postQueueDeparture(cObject *obj)
Definition: RMT.cc:284
virtual std::string toString()=0
QueueIDGenBase * queueIdGenerator
Definition: RMT.h:100
int getThreshLength() const
Definition: RMTQueue.cc:226
const char * MOD_POL_RA_IDGENERATOR
Definition: ExternConsts.cc:69
LisRMTQueuePDUPreSend * lisRMTQueuePDUPreSend
Definition: RMT.h:123
Definition: RMT.h:60
LisRMTQueuePDUSent * lisRMTQueuePDUSent
Definition: RMT.h:124
virtual void preQueueDeparture(cObject *obj)
Definition: RMT.cc:262
AddressComparatorBase * addrComparator
Definition: RMT.h:101
const char * SIG_RMT_QueuePDUPostRcvd
Definition: RINASignals.cc:132
const char * SIG_RMT_NoConnId
Definition: RINASignals.cc:128
std::vector< RMTPort * > fwTableLookup(const PDU *pdu)
Definition: RMT.cc:404
void createEfcpiGate(unsigned int efcpiId)
Definition: RMT.cc:339
void substractWaiting(RMTQueueType direction)
Definition: RMTPort.cc:325
LisRMTQueuePDUPostRcvd * lisRMTQueuePDUPostRcvd
Definition: RMT.h:122
RMTQMonitorBase * qMonPolicy
Definition: RMT.h:95
virtual bool run(RMTQueue *queue)
Definition: RMTMaxQBase.cc:52
virtual void processQueues(RMTPort *port, RMTQueueType direction)
void processMessage(cMessage *msg)
Definition: RMT.cc:510
RMTQueue * getQueueById(RMTQueueType type, const char *queueId) const
Definition: RMTPort.cc:244
const APN & getApn() const
Getter of unique APN which is initialized during object construction.
Definition: Address.cc:119
virtual int getFlags() const
Definition: PDU_m.cc:356
virtual void writeToPort(cObject *obj)
Definition: RMT.cc:315
bool relayOn
Definition: RMT.h:89
const char * GATE_EFCPIO_
virtual void onQueueLengthDrop(RMTQueue *queue)
Definition: RMTMaxQBase.cc:57
const std::string & getName() const
Gets APN string name representation.
Definition: APN.cc:40
const char * MOD_POL_RA_ADDRCOMPARATOR
Definition: ExternConsts.cc:68
void setOutputBusy()
Definition: RMTPort.cc:276
void relayPDUToPort(PDU *msg)
Definition: RMT.cc:425
const char * MOD_POL_RMT_PDUFWD
Definition: ExternConsts.cc:73
virtual void postPDURelease(RMTQueue *queue)
const char * MOD_POL_RMT_MAXQ
Definition: ExternConsts.cc:72
virtual void onMessageDrop(RMTQueue *queue, const cPacket *pdu)
cGate * getRMTAccessGate() const
Definition: RMTQueue.cc:251
std::deque< cMessage * > invalidPDUs
Definition: RMT.h:115
virtual void readFromPort(cObject *obj)
Definition: RMT.cc:327