__logger = logging.getLogger(__name__)
+ config = '/root/.kube/config'
+
def __init__(self, **kwargs):
super(K8sTesting, self).__init__(**kwargs)
- self.config = '/root/.kube/config'
self.cmd = []
self.result = 0
self.start_time = 0
process = subprocess.Popen(cmd_line, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = process.stdout.read()
- # Remove color code escape sequences
- output = re.sub(r'\x1B\[[0-?]*[ -/]*[@-~]', '', str(output))
-
if ('Error loading client' in output or
'Unexpected error' in output):
raise Exception(output)
self.stop_time = time.time()
return res
- def check_envs(self): # pylint: disable=no-self-use
- """Check if required environment variables are set"""
- try:
- assert 'DEPLOY_SCENARIO' in os.environ
- assert 'KUBE_MASTER_IP' in os.environ
- assert 'KUBERNETES_PROVIDER' in os.environ
- assert 'KUBE_MASTER_URL' in os.environ
- except Exception as ex:
- raise Exception("Cannot run k8s testcases. "
- "Please check env var: %s" % str(ex))
-
class K8sSmokeTest(K8sTesting):
"""Kubernetes smoke test suite"""
if "case_name" not in kwargs:
kwargs.get("case_name", 'k8s_smoke')
super(K8sSmokeTest, self).__init__(**kwargs)
- self.check_envs()
- self.cmd = ['/src/k8s.io/kubernetes/cluster/test-smoke.sh', '--host',
- os.getenv('KUBE_MASTER_URL')]
+ self.cmd = ['e2e.test', '-ginkgo.focus', 'Guestbook.application',
+ '-ginkgo.noColor', '-kubeconfig', self.config,
+ '--provider', 'local']
class K8sConformanceTest(K8sTesting):
if "case_name" not in kwargs:
kwargs.get("case_name", 'k8s_conformance')
super(K8sConformanceTest, self).__init__(**kwargs)
- self.check_envs()
- self.cmd = ['/src/k8s.io/kubernetes/_output/bin/e2e.test',
- '-ginkgo.focus', 'Conformance',
- '-kubeconfig', self.config]
+ self.cmd = ['e2e.test', '-ginkgo.focus', 'Conformance',
+ '-ginkgo.noColor', '-kubeconfig', self.config,
+ '--provider', 'local']