RINASim  October 2016
Documentation of framework for OMNeT++
RA.cc
Go to the documentation of this file.
1 
2 // The MIT License (MIT)
3 //
4 // Copyright (c) 2014-2016 Brno University of Technology, PRISTINE project
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 
31 #include "RA.h"
32 
34 
35 // QoS loader parameters
36 const char* PAR_QOSDATA = "qoscubesData";
37 const char* ELEM_QOSCUBE = "QoSCube";
38 const char* PAR_QOSREQ = "qosReqData";
39 const char* ELEM_QOSREQ = "QoSReq";
40 const char* ATTR_ID = "id";
41 
42 
43 void RA::initialize(int stage)
44 {
45  if (stage == 1)
46  {
47  // determine and set RMT mode of operation
48  setRMTMode();
49  // preallocate flows
50  initFlowAlloc();
51  return;
52  }
53 
54  // retrieve pointers to other modules
55  thisIPC = this->getModuleByPath("^.^");
56  rmtModule = (getRINAModule<cModule*>(this, 2, {MOD_RELAYANDMUX}));
57 
58  // Get access to the forwarding and routing functionalities...
59  fwdtg = getRINAModule<IntPDUFG *>(this, 1, {MOD_PDUFWDGEN});
60  difAllocator = getRINAModule<DA*>(this, 3, {MOD_DIFALLOC, MOD_DA});;
61  flowTable = getRINAModule<NM1FlowTable*>(this, 1, {MOD_RANM1FLOWTABLE});
62  rmt = getRINAModule<RMT*>(this, 2, {MOD_RELAYANDMUX, MOD_RMT});
63  rmtAllocator = getRINAModule<RMTModuleAllocator*>(this, 2, {MOD_RELAYANDMUX, MOD_RMTALLOC});
64  fa = getRINAModule<FABase*>(this, 2, {MOD_FLOWALLOC, MOD_FA});
65 
66  // retrieve pointers to policies
67  qAllocPolicy = getRINAModule<QueueAllocBase*>(this, 1, {MOD_POL_QUEUEALLOC});
68 
69  // initialize attributes
70  std::ostringstream os;
71  os << thisIPC->par(PAR_IPCADDR).stringValue() << "_"
72  << thisIPC->par(PAR_DIFNAME).stringValue();
73  processName = os.str();
74 
76  initQoSCubes();
77 
78  WATCH_LIST(this->QoSCubes);
79 }
80 
81 void RA::handleMessage(cMessage *msg)
82 {
83  if (msg->isSelfMessage())
84  {
85  if (!opp_strcmp(msg->getName(), "RA-CreateConnections"))
86  {
87  auto flows = preAllocs[simTime()];
88 
89  while (!flows->empty())
90  {
91  // Data connections are carried by (N-1)-flows alone, whereas
92  // management connections use (N-1)-management flows ALONG WITH
93  // (N)-management flows. Thus, data flows are allocated via
94  // (N-1)-FA and management flows via (N)-FA.
95  // In addition to that, we can assume that (N-1)-data flows
96  // are going to require (N)-management, so it's allocated as well.
97 
98  Flow* flow = flows->front();
99  if (flow->isManagementFlow())
100  { // mgmt flow
101  //std::cout << "prepare MGMT flow" << endl;
102  //createNM1Flow(flow);
103  createNFlow(flow);
104  }
105  else
106  { // data flow
107  createNM1Flow(flow);
108  }
109 
110  flows->pop_front();
111  }
112 
113  delete flows;
114  delete msg;
115  }
116  else if (!opp_strcmp(msg->getName(), "RA-TerminateConnections"))
117  {
118  auto flows = preDeallocs[simTime()];
119 
120  while (!flows->empty())
121  {
122  Flow* flow = flows->front();
123  if (flow->isManagementFlow())
124  { // mgmt flow
125  // huh?
126  }
127  else
128  { // data flow
129  removeNM1Flow(flow);
130  }
131 
132  flows->pop_front();
133  }
134 
135  delete flows;
136  delete msg;
137  }
138  }
139 }
140 
142 {
143  sigRACreFloPosi = registerSignal(SIG_RA_CreateFlowPositive);
144  sigRACreFloNega = registerSignal(SIG_RA_CreateFlowNegative);
145  sigRASDReqFromRMT = registerSignal(SIG_RA_InvokeSlowdown);
146  sigRASDReqFromRIB = registerSignal(SIG_RA_ExecuteSlowdown);
147  sigRAMgmtAllocd = registerSignal(SIG_RA_MgmtFlowAllocated);
149 
152 
155 
156  lisRACreFlow = new LisRACreFlow(this);
158 
159  lisRACreResPosi = new LisRACreResPosi(this);
160  thisIPC->getParentModule()->
162 
163  lisRADelFlow = new LisRADelFlow(this);
164  thisIPC->getParentModule()->
166 
168  thisIPC->getParentModule()->
169  subscribe(SIG_EFCP_StopSending, this->lisEFCPStopSending);
170 
172  thisIPC->getParentModule()->
173  subscribe(SIG_EFCP_StartSending, this->lisEFCPStartSending);
174 
176  thisIPC->subscribe(SIG_RMT_SlowdownRequest, this->lisRMTSDReq);
177 
178  lisRIBCongNotif = new LisRIBCongNotif(this);
180 }
181 
183 {
184  cXMLElement* dirXml = par("preallocation").xmlValue();
185  const cXMLElementList& timeMap = dirXml->getChildrenByTagName("SimTime");
186 
187  for (auto const m : timeMap)
188  {
189  simtime_t time = static_cast<simtime_t>(atoi(m->getAttribute("t")));
190  const cXMLElementList& connMap = m->getChildrenByTagName("Connection");
191 
192  for (auto const n : connMap)
193  {
194  // retrieve the parameters
195  const char* src = n->getAttribute("src");
196  const char* dst = n->getAttribute("dst");
197  const char* qosReqID = n->getAttribute("qosReq");
198 
199  if (!src || !dst || !qosReqID)
200  {
201  throw cRuntimeError("Invalid preallocation parameter(s) for SimTime %s",
202  time.str().c_str());
203  }
204 
205  // keep going only if this is a configuration meant for this IPC process
206  if (opp_strcmp(src, processName.c_str()))
207  {
208  continue;
209  }
210 
211  APNamingInfo srcAPN = APNamingInfo(APN(src));
212  APNamingInfo dstAPN = APNamingInfo(APN(dst));
213 
214  QoSReq* qosReq = nullptr;
215  if (!opp_strcmp(qosReqID, "mgmt"))
216  {
217  qosReq = &mgmtReqs;
218  }
219  else
220  {
221  qosReq = initQoSReqById(qosReqID);
222  }
223 
224  if (!qosReq)
225  {
226  throw cRuntimeError("Invalid QoSReqId %s for SimTime=%s, src=\"%s\", dst=\"%s\"",
227  qosReqID,
228  time.str().c_str(),
229  src,
230  dst);
231  }
232 
233  Flow *flow = new Flow(srcAPN, dstAPN);
234  flow->setQosRequirements(*qosReq);
235 
236  if (preAllocs[time] == nullptr)
237  {
238  preAllocs[time] = new std::list<Flow*>;
239  cMessage* msg = new cMessage("RA-CreateConnections");
240  scheduleAt(time, msg);
241 
242  }
243 
244  preAllocs[time]->push_back(flow);
245 
246  const char* until_s = n->getAttribute("until");
247  if (until_s)
248  {
249  simtime_t until = static_cast<simtime_t>(atoi(until_s));
250 
251  if (until <= time)
252  {
253  throw cRuntimeError(
254  "Invalid time of deallocation for flow {SimTime=%s, src=\"%s\", dst=\"%s\", qosReq=\"%s\"}",
255  time.str().c_str(),
256  src,
257  dst,
258  qosReqID);
259  }
260 
261  if (preDeallocs[until] == nullptr)
262  {
263  preDeallocs[until] = new std::list<Flow*>;
264  cMessage* msg = new cMessage("RA-TerminateConnections");
265  scheduleAt(until, msg);
266  }
267 
268  preDeallocs[until]->push_back(flow);
269  }
270  }
271  }
272 }
273 
278 {
279  // identify the role of this IPC process in processing system
280  std::string bottomGate = thisIPC->gate("southIo$o", 0)->getNextGate()->getName();
281 
282  if (par("onWire").boolValue() || bottomGate == "medium$o" || bottomGate == "mediumIntra$o" || bottomGate == "mediumInter$o")
283  {
284  // we're on wire! this is the bottommost "interface" DIF
285  rmt->setOnWire(true);
286  // connect RMT to the medium
287  bindMediumToRMT();
288  }
289  else if (bottomGate == "northIo$i")
290  { // other IPC processes are below us
291  rmt->setOnWire(false);
292  }
293 
294  if (thisIPC->par("relay").boolValue() == true)
295  { // this is an IPC process that uses PDU forwarding
296  rmt->enableRelay();
297  }
298 }
299 
304 {
305  cXMLElement* qosXml = nullptr;
306  if (par(PAR_QOSDATA).xmlValue() != nullptr
307  && par(PAR_QOSDATA).xmlValue()->hasChildren())
308  qosXml = par(PAR_QOSDATA).xmlValue();
309  else
310  error("qoscubesData parameter not initialized!");
311 
312  // load cubes from XML
313  cXMLElementList cubes = qosXml->getChildrenByTagName(ELEM_QOSCUBE);
314  for (auto const m : cubes)
315  {
316  if (!m->getAttribute(ATTR_ID))
317  {
318  EV << "Error parsing QoSCube. Its ID is missing!" << endl;
319  continue;
320  }
321 
322  cXMLElementList attrs = m->getChildren();
323  QoSCube cube = QoSCube(attrs);
324  cube.setQosId(m->getAttribute(ATTR_ID));
325 
326  //Integrity check!!!
327  if (cube.isDefined())
328  {
329  QoSCubes.push_back(cube);
330  }
331  else
332  {
333  EV << "QoSCube with ID " << cube.getQosId()
334  << " contains DO-NOT-CARE parameter. It is not fully defined,"
335  << " thus it is not loaded into RA's QoS-cube set!"
336  << endl;
337  }
338  }
339 
340  if (!QoSCubes.size())
341  {
342  std::ostringstream os;
343  os << this->getFullPath()
344  << " does not have any QoSCube in its set. "
345  << "It cannot work without at least one valid QoS cube!"
346  << endl;
347  error(os.str().c_str());
348  }
349 
350  // add a static QoS cube for management
351  QoSCubes.push_back(QoSCube::MANAGEMENT);
352 
353  // add a QoS requirements object
355 }
356 
361 QoSReq* RA::initQoSReqById(const char* id)
362 {
363  cXMLElement* qosXml = nullptr;
364  if (par(PAR_QOSREQ).xmlValue() != nullptr
365  && par(PAR_QOSREQ).xmlValue()->hasChildren())
366  {
367  qosXml = par(PAR_QOSREQ).xmlValue();
368  }
369  else
370  {
371  return nullptr;
372  }
373 
374  cXMLElement* reqs = qosXml->getFirstChildWithAttribute(ELEM_QOSREQ, ATTR_ID, id);
375  cXMLElementList attrs;
376 
377  if (reqs)
378  {
379  attrs = reqs->getChildren();
380  }
381 
382  return (reqs ? new QoSReq(attrs) : nullptr);
383 }
384 
390 {
391  // retrieve the south gate
392  cGate* thisIPCIn = thisIPC->gateHalf("southIo$i", cGate::INPUT, 0);
393  cGate* thisIPCOut = thisIPC->gateHalf("southIo$o", cGate::OUTPUT, 0);
394 
396  // create an INOUT gate on the bottom of rmtModule
397  std::ostringstream rmtGate;
398  rmtGate << GATE_SOUTHIO_ << "PHY";
399  rmtModule->addGate(rmtGate.str().c_str(), cGate::INOUT, false);
400  cGate* rmtModuleIn = rmtModule->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
401  cGate* rmtModuleOut = rmtModule->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
402 
403  rmtModuleOut->connectTo(thisIPCOut);
404  thisIPCIn->connectTo(rmtModuleIn);
405 
406  // create a mock "(N-1)-port" for interface
407  RMTPort* port = rmtAllocator->addPort(nullptr);
408  // connect the port to the bottom
409  interconnectModules(rmtModule, port->getParentModule(), rmtGate.str(), std::string(GATE_SOUTHIO));
410  // finalize initial port parameters
411  port->postInitialize();
412  port->setOutputReady();
413  port->setInputReady();
414 
415  // apply queue allocation policy handler
417 }
418 
427 RMTPort* RA::bindNM1FlowToRMT(cModule* bottomIPC, FABase* fab, Flow* flow)
428 {
429  // get (N-1)-port-id and expand it so it's unambiguous within this IPC
430  int portID = flow->getSrcPortId();
431  std::string combinedPortID = normalizePortID(bottomIPC->getFullName(), portID);
432 
433  // binding begins at the bottom and progresses upwards:
434  // 1) interconnect bottom IPC <-> this IPC <-> compound RMT module
435  std::ostringstream bottomIPCGate, thisIPCGate;
436  bottomIPCGate << GATE_NORTHIO_ << portID;
437  thisIPCGate << GATE_SOUTHIO_ << combinedPortID;
438  interconnectModules(bottomIPC, thisIPC, bottomIPCGate.str(), thisIPCGate.str());
439  interconnectModules(thisIPC, rmtModule, thisIPCGate.str(), thisIPCGate.str());
440 
441  // 2) attach a RMTPort instance (pretty much a representation of an (N-1)-port)
442  RMTPort* port = rmtAllocator->addPort(flow);
443  interconnectModules(rmtModule, port->getParentModule(), thisIPCGate.str(), std::string(GATE_SOUTHIO));
444  // finalize initial port parameters
445  port->postInitialize();
446 
447  // 3) allocate queues
448  // apply queue allocation policy handler (if applicable)
450 
451  // 4) update the flow table
452  flowTable->insert(flow, fab, port, thisIPCGate.str().c_str());
453 
454  return port;
455 }
456 
465 std::string RA::normalizePortID(std::string ipcName, int flowPortID)
466 {
467  std::ostringstream newPortID;
468  newPortID << ipcName << '_' << flowPortID;
469  return newPortID.str();
470 }
471 
478 {
480 }
481 
488 {
489  Enter_Method("createNM1Flow()");
490 
491  const APN& dstApn = flow->getDstApni().getApn();
492  const std::string& qosID = flow->getConId().getQoSId();
493 
494  //
495  // A flow already exists from this ipc to the destination one(passing through a neighbor)?
496  //
497  PDUFGNeighbor * e = fwdtg->getNextNeighbor(Address(dstApn.getName()), flow->getConId().getQoSId());
498 
499  if(e)
500  {
502  e->getDestAddr().getApn().getName(), qosID);
503 
504  if(fi)
505  {
506  return;
507  }
508  }
509  //
510  // End flow exists check.
511  //
512 
513  //Ask DA which IPC to use to reach dst App
514  const Address* ad = difAllocator->resolveApnToBestAddress(dstApn);
515  if (ad == nullptr)
516  {
517  EV << "DifAllocator returned nullptr for resolving " << dstApn << endl;
518  return;
519  }
520  Address addr = *ad;
521 
522  //TODO: Vesely - New IPC must be enrolled or DIF created
523  if (!difAllocator->isDifLocal(addr.getDifName()))
524  {
525  EV << "Local CS does not have any IPC in DIF " << addr.getDifName() << endl;
526  return;
527  }
528 
529  // retrieve local IPC process enrolled in given DIF
530  cModule* targetIPC = difAllocator->getDifMember(addr.getDifName());
531  // retrieve the IPC process's Flow Allocator
532  FABase* fab = difAllocator->findFaInsideIpc(targetIPC);
533  // command the (N-1)-FA to allocate the flow
534  bool status = fab->receiveAllocateRequest(flow);
535 
536  if (status)
537  { // the Allocate procedure has successfully begun (and M_CREATE request has been sent)
538  // bind the new (N-1)-flow to an RMT port
539  RMTPort* port = bindNM1FlowToRMT(targetIPC, fab, flow);
540 
541  fab->invokeNewFlowRequestPolicy(flow);
542 
543  // notify the PDUFG of the new flow
545  Address(flow->getDstApni().getApn().getName()),
546  flow->getQosCube(),
547  port);
548  }
549  else
550  { // Allocate procedure couldn't be invoked
551  EV << "Flow not allocated!" << endl;
552  }
553 
554  // flow creation will be finalized by postNM1FlowAllocation(flow)
555  // (on arrival of M_CREATE_R)
556 }
557 
565 {
566  Enter_Method("createNM1FlowWoAlloc()");
567 
568  const APN& dstAPN = flow->getDstApni().getApn();
569  const std::string& qosID = flow->getConId().getQoSId();
570 
571  //
572  // A flow already exists from this ipc to the destination one(passing through a neighbor)?
573  //
574  Address addrs = Address(dstAPN.getName());
575  PDUFGNeighbor* e = fwdtg->getNextNeighbor(addrs, qosID);
576 
577  if(e)
578  {
580  e->getDestAddr().getApn().getName(), qosID);
581 
582  if(fi)
583  {
584  return;
585  }
586  }
587  //
588  // End flow exists check.
589  //
590 
591 
592  // Ask DA which IPC to use to reach dst App
593  const Address* ad = difAllocator->resolveApnToBestAddress(dstAPN);
594  if (ad == nullptr)
595  {
596  EV << "DifAllocator returned nullptr for resolving " << dstAPN << endl;
598  return;
599  }
600  Address addr = *ad;
601 
602  //TODO: Vesely - New IPC must be enrolled or DIF created
603  if (!difAllocator->isDifLocal(addr.getDifName()))
604  {
605  EV << "Local CS does not have any IPC in DIF " << addr.getDifName()
606  << endl;
608  return;
609  }
610 
611  // retrieve local IPC process enrolled in given DIF
612  cModule* targetIpc = difAllocator->getDifMember(addr.getDifName());
613  // retrieve the IPC process's Flow Allocator
614  FABase* fab = difAllocator->findFaInsideIpc(targetIpc);
615  // attach the new flow to RMT
616  RMTPort* port = bindNM1FlowToRMT(targetIpc, fab, flow);
617  // notify the PDUFG of the new flow
619  Address(flow->getDstApni().getApn().getName()),
620  flow->getQosCube(),
621  port);
622  // answer the M_CREATE request
624 
625  // mark this flow as connected
626  flowTable->findFlowByDstApni(dstAPN.getName(), qosID)->
627  setConnectionStatus(NM1FlowTableItem::CON_ESTABLISHED);
628  port->setOutputReady();
629  port->setInputReady();
630 }
631 
638 {
639  Enter_Method("postNFlowAllocation()");
640 
641  // invoke QueueAlloc policy on relevant (N-1)-ports
642  if (rmt->isOnWire())
643  {
645  }
646  else
647  {
648  auto neighApn = flow->getDstNeighbor().getApn().getName();
649  auto qosId = flow->getConId().getQoSId();
650  auto item = flowTable->findFlowByDstApni(neighApn, qosId);
651  if (item != nullptr)
652  {
653  qAllocPolicy->onNFlowAlloc(item->getRMTPort(), flow);
654  }
655  }
656 }
657 
664 {
665  Enter_Method("postNM1FlowAllocation()");
666  // mark this flow as connected and update info
668  RMTPort* port = ftItem->getRMTPort();
669  port->setOutputReady();
670  port->setInputReady();
671  port->setFlow(ftItem->getFlow());
672 
673  // if this is a management flow, notify the Enrollment module
674  if (ftItem->getFlow()->isManagementFlow())
675  {
676  APNIPair* apnip = new APNIPair(ftItem->getFlow()->getSrcApni(), ftItem->getFlow()->getDstApni());
678  }
679 }
680 
682 {
683  Enter_Method("removeNM1FlowBindings()");
684 
685  // disconnect and delete gates
686  RMTPort* port = ftItem->getRMTPort();
687  const char* gateName = ftItem->getGateName().c_str();
688  cGate* thisIpcIn = thisIPC->gateHalf(gateName, cGate::INPUT);
689  cGate* thisIpcOut = thisIPC->gateHalf(gateName, cGate::OUTPUT);
690  cGate* rmtModuleIn = rmtModule->gateHalf(gateName, cGate::INPUT);
691  cGate* rmtModuleOut = rmtModule->gateHalf(gateName, cGate::OUTPUT);
692  cGate* portOut = port->getSouthOutputGate()->getNextGate();
693 
694  portOut->disconnect();
695  rmtModuleOut->disconnect();
696  thisIpcOut->disconnect();
697  thisIpcIn->disconnect();
698  rmtModuleIn->disconnect();
699 
700  rmtModule->deleteGate(gateName);
701  thisIPC->deleteGate(gateName);
702 
703  // remove table entries
704  fwdtg->removeFlowInfo(port);
705  rmtAllocator->removePort(port);
706  flowTable->remove(ftItem->getFlow());
707 }
708 
715 {
716  Enter_Method("removeNM1Flow()");
717  auto flowItem = flowTable->lookup(flow);
718  ASSERT(flowItem != nullptr);
719  flowItem->setConnectionStatus(NM1FlowTableItem::CON_RELEASING);
720  flowItem->getFABase()->receiveDeallocateRequest(flow);
721 
722  // TODO: discuss whether the bindings should be removed afterwards
723  removeNM1FlowBindings(flowItem);
724 }
725 
733 {
734  Enter_Method("bindNFlowToNM1Flow()");
735 
736  EV << "Received a request to bind an (N)-flow (dst "
737  << flow->getDstApni().getApn().getName() << ", QoS-id "
738  << flow->getConId().getQoSId() << ") to an (N-1)-flow." << endl;
739 
740  if (rmt->isOnWire())
741  {
742  EV << "This is a bottommost IPCP, the (N)-flow is bound to a medium." << endl;
743  return true;
744  }
745 
746  std::string dstAddr = flow->getDstAddr().getApn().getName();
747  std::string neighAddr = flow->getDstNeighbor().getApn().getName();
748  std::string qosID = flow->getConId().getQoSId();
749 
750  EV << "Binding to an (N-1)-flow leading to " << dstAddr;
751  if (dstAddr != neighAddr)
752  {
753  EV << " through neighbor " << neighAddr;
754  }
755  EV << "." << endl;
756 
758  APNamingInfo neighAPN = APNamingInfo(APN(neighAddr));
759  APNamingInfo dstAPN = APNamingInfo(APN(dstAddr));
760 
761  Address addrs = Address(dstAddr);
762  PDUFGNeighbor * te = fwdtg->getNextNeighbor(addrs, qosID);
763 
764  if (te)
765  {
766  neighAddr = te->getDestAddr().getApn().getName();
767  }
768 
769  auto nm1FlowItem = flowTable->findFlowByDstApni(neighAddr, qosID);
770 
771  if (nm1FlowItem != nullptr)
772  { // a flow exists
773  if (nm1FlowItem->getConnectionStatus()
775  {
776  EV << "Such (N-1)-flow is already present, using it." << endl;
777  return true;
778  }
779  else if (nm1FlowItem->getConnectionStatus() == NM1FlowTableItem::CON_FLOWPENDING)
780  {
781  EV << "Such (N-1)-flow is already present but its allocation is not yet finished"
782  << endl;
783  }
784  }
785  else
786  { // no suitable flow exists
787  EV << "No such (N-1)-flow present, allocating a new one." << endl;
788  Flow *nm1Flow = new Flow(srcAPN, neighAPN);
789  nm1Flow->setQosRequirements(flow->getQosRequirements());
790  createNM1Flow(nm1Flow);
791  }
792 
793  return false;
794 }
795 
797 {
798  Enter_Method("blockNM1PortOutput()");
799  ftItem->getRMTPort()->blockOutput();
800 }
801 
803 {
804  Enter_Method("unblockNM1PortOutput()");
805  ftItem->getRMTPort()->unblockOutput();
806 }
807 
809 {
810  emit(sigRACreFloPosi, flow);
811 }
812 
814 {
815  emit(sigRACreFloNega, flow);
816 }
817 
819 {
820  Enter_Method("signalizeSlowdownRequestToRIBd()");
821  emit(sigRASDReqFromRMT, pdu);
822 }
823 
825 {
826  Enter_Method("signalizeSlowdownRequestToEFCP()");
827  CongestionDescriptor* congInfo = check_and_cast<CongestionDescriptor*>(obj);
828  emit(sigRASDReqFromRIB, congInfo);
829 }
830 
832 {
833  Enter_Method("signalizeMgmtAllocToEnrollment()");
834  emit(sigRAMgmtAllocd, apnip);
835 }
836 
838 {
839  Enter_Method("signalizeMgmtDeallocToEnrollment()");
840  emit(sigRAMgmtDeallocd, flow);
841 }
842 
844 {
845  return flowTable;
846 }
847 
848 bool RA::hasFlow(std::string addr, std::string qosId) {
849  return rmt->isOnWire() ? true : (flowTable->findFlowByDstApni(addr, qosId) != nullptr);
850 }
851 
852 bool RA::sleepFlow(Flow * flow, simtime_t wakeUp) {
853  Enter_Method_Silent();
854  if(flowTable->lookup(flow) != nullptr) {
855  simtime_t now = simTime();
856  if(wakeUp >= now) {
857  Flow *nflow = new Flow(flow->getSrcApni(), flow->getDstApni());
858  nflow->setQosRequirements(flow->getQosReqs());
859 
860  if (preAllocs[wakeUp] == nullptr) {
861  preAllocs[wakeUp] = new std::list<Flow*>;
862  cMessage* msg = new cMessage("RA-CreateConnections");
863  scheduleAt(wakeUp, msg);
864  }
865  preAllocs[wakeUp]->push_back(nflow);
866  }
867 
868  if (preDeallocs[now] == nullptr)
869  {
870  preDeallocs[now] = new std::list<Flow*>;
871  cMessage* msg = new cMessage("RA-TerminateConnections");
872  scheduleAt(now, msg);
873  }
874  preDeallocs[now]->push_back(flow);
875 
876  return true;
877  }
878  return false;
879 }
const DAP & getDifName() const
Getter of common DIF name.
Definition: Address.cc:73
NM1FlowTable * flowTable
Definition: RA.h:93
static const QoSCube MANAGEMENT
Definition: QoSCube.h:201
void initSignalsAndListeners()
Definition: RA.cc:141
void signalizeCreateFlowPositiveToRIBd(Flow *flow)
Definition: RA.cc:808
void signalizeSlowdownRequestToRIBd(cPacket *pdu)
Definition: RA.cc:818
Class representing flow object with attributes from specs.
Definition: Flow.h:45
void setQosRequirements(const QoSReq &qosReqs)
Sets QoS parameters wanted by flow initiator.
Definition: Flow.cc:323
const char * SIG_RIBD_CreateFlow
Definition: RINASignals.cc:49
void insert(const NM1FlowTableItem *entry)
Definition: NM1FlowTable.cc:37
const char * SIG_RMT_SlowdownRequest
Definition: RINASignals.cc:130
const char * PAR_QOSREQ
Definition: RA.cc:38
virtual void postNFlowAllocation(Flow *flow)
Definition: RA.cc:637
const APNamingInfo & getSrcApni() const
Gets read-only source APNamingInfo.
Definition: Flow.cc:134
virtual void handleMessage(cMessage *msg)
Definition: RA.cc:81
virtual void onNFlowAlloc(RMTPort *port, Flow *flow)
const char * ELEM_QOSREQ
Definition: RA.cc:39
const char * MOD_DIFALLOC
Definition: ExternConsts.cc:34
virtual void unblockNM1PortOutput(NM1FlowTableItem *ftItem)
Definition: RA.cc:802
const char * SIG_RA_MgmtFlowAllocated
Definition: RINASignals.cc:120
LisRIBCongNotif * lisRIBCongNotif
Definition: RA.h:130
Address & getDestAddr()
Application Process Name class.
Definition: APN.h:36
void signalizeMgmtDeallocToEnrollment(Flow *flow)
Definition: RA.cc:837
virtual bool isOnWire()
Definition: RMT.h:70
void setOnWire(bool status)
Definition: RMT.h:129
const char * SIG_RIBD_DeleteRequestFlow
Definition: RINASignals.cc:47
virtual bool receiveAllocateRequest(Flow *flow)=0
std::string getQoSId() const
Getter of selected QoS-cube identifier.
Definition: ConnectionId.cc:44
std::string normalizePortID(std::string ipcName, int flowPortID)
Definition: RA.cc:465
std::string processName
Definition: RA.h:99
virtual bool receiveMgmtAllocateRequest(Flow *mgmtflow)=0
void postInitialize()
Definition: RMTPort.cc:65
virtual bool hasFlow(std::string addr, std::string qosId)
Definition: RA.cc:848
QoSReq * initQoSReqById(const char *id)
Definition: RA.cc:361
RMTPort * getRMTPort() const
void remove(Flow *flow)
Definition: NM1FlowTable.cc:88
const char * SIG_RIBD_CongestionNotification
Definition: RINASignals.cc:55
void signalizeSlowdownRequestToEFCP(cObject *obj)
Definition: RA.cc:824
virtual void removeNM1Flow(Flow *flow)
Definition: RA.cc:714
const char * ELEM_QOSCUBE
Definition: RA.cc:37
const char * PAR_IPCADDR
Definition: ExternConsts.cc:93
virtual void postNM1FlowAllocation(NM1FlowTableItem *ftItem)
Definition: RA.cc:663
void setRMTMode()
Definition: RA.cc:277
const char * GATE_NORTHIO_
const char * ATTR_ID
Definition: RA.cc:40
NM1FlowTableItem * findFlowByDstAddr(const std::string &addr, const std::string &qosId)
Definition: NM1FlowTable.cc:74
const char * MOD_PDUFWDGEN
Definition: ExternConsts.cc:54
bool isDifLocal(const DAP &difName)
Definition: DA.cc:134
bool isDefined()
Definition: QoSCube.cc:493
QoSReq & getQosReqs()
Definition: Flow.cc:301
void unblockOutput()
Definition: RMTPort.cc:408
simsignal_t sigRAMgmtAllocd
Definition: RA.h:118
void insertFlowInfo(Address addr, QoSCube qos, RMTPort *port)
Definition: IntPDUFG.cc:68
void setOutputReady()
Definition: RMTPort.cc:266
const char * MOD_RMTALLOC
Definition: ExternConsts.cc:63
QueueAllocBase * qAllocPolicy
Definition: RA.h:94
const APN & getApn() const
Getter of APN.
Definition: APNamingInfo.h:142
virtual bool sleepFlow(Flow *flow, simtime_t wakeUp)
Definition: RA.cc:852
const char * GATE_SOUTHIO_
simsignal_t sigRACreFloNega
Definition: RA.h:115
LisRACreResPosi * lisRACreResPosi
Definition: RA.h:124
FABase * findFaInsideIpc(cModule *ipc)
Definition: DA.cc:183
Monitoring and adjustment of IPC process operations .
LisRMTSlowdownRequest * lisRMTSDReq
Definition: RA.h:129
FABase * fa
Definition: RA.h:92
APNamingInfo holds complete naming info for particular application process.
Definition: APNamingInfo.h:43
const char * SIG_RA_ExecuteSlowdown
Definition: RINASignals.cc:119
virtual void initialize(int stage)
Definition: RA.cc:43
virtual void createNM1FlowWithoutAllocate(Flow *flow)
Definition: RA.cc:564
std::map< simtime_t, std::list< Flow * > * > preAllocs
Definition: RA.h:100
const QoSReq & getQosRequirements() const
Gets QoS parameters wanted by flow initiator.
Definition: Flow.cc:176
RMTPort * addPort(Flow *flow)
void initFlowAlloc()
Definition: RA.cc:182
void interconnectModules(cModule *m1, cModule *m2, std::string n1, std::string n2)
Definition: Utils.cc:79
const Address & getDstNeighbor() const
Gets neighbor destination Address, which is the address of (interim) hop-by-hop destination Used duri...
Definition: Flow.cc:340
bool isManagementFlow() const
Definition: Flow.cc:366
const char * SIG_RA_MgmtFlowDeallocated
Definition: RINASignals.cc:121
const char * MOD_RANM1FLOWTABLE
Definition: ExternConsts.cc:56
const char * MOD_RELAYANDMUX
Definition: ExternConsts.cc:57
void removePort(RMTPort *port)
simsignal_t sigRASDReqFromRIB
Definition: RA.h:117
void blockOutput()
Definition: RMTPort.cc:397
const char * SIG_FAI_CreateFlowResponsePositive
Definition: RINASignals.cc:112
NM1FlowTableItem * findFlowByDstApni(const std::string &addr, const std::string &qosId)
Definition: NM1FlowTable.cc:60
const char * MOD_FLOWALLOC
Definition: ExternConsts.cc:47
virtual void createNFlow(Flow *flow)
Definition: RA.cc:477
simsignal_t sigRASDReqFromRMT
Definition: RA.h:116
RMTModuleAllocator * rmtAllocator
Definition: RA.h:91
virtual PDUFGNeighbor * getNextNeighbor(const Address &destination, const std::string &qos)
Definition: IntPDUFG.cc:42
Definition: RA.h:61
void signalizeCreateFlowNegativeToRIBd(Flow *flow)
Definition: RA.cc:813
const char * GATE_SOUTHIO
virtual void blockNM1PortOutput(NM1FlowTableItem *ftItem)
Definition: RA.cc:796
void signalizeMgmtAllocToEnrollment(APNIPair *apnip)
Definition: RA.cc:831
int getSrcPortId() const
Gets source PortId.
Definition: Flow.cc:142
void setInputReady()
Definition: RMTPort.cc:287
virtual NM1FlowTable * getFlowTable()
Definition: RA.cc:843
const char * SIG_EFCP_StopSending
Definition: RINASignals.cc:142
Flow * getFlow() const
void bindMediumToRMT()
Definition: RA.cc:389
virtual bool bindNFlowToNM1Flow(Flow *flow)
Definition: RA.cc:732
LisEFCPStartSending * lisEFCPStartSending
Definition: RA.h:127
const char * SIG_RA_CreateFlowNegative
Definition: RINASignals.cc:117
RMT * rmt
Definition: RA.h:90
std::map< simtime_t, std::list< Flow * > * > preDeallocs
Definition: RA.h:101
const char * SIG_FAI_AllocateResponsePositive
Definition: RINASignals.cc:108
void initQoSCubes()
Definition: RA.cc:303
const char * PAR_DIFNAME
Definition: ExternConsts.cc:94
simsignal_t sigRACreFloPosi
Definition: RA.h:114
void removeFlowInfo(RMTPort *port)
Definition: IntPDUFG.cc:78
Class representing QoSCube with all its properties that is primarily used by FA, RMT and RA Specifica...
Definition: QoSCube.h:57
const ConnectionId & getConId() const
Gets read-only Flow's ConnectionId.
Definition: Flow.cc:86
QoSReq mgmtReqs
Definition: RA.h:103
void setQosId(std::string qoSId)
Sets QoSCube identifier.
Definition: QoSCube.cc:392
LisRADelFlow * lisRADelFlow
Definition: RA.h:125
QoSCubeSet QoSCubes
Definition: RABase.h:65
LisRAAllocResPos * lisRAAllocResPos
Definition: RA.h:122
const char * SIG_RIBD_CreateFlowResponsePositive
Definition: RINASignals.cc:51
std::string getQosId() const
Gets QoSCube identifier.
Definition: QoSCube.cc:364
void enableRelay()
Definition: RMT.h:130
const char * MOD_DA
Definition: ExternConsts.cc:33
std::string getGateName() const
virtual void removeNM1FlowBindings(NM1FlowTableItem *ftItem)
Definition: RA.cc:681
const APN & getApn() const
Getter of unique APN which is initialized during object construction.
Definition: Address.cc:119
Class representing QoSReq with all its properties that is primarily used by FA, RMT and RA Specificat...
Definition: QoSReq.h:44
virtual void onNM1PortInit(RMTPort *port)
const char * SIG_RA_InvokeSlowdown
Definition: RINASignals.cc:118
Define_Module(RA)
const char * SIG_EFCP_StartSending
Definition: RINASignals.cc:143
cModule * thisIPC
Definition: RA.h:88
DA * difAllocator
Definition: RA.h:87
Definition: FABase.h:33
LisEFCPStopSending * lisEFCPStopSending
Definition: RA.h:126
const Address & getDstAddr() const
Gets source Address, which is the address of communication end-point.
Definition: Flow.cc:150
NM1FlowTableItem * lookup(Flow *flow)
Definition: NM1FlowTable.cc:48
LisRACreAllocResPos * lisRACreAllocResPos
Definition: RA.h:123
virtual bool invokeNewFlowRequestPolicy(Flow *flow)=0
const char * MOD_RMT
Definition: ExternConsts.cc:62
cGate * getSouthOutputGate() const
Definition: RMTPort.cc:214
LisRACreFlow * lisRACreFlow
Definition: RA.h:121
RMTPort * bindNM1FlowToRMT(cModule *ipc, FABase *fab, Flow *flow)
Definition: RA.cc:427
void setFlow(Flow *flow)
Definition: RMTPort.cc:374
const std::string & getName() const
Gets APN string name representation.
Definition: APN.cc:40
const QoSCube & getQosCube() const
Definition: Flow.cc:171
cModule * getDifMember(const DAP &difName)
Definition: DA.cc:158
const char * MOD_POL_QUEUEALLOC
Definition: ExternConsts.cc:76
virtual void createNM1Flow(Flow *flow)
Definition: RA.cc:487
const char * SIG_RA_CreateFlowPositive
Definition: RINASignals.cc:116
const char * PAR_QOSDATA
Definition: RA.cc:36
const char * MOD_FA
Definition: ExternConsts.cc:46
Address class holds IPC Process identification.
Definition: Address.h:42
const APNamingInfo & getDstApni() const
Gets read-only destination APNamingInfo.
Definition: Flow.cc:102
const Address * resolveApnToBestAddress(const APN &apn)
Definition: DA.cc:31
simsignal_t sigRAMgmtDeallocd
Definition: RA.h:119
IntPDUFG * fwdtg
Definition: RA.h:97
cModule * rmtModule
Definition: RA.h:89
static const QoSReq MANAGEMENT
Definition: QoSReq.h:152
void setConnectionStatus(ConnectionStatus status)