Merge "[l2l3 stack] implements new nd state machine & nd buffering"
[samplevnf.git] / VNFs / DPPD-PROX / tools / flow_extract / streamsorter.cpp
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <iostream>
18 #include <fstream>
19 #include <cstdlib>
20
21 #include "mappedfile.hpp"
22 #include "memreader.hpp"
23 #include "streamsorter.hpp"
24 #include "path.hpp"
25 #include "allocator.hpp"
26 #include "pcapreader.hpp"
27 #include "progress.hpp"
28
29 StreamSorter::StreamSorter(size_t flowTableSize, const string& workingDirectory, size_t memoryLimit)
30         : flowTableSize(flowTableSize),
31           workingDirectory(workingDirectory),
32           allocator(memoryLimit, 1024*10),
33           streamID(0)
34 {
35 }
36
37 void StreamSorter::sort(const string &inputPcapFilePath, const string &outputBinFilePath)
38 {
39         setTempFileName();
40         sortChunks(inputPcapFilePath);
41         mergeChunks(outputBinFilePath);
42 }
43
44 void StreamSorter::sortChunks(const string &inputPcapFilePath)
45 {
46         ofstream outputTempFile;
47
48         outputTempFile.open(tempFilePath.c_str());
49
50         if (!outputTempFile.is_open())
51                 return ;
52
53         PcapReader pr;
54         PcapPkt pkt;
55
56         if (pr.open(inputPcapFilePath)) {
57                 pr.getError();
58                 return;
59         }
60         PcapPkt::allocator = &allocator;
61
62         Progress progress(pr.end());
63         uint32_t packetDetail = progress.addDetail("packet count");
64
65         ft = new FlowTable<pkt_tuple, uint32_t>(flowTableSize);
66         resetStreams();
67
68         while (pr.read(&pkt)) {
69                 processPkt(pkt);
70                 if (progress.couldRefresh()) {
71                         progress.setProgress(pr.pos());
72                         progress.setDetail(packetDetail, pr.getPktReadCount());
73                         progress.refresh();
74                 }
75                 if (allocator.lowThresholdReached()) {
76                         flushStreams(&outputTempFile);
77                 }
78         }
79         progress.setProgress();
80         progress.setDetail(packetDetail, pr.getPktReadCount());
81         progress.refresh(true);
82
83         pr.close();
84         flushStreams(&outputTempFile);
85         PcapPkt::allocator = NULL;
86         outputTempFile.close();
87         delete ft;
88 }
89
90 void StreamSorter::resetStreams()
91 {
92         streams.clear();
93 }
94
95 void StreamSorter::flushStreams(ofstream *outputTempFile)
96 {
97         size_t flushCount = 0;
98         size_t offset = outputTempFile->tellp();
99
100         Progress progress(streams.size());
101
102         cout << endl;
103         progress.setTitle("flush ");
104         for (size_t i = 0; i < streams.size(); ++i) {
105                 if (streams[i].hasFlushablePackets()) {
106                         streams[i].flush(outputTempFile);
107                         flushCount++;
108                 }
109
110                 if (progress.couldRefresh()) {
111                         progress.setProgress(i);
112                         progress.refresh();
113                 }
114         }
115         progress.setProgress();
116         progress.refresh(true);
117
118         if (flushCount)
119                 flushOffsets.push_back(offset);
120         allocator.reset();
121 }
122
123 Stream3 *StreamSorter::addNewStream(PcapPkt::L4Proto proto)
124 {
125         streams.push_back(Stream3(streamID++, proto));
126         return &streams.back();
127 }
128
129 FlowTable<pkt_tuple, uint32_t>::entry* StreamSorter::getFlowEntry(const PcapPkt &pkt)
130 {
131         FlowTable<pkt_tuple, uint32_t>::entry *a;
132         struct pkt_tuple pt = pkt.parsePkt();
133         Stream3 *stream = NULL;
134
135         a = ft->lookup(pt.flip());
136         if (!a) {
137                 a = ft->lookup(pt);
138                 if (!a) {
139                         stream = addNewStream(pkt.getProto());
140
141                         a = ft->insert(pt, stream->getID(), pkt.ts());
142                 }
143         }
144
145         if (a->expired(pkt.ts(), streams[a->value].getTimeout())) {
146                 ft->remove(a);
147
148                 stream = addNewStream(pkt.getProto());
149
150                 a = ft->insert(pt, stream->getID(), pkt.ts());
151         }
152         return a;
153 }
154
155 void StreamSorter::processPkt(const PcapPkt &pkt)
156 {
157         FlowTable<pkt_tuple, uint32_t>::entry *a;
158
159         a = getFlowEntry(pkt);
160         a->tv = pkt.ts();
161         streams[a->value].addPkt(pkt);
162 }
163
164 void StreamSorter::mergeChunks(const string &outputBinFile)
165 {
166         cout << "merging chunks: " << tempFilePath << " to " << outputBinFile << endl;
167         cout << "have " << flushOffsets.size() << " parts to merge" << endl;
168         MappedFile tempFile;
169
170         if (tempFile.open(tempFilePath)) {
171                 cerr << "failed to open temp file" << endl;
172                 return;
173         }
174         ofstream file;
175
176         file.open(outputBinFile.c_str());
177
178         if (!file.is_open()) {
179                 cerr << "failed top open file '" << outputBinFile << "'" << endl;
180                 return;
181         }
182         MemReader memReader(&tempFile, flushOffsets);
183         Stream3 stream;
184
185         Progress progress(memReader.getTotalLength());
186
187         while (memReader.read(&stream)) {
188                 stream.flush(&file);
189                 if (progress.couldRefresh()) {
190                         progress.setProgress(memReader.consumed());
191                         progress.refresh();
192                 }
193         }
194
195         progress.setProgress();
196         progress.refresh(true);
197         tempFile.close();
198 }
199
200 void StreamSorter::setTempFileName()
201 {
202         tempFilePath = Path(workingDirectory).add("/tmp").str();
203 }