Merge "Replace toscaparser with native yaml to load published Tosca file into memory"
[domino.git] / DominoServer.py
index 4e8836c..9530170 100755 (executable)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+#Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #   You may obtain a copy of the License at
@@ -14,6 +14,7 @@
 import sys, os, glob, random, errno
 import getopt, socket
 import logging, json
+import sqlite3, yaml
 #sys.path.append('gen-py')
 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
 sys.path.insert(0, glob.glob('./lib')[0])
@@ -32,6 +33,8 @@ from thrift.server import TServer
 from toscaparser.tosca_template import ToscaTemplate
 #from toscaparser.utils.gettextutils import _
 #import toscaparser.utils.urlutils
+from translator.hot.tosca_translator import TOSCATranslator
+
 
 from mapper import *
 from partitioner import *
@@ -48,7 +51,7 @@ class CommunicationHandler:
   def __init__(self, dominoserver):
     self.log = {}
     self.dominoServer = dominoserver
-    self.seqno = 0;
+    self.seqno = SERVER_SEQNO;
    
   def openconnection(self, ipaddr, tcpport):
     try:
@@ -96,7 +99,6 @@ class CommunicationHandler:
   #    - Respond Back with a heartbeat
 
   def d_heartbeat(self, hb_msg):
-    global SERVER_UDID
     logging.info('heartbeat received from %d' , hb_msg.domino_udid)
 
     hb_r = HeartBeatMessage()
@@ -112,7 +114,6 @@ class CommunicationHandler:
   #
   #       - Respond Back with Registration Response
   def d_register(self, reg_msg):
-    global SERVER_UDID
 
     #Prepare and send Registration Response
     reg_r = RegisterResponseMessage()
@@ -132,14 +133,22 @@ class CommunicationHandler:
  
     self.seqno = self.seqno + 1
 
-    # Store the Domino Client info
-    # TBD: check the sequence number to ensure the most recent record is saved
-    self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg 
-    data = {}
-    data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no]
-    with open(SERVER_DBFILE, 'a') as f:
-      json.dump(data, f)
-      f.close()
+    self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg    
+
+    #commit to the database
+    dbconn = sqlite3.connect(SERVER_DBFILE)
+    c = dbconn.cursor()
+    try:
+      newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
+      c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
+    except sqlite3.OperationalError as ex:
+      logging.error('Could not add the new registration record into %s for Domino Client %d :  %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
+    except:
+      logging.error('Could not add the new registration record into %s for Domino Client %d', SERVER_DBFILE, reg_r.domino_udid_assigned)
+      logging.error('Unexpected error: %s', sys.exc_info()[0])
+    dbconn.commit()
+    dbconn.close()
 
     return reg_r
 
@@ -149,7 +158,6 @@ class CommunicationHandler:
   #       - Save the templates  & labels
   #       - Respond Back with Subscription Response
   def d_subscribe(self, sub_msg):
-    global SERVER_UDID, SERVER_SEQNO
     logging.info('Subscribe Request received from %d' , sub_msg.domino_udid)
 
     if sub_msg.template_op == APPEND:
@@ -162,18 +170,49 @@ class CommunicationHandler:
     elif sub_msg.template_op == DELETE:
       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
 
-    if sub_msg.labels != []:
-      if sub_msg.label_op == APPEND:
-        if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
-          self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
-        else:
-          self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
-      elif sub_msg.label_op == OVERWRITE:
-        self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
-      elif sub_msg.label_op == DELETE:
-        self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
+#    if sub_msg.labels != []:
+    if sub_msg.label_op == APPEND:
+      logging.debug('APPENDING Labels...')
+      if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
+       self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
+      else:
+       self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
+    elif sub_msg.label_op == OVERWRITE:
+      logging.debug('OVERWRITING Labels...')
+      self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
+    elif sub_msg.label_op == DELETE:
+      logging.debug('DELETING Labels...')
+      self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
 
     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
+
+    #commit to the database
+    dbconn = sqlite3.connect(SERVER_DBFILE)
+    c = dbconn.cursor()
+    newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
+    try:
+      c.execute("REPLACE INTO labels (udid, label_list) VALUES ({udid}, '{newvalue}')".\
+               format(udid=sub_msg.domino_udid, newvalue=','.join(list(newlabelset)) ))
+    except sqlite3.OperationalError as ex1:
+      logging.error('Could not add the new labels to %s for Domino Client %d :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
+    except:
+      logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid)
+      logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+    newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
+    try:
+      c.execute("REPLACE INTO ttypes (udid, ttype_list) VALUES ({udid}, '{newvalue}')".\
+               format(udid=sub_msg.domino_udid, newvalue=','.join(list(newttypeset)) ))
+    except sqlite3.OperationalError as ex1:
+      logging.error('Could not add the new labels to %s for Domino Client %d :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
+    except:
+      logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid)
+      logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+
+    dbconn.commit()
+    dbconn.close()
+
  
     #Fill in the details
     sub_r = SubscribeResponseMessage()
@@ -190,7 +229,6 @@ class CommunicationHandler:
   #       - Launch Push service
   #       - Respond Back with Publication Response
   def d_publish(self, pub_msg):
-    global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME
     logging.info('Publish Request received from %d' , pub_msg.domino_udid)
     logging.debug(pub_msg.template)
 
@@ -201,19 +239,38 @@ class CommunicationHandler:
       if exception.errno == errno.EEXIST:
         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
       else:
-        logging.error('Error occurred in creating %s. Err no: %d', exception.errno)
+        logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
 
     #Risking a race condition if another process is attempting to write to same file
-    f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w')  
-    for item in pub_msg.template:
-      print>>f, item
-    f.close()
+    try:
+      miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
+    except:
+      #Some sort of race condition should have occured that prevented the write operation
+      #treat as failure
+      logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
+      pub_r = PublishResponseMessage()
+      pub_r.domino_udid = SERVER_UDID
+      pub_r.seq_no = self.seqno
+      pub_r.responseCode = FAILED
+      self.seqno = self.seqno + 1
+      return pub_r
 
     # Load tosca object from file into memory
-    tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
-    
+    try:
+      #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
+      tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
+    except:
+      logging.error('Tosca Parser error: %s', sys.exc_info()[0])
+      #tosca file could not be read
+      pub_r = PublishResponseMessage()
+      pub_r.domino_udid = SERVER_UDID
+      pub_r.seq_no = self.seqno
+      pub_r.responseCode = FAILED
+      self.seqno = self.seqno + 1 
+      return pub_r 
+
     # Extract Labels
-    node_labels = label.extract_labels( tosca )
+    node_labels = label.extract_labels( tpl )
     logging.debug('Node Labels: %s', node_labels)
 
     # Map nodes in the template to resource domains
@@ -225,26 +282,48 @@ class CommunicationHandler:
     logging.debug('Selected Sites: %s', node_site)
 
     # Create per-domain Tosca files
-    file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl)
-    
-    # Create list of translated template files
-
+    file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl)
+    logging.debug('Per domain file paths: %s', file_paths)
     # Create work-flow
 
+    # Create response with response code as SUCCESS by default
+    # Response code will be overwrittent if partial or full failure occurs
+    pub_r = PublishResponseMessage()
+    pub_r.domino_udid = SERVER_UDID
+    pub_r.seq_no = self.seqno
+    pub_r.responseCode = SUCCESS
+    self.seqno = self.seqno + 1
+
     # Send domain templates to each domain agent/client 
     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
     # TBD: read from work-flow
     for site in file_paths:
       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
       domino_client_port = self.dominoServer.registration_record[site].tcpport
-      self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port)
+      try:
+        if 'hot' in self.dominoServer.subscribed_templateformats[site]:
+          tosca = ToscaTemplate(file_paths[site])
+          translator = TOSCATranslator(tosca, {}, False)
+          output = translator.translate()
+          logging.debug('HOT translation: \n %s', output)
+          template_lines = [ output ]
+        else: 
+          template_lines = miscutil.read_templatefile(file_paths[site]) 
+        self.push_template(template_lines, domino_client_ip, domino_client_port)
+      except IOError as e:
+        logging.error('I/O error(%d): %s' , e.errno, e.strerror)
+        pub_r.responseCode = FAILED
+      except:
+        logging.error('Error: %s', sys.exc_info()[0])
+        pub_r.responseCode = FAILED
+
+    # Check if any file is generated for distribution, if not
+    # return FAILED as responseCode, we should also send description for
+    # reason
+    if len(file_paths) == 0:
+      pub_r.responseCode = FAILED
 
-    #Fill in the details
-    pub_r = PublishResponseMessage()
-    pub_r.domino_udid = SERVER_UDID
-    pub_r.seq_no = self.seqno
-    pub_r.responseCode = SUCCESS
-    self.seqno = self.seqno + 1 
     return pub_r
     
   #Query from Domino Client is received
@@ -266,7 +345,7 @@ class DominoServer:
      self.registration_record = dict() 
      self.communicationHandler = CommunicationHandler(self)
      self.processor = Communication.Processor(self.communicationHandler)
-     self.transport = TSocket.TServerSocket(port=9090)
+     self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
      self.tfactory = TTransport.TBufferedTransportFactory()
      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
@@ -301,7 +380,7 @@ class DominoServer:
 
 def main(argv):
   server = DominoServer()
-
+  loglevel = LOGLEVEL
   #process input arguments
   try:
       opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
@@ -310,11 +389,11 @@ def main(argv):
       sys.exit(2)
   for opt, arg in opts:
       if opt == '-h':
-         print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
+         print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
          sys.exit()
       elif opt in ("-c", "--conf"):
          configfile = arg
-      elif opt in ("--log"):
+      elif opt in ("-l", "--log"):
          loglevel= arg
   #Set logging level
   numeric_level = getattr(logging, loglevel.upper(), None)
@@ -326,6 +405,27 @@ def main(argv):
     print ex.message
     sys.exit(2)
 
+  #start the database with schemas
+  dbconn = sqlite3.connect(SERVER_DBFILE)
+  c = dbconn.cursor()
+  try:
+    c.execute('''CREATE TABLE labels (udid INTEGER PRIMARY KEY, label_list TEXT)''')
+  except sqlite3.OperationalError as ex:
+    logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+  try:
+    c.execute('''CREATE TABLE ttypes (udid INTEGER PRIMARY KEY, ttype_list TEXT)''')
+  except sqlite3.OperationalError as ex:
+    logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+  try:
+    c.execute('''CREATE TABLE clients (udid INTEGER PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
+  except sqlite3.OperationalError as ex:
+    logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+  dbconn.commit()
+  dbconn.close()
+
   logging.debug('Domino Server Starting...')
   server.start_communicationService()
   print 'done.'