2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
21 #include "mappedfile.hpp"
22 #include "memreader.hpp"
23 #include "streamsorter.hpp"
25 #include "allocator.hpp"
26 #include "pcapreader.hpp"
27 #include "progress.hpp"
// Constructs a StreamSorter from the caller-supplied flow table size, working
// directory and memory limit.
//
// flowTableSize    - stored in the member of the same name.
// workingDirectory - stored in the member of the same name.
// memoryLimit      - forwarded as the first argument of the allocator member.
29 StreamSorter::StreamSorter(size_t flowTableSize, const string& workingDirectory, size_t memoryLimit)
30 : flowTableSize(flowTableSize),
31 workingDirectory(workingDirectory),
// The allocator also receives a fixed second argument of 1024*10.
// NOTE(review): the meaning of that second argument is defined by the allocator
// class (allocator.hpp) and is not visible here - confirm before changing it.
32 allocator(memoryLimit, 1024*10),
// NOTE(review): the initializer list ends with a trailing comma, so at least one
// more member initializer and the constructor body follow but are not visible here.
// Top-level entry point: runs the chunk-sorting pass over the input pcap file
// and then the merge pass that writes the output file.
//
// inputPcapFilePath - passed unchanged to sortChunks().
// outputBinFilePath - passed unchanged to mergeChunks().
37 void StreamSorter::sort(const string &inputPcapFilePath, const string &outputBinFilePath)
// NOTE(review): the embedded numbering skips lines before this call; a statement
// (for example one that initialises tempFilePath) may exist here - confirm.
40 sortChunks(inputPcapFilePath);
// mergeChunks() reads tempFilePath and flushOffsets, which sortChunks() and
// flushStreams() use and fill, so the order of these two calls matters.
41 mergeChunks(outputBinFilePath);
// Reads packets from the pcap file at inputPcapFilePath and flushes the
// accumulated streams to the temporary file at tempFilePath, both whenever the
// allocator reports its low threshold and once more at the end.
//
// NOTE(review): several lines of this function are not visible here (the
// declarations of `pr` and `pkt`, the bodies of the two error checks and the
// per-packet processing call inside the read loop) - confirm against the full file.
44 void StreamSorter::sortChunks(const string &inputPcapFilePath)
46 ofstream outputTempFile;
48 outputTempFile.open(tempFilePath.c_str());
// Guard against the temp file failing to open; the action taken is not visible.
50 if (!outputTempFile.is_open())
// NOTE(review): mergeChunks() treats a truthy open() result as failure; this
// branch presumably does the same for the pcap reader - confirm.
56 if (pr.open(inputPcapFilePath)) {
// Point the static PcapPkt allocator at this sorter's allocator for the
// duration of the pass; it is reset to NULL further down.
60 PcapPkt::allocator = &allocator;
// Progress is constructed from pr.end() and later updated with pr.pos().
62 Progress progress(pr.end());
63 uint32_t packetDetail = progress.addDetail("packet count");
// NOTE(review): `ft` is allocated with a raw `new`; no matching delete is
// visible in this excerpt - confirm it is released elsewhere.
65 ft = new FlowTable<pkt_tuple, uint32_t>(flowTableSize);
68 while (pr.read(&pkt)) {
// Only update the progress display when Progress says a refresh is due.
70 if (progress.couldRefresh()) {
71 progress.setProgress(pr.pos());
72 progress.setDetail(packetDetail, pr.getPktReadCount());
// When the allocator reports its low threshold, write the buffered streams out
// to the temp file.
75 if (allocator.lowThresholdReached()) {
76 flushStreams(&outputTempFile);
// Final progress update; this appears to follow the read loop (closing braces
// are not visible here).
79 progress.setProgress();
80 progress.setDetail(packetDetail, pr.getPktReadCount());
81 progress.refresh(true);
// Flush whatever is still buffered, detach the static allocator and close
// the temp file.
84 flushStreams(&outputTempFile);
85 PcapPkt::allocator = NULL;
86 outputTempFile.close();
// NOTE(review): only the signature of resetStreams() is visible here; judging by
// the name it presumably clears per-run stream state - confirm against the body.
90 void StreamSorter::resetStreams()
// Writes every stream that has flushable packets to outputTempFile and records
// the file offset at which this flush started in flushOffsets.
//
// outputTempFile - open output stream; its write position is read before
//                  any stream is flushed.
95 void StreamSorter::flushStreams(ofstream *outputTempFile)
// NOTE(review): flushCount is initialised here but the lines that update or
// read it are not visible in this excerpt.
97 size_t flushCount = 0;
// Capture the start offset of this chunk before anything is written.
98 size_t offset = outputTempFile->tellp();
100 Progress progress(streams.size());
103 progress.setTitle("flush ");
104 for (size_t i = 0; i < streams.size(); ++i) {
// Skip streams that report nothing to flush.
105 if (streams[i].hasFlushablePackets()) {
106 streams[i].flush(outputTempFile);
110 if (progress.couldRefresh()) {
111 progress.setProgress(i);
115 progress.setProgress();
116 progress.refresh(true);
// Record where this chunk starts; flushOffsets is later handed to MemReader
// in mergeChunks().
// NOTE(review): lines just before this one are not visible, so this push may
// be conditional (e.g. on flushCount) - confirm.
119 flushOffsets.push_back(offset);
// Appends a new Stream3 for the given L4 protocol to `streams`, assigning it
// the current streamID (which is then post-incremented).
//
// Returns a pointer to the newly appended element.
// NOTE(review): the pointer refers to an element of `streams`; if `streams` is
// a std::vector, a later push_back may invalidate it - callers should not keep
// it across further insertions. Confirm the container type in the header.
123 Stream3 *StreamSorter::addNewStream(PcapPkt::L4Proto proto)
125 streams.push_back(Stream3(streamID++, proto));
126 return &streams.back();
// Finds or creates the flow-table entry for the given packet. The entry's
// value is used as an index into `streams`, both here and in processPkt().
//
// NOTE(review): the branch structure and the return statement of this function
// are not visible here; the comments below describe only the visible
// statements - confirm the control flow against the full file.
129 FlowTable<pkt_tuple, uint32_t>::entry* StreamSorter::getFlowEntry(const PcapPkt &pkt)
131 FlowTable<pkt_tuple, uint32_t>::entry *a;
// Tuple obtained from the packet, used as the flow-table key.
132 struct pkt_tuple pt = pkt.parsePkt();
133 Stream3 *stream = NULL;
// Look the flow up under the flipped tuple.
// NOTE(review): presumably this matches the reverse direction of an existing
// flow; a lookup of the unflipped tuple may exist on a line not shown - confirm.
135 a = ft->lookup(pt.flip());
// Create a new stream and insert an entry keyed by pt that carries the
// stream's ID and the packet timestamp.
139 stream = addNewStream(pkt.getProto());
141 a = ft->insert(pt, stream->getID(), pkt.ts());
// Expiry is checked against the timeout of the stream the entry refers to.
145 if (a->expired(pkt.ts(), streams[a->value].getTimeout())) {
// On expiry a fresh stream is created and an entry for it is inserted under pt.
148 stream = addNewStream(pkt.getProto());
150 a = ft->insert(pt, stream->getID(), pkt.ts());
// Adds one packet to the stream that its flow-table entry refers to.
//
// pkt - the packet to add; it is passed to getFlowEntry() and then to
//       addPkt() on the selected stream.
155 void StreamSorter::processPkt(const PcapPkt &pkt)
157 FlowTable<pkt_tuple, uint32_t>::entry *a;
159 a = getFlowEntry(pkt);
// The entry's value is used as the index into `streams`.
161 streams[a->value].addPkt(pkt);
// Merges the chunks previously written to the temp file (tempFilePath) into
// the output file, reading them back through a MemReader built over the
// recorded flushOffsets.
//
// outputBinFile - path of the output file to open.
//
// NOTE(review): several lines of this function are not visible here (the
// declarations of `tempFile`, `file` and `stream`, the early returns after the
// error messages, and the statement that writes each stream) - confirm
// against the full file.
164 void StreamSorter::mergeChunks(const string &outputBinFile)
166 cout << "merging chunks: " << tempFilePath << " to " << outputBinFile << endl;
167 cout << "have " << flushOffsets.size() << " parts to merge" << endl;
// A truthy return from tempFile.open() is treated as failure here.
170 if (tempFile.open(tempFilePath)) {
171 cerr << "failed to open temp file" << endl;
176 file.open(outputBinFile.c_str());
178 if (!file.is_open()) {
// Message typo fixed: "failed top open" -> "failed to open".
179 cerr << "failed to open file '" << outputBinFile << "'" << endl;
// The reader is given the temp file together with the offsets recorded by
// flushStreams().
182 MemReader memReader(&tempFile, flushOffsets);
185 Progress progress(memReader.getTotalLength());
187 while (memReader.read(&stream)) {
// Only update the progress display when Progress says a refresh is due.
189 if (progress.couldRefresh()) {
190 progress.setProgress(memReader.consumed());
// Final progress update; this appears to follow the read loop.
195 progress.setProgress();
196 progress.refresh(true);
// Sets tempFilePath to the string produced by Path(workingDirectory).add("/tmp").
// NOTE(review): how Path::add() joins components is defined by the Path class
// and is not visible here; presumably the result is "<workingDirectory>/tmp" -
// confirm.
200 void StreamSorter::setTempFileName()
202 tempFilePath = Path(workingDirectory).add("/tmp").str();