def __init__(self, **kwargs):
super(K8sTesting, self).__init__(**kwargs)
+ self.config = '/root/.kube/config'
self.cmd = []
self.result = 0
self.start_time = 0
i = 0
while i < len(lines):
if '[k8s.io]' in lines[i]:
- if i != 0 and 'seconds' in lines[i-1]:
- self.__logger.debug(lines[i-1])
- while lines[i] != '-'*len(lines[i]):
+ if i != 0 and 'seconds' in lines[i - 1]:
+ self.__logger.debug(lines[i - 1])
+ while lines[i] != '-' * len(lines[i]):
if lines[i].startswith('STEP:') or ('INFO:' in lines[i]):
break
self.__logger.debug(lines[i])
- i = i+1
+ i = i + 1
success = 'SUCCESS!' in lines[i]
failure = 'FAIL!' in lines[i]
if success or failure:
- if i != 0 and 'seconds' in lines[i-1]:
- remarks.append(lines[i-1])
+ if i != 0 and 'seconds' in lines[i - 1]:
+ remarks.append(lines[i - 1])
remarks = remarks + lines[i].replace('--', '|').split('|')
break
- i = i+1
+ i = i + 1
- self.__logger.debug('-'*10)
+ self.__logger.debug('-' * 10)
self.__logger.info("Remarks:")
for remark in remarks:
if 'seconds' in remark:
def run(self, **kwargs):
- if not os.path.isfile(os.getenv('KUBECONFIG')):
+ if not os.path.isfile(self.config):
self.__logger.error(
"Cannot run k8s testcases. Config file not found ")
return self.EX_RUN_ERROR
"""Check if required environment variables are set"""
try:
assert 'DEPLOY_SCENARIO' in os.environ
- assert 'KUBECONFIG' in os.environ
assert 'KUBE_MASTER_IP' in os.environ
assert 'KUBERNETES_PROVIDER' in os.environ
assert 'KUBE_MASTER_URL' in os.environ
super(K8sConformanceTest, self).__init__(**kwargs)
self.check_envs()
self.cmd = ['/src/k8s.io/kubernetes/_output/bin/e2e.test',
- '--ginkgo.focus', 'Conformance']
+ '-ginkgo.focus', 'Conformance',
+ '-kubeconfig', self.config]
def setUp(self):
os.environ["DEPLOY_SCENARIO"] = "k8-test"
- os.environ["KUBECONFIG"] = "/home/config"
os.environ["KUBE_MASTER_IP"] = "127.0.0.1"
os.environ["KUBE_MASTER_URL"] = "https://127.0.0.1:6443"
os.environ["KUBERNETES_PROVIDER"] = "local"
def test_no_deploy_scenario(self):
self._test_no_env_var("DEPLOY_SCENARIO")
- def test_no_kubeconfig(self):
- self._test_no_env_var("KUBECONFIG")
-
def test_no_kube_master_ip(self):
self._test_no_env_var("KUBE_MASTER_IP")