55 tracing = getParentModule()->par(
"pduTracing").boolValue();
58 std::ostringstream filename;
59 filename <<
"results/" << getEnvir()->getConfigEx()->getActiveConfigName() <<
"-"
60 << getEnvir()->getConfigEx()->getActiveRunNumber() <<
".tr";
65 EV <<
"Couldn't create a trace file!" << endl;
70 efcpiIn[0] = gateHalf(
"ribdIo", cGate::INPUT);
71 efcpiOut[0] = gateHalf(
"ribdIo", cGate::OUTPUT);
125 EV <<
"RMT " << this->getFullPath() <<
" still contains " << pduCount
126 <<
" unprocessed PDUs!" << endl;
130 EV << m->getClassName() <<
" received at " << m->getArrivalTime()
131 <<
" from " << m->getSenderModule()->getFullPath() << endl;
150 const PDU* pdu =
dynamic_cast<const PDU*
>(pkt);
156 std::ostringstream flowID;
160 std::string flags = std::bitset<8>(pdu->
getFlags()).to_string().c_str();
164 << getModuleByPath(
"^.^.^.")->getFullName() <<
" "
165 << getModuleByPath(
"^.^.")->getFullName() <<
" "
166 << pdu->getClassName() <<
" "
167 << pdu->getBitLength() <<
" "
169 << flowID.str().c_str() <<
" "
185 Enter_Method(
"preQueueArrival()");
198 cPacket * enc = p->decapsulate();
199 if(enc !=
nullptr) {
recDel(enc); }
205 Enter_Method(
"onQueueArrival()");
222 EV <<
"PDU arriving on " << port->getParentModule()->getFullName()
223 <<
" contains one or more bit errors! Dropping." << endl;
246 const auto dropped = queue->
dropLast();
253 port->addWaiting(queue->
getType());
264 Enter_Method(
"preQueueDeparture()");
286 Enter_Method(
"postQueueDeparture()");
317 Enter_Method_Silent(
"writeToPort()");
329 Enter_Method_Silent(
"readFromPort()");
346 cModule* rmtModule = getParentModule();
348 std::ostringstream gateName_str;
351 this->addGate(gateName_str.str().c_str(), cGate::INOUT,
false);
352 cGate* rmtIn = this->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
353 cGate* rmtOut = this->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
355 rmtModule->addGate(gateName_str.str().c_str(), cGate::INOUT,
false);
356 cGate* rmtModuleIn = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
357 cGate* rmtModuleOut = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
359 rmtModuleIn->connectTo(rmtIn);
360 rmtOut->connectTo(rmtModuleOut);
380 cModule* rmtModule = getParentModule();
382 std::ostringstream gateName_str;
385 cGate* rmtOut = this->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
386 cGate* rmtModuleIn = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
388 rmtOut->disconnect();
389 rmtModuleIn->disconnect();
391 this->deleteGate(gateName_str.str().c_str());
392 rmtModule->deleteGate(gateName_str.str().c_str());
429 if (outPorts.empty())
431 EV <<
"!!! Empty PDUForwarding policy lookup result!" << endl
432 <<
"At " << getParentModule()->getParentModule()->par(
"ipcAddress").stdstringValue() << endl
435 <<
"PDUForwarding contents: " << endl <<
fwd->
toString() << endl;
437 std::cout <<
"!!! Empty PDUForwarding policy lookup result!" << endl
438 <<
"At " << getParentModule()->getParentModule()->par(
"ipcAddress").stdstringValue() << endl
441 <<
"PDUForwarding contents: " << endl <<
fwd->
toString() << endl;
446 PDU* outPDU =
nullptr;
448 for (
auto it = outPorts.begin(); it != outPorts.end(); ++it)
452 if (it != (outPorts.end() - 1))
464 if (outQueue !=
nullptr)
467 send(outPDU, outGate);
471 EV <<
"Queue with ID \"" <<
id <<
"\" doesn't exist!" << endl;
485 if(cepId < 0) {
delete pdu;
return; }
488 if (efcpiGate !=
nullptr)
490 send(pdu, efcpiGate);
494 std::cout <<
"WTF " << cepId << endl;
495 EV << this->getFullPath() <<
": EFCPI " << cepId
496 <<
" isn't present on this system! Notifying other modules." << endl;
512 PDU* pdu =
dynamic_cast<PDU*
>(msg);
516 cModule* senderModule = msg->getArrivalGate()->getPathStartGate()->getOwnerModule();
518 if (dynamic_cast<RMTQueue*>(senderModule) !=
nullptr)
530 EV << getFullPath() <<
" This PDU isn't for me! Holding it here." << endl;
547 EV << this->getFullPath() <<
" message type not supported" << endl;
554 if (msg->isSelfMessage())
const DAP & getDifName() const
Getter of common DIF name.
LisRMTPortReadyForRead * lisRMTPortReadyForRead
const char * SIG_RMT_QueuePDUPreRcvd
const char * MOD_RESALLOC
RMTSchedulingBase * schedPolicy
queueType getType() const
virtual std::vector< RMTPort * > lookup(const PDU *pdu)=0
void relayPDUToEFCPI(PDU *msg)
LisRMTQueuePDUPreRcvd * lisRMTQueuePDUPreRcvd
virtual void preQueueArrival(cObject *obj)
LisRMTPortReadyToServe * lisRMTPortReadyToServe
virtual void prePDURelease(RMTQueue *queue)
virtual bool matchesThisIPC(const Address &addr, PDU *pdu)
std::string getQoSId() const
Getter of selected QoS-cube identifier.
std::ofstream rmtTraceFile
const char * SIG_RMT_QueuePDUPreSend
const APN & getIpcAddress() const
Getter of IPC Process address which should be unambiguous within DIF.
std::vector< RMTPort * > RMTPorts
const cPacket * getFirstPDU() const
RMTModuleAllocator * rmtAllocator
const char * MOD_POL_RA_QUEUEALLOC
virtual PDU * dup() const
virtual void initialize()
virtual void postPDUInsertion(RMTQueue *queue)
const char * SIG_RMT_PortReadyToServe
int getSrcCepId() const
Getter of source Connection-Endpoint identifier.
virtual ConnectionId & getConnId()
void deleteEfcpiGate(unsigned int efcpiId)
RMTPort * getInterfacePort()
const char * MOD_RMTALLOC
QueueAllocBase * qAllocPolicy
RMTPort * getQueueToPortMapping(RMTQueue *queue)
const char * MOD_POL_RMT_SCHEDULER
const char * MOD_POL_RMT_QMONITOR
const char * SIG_RMT_PortReadyForRead
virtual void prePDUInsertion(RMTQueue *queue)
virtual std::string generateOutputQueueID(PDU *pdu)
const char * SIG_RMT_ErrornousPacket
void tracePDUEvent(const cPacket *pkt, TraceEventType eventType)
const char * SIG_RMT_QueuePDUSent
const cPacket * getLastPDU() const
simsignal_t sigRMTPacketError
virtual unsigned int getSeqNum() const
virtual Address & getDstAddr()
virtual void postQueueArrival(cObject *obj)
virtual void handleMessage(cMessage *msg)
Relaying and Multiplexing Task .
simsignal_t sigRMTNoConnID
int getDstCepId() const
Getter of destination Connection-Endpoint identifier.
virtual Address & getSrcAddr()
virtual void postQueueDeparture(cObject *obj)
virtual std::string toString()=0
QueueIDGenBase * queueIdGenerator
int getThreshLength() const
const char * MOD_POL_RA_IDGENERATOR
LisRMTQueuePDUPreSend * lisRMTQueuePDUPreSend
LisRMTQueuePDUSent * lisRMTQueuePDUSent
virtual void preQueueDeparture(cObject *obj)
AddressComparatorBase * addrComparator
const char * SIG_RMT_QueuePDUPostRcvd
const char * SIG_RMT_NoConnId
std::vector< RMTPort * > fwTableLookup(const PDU *pdu)
void createEfcpiGate(unsigned int efcpiId)
void substractWaiting(RMTQueueType direction)
LisRMTQueuePDUPostRcvd * lisRMTQueuePDUPostRcvd
RMTQMonitorBase * qMonPolicy
virtual bool run(RMTQueue *queue)
virtual void processQueues(RMTPort *port, RMTQueueType direction)
void processMessage(cMessage *msg)
RMTQueue * getQueueById(RMTQueueType type, const char *queueId) const
const APN & getApn() const
Getter of unique APN which is initialized during object construction.
virtual int getFlags() const
virtual void writeToPort(cObject *obj)
const char * GATE_EFCPIO_
virtual void onQueueLengthDrop(RMTQueue *queue)
const std::string & getName() const
Gets APN string name representation.
const char * MOD_POL_RA_ADDRCOMPARATOR
void relayPDUToPort(PDU *msg)
const char * MOD_POL_RMT_PDUFWD
virtual void postPDURelease(RMTQueue *queue)
const char * MOD_POL_RMT_MAXQ
virtual void onMessageDrop(RMTQueue *queue, const cPacket *pdu)
cGate * getRMTAccessGate() const
std::deque< cMessage * > invalidPDUs
virtual void readFromPort(cObject *obj)