Initial commit for tracing 99/53899/6
authorEddie Arrage <eddie.arrage@huawei.com>
Thu, 15 Mar 2018 17:08:30 +0000 (10:08 -0700)
committerEddie Arrage <eddie.arrage@huawei.com>
Thu, 29 Mar 2018 23:49:25 +0000 (16:49 -0700)
- Uses REST interface to obtain traces for services from Jaeger
- Discover services availabe in tracing
- Works only with Jaeger at the moment (not zipkin)
- Optional Redis interface added to store traces per test
- Install doc and validation script added for Jaeger

- Renamed doc to docs

Change-Id: I420137c818df290ecd40aa8d318c6961c511a947
Signed-off-by: Eddie Arrage <eddie.arrage@huawei.com>
clover/tracing/tracing.py [new file with mode: 0644]
clover/tracing/tracing_sample.py [new file with mode: 0644]
clover/tracing/validate.py [new file with mode: 0644]
docs/tracing.rst [new file with mode: 0644]

diff --git a/clover/tracing/tracing.py b/clover/tracing/tracing.py
new file mode 100644 (file)
index 0000000..16b952c
--- /dev/null
@@ -0,0 +1,201 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import requests
+import time
+import redis
+
+TRACING_IP = "localhost"
+TRACING_PORT = "30888"
+
+
+class Tracing:
+
+    def __init__(
+      self, tracing_ip, tracing_port, redis_ip='localhost', use_redis=True):
+        self.tracing_ip = tracing_ip
+        self.tracing_port = tracing_port
+        self.testid = '0'
+        self.test_start_time = 0
+        self.use_redis = use_redis
+        if use_redis:
+            try:
+                self.r = redis.StrictRedis(host=redis_ip, port=6379, db=0)
+            except Exception:
+                print("Failed to connect to redis")
+
+    def setRedisSet(self, rkey, rvalue):
+        if self.use_redis:
+            self.r.sadd(rkey, rvalue)
+
+    def setRedisList(self, rkey, rvalue):
+        if self.use_redis:
+            self.r.lpush(rkey, rvalue)
+
+    def setRedisHash(self, rkey, rvalue):
+        if self.use_redis:
+            self.r.hmset(rkey, rvalue)
+
+    def getRedisTestid(self, index):
+        testid = self.r.lrange("testids", index, index)
+        return testid[0]
+
+    def getRedisTraceids(self, testid):
+        rkey = "traceids:" + str(testid)
+        traceids = self.r.smembers(rkey)
+        return traceids
+
+    def getRedisSpanids(self, traceid):
+        rkey = "spanids:" + str(traceid)
+        spanids = self.r.smembers(rkey)
+        return spanids
+
+    def getRedisSpan(self, spanid, traceid):
+        rkey = "spans:" + str(traceid) + ':' + str(spanid)
+        span = self.r.hgetall(rkey)
+        return span
+
+    def getRedisSpanValue(self, spanid, traceid, span_key):
+        rkey = "spans:" + str(traceid) + ':' + str(spanid)
+        span_value = self.r.hget(rkey, span_key)
+        return span_value
+
+    def getRedisTags(self, spanid, traceid):
+        rkey = "tags:" + str(spanid) + ':' + str(traceid)
+        tags = self.r.hgetall(rkey)
+        return tags
+
+    def getRedisTagsValue(self, spanid, traceid, tag_key):
+        rkey = "tags:" + str(spanid) + ':' + str(traceid)
+        tag_value = self.r.hget(rkey, tag_key)
+        return tag_value
+
+    def getRedisTestAll(self, testid):
+        traceids = self.getRedisTraceids(testid)
+        for trace in traceids:
+            spanids = self.getRedisSpanids(trace)
+            for span in spanids:
+                # print(self.getRedisSpan(span, trace))
+                print(self.getRedisSpanValue(span, trace, 'duration'))
+                # print(self.getRedisTags(span, trace))
+                print(self.getRedisTagsValue(span, trace, 'node_id'))
+
+    def setTest(self, testid):
+        self.testid = testid
+        self.setRedisList("testids", testid)
+        self.test_start_time = int(time.time())
+
+    def getServices(self):
+        req_url = 'http://' + self.tracing_ip + ':' + self.tracing_port + \
+                                                        '/api/services'
+        try:
+            response = requests.get(req_url)
+            if response.status_code != 200:
+                print("ERROR: Cannot connect to tracing: {}".format(
+                                        response.status_code))
+                return False
+        except Exception as e:
+            print("ERROR: Cannot connect to tracing")
+            print(e)
+            return False
+
+        data = response.json()
+        services = data['data']
+        return services
+
+    def getTraces(self, service, time_back=3600, limit='1000'):
+        ref_time = int(time.time())
+        pad_time = '757000'
+        end_time = 'end=' + str(ref_time) + pad_time + '&'
+        if time_back == 0:
+            delta = self.test_start_time
+        else:
+            delta = ref_time - time_back
+        start_time = 'start=' + str(delta) + pad_time
+        limit = 'limit=' + limit + '&'
+        loopback = 'loopback=1h&'
+        max_dur = 'maxDuration&'
+        min_dur = 'minDuration&'
+        service = 'service=' + service + '&'
+        url_prefix = 'http://' + self.tracing_ip + ':' + self.tracing_port + \
+            '/api/traces?'
+        req_url = url_prefix + end_time + limit + loopback + max_dur + \
+            min_dur + service + start_time
+
+        try:
+            response = requests.get(req_url)
+            if response.status_code != 200:
+                print("ERROR: Cannot connect to tracing: {}".format(
+                                        response.status_code))
+                return False
+        except Exception as e:
+            print("ERROR: Cannot connect to tracing")
+            print(e)
+            return False
+
+        traces = response.json()
+        return traces
+
+    def numTraces(self, trace):
+        num_traces = len(trace['data'])
+        return str(num_traces)
+
+    def outProcesses(self, trace):
+        processes = []
+        if trace['data']:
+            first_trace = trace['data'][0]
+            for process in first_trace['processes']:
+                processes.append(process)
+            print(processes)
+        return processes
+
+    def outTraces(self, trace):
+        for traces in trace['data']:
+            print("TraceID: {}".format(traces['traceID']))
+            self.setRedisSet(
+              "traceids:{}".format(str(self.testid)), traces['traceID'])
+            for spans in traces['spans']:
+                    print("SpanID: {}".format(spans['spanID']))
+                    self.setRedisSet(
+                       "spanids:{}".format(traces['traceID']), spans['spanID'])
+                    print("Duration: {} usec".format(spans['duration']))
+                    span = {}
+                    span['spanID'] = spans['spanID']
+                    span['duration'] = spans['duration']
+                    span['startTime'] = spans['startTime']
+                    span['operationName'] = spans['operationName']
+                    # print("Tags:\n {} \n".format(spans['tags']))
+                    self.setRedisHash(
+                        "spans:{}:{}".format(
+                           traces['traceID'], spans['spanID']), span)
+                    tag = {}
+                    for tags in spans['tags']:
+                        print("Tag key: {}, value: {}".format(
+                                tags['key'], tags['value']))
+                        tag[tags['key']] = tags['value']
+                    self.setRedisHash("tags:{}:{}".format(
+                                spans['spanID'], traces['traceID']), tag)
+
+    def monitorTraces(self, sample_interval, service='istio-ingress'):
+        loop = True
+        while loop:
+            try:
+                t = self.getTraces(service, 10)
+                num_traces = self.numTraces(t)
+                print("Number of traces: " + num_traces)
+                self.outTraces(t)
+                time.sleep(sample_interval)
+            except KeyboardInterrupt:
+                print("Test Start: {}".format(self.test_start_time))
+                loop = False
+
+    def main(self):
+        self.monitorTraces(1)
+
+
+if __name__ == '__main__':
+    Tracing(TRACING_IP, TRACING_PORT).main()
diff --git a/clover/tracing/tracing_sample.py b/clover/tracing/tracing_sample.py
new file mode 100644 (file)
index 0000000..f0234bf
--- /dev/null
@@ -0,0 +1,47 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import uuid
+import time
+from tracing import Tracing
+
+t = Tracing('localhost', '30888')
+
+# Get toplevel services stored in tracing
+services = t.getServices()
+print(services)
+
+# Get traces from the last hour for istio-ingress service
+service = 'istio-ingress'
+traces = t.getTraces(service, 3600)
+# Get process names for first trace service
+t.outProcesses(traces)
+
+# Turn off redis tracing store and output basic trace info
+t.use_redis = False
+t.outTraces(traces)
+
+# Setup basic test and store in redis
+t.use_redis = True
+t.setTest(uuid.uuid4())
+time.sleep(20)
+# Get all traces from test start time when time_back=0
+traces = t.getTraces(service, 0)
+# Store traces in redis
+t.outTraces(traces)
+
+# Get test id for some number of tests back
+testid = t.getRedisTestid('0')
+print(testid)
+traceids = t.getRedisTraceids(testid)
+print(traceids)
+
+# Print out span and tag info for all traces in test
+# Will continue to consider what to extract from hashes for e2e validation
+t.getRedisTestAll(testid)
+
+# t.monitorTraces(1)
diff --git a/clover/tracing/validate.py b/clover/tracing/validate.py
new file mode 100644 (file)
index 0000000..eed6f9a
--- /dev/null
@@ -0,0 +1,66 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from tracing import Tracing
+from kubernetes import client, config
+
+
+JAEGER_IP = "localhost"
+# JAEGER_IP = "1.1.1.1"
+JAEGER_PORT = "30888"
+JAEGER_DEPLOYMENT = "jaeger-deployment"
+ISTIO_NAMESPACE = "istio-system"
+ISTIO_SERVICES = ["istio-ingress", "istio-mixer"]
+
+
+def validateDeploy():
+    config.load_kube_config()
+    v1 = client.AppsV1Api()
+
+    deployments = []
+    namespaces = []
+    validate = False
+    ret = v1.list_deployment_for_all_namespaces(watch=False)
+    for i in ret.items:
+        deployments.append(i.metadata.name)
+        namespaces.append(i.metadata.namespace)
+    if JAEGER_DEPLOYMENT in deployments:
+        d_index = deployments.index(JAEGER_DEPLOYMENT)
+        if ISTIO_NAMESPACE in namespaces[d_index]:
+            print("Deployment: {} present in {} namespace".format(
+                          JAEGER_DEPLOYMENT, ISTIO_NAMESPACE))
+            validate = True
+    return validate
+
+# Services in Jaeger will only be present when traffic passes through Istio
+# Requires a deployment in Istio service mesh with some traffic targeting nodes
+def validateServices():
+    t = Tracing(JAEGER_IP, JAEGER_PORT)
+    services = t.getServices()
+    validate = True
+    if services:
+        for s in ISTIO_SERVICES:
+            if s in services:
+                print("Service in tracing: {} present".format(s))
+            else:
+                validate = False
+    else:
+        validate = False
+    return validate
+
+
+def main():
+    if validateDeploy() and validateServices():
+        print"Jaeger tracing validation has passed"
+        return True
+    else:
+        print"Jaeger tracing validation has failed"
+        return False
+
+
+if __name__ == '__main__':
+    main()
diff --git a/docs/tracing.rst b/docs/tracing.rst
new file mode 100644 (file)
index 0000000..79d686c
--- /dev/null
@@ -0,0 +1,44 @@
+#######
+Tracing
+#######
+
+************
+Installation
+************
+
+Currently, we use the Jaeger tracing all-in-one Kubernetes template for development and testing,
+which uses in-memory storage. It can be deployed to the istio-system namespace with the
+following command::
+
+    kubectl apply -n istio-system -f https://raw.githubusercontent.com/jaegertracing/jaeger-kubernetes/master/all-in-one/jaeger-all-in-one-template.yml
+
+The standard Jaeger REST port is at 16686. To make this service available outside of the
+Kubernetes cluster, use the following command::
+
+    kubectl expose -n istio-system deployment jaeger-deployment --port=16686 --type=NodePort
+
+Kubernetes will expose the Jaeger service on another port, which can be found with::
+
+    kubectl get svc -n istio-system
+
+An example listing from the command above is shown below where the Jaeger service is exposed
+externally on port 30888::
+
+    istio-system   jaeger-deployment   NodePort  10.104.113.94  <none> 16686:30888/TCP
+
+Jaeger will be accessible using the host IP of the Kubernetes cluster and port provided.
+
+********
+Validate
+********
+
+The script in ``clover/tracing`` validates Jaeger installation::
+
+    python clover/tracing/validate.py
+
+It validates the installation with the following criteria:
+
+#. Existence of Jaeger all-in-one deployment using Kubernetes
+#. Jaeger service is accessible using IP address and port configured in installation steps
+#. Jaeger can retrieve default service listing for default Istio components
+#. TBD - consider installation of production setup with cassandra or elastic search