RINASim  October 2016
Documentation of framework for OMNeT++
PortsLoadGenerator.cc
Go to the documentation of this file.
1 //
2 // This program is free software: you can redistribute it and/or modify
3 // it under the terms of the GNU Lesser General Public License as published by
4 // the Free Software Foundation, either version 3 of the License, or
5 // (at your option) any later version.
6 //
7 // This program is distributed in the hope that it will be useful,
8 // but WITHOUT ANY WARRANTY; without even the implied warranty of
9 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 // GNU Lesser General Public License for more details.
11 //
12 // You should have received a copy of the GNU Lesser General Public License
13 // along with this program. If not, see http://www.gnu.org/licenses/.
14 //
15 // NOTE: Procedures are sorted by name.
16 
17 #include <PortsLoadGenerator.h>
18 #include "APN.h"
19 
20 #define PORTSLOAD_GENERATOR_TIMEOUT "PDUFG_RateGenerator"
21 #define PORTSLOAD_GENERATOR_UPDATE 0
22 #define PORTSLOAD_GENERATOR_LOCAL 1
23 
25 
27 {
28  // React to self message only.
29  if(msg->isSelfMessage())
30  {
31 
32 #ifdef PORTSLOADGENERATOR_ENHANCED_DEBUG
33  std::ostringstream str;
34 #endif
35 
36  for(NTableIt t = neighbours.begin(); t != neighbours.end(); ++t)
37  {
38  for(NentriesIt e = t->second.begin(); e != t->second.end(); ++e)
39  {
40  for(
41  PortsSetIt p = e->second.begin();
42  p != e->second.end();
43  ++p)
44  {
45  // Acquire the rate of the port.
46  unsigned short rate = (unsigned short)(SCALE_BYTES(
47  rmtp->getByteRate(*p)));
48  const Address addr =
49  Address((*p)->getFlow()->getDstApni().getApn().
50  getName());
51 
52 #ifdef PORTSLOADGENERATOR_ENHANCED_DEBUG
53  str << t->first << ", " <<
54  SCALE_BYTES(rmtp->getByteRate(*p)) <<"\n";
55 #endif
56 
57  // Do not update if the rate does not change.
58  if(rateCacheEntryExists(t->first, e->first))
59  {
60  if(rateCache[t->first][e->first] != rate)
61  {
62  rateCache[t->first][e->first] = rate;
63 
64  // This do not cause an update to be dispatched.
65  rt->insertFlow(addr, t->first, e->first, rate, false);
66 
67  EV << "Rate to " << t->first << ", " << e->first
68  << " updated to " << rate << "." << endl;
69  }
70  else
71  {
72  EV << "Rate to " << t->first << ", " << e->first
73  << " is the same(at " << rate << ")." << endl;
74  }
75  }
76  else
77  {
78  rateCache[t->first][e->first] = rate;
79 
80  // This do not cause an update to be dispatched.
81  rt->insertFlow(addr, t->first, e->first, rate, false);
82 
83  EV << "Rate to " << t->first << ", " << e->first
84  << " updated to " << rate << "." << endl;
85  }
86  }
87  }
88  }
89 
90  // Update the fwd table using the local infos.
92 
93 #ifdef PORTSLOADGENERATOR_ENHANCED_DEBUG
94  cModule * ipcm = check_and_cast<cModule *>(getModuleByPath("^.^.^"));
95  cDisplayString & cs = ipcm->getDisplayString();
96  cs.setTagArg("t", 1, "l");
97  cs.setTagArg("t", 0, str.str().c_str());
98 #endif
99 
100  if(msg->getKind() == PORTSLOAD_GENERATOR_UPDATE)
101  {
102  rt->scheduleUpdate();
103 
104  // Schedule the next routing update.
105  scheduleAt(
106  simTime() + upInt,
108  }
109  else if(msg->getKind() == PORTSLOAD_GENERATOR_LOCAL)
110  {
111  // Schedule the next rate update.
112  scheduleAt(
113  simTime() + rtInt,
115  }
116  }
117 }
118 
120  const Address &addr,
121  const QoSCube& qos,
122  RMTPort * port)
123 {
124  std::string dst = addr.getIpcAddress().getName();
125 
126  // Acquire the rate of the port.
127  unsigned short rate = (unsigned short)SCALE_BYTES(
128  rmtp->getByteRate(port));
129 
130  neighbours[dst][qos.getQosId()].insert(port);
131 
132  if(neighbours[dst][qos.getQosId()].size() == 1)
133  {
134  // Add to cache.
135  rateCache[dst][qos.getQosId()] = rate;
136  rt->insertFlow(addr, dst, qos.getQosId(), rate, true);
137 
138  routingUpdated();
139  }
140 }
141 
143 {
144  rtInt = par("rateInterval");
145  upInt = par("updateInterval");
146 
147  // Obtain a pointer to the forwarding policy.
148  fwd = check_and_cast<SimpleTable::SimpleTable *>(
149  getModuleByPath("^.^.relayAndMux.pduForwardingPolicy"));
150 
151  // Obtain a pointer to the routing policy.
152  rt = check_and_cast<IntPortsLoadRouting *>(
153  getModuleByPath("^.^.routingPolicy"));
154 
155  // Obtain a pointer to the queue monitor policy.
156  rmtp = check_and_cast<PortsLoadMonitor *>(
157  getModuleByPath("^.^.relayAndMux.queueMonitorPolicy"));
158 
159  // Obtain a pointer to the DIF allocator module.
160  difA = check_and_cast<DA *>(getModuleByPath("^.^.^.difAllocator.da"));
161 
162  // 0 means do not send periodic routing updates.
163  if(upInt > 0)
164  {
165  // Start the route update timeout.
166  scheduleAt(
167  simTime() + upInt,
169  }
170 
171  // 0 means do not do mix routing with local and remote info.
172  // Use only routing updates.
173  if(rtInt > 0)
174  {
175  // Start the rate updating timeout.
176  scheduleAt(
177  simTime() + rtInt,
179  }
180 }
181 
183  std::string dest,
184  std::string qos)
185 {
186  RateIter ri = rateCache.find(dest);
187 
188  // First level entry found.
189  if(ri != rateCache.end())
190  {
191  QTRIter qi = ri->second.find(qos);
192 
193  // Second level entry found.
194  if(qi != ri->second.end())
195  {
196  return true;
197  }
198  }
199 
200  return false;
201 }
202 
204  const Address &addr,
205  const QoSCube& qos,
206  RMTPort * port)
207 {
208  std::string dst = addr.getIpcAddress().getName();
209  neighbours[dst][qos.getQosId()].erase(port);
210 
211  if(neighbours[dst][qos.getQosId()].size() <= 0)
212  {
213  neighbours[dst].erase(qos.getQosId());
214  rt->removeFlow(addr, dst, qos.getQosId(), true);
215 
216  if(neighbours[dst].size() <= 0)
217  {
218  neighbours.erase(dst);
219  }
220 
221  if(rateCacheEntryExists(dst, qos.getQosId()))
222  {
223  // Removes the entry from the cache.
224  rateCache[dst].erase(qos.getQosId());
225  rateCache.erase(dst);
226  }
227 
228  routingUpdated();
229  }
230 }
231 
233 {
234  entries2Next changes = rt->getChanges();
235 
236  for(entries2NextIt it = changes.begin(); it!= changes.end(); it++)
237  {
238  qosPaddr dst = it->first;
239  std::string nextHop = it->second;
240  RMTPort * p = NULL;
241 
242  if(nextHop != "")
243  {
244  NTableIt n = neighbours.find(nextHop);
245 
246  if(n != neighbours.end())
247  {
248  NentriesIt pit = n->second.find(dst.first);
249 
250  if(pit != n->second.end())
251  {
252  if(pit->second.size()>0)
253  {
254  p = *(pit->second.begin());
255  }
256  }
257  }
258  }
259 
260  if(p == NULL)
261  {
262  fwd->remove(dst.second, dst.first);
263  }
264  else
265  {
266  fwd->insert(dst.second, dst.first, p);
267  }
268  }
269 }
virtual void onPolicyInit()
Nentries::iterator NentriesIt
#define PORTSLOAD_GENERATOR_TIMEOUT
void remove(const std::string &addr, const std::string &qos)
SimpleTable::SimpleTable * fwd
NTable::iterator NTableIt
#define PORTSLOAD_GENERATOR_UPDATE
RateMap::iterator RateIter
PortsSet::iterator PortsSetIt
const APN & getIpcAddress() const
Getter of IPC Process address which should be unambiguous within DIF.
Definition: Address.cc:83
virtual void removedFlow(const Address &addr, const QoSCube &qos, RMTPort *port)
QosToRateMap::iterator QTRIter
virtual void insertFlow(const Address &addr, const std::string &dst, const std::string &qos, const unsigned short &metric, bool sendUpdate)=0
virtual entries2Next getChanges()=0
virtual void removeFlow(const Address &addr, const std::string &dst, const std::string &qos, bool sendUpdate)=0
std::map< qosPaddr, std::string > entries2Next
PortsLoadMonitor * rmtp
Definition: DA.h:43
virtual void scheduleUpdate()=0
#define PORTSLOAD_GENERATOR_LOCAL
#define SCALE_BYTES(x)
virtual void handleMessage(cMessage *msg)
Class representing QoSCube with all its properties that is primarily used by FA, RMT and RA Specifica...
Definition: QoSCube.h:57
Define_Module(PortsLoadGenerator)
void insert(const std::string &addr, const std::string &qos, RMTPort *port)
std::string getQosId() const
Gets QoSCube identifier.
Definition: QoSCube.cc:364
entries2Next::iterator entries2NextIt
int64_t getByteRate(RMTPort *port)
std::pair< std::string, std::string > qosPaddr
IntPortsLoadRouting * rt
const std::string & getName() const
Gets APN string name representation.
Definition: APN.cc:40
virtual void routingUpdated()
Address class holds IPC Process identification.
Definition: Address.h:42
bool rateCacheEntryExists(std::string dest, std::string qos)
virtual void insertedFlow(const Address &addr, const QoSCube &qos, RMTPort *port)