Replaced collector.py with monitor.py 11/23211/1
authorBryan Sullivan <bryan.sullivan@att.com>
Mon, 17 Oct 2016 04:55:48 +0000 (21:55 -0700)
committerBryan Sullivan <bryan.sullivan@att.com>
Mon, 17 Oct 2016 04:55:48 +0000 (21:55 -0700)
JIRA: VES-1

Change-Id: Iacfa032786ebe9a2e475c858c582e92e820b277c
Signed-off-by: Bryan Sullivan <bryan.sullivan@att.com>
tests/blueprints/tosca-vnfd-hello-ves/monitor.py
tests/vHello_VES.sh

index 372d586..5307435 100644 (file)
 #
 # Status: this is a work in progress, under test.
 
+from wsgiref.simple_server import make_server, WSGIRequestHandler
+import sys
 import os
+import platform
+import traceback
 import time
-import sys
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import ConfigParser
+import logging.handlers
+from base64 import b64decode
+import string
+import json
+import jsonschema
 import select
 
 report_time = ""
-request_rate = ""
-app_state = ""
-mode = "f"
+requestRate = ""
+vfStatus = ""
+monitor_mode = "f"
 summary = ""
 status = ""
+vfStatus = ""
+base_url = ''
+template_404 = b'''POST {0}'''
+columns = 0
+rows = 0
+
+class JSONObject:
+  def __init__(self, d):
+    self.__dict__ = d
+
+class NoLoggingWSGIRequestHandler(WSGIRequestHandler):
+  def log_message(self, format, *args):
+    pass
 
 def print_there(x, y, text):
-     sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
-     sys.stdout.flush()
-
-a,b = os.popen('stty size', 'r').read().split()
-columns = int(b)
-
-with open('/home/ubuntu/ves.log') as f:
-  while True:
-    if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
-      line = sys.stdin.readline()
-      if "f" in line: mode = "f"
-      if "c" in line: mode = "c"
-      # Update screen as the <cr> messed up the display!
-      print_there(1,columns-56,summary)
-      print_there(2,columns-56,status)
-
-    line = f.readline()
-    if line:
-      if mode == "f": 
-        print line,
-
-      if "lastEpochMicrosec" in line:
-#0....5....1....5....2....5....3....5....4....5....5
-#            "lastEpochMicrosec": 1476552393091008,
-# Note: the above is expected, but sometimes it's in a different position or
-# corrupted with other output for some reason...
-
-        fields = line.split( )
-        e = fields[1][0:-1]
-        if e.isdigit():
-#          print "report_time: ", e, "\n"
-          report_time = time.strftime('%Y-%m-%d %H:%M:%S', 
-            time.localtime(int(e)/1000000))
-
-      if "requestRate" in line:
-#....5....1....5....2....5....3....5
-#            "requestRate": 2264,
-        request_rate = line[27:-2]
-        summary = report_time + " app state: " + app_state + ", request rate: " + request_rate 
-        print_there(1,columns-56,summary)
-#2016-10-16 17:15:29 app state: Started, request rate: 99
-#....5....1....5....2....5....3....5....4....5....5....5....6
-        if mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format(
-          report_time, app_state, request_rate)
-
-      if "\"specificProblem\": \"Started\"" in line:
-        app_state = "Started"
-        status = report_time + " app state change: Started"
-        if mode == "c": print '{0} *** app state change: Started'.format(report_time)
-
-      if "\"specificProblem\": \"Stopped\"" in line:
-        app_state = "Stopped"
-        status = report_time + " app state change: Stopped"
-        if mode == "c": print '{0} *** app state change: Stopped'.format(report_time)
-
-      print_there(1,columns-56,summary)
-      print_there(2,columns-56,status)
+  sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
+  sys.stdout.flush()
+
+base_url = ''
+template_404 = b'''POST {0}'''
+
+def notfound_404(environ, start_response):
+  print('Unexpected URL/Method: {0} {1}'.format(
+                                           environ['REQUEST_METHOD'].upper(),
+                                           environ['PATH_INFO']))
+  start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
+  return [template_404.format(base_url)]
+
+class PathDispatcher:
+  def __init__(self):
+    self.pathmap = { }
+
+  def __call__(self, environ, start_response):
+    #----------------------------------------------------------------------
+    # Extract the method and path from the environment.
+    #----------------------------------------------------------------------
+    method = environ['REQUEST_METHOD'].lower()
+    path = environ['PATH_INFO']
+
+    #----------------------------------------------------------------------
+    # See if we have a handler for this path, and if so invoke it.
+    # Otherwise, return a 404.
+    #----------------------------------------------------------------------
+    handler = self.pathmap.get((method, path), notfound_404)
+    return handler(environ, start_response)
+
+  def register(self, method, path, function):
+    print('Registering for {0} at {1}'.format(method, path))
+    self.pathmap[method.lower(), path] = function
+    return function
+
+#--------------------------------------------------------------------------
+# Event processing
+#--------------------------------------------------------------------------
+def process_event(e):
+  global status
+  global summary
+  global vfStatus
+
+  epoch = e.event.commonEventHeader.lastEpochMicrosec
+
+  report_time = time.strftime('%Y-%m-%d %H:%M:%S', 
+                  time.localtime(int(epoch)/1000000))
+
+  domain = e.event.commonEventHeader.domain
+
+  if domain == 'measurementsForVfScaling':
+
+    aggregateCpuUsage = e.event.measurementsForVfScaling.aggregateCpuUsage
+    requestRate = e.event.measurementsForVfScaling.requestRate
+    summary = report_time + " app state: " + vfStatus + ", request rate: " + str(requestRate)
+    if monitor_mode == "c": print '{0} *** app state: {1}\trequest rate: {2}'.format(
+      report_time, vfStatus, str(requestRate))
+
+  if domain == 'fault':
+
+    alarmCondition = e.event.faultFields.alarmCondition
+    specificProblem = e.event.faultFields.specificProblem
+#    vfStatus = e.event.faultFields.vfStatus
+    vfStatus = e.event.faultFields.specificProblem
+
+    status = report_time + " app state change: " + specificProblem
+    if monitor_mode == "c": print '{0} *** vfStatus change: {1}'.format(report_time,
+                      specificProblem)
+
+# print_there only works if SSH'd to the VM manually - need to investigate
+#  print_there(1,columns-56,summary)
+  print '{0}'.format(summary)
+#  print_there(2,columns-56,status)
+  print '{0}'.format(status)
+
+#--------------------------------------------------------------------------
+# Main monitoring and logging procedure
+#--------------------------------------------------------------------------
+def ves_monitor(environ, start_response):
+
+  # Check for keyboard input
+  if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
+    line = sys.stdin.readline()
+    if "f" in line: monitor_mode = "f"
+    if "c" in line: monitor_mode = "c"
+
+  print('==== ' + time.asctime() + ' ' + '=' * 49)
+
+  #--------------------------------------------------------------------------
+  # Extract the content from the request.
+  #--------------------------------------------------------------------------
+  length = int(environ.get('CONTENT_LENGTH', '0'))
+  body = environ['wsgi.input'].read(length)
+
+  mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
+                                                     'None None'))
+  if (b64_credentials != 'None'):
+      credentials = b64decode(b64_credentials)
+  else:
+      credentials = None
+
+  #--------------------------------------------------------------------------
+  # See whether the user authenticated themselves correctly.
+  #--------------------------------------------------------------------------
+  if (credentials == (vel_username + ':' + vel_password)):
+    start_response('204 No Content', [])
+    yield ''
+  else:
+    print('Failed to authenticate agent')
+    start_response('401 Unauthorized', [ ('Content-type',
+                                          'application/json')])
+    req_error = { 'requestError': {
+                    'policyException': {
+                      'messageId': 'POL0001',
+                       'text': 'Failed to authenticate'
+                    }
+                  }
+                }
+    yield json.dumps(req_error)
+
+  #--------------------------------------------------------------------------
+  # Decode the JSON body
+  #--------------------------------------------------------------------------
+
+  try:
+    decoded_body = json.loads(body)
+    print('{0}'.format(json.dumps(decoded_body,
+                                sort_keys=True,
+                                indent=4,
+                                separators=(',', ': '))))
+    decoded_body = json.loads(body, object_hook=JSONObject)
+    process_event(decoded_body)
+
+  except Exception as e:
+    print('JSON body is not valid for unexpected reason! {0}'.format(e))
+
+def main(argv=None):
+  global columns
+  global rows
+  a,b = os.popen('stty size', 'r').read().split()
+  rows = int(a)
+  columns = int(b)
+
+  if argv is None:
+    argv = sys.argv
+  else:
+    sys.argv.extend(argv)
+
+  try:
+    #----------------------------------------------------------------------
+    # Setup argument parser so we can parse the command-line.
+    #----------------------------------------------------------------------
+    parser = ArgumentParser(description='',
+                            formatter_class=ArgumentDefaultsHelpFormatter)
+    parser.add_argument('-v', '--verbose',
+                        dest='verbose',
+                        action='count',
+                        help='set verbosity level')
+    parser.add_argument('-V', '--version',
+                        action='version',
+                        version='1.0',
+                        help='Display version information')
+    parser.add_argument('-c', '--config',
+                        dest='config',
+                        default='/etc/opt/att/collector.conf',
+                        help='Use this config file.',
+                        metavar='<file>')
+    parser.add_argument('-s', '--section',
+                        dest='section',
+                        default='default',
+                        metavar='<section>',
+                        help='section to use in the config file')
+
+    #----------------------------------------------------------------------
+    # Process arguments received.
+    #----------------------------------------------------------------------
+    args = parser.parse_args()
+    verbose = args.verbose
+    config_file = args.config
+    config_section = args.section
+    #----------------------------------------------------------------------
+    # Now read the config file, using command-line supplied values as
+    # overrides.
+    #----------------------------------------------------------------------
+    defaults = {'log_file': 'ves.log',
+                'vel_port': '30000',
+                'vel_path': '',
+                'vel_topic_name': ''
+               }
+    overrides = {}
+    config = ConfigParser.SafeConfigParser(defaults)
+    config.read(config_file)
+
+    #----------------------------------------------------------------------
+    # extract the values we want.
+    #----------------------------------------------------------------------
+    log_file = config.get(config_section, 'log_file', vars=overrides)
+    vel_port = config.get(config_section, 'vel_port', vars=overrides)
+    vel_path = config.get(config_section, 'vel_path', vars=overrides)
+    vel_topic_name = config.get(config_section,
+                                'vel_topic_name',
+                                vars=overrides)
+    global vel_username
+    global vel_password
+    vel_username = config.get(config_section,
+                              'vel_username',
+                              vars=overrides)
+    vel_password = config.get(config_section,
+                              'vel_password',
+                              vars=overrides)
+    vel_schema_file = config.get(config_section,
+                                 'schema_file',
+                                 vars=overrides)
+    base_schema_file = config.get(config_section,
+                             'base_schema_file',
+                              vars=overrides)
+
+    #----------------------------------------------------------------------
+    # Perform some basic error checking on the config.
+    #----------------------------------------------------------------------
+    if (int(vel_port) < 1024 or int(vel_port) > 65535):
+      raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
+                         'specified'.format(vel_port))
+
+    if (len(vel_path) > 0 and vel_path[-1] != '/'):
+      vel_path += '/'
+
+    #----------------------------------------------------------------------
+    # Load up the vel_schema and base_schema, if they exist.
+    #----------------------------------------------------------------------
+    if (os.path.exists(vel_schema_file)):
+        global vel_schema
+        vel_schema = json.load(open(vel_schema_file, 'r'))
+        if (os.path.exists(base_schema_file)):
+          base_schema = json.load(open(base_schema_file, 'r'))
+          vel_schema.update(base_schema)
+
+    #----------------------------------------------------------------------
+    # We are now ready to get started with processing. Start-up the various
+    # components of the system in order:
+    #
+    #  1) Create the dispatcher.
+    #  2) Register the functions for the URLs of interest.
+    #  3) Run the webserver.
+    #----------------------------------------------------------------------
+    root_url = '/{0}eventListener/v{1}{2}'.format(vel_path,
+                                               '1',
+                                               '/' + vel_topic_name
+                                                 if len(vel_topic_name) > 0
+                                                 else '')
+
+    base_url = root_url
+    dispatcher = PathDispatcher()
+    dispatcher.register('GET', root_url, ves_monitor)
+    dispatcher.register('POST', root_url, ves_monitor)
+    httpd = make_server('', 30000, dispatcher, handler_class=NoLoggingWSGIRequestHandler)
+    httpd.serve_forever()
+
+    return 0
+
+  except Exception as e:
+    #----------------------------------------------------------------------
+    # Handle unexpected exceptions.
+    #----------------------------------------------------------------------
+    indent = len('VES Monitor') * ' '
+    sys.stderr.write('VES Monitor: ' + repr(e) + '\n')
+    sys.stderr.write(indent + '  for help use --help\n')
+    sys.stderr.write(traceback.format_exc())
+    return 2
 
+#------------------------------------------------------------------------------
+# MAIN SCRIPT ENTRY POINT.
+#------------------------------------------------------------------------------
+if __name__ == '__main__':
+    sys.exit(main())    
index abb9687..6bc6bbf 100644 (file)
@@ -22,7 +22,7 @@
 #   $ git clone https://gerrit.opnfv.org/gerrit/ves
 #   $ cd ves/tests
 #   $ bash vHello_VES.sh [setup|start|run|test|stop|clean] 
-#        [collector|traffic|pause|nic]
+#        [monitor|traffic|pause|nic]
 #   setup: setup test environment
 #   start: install blueprint and run test
 #   run: setup test environment and run test
@@ -30,7 +30,7 @@
 #   stop: stop test and uninstall blueprint
 #   clean: cleanup after test
 #   Test:
-#     collector: attach to the collector VM and run the collector
+#     monitor: attach to the collector VM and run the VES Monitor
 #     traffic: generate some traffic
 #     pause: pause the VNF (web server) for a minute to generate a state change
 #     nic: timed ifdown/ifup to generate a NIC fault report
@@ -228,6 +228,8 @@ git clone https://github.com/att/evel-test-collector.git
 sed -i -- 's/vel_username = /vel_username = hello/' evel-test-collector/config/collector.conf
 sed -i -- 's/vel_password = /vel_password = world/' evel-test-collector/config/collector.conf
 EOF
+  # Replacing the default collector with monitor.py which has processing logic as well
+  scp -i /tmp/tacker/vHello.pem -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no /tmp/tacker/blueprints/tosca-vnfd-hello-ves/monitor.py ubuntu@$VDU2_IP:/home/ubuntu/monitor.py
 
   echo "$0: start vHello web server in VDU1"
   ssh -i /tmp/tacker/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$VDU1_IP "sudo chown ubuntu /home/ubuntu"
@@ -270,19 +272,15 @@ get_vdu_ip () {
   vdu_ip=$(openstack server list | awk "/$1/ { print \$10 }")
 }
 
-collector () {
-  echo "$0: Start the VES Collector in VDU2 - Stop first if running"
+monitor () {
+  echo "$0: Start the VES Monitor in VDU2 - Stop first if running"
   get_vdu_ip VDU2
   sudo cp /tmp/tacker/vHello.pem /tmp/vHello.pem
   sudo chown $USER:$USER /tmp/vHello.pem
   chmod 600 /tmp/vHello.pem
-  ssh -i /tmp/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$vdu_ip << 'EOF'
+  ssh -t -t -i /tmp/vHello.pem -x -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no ubuntu@$vdu_ip << 'EOF'
 sudo kill $(ps -ef | grep evel-test-collector | awk '{print $2}')
-cd /home/ubuntu/
-nohup python evel-test-collector/code/collector/collector.py \
-       --config evel-test-collector/config/collector.conf \
-       --section default \
-       --verbose >~/ves.log &
+python monitor.py --config evel-test-collector/config/collector.conf --section default 
 EOF
 }
 
@@ -350,5 +348,16 @@ case "$1" in
     echo "run: setup test environment and run test"
     echo "stop: stop test and uninstall blueprint"
     echo "clean: cleanup after test"
+    echo "usage: bash vHello_VES.sh [setup|start|run|test|stop|clean] [monitor|traffic|pause|nic]"
+    echo "setup: setup test environment"
+    echo "start: install blueprint and run test"
+    echo "run: setup test environment and run test"
+    echo "test: run test tools/scenario - see below"
+    echo "stop: stop test and uninstall blueprint"
+    echo "clean: cleanup after test"
+    echo "Test:"
+    echo "  monitor: attach to the collector VM and run the VES Monitor"
+    echo "  traffic: generate some traffic"
+    echo "  pause: pause the VNF (web server) for a minute to generate a state change"
     fail
 esac