RINASim  October 2016
Documentation of framework for OMNeT++
RMTPort.cc
Go to the documentation of this file.
1 // The MIT License (MIT)
2 //
3 // Copyright (c) 2014-2016 Brno University of Technology, PRISTINE project
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 
23 #include "RMTPort.h"
24 
25 const char* SIG_STAT_RMTPORT_UP = "RMTPort_PassUp";
26 const char* SIG_STAT_RMTPORT_DOWN = "RMTPort_PassDown";
27 
29 
31 {
32  outputReady = false; // port should get activated by RA
33  inputReady = false; // port should get activated by RA
34  blockedOutput = false;
35  blockedInput = false;
36 
37  inputReadRate = 0;
38  postReadDelay = 0.0;
39 
40  waitingOnInput = 0;
41  waitingOnOutput = 0;
42 
43  southInputGate = gateHalf(GATE_SOUTHIO, cGate::INPUT);
44  southOutputGate = gateHalf(GATE_SOUTHIO, cGate::OUTPUT);
45 
46  queueIdGen = getRINAModule<QueueIDGenBase*>(this, 3, {MOD_RESALLOC, MOD_POL_RA_IDGENERATOR});
47 
48  sigStatRMTPortUp = registerSignal(SIG_STAT_RMTPORT_UP);
49  sigStatRMTPortDown = registerSignal(SIG_STAT_RMTPORT_DOWN);
52 
53  WATCH(outputReady);
54  WATCH(inputReady);
55 
56  WATCH(waitingOnOutput);
57  WATCH(waitingOnInput);
58  WATCH(inputReadRate);
59 
60  WATCH(blockedOutput);
61  WATCH(blockedInput);
62  WATCH_PTR(flow);
63 }
64 
66 {
67  protectIn = gateHalf("protect$i", cGate::INPUT);
68  protectOut = gateHalf("protect$o", cGate::OUTPUT);
69  unprotectIn = gateHalf("unprotect$i", cGate::INPUT);
70  unprotectOut = gateHalf("unprotect$o", cGate::OUTPUT);
71 
72  // this will be nullptr if this IPC doesn't use a channel
73  outputChannel = southOutputGate->findTransmissionChannel();
74 }
75 
76 void RMTPort::handleMessage(cMessage* msg)
77 {
78  if (msg->isSelfMessage())
79  {
80  if (!opp_strcmp(msg->getFullName(), "portTransmitEnd"))
81  { // a PDU transmit procedure has just been finished
82  emit(sigStatRMTPortDown, true);
84  }
85  else if (!opp_strcmp(msg->getFullName(), "readyToServe"))
86  {
88  }
89  else if (!opp_strcmp(msg->getFullName(), "readyForRead"))
90  {
91  setInputReady();
92  }
93 
94  delete msg;
95  }
96  else
97  {
98  if (msg->getArrivalGate() == southInputGate) // incoming message
99  { // pass to SDU protection
100  send(msg, unprotectOut);
101  }
102  else if (northInputGates.count(msg->getArrivalGate())) // outgoing message
103  { // pass to SDU protection
104  send(msg, protectOut);
105  }
106  else if (msg->getArrivalGate() == unprotectIn) // unprotected incoming message
107  {
108  SDUData* sduData = dynamic_cast<SDUData*>(msg);
109  if(sduData == nullptr){
110  delete msg;
111  EV << "this type of message isn't supported!" << endl;
112  return;
113  }
114  PDU* pdu = static_cast<PDU*>(sduData->decapsulate());
115  delete sduData;
116  // get a proper queue for this message
117  std::string queueID =
119 
120  RMTQueue* inQueue = getQueueById(RMTQueue::INPUT, queueID.c_str());
121 
122  if (inQueue != nullptr)
123  {
124  send(pdu, inQueue->getInputGate()->getPreviousGate());
125  emit(sigStatRMTPortUp, true);
126  }
127  else
128  {
129  EV << getFullPath()
130  << ": no input queue of such queue-id (" << queueID
131  << ") available!" << endl;
132  EV << pdu->getConnId().getQoSId() << endl;
133  }
134  }
135  else if (msg->getArrivalGate() == protectIn) // protected outgoing message
136  {
137  PDU* pdu = dynamic_cast<PDU*>(msg);
138  if (pdu == nullptr){
139  delete msg;
140  EV << "this type of message isn't supported!" << endl;
141  return;
142  }
143  setOutputBusy();
144  // start the transmission
145  SDUData* sduData = new SDUData();
146  sduData->encapsulate(pdu);
147  send(sduData, southOutputGate);
148 
149  // determine when should the port be ready to serve again
150  if (outputChannel != nullptr)
151  { // we're using a channel, likely with some sort of data rate/delay
152  simtime_t transmitEnd = outputChannel->getTransmissionFinishTime();
153  if (transmitEnd > simTime())
154  { // transmit requires some simulation time
155  scheduleAt(transmitEnd, new cMessage("portTransmitEnd"));
156  }
157  else
158  {
160  emit(sigStatRMTPortDown, true);
161  }
162  }
163  else
164  { // there isn't any delay or rate control in place, the PDU is already sent
166  emit(sigStatRMTPortDown, true);
167  emit(sigRMTPortReadyToWrite, this);
168  }
169  }
170  }
171 }
172 
173 
175 {
176  return inputQueues;
177 }
178 
180 {
181  return outputQueues;
182 }
183 
185 {
186  inputQueues.push_back(queue);
187 }
188 
189 
191 {
192  northInputGates.insert(queue->getOutputGate()->getNextGate());
193  outputQueues.push_back(queue);
194 }
195 
197 {
198  inputQueues.erase(std::remove(inputQueues.begin(), inputQueues.end(), queue),
199  inputQueues.end());
200 }
201 
203 {
204  northInputGates.erase(queue->getOutputGate()->getNextGate());
205  outputQueues.erase(std::remove(outputQueues.begin(), outputQueues.end(), queue),
206  outputQueues.end());
207 }
208 
210 {
211  return southInputGate;
212 }
213 
215 {
216  return southOutputGate;
217 }
218 
220 {
221  const RMTQueues& queueVect = (type == RMTQueue::INPUT ? inputQueues : outputQueues);
222 
223  return queueVect.front();
224 }
225 
227 {
228  const RMTQueues& queueVect = (type == RMTQueue::INPUT ? inputQueues : outputQueues);
229 
230  int longest = 0;
231  RMTQueue* result = nullptr;
232 
233  for(auto const q : queueVect)
234  {
235  if (q->getLength() > longest)
236  {
237  longest = q->getLength();
238  result = q;
239  }
240  }
241  return result;
242 }
243 
244 RMTQueue* RMTPort::getQueueById(RMTQueueType type, const char* queueId) const
245 {
246  const RMTQueues& queueVect = (type == RMTQueue::INPUT ? inputQueues : outputQueues);
247 
248  std::ostringstream fullId;
249  fullId << (type == RMTQueue::INPUT ? "inQ_" : "outQ_") << queueId;
250 
251  for(auto const q : queueVect)
252  {
253  if (!opp_strcmp(q->getFullName(), fullId.str().c_str()))
254  {
255  return q;
256  }
257  }
258  return nullptr;
259 }
260 
262 {
263  return outputReady;
264 }
265 
267 {
268  if (blockedOutput == false)
269  {
270  outputReady = true;
271  emit(sigRMTPortReadyToWrite, this);
272  redrawGUI();
273  }
274 }
275 
277 {
278  outputReady = false;
279  redrawGUI();
280 }
281 
283 {
284  return inputReady;
285 }
286 
288 {
289  if (blockedInput == false)
290  {
291  inputReady = true;
292  emit(sigRMTPortReadyForRead, this);
293  redrawGUI();
294  }
295 }
296 
298 {
299  inputReady = false;
300  redrawGUI();
301 }
302 
304 {
305  scheduleAt(simTime(), new cMessage("readyToServe"));
306 }
307 
309 {
310  Enter_Method_Silent("scheduleNextRead()");
311  setInputBusy();
312  scheduleAt(simTime() + postReadDelay, new cMessage("readyForRead"));
313 }
314 
315 unsigned long RMTPort::getWaiting(RMTQueueType direction)
316 {
317  return (direction == RMTQueue::INPUT ? waitingOnInput : waitingOnOutput);
318 }
319 
321 {
322  direction == RMTQueue::INPUT ? waitingOnInput++ : waitingOnOutput++;
323 }
324 
326 {
327  direction == RMTQueue::INPUT ? waitingOnInput-- : waitingOnOutput--;
328 }
329 
331 {
332  return inputReadRate;
333 }
334 
335 void RMTPort::setInputRate(long pdusPerSecond)
336 {
337  inputReadRate = pdusPerSecond;
338  postReadDelay = 60.0 / pdusPerSecond;
339 }
340 
341 void RMTPort::redrawGUI(bool redrawParent)
342 {
343  if (getEnvir()->isGUI())
344  {
345  getDisplayString().setTagArg("i2", 0, (isOutputReady() ? "status/green" : "status/noentry"));
346 
347  if (redrawParent)
348  {
349  std::ostringstream ostr;
350  ostr << "dstApp: " << endl << dstAppAddr << endl
351  << "QoS-id: " << endl << dstAppQoS;
352  if (blockedInput)
353  {
354  ostr << endl << "input blocked";
355  }
356  if (blockedOutput)
357  {
358  ostr << endl << "output blocked" << endl;
359  }
360 
361  cDisplayString& dStr = getParentModule()->getDisplayString();
362 
363  dStr.setTagArg("t", 0, ostr.str().c_str());
364  dStr.setTagArg("t", 1, "r");
365  }
366  }
367 }
368 
369 const Flow* RMTPort::getFlow() const
370 {
371  return flow;
372 }
373 
375 {
376  this->flow = flow;
377 
378  // display address of the remote IPC on top of the module
379  if (getEnvir()->isGUI())
380  {
381  if (flow != nullptr)
382  {
383  // shitty temporary (yeah, right) hack to strip the layer name off
384  const std::string& dstAppFull = flow->getDstApni().getApn().getName();
385  dstAppAddr = dstAppFull.substr(0, dstAppFull.find("_"));
386  dstAppQoS = flow->getConId().getQoSId();
387  }
388  else
389  {
390  dstAppAddr = "N/A (PHY)";
391  dstAppQoS = "N/A (medium)";
392  }
393  redrawGUI(true);
394  }
395 }
396 
398 {
399  EV << getFullPath() << ": blocking the port output." << endl;
400  blockedOutput = true;
401  if (outputReady)
402  {
403  setOutputBusy();
404  }
405  redrawGUI(true);
406 }
407 
409 {
410  EV << getFullPath() << ": unblocking the port output." << endl;
411  blockedOutput = false;
412  setOutputReady();
413  redrawGUI(true);
414 }
415 
417 {
418  EV << getFullPath() << ": blocking the port input." << endl;
419  blockedInput = true;
420  redrawGUI(true);
421 }
422 
424 {
425  EV << getFullPath() << ": unblocking the port input." << endl;
426  blockedInput = false;
427  setInputReady();
428  redrawGUI(true);
429 }
Class representing flow object with attributes from specs.
Definition: Flow.h:45
void unregisterOutputQueue(RMTQueue *queue)
Definition: RMTPort.cc:202
const char * MOD_RESALLOC
Definition: ExternConsts.cc:58
void unblockInput()
Definition: RMTPort.cc:423
void redrawGUI(bool redrawParent=false)
Definition: RMTPort.cc:341
const char * SIG_STAT_RMTPORT_DOWN
Definition: RMTPort.cc:26
simsignal_t sigRMTPortReadyForRead
Definition: RMTPort.h:228
cGate * getOutputGate() const
Definition: RMTQueue.cc:261
cGate * southOutputGate
Definition: RMTPort.h:185
simsignal_t sigStatRMTPortDown
Definition: RMTPort.h:231
QueueIDGenBase * queueIdGen
Definition: RMTPort.h:181
std::string getQoSId() const
Getter of selected QoS-cube identifier.
Definition: ConnectionId.cc:44
void scheduleNextWrite()
Definition: RMTPort.cc:303
unsigned long waitingOnInput
Definition: RMTPort.h:173
void postInitialize()
Definition: RMTPort.cc:65
Define_Module(RMTPort)
cGate * protectIn
Definition: RMTPort.h:186
RMTQueue * getFirstQueue(RMTQueueType type) const
Definition: RMTPort.cc:219
void setInputRate(long pdusPerSecond)
Definition: RMTPort.cc:335
void unregisterInputQueue(RMTQueue *queue)
Definition: RMTPort.cc:196
const char * SIG_RMT_PortReadyToServe
Definition: RINASignals.cc:135
void scheduleNextRead()
Definition: RMTPort.cc:308
RMTQueue * getLongestQueue(RMTQueueType type) const
Definition: RMTPort.cc:226
RMTQueues & getInputQueues()
Definition: RMTPort.cc:174
std::set< cGate * > northInputGates
Definition: RMTPort.h:183
virtual ConnectionId & getConnId()
Definition: PDU_m.cc:336
void unblockOutput()
Definition: RMTPort.cc:408
void setOutputReady()
Definition: RMTPort.cc:266
unsigned long waitingOnOutput
Definition: RMTPort.h:174
const APN & getApn() const
Getter of APN.
Definition: APNamingInfo.h:142
Flow * flow
Definition: RMTPort.h:180
Definition: PDU.h:42
const char * SIG_STAT_RMTPORT_UP
Definition: RMTPort.cc:25
const char * SIG_RMT_PortReadyForRead
Definition: RINASignals.cc:136
int getLength() const
Definition: RMTQueue.cc:210
virtual std::string generateInputQueueID(PDU *pdu)
double postReadDelay
Definition: RMTPort.h:176
void blockOutput()
Definition: RMTPort.cc:397
cGate * unprotectOut
Definition: RMTPort.h:189
RMTQueues outputQueues
Definition: RMTPort.h:192
RMTQueues & getOutputQueues()
Definition: RMTPort.cc:179
bool outputReady
Definition: RMTPort.h:170
simsignal_t sigStatRMTPortUp
Definition: RMTPort.h:230
const char * MOD_POL_RA_IDGENERATOR
Definition: ExternConsts.cc:69
const char * GATE_SOUTHIO
cChannel * outputChannel
Definition: RMTPort.h:190
std::string dstAppAddr
Definition: RMTPort.h:177
void setInputReady()
Definition: RMTPort.cc:287
unsigned long getWaiting(RMTQueueType direction)
Definition: RMTPort.cc:315
bool blockedOutput
Definition: RMTPort.h:172
cGate * protectOut
Definition: RMTPort.h:187
cGate * getSouthInputGate() const
Definition: RMTPort.cc:209
bool isOutputReady()
Definition: RMTPort.cc:261
cGate * southInputGate
Definition: RMTPort.h:184
void substractWaiting(RMTQueueType direction)
Definition: RMTPort.cc:325
void addWaiting(RMTQueueType direction)
Definition: RMTPort.cc:320
const ConnectionId & getConId() const
Gets read-only Flow's ConnectionId.
Definition: Flow.cc:86
void setInputBusy()
Definition: RMTPort.cc:297
void registerInputQueue(RMTQueue *queue)
Definition: RMTPort.cc:184
cGate * getInputGate() const
Definition: RMTQueue.cc:266
long inputReadRate
Definition: RMTPort.h:175
simsignal_t sigRMTPortReadyToWrite
Definition: RMTPort.h:229
RMTQueue * getQueueById(RMTQueueType type, const char *queueId) const
Definition: RMTPort.cc:244
bool blockedInput
Definition: RMTPort.h:171
RMTQueues inputQueues
Definition: RMTPort.h:193
void blockInput()
Definition: RMTPort.cc:416
bool isInputReady()
Definition: RMTPort.cc:282
std::string dstAppQoS
Definition: RMTPort.h:178
cGate * getSouthOutputGate() const
Definition: RMTPort.cc:214
long getInputRate()
Definition: RMTPort.cc:330
void setFlow(Flow *flow)
Definition: RMTPort.cc:374
const std::string & getName() const
Gets APN string name representation.
Definition: APN.cc:40
void registerOutputQueue(RMTQueue *queue)
Definition: RMTPort.cc:190
const Flow * getFlow() const
Definition: RMTPort.cc:369
void setOutputBusy()
Definition: RMTPort.cc:276
std::vector< RMTQueue * > RMTQueues
Definition: RMTQueue.h:180
const APNamingInfo & getDstApni() const
Gets read-only destination APNamingInfo.
Definition: Flow.cc:102
bool inputReady
Definition: RMTPort.h:169
virtual void handleMessage(cMessage *msg)
Definition: RMTPort.cc:76
virtual void initialize()
Definition: RMTPort.cc:30
cGate * unprotectIn
Definition: RMTPort.h:188