7b349ece10e07cf4b2e93170e7b4dbf4a4cf77b1
[vswitchperf.git] / testcases / testcase.py
1 # Copyright 2015 Intel Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """TestCase base class
15 """
16
17 import csv
18 import os
19 import logging
20 from collections import OrderedDict
21
22 from core.results.results_constants import ResultsConstants
23 import core.component_factory as component_factory
24 from core.loader import Loader
25
26 class TestCase(object):
27     """TestCase base class
28
29     In this basic form runs RFC2544 throughput test
30     """
31     def __init__(self, cfg, results_dir):
32         """Pull out fields from test config
33
34         :param cfg: A dictionary of string-value pairs describing the test
35             configuration. Both the key and values strings use well-known
36             values.
37         :param results_dir: Where the csv formatted results are written.
38         """
39         self._logger = logging.getLogger(__name__)
40         self.name = cfg['Name']
41         self.desc = cfg.get('Description', 'No description given.')
42         self._traffic_type = cfg['Traffic Type']
43         self.deployment = cfg['Deployment']
44         self._collector = cfg['Collector']
45         self._bidir = cfg['biDirectional']
46         self._frame_mod = cfg.get('Frame Modification', None)
47
48         # check if test requires background load and which generator it uses
49         self._load_cfg = cfg.get('Load', None)
50         if self._load_cfg and 'tool' in self._load_cfg:
51             self._loadgen = self._load_cfg['tool']
52         else:
53             # background load is not requested, so use dummy implementation
54             self._loadgen = "Dummy"
55
56         if self._frame_mod:
57             self._frame_mod = self._frame_mod.lower()
58         self._results_dir = results_dir
59         self._multistream = cfg.get('MultiStream', 0)
60
61     def run(self):
62         """Run the test
63
64         All setup and teardown through controllers is included.
65         """
66         self._logger.debug(self.name)
67
68         self._logger.debug("Controllers:")
69         loader = Loader()
70         traffic_ctl = component_factory.create_traffic(
71             self._traffic_type,
72             loader.get_trafficgen_class())
73         vnf_ctl = component_factory.create_vnf(
74             self.deployment,
75             loader.get_vnf_class())
76         vswitch_ctl = component_factory.create_vswitch(
77             self.deployment,
78             loader.get_vswitch_class(),
79             self._bidir)
80         collector_ctl = component_factory.create_collector(
81             self._collector,
82             loader.get_collector_class())
83         loadgen = component_factory.create_loadgen(
84             self._loadgen,
85             self._load_cfg)
86
87         self._logger.debug("Setup:")
88         collector_ctl.log_cpu_stats()
89         with vswitch_ctl, loadgen:
90             with vnf_ctl:
91                 traffic = {'traffic_type': self._traffic_type,
92                            'bidir': self._bidir,
93                            'multistream': self._multistream}
94
95                 vswitch = vswitch_ctl.get_vswitch()
96                 if self._frame_mod == "vlan":
97                     flow = {'table':'2', 'priority':'1000', 'metadata':'2',
98                             'actions': ['push_vlan:0x8100', 'goto_table:3']}
99                     vswitch.add_flow('br0', flow)
100                     flow = {'table':'2', 'priority':'1000', 'metadata':'1',
101                             'actions': ['push_vlan:0x8100', 'goto_table:3']}
102                     vswitch.add_flow('br0', flow)
103
104                 with traffic_ctl:
105                     traffic_ctl.send_traffic(traffic)
106
107         self._logger.debug("Traffic Results:")
108         traffic_ctl.print_results()
109
110         self._logger.debug("Collector Results:")
111         self._logger.debug(collector_ctl.get_results())
112
113         output_file = "result_" + self.name + "_" + self.deployment +".csv"
114
115         TestCase._write_result_to_file(
116             self._append_results(traffic_ctl.get_results()),
117             os.path.join(self._results_dir, output_file))
118
119     def _append_results(self, results):
120         """
121         Method appends mandatory Test Case results to list of dictionaries.
122
123         :param results: list of dictionaries which contains results from
124                 traffic generator.
125
126         :returns: modified list of dictionaries.
127         """
128         for item in results:
129             item[ResultsConstants.ID] = self.name
130             item[ResultsConstants.DEPLOYMENT] = self.deployment
131
132         return results
133
134
135     @staticmethod
136     def _write_result_to_file(results, output):
137         """Write list of dictionaries to a CSV file.
138
139         Each element on list will create separate row in output file.
140         If output file already exists, data will be appended at the end,
141         otherwise it will be created.
142
143         :param results: list of dictionaries.
144         :param output: path to output file.
145         """
146         with open(output, 'a') as csvfile:
147
148             logging.info("Write results to file: " + output)
149             fieldnames = TestCase._get_unique_keys(results)
150
151             writer = csv.DictWriter(csvfile, fieldnames)
152
153             if not csvfile.tell():  # file is now empty
154                 writer.writeheader()
155
156             for result in results:
157                 writer.writerow(result)
158
159
160     @staticmethod
161     def _get_unique_keys(list_of_dicts):
162         """Gets unique key values as ordered list of strings in given dicts
163
164         :param list_of_dicts: list of dictionaries.
165
166         :returns: list of unique keys(strings).
167         """
168         result = OrderedDict()
169         for item in list_of_dicts:
170             for key in item.keys():
171                 result[key] = ''
172
173         return list(result.keys())