Improve data ingestion reliability and functionality 83/63383/2
author: earrage <eddie.arrage@huawei.com>
Wed, 10 Oct 2018 18:54:51 +0000 (11:54 -0700)
committer: earrage <eddie.arrage@huawei.com>
Wed, 10 Oct 2018 18:59:49 +0000 (11:59 -0700)
- Modify deployment namespace to clover-system and account
for cassandra moving to the clover-system namespace
- Increase k8s compute resources assigned to cassandra to deal
with performance issues
- Add additional fields (user-agent, request/response size,
status codes) to span schema definition and modify primary keys
- Improve exception handling to prevent collect process from
crashing
- Minor changes to support tracing/monitoring with Istio 1.0
- Inhibit logging for debug messages
- Increase look-back time and number of traces to fetch in
each sampling interval to deal with Jaeger REST interface
returning trace data out of order under load
(tested to 300 conn/sec; 12K connections currently)
- Move trace insert into batch mode to cassandra
- Read visibility services to analyze from redis rather than
defaults (cloverctl, UI or clover-controller REST will set)
- Remove local directory copies in docker build, as image is
based on base clover container

Change-Id: Ibae98ef5057e52a6eeddd9ebbcfaeb644caec36c
Signed-off-by: earrage <eddie.arrage@huawei.com>
clover/collector/db/cassops.py
clover/collector/db/redisops.py
clover/collector/docker/Dockerfile
clover/collector/grpc/collector_client.py
clover/collector/grpc/collector_server.py
clover/collector/process/collect.py
clover/collector/yaml/manifest.template
clover/tools/yaml/cassandra.yaml

index 6553cff..0bc9d84 100644 (file)
@@ -9,7 +9,7 @@ from cassandra.cluster import Cluster
 from cassandra.query import BatchStatement
 import logging
 
-CASSANDRA_HOSTS = ['cassandra.default']
+CASSANDRA_HOSTS = ['cassandra.clover-system']
 
 
 class CassandraOps:
@@ -57,13 +57,18 @@ class CassandraOps:
                         spanid text,
                         traceid text,
                         duration int,
-                        start_time int,
+                        start_time timestamp,
                         processid text,
                         operation_name text,
                         node_id text,
                         http_url text,
+                        user_agent text,
+                        request_size text,
+                        response_size text,
+                        status_code text,
                         upstream_cluster text,
-                        PRIMARY KEY (spanid, traceid)
+                        insert_time timestamp,
+                        PRIMARY KEY (traceid, spanid)
                     )
                     """)
 
@@ -82,11 +87,18 @@ class CassandraOps:
 
     def set_prepared(self):
         self.session.set_keyspace(self.keyspace)
-        self.insert_tracing_stmt = self.session.prepare(
+        self.insert_span_stmt = self.session.prepare(
             """
             INSERT INTO spans (spanid, traceid, duration, operation_name,
-            node_id, http_url, upstream_cluster)
-            VALUES (?, ?, ?, ?, ?, ?, ?)
+            node_id, http_url, upstream_cluster, start_time, user_agent,
+            request_size, response_size, status_code, insert_time)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, toTimestamp(now()))
+            """
+        )
+        self.insert_trace_stmt = self.session.prepare(
+            """
+            INSERT INTO traces (traceid, processes)
+            VALUES (?, ?)
             """
         )
         self.insert_metric_stmt = self.session.prepare(
@@ -103,31 +115,31 @@ class CassandraOps:
     def execute_batch(self):
         self.session.execute(self.batch)
 
-    def insert_tracing(self, table, traceid, s, tags):
+    def insert_span(self, traceid, s, tags):
         self.session.set_keyspace(self.keyspace)
         if 'upstream_cluster' not in tags:
-            logging.debug('NO UPSTREAM_CLUSTER KEY')
+            logging.debug('NO UPSTREAM_CLUSTER KEY')
             tags['upstream_cluster'] = 'none'
         try:
-            self.batch.add(self.insert_tracing_stmt,
+            self.batch.add(self.insert_span_stmt,
                            (s['spanID'], traceid, s['duration'],
                             s['operationName'], tags['node_id'],
-                            tags['http.url'], tags['upstream_cluster']))
+                            tags['http.url'], tags['upstream_cluster'],
+                            int(str(s['startTime'])[0:13]), tags['user_agent'],
+                            tags['request_size'], tags['response_size'],
+                            tags['http.status_code']))
+        except KeyError as e:
+            logging.debug('Insert span error: {}, Tags: {}'.format(e, tags))
         except Exception as e:
-            logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
-                          s['duration'], s['operationName'], tags['node_id'],
-                          tags['http.url'], tags['upstream_cluster']))
-            logging.debug(e)
+            logging.debug('Insert span error: {}'.format(e))
+            logging.debug('Tags: {}'.format(tags))
+            logging.debug('Span toplevel: {}'.format(s))
+            logging.debug(
+                'startTime: {}'.format(int(str(s['startTime'])[0:13])))
 
     def insert_trace(self, traceid, processes):
         self.session.set_keyspace(self.keyspace)
-        self.session.execute(
-            """
-            INSERT INTO traces (traceid, processes)
-            VALUES (%s, %s)
-            """,
-            (traceid,  processes)
-        )
+        self.batch.add(self.insert_trace_stmt, (traceid,  processes))
 
     def insert_metric(self, m_name, m_value, m_time, service):
         self.session.set_keyspace(self.keyspace)
index e80c417..24fbeb9 100644 (file)
@@ -8,8 +8,7 @@
 import redis
 import logging
 
-REDIS_HOST = 'redis'
-# REDIS_HOST = '10.244.0.85'
+REDIS_HOST = 'redis.default'
 
 
 class RedisOps:
@@ -27,11 +26,16 @@ class RedisOps:
         for s in service_names:
             self.r.sadd(skey, s)
 
+    def set_tracing_services(self, services, skey='tracing_services'):
+        self.r.delete(skey)
+        for s in services:
+            self.r.sadd(skey, s)
+
     def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
-        metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_']
+        metric_prefixes = ['envoy_cluster_outbound_', 'envoy_cluster_inbound_']
         metric_suffixes = [
-            '_default_svc_cluster_local_http_internal_upstream_rq_2xx',
-            '_default_svc_cluster_local_http_upstream_cx_active']
+            '_default_svc_cluster_local_upstream_rq_2xx',
+            '_default_svc_cluster_local_upstream_cx_active']
         for p in metric_prefixes:
             self.r.sadd(pkey, p)
         for s in metric_suffixes:
index 1714420..7b6effd 100644 (file)
@@ -16,15 +16,6 @@ ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover"
 RUN python -m pip install cassandra-driver redis
 
 # Set work directory
-WORKDIR ${CLOVER_REPO_DIR}
-
-COPY /process clover/collector/process
-COPY /grpc clover/collector/grpc
-COPY /db clover/collector/db
-COPY __init__.py clover/collector/__init__.py
-
-RUN pip install .
-
 WORKDIR "${CLOVER_REPO_DIR}/clover/collector"
 
 CMD ./process/grpc_process.sh no_schema_init
index b9e9f67..65ff2ff 100644 (file)
@@ -55,7 +55,7 @@ def get_podip(pod_name):
 
 def init_visibility(stub):
     try:
-        cassandra_hosts = pickle.dumps(['cassandra.default'])
+        cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
         response = stub.InitVisibility(collector_pb2.ConfigCassandra(
             cassandra_hosts=cassandra_hosts, cassandra_port=9042))
     except Exception as e:
@@ -65,7 +65,7 @@ def init_visibility(stub):
 
 def clean_visibility(stub):
     try:
-        cassandra_hosts = pickle.dumps(['cassandra.default'])
+        cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
         schemas = pickle.dumps(['spans', 'traces', 'metrics'])
         response = stub.TruncateVisibility(collector_pb2.Schemas(
             schemas=schemas, cassandra_hosts=cassandra_hosts,
@@ -77,7 +77,7 @@ def clean_visibility(stub):
 
 def start_collector(stub):
     try:
-        cassandra_hosts = pickle.dumps(['cassandra.default'])
+        cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
         response = stub.StartCollector(collector_pb2.ConfigCollector(
             t_port='16686', t_host='jaeger-deployment.istio-system',
             m_port='9090', m_host='prometheus.istio-system',
index c2eb221..a10078e 100644 (file)
@@ -29,7 +29,7 @@ class Controller(collector_pb2_grpc.ControllerServicer):
                             level=logging.DEBUG)
         self.collector = 0
         if init_visibility == 'set_schemas':
-            cassandra_hosts = pickle.dumps(['cassandra.default'])
+            cassandra_hosts = pickle.dumps(['cassandra.clover-system'])
             self.InitVisibility(collector_pb2.ConfigCassandra(
                cassandra_port=9042, cassandra_hosts=cassandra_hosts), "")
 
index d8beb49..3d9df8a 100644 (file)
@@ -16,19 +16,25 @@ import argparse
 import logging
 import ast
 
-TRACING_SERVICES = ['istio-ingress']
 TRACING_PORT = "16686"
 MONITORING_PORT = "9090"
 CASSANDRA_PORT = 9042  # Provide as integer
 MONITORING_HOST = "prometheus.istio-system"
-TRACING_HOST = "jaeger-deployment.istio-system"
-CASSANDRA_HOSTS = ['cassandra.default']
+TRACING_HOST = "tracing.istio-system"
+CASSANDRA_HOSTS = ['cassandra.clover-system']
 
 
 class Collector:
 
     def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts):
-        logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+
+        # logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+        logging.basicConfig(filename='collector.log', level=logging.ERROR)
+        # logging.getLogger("requests").setLevel(logging.DEBUG)
+        logging.getLogger("requests").setLevel(logging.ERROR)
+        # logging.getLogger("urllib3").setLevel(logging.DEBUG)
+        logging.getLogger("urllib3").setLevel(logging.ERROR)
+
         try:
             self.t = Tracing(t_host, t_port, '', False)
             monitoring_url = "http://{}:{}".format(m_host, m_port)
@@ -40,63 +46,89 @@ class Collector:
                 logging.debug(e)
 
     # Toplevel tracing retrieval and batch insert
-    def get_tracing(self, services, time_back=20):
-        self.c.set_batch()
-        for service in services:
-            traces = self.t.getTraces(service, time_back)
-            try:
-                self.set_tracing(traces)
-            except Exception as e:
-                logging.debug(e)
-        self.c.execute_batch()
+    def get_tracing(self, time_back=300):
+        try:
+            services = self.r.get_services()
+            for service in services:
+                traces = self.t.getTraces(service.replace("_", "-"), time_back,
+                                          '20000')
+                try:
+                    self.set_tracing(traces)
+                except Exception as e:
+                    logging.debug(e)
+
+            # Update list of available services from tracing
+            services = self.t.getServices()
+            self.r.set_tracing_services(services)
+        except Exception as e:
+            logging.debug(e)
 
     # Insert to cassandra visibility traces and spans tables
     def set_tracing(self, trace):
         for traces in trace['data']:
+            self.c.set_batch()
             for spans in traces['spans']:
+                try:
                     span = {}
                     span['spanID'] = spans['spanID']
                     span['duration'] = spans['duration']
                     span['startTime'] = spans['startTime']
                     span['operationName'] = spans['operationName']
+
                     tag = {}
                     for tags in spans['tags']:
                         tag[tags['key']] = tags['value']
-                    self.c.insert_tracing('spans', traces['traceID'],
-                                          span, tag)
+                    self.c.insert_span(traces['traceID'], span, tag)
+                except Exception as e:
+                    logging.debug("spans loop")
+                    logging.debug(e)
+
             process_list = []
             for p in traces['processes']:
                 process_list.append(p)
             service_names = []
             for pname in process_list:
                 service_names.append(traces['processes'][pname]['serviceName'])
-            self.c.insert_trace(traces['traceID'], service_names)
+            try:
+                self.c.insert_trace(traces['traceID'], service_names)
+                self.c.execute_batch()
+            except Exception as e:
+                logging.debug(e)
 
     # Insert to cassandra visibility metrics table
     def get_monitoring(self):
 
-        # Fetch collector service/metric lists from redis
-        service_names = self.r.get_services()
-        metric_prefixes, metric_suffixes = self.r.get_metrics()
-
-        self.c.set_batch()
-        for sname in service_names:
-            for prefix in metric_prefixes:
-                for suffix in metric_suffixes:
-                    try:
-                        metric_name = prefix + sname + suffix
-                        query_params = {
-                            "type": "instant",
-                            "query": metric_name
-                        }
-                        data = self.m.query(query_params)
-                        m_value = data['data']['result'][0]['value'][1]
-                        m_time = data['data']['result'][0]['value'][0]
-                        mn = data['data']['result'][0]['metric']['__name__']
-                        self.c.insert_metric(mn, m_value, str(m_time), sname)
-                    except Exception as e:
-                        logging.debug(e)
-        self.c.execute_batch()
+        try:
+            # Fetch collector service/metric lists from redis
+            service_names = self.r.get_services()
+            metric_prefixes, metric_suffixes = self.r.get_metrics()
+
+            self.c.set_batch()
+            for sname in service_names:
+                for prefix in metric_prefixes:
+                    for suffix in metric_suffixes:
+                        try:
+                            metric_name = prefix + sname + suffix
+                            query_params = {
+                                "type": "instant",
+                                "query": metric_name
+                            }
+                            data = self.m.query(query_params)
+                            m_value = data['data']['result'][0]['value'][1]
+                            m_time = data['data']['result'][0]['value'][0]
+                            mn = data[
+                                    'data']['result'][0]['metric']['__name__']
+                            self.c.insert_metric(
+                                           mn, m_value, str(m_time), sname)
+
+                            # Add to redis temporarily
+                            self.r.r.set(mn, m_value)
+
+                        except Exception as e:
+                            logging.debug(e)
+            self.c.execute_batch()
+        except Exception as e:
+            logging.debug(e)
 
         # TODO add batch retrieval for monitoring metrics
         # query_range_param = {
@@ -124,11 +156,13 @@ def main(args):
     loop = True
     while loop:
         try:
-            c.get_tracing(args['t_services'])
+            c.get_tracing()
             c.get_monitoring()
             time.sleep(int(args['sinterval']))
         except KeyboardInterrupt:
             loop = False
+        except Exception as e:
+            logging.debug(e)
 
 
 if __name__ == '__main__':
@@ -154,9 +188,6 @@ if __name__ == '__main__':
     parser.add_argument(
             '-c_port', default=CASSANDRA_PORT,
             help='Port to access Cassandra cluster')
-    parser.add_argument(
-            '-t_services', default=TRACING_SERVICES,
-            help='Collect services on this list of services')
 
     args, unknown = parser.parse_known_args()
     print(main(vars(args)))
index c7aa3e7..795bd8f 100644 (file)
@@ -5,6 +5,7 @@ metadata:
   name: {{ deploy_name }}
   labels:
     app: {{ deploy_name }}
+  namespace: clover-system
 spec:
   template:
     metadata:
@@ -27,6 +28,7 @@ metadata:
   name: {{ deploy_name }}
   labels:
     app: {{ deploy_name }}
+  namespace: clover-system
 spec:
   ports:
   - port: {{ grpc_port }}
index 0206d75..dc1c46f 100644 (file)
@@ -36,6 +36,7 @@ metadata:
   labels:
     app: cassandra
   name: cassandra
+  namespace: clover-system
 spec:
   clusterIP: None
   ports:
@@ -49,6 +50,7 @@ metadata:
   name: cassandra
   labels:
     app: cassandra
+  namespace: clover-system
 spec:
   serviceName: cassandra
   replicas: 1
@@ -76,18 +78,18 @@ spec:
           name: cql
         resources:
           limits:
-            cpu: "500m"
-            memory: 1Gi
+            cpu: "1000m"
+            memory: 5Gi
           requests:
-           cpu: "500m"
-           memory: 1Gi
+           cpu: "1000m"
+           memory: 5Gi
         env:
           - name: MAX_HEAP_SIZE
             value: 512M
           - name: HEAP_NEWSIZE
             value: 100M
           - name: CASSANDRA_SEEDS
-            value: "cassandra-0.cassandra.default.svc.cluster.local"
+            value: "cassandra-0.cassandra.clover-system.svc.cluster.local"
           - name: CASSANDRA_CLUSTER_NAME
             value: "MyCassandraDemo"
           - name: CASSANDRA_DC