# Copyright 2012 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. class ParseError(Exception): def __init__(self, message, lineno, line): self.msg = message self.line = line self.lineno = lineno def __str__(self): return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line) class BaseParser(object): lineno = 0 parse_exc = ParseError def _assignment(self, key, value): self.assignment(key, value) return None, [] def _get_section(self, line): if not line.endswith(']'): return self.error_no_section_end_bracket(line) if len(line) <= 2: return self.error_no_section_name(line) return line[1:-1] def _split_key_value(self, line): colon = line.find(':') equal = line.find('=') if colon < 0 and equal < 0: return line.strip(), '@' if colon < 0 or (equal >= 0 and equal < colon): key, value = line[:equal], line[equal + 1:] else: key, value = line[:colon], line[colon + 1:] value = value.strip() if value and value[0] == value[-1] and value.startswith(("\"", "'")): value = value[1:-1] return key.strip(), [value] def parse(self, lineiter): key = None value = [] for line in lineiter: self.lineno += 1 line = line.rstrip() lines = line.split(';') line = lines[0] if not line: # Blank line, ends multi-line values if key: key, value = self._assignment(key, value) continue elif line.startswith((' ', '\t')): # Continuation of previous assignment if key is None: self.error_unexpected_continuation(line) else: value.append(line.lstrip()) continue if key: # Flush previous assignment, if any key, value = self._assignment(key, value) if line.startswith('['): # Section start section = self._get_section(line) if section: self.new_section(section) elif line.startswith(('#', ';')): self.comment(line[1:].lstrip()) else: key, value = self._split_key_value(line) if not key: return self.error_empty_key(line) if key: # Flush previous assignment, if any self._assignment(key, value) def assignment(self, key, value): """Called when a full assignment is parsed.""" raise NotImplementedError() def new_section(self, section): """Called when a new section is started.""" raise NotImplementedError() def comment(self, comment): """Called when a comment is parsed.""" pass def error_invalid_assignment(self, line): raise self.parse_exc("No ':' or '=' found in assignment", self.lineno, line) def error_empty_key(self, line): raise self.parse_exc('Key cannot be empty', self.lineno, line) def error_unexpected_continuation(self, line): raise self.parse_exc('Unexpected continuation line', self.lineno, line) def error_no_section_end_bracket(self, line): raise self.parse_exc('Invalid section (must end with ])', self.lineno, line) def error_no_section_name(self, line): raise self.parse_exc('Empty section name', self.lineno, line) class ConfigParser(BaseParser): """Parses a single config file, populating 'sections' to look like: {'DEFAULT': {'key': [value, ...], ...}, ...} """ def __init__(self, filename, sections): super(ConfigParser, self).__init__() self.filename = filename self.sections = sections self.section = None def parse(self): with open(self.filename) as f: return super(ConfigParser, self).parse(f) def find_section(self, sections, section): return next((i for i, sect in enumerate(sections) if sect == section), -1) def new_section(self, section): self.section = section index = self.find_section(self.sections, section) if index == -1: self.sections.append([section, []]) def assignment(self, key, value): if not self.section: raise self.error_no_section() value = '\n'.join(value) def append(sections, section): entry = [key, value] index = self.find_section(sections, section) sections[index][1].append(entry) append(self.sections, self.section) def parse_exc(self, msg, lineno, line=None): return ParseError(msg, lineno, line) def error_no_section(self): return self.parse_exc('Section must be started before assignment', self.lineno)