Initial commit for Clover Collector 45/57245/3
authorEddie Arrage <eddie.arrage@huawei.com>
Wed, 9 May 2018 18:33:55 +0000 (18:33 +0000)
committerEddie Arrage <eddie.arrage@huawei.com>
Tue, 12 Jun 2018 06:43:48 +0000 (06:43 +0000)
- Added a container named clover-collector using clover
container as a base with build script
- GRPC server to manage collector process
- Cassandra DB client interface to initialize visibility keyspace
- Init messaging adds table schemas for tracing - traces & spans
- Adds table for monitoring - metrics
- Does not implement Cassandra server but developed using
public Cassandra docker container
- Collector process in simple loop that periodically fetches
traces and monitoring data and inserts to Cassandra - not optimized
for batch retrieval yet for monitoring
- CLI interface added to collector process and used
by GRPC server for configuration
- Simple GRPC client script to test GRPC server and start/stop
of collector process
- Collector process can be configured with access for tracing,
monitoring and Cassandra
- Added a return value in monitoring query method

- Added ability to truncate tracing, metrics and spans tables
in cql
- Added cql prepared statements and batch insert for metrics
and spans
- Align cql connection to cql deployment within k8s
- Fix issue with cql host list using ast and collect process
args with background argument
- Added redis interface to accept service/metric list
externally for monitoring (will work in conjunction
with clover-controller)
- Use k8s DNS names and default ports for monitoring, tracing
and cassandra
- Added yaml manifest renderer/template for collector

Change-Id: I3e4353e28844c4ce9c185ff4638012b66c7fff67
Signed-off-by: Eddie Arrage <eddie.arrage@huawei.com>
19 files changed:
clover/collector/__init__.py [new file with mode: 0644]
clover/collector/build.sh [new file with mode: 0755]
clover/collector/db/__init__.py [new file with mode: 0644]
clover/collector/db/cassops.py [new file with mode: 0644]
clover/collector/db/redisops.py [new file with mode: 0644]
clover/collector/docker/Dockerfile [new file with mode: 0644]
clover/collector/grpc/__init__.py [new file with mode: 0644]
clover/collector/grpc/build_proto.sh [new file with mode: 0755]
clover/collector/grpc/collector.proto [new file with mode: 0644]
clover/collector/grpc/collector_client.py [new file with mode: 0644]
clover/collector/grpc/collector_pb2.py [new file with mode: 0644]
clover/collector/grpc/collector_pb2_grpc.py [new file with mode: 0644]
clover/collector/grpc/collector_server.py [new file with mode: 0644]
clover/collector/process/__init__.py [new file with mode: 0644]
clover/collector/process/collect.py [new file with mode: 0644]
clover/collector/process/grpc_process.sh [new file with mode: 0755]
clover/collector/yaml/manifest.template [new file with mode: 0644]
clover/collector/yaml/render_yaml.py [new file with mode: 0644]
clover/monitoring/monitoring.py

diff --git a/clover/collector/__init__.py b/clover/collector/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/clover/collector/build.sh b/clover/collector/build.sh
new file mode 100755 (executable)
index 0000000..f305a02
--- /dev/null
@@ -0,0 +1,16 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+IMAGE_PATH=${IMAGE_PATH:-"localhost:5000"}
+IMAGE_NAME=${IMAGE_NAME:-"clover-collector"}
+
+docker build -f docker/Dockerfile -t $IMAGE_NAME .
+docker tag $IMAGE_NAME $IMAGE_PATH/$IMAGE_NAME
+docker push $IMAGE_PATH/$IMAGE_NAME
diff --git a/clover/collector/db/__init__.py b/clover/collector/db/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/clover/collector/db/cassops.py b/clover/collector/db/cassops.py
new file mode 100644 (file)
index 0000000..6553cff
--- /dev/null
@@ -0,0 +1,144 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from cassandra.cluster import Cluster
+from cassandra.query import BatchStatement
+import logging
+
+CASSANDRA_HOSTS = ['cassandra.default']
+
+
+class CassandraOps:
+
+    def __init__(self, hosts, port=9042, keyspace='visibility'):
+        logging.basicConfig(filename='cassops.log',
+                            level=logging.DEBUG)
+        cluster = Cluster(hosts, port=port)
+        self.session = cluster.connect()
+        self.keyspace = keyspace
+
+    def truncate(self, tables=['traces', 'metrics', 'spans']):
+        self.session.set_keyspace(self.keyspace)
+        try:
+            for table in tables:
+                self.session.execute("""
+                        TRUNCATE %s
+                        """ % table)
+        except Exception as e:
+            logging.debug(e)
+
+    def init_visibility(self):
+        try:
+            self.session.execute("""
+                    CREATE KEYSPACE %s
+                    WITH replication = { 'class': 'SimpleStrategy',
+                    'replication_factor': '1' }
+                    """ % self.keyspace)
+        except Exception as e:
+            logging.debug(e)
+
+        self.session.set_keyspace(self.keyspace)
+
+        try:
+            self.session.execute("""
+                    CREATE TABLE IF NOT EXISTS traces (
+                        traceid text,
+                        processes list<text>,
+                        PRIMARY KEY (traceid)
+                    )
+                    """)
+
+            self.session.execute("""
+                    CREATE TABLE IF NOT EXISTS spans (
+                        spanid text,
+                        traceid text,
+                        duration int,
+                        start_time int,
+                        processid text,
+                        operation_name text,
+                        node_id text,
+                        http_url text,
+                        upstream_cluster text,
+                        PRIMARY KEY (spanid, traceid)
+                    )
+                    """)
+
+            self.session.execute("""
+                    CREATE TABLE IF NOT EXISTS metrics (
+                        m_name text,
+                        m_value text,
+                        m_time text,
+                        service text,
+                        monitor_time timestamp,
+                        PRIMARY KEY (m_name, monitor_time)
+                    )
+                    """)
+        except Exception as e:
+            logging.debug(e)
+
+    def set_prepared(self):
+        self.session.set_keyspace(self.keyspace)
+        self.insert_tracing_stmt = self.session.prepare(
+            """
+            INSERT INTO spans (spanid, traceid, duration, operation_name,
+            node_id, http_url, upstream_cluster)
+            VALUES (?, ?, ?, ?, ?, ?, ?)
+            """
+        )
+        self.insert_metric_stmt = self.session.prepare(
+            """
+            INSERT INTO metrics
+            (m_name, m_value, m_time, service, monitor_time)
+            VALUES (?, ?, ?, ?, toTimestamp(now()))
+            """
+        )
+
+    def set_batch(self):
+        self.batch = BatchStatement()
+
+    def execute_batch(self):
+        self.session.execute(self.batch)
+
+    def insert_tracing(self, table, traceid, s, tags):
+        self.session.set_keyspace(self.keyspace)
+        if 'upstream_cluster' not in tags:
+            logging.debug('NO UPSTREAM_CLUSTER KEY')
+            tags['upstream_cluster'] = 'none'
+        try:
+            self.batch.add(self.insert_tracing_stmt,
+                           (s['spanID'], traceid, s['duration'],
+                            s['operationName'], tags['node_id'],
+                            tags['http.url'], tags['upstream_cluster']))
+        except Exception as e:
+            logging.debug('{} {} {} {} {} {} {}'.format(s['spanID'], traceid,
+                          s['duration'], s['operationName'], tags['node_id'],
+                          tags['http.url'], tags['upstream_cluster']))
+            logging.debug(e)
+
+    def insert_trace(self, traceid, processes):
+        self.session.set_keyspace(self.keyspace)
+        self.session.execute(
+            """
+            INSERT INTO traces (traceid, processes)
+            VALUES (%s, %s)
+            """,
+            (traceid,  processes)
+        )
+
+    def insert_metric(self, m_name, m_value, m_time, service):
+        self.session.set_keyspace(self.keyspace)
+        self.batch.add(self.insert_metric_stmt,
+                       (m_name, m_value, m_time, service))
+
+
+def main():
+    cass = CassandraOps(CASSANDRA_HOSTS)
+    cass.init_visibility()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/clover/collector/db/redisops.py b/clover/collector/db/redisops.py
new file mode 100644 (file)
index 0000000..e80c417
--- /dev/null
@@ -0,0 +1,59 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import redis
+import logging
+
+REDIS_HOST = 'redis'
+# REDIS_HOST = '10.244.0.85'
+
+
+class RedisOps:
+
+    def __init__(self, host=REDIS_HOST):
+        logging.basicConfig(filename='redisops.log',
+                            level=logging.DEBUG)
+        try:
+            self.r = redis.StrictRedis(host=host, port=6379, db=0)
+        except Exception as e:
+            logging.debug(e)
+
+    def init_services(self, skey='visibility_services'):
+        service_names = ['http_lb', 'proxy_access_control']
+        for s in service_names:
+            self.r.sadd(skey, s)
+
+    def init_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
+        metric_prefixes = ['envoy_cluster_out_', 'envoy_cluster_in_']
+        metric_suffixes = [
+            '_default_svc_cluster_local_http_internal_upstream_rq_2xx',
+            '_default_svc_cluster_local_http_upstream_cx_active']
+        for p in metric_prefixes:
+            self.r.sadd(pkey, p)
+        for s in metric_suffixes:
+            self.r.sadd(skey, s)
+
+    def get_services(self, skey='visibility_services'):
+        services = self.r.smembers(skey)
+        return services
+
+    def get_metrics(self, pkey='metric_prefixes', skey='metric_suffixes'):
+        prefixes = self.r.smembers(pkey)
+        suffixes = self.r.smembers(skey)
+        return prefixes, suffixes
+
+
+def main():
+    r = RedisOps()
+    r.init_services()
+    r.init_metrics()
+    r.get_services()
+    r.get_metrics()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/clover/collector/docker/Dockerfile b/clover/collector/docker/Dockerfile
new file mode 100644 (file)
index 0000000..1714420
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+FROM opnfv/clover
+
+ENV REPOS_DIR="/home/opnfv/repos"
+
+# Clover repo
+ENV CLOVER_REPO_DIR="${REPOS_DIR}/clover"
+
+# Install required python packages
+RUN python -m pip install cassandra-driver redis
+
+# Set work directory
+WORKDIR ${CLOVER_REPO_DIR}
+
+COPY /process clover/collector/process
+COPY /grpc clover/collector/grpc
+COPY /db clover/collector/db
+COPY __init__.py clover/collector/__init__.py
+
+RUN pip install .
+
+WORKDIR "${CLOVER_REPO_DIR}/clover/collector"
+
+CMD ./process/grpc_process.sh no_schema_init
diff --git a/clover/collector/grpc/__init__.py b/clover/collector/grpc/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/clover/collector/grpc/build_proto.sh b/clover/collector/grpc/build_proto.sh
new file mode 100755 (executable)
index 0000000..44467ad
--- /dev/null
@@ -0,0 +1,11 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. collector.proto
diff --git a/clover/collector/grpc/collector.proto b/clover/collector/grpc/collector.proto
new file mode 100644 (file)
index 0000000..fc8b636
--- /dev/null
@@ -0,0 +1,45 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+syntax = "proto3";
+
+package collector;
+
+// The controller service definition.
+service Controller {
+
+  rpc StopCollector (ConfigCollector) returns (CollectorReply) {}
+  rpc StartCollector (ConfigCollector) returns (CollectorReply) {}
+  rpc InitVisibility (ConfigCassandra) returns (CollectorReply) {}
+  rpc TruncateVisibility (Schemas) returns (CollectorReply) {}
+}
+
+message ConfigCassandra {
+  string cassandra_hosts = 1;
+  int32 cassandra_port = 2;
+}
+
+message ConfigCollector {
+  string t_port = 1;
+  string t_host = 2;
+  string m_port = 3;
+  string m_host = 4;
+  string c_port = 5;
+  string c_hosts = 6;
+  string sinterval = 7;
+}
+
+message Schemas {
+  string schemas = 1;
+  string cassandra_hosts = 2;
+  int32 cassandra_port = 3;
+
+}
+
+message CollectorReply {
+  string message = 1;
+}
diff --git a/clover/collector/grpc/collector_client.py b/clover/collector/grpc/collector_client.py
new file mode 100644 (file)
index 0000000..b9e9f67
--- /dev/null
@@ -0,0 +1,105 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from __future__ import print_function
+from kubernetes import client, config
+
+import grpc
+import argparse
+import pickle
+
+import collector_pb2
+import collector_pb2_grpc
+
+# This is a basic client script to test server GRPC messaging
+# TODO improve interface overall
+
+
+def run(args, grpc_port='50054'):
+    pod_ip = get_podip('clover-collector')
+    if pod_ip == '':
+        return "Can not find service: {}".format(args['service_name'])
+    collector_grpc = pod_ip + ':' + grpc_port
+    channel = grpc.insecure_channel(collector_grpc)
+    stub = collector_pb2_grpc.ControllerStub(channel)
+
+    if args['cmd'] == 'init':
+        return init_visibility(stub)
+    elif args['cmd'] == 'start':
+        return start_collector(stub)
+    elif args['cmd'] == 'stop':
+        return stop_collector(stub)
+    elif args['cmd'] == 'clean':
+        return clean_visibility(stub)
+    else:
+        return "Invalid command: {}".format(args['cmd'])
+
+
+def get_podip(pod_name):
+    ip = ''
+    if pod_name != '':
+        config.load_kube_config()
+        v1 = client.CoreV1Api()
+        ret = v1.list_pod_for_all_namespaces(watch=False)
+        for i in ret.items:
+            if i.metadata.name.lower().find(pod_name.lower()) != -1:
+                print("Pod IP: {}".format(i.status.pod_ip))
+                ip = i.status.pod_ip
+                return str(ip)
+    return str(ip)
+
+
+def init_visibility(stub):
+    try:
+        cassandra_hosts = pickle.dumps(['cassandra.default'])
+        response = stub.InitVisibility(collector_pb2.ConfigCassandra(
+            cassandra_hosts=cassandra_hosts, cassandra_port=9042))
+    except Exception as e:
+        return e
+    return response.message
+
+
+def clean_visibility(stub):
+    try:
+        cassandra_hosts = pickle.dumps(['cassandra.default'])
+        schemas = pickle.dumps(['spans', 'traces', 'metrics'])
+        response = stub.TruncateVisibility(collector_pb2.Schemas(
+            schemas=schemas, cassandra_hosts=cassandra_hosts,
+            cassandra_port=9042))
+    except Exception as e:
+        return e
+    return response.message
+
+
+def start_collector(stub):
+    try:
+        cassandra_hosts = pickle.dumps(['cassandra.default'])
+        response = stub.StartCollector(collector_pb2.ConfigCollector(
+            t_port='16686', t_host='jaeger-deployment.istio-system',
+            m_port='9090', m_host='prometheus.istio-system',
+            c_port='9042', c_hosts=cassandra_hosts,
+            sinterval='5'))
+    except Exception as e:
+        return e
+    return response.message
+
+
+def stop_collector(stub):
+    try:
+        response = stub.StopCollector(collector_pb2.ConfigCollector())
+    except Exception as e:
+        return e
+    return response.message
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+            '--cmd', required=True,
+            help='Command to execute in collector')
+    args = parser.parse_args()
+    print(run(vars(args)))
diff --git a/clover/collector/grpc/collector_pb2.py b/clover/collector/grpc/collector_pb2.py
new file mode 100644 (file)
index 0000000..f67c880
--- /dev/null
@@ -0,0 +1,300 @@
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: collector.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='collector.proto',
+  package='collector',
+  syntax='proto3',
+  serialized_pb=_b('\n\x0f\x63ollector.proto\x12\tcollector\"B\n\x0f\x43onfigCassandra\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x02 \x01(\x05\"\x85\x01\n\x0f\x43onfigCollector\x12\x0e\n\x06t_port\x18\x01 \x01(\t\x12\x0e\n\x06t_host\x18\x02 \x01(\t\x12\x0e\n\x06m_port\x18\x03 \x01(\t\x12\x0e\n\x06m_host\x18\x04 \x01(\t\x12\x0e\n\x06\x63_port\x18\x05 \x01(\t\x12\x0f\n\x07\x63_hosts\x18\x06 \x01(\t\x12\x11\n\tsinterval\x18\x07 \x01(\t\"K\n\x07Schemas\x12\x0f\n\x07schemas\x18\x01 \x01(\t\x12\x17\n\x0f\x63\x61ssandra_hosts\x18\x02 \x01(\t\x12\x16\n\x0e\x63\x61ssandra_port\x18\x03 \x01(\x05\"!\n\x0e\x43ollectorReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\xb3\x02\n\nController\x12H\n\rStopCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eStartCollector\x12\x1a.collector.ConfigCollector\x1a\x19.collector.CollectorReply\"\x00\x12I\n\x0eInitVisibility\x12\x1a.collector.ConfigCassandra\x1a\x19.collector.CollectorReply\"\x00\x12\x45\n\x12TruncateVisibility\x12\x12.collector.Schemas\x1a\x19.collector.CollectorReply\"\x00\x62\x06proto3')
+)
+
+
+
+
+_CONFIGCASSANDRA = _descriptor.Descriptor(
+  name='ConfigCassandra',
+  full_name='collector.ConfigCassandra',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='cassandra_hosts', full_name='collector.ConfigCassandra.cassandra_hosts', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='cassandra_port', full_name='collector.ConfigCassandra.cassandra_port', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=30,
+  serialized_end=96,
+)
+
+
+_CONFIGCOLLECTOR = _descriptor.Descriptor(
+  name='ConfigCollector',
+  full_name='collector.ConfigCollector',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='t_port', full_name='collector.ConfigCollector.t_port', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='t_host', full_name='collector.ConfigCollector.t_host', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='m_port', full_name='collector.ConfigCollector.m_port', index=2,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='m_host', full_name='collector.ConfigCollector.m_host', index=3,
+      number=4, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='c_port', full_name='collector.ConfigCollector.c_port', index=4,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='c_hosts', full_name='collector.ConfigCollector.c_hosts', index=5,
+      number=6, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='sinterval', full_name='collector.ConfigCollector.sinterval', index=6,
+      number=7, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=99,
+  serialized_end=232,
+)
+
+
+_SCHEMAS = _descriptor.Descriptor(
+  name='Schemas',
+  full_name='collector.Schemas',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='schemas', full_name='collector.Schemas.schemas', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='cassandra_hosts', full_name='collector.Schemas.cassandra_hosts', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='cassandra_port', full_name='collector.Schemas.cassandra_port', index=2,
+      number=3, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=234,
+  serialized_end=309,
+)
+
+
+_COLLECTORREPLY = _descriptor.Descriptor(
+  name='CollectorReply',
+  full_name='collector.CollectorReply',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='message', full_name='collector.CollectorReply.message', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=311,
+  serialized_end=344,
+)
+
+DESCRIPTOR.message_types_by_name['ConfigCassandra'] = _CONFIGCASSANDRA
+DESCRIPTOR.message_types_by_name['ConfigCollector'] = _CONFIGCOLLECTOR
+DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS
+DESCRIPTOR.message_types_by_name['CollectorReply'] = _COLLECTORREPLY
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+ConfigCassandra = _reflection.GeneratedProtocolMessageType('ConfigCassandra', (_message.Message,), dict(
+  DESCRIPTOR = _CONFIGCASSANDRA,
+  __module__ = 'collector_pb2'
+  # @@protoc_insertion_point(class_scope:collector.ConfigCassandra)
+  ))
+_sym_db.RegisterMessage(ConfigCassandra)
+
+ConfigCollector = _reflection.GeneratedProtocolMessageType('ConfigCollector', (_message.Message,), dict(
+  DESCRIPTOR = _CONFIGCOLLECTOR,
+  __module__ = 'collector_pb2'
+  # @@protoc_insertion_point(class_scope:collector.ConfigCollector)
+  ))
+_sym_db.RegisterMessage(ConfigCollector)
+
+Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict(
+  DESCRIPTOR = _SCHEMAS,
+  __module__ = 'collector_pb2'
+  # @@protoc_insertion_point(class_scope:collector.Schemas)
+  ))
+_sym_db.RegisterMessage(Schemas)
+
+CollectorReply = _reflection.GeneratedProtocolMessageType('CollectorReply', (_message.Message,), dict(
+  DESCRIPTOR = _COLLECTORREPLY,
+  __module__ = 'collector_pb2'
+  # @@protoc_insertion_point(class_scope:collector.CollectorReply)
+  ))
+_sym_db.RegisterMessage(CollectorReply)
+
+
+
+_CONTROLLER = _descriptor.ServiceDescriptor(
+  name='Controller',
+  full_name='collector.Controller',
+  file=DESCRIPTOR,
+  index=0,
+  options=None,
+  serialized_start=347,
+  serialized_end=654,
+  methods=[
+  _descriptor.MethodDescriptor(
+    name='StopCollector',
+    full_name='collector.Controller.StopCollector',
+    index=0,
+    containing_service=None,
+    input_type=_CONFIGCOLLECTOR,
+    output_type=_COLLECTORREPLY,
+    options=None,
+  ),
+  _descriptor.MethodDescriptor(
+    name='StartCollector',
+    full_name='collector.Controller.StartCollector',
+    index=1,
+    containing_service=None,
+    input_type=_CONFIGCOLLECTOR,
+    output_type=_COLLECTORREPLY,
+    options=None,
+  ),
+  _descriptor.MethodDescriptor(
+    name='InitVisibility',
+    full_name='collector.Controller.InitVisibility',
+    index=2,
+    containing_service=None,
+    input_type=_CONFIGCASSANDRA,
+    output_type=_COLLECTORREPLY,
+    options=None,
+  ),
+  _descriptor.MethodDescriptor(
+    name='TruncateVisibility',
+    full_name='collector.Controller.TruncateVisibility',
+    index=3,
+    containing_service=None,
+    input_type=_SCHEMAS,
+    output_type=_COLLECTORREPLY,
+    options=None,
+  ),
+])
+_sym_db.RegisterServiceDescriptor(_CONTROLLER)
+
+DESCRIPTOR.services_by_name['Controller'] = _CONTROLLER
+
+# @@protoc_insertion_point(module_scope)
diff --git a/clover/collector/grpc/collector_pb2_grpc.py b/clover/collector/grpc/collector_pb2_grpc.py
new file mode 100644 (file)
index 0000000..a7be73c
--- /dev/null
@@ -0,0 +1,97 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+import grpc
+
+import collector_pb2 as collector__pb2
+
+
+class ControllerStub(object):
+  """The controller service definition.
+  """
+
+  def __init__(self, channel):
+    """Constructor.
+
+    Args:
+      channel: A grpc.Channel.
+    """
+    self.StopCollector = channel.unary_unary(
+        '/collector.Controller/StopCollector',
+        request_serializer=collector__pb2.ConfigCollector.SerializeToString,
+        response_deserializer=collector__pb2.CollectorReply.FromString,
+        )
+    self.StartCollector = channel.unary_unary(
+        '/collector.Controller/StartCollector',
+        request_serializer=collector__pb2.ConfigCollector.SerializeToString,
+        response_deserializer=collector__pb2.CollectorReply.FromString,
+        )
+    self.InitVisibility = channel.unary_unary(
+        '/collector.Controller/InitVisibility',
+        request_serializer=collector__pb2.ConfigCassandra.SerializeToString,
+        response_deserializer=collector__pb2.CollectorReply.FromString,
+        )
+    self.TruncateVisibility = channel.unary_unary(
+        '/collector.Controller/TruncateVisibility',
+        request_serializer=collector__pb2.Schemas.SerializeToString,
+        response_deserializer=collector__pb2.CollectorReply.FromString,
+        )
+
+
+class ControllerServicer(object):
+  """The controller service definition.
+  """
+
+  def StopCollector(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def StartCollector(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def InitVisibility(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def TruncateVisibility(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+
+def add_ControllerServicer_to_server(servicer, server):
+  rpc_method_handlers = {
+      'StopCollector': grpc.unary_unary_rpc_method_handler(
+          servicer.StopCollector,
+          request_deserializer=collector__pb2.ConfigCollector.FromString,
+          response_serializer=collector__pb2.CollectorReply.SerializeToString,
+      ),
+      'StartCollector': grpc.unary_unary_rpc_method_handler(
+          servicer.StartCollector,
+          request_deserializer=collector__pb2.ConfigCollector.FromString,
+          response_serializer=collector__pb2.CollectorReply.SerializeToString,
+      ),
+      'InitVisibility': grpc.unary_unary_rpc_method_handler(
+          servicer.InitVisibility,
+          request_deserializer=collector__pb2.ConfigCassandra.FromString,
+          response_serializer=collector__pb2.CollectorReply.SerializeToString,
+      ),
+      'TruncateVisibility': grpc.unary_unary_rpc_method_handler(
+          servicer.TruncateVisibility,
+          request_deserializer=collector__pb2.Schemas.FromString,
+          response_serializer=collector__pb2.CollectorReply.SerializeToString,
+      ),
+  }
+  generic_handler = grpc.method_handlers_generic_handler(
+      'collector.Controller', rpc_method_handlers)
+  server.add_generic_rpc_handlers((generic_handler,))
diff --git a/clover/collector/grpc/collector_server.py b/clover/collector/grpc/collector_server.py
new file mode 100644 (file)
index 0000000..c2eb221
--- /dev/null
@@ -0,0 +1,98 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+
+from concurrent import futures
+from clover.collector.db.cassops import CassandraOps
+import time
+import sys
+import grpc
+import subprocess
+import pickle
+import logging
+import collector_pb2
+import collector_pb2_grpc
+
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+GRPC_PORT = '[::]:50054'
+
+
+class Controller(collector_pb2_grpc.ControllerServicer):
+
+    def __init__(self, init_visibility):
+        logging.basicConfig(filename='collector_server.log',
+                            level=logging.DEBUG)
+        self.collector = 0
+        if init_visibility == 'set_schemas':
+            cassandra_hosts = pickle.dumps(['cassandra.default'])
+            self.InitVisibility(collector_pb2.ConfigCassandra(
+               cassandra_port=9042, cassandra_hosts=cassandra_hosts), "")
+
+    def StopCollector(self, r, context):
+        try:
+            subprocess.Popen.kill(self.collector)
+            msg = "Stopped collector on pid: {}".format(self.collector.pid)
+        except Exception as e:
+            logging.debug(e)
+            msg = "Failed to stop collector"
+        return collector_pb2.CollectorReply(message=msg)
+
+    def StartCollector(self, r, context):
+        try:
+            self.collector = subprocess.Popen(
+              ["python", "process/collect.py",
+               "-sinterval={}".format(r.sinterval),
+               "-c_port={}".format(r.c_port),
+               "-t_port={}".format(r.t_port), "-t_host={}".format(r.t_host),
+               "-m_port={}".format(r.m_port), "-m_host={}".format(r.m_host),
+               "-c_hosts={}".format(pickle.loads(r.c_hosts)), "&"],
+              shell=False)
+            msg = "Started collector on pid: {}".format(self.collector.pid)
+        except Exception as e:
+            logging.debug(e)
+            msg = e
+        return collector_pb2.CollectorReply(message=msg)
+
+    def InitVisibility(self, r, context):
+        try:
+            cass = CassandraOps(pickle.loads(r.cassandra_hosts),
+                                r.cassandra_port)
+            cass.init_visibility()
+            msg = "Added visibility schemas in cassandra"
+        except Exception as e:
+            logging.debug(e)
+            msg = "Failed to initialize cassandra"
+        return collector_pb2.CollectorReply(message=msg)
+
+    def TruncateVisibility(self, r, context):
+        try:
+            cass = CassandraOps(pickle.loads(r.cassandra_hosts),
+                                r.cassandra_port)
+            cass.truncate(pickle.loads(r.schemas))
+            msg = "Truncated visibility tables"
+        except Exception as e:
+            logging.debug(e)
+            msg = "Failed to truncate visibility"
+        return collector_pb2.CollectorReply(message=msg)
+
+
+def serve(init_visibility):
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    collector_pb2_grpc.add_ControllerServicer_to_server(
+                    Controller(init_visibility), server)
+    server.add_insecure_port(GRPC_PORT)
+    server.start()
+    try:
+        while True:
+            time.sleep(_ONE_DAY_IN_SECONDS)
+    except KeyboardInterrupt:
+        server.stop(0)
+
+
+if __name__ == '__main__':
+    serve(sys.argv[1])
diff --git a/clover/collector/process/__init__.py b/clover/collector/process/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/clover/collector/process/collect.py b/clover/collector/process/collect.py
new file mode 100644 (file)
index 0000000..d8beb49
--- /dev/null
@@ -0,0 +1,162 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+from clover.tracing.tracing import Tracing
+from clover.monitoring.monitoring import Monitoring
+from clover.collector.db.cassops import CassandraOps
+from clover.collector.db.redisops import RedisOps
+
+# import pprint
+import time
+import argparse
+import logging
+import ast
+
+TRACING_SERVICES = ['istio-ingress']
+TRACING_PORT = "16686"
+MONITORING_PORT = "9090"
+CASSANDRA_PORT = 9042  # Provide as integer
+MONITORING_HOST = "prometheus.istio-system"
+TRACING_HOST = "jaeger-deployment.istio-system"
+CASSANDRA_HOSTS = ['cassandra.default']
+
+
+class Collector:
+
+    def __init__(self, t_port, t_host, m_port, m_host, c_port, c_hosts):
+        logging.basicConfig(filename='collector.log', level=logging.DEBUG)
+        try:
+            self.t = Tracing(t_host, t_port, '', False)
+            monitoring_url = "http://{}:{}".format(m_host, m_port)
+            self.m = Monitoring(monitoring_url)
+            self.c = CassandraOps(c_hosts, int(c_port))
+            self.c.set_prepared()
+            self.r = RedisOps()
+        except Exception as e:
+                logging.debug(e)
+
+    # Toplevel tracing retrieval and batch insert
+    def get_tracing(self, services, time_back=20):
+        self.c.set_batch()
+        for service in services:
+            traces = self.t.getTraces(service, time_back)
+            try:
+                self.set_tracing(traces)
+            except Exception as e:
+                logging.debug(e)
+        self.c.execute_batch()
+
+    # Insert to cassandra visibility traces and spans tables
+    def set_tracing(self, trace):
+        for traces in trace['data']:
+            for spans in traces['spans']:
+                    span = {}
+                    span['spanID'] = spans['spanID']
+                    span['duration'] = spans['duration']
+                    span['startTime'] = spans['startTime']
+                    span['operationName'] = spans['operationName']
+                    tag = {}
+                    for tags in spans['tags']:
+                        tag[tags['key']] = tags['value']
+                    self.c.insert_tracing('spans', traces['traceID'],
+                                          span, tag)
+            process_list = []
+            for p in traces['processes']:
+                process_list.append(p)
+            service_names = []
+            for pname in process_list:
+                service_names.append(traces['processes'][pname]['serviceName'])
+            self.c.insert_trace(traces['traceID'], service_names)
+
+    # Insert to cassandra visibility metrics table
+    def get_monitoring(self):
+
+        # Fetch collector service/metric lists from redis
+        service_names = self.r.get_services()
+        metric_prefixes, metric_suffixes = self.r.get_metrics()
+
+        self.c.set_batch()
+        for sname in service_names:
+            for prefix in metric_prefixes:
+                for suffix in metric_suffixes:
+                    try:
+                        metric_name = prefix + sname + suffix
+                        query_params = {
+                            "type": "instant",
+                            "query": metric_name
+                        }
+                        data = self.m.query(query_params)
+                        m_value = data['data']['result'][0]['value'][1]
+                        m_time = data['data']['result'][0]['value'][0]
+                        mn = data['data']['result'][0]['metric']['__name__']
+                        self.c.insert_metric(mn, m_value, str(m_time), sname)
+                    except Exception as e:
+                        logging.debug(e)
+        self.c.execute_batch()
+
+        # TODO add batch retrieval for monitoring metrics
+        # query_range_param = {
+        #         "type": "range",
+        #         "query": "tbd",
+        #         "start": "60m",
+        #         "end": "5m",
+        #         "step": "30s"
+        # }
+        # data = self.m.query(query_range_param)
+        # pp = pprint.PrettyPrinter(indent=2)
+        # pp.pprint(data)
+
+
+def main(args):
+    if isinstance(args['c_hosts'], basestring):
+        ch = ast.literal_eval(args['c_hosts'])
+    else:
+        ch = args['c_hosts']
+
+    c = Collector(args['t_port'], args['t_host'], args['m_port'],
+                  args['m_host'], args['c_port'], ch)
+
+    # Collector loop
+    loop = True
+    while loop:
+        try:
+            c.get_tracing(args['t_services'])
+            c.get_monitoring()
+            time.sleep(int(args['sinterval']))
+        except KeyboardInterrupt:
+            loop = False
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+            '-sinterval', default=5,
+            help='Sample interval for collector loop')
+    parser.add_argument(
+            '-t_port', default=TRACING_PORT,
+            help='Port to access Jaeger tracing')
+    parser.add_argument(
+            '-m_port', default=MONITORING_PORT,
+            help='Port to access Prometheus monitoring')
+    parser.add_argument(
+            '-t_host', default=TRACING_HOST,
+            help='Host to access Jaeger tracing')
+    parser.add_argument(
+            '-m_host', default=MONITORING_HOST,
+            help='Host to access Prometheus monitoring')
+    parser.add_argument(
+            '-c_hosts', default=CASSANDRA_HOSTS,
+            help='Host(s) to access Cassandra cluster')
+    parser.add_argument(
+            '-c_port', default=CASSANDRA_PORT,
+            help='Port to access Cassandra cluster')
+    parser.add_argument(
+            '-t_services', default=TRACING_SERVICES,
+            help='Collect services on this list of services')
+
+    args, unknown = parser.parse_known_args()
+    print(main(vars(args)))
diff --git a/clover/collector/process/grpc_process.sh b/clover/collector/process/grpc_process.sh
new file mode 100755 (executable)
index 0000000..30e0171
--- /dev/null
@@ -0,0 +1,11 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+python grpc/collector_server.py test1
diff --git a/clover/collector/yaml/manifest.template b/clover/collector/yaml/manifest.template
new file mode 100644 (file)
index 0000000..c7aa3e7
--- /dev/null
@@ -0,0 +1,43 @@
+---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: {{ deploy_name }}
+  labels:
+    app: {{ deploy_name }}
+spec:
+  template:
+    metadata:
+      labels:
+        app: {{ deploy_name }}
+    spec:
+      containers:
+        - name: {{ deploy_name }}
+          image: {{ image_path }}/{{ image_name }}:{{ image_tag }}
+          ports:
+           - containerPort: {{ grpc_port }}
+           - containerPort: {{ redis_port }}
+           - containerPort: {{ monitor_port }}
+           - containerPort: {{ trace_port }}
+           - containerPort: {{ cass_port }}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ deploy_name }}
+  labels:
+    app: {{ deploy_name }}
+spec:
+  ports:
+  - port: {{ grpc_port }}
+    name: grpc
+  - port: {{ redis_port }}
+    name: redis
+  - port: {{ trace_port }}
+    name: jaeger-deployment
+  - port: {{ monitor_port }}
+    name: prometheus
+  - port: {{ cass_port }}
+    name: cassandra
+  selector:
+    app: {{ deploy_name }}
diff --git a/clover/collector/yaml/render_yaml.py b/clover/collector/yaml/render_yaml.py
new file mode 100644 (file)
index 0000000..c1d8be7
--- /dev/null
@@ -0,0 +1,73 @@
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import argparse
+
+from jinja2 import Template
+
+
+def render_yaml(args):
+    template_file = 'manifest.template'
+    out_file = args['deploy_name'] + '.yaml'
+
+    try:
+        with open(template_file) as f:
+            tmpl = Template(f.read())
+        output = tmpl.render(
+            image_path=args['image_path'],
+            image_name=args['image_name'],
+            image_tag=args['image_tag'],
+            deploy_name=args['deploy_name'],
+            grpc_port=args['grpc_port'],
+            monitor_port=args['monitor_port'],
+            redis_port=args['redis_port'],
+            cass_port=args['cass_port'],
+            trace_port=args['trace_port']
+        )
+        with open(out_file, "wb") as fh:
+            fh.write(output)
+        return "Generated manifest for {}".format(args['deploy_name'])
+    except Exception as e:
+        print(e)
+        return "Unable to generate manifest for {}".format(
+                                        args['deploy_name'])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+            '--image_name', default='clover-collector',
+            help='The image name to use')
+    parser.add_argument(
+            # '--image_path', default='opnfv',
+            '--image_path', default='localhost:5000',
+            help='The path to the image to use')
+    parser.add_argument(
+            # '--image_tag', default='opnfv-6.0.0',
+            '--image_tag', default='latest',
+            help='The image tag to use')
+    parser.add_argument(
+            '--deploy_name', default='clover-collector',
+            help='The k8s deploy name to use')
+    parser.add_argument(
+            '--redis_port', default='6379',
+            help='The redis port to connect for management')
+    parser.add_argument(
+            '--monitor_port', default='9090',
+            help='The Prometheus monitoring port')
+    parser.add_argument(
+            '--grpc_port', default='50054',
+            help='The GRPC server port for collector management')
+    parser.add_argument(
+            '--trace_port', default='16686',
+            help='The Jaeger tracing port')
+    parser.add_argument(
+            '--cass_port', default='9042',
+            help='The Cassandra port')
+
+    args = parser.parse_args()
+    print(render_yaml(vars(args)))
index 9726fd1..ec97e82 100644 (file)
@@ -90,8 +90,9 @@ class Monitoring(object):
 
             print("query %s %s, status=%s, size=%d, dur=%.3f" % \
                 (self.host, query_params["query"], resp.status_code, len(resp.text), dur))
-            pp = pprint.PrettyPrinter(indent=2)
-            pp.pprint(resp.json())
+            #pp = pprint.PrettyPrinter(indent=2)
+            ##pp.pprint(resp.json())
+            return resp.json()
 
         except Exception as e:
             print("ERROR: Could not query prometheus instance %s. \n %s" % (url, e))