Merge "test_kubernetes: mock file operations in test_ssh_key"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / iniparser.py
1 # Copyright 2012 OpenStack Foundation
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15
class ParseError(Exception):
    """Error raised when a line of an INI-style file cannot be parsed.

    Attributes:
        msg: short description of what is wrong.
        lineno: number of the offending line.
        line: text of the offending line (may be None when not available).
    """

    def __init__(self, message, lineno, line):
        # Forward the values to Exception so that ``args`` is populated;
        # without this the exception cannot be rebuilt from ``args`` (as
        # pickle and copy do) and its repr() carries no information.
        super(ParseError, self).__init__(message, lineno, line)
        self.msg = message
        self.line = line
        self.lineno = lineno

    def __str__(self):
        return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line)
24
25
class BaseParser(object):
    """Line-oriented INI-style parser that reports what it finds via hooks.

    ``parse`` walks an iterable of lines and calls ``new_section``,
    ``assignment`` and ``comment`` as each construct is recognised.
    Subclasses implement those hooks.  Malformed input is reported through
    the ``error_*`` methods, each of which raises ``parse_exc``.
    """

    # Count of lines consumed so far.  parse() increments it for every line
    # and never resets it, so it keeps counting across successive parse()
    # calls on the same instance.
    lineno = 0
    # Callable used by the error_* helpers to build the exception to raise.
    parse_exc = ParseError

    def _assignment(self, key, value):
        """Report a finished assignment and return a cleared (key, value)."""
        self.assignment(key, value)
        return None, []

    def _get_section(self, line):
        """Return the name inside a ``[name]`` section header line.

        Raises ``parse_exc`` when the line does not end with ``]`` or when
        there is nothing between the brackets.
        """
        if not line.endswith(']'):
            return self.error_no_section_end_bracket(line)
        if len(line) <= 2:
            return self.error_no_section_name(line)

        return line[1:-1]

    def _split_key_value(self, line):
        """Split an assignment line into its key and value.

        The separator is whichever of ``:`` or ``=`` appears first.  The
        value is stripped of surrounding whitespace and of one pair of
        matching single or double quotes.

        Returns:
            ``(key, [value])`` when a separator is present, or
            ``(key, '@')`` when the line holds a bare key with no separator.
        """
        colon = line.find(':')
        equal = line.find('=')
        if colon < 0 and equal < 0:
            # Bare key: '@' marks "no value given".
            return line.strip(), '@'

        if colon < 0 or (equal >= 0 and equal < colon):
            key, value = line[:equal], line[equal + 1:]
        else:
            key, value = line[:colon], line[colon + 1:]

        value = value.strip()
        # Require two characters so that a value consisting of a single quote
        # character is not mistaken for an empty quoted string.
        if (len(value) >= 2 and value[0] == value[-1]
                and value.startswith(("\"", "'"))):
            value = value[1:-1]
        return key.strip(), [value]

    def parse(self, lineiter):
        """Parse every line of ``lineiter`` and invoke the hook methods.

        Args:
            lineiter: iterable yielding the lines to parse.

        Raises:
            parse_exc: on a continuation line with no pending assignment, a
                malformed section header, or an assignment with an empty key.
        """
        key = None
        value = []

        for line in lineiter:
            self.lineno += 1

            # Strip trailing whitespace, then drop everything from the first
            # ';' onwards (treated as an inline comment).
            line = line.rstrip().split(';')[0]
            if not line:
                # Blank line, ends multi-line values
                if key:
                    key, value = self._assignment(key, value)
                continue
            elif line.startswith((' ', '\t')):
                # Continuation of previous assignment
                if key is None:
                    self.error_unexpected_continuation(line)
                else:
                    if not isinstance(value, list):
                        # A bare key carries its '@' marker as a plain
                        # string; promote it to a list so the continuation
                        # can be appended instead of raising AttributeError.
                        value = [value]
                    value.append(line.lstrip())
                continue

            if key:
                # Flush previous assignment, if any
                key, value = self._assignment(key, value)

            if line.startswith('['):
                # Section start
                section = self._get_section(line)
                if section:
                    self.new_section(section)
            elif line.startswith(('#', ';')):
                self.comment(line[1:].lstrip())
            else:
                key, value = self._split_key_value(line)
                if not key:
                    return self.error_empty_key(line)

        if key:
            # Flush previous assignment, if any
            self._assignment(key, value)

    def assignment(self, key, value):
        """Called when a full assignment is parsed."""
        raise NotImplementedError()

    def new_section(self, section):
        """Called when a new section is started."""
        raise NotImplementedError()

    def comment(self, comment):
        """Called when a comment is parsed."""
        pass

    def error_invalid_assignment(self, line):
        """Raise ``parse_exc`` for an assignment lacking ':' or '='."""
        raise self.parse_exc("No ':' or '=' found in assignment",
                             self.lineno, line)

    def error_empty_key(self, line):
        """Raise ``parse_exc`` for an assignment whose key is empty."""
        raise self.parse_exc('Key cannot be empty', self.lineno, line)

    def error_unexpected_continuation(self, line):
        """Raise ``parse_exc`` for an indented line with nothing to continue."""
        raise self.parse_exc('Unexpected continuation line',
                             self.lineno, line)

    def error_no_section_end_bracket(self, line):
        """Raise ``parse_exc`` for a section header missing its ']'."""
        raise self.parse_exc('Invalid section (must end with ])',
                             self.lineno, line)

    def error_no_section_name(self, line):
        """Raise ``parse_exc`` for a section header with an empty name."""
        raise self.parse_exc('Empty section name', self.lineno, line)
130
131
class ConfigParser(BaseParser):
    """Parses a single config file, populating 'sections' to look like:

        [['section', [['key', 'value'], ...]],
         ...]

    'sections' is the list handed to the constructor and is modified in
    place.  Multi-line values are joined with newlines into one string.
    """

    def __init__(self, filename, sections):
        """Remember the file to read and the list that collects the result.

        Args:
            filename: path of the config file opened by ``parse``.
            sections: list that receives ``[name, entries]`` pairs.
        """
        super(ConfigParser, self).__init__()
        self.filename = filename
        self.sections = sections
        # Name of the section currently being filled; None until the first
        # section header has been seen.
        self.section = None

    def parse(self):
        """Open ``self.filename`` and parse it line by line."""
        with open(self.filename) as f:
            return super(ConfigParser, self).parse(f)

    def find_section(self, sections, section):
        """Return the index of the pair named ``section``, or -1 if absent."""
        # Each element is a [name, entries] pair, so the name to compare is
        # element 0; comparing the whole pair to the name never matches.
        return next((i for i, sect in enumerate(sections)
                     if sect[0] == section), -1)

    def new_section(self, section):
        """Make ``section`` current, adding an empty pair if it is new."""
        self.section = section
        index = self.find_section(self.sections, section)
        if index == -1:
            self.sections.append([section, []])

    def assignment(self, key, value):
        """Append ``[key, value]`` to the entries of the current section.

        Raises:
            ParseError: if no section has been started yet.
        """
        if not self.section:
            raise self.error_no_section()

        value = '\n'.join(value)

        index = self.find_section(self.sections, self.section)
        if index == -1:
            # The current section is not in the list (e.g. the list was
            # changed by the caller); register it rather than writing into
            # an unrelated section.
            self.sections.append([self.section, []])
            index = len(self.sections) - 1
        self.sections[index][1].append([key, value])

    def parse_exc(self, msg, lineno, line=None):
        """Build (without raising) a ParseError; ``line`` is optional."""
        return ParseError(msg, lineno, line)

    def error_no_section(self):
        """Return the error for an assignment that precedes any section."""
        return self.parse_exc('Section must be started before assignment',
                              self.lineno)