2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
22 #include <sys/types.h>
25 #include <arpa/inet.h>
36 #include "stream2.hpp"
37 #include "allocator.hpp"
38 #include "timestamp.hpp"
39 #include "streamextract.hpp"
40 #include "pcapreader.hpp"
41 #include "pcapwriter.hpp"
42 #include "flowtable.hpp"
43 #include "stream3.hpp"
44 #include "netsocket.hpp"
45 #include "pcappktref.hpp"
46 #include "progress.hpp"
47 #include "mappedfile.hpp"
48 #include "streamsorter.hpp"
/* Returns true when path_dir_out names an existing directory (stat() follows
   symlinks, so a symlink to a directory also counts). Returns false when
   stat() fails, e.g. the path does not exist or cannot be accessed.

   S_ISDIR() is used instead of masking with S_IFDIR: the file-type field of
   st_mode is a multi-bit value, and other types such as sockets (S_IFSOCK)
   and block devices (S_IFBLK) also have the S_IFDIR bit set, so
   `st_mode & S_IFDIR` would misreport them as directories. */
static bool is_dir(const string& path_dir_out)
{
	struct stat s;

	if (stat(path_dir_out.c_str(), &s) != 0)
		return false;

	return S_ISDIR(s.st_mode);
}
// Constructor. ft2 is sized with cfg.flowTableSize; streamSorter receives
// cfg.flowTableSize, the output directory cfg.path_dir_out and the constant
// 1024UL*1024*1024*8 (8 GiB if read as a byte count).
// NOTE(review): the meaning of that constant is defined by StreamSorter
// (streamsorter.hpp) — presumably an in-memory budget; confirm there.
63 StreamExtract::StreamExtract(const ProgramConfig &cfg)
64 : ft2(cfg.flowTableSize),
65 streamSorter(cfg.flowTableSize, cfg.path_dir_out, 1024UL*1024*1024*8),
// Reads the stream file at streamPath record by record (s.fromFile) and
// groups the usable streams into one Bundle per client IP.
//
// A stream is skipped when it is not a completed TCP stream, when it carries
// no server header data (serverHdrLen == 0), or when its client IP has been
// recorded as a server IP. Each kept stream adds its streamId and its server
// port to the Bundle of its client.
//
// Returns the bundles ordered by ascending client IP (std::map iteration
// order). A progress indicator based on the current file offset is updated
// while reading.
//
// NOTE(review): the result of binIn.open() is not checked before tellg() is
// used to size the progress indicator.
70 vector<Bundle> StreamExtract::createBundles(const string& streamPath)
72 map<uint32_t, Bundle>::iterator iterBundle;
73 map<uint32_t, Bundle> bundles;
74 set<uint32_t> servers;
79 binIn.open(streamPath.c_str());
80 binIn.seekg(0, binIn.end);
81 Progress progress(binIn.tellg());
82 binIn.seekg(0, binIn.beg);
84 while (!s.fromFile(&binIn)) {
85 if (progress.couldRefresh()) {
86 progress.setProgress(binIn.tellg());
89 if (!s.streamHdr.completedTCP)
91 if (!s.streamHdr.serverHdrLen)
93 /* The current implementation does not support clients
94 that are also servers. */
// Order matters: the server IP is recorded before the client check, so the
// check only rejects clients already seen as a server in this or an earlier
// record. A client that first shows up as a server in a later record has
// already been bundled and is not removed afterwards.
95 servers.insert(s.streamHdr.serverIP);
96 if (servers.find(s.streamHdr.clientIP) != servers.end())
99 /* Since each application is represented as a path
100 graph (there is only one reply for a given request
101 and only one request after a given reply), each
102 application must run on a unique server. For this
103 reason, check if the socket on the server already
104 is occupied and if so, keep incrementing the socket
105 until the collision is resolved. */
// NOTE(review): no socket-collision handling happens in this function; the
// serverSockets loop in writeToLua appears to be what implements it.
106 iterBundle = bundles.find(s.streamHdr.clientIP);
108 if (iterBundle == bundles.end()) {
109 bundles.insert(make_pair(s.streamHdr.clientIP, Bundle()));
110 iterBundle = bundles.find(s.streamHdr.clientIP);
113 (*iterBundle).second.addStream(s.streamHdr.streamId, s.getServerNetSocket().port);
116 progress.setProgress();
117 progress.refresh(true);
// Flatten the map into a vector, keeping ascending client-IP order.
123 ret.reserve(bundles.size());
125 for (map<uint32_t, Bundle>::const_iterator i = bundles.begin(); i != bundles.end(); ++i)
126 ret.push_back(i->second);
131 set<uint32_t> StreamExtract::getBundleStreamIDs(const vector<Bundle*>& bundleSamples)
133 set<uint32_t> streamIDs;
135 for (size_t i = 0; i < bundleSamples.size(); ++i) {
136 const vector<uint32_t> &bundleStreamIDs = bundleSamples[i]->getStream();
138 for (vector<uint32_t>::const_iterator j = bundleStreamIDs.begin(); j != bundleStreamIDs.end(); ++j) {
139 streamIDs.insert(*j);
/* Returns a uniformly distributed integer in [0, limit) using rand().

   rand() yields RAND_MAX + 1 equally likely values. Only draws below the
   largest multiple of `limit` that does not exceed that count are accepted,
   so every residue r % limit is equally likely (no modulo bias). The
   rejection test must be `>=`: accepting a draw equal to that multiple would
   map one extra value onto 0.

   Precondition: 1 <= limit <= RAND_MAX + 1. limit == 0 would divide by zero,
   and a larger limit cannot be served by a single rand() call. */
static size_t getRandom(size_t limit)
{
	const size_t randCount = static_cast<size_t>(RAND_MAX) + 1;
	const size_t acceptBelow = randCount - randCount % limit;
	size_t r;

	do {
		r = static_cast<size_t>(rand());
	} while (r >= acceptBelow);

	return r % limit;
}
157 static void removeFill(vector<Bundle*> *from, size_t idx)
159 Bundle *last = from->back();
162 if (idx != from->size())
// Randomly selects up to sampleCount bundles and returns pointers to them.
// If sampleCount >= bundles.size(), every bundle is returned.
//
// The returned pointers point into `bundles`, so they stay valid only while
// that vector is neither destroyed nor reallocated.
//
// Selection starts from all bundles and repeatedly removes a random entry
// (getRandom, which is rand()-based) with removeFill(). removeFill() appears
// to move the last entry into the removed slot, so the order of the returned
// pointers is not guaranteed.
166 static vector<Bundle*> takeSamples(vector<Bundle>& bundles, size_t sampleCount)
168 vector<Bundle*> bundleSamples;
170 bundleSamples.reserve(bundles.size());
172 cout << "Sampling " << sampleCount << " bundles out of " << bundles.size() << endl;
173 for (size_t i = 0; i < bundles.size(); ++i)
174 bundleSamples.push_back(&bundles[i]);
177 while (bundleSamples.size() > sampleCount) {
178 size_t r = getRandom(bundleSamples.size());
179 removeFill(&bundleSamples, r);
181 return bundleSamples;
/* Turns per-record lengths into start offsets. Each record occupies its
   length plus sizeof(uint32_t) bytes, and the records are laid out back to
   back. Each entry of *streamLength is overwritten with the offset at which
   its record begins. The return value is the total size of all records,
   i.e. the offset just past the last one. */
static size_t replaceWithRunningTotals(vector<size_t> *streamLength)
{
	size_t offset = 0;

	for (vector<size_t>::iterator it = streamLength->begin(); it != streamLength->end(); ++it) {
		const size_t recordSize = *it + sizeof(uint32_t);

		*it = offset;
		offset += recordSize;
	}
	return offset;
}
195 static void printPorts(const vector<Bundle> &bundles)
197 set<uint32_t> streamIDs;
199 for (size_t i = 0; i < bundles.size(); ++i) {
200 const vector<uint32_t> &ports = bundles[i].getPorts();
202 for (size_t j = 0; j < ports.size(); ++j) {
203 if (j + 1 == ports.size())
204 cout << ports[j] << ",END" << endl;
206 cout << ports[j] << "," << ports[j +1] << endl;
211 string StreamExtract::createStreamPcapFileName(int id)
215 ss << cfg.path_dir_out << "/s" << id << ".pcap";
// Writes every stream whose ID is in streamIDs to its own pcap file, named
// by createStreamPcapFileName(id). Prints an error on stderr when
// sourceFilePath cannot be mapped.
//
// sourceFilePath is memory-mapped and walked record by record. Each record
// appears to consist of a uint32_t stream id, a uint32_t packet count, and
// then pktCount packets, each skipped over by p.memSize().
//
// streamIDs is walked with a single forward iterator, so the records in the
// file are expected to be in ascending stream-id order; a record whose id is
// smaller than an id already passed cannot be matched.
//
// NOTE(review): the header fields are read through reinterpret_cast<uint32_t*>
// at offsets that advance by p.memSize(). Unless memSize() is always a multiple
// of 4, these reads can be unaligned (formally undefined behaviour); memcpy
// would be portable. There is also no check that a whole header fits before
// getMapEnd().
220 int StreamExtract::writeToPcaps(const string &sourceFilePath, const set<uint32_t> &streamIDs)
222 set<uint32_t>::const_iterator i = streamIDs.begin();
224 MappedFile mappedFile;
225 if (mappedFile.open(sourceFilePath)) {
226 cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl;
// NOTE(review): presumably makes PcapPkt refer to the mapped data instead of
// allocating its own copies — confirm in the PcapPkt implementation.
230 PcapPkt::allocator = NULL;
232 Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg());
233 cout << "Writing " << streamIDs.size() << " streams to pcaps" << endl;
234 uint8_t *data2 = mappedFile.getMapBeg();
235 while (data2 < mappedFile.getMapEnd()) {
236 uint32_t id = *reinterpret_cast<uint32_t *>(data2);
239 uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2);
240 data2 += sizeof(pktCount);
241 Stream s(id, pktCount);
245 data2 += p.memSize();
// Advance the requested-ID cursor up to the current record's id.
249 while (i != streamIDs.end() && (*i) < id)
251 if (i == streamIDs.end())
256 const string pcapPath = createStreamPcapFileName(id);
259 if (progress.couldRefresh()) {
260 progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg());
266 progress.setProgress(data2 - mappedFile.getMapBeg());
267 progress.refresh(true);
// Produces the Lua configuration and the reduced binary for a random sample
// of bundles:
//  1. builds bundles from binFilePath and samples cfg.sampleCount of them;
//  2. writes each selected stream to its own pcap, reading from orderedTemp;
//  3. writes `bf = "<smallFinalBin file name>"` and an `s = {}` table to
//     luaFilePath; for each selected stream read from binFilePath, writes it
//     to smallFinalBin (calcOffsets + toFile) and emits it into the Lua file
//     via s.toLua();
//  4. appends a `bundles = {}` table and `return bundles`.
//
// binFilePath and streamIDs are walked forward in step, so the records in
// binFilePath are expected in ascending stream-id order.
273 int StreamExtract::writeToLua(const string& binFilePath, const Path &smallFinalBin, const string& luaFilePath, const string &orderedTemp)
275 vector<Bundle> bundles = createBundles(binFilePath);
276 vector<Bundle*> bundleSamples = takeSamples(bundles, cfg.sampleCount);
277 set<uint32_t> streamIDs = getBundleStreamIDs(bundleSamples);
// NOTE(review): the int returned by writeToPcaps() (non-zero presumably
// meaning failure) is ignored here.
280 writeToPcaps(orderedTemp, streamIDs);
283 ofstream outSmallBin;
284 outLua.open(luaFilePath.c_str());
285 outLua << "bf = \""<< smallFinalBin.getFileName() << "\"" << endl;
286 outLua << "s = {}\n";
287 set<uint32_t>::iterator i = streamIDs.begin();
289 set<NetSocket> serverSockets;
293 outSmallBin.open(smallFinalBin.str().c_str());
294 binIn.open(binFilePath.c_str());
295 while (!s.fromFile(&binIn)) {
296 while (i != streamIDs.end() && (*i) < s.streamHdr.streamId)
298 if (i == streamIDs.end())
300 if (*i > s.streamHdr.streamId)
302 s.calcOffsets(&outSmallBin);
303 s.toFile(&outSmallBin);
// While another emitted stream already uses this server socket, the socket
// is changed and re-checked, so s.toLua() below sees a unique socket.
// NOTE(review): s.toFile() above runs before this adjustment; if the binary
// record stores the server socket, it keeps the original value — confirm
// that is intended.
304 while (serverSockets.find(s.getServerNetSocket()) != serverSockets.end()) {
305 NetSocket ns = s.getServerNetSocket();
308 s.setServerNetSocket(ns);
310 serverSockets.insert(s.getServerNetSocket());
312 s.toLua(&outLua, "bf", "s");
316 uint32_t bundleCount = 0;
318 outLua << "bundles = {}" << endl;
// Bundles are passed numbers 1, 2, ... (pre-increment). This loop's `i`
// shadows the set iterator `i` declared above.
319 for (size_t i = 0; i < bundleSamples.size(); ++i) {
320 bundleSamples[i]->toLua(&outLua, "s", ++bundleCount);
322 outLua << "return bundles" << endl;
// Converts the memory-mapped file at sourceFilePath into the final binary at
// destFilePath (run() describes this step as "converting format"), updating a
// progress indicator. Prints an error on stderr when sourceFilePath cannot be
// mapped. The caller (run) treats a non-zero return as failure.
//
// Records are read with the same layout as in writeToPcaps: a uint32_t id, a
// uint32_t packet count, then packets skipped over by p.memSize().
// NOTE(review): the same potentially unaligned reinterpret_cast header reads
// apply here, and there is no check that a whole header fits before
// getMapEnd().
327 int StreamExtract::writeFinalBin(const string& sourceFilePath, const string& destFilePath)
329 MappedFile mappedFile;
330 if (mappedFile.open(sourceFilePath)) {
331 cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl;
336 binOut.open(destFilePath.c_str());
337 PcapPkt::allocator = NULL;
339 Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg());
342 uint8_t *data2 = mappedFile.getMapBeg();
343 while (data2 < mappedFile.getMapEnd()) {
344 uint32_t id = *reinterpret_cast<uint32_t *>(data2);
347 uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2);
348 data2 += sizeof(pktCount);
349 Stream s(id, pktCount);
353 data2 += p.memSize();
358 if (progress.couldRefresh()) {
359 progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg());
365 progress.setProgress(data2 - mappedFile.getMapBeg());
366 progress.refresh(true);
// Entry point of the extraction pipeline. Output paths are derived from
// cfg.path_dir_out with Path::add(): "/a" (ordered streams), "/b" (final
// binary), "/data.bin" (reduced binary) and "/cfg.lua" (Lua config).
//
// With cfg.run_first_step, the input pcap (cfg.path_file_in_pcap) is sorted
// into orderedTemp and then converted into finalBin by writeFinalBin().
// Otherwise only the presence of finalBin is checked. The Lua file and the
// reduced binary are then produced by writeToLua().
//
// NOTE(review): when the first step is skipped only finalBin is checked, but
// writeToLua() also maps orderedTemp (through writeToPcaps), so a missing
// orderedTemp is only reported there, as a "Failed to open file" message.
// NOTE(review): whether Path::add() modifies `p` in place is not shown here;
// if it does, the later paths would nest (e.g. ".../a/b").
373 int StreamExtract::run()
375 Path p(cfg.path_dir_out);
378 string orderedTemp = p.add("/a").str();
380 string finalBin = p.add("/b").str();
381 Path smallfinalBin = p.add("/data.bin").str();
382 string luaFile = p.add("/cfg.lua").str();
384 cout << "Writing to directory '" << p.str() << "'" << endl;
385 cout << "Ordered streams '" << orderedTemp << "'" << endl;
386 cout << "Final binary output '" << finalBin << "'" << endl;
387 cout << "lua file '" << luaFile << "' will contain " << cfg.sampleCount << " bundles" << endl;
389 if (cfg.run_first_step) {
390 cout << "starting sorting" << endl;
391 streamSorter.sort(cfg.path_file_in_pcap, orderedTemp);
392 cout << "writing final binary file (converting format)" << endl;
393 if (writeFinalBin(orderedTemp, finalBin))
396 cout << "Skipping first step" << endl;
397 if (!Path(finalBin).isFile()) {
398 cerr << "File is missing:" << finalBin << endl;
402 cout << "writing Lua '" << luaFile << "'" << endl;
403 if (writeToLua(finalBin, smallfinalBin, luaFile, orderedTemp))