X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=vstf%2Fvstf%2Fcontroller%2Fsettings%2Fsettings.py;fp=vstf%2Fvstf%2Fcontroller%2Fsettings%2Fsettings.py;h=0000000000000000000000000000000000000000;hb=f84c8dcc22f1499128893e62b0e15b4b592c47ba;hp=2c712bb2174073506ab7bbac7d7cad8a64f6f9bf;hpb=7ba76747d55669e2bbaf70a3061e1c0b5dea912e;p=bottlenecks.git diff --git a/vstf/vstf/controller/settings/settings.py b/vstf/vstf/controller/settings/settings.py deleted file mode 100644 index 2c712bb2..00000000 --- a/vstf/vstf/controller/settings/settings.py +++ /dev/null @@ -1,289 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -import json -import re -import os -import copy -import logging -import sys - -LOG = logging.getLogger(__name__) - - -def object2dict(obj): - # convert object to a dict - dic = {'__class__': obj.__class__.__name__, '__module__': obj.__module__} - dic.update(obj.__dict__) - return dic - - -def dict2object(dic): - # convert dict to object - if '__class__' in dic: - class_name = dic.pop('__class__') - module_name = dic.pop('__module__') - module = __import__(module_name) - class_ = getattr(module, class_name) - args = dict((key.encode('ascii'), value) for key, value in dic.items()) # get args - inst = class_(**args) # create new instance - else: - inst = dic - return inst - - -def filter_comments(filename, flags="//"): - result = [] - with open(filename, "r") as ifile: - lines = ifile.readlines() - for data in lines: - data = re.sub("%s.*$" % (flags), '', data) - data = re.sub("^\s*$", '', data) - if data: - result.append(data) - LOG.debug(result) - return ''.join(result) - - -class BaseSettings(object): - def _load(self, fullname): - data = filter_comments(fullname) - LOG.debug(fullname) - LOG.debug(data) - jparams = None - if data: - jparams = json.loads(data) - return jparams - - def _sub(self, ldata, rdata): - if isinstance(ldata, list) and isinstance(rdata, list): - data = [] - if ldata: - for litem in ldata: - if rdata: - for ritem in rdata: - if isinstance(litem, dict) or isinstance(litem, list): - tmp = self._sub(litem, ritem) - else: - tmp = ritem - if tmp and tmp not in data: - data.append(tmp) - else: - data.append(litem) - - else: - data = rdata - - elif isinstance(ldata, dict) and isinstance(rdata, dict): - data = {} - rdata_bak = copy.deepcopy(rdata) - for rkey, rvalue in rdata_bak.items(): - if rkey not in ldata: - rdata_bak.pop(rkey) - for lkey, lvalue in ldata.items(): - if lkey in rdata: - if isinstance(lvalue, dict) or isinstance(lvalue, list): - data[lkey] = self._sub(lvalue, rdata[lkey]) - else: - data[lkey] = rdata[lkey] - else: - if rdata_bak: - data[lkey] = lvalue - else: - data = rdata - - return data - - def _save(self, data, filename): - if os.path.exists(filename): - os.remove(filename) - with open(filename, 'w') as ofile: - content = json.dumps(data, sort_keys=True, indent=4, separators=(',', ':')) - ofile.write(content) - - -class DefaultSettings(BaseSettings): - def __init__(self, path): - self._default = os.path.join(path, 'default') - self._user = os.path.join(path, 'user') - - def load(self, filename): - dfile = os.path.join(self._default, filename) - if os.path.exists(dfile): - ddata = self._load(dfile) - data = ddata - else: - err = "default file is missing : %s" % (dfile) - LOG.error(err) - raise Exception(err) - ufile = os.path.join(self._user, filename) - if os.path.exists(ufile): - udata = self._load(ufile) - if udata: - data = self._sub(ddata, udata) - else: - LOG.info("no user file :%s" % (ufile)) - return data - - def save(self, data, filename): - ufile = os.path.join(self._user, filename) - self._save(data, ufile) - - -class SingleSettings(BaseSettings): - def __init__(self, path): - self._path = path - - def load(self, filename): - pfile = os.path.join(self._path, filename) - if os.path.exists(pfile): - ddata = self._load(pfile) - data = ddata - else: - err = "settings file is missing : %s" % (pfile) - LOG.error(err) - raise Exception(err) - return data - - def save(self, data, filename): - pfile = os.path.join(self._path, filename) - self._save(data, pfile) - -SETS_DEFAULT = "Default" -SETS_SINGLE = "Single" -SETTINGS = [SETS_SINGLE, SETS_DEFAULT] - - -class Settings(object): - def __init__(self, path, filename, mode=SETS_SINGLE): - if mode not in SETTINGS: - raise Exception("error Settings mode : %s" % (mode)) - cls_name = mode + "Settings" - thismodule = sys.modules[__name__] - cls = getattr(thismodule, cls_name) - self._settings = cls(path) - self._filename = filename - self._fset = self._settings.load(filename) - self._mset = copy.deepcopy(self._fset) - self._register_func() - - def reset(self): - self._fset = self._settings.load(self._filename) - self._mset = copy.deepcopy(self._fset) - - @property - def settings(self): - return self._mset - - def _setting_file(self, func_name, mset, fset, key, check=None): - def infunc(value): - if hasattr(check, '__call__'): - check(value) - if isinstance(fset, dict): - mset[key] = copy.deepcopy(value) - fset[key] = copy.deepcopy(value) - elif isinstance(fset, list): - del (mset[:]) - del (fset[:]) - mset.extend(copy.deepcopy(value)) - fset.extend(copy.deepcopy(value)) - self._settings.save(self._fset, self._filename) - infunc.__name__ = func_name - LOG.debug(self._mset) - LOG.debug(self._fset) - - return infunc - - def _setting_memory(self, func_name, mset, key, check=None): - def infunc(value): - if hasattr(check, '__call__'): - check(value) - if isinstance(mset, dict): - mset[key] = copy.deepcopy(value) - elif isinstance(mset, list): - for i in range(len(mset)): - mset.pop() - mset.extend(copy.deepcopy(value)) - - infunc.__name__ = func_name - LOG.debug(self._mset) - LOG.debug(self._fset) - - return infunc - - def _adding_file(self, func_name, mset, fset, key, check=None): - def infunc(value): - if hasattr(check, '__call__'): - check(value) - if key: - mset[key].append(copy.deepcopy(value)) - fset[key].append(copy.deepcopy(value)) - else: - mset.append(copy.deepcopy(value)) - fset.append(copy.deepcopy(value)) - - self._settings.save(self._fset, self._filename) - infunc.__name__ = func_name - LOG.debug(self._mset) - LOG.debug(self._fset) - - return infunc - - def _adding_memory(self, func_name, mset, key, check=None): - def infunc(value): - if hasattr(check, '__call__'): - check(value) - if key: - mset[key].append(copy.deepcopy(value)) - else: - mset.append(copy.deepcopy(value)) - infunc.__name__ = func_name - LOG.debug(self._mset) - LOG.debug(self._fset) - - return infunc - - def _register_func(self): - if isinstance(self._fset, dict): - items = set( - self._fset.keys() - ) - for item in items: - item = item.encode() - func_name = "set_%s" % item - setattr(self, func_name, self._setting_file(func_name, self._mset, self._fset, item)) - func_name = "mset_%s" % item - setattr(self, func_name, self._setting_memory(func_name, self._mset, item)) - elif isinstance(self._fset, list): - func_name = "set" - setattr(self, func_name, self._setting_file(func_name, self._mset, self._fset, None)) - func_name = "mset" - setattr(self, func_name, self._setting_memory(func_name, self._mset, None)) - func_name = "add" - setattr(self, func_name, self._adding_file(func_name, self._mset, self._fset, None)) - func_name = "madd" - setattr(self, func_name, self._adding_memory(func_name, self._mset, None)) - - -def unit_test(): - from vstf.common.log import setup_logging - setup_logging(level=logging.DEBUG, log_file="/var/log/vstf-settings.log", clevel=logging.INFO) - - path = '/etc/vstf' - setting = DefaultSettings(path) - filename = 'reporters.mail.mail-settings' - data = setting.load(filename) - - setting.save(data, filename) - LOG.info(type(data)) - LOG.info(data) - - -if __name__ == '__main__': - unit_test()