Updated monitor.py and grafana dashboard
author Bryan Sullivan <bryan.sullivan@att.com>
Thu, 16 Nov 2017 00:27:17 +0000 (16:27 -0800)
committer Bryan Sullivan <bryan.sullivan@att.com>
Mon, 20 Nov 2017 04:48:54 +0000 (20:48 -0800)
JIRA: VES-2
Various fixes in ves-setup
Add demo_deploy.sh
Refactored to containerized agent and collector

Change-Id: I8851465742aaf40a4cce265508a3d2d66abced08
Signed-off-by: Bryan Sullivan <bryan.sullivan@att.com>
tools/demo_deploy.sh [new file with mode: 0644]
tools/grafana/Dashboard.json [new file with mode: 0644]
tools/monitor.py [new file with mode: 0644]
tools/ves-setup.sh

diff --git a/tools/demo_deploy.sh b/tools/demo_deploy.sh
new file mode 100644 (file)
index 0000000..b0b76e8
--- /dev/null
@@ -0,0 +1,75 @@
+#!/bin/bash
+# Copyright 2017 AT&T Intellectual Property, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#. What this is: Complete scripted deployment of the VES monitoring framework.
+#. When complete, the following will be installed:
+#. - On the specified master node, a Kafka server and containers running the
+#.   OPNFV Barometer VES agent, the OPNFV VES collector, InfluxDB, and Grafana
+#. - On each specified worker node, collectd configured per OPNFV Barometer
+#.
+#. Prerequisites:
+#. - Ubuntu server for kubernetes cluster nodes (master and worker nodes)
+#. - MAAS server as cluster admin for kubernetes master/worker nodes
+#. - Password-less ssh key provided for node setup
+#. - hostname of kubernetes master setup in DNS or /etc/hosts
+#. Usage: on the MAAS server
+#. $ git clone https://gerrit.opnfv.org/gerrit/ves ~/ves
+#. $ bash ~/ves/tools/demo_deploy.sh master <node> <key> 
+#.   master: setup VES on k8s master
+#.   <node>: IP of cluster master node
+#.   <key>: SSH key enabling password-less SSH to nodes
+#. $ bash ~/ves/tools/demo_deploy.sh worker <node> <key> 
+#.   worker: setup VES on k8s worker
+#.   <node>: IP of worker node
+#.   <key>: SSH key enabling password-less SSH to nodes
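+#. Example, with illustrative node address and key path:
+#. $ bash ~/ves/tools/demo_deploy.sh master 192.168.1.10 ~/.ssh/id_rsa
+#. $ bash ~/ves/tools/demo_deploy.sh worker 192.168.1.11 ~/.ssh/id_rsa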
+
+node=$2
+key=$3
+
+eval `ssh-agent`
+ssh-add $key
+if [[ "$1" == "master" ]]; then
+  echo; echo "$0 $(date): Setting up master node"
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ~/ves ubuntu@$node:/tmp
+  ssh -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$node <<EOF
+  ves_host="$node"
+  export ves_host
+  ves_mode="guest"
+  export ves_mode
+  ves_user="hello"
+  export ves_user
+  ves_pass="world"
+  export ves_pass
+  ves_kafka_host="$node"
+  export ves_kafka_host
+  bash /tmp/ves/tools/ves-setup.sh collector
+  bash /tmp/ves/tools/ves-setup.sh kafka
+  bash /tmp/ves/tools/ves-setup.sh collectd
+  bash /tmp/ves/tools/ves-setup.sh agent
+EOF
+  mkdir -p /tmp/ves
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ubuntu@$node:/tmp/ves/ves_env.sh /tmp/ves/.
+  echo "VES Grafana dashboards are available at http://$node:3001 (login as admin/admin)"
+else
+  echo; echo "$0 $(date): Setting up collectd at $node"
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ~/ves ubuntu@$node:/tmp
+  scp -r -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    /tmp/ves/ves_env.sh ubuntu@$node:/tmp/ves/.
+  ssh -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no \
+    ubuntu@$node bash /tmp/ves/tools/ves-setup.sh collectd
+fi
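
Once the master deploy completes, a quick sanity check is to poll the service
endpoints; a minimal smoke-test sketch in Python (the master address is a
placeholder, the Grafana port 3001 is taken from the message above, and 8086
is InfluxDB's default HTTP port):

    import requests

    master = '192.168.1.10'  # placeholder; the <node> IP passed to demo_deploy.sh
    # Grafana liveness: /api/health reports database status when the UI is up
    print(requests.get('http://{0}:3001/api/health'.format(master)).status_code)
    # InfluxDB liveness: /ping returns 204 No Content when the server is up
    print(requests.get('http://{0}:8086/ping'.format(master)).status_code)
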
diff --git a/tools/grafana/Dashboard.json b/tools/grafana/Dashboard.json
new file mode 100644 (file)
index 0000000..d59e5da
--- /dev/null
@@ -0,0 +1,763 @@
+{
+"dashboard": {
+  "editable": true,
+  "gnetId": null,
+  "graphTooltip": 0,
+  "hideControls": false,
+  "id": null,
+  "links": [],
+  "refresh": "10s",
+  "rows": [
+    {
+      "collapse": false,
+      "height": 273,
+      "panels": [
+        {
+          "aliasColors": {},
+          "bars": false,
+          "dashLength": 10,
+          "dashes": false,
+          "datasource": "VESEvents",
+          "fill": 1,
+          "grid": {
+            "leftLogBase": 1,
+            "leftMax": null,
+            "leftMin": null,
+            "rightLogBase": 1,
+            "rightMax": null,
+            "rightMin": null
+          },
+          "id": 3,
+          "interval": "30s",
+          "legend": {
+            "alignAsTable": false,
+            "avg": true,
+            "current": true,
+            "max": true,
+            "min": true,
+            "rightSide": false,
+            "show": true,
+            "total": false,
+            "values": true
+          },
+          "lines": true,
+          "linewidth": 1,
+          "links": [],
+          "nullPointMode": "null",
+          "percentage": false,
+          "pointradius": 5,
+          "points": false,
+          "renderer": "flot",
+          "seriesOverrides": [],
+          "spaceLength": 10,
+          "span": 6,
+          "stack": false,
+          "steppedLine": false,
+          "targets": [
+            {
+              "alias": "CpuUser",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "load",
+              "orderByTime": "ASC",
+              "policy": "default",
+              "query": "SELECT mean(\"cpuuser\") FROM \"cpu\" WHERE \"system\" = 'computehost' AND $timeFilter GROUP BY time(1m) fill(null)",
+              "refId": "A",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "load-longterm"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            },
+            {
+              "alias": "CpuSystem",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "load",
+              "orderByTime": "ASC",
+              "policy": "default",
+              "query": "SELECT mean(\"cpusystem\") FROM \"cpu\" WHERE $timeFilter GROUP BY time(1m) fill(null)",
+              "refId": "B",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "load-shortterm"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": [
+                {
+                  "key": "system",
+                  "operator": "=",
+                  "value": "OPNFV01"
+                }
+              ]
+            },
+            {
+              "alias": "CpuIdle",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "load",
+              "orderByTime": "ASC",
+              "policy": "default",
+              "query": "SELECT mean(\"cpuidle\") FROM \"cpu\" WHERE $timeFilter GROUP BY time(1m) fill(null)",
+              "refId": "C",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "load-midterm"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": [
+                {
+                  "key": "system",
+                  "operator": "=",
+                  "value": "OPNFV01"
+                }
+              ]
+            }
+          ],
+          "thresholds": [],
+          "timeFrom": null,
+          "timeShift": null,
+          "title": "host load",
+          "tooltip": {
+            "shared": true,
+            "sort": 0,
+            "value_type": "individual"
+          },
+          "type": "graph",
+          "x-axis": true,
+          "xaxis": {
+            "buckets": null,
+            "mode": "time",
+            "name": null,
+            "show": true,
+            "values": []
+          },
+          "y-axis": true,
+          "y_formats": [
+            "short",
+            "short"
+          ],
+          "yaxes": [
+            {
+              "format": "short",
+              "label": "Percent",
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            },
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            }
+          ]
+        },
+        {
+          "aliasColors": {},
+          "bars": false,
+          "dashLength": 10,
+          "dashes": false,
+          "datasource": "VESEvents",
+          "fill": 1,
+          "id": 5,
+          "legend": {
+            "avg": false,
+            "current": false,
+            "max": false,
+            "min": false,
+            "show": true,
+            "total": false,
+            "values": false
+          },
+          "lines": true,
+          "linewidth": 1,
+          "links": [],
+          "nullPointMode": "null",
+          "percentage": false,
+          "pointradius": 5,
+          "points": false,
+          "renderer": "flot",
+          "seriesOverrides": [],
+          "spaceLength": 10,
+          "span": 6,
+          "stack": false,
+          "steppedLine": false,
+          "targets": [
+            {
+              "alias": "Free",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "memoryUsage",
+              "orderByTime": "ASC",
+              "policy": "autogen",
+              "refId": "A",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "memoryFree"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            },
+            {
+              "alias": "Used",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "memoryUsage",
+              "orderByTime": "ASC",
+              "policy": "autogen",
+              "refId": "B",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "memoryUsed"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            },
+            {
+              "alias": "Buffered",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "memoryUsage",
+              "orderByTime": "ASC",
+              "policy": "autogen",
+              "refId": "C",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "memoryBuffered"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            },
+            {
+              "alias": "Cached",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "memoryUsage",
+              "orderByTime": "ASC",
+              "policy": "autogen",
+              "refId": "D",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "memoryCached"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            },
+            {
+              "alias": "SlabRecl",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "memoryUsage",
+              "orderByTime": "ASC",
+              "policy": "autogen",
+              "refId": "E",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "memorySlabRecl"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            },
+            {
+              "alias": "SlabUnrecl",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "memoryUsage",
+              "orderByTime": "ASC",
+              "policy": "autogen",
+              "refId": "F",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "memorySlabUnrecl"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            }
+          ],
+          "thresholds": [],
+          "timeFrom": null,
+          "timeShift": null,
+          "title": "Memory",
+          "tooltip": {
+            "shared": true,
+            "sort": 0,
+            "value_type": "individual"
+          },
+          "type": "graph",
+          "xaxis": {
+            "buckets": null,
+            "mode": "time",
+            "name": null,
+            "show": true,
+            "values": []
+          },
+          "yaxes": [
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 10,
+              "max": null,
+              "min": null,
+              "show": true
+            },
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            }
+          ]
+        }
+      ],
+      "repeat": null,
+      "repeatIteration": null,
+      "repeatRowId": null,
+      "showTitle": false,
+      "title": "Dashboard Row",
+      "titleSize": "h6"
+    },
+    {
+      "collapse": false,
+      "height": 250,
+      "panels": [
+        {
+          "alert": {
+            "conditions": [
+              {
+                "evaluator": {
+                  "params": [
+                    15000000
+                  ],
+                  "type": "gt"
+                },
+                "operator": {
+                  "type": "and"
+                },
+                "query": {
+                  "params": [
+                    "A",
+                    "1s",
+                    "now"
+                  ]
+                },
+                "reducer": {
+                  "params": [],
+                  "type": "avg"
+                },
+                "type": "query"
+              }
+            ],
+            "executionErrorState": "alerting",
+            "frequency": "1s",
+            "handler": 1,
+            "message": "Transmitted Traffic Exceeded limits\nClosed Loop Action:Apply Firewall Rules",
+            "name": "VES webserver_1 Network Usage alert",
+            "noDataState": "no_data",
+            "notifications": []
+          },
+          "aliasColors": {},
+          "bars": false,
+          "dashLength": 10,
+          "dashes": false,
+          "datasource": "VESEvents",
+          "fill": 1,
+          "grid": {
+            "leftLogBase": 1,
+            "leftMax": null,
+            "leftMin": null,
+            "rightLogBase": 1,
+            "rightMax": null,
+            "rightMin": null
+          },
+          "id": 2,
+          "interval": "30s",
+          "legend": {
+            "alignAsTable": false,
+            "avg": true,
+            "current": false,
+            "max": false,
+            "min": false,
+            "rightSide": false,
+            "show": true,
+            "total": false,
+            "values": true
+          },
+          "lines": true,
+          "linewidth": 2,
+          "links": [],
+          "nullPointMode": "null",
+          "percentage": false,
+          "pointradius": 5,
+          "points": false,
+          "renderer": "flot",
+          "seriesOverrides": [],
+          "spaceLength": 10,
+          "span": 6,
+          "stack": false,
+          "steppedLine": false,
+          "targets": [
+            {
+              "alias": "rxOctets",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                },
+                {
+                  "params": [
+                    "vnic"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "vNicPerformance",
+              "orderByTime": "ASC",
+              "policy": "default",
+              "query": "SELECT derivative(mean(\"rxoctetsacc\"), 10s) FROM \"vnic\" WHERE \"system\" = 'computehost' AND $timeFilter GROUP BY time(1m) fill(null)",
+              "refId": "A",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "receivedOctetsAccumulated"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            }
+          ],
+          "thresholds": [],
+          "timeFrom": null,
+          "timeShift": null,
+          "title": "Received Octets",
+          "tooltip": {
+            "shared": true,
+            "sort": 0,
+            "value_type": "individual"
+          },
+          "type": "graph",
+          "x-axis": true,
+          "xaxis": {
+            "buckets": null,
+            "mode": "time",
+            "name": null,
+            "show": true,
+            "values": []
+          },
+          "y-axis": true,
+          "y_formats": [
+            "short",
+            "short"
+          ],
+          "yaxes": [
+            {
+              "format": "short",
+              "label": "Octets/Packets",
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            },
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            }
+          ]
+        },
+        {
+          "aliasColors": {},
+          "bars": false,
+          "dashLength": 10,
+          "dashes": false,
+          "datasource": "VESEvents",
+          "fill": 1,
+          "grid": {
+            "leftLogBase": 1,
+            "leftMax": null,
+            "leftMin": null,
+            "rightLogBase": 1,
+            "rightMax": null,
+            "rightMin": null
+          },
+          "id": 4,
+          "interval": "30s",
+          "legend": {
+            "alignAsTable": false,
+            "avg": true,
+            "current": false,
+            "max": false,
+            "min": false,
+            "rightSide": false,
+            "show": true,
+            "total": false,
+            "values": true
+          },
+          "lines": true,
+          "linewidth": 2,
+          "links": [],
+          "nullPointMode": "null",
+          "percentage": false,
+          "pointradius": 5,
+          "points": false,
+          "renderer": "flot",
+          "seriesOverrides": [],
+          "spaceLength": 10,
+          "span": 6,
+          "stack": false,
+          "steppedLine": false,
+          "targets": [
+            {
+              "alias": "txOctets",
+              "dsType": "influxdb",
+              "groupBy": [
+                {
+                  "params": [
+                    "system"
+                  ],
+                  "type": "tag"
+                },
+                {
+                  "params": [
+                    "vnic"
+                  ],
+                  "type": "tag"
+                }
+              ],
+              "measurement": "vNicPerformance",
+              "orderByTime": "ASC",
+              "policy": "default",
+              "refId": "B",
+              "resultFormat": "time_series",
+              "select": [
+                [
+                  {
+                    "params": [
+                      "transmittedOctetsAccumulated"
+                    ],
+                    "type": "field"
+                  }
+                ]
+              ],
+              "tags": []
+            }
+          ],
+          "thresholds": [],
+          "timeFrom": null,
+          "timeShift": null,
+          "title": "Transmitted Octets",
+          "tooltip": {
+            "shared": true,
+            "sort": 0,
+            "value_type": "individual"
+          },
+          "type": "graph",
+          "x-axis": true,
+          "xaxis": {
+            "buckets": null,
+            "mode": "time",
+            "name": null,
+            "show": true,
+            "values": []
+          },
+          "y-axis": true,
+          "y_formats": [
+            "short",
+            "short"
+          ],
+          "yaxes": [
+            {
+              "format": "short",
+              "label": "Octets/Packets",
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            },
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            }
+          ]
+        }
+      ],
+      "repeat": null,
+      "repeatIteration": null,
+      "repeatRowId": null,
+      "showTitle": false,
+      "title": "Dashboard Row",
+      "titleSize": "h6"
+    }
+  ],
+  "schemaVersion": 14,
+  "style": "dark",
+  "tags": [],
+  "templating": {
+    "list": []
+  },
+  "time": {
+    "from": "now-30m",
+    "to": "now"
+  },
+  "timepicker": {
+    "now": true,
+    "refresh_intervals": [
+      "10s",
+      "20s",
+      "30s",
+      "1m"
+    ],
+    "time_options": [
+      "5m",
+      "15m",
+      "1h",
+      "6h",
+      "12h",
+      "24h",
+      "2d",
+      "7d",
+      "30d"
+    ]
+  },
+  "timezone": "browser",
+  "title": "VES Demo",
+  "version": 3
+}
+}
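
Dashboard.json above is already wrapped in the {"dashboard": ...} envelope
that Grafana's dashboard-import API (POST /api/dashboards/db) expects, so it
can be loaded with a direct POST. A sketch, assuming the admin/admin login
noted in demo_deploy.sh, a placeholder Grafana address, and an InfluxDB
datasource named VESEvents already created (the panels above reference it):

    import json
    import requests

    grafana = 'http://192.168.1.10:3001'  # placeholder master address
    with open('tools/grafana/Dashboard.json') as f:
        payload = json.load(f)  # already shaped as {"dashboard": {...}}
    r = requests.post(grafana + '/api/dashboards/db',
                      auth=('admin', 'admin'),
                      headers={'Content-Type': 'application/json'},
                      data=json.dumps(payload))
    print(r.status_code, r.text)
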
diff --git a/tools/monitor.py b/tools/monitor.py
new file mode 100644 (file)
index 0000000..91c0eae
--- /dev/null
@@ -0,0 +1,765 @@
+#!/usr/bin/env python
+#
+# Copyright 2016-2017 AT&T Intellectual Property, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# What this is: Monitor and closed-loop policy agent as part of the OPNFV VES
+# ves_onap_demo.
+#
+# Status: this is a work in progress, under test.
+
+from rest_dispatcher import PathDispatcher, set_404_content
+from wsgiref.simple_server import make_server
+import sys
+import os
+import platform
+import traceback
+import time
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import ConfigParser
+import logging.handlers
+from base64 import b64decode
+import string
+import json
+import jsonschema
+from functools import partial
+import requests
+
+monitor_mode = "f"
+vdu_id = ['','','','','','']
+summary_e = ['***** Summary of key stats *****','','','']
+summary_c = ['Collectd agents:']
+status = ['','Started','Started','Started']
+base_url = ''
+template_404 = b'''POST {0}'''
+columns = 0
+rows = 0
+
+class JSONObject:
+  def __init__(self, d):
+    self.__dict__ = d
+
+__all__ = []
+__version__ = 0.1
+__date__ = '2015-12-04'
+__updated__ = '2015-12-04'
+
+TESTRUN = False
+DEBUG = False
+PROFILE = False
+
+#------------------------------------------------------------------------------
+# Address of influxdb server.
+#------------------------------------------------------------------------------
+
+influxdb = '127.0.0.1'
+
+#------------------------------------------------------------------------------
+# Credentials we expect clients to authenticate themselves with.
+#------------------------------------------------------------------------------
+vel_username = ''
+vel_password = ''
+
+#------------------------------------------------------------------------------
+# The JSON schema which we will use to validate events.
+#------------------------------------------------------------------------------
+vel_schema = None
+
+#------------------------------------------------------------------------------
+# The JSON schema which we will use to validate client throttle state.
+#------------------------------------------------------------------------------
+throttle_schema = None
+
+#------------------------------------------------------------------------------
+# The JSON schema which we will use to provoke throttling commands for testing.
+#------------------------------------------------------------------------------
+test_control_schema = None
+
+#------------------------------------------------------------------------------
+# Pending command list from the testControl API
+# This is sent as a response commandList to the next received event.
+#------------------------------------------------------------------------------
+pending_command_list = None
+
+#------------------------------------------------------------------------------
+# Logger for this module.
+#------------------------------------------------------------------------------
+logger = None
+
+def listener(environ, start_response, schema):
+    '''
+    Handler for the Vendor Event Listener REST API.
+
+    Extract headers and the body and check that:
+
+      1)  The client authenticated themselves correctly.
+      2)  The body validates against the provided schema for the API.
+
+    '''
+    logger.info('Got a Vendor Event request')
+    print('==== ' + time.asctime() + ' ' + '=' * 49)
+
+    #--------------------------------------------------------------------------
+    # Extract the content from the request.
+    #--------------------------------------------------------------------------
+    length = int(environ.get('CONTENT_LENGTH', '0'))
+    logger.debug('Content Length: {0}'.format(length))
+    body = environ['wsgi.input'].read(length)
+    logger.debug('Content Body: {0}'.format(body))
+
+    mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
+                                                     'None None'))
+    # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode,
+    #                                                     b64_credentials))
+    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
+    if (b64_credentials != 'None'):
+        credentials = b64decode(b64_credentials)
+    else:
+        credentials = None
+
+    # logger.debug('Credentials: {0}'.format(credentials))
+    logger.debug('Credentials: ****')
+
+    #--------------------------------------------------------------------------
+    # If we have a schema file then check that the event matches that expected.
+    #--------------------------------------------------------------------------
+    if (schema is not None):
+        logger.debug('Attempting to validate data: {0}\n'
+                     'Against schema: {1}'.format(body, schema))
+        try:
+            decoded_body = json.loads(body)
+            jsonschema.validate(decoded_body, schema)
+            logger.info('Event is valid!')
+            print('Valid body decoded & checked against schema OK:\n'
+                  '{0}'.format(json.dumps(decoded_body,
+                                          sort_keys=True,
+                                          indent=4,
+                                          separators=(',', ': '))))
+
+        except jsonschema.SchemaError as e:
+            logger.error('Schema is not valid! {0}'.format(e))
+            print('Schema is not valid! {0}'.format(e))
+
+        except jsonschema.ValidationError as e:
+            logger.warn('Event is not valid against schema! {0}'.format(e))
+            print('Event is not valid against schema! {0}'.format(e))
+            print('Bad JSON body decoded:\n'
+                  '{0}'.format(json.dumps(decoded_body,
+                                         sort_keys=True,
+                                         indent=4,
+                                         separators=(',', ': '))))
+
+        except Exception as e:
+            logger.error('Event invalid for unexpected reason! {0}'.format(e))
+            print('Schema is not valid for unexpected reason! {0}'.format(e))
+    else:
+        logger.debug('No schema so just decode JSON: {0}'.format(body))
+        try:
+            decoded_body = json.loads(body)
+            print('Valid JSON body (no schema checking) decoded:\n'
+                  '{0}'.format(json.dumps(decoded_body,
+                                         sort_keys=True,
+                                         indent=4,
+                                         separators=(',', ': '))))
+            logger.info('Event is valid JSON but not checked against schema!')
+
+        except Exception as e:
+            logger.error('Event invalid for unexpected reason! {0}'.format(e))
+            print('JSON body not valid for unexpected reason! {0}'.format(e))
+
+    #--------------------------------------------------------------------------
+    # See whether the user authenticated themselves correctly.
+    #--------------------------------------------------------------------------
+    if (credentials == (vel_username + ':' + vel_password)):
+        logger.debug('Authenticated OK')
+#        print('Authenticated OK')
+
+        #----------------------------------------------------------------------
+        # Respond to the caller. If we have a pending commandList from the
+        # testControl API, send it in response.
+        #----------------------------------------------------------------------
+        global pending_command_list
+        if pending_command_list is not None:
+            start_response('202 Accepted',
+                           [('Content-type', 'application/json')])
+            response = pending_command_list
+            pending_command_list = None
+
+            print('\n'+ '='*80)
+            print('Sending pending commandList in the response:\n'
+                  '{0}'.format(json.dumps(response,
+                                          sort_keys=True,
+                                          indent=4,
+                                          separators=(',', ': '))))
+            print('='*80 + '\n')
+            yield json.dumps(response)
+        else:
+            start_response('202 Accepted', [])
+            yield ''
+    else:
+        logger.warn('Failed to authenticate; creds: {0}'.format(credentials))
+        print('Failed to authenticate agent credentials: {0} against '
+              'expected {1}:{2}'.format(credentials, vel_username, vel_password))
+
+        #----------------------------------------------------------------------
+        # Respond to the caller.
+        #----------------------------------------------------------------------
+        start_response('401 Unauthorized', [ ('Content-type',
+                                              'application/json')])
+        req_error = { 'requestError': {
+                        'policyException': {
+                            'messageId': 'POL0001',
+                            'text': 'Failed to authenticate'
+                            }
+                        }
+                    }
+        yield json.dumps(req_error)
+
+    save_event(body)
+
+#--------------------------------------------------------------------------
+# Send event to influxdb
+#--------------------------------------------------------------------------
+def send_to_influxdb(event,pdata):
+  url = 'http://{}:8086/write?db=veseventsdb'.format(influxdb)
+  print('Send {} to influxdb at {}: {}'.format(event,influxdb,pdata))
+  r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
+  print('influxdb return code {}'.format(r.status_code))
+  if r.status_code != 204:
+    print('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+
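+# The /write endpoint expects InfluxDB line protocol, i.e.
+# '<measurement>,<tag>=<val>[,...] <field>=<val>[,...]'. An illustrative
+# heartbeat point: 'heartbeat,system=computehost sequence=123'; InfluxDB
+# replies 204 No Content on a successful write.
+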
+#--------------------------------------------------------------------------
+# Save event data
+#--------------------------------------------------------------------------
+def save_event(body):
+  jobj = json.loads(body)
+  e = json.loads(body, object_hook=JSONObject)
+
+  domain = jobj['event']['commonEventHeader']['domain']
+  timestamp = jobj['event']['commonEventHeader']['lastEpochMicrosec']
+  agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
+  if "LOCALHOST" in agent:
+    agent = "computehost"
+  source = jobj['event']['commonEventHeader']['sourceId'].upper()
+
+  if e.event.commonEventHeader.domain == "heartbeat":
+    print('Found Heartbeat')
+    send_to_influxdb('heartbeat','heartbeat,system={} sequence={}'.format(agent,e.event.commonEventHeader.sequence))
+
+  if 'measurementsForVfScalingFields' in jobj['event']:
+    print('Found measurementsForVfScalingFields')
+
+#        "measurementsForVfScalingFields": {
+#            "additionalMeasurements": [
+#                {
+#                    "arrayOfFields": [
+#                        {
+#                            "name": "load-longterm",
+#                            "value": "0.34"
+#                        },
+#                        {
+#                            "name": "load-shortterm",
+#                            "value": "0.32"
+#                        },
+#                        {
+#                            "name": "load-midterm",
+#                            "value": "0.34"
+#                        }
+#                    ],
+#                    "name": "load"
+#                }
+#            ],
+
+    if 'additionalMeasurements' in jobj['event']['measurementsForVfScalingFields']:
+      for meas in jobj['event']['measurementsForVfScalingFields']['additionalMeasurements']:
+        name = meas['name']
+        pdata = '{},system={}'.format(name,source)
+        for field in meas['arrayOfFields']:
+          pdata = pdata + ",{}={}".format(field['name'],field['value'])
+        i=pdata.find(',', pdata.find('system'))
+        pdata = pdata[:i] + ' ' + pdata[i+1:]
+        send_to_influxdb("systemLoad", pdata)
+
+#            "memoryUsageArray": [
+#                {
+#                    "memoryBuffered": 269056.0,
+#                    "memoryCached": 17636956.0,
+#                    "memoryFree": 244731658240,
+#                    "memorySlabRecl": 753160.0,
+#                    "memorySlabUnrecl": 210800.0,
+#                    "memoryUsed": 6240064.0,
+#                    "vmIdentifier": "opnfv01"
+#                }
+#            ],
+
+    if 'memoryUsageArray' in jobj['event']['measurementsForVfScalingFields']:
+      print('Found memoryUsageArray')
+      pdata = 'memoryUsage,system={}'.format(source)
+      vmid=e.event.measurementsForVfScalingFields.memoryUsageArray[0].vmIdentifier
+      d = jobj['event']['measurementsForVfScalingFields']['memoryUsageArray'][0].items()
+      for key,val in d:
+        if key != 'vmIdentifier':
+          pdata = pdata + ',{}={}'.format(key,val)
+      i=pdata.find(',', pdata.find('system'))
+      pdata = pdata[:i] + ' ' + pdata[i+1:]
+      send_to_influxdb("memoryUsage", pdata)
+
+#            "vNicPerformanceArray": [
+#                {
+#                    "receivedDiscardedPacketsAccumulated": 0,
+#                    "receivedErrorPacketsAccumulated": 0,
+#                    "receivedOctetsAccumulated": 476.801524578,
+#                    "receivedTotalPacketsAccumulated": 2.90000899705,
+#                    "transmittedDiscardedPacketsAccumulated": 0,
+#                    "transmittedErrorPacketsAccumulated": 0,
+#                    "transmittedOctetsAccumulated": 230.100735749,
+#                    "transmittedTotalPacketsAccumulated": 1.20000372292,
+#                    "vNicIdentifier": "eno4",
+#                    "valuesAreSuspect": "true"
+#                },
+
+    if 'vNicPerformanceArray' in jobj['event']['measurementsForVfScalingFields']:
+      print('Found vNicPerformanceArray')
+      for vnic in jobj['event']['measurementsForVfScalingFields']['vNicPerformanceArray']:
+        vnid=vnic['vNicIdentifier']
+        pdata = 'vNicPerformance,system={},vnic={}'.format(source,vnid)
+        d = vnic.items()
+        for key,val in d:
+          if key != 'vNicIdentifier':
+            pdata = pdata + ',{}={}'.format(key,val)
+        i=pdata.find(',', pdata.find('vnic'))
+        pdata = pdata[:i] + ' ' + pdata[i+1:]
+        send_to_influxdb("vNicPerformance", pdata)
+
+def test_listener(environ, start_response, schema):
+    '''
+    Handler for the Test Collector Test Control API.
+
+    There is no authentication on this interface.
+
+    This simply stores a commandList which will be sent in response to the next
+    incoming event on the EVEL interface.
+    '''
+    global pending_command_list
+    logger.info('Got a Test Control input')
+    print('============================')
+    print('==== TEST CONTROL INPUT ====')
+
+    #--------------------------------------------------------------------------
+    # GET allows us to get the current pending request.
+    #--------------------------------------------------------------------------
+    if environ.get('REQUEST_METHOD') == 'GET':
+        start_response('200 OK', [('Content-type', 'application/json')])
+        yield json.dumps(pending_command_list)
+        return
+
+    #--------------------------------------------------------------------------
+    # Extract the content from the request.
+    #--------------------------------------------------------------------------
+    length = int(environ.get('CONTENT_LENGTH', '0'))
+    logger.debug('TestControl Content Length: {0}'.format(length))
+    body = environ['wsgi.input'].read(length)
+    logger.debug('TestControl Content Body: {0}'.format(body))
+
+    #--------------------------------------------------------------------------
+    # If we have a schema file then check that the event matches that expected.
+    #--------------------------------------------------------------------------
+    if (schema is not None):
+        logger.debug('Attempting to validate data: {0}\n'
+                     'Against schema: {1}'.format(body, schema))
+        try:
+            decoded_body = json.loads(body)
+            jsonschema.validate(decoded_body, schema)
+            logger.info('TestControl is valid!')
+            print('TestControl:\n'
+                  '{0}'.format(json.dumps(decoded_body,
+                                          sort_keys=True,
+                                          indent=4,
+                                          separators=(',', ': '))))
+
+        except jsonschema.SchemaError as e:
+            logger.error('TestControl Schema is not valid: {0}'.format(e))
+            print('TestControl Schema is not valid: {0}'.format(e))
+
+        except jsonschema.ValidationError as e:
+            logger.warn('TestControl input not valid: {0}'.format(e))
+            print('TestControl input not valid: {0}'.format(e))
+            print('Bad JSON body decoded:\n'
+                  '{0}'.format(json.dumps(decoded_body,
+                                          sort_keys=True,
+                                          indent=4,
+                                          separators=(',', ': '))))
+
+        except Exception as e:
+            logger.error('TestControl input not valid: {0}'.format(e))
+            print('TestControl Schema not valid: {0}'.format(e))
+    else:
+        logger.debug('Missing schema just decode JSON: {0}'.format(body))
+        try:
+            decoded_body = json.loads(body)
+            print('Valid JSON body (no schema checking) decoded:\n'
+                  '{0}'.format(json.dumps(decoded_body,
+                                          sort_keys=True,
+                                          indent=4,
+                                          separators=(',', ': '))))
+            logger.info('TestControl input not checked against schema!')
+
+        except Exception as e:
+            logger.error('TestControl input not valid: {0}'.format(e))
+            print('TestControl input not valid: {0}'.format(e))
+
+    #--------------------------------------------------------------------------
+    # Respond to the caller. If we received otherField 'ThrottleRequest',
+    # generate the appropriate canned response.
+    #--------------------------------------------------------------------------
+    pending_command_list = decoded_body
+    print('===== TEST CONTROL END =====')
+    print('============================')
+    start_response('202 Accepted', [])
+    yield ''
+
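+# Example (illustrative): a commandList can be staged for the next event via
+#   curl -X POST http://<collector>:12233/testControl/v5/commandList \
+#        -d '{"commandList": []}'
+# The stored body is echoed as the commandList response to the next event.
+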
+def main(argv=None):
+    '''
+    Main function for the collector start-up.
+
+    Called with command-line arguments:
+        *    --config *<file>*
+        *    --section *<section>*
+        *    --verbose
+
+    Where:
+
+        *<file>* specifies the path to the configuration file.
+
+        *<section>* specifies the section within that config file.
+
+        *verbose* generates more information in the log files.
+
+    The process listens for REST API invocations and checks them. Errors are
+    displayed to stdout and logged.
+    '''
+
+    if argv is None:
+        argv = sys.argv
+    else:
+        sys.argv.extend(argv)
+
+    program_name = os.path.basename(sys.argv[0])
+    program_version = 'v{0}'.format(__version__)
+    program_build_date = str(__updated__)
+    program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
+                                                         program_build_date)
+    if (__import__('__main__').__doc__ is not None):
+        program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
+    else:
+        program_shortdesc = 'Running in test harness'
+    program_license = '''{0}
+
+  Created  on {1}.
+  Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
+
+  Distributed on an "AS IS" basis without warranties
+  or conditions of any kind, either express or implied.
+
+USAGE
+'''.format(program_shortdesc, str(__date__))
+
+    try:
+        #----------------------------------------------------------------------
+        # Setup argument parser so we can parse the command-line.
+        #----------------------------------------------------------------------
+        parser = ArgumentParser(description=program_license,
+                                formatter_class=ArgumentDefaultsHelpFormatter)
+        parser.add_argument('-i', '--influxdb',
+                            dest='influxdb',
+                            default='localhost',
+                            help='InfluxDB server address')
+        parser.add_argument('-v', '--verbose',
+                            dest='verbose',
+                            action='count',
+                            default=0,
+                            help='set verbosity level')
+        parser.add_argument('-V', '--version',
+                            action='version',
+                            version=program_version_message,
+                            help='Display version information')
+        parser.add_argument('-a', '--api-version',
+                            dest='api_version',
+                            default='5',
+                            help='set API version')
+        parser.add_argument('-c', '--config',
+                            dest='config',
+                            default='/etc/opt/att/collector.conf',
+                            help='Use this config file.',
+                            metavar='<file>')
+        parser.add_argument('-s', '--section',
+                            dest='section',
+                            default='default',
+                            metavar='<section>',
+                            help='section to use in the config file')
+
+        #----------------------------------------------------------------------
+        # Process arguments received.
+        #----------------------------------------------------------------------
+        args = parser.parse_args()
+        verbose = args.verbose
+        api_version = args.api_version
+        config_file = args.config
+        config_section = args.section
+
+        #----------------------------------------------------------------------
+        # Now read the config file, using command-line supplied values as
+        # overrides.
+        #----------------------------------------------------------------------
+        defaults = {'log_file': 'collector.log',
+                    'vel_port': '12233',
+                    'vel_path': '',
+                    'vel_topic_name': ''
+                   }
+        overrides = {}
+        config = ConfigParser.SafeConfigParser(defaults)
+        config.read(config_file)
+
+        #----------------------------------------------------------------------
+        # extract the values we want.
+        #----------------------------------------------------------------------
+        global influxdb
+        global vel_username
+        global vel_password
+        global vel_topic_name
+        influxdb = config.get(config_section, 'influxdb', vars=overrides)
+        log_file = config.get(config_section, 'log_file', vars=overrides)
+        vel_port = config.get(config_section, 'vel_port', vars=overrides)
+        vel_path = config.get(config_section, 'vel_path', vars=overrides)
+
+        vel_topic_name = config.get(config_section,
+                                    'vel_topic_name',
+                                    vars=overrides)
+        vel_username = config.get(config_section,
+                                  'vel_username',
+                                  vars=overrides)
+        vel_password = config.get(config_section,
+                                  'vel_password',
+                                  vars=overrides)
+        vel_schema_file = config.get(config_section,
+                                     'schema_file',
+                                     vars=overrides)
+        base_schema_file = config.get(config_section,
+                                      'base_schema_file',
+                                      vars=overrides)
+        throttle_schema_file = config.get(config_section,
+                                          'throttle_schema_file',
+                                          vars=overrides)
+        test_control_schema_file = config.get(config_section,
+                                           'test_control_schema_file',
+                                           vars=overrides)
+
+        #----------------------------------------------------------------------
+        # Finally we have enough info to start a proper flow trace.
+        #----------------------------------------------------------------------
+        global logger
+        print('Logfile: {0}'.format(log_file))
+        logger = logging.getLogger('monitor')
+        if verbose > 0:
+            print('Verbose mode on')
+            logger.setLevel(logging.DEBUG)
+        else:
+            logger.setLevel(logging.INFO)
+        handler = logging.handlers.RotatingFileHandler(log_file,
+                                                       maxBytes=1000000,
+                                                       backupCount=10)
+        if (platform.system() == 'Windows'):
+            date_format = '%Y-%m-%d %H:%M:%S'
+        else:
+            date_format = '%Y-%m-%d %H:%M:%S.%f %z'
+        formatter = logging.Formatter('%(asctime)s %(name)s - '
+                                      '%(levelname)s - %(message)s',
+                                      date_format)
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+        logger.info('Started')
+
+        #----------------------------------------------------------------------
+        # Log the details of the configuration.
+        #----------------------------------------------------------------------
+        logger.debug('Log file = {0}'.format(log_file))
+        logger.debug('Influxdb server = {0}'.format(influxdb))
+        logger.debug('Event Listener Port = {0}'.format(vel_port))
+        logger.debug('Event Listener Path = {0}'.format(vel_path))
+        logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
+        logger.debug('Event Listener Username = {0}'.format(vel_username))
+        # logger.debug('Event Listener Password = {0}'.format(vel_password))
+        logger.debug('Event Listener JSON Schema File = {0}'.format(
+                                                              vel_schema_file))
+        logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
+        logger.debug('Throttle JSON Schema File = {0}'.format(
+                                                         throttle_schema_file))
+        logger.debug('Test Control JSON Schema File = {0}'.format(
+                                                     test_control_schema_file))
+
+        #----------------------------------------------------------------------
+        # Perform some basic error checking on the config.
+        #----------------------------------------------------------------------
+        if (int(vel_port) < 1024 or int(vel_port) > 65535):
+            logger.error('Invalid Vendor Event Listener port ({0}) '
+                         'specified'.format(vel_port))
+            raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
+                               'specified'.format(vel_port))
+
+        if (len(vel_path) > 0 and vel_path[-1] != '/'):
+            logger.warning('Event Listener Path ({0}) should have terminating '
+                           '"/"!  Adding one on to configured string.'.format(
+                                                                     vel_path))
+            vel_path += '/'
+
+        #----------------------------------------------------------------------
+        # Load up the vel_schema, if it exists.
+        #----------------------------------------------------------------------
+        if not os.path.exists(vel_schema_file):
+            logger.warning('Event Listener Schema File ({0}) not found. '
+                           'No validation will be undertaken.'.format(
+                                                              vel_schema_file))
+        else:
+            global vel_schema
+            global throttle_schema
+            global test_control_schema
+            vel_schema = json.load(open(vel_schema_file, 'r'))
+            logger.debug('Loaded the JSON schema file')
+
+            #------------------------------------------------------------------
+            # Load up the throttle_schema, if it exists.
+            #------------------------------------------------------------------
+            if (os.path.exists(throttle_schema_file)):
+                logger.debug('Loading throttle schema')
+                throttle_fragment = json.load(open(throttle_schema_file, 'r'))
+                throttle_schema = {}
+                throttle_schema.update(vel_schema)
+                throttle_schema.update(throttle_fragment)
+                logger.debug('Loaded the throttle schema')
+
+            #------------------------------------------------------------------
+            # Load up the test control _schema, if it exists.
+            #------------------------------------------------------------------
+            if (os.path.exists(test_control_schema_file)):
+                logger.debug('Loading test control schema')
+                test_control_fragment = json.load(
+                    open(test_control_schema_file, 'r'))
+                test_control_schema = {}
+                test_control_schema.update(vel_schema)
+                test_control_schema.update(test_control_fragment)
+                logger.debug('Loaded the test control schema')
+
+            #------------------------------------------------------------------
+            # Load up the base_schema, if it exists.
+            #------------------------------------------------------------------
+            if (os.path.exists(base_schema_file)):
+                logger.debug('Updating the schema with base definition')
+                base_schema = json.load(open(base_schema_file, 'r'))
+                vel_schema.update(base_schema)
+                logger.debug('Updated the JSON schema file')
+
+        #----------------------------------------------------------------------
+        # We are now ready to get started with processing. Start-up the various
+        # components of the system in order:
+        #
+        #  1) Create the dispatcher.
+        #  2) Register the functions for the URLs of interest.
+        #  3) Run the webserver.
+        #----------------------------------------------------------------------
+        root_url = '/{0}eventListener/v{1}{2}'.\
+                   format(vel_path,
+                          api_version,
+                          '/' + vel_topic_name
+                          if len(vel_topic_name) > 0
+                          else '')
+        throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
+                       format(vel_path, api_version)
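+        # With the defaults above, root_url is '/eventListener/v5' and
+        # throttle_url is '/eventListener/v5/clientThrottlingState'.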
+        set_404_content(root_url)
+        dispatcher = PathDispatcher()
+        vendor_event_listener = partial(listener, schema = vel_schema)
+        dispatcher.register('GET', root_url, vendor_event_listener)
+        dispatcher.register('POST', root_url, vendor_event_listener)
+        vendor_throttle_listener = partial(listener, schema = throttle_schema)
+        dispatcher.register('GET', throttle_url, vendor_throttle_listener)
+        dispatcher.register('POST', throttle_url, vendor_throttle_listener)
+
+        #----------------------------------------------------------------------
+        # We also add a POST-only mechanism for test control, so that we can
+        # send commands to a single attached client.
+        #----------------------------------------------------------------------
+        test_control_url = '/testControl/v{0}/commandList'.format(api_version)
+        test_control_listener = partial(test_listener,
+                                        schema = test_control_schema)
+        dispatcher.register('POST', test_control_url, test_control_listener)
+        dispatcher.register('GET', test_control_url, test_control_listener)
+
+        httpd = make_server('', int(vel_port), dispatcher)
+        print('Serving on port {0}...'.format(vel_port))
+        httpd.serve_forever()
+
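+        # serve_forever() blocks until the process is interrupted, so the
+        # error log below is only reached if the server loop exits unexpectedly.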
+        logger.error('Main loop exited unexpectedly!')
+        return 0
+
+    except KeyboardInterrupt:
+        #----------------------------------------------------------------------
+        # handle keyboard interrupt
+        #----------------------------------------------------------------------
+        logger.info('Exiting on keyboard interrupt!')
+        return 0
+
+    except Exception as e:
+        #----------------------------------------------------------------------
+        # Handle unexpected exceptions.
+        #----------------------------------------------------------------------
+        if DEBUG or TESTRUN:
+            raise
+        indent = len(program_name) * ' '
+        sys.stderr.write(program_name + ': ' + repr(e) + '\n')
+        sys.stderr.write(indent + '  for help use --help\n')
+        sys.stderr.write(traceback.format_exc())
+        logger.critical('Exiting because of exception: {0}'.format(e))
+        logger.critical(traceback.format_exc())
+        return 2
+
+#------------------------------------------------------------------------------
+# MAIN SCRIPT ENTRY POINT.
+#------------------------------------------------------------------------------
+if __name__ == '__main__':
+    if TESTRUN:
+        #----------------------------------------------------------------------
+        # Running tests - note that doctest comments haven't been included,
+        # so this is a hook for future improvements.
+        #----------------------------------------------------------------------
+        import doctest
+        doctest.testmod()
+
+    if PROFILE:
+        #----------------------------------------------------------------------
+        # Profiling performance. Performance isn't expected to be a major
+        # issue, but this profiling hook should work as expected.
+        #----------------------------------------------------------------------
+        import cProfile
+        import pstats
+        profile_filename = 'collector_profile.txt'
+        cProfile.run('main()', profile_filename)
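+        # cProfile writes binary profiling data to profile_filename; pstats
+        # reads it back and renders a text report sorted by cumulative time.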
+        statsfile = open('collector_profile_stats.txt', 'wb')
+        p = pstats.Stats(profile_filename, stream=statsfile)
+        stats = p.strip_dirs().sort_stats('cumulative')
+        stats.print_stats()
+        statsfile.close()
+        sys.exit(0)
+
+    #--------------------------------------------------------------------------
+    # Normal operation - call through to the main function.
+    #--------------------------------------------------------------------------
+    sys.exit(main())
index 65dd613..b37662b 100644 (file)
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-#. What this is: Setup script for VES agent framework.
+#. What this is: Setup script for the VES monitoring framework.
 #. With this script VES support can be installed in one or more hosts, with:
 #. - a dedicated or shared Kafka server for collection of events from collectd
 #. - VES collectd agents running in host or guest mode
@@ -23,6 +23,7 @@
 #.    pre-installed VES collector e.g. from the ONAP project.
 #.  - Install Kafka server on one of the hosts, or use a pre-installed server
 #.    accessible from the agent hosts.
+#.  - Install collectd on each host.
 #.  - Install the VES agent on each host.
 #.  - As needed, install the VES agent on each virtual host. This could include
 #.    pre-installed VES agents in VM or container images, which are configured
 #.   ves_kafka_host: kafka host IP or hostname (default: 127.0.0.1)
 #.
 #. Usage:
-#.   wget https://raw.githubusercontent.com/opnfv/ves/master/tools/ves-setup.sh
-#.   bash ves-setup.sh <collector|kafka|agent>
+#.   git clone https://gerrit.opnfv.org/gerrit/ves /tmp/ves
+#.   bash /tmp/ves/ves-setup.sh <collector|kafka|collectd|agent>
 #.     collector: setup VES collector (test collector) 
 #.     kafka: setup kafka server for VES events from collect agent(s)
-#.     agent: setup VES agent in host or guest mode
-#.   Recommended sequence is:
-#.     ssh into your collector host and run these commands:
-#.     $ ves_host=$(ip route get 8.8.8.8 | awk '{print $NF; exit}') 
-#.     $ export ves_host
-#.     $ bash ves-setup.sh collector 
-#.   ...then for each agent host:
-#.     copy ~/ves_env.sh and ves-setup.sh to the host e.g. via scp
-#.     ssh into the host and run, directly or via ssh -x
-#.     $ bash ves-setup.sh kafka
-#.     $ bash ves-setup.sh agent
+#.     collectd: setup collectd with libvirt plugin, as a kafka publisher
+#.     agent: setup VES agent in host or guest mode, as a kafka consumer
+#.   See demo_deploy.sh in this repo for a recommended sequence of the above.
 #.
 #. Status: this is a work in progress, under test.
 
@@ -81,18 +74,27 @@ function log() {
 
 function common_prereqs() {
   log "install common prerequisites"
-  sudo apt-get install -y default-jre
-  sudo apt-get install -y zookeeperd
-  sudo apt-get install -y python-pip
-  sudo pip install kafka-python
+  if [[ ! -f /.dockerenv ]]; then dosudo="sudo"; fi
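+  # /.dockerenv only exists inside a Docker container, where this script runs
+  # as root and sudo may not be installed; skip sudo in that case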
+  $dosudo apt-get update
+  $dosudo apt-get install -y git
+  # Required for kafka
+  $dosudo apt-get install -y default-jre
+  $dosudo apt-get install -y zookeeperd
+  $dosudo apt-get install -y python-pip
+  $dosudo pip install kafka-python
+  # Required for building collectd
+  $dosudo apt-get install -y pkg-config
 }
 
 function setup_env() {
-  if [[ ! -f ~/ves_env.sh ]]; then
-    cat <<EOF >~/ves_env.sh
+  if [[ ! -d /tmp/ves ]]; then mkdir /tmp/ves; fi
+  cp $0 /tmp/ves
+  if [[ ! -f /tmp/ves/ves_env.sh ]]; then
+    cat <<EOF >/tmp/ves/ves_env.sh
 #!/bin/bash
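+# Note: the "name:=default" expansions below keep any value already exported
+# by the caller (e.g. demo_deploy.sh) and only fall back to the default.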
 ves_mode="${ves_mode:=host}"
 ves_host="${ves_host:=127.0.0.1}"
+ves_hostname="${ves_hostname:=localhost}"
 ves_port="${ves_port:=30000}"
 ves_path="${ves_path:=}"
 ves_topic="${ves_topic:=}"
@@ -105,6 +107,7 @@ ves_kafka_host="${ves_kafka_host:=127.0.0.1}"
 ves_kafka_port="${ves_kafka_port:=9092}"
 export ves_mode
 export ves_host
+export ves_hostname
 export ves_port
 export ves_path
 export ves_topic
@@ -112,57 +115,62 @@ export ves_https
 export ves_user
 export ves_pass
 export ves_interval
-export ves_kafka_host
-export ves_port
+export ves_kafka_host
 export ves_kafka_port
 EOF
   fi
-  source ~/ves_env.sh
+  source /tmp/ves/ves_env.sh
 }
 
 function setup_kafka() {
   log "setup kafka server"
   common_prereqs
+  setup_env
 
-  log "get and unpack kafka_2.11-0.11.0.0.tgz"
-  wget "http://www-eu.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz"
-  tar -xvzf kafka_2.11-0.11.0.0.tgz
+  cd
+  ver="0.11.0.2"
+  log "get and unpack kafka_2.11-$ver.tgz"
+  wget "http://www-eu.apache.org/dist/kafka/$ver/kafka_2.11-$ver.tgz"
+  tar -xvzf kafka_2.11-$ver.tgz
 
   log "set delete.topic.enable=true"
   sed -i -- 's/#delete.topic.enable=true/delete.topic.enable=true/' \
-    kafka_2.11-0.11.0.0/config/server.properties
-  grep delete.topic.enable kafka_2.11-0.11.0.0/config/server.properties
+    kafka_2.11-$ver/config/server.properties
+  grep delete.topic.enable kafka_2.11-$ver/config/server.properties
   # TODO: Barometer VES guide to clarify hostname must be in /etc/hosts
-  sudo nohup kafka_2.11-0.11.0.0/bin/kafka-server-start.sh \
-    kafka_2.11-0.11.0.0/config/server.properties \
-    > kafka_2.11-0.11.0.0/kafka.log 2>&1 &
-  # TODO: find a test that does not hang the script at 
-  # echo "Hello, World" | ~/kafka_2.11-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TopicTest > /dev/null
-  # ~/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TopicTest --from-beginning
+  sudo nohup kafka_2.11-$ver/bin/kafka-server-start.sh \
+    kafka_2.11-$ver/config/server.properties \
+    > kafka_2.11-$ver/kafka.log 2>&1 &
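+  # A possible non-hanging smoke test (an untested sketch): consume at most
+  # one message with a timeout, e.g.
+  # kafka_2.11-$ver/bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
+  #   --topic TopicTest --from-beginning --timeout-ms 10000 --max-messages 1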
 }
 
-function setup_agent() {
-  log "setup agent"
-  common_prereqs
-
-  log "cleanup any previous failed install"
-  sudo rm -rf ~/collectd-virt
-  sudo rm -rf ~/librdkafka
-  sudo rm -rf ~/collectd
-
+function setup_kafka_client() {
   log "Install Apache Kafka C/C++ client library"
-  sudo apt-get install -y build-essential
+  if [[ ! -f /.dockerenv ]]; then dosudo="sudo"; fi
+  $dosudo apt-get install -y build-essential
   git clone https://github.com/edenhill/librdkafka.git ~/librdkafka
   cd ~/librdkafka
   git checkout -b v0.9.5 v0.9.5
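+  # create a local branch pinned at the v0.9.5 tag, so the librdkafka build
+  # is reproducible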
   # TODO: Barometer VES guide to clarify specific prerequisites for Ubuntu
-  sudo apt-get install -y libpthread-stubs0-dev
-  sudo apt-get install -y libssl-dev
-  sudo apt-get install -y libsasl2-dev
-  sudo apt-get install -y liblz4-dev
+  $dosudo apt-get install -y libpthread-stubs0-dev
+  $dosudo apt-get install -y libssl-dev
+  $dosudo apt-get install -y libsasl2-dev
+  $dosudo apt-get install -y liblz4-dev
   ./configure --prefix=/usr
   make
-  sudo make install
+  $dosudo make install
+}
+
+function setup_collectd() {
+  log "setup collectd"
+  common_prereqs
+  setup_env
+
+  log "cleanup any previous failed install"
+  sudo rm -rf ~/collectd-virt
+  sudo rm -rf ~/librdkafka
+  sudo rm -rf ~/collectd
+
+  setup_kafka_client
 
   log "Build collectd with Kafka support"
   git clone https://github.com/collectd/collectd.git ~/collectd
@@ -194,29 +202,6 @@ function setup_agent() {
   sudo systemctl daemon-reload
   sudo systemctl start collectd.service
 
-  log "install VES agent prerequisites"
-  sudo pip install pyyaml
-
-  log "clone OPNFV Barometer"
-  git clone https://gerrit.opnfv.org/gerrit/barometer ~/barometer
-
-  log "setup ves_app_config.conf"
-  cd ~/barometer/3rd_party/collectd-ves-app/ves_app
-  cat <<EOF >ves_app_config.conf
-[config]
-Domain = $ves_host
-Port = $ves_port
-Path = $ves_path
-Topic = $ves_topic
-UseHttps = $ves_https
-Username = $ves_user
-Password = $ves_pass
-SendEventInterval = $ves_interval
-ApiVersion = $ves_version
-KafkaPort = $ves_kafka_port
-KafkaBroker = $ves_kafka_host
-EOF
-
   log "setup VES collectd config for VES $ves_mode mode"
   if [[ "$ves_mode" == "host" ]]; then
     # TODO: Barometer VES guide to clarify prerequisites install for Ubuntu
@@ -228,19 +213,16 @@ EOF
     # http://docs.opnfv.org/en/latest/submodules/barometer/docs/release/userguide/feature.userguide.html#virt-plugin
     sudo systemctl start libvirtd
 
-    git clone https://github.com/maryamtahhan/collectd ~/collectd-virt
-    cd ~/collectd-virt
+    rm -rf /tmp/ves/collectd-virt
+    git clone https://github.com/maryamtahhan/collectd /tmp/ves/collectd-virt
+    cd /tmp/ves/collectd-virt
     ./build.sh
     ./configure --enable-syslog --enable-logfile --enable-debug
     make
     sudo make install
 
-
-    # TODO: Barometer VES guide refers to "At least one VM instance should be 
-    # up and running by hypervisor on the host." The process needs to accomodate
-    # pre-installation of the VES agent *prior* to the first VM being created. 
-
     cat <<EOF | sudo tee -a /opt/collectd/etc/collectd.conf
+# for VES plugin
 LoadPlugin logfile
 <Plugin logfile>
   LogLevel info
@@ -270,6 +252,7 @@ LoadPlugin write_kafka
 EOF
   else
     cat <<EOF | sudo tee -a /opt/collectd/etc/collectd.conf
+# for VES plugin
 LoadPlugin logfile
 <Plugin logfile>
   LogLevel info
@@ -280,15 +263,6 @@ LoadPlugin logfile
 
 LoadPlugin cpu
 
-#LoadPlugin virt
-#<Plugin virt>
-#  Connection "qemu:///system"
-#  RefreshInterval 60
-#  HostnameFormat uuid
-#  PluginInstanceFormat name
-#  ExtraStats "cpu_util"
-#</Plugin>
-
 LoadPlugin write_kafka
 <Plugin write_kafka>
   Property "metadata.broker.list" "$ves_kafka_host:$ves_kafka_port"
@@ -299,14 +273,61 @@ LoadPlugin write_kafka
 EOF
   fi
 
+  if [[ $(grep -c $ves_hostname /etc/hosts) -eq 0 ]]; then
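+    # per the TODO above, the kafka hostname must be resolvable locally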
+    log "add to /etc/hosts: $ves_kafka_host $ves_hostname"
+    echo "$ves_kafka_host $ves_hostname" | sudo tee -a /etc/hosts
+  fi
+
   log "restart collectd to apply updated config"
   sudo systemctl restart collectd
+}
 
-  log "start VES agent"
-  cd ~/barometer/3rd_party/collectd-ves-app/ves_app
-  nohup python ves_app.py \
-    --events-schema=$ves_mode.yaml \
-    --config=ves_app_config.conf > ~/ves_app.stdout.log 2>&1 &
+function setup_agent() {
+  log "setup VES agent"
+  if [[ ! -f /.dockerenv ]]; then
+    log "start the ves-agent container"
+    sudo docker run -it -d -v /tmp/ves:/opt/ves --name=ves-agent \
+      ubuntu:xenial /bin/bash
+    log "execute the agent setup script in the container"
+    sudo docker exec ves-agent /bin/bash /opt/ves/ves-setup.sh agent
+  else
+    common_prereqs
+    log "setup the VES environment"
+    source /opt/ves/ves_env.sh
+    log "install agent prerequisites"
+    pip install pyyaml
+
+    setup_kafka_client
+
+    log "clone OPNFV Barometer"
+    rm -rf /opt/ves/barometer
+    git clone https://gerrit.opnfv.org/gerrit/barometer /opt/ves/barometer
+
+    log "setup ves_app_config.conf"
+    cd /opt/ves/barometer/3rd_party/collectd-ves-app/ves_app
+    cat <<EOF >ves_app_config.conf
+[config]
+Domain = $ves_host
+Port = $ves_port
+Path = $ves_path
+Topic = $ves_topic
+UseHttps = $ves_https
+Username = $ves_user
+Password = $ves_pass
+SendEventInterval = $ves_interval
+ApiVersion = $ves_version
+KafkaPort = $ves_kafka_port
+KafkaBroker = $ves_kafka_host
+EOF
+
+#    log "add guest.yaml measurements to host.yaml (enables actual host data)"
+#    tail --lines=+24 guest.yaml >>host.yaml
+
+    log "start VES agent"
+    echo "$ves_kafka_host $ves_hostname">>/etc/hosts
+    nohup python ves_app.py --events-schema=$ves_mode.yaml --loglevel ERROR \
+      --config=ves_app_config.conf > /opt/ves/ves_app.stdout.log 2>&1 &
+  fi
 }
 
 function setup_collector() {
@@ -314,29 +335,19 @@ function setup_collector() {
   log "install prerequistes"
   sudo apt-get install -y  jq
 
+  ves_hostname=$HOSTNAME
+  export ves_hostname
   ves_host=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
   export ves_host
   setup_env
 
-  echo "cleanup any earlier install attempts"
-  sudo docker stop influxdb
-  sudo docker rm influxdb
-  sudo docker stop grafana
-  sudo docker rm grafana
-  sudo docker stop ves-collector
-  sudo docker rm -v ves-collector
-  sudo rm -rf /tmp/ves
-
-  log "clone OPNFV VES"
-  git clone https://gerrit.opnfv.org/gerrit/ves /tmp/ves
-
   log "setup influxdb container"
-  sudo docker run -d --name=influxdb -p 8086:8086 influxdb
-  status=$(sudo docker inspect influxdb | jq -r '.[0].State.Status')
+  sudo docker run -d --name=ves-influxdb -p 8086:8086 influxdb
+  status=$(sudo docker inspect ves-influxdb | jq -r '.[0].State.Status')
   while [[ "x$status" != "xrunning" ]]; do
     log "InfluxDB container state is ($status)"
     sleep 10
-    status=$(sudo docker inspect influxdb | jq -r '.[0].State.Status')
+    status=$(sudo docker inspect ves-influxdb | jq -r '.[0].State.Status')
   done
   log "InfluxDB container state is $status"
 
@@ -351,23 +362,23 @@ function setup_collector() {
     --data-urlencode "q=CREATE DATABASE veseventsdb"
 
   log "install Grafana container"
-  sudo docker run -d --name grafana -p 3000:3000 grafana/grafana
-  status=$(sudo docker inspect grafana | jq -r '.[0].State.Status')
+  sudo docker run -d --name ves-grafana -p 3001:3000 grafana/grafana
+  status=$(sudo docker inspect ves-grafana | jq -r '.[0].State.Status')
   while [[ "x$status" != "xrunning" ]]; do
     log "Grafana container state is ($status)"
     sleep 10
-    status=$(sudo docker inspect grafana | jq -r '.[0].State.Status')
+    status=$(sudo docker inspect ves-grafana | jq -r '.[0].State.Status')
   done
   log "Grafana container state is $status"
 
   log "wait for Grafana API to be active"
-  while ! curl http://$ves_host:3000 ; do
+  while ! curl http://$ves_host:3001 ; do
     log "Grafana API is not yet responding... waiting 10 seconds"
     sleep 10
   done
 
   log "add VESEvents datasource to Grafana"
-  cat <<EOF >datasource.json
+  cat <<EOF >/tmp/ves/datasource.json
 { "name":"VESEvents",
   "type":"influxdb",
   "access":"direct",
@@ -385,18 +396,19 @@ function setup_collector() {
 EOF
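+  # Note: "access":"direct" makes the browser query InfluxDB directly, so
+  # port 8086 must be reachable from the user's browser, not just this host.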
 
   curl -H "Accept: application/json" -H "Content-type: application/json" \
-    -X POST -d @datasource.json \
-    http://admin:admin@$ves_host:3000/api/datasources
+    -X POST -d @/tmp/ves/datasource.json \
+    http://admin:admin@$ves_host:3001/api/datasources
 
   log "add VES dashboard to Grafana"
   curl -H "Accept: application/json" -H "Content-type: application/json" \
     -X POST \
-    -d @/tmp/ves/tests/onap-demo/blueprints/tosca-vnfd-onap-demo/Dashboard.json\
-    http://admin:admin@$ves_host:3000/api/dashboards/db        
+    -d @/tmp/ves/tools/grafana/Dashboard.json \
+    http://admin:admin@$ves_host:3001/api/dashboards/db
 
   log "setup collector container"
   cd /tmp/ves
   touch monitor.log
+  rm -rf /tmp/ves/evel-test-collector
   git clone https://github.com/att/evel-test-collector.git
   sed -i -- \
     "s~log_file = /var/log/att/collector.log~log_file = /opt/ves/collector.log~" \
@@ -411,8 +423,10 @@ EOF
     evel-test-collector/config/collector.conf
   sed -i -- "s~vel_topic_name = example_vnf~vel_topic_name = $ves_topic~g" \
     evel-test-collector/config/collector.conf
+  sed -i -- "/vel_topic_name = /a influxdb = $ves_host" \
+    evel-test-collector/config/collector.conf
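+  # sed's "/pattern/a text" form appends an "influxdb = $ves_host" setting on
+  # the line after vel_topic_name in collector.conf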
 
-  cp tests/onap-demo/blueprints/tosca-vnfd-onap-demo/monitor.py \
+  cp /tmp/ves/tools/monitor.py \
     evel-test-collector/code/collector/monitor.py
 
   # Note below: python (2.7) is required due to dependency on module 'ConfigParser'
@@ -424,28 +438,46 @@ pip install requests
 python /opt/ves/evel-test-collector/code/collector/monitor.py \
 --config /opt/ves/evel-test-collector/config/collector.conf \
 --influxdb $ves_host \
---section default > /opt/ves/monitor.log
+--section default > /opt/ves/monitor.log 2>&1 &
 EOF
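+  # /tmp/ves on the host is mounted at /opt/ves inside the container below, so
+  # the setup-collector.sh generated above runs from the shared volume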
 
   sudo docker run -it -d -v /tmp/ves:/opt/ves --name=ves-collector \
     -p 30000:30000 ubuntu:xenial /bin/bash
-  sudo docker exec -it -d ves-collector bash /opt/ves/setup-collector.sh
+  sudo docker exec ves-collector /bin/bash /opt/ves/setup-collector.sh
   # debug hints
   # sudo docker exec -it ves-collector apt-get install -y tcpdump
   # sudo docker exec -it ves-collector tcpdump -A -v -s 0 -i any port 30000
   # curl http://$ves_host:30000
   # sudo docker exec -it ves-collector /bin/bash
-  # ~/kafka_2.11-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic collectd
+  # ~/kafka_2.11-0.11.0.2/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic collectd
+}
+
+function clean() {
+  log "clean installation for $1 at $2"
+  if [[ "$1" == "master" ]]; then
+    cs="ves-agent ves-collector ves-grafana ves-influxdb"
+    for c in $cs; do
+      log "stop and remove container $c"
+      sudo docker stop $c
+      sudo docker rm -v $c
+    done
+  fi
+  log "remove collectd config for VES"
+  sudo sed -i -- '/VES plugin/,$d' /opt/collectd/etc/collectd.conf
+  sudo systemctl restart collectd
+  sudo rm -rf /tmp/ves
 }
 
 dist=`grep DISTRIB_ID /etc/*-release | awk -F '=' '{print $2}'`
-setup_env
 if [[ $(grep -c $HOSTNAME /etc/hosts) -eq 0 ]]; then 
   echo "$(ip route get 8.8.8.8 | awk '{print $NF; exit}') $HOSTNAME" |\
     sudo tee -a /etc/hosts
 fi
 
 case "$1" in
+  "collectd")
+    setup_collectd
+    ;;
   "agent")
     setup_agent
     ;;
@@ -455,6 +487,9 @@ case "$1" in
   "kafka")
     setup_kafka 
     ;;
+  "clean")
+    clean $2 $3
+    ;;
   *)
     grep '#. ' $0
 esac