Fix Idle count
[samplevnf.git] / VNFs / DPPD-PROX / tools / flow_extract / streamextract.cpp
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <inttypes.h>
18 #include <string>
19 #include <cstdio>
20 #include <iostream>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sstream>
24 #include <set>
25 #include <arpa/inet.h>
26 #include <fcntl.h>
27 #include <unistd.h>
28 #include <sys/mman.h>
29 #include <cerrno>
30 #include <cstdlib>
31 #include <map>
32
33 #include "path.hpp"
34 #include "bundle.hpp"
35 #include "stream.hpp"
36 #include "stream2.hpp"
37 #include "allocator.hpp"
38 #include "timestamp.hpp"
39 #include "streamextract.hpp"
40 #include "pcapreader.hpp"
41 #include "pcapwriter.hpp"
42 #include "flowtable.hpp"
43 #include "stream3.hpp"
44 #include "netsocket.hpp"
45 #include "pcappktref.hpp"
46 #include "progress.hpp"
47 #include "mappedfile.hpp"
48 #include "streamsorter.hpp"
49
50 using namespace std;
51
/*
 * Report whether path_dir_out names an existing directory.
 *
 * Returns false when stat() fails (for instance because the path does
 * not exist) and when the path refers to anything that is not a
 * directory.
 */
static bool is_dir(const std::string& path_dir_out)
{
        struct stat s = { 0 };

        if (stat(path_dir_out.c_str(), &s) != 0)
                return false;

        /* S_ISDIR compares the complete file-type field. Testing the
           S_IFDIR bit on its own is not enough: on Linux S_IFBLK
           (0060000) contains the S_IFDIR (0040000) bit, so a block
           device would be reported as a directory. */
        return S_ISDIR(s.st_mode);
}
62
63 StreamExtract::StreamExtract(const ProgramConfig &cfg)
64         : ft2(cfg.flowTableSize),
65           streamSorter(cfg.flowTableSize, cfg.path_dir_out, 1024UL*1024*1024*8),
66           cfg(cfg)
67 {
68 }
69
70 vector<Bundle> StreamExtract::createBundles(const string& streamPath)
71 {
72         map<uint32_t, Bundle>::iterator iterBundle;
73         map<uint32_t, Bundle> bundles;
74         set<uint32_t> servers;
75
76         Stream2 s;
77         ifstream binIn;
78
79         binIn.open(streamPath.c_str());
80         binIn.seekg(0, binIn.end);
81         Progress progress(binIn.tellg());
82         binIn.seekg(0, binIn.beg);
83
84         while (!s.fromFile(&binIn)) {
85                 if (progress.couldRefresh()) {
86                         progress.setProgress(binIn.tellg());
87                         progress.refresh();
88                 }
89                 if (!s.streamHdr.completedTCP)
90                         continue;
91                 if (!s.streamHdr.serverHdrLen)
92                         continue;
93                 /* The current implementation does not support clients
94                    that are also servers. */
95                 servers.insert(s.streamHdr.serverIP);
96                 if (servers.find(s.streamHdr.clientIP) != servers.end())
97                         continue;
98
99                 /* Since each application is represented as a path
100                    graph (there is only one reply for a given request
101                    and only one request after a given reply), each
102                    application must run on a unique server. For this
103                    reason, check if the socket on the server already
104                    is occupied and if so, keep incrementing the socket
105                    until the collision is resolved. */
106                 iterBundle = bundles.find(s.streamHdr.clientIP);
107
108                 if (iterBundle == bundles.end()) {
109                         bundles.insert(make_pair(s.streamHdr.clientIP, Bundle()));
110                         iterBundle = bundles.find(s.streamHdr.clientIP);
111                 }
112
113                 (*iterBundle).second.addStream(s.streamHdr.streamId, s.getServerNetSocket().port);
114         }
115
116         progress.setProgress();
117         progress.refresh(true);
118
119         binIn.close();
120
121         vector<Bundle> ret;
122
123         ret.reserve(bundles.size());
124
125         for (map<uint32_t, Bundle>::const_iterator i = bundles.begin(); i != bundles.end(); ++i)
126                 ret.push_back(i->second);
127
128         return ret;
129 }
130
131 set<uint32_t> StreamExtract::getBundleStreamIDs(const vector<Bundle*>& bundleSamples)
132 {
133         set<uint32_t> streamIDs;
134
135         for (size_t i = 0; i < bundleSamples.size(); ++i) {
136                 const vector<uint32_t> &bundleStreamIDs = bundleSamples[i]->getStream();
137
138                 for (vector<uint32_t>::const_iterator j = bundleStreamIDs.begin(); j != bundleStreamIDs.end(); ++j) {
139                         streamIDs.insert(*j);
140                 }
141         }
142
143         return streamIDs;
144 }
145
/*
 * Draw a pseudo-random index in [0, limit) from rand().
 *
 * Values of rand() that would make the modulo reduction uneven are
 * rejected and redrawn, so every index is equally likely as long as
 * limit does not exceed RAND_MAX.
 *
 * Special cases:
 *  - limit == 0: returns 0 (there is nothing to choose from).
 *  - limit > RAND_MAX: a single rand() call cannot cover the range;
 *    the raw rand() value is returned, so only [0, RAND_MAX] is
 *    reachable.
 */
static size_t getRandom(size_t limit)
{
        /* Guard the division below. */
        if (limit == 0)
                return 0;

        /* Largest multiple of limit that does not exceed RAND_MAX. */
        const size_t rand_limit = (RAND_MAX / limit) * limit;

        /* limit > RAND_MAX: rejection sampling would never accept a
           value, so fall back to a plain draw. */
        if (rand_limit == 0)
                return static_cast<size_t>(rand()) % limit;

        size_t r = rand();

        /* Accept only r in [0, rand_limit). Accepting r == rand_limit
           as well would give index 0 one extra chance. */
        while (r >= rand_limit)
                r = rand();

        return r % limit;
}
156
157 static void removeFill(vector<Bundle*> *from, size_t idx)
158 {
159         Bundle *last = from->back();
160         from->pop_back();
161
162         if (idx != from->size())
163                 (*from)[idx] = last;
164 }
165
166 static vector<Bundle*> takeSamples(vector<Bundle>& bundles, size_t sampleCount)
167 {
168         vector<Bundle*> bundleSamples;
169
170         bundleSamples.reserve(bundles.size());
171
172         cout << "Sampling " << sampleCount << " bundles out of " << bundles.size() << endl;
173         for (size_t i = 0; i < bundles.size(); ++i)
174                 bundleSamples.push_back(&bundles[i]);
175
176         srand(1000);
177         while (bundleSamples.size() > sampleCount) {
178                 size_t r = getRandom(bundleSamples.size());
179                 removeFill(&bundleSamples, r);
180         }
181         return bundleSamples;
182 }
183
/*
 * Turn a list of lengths into a list of start offsets, in place.
 *
 * Each entry is replaced by the sum of all preceding entries, where
 * every entry counts as its length plus sizeof(uint32_t). Returns the
 * grand total, i.e. the offset just past the last entry.
 */
static size_t replaceWithRunningTotals(std::vector<size_t> *streamLength)
{
        size_t offset = 0;

        for (std::vector<size_t>::iterator it = streamLength->begin(); it != streamLength->end(); ++it) {
                const size_t occupied = *it + sizeof(uint32_t);

                *it = offset;
                offset += occupied;
        }

        return offset;
}
194
195 static void printPorts(const vector<Bundle> &bundles)
196 {
197         set<uint32_t> streamIDs;
198
199         for (size_t i = 0; i < bundles.size(); ++i) {
200                 const vector<uint32_t> &ports = bundles[i].getPorts();
201
202                 for (size_t j = 0; j < ports.size(); ++j) {
203                         if (j + 1 == ports.size())
204                                 cout << ports[j] << ",END" << endl;
205                         else
206                                 cout << ports[j] << "," << ports[j +1] << endl;
207                 }
208         }
209 }
210
211 string StreamExtract::createStreamPcapFileName(int id)
212 {
213         stringstream ss;
214
215         ss << cfg.path_dir_out << "/s" << id << ".pcap";
216
217         return ss.str();
218 }
219
220 int StreamExtract::writeToPcaps(const string &sourceFilePath, const set<uint32_t> &streamIDs)
221 {
222         set<uint32_t>::const_iterator i = streamIDs.begin();
223
224         MappedFile mappedFile;
225         if (mappedFile.open(sourceFilePath)) {
226                 cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl;
227                 return -1;
228         }
229
230         PcapPkt::allocator = NULL;
231
232         Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg());
233         cout << "Writing  " << streamIDs.size() << " streams to pcaps" << endl;
234         uint8_t *data2 = mappedFile.getMapBeg();
235         while (data2 < mappedFile.getMapEnd()) {
236                 uint32_t id = *reinterpret_cast<uint32_t *>(data2);
237
238                 data2 += sizeof(id);
239                 uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2);
240                 data2 += sizeof(pktCount);
241                 Stream s(id, pktCount);
242                 while (pktCount--) {
243                         PcapPkt p(data2);
244
245                         data2 += p.memSize();
246                         s.addPkt(p);
247                 }
248
249                 while (i != streamIDs.end() && (*i) < id)
250                         i++;
251                 if (i == streamIDs.end())
252                         break;
253                 if (*i > id)
254                         continue;
255
256                 const string pcapPath = createStreamPcapFileName(id);
257
258                 s.toPcap(pcapPath);
259                 if (progress.couldRefresh()) {
260                         progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg());
261                         progress.refresh();
262                         mappedFile.sync();
263                 }
264         }
265
266         progress.setProgress(data2 - mappedFile.getMapBeg());
267         progress.refresh(true);
268
269         mappedFile.close();
270         return 0;
271 }
272
273 int StreamExtract::writeToLua(const string& binFilePath, const Path &smallFinalBin, const string& luaFilePath, const string &orderedTemp)
274 {
275         vector<Bundle> bundles = createBundles(binFilePath);
276         vector<Bundle*> bundleSamples = takeSamples(bundles, cfg.sampleCount);
277         set<uint32_t> streamIDs = getBundleStreamIDs(bundleSamples);
278
279         if (cfg.write_pcaps)
280                 writeToPcaps(orderedTemp, streamIDs);
281
282         ofstream outLua;
283         ofstream outSmallBin;
284         outLua.open(luaFilePath.c_str());
285         outLua << "bf = \""<< smallFinalBin.getFileName() << "\"" << endl;
286         outLua << "s = {}\n";
287         set<uint32_t>::iterator i = streamIDs.begin();
288
289         set<NetSocket> serverSockets;
290         ifstream binIn;
291         Stream2 s;
292
293         outSmallBin.open(smallFinalBin.str().c_str());
294         binIn.open(binFilePath.c_str());
295         while (!s.fromFile(&binIn)) {
296                 while (i != streamIDs.end() && (*i) < s.streamHdr.streamId)
297                         i++;
298                 if (i == streamIDs.end())
299                         break;
300                 if (*i > s.streamHdr.streamId)
301                         continue;
302                 s.calcOffsets(&outSmallBin);
303                 s.toFile(&outSmallBin);
304                 while (serverSockets.find(s.getServerNetSocket()) != serverSockets.end()) {
305                         NetSocket ns = s.getServerNetSocket();
306
307                         ns.port++;
308                         s.setServerNetSocket(ns);
309                 }
310                 serverSockets.insert(s.getServerNetSocket());
311
312                 s.toLua(&outLua, "bf", "s");
313         }
314         binIn.close();
315
316         uint32_t bundleCount = 0;
317
318         outLua << "bundles = {}" << endl;
319         for (size_t i = 0; i < bundleSamples.size(); ++i) {
320                 bundleSamples[i]->toLua(&outLua, "s", ++bundleCount);
321         }
322         outLua << "return bundles" << endl;
323         outLua.close();
324         return 0;
325 }
326
327 int StreamExtract::writeFinalBin(const string& sourceFilePath, const string& destFilePath)
328 {
329         MappedFile mappedFile;
330         if (mappedFile.open(sourceFilePath)) {
331                 cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl;
332                 return -1;
333         }
334         ofstream binOut;
335
336         binOut.open(destFilePath.c_str());
337         PcapPkt::allocator = NULL;
338
339         Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg());
340
341         int streamCount = 0;
342         uint8_t *data2 = mappedFile.getMapBeg();
343         while (data2 < mappedFile.getMapEnd()) {
344                 uint32_t id = *reinterpret_cast<uint32_t *>(data2);
345
346                 data2 += sizeof(id);
347                 uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2);
348                 data2 += sizeof(pktCount);
349                 Stream s(id, pktCount);
350                 while (pktCount--) {
351                         PcapPkt p(data2);
352
353                         data2 += p.memSize();
354                         s.addPkt(p);
355                 }
356                 s.toFile(&binOut);
357                 streamCount++;
358                 if (progress.couldRefresh()) {
359                         progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg());
360                         progress.refresh();
361                         mappedFile.sync();
362                 }
363         }
364
365         progress.setProgress(data2 - mappedFile.getMapBeg());
366         progress.refresh(true);
367
368         binOut.close();
369         mappedFile.close();
370         return 0;
371 }
372
373 int StreamExtract::run()
374 {
375         Path p(cfg.path_dir_out);
376         p.mkdir();
377
378         string orderedTemp = p.add("/a").str();
379
380         string finalBin = p.add("/b").str();
381         Path smallfinalBin = p.add("/data.bin").str();
382         string luaFile = p.add("/cfg.lua").str();
383
384         cout << "Writing to directory '" << p.str() << "'" << endl;
385         cout << "Ordered streams '" << orderedTemp << "'" << endl;
386         cout << "Final binary output '" << finalBin << "'" << endl;
387         cout << "lua file '" << luaFile << "' will contain " << cfg.sampleCount << " bundles" << endl;
388
389         if (cfg.run_first_step) {
390                 cout << "starting sorting" << endl;
391                 streamSorter.sort(cfg.path_file_in_pcap, orderedTemp);
392                 cout << "writing final binary file (converting format)" << endl;
393                 if (writeFinalBin(orderedTemp, finalBin))
394                         return -1;
395         } else {
396                 cout << "Skipping first step" << endl;
397                 if (!Path(finalBin).isFile()) {
398                         cerr << "File is missing:" << finalBin << endl;
399                         return -1;
400                 }
401         }
402         cout << "writing Lua '" << luaFile << "'" << endl;
403         if (writeToLua(finalBin, smallfinalBin, luaFile, orderedTemp))
404                 return -1;
405         return 0;
406 }