X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=DominoServer.py;h=679ca423fe8605b0ba6c9b319670a120ad6a27d1;hb=f5c639eebbb0b60c68c159579fc2711760db8759;hp=c7d58efbf2be8fc6c8880faf231adbb99bc3c419;hpb=64eec6741cea5e4b4dd9fe859a36b46b8e353889;p=domino.git diff --git a/DominoServer.py b/DominoServer.py index c7d58ef..679ca42 100755 --- a/DominoServer.py +++ b/DominoServer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +#Copyright 2016 Open Platform for NFV Project, Inc. and its contributors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -14,9 +14,9 @@ import sys, os, glob, random, errno import getopt, socket import logging, json -import sqlite3 -#sys.path.append('gen-py') -#sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0]) +import sqlite3, yaml +import uuid + sys.path.insert(0, glob.glob('./lib')[0]) @@ -65,41 +65,40 @@ class CommunicationHandler: # Create a client to use the protocol encoder self.sender = Communication.Client(self.protocol) self.transport.open() - except Thrift.TException, tx: - logging.error('%s' , tx.message) - + except: + raise def closeconnection(self): self.transport.close() - def push_template(self,template,ipaddr,tcpport): - self.openconnection(ipaddr,tcpport) - pushm = PushMessage() - pushm.domino_udid = SERVER_UDID - pushm.seq_no = self.seqno - pushm.template_type = 'tosca-nfv-v1.0' - pushm.template = template + def push_template(self,template,ipaddr,tcpport,TUID): try: + self.openconnection(ipaddr,tcpport) + pushm = PushMessage() + pushm.domino_udid = SERVER_UDID + pushm.seq_no = self.seqno + pushm.template_type = 'tosca-nfv-v1.0' + pushm.template = template + pushm.template_UUID = TUID + self.seqno = self.seqno + 1 + push_r = self.sender.d_push(pushm) - logging.info('Push Response received from %d' , push_r.domino_udid) - except (Thrift.TException, TSocket.TTransportException) as tx: - logging.error('%s' , tx.message) + logging.info('Push Response received from %s' , push_r.domino_udid) + self.closeconnection() except (socket.timeout) as tx: self.dominoServer.handle_RPC_timeout(pushm) + raise tx except: logging.error('Unexpected error: %s', sys.exc_info()[0]) - - self.seqno = self.seqno + 1 - - self.closeconnection() + raise #Heartbeat from Domino Client is received #Actions: # - Respond Back with a heartbeat def d_heartbeat(self, hb_msg): - logging.info('heartbeat received from %d' , hb_msg.domino_udid) + logging.info('heartbeat received from %s' , hb_msg.domino_udid) hb_r = HeartBeatMessage() hb_r.domino_udid = SERVER_UDID @@ -117,7 +116,7 @@ class CommunicationHandler: #Prepare and send Registration Response reg_r = RegisterResponseMessage() - logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport) + logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport) reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired) @@ -142,9 +141,9 @@ class CommunicationHandler: newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),] c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow) except sqlite3.OperationalError as ex: - logging.error('Could not add the new registration record into %s for Domino Client %d : %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message) + logging.error('Could not add the new registration record into %s for Domino Client %s : %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message) except: - logging.error('Could not add the new registration record into %s for Domino Client %d', SERVER_DBFILE, reg_r.domino_udid_assigned) + logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned) logging.error('Unexpected error: %s', sys.exc_info()[0]) dbconn.commit() @@ -158,7 +157,7 @@ class CommunicationHandler: # - Save the templates & labels # - Respond Back with Subscription Response def d_subscribe(self, sub_msg): - logging.info('Subscribe Request received from %d' , sub_msg.domino_udid) + logging.info('Subscribe Request received from %s' , sub_msg.domino_udid) if sub_msg.template_op == APPEND: if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid): @@ -191,22 +190,22 @@ class CommunicationHandler: c = dbconn.cursor() newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid] try: - c.execute("REPLACE INTO labels (udid, label_list) VALUES ({udid}, '{newvalue}')".\ - format(udid=sub_msg.domino_udid, newvalue=','.join(list(newlabelset)) )) + newvalue=','.join(list(newlabelset)) + c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) ) except sqlite3.OperationalError as ex1: - logging.error('Could not add the new labels to %s for Domino Client %d : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message) + logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message) except: - logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid) + logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid) logging.error('Unexpected error: %s', sys.exc_info()[0]) newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] try: - c.execute("REPLACE INTO ttypes (udid, ttype_list) VALUES ({udid}, '{newvalue}')".\ - format(udid=sub_msg.domino_udid, newvalue=','.join(list(newttypeset)) )) + newvalue=','.join(list(newttypeset)) + c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) ) except sqlite3.OperationalError as ex1: - logging.error('Could not add the new labels to %s for Domino Client %d : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message) + logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message) except: - logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid) + logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid) logging.error('Unexpected error: %s', sys.exc_info()[0]) @@ -227,11 +226,25 @@ class CommunicationHandler: #Actions: # - Parse the template, perform mapping, partition the template # - Launch Push service - # - Respond Back with Publication Response + # - Respond Back with Publication Response def d_publish(self, pub_msg): - logging.info('Publish Request received from %d' , pub_msg.domino_udid) - logging.debug(pub_msg.template) + logging.info('Publish Request received from %s' , pub_msg.domino_udid) + #logging.debug(pub_msg.template) + + # Create response with response code as SUCCESS by default + # Response code will be overwritten if partial or full failure occurs + pub_r = PublishResponseMessage() + pub_r.domino_udid = SERVER_UDID + pub_r.seq_no = self.seqno + pub_r.responseCode = SUCCESS + pub_r.template_UDID = pub_msg.template_UUID + self.seqno = self.seqno + 1 + if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False): + logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID) + pub_r.responseCode = FAILED + return pub_r + # Save as file try: os.makedirs(TOSCADIR) @@ -248,28 +261,21 @@ class CommunicationHandler: #Some sort of race condition should have occured that prevented the write operation #treat as failure logging.error('FAILED to write the published file: %s', sys.exc_info()[0]) - pub_r = PublishResponseMessage() - pub_r.domino_udid = SERVER_UDID - pub_r.seq_no = self.seqno pub_r.responseCode = FAILED - self.seqno = self.seqno + 1 return pub_r - + # Load tosca object from file into memory try: - tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME ) + #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME ) + tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r')) except: logging.error('Tosca Parser error: %s', sys.exc_info()[0]) #tosca file could not be read - pub_r = PublishResponseMessage() - pub_r.domino_udid = SERVER_UDID - pub_r.seq_no = self.seqno pub_r.responseCode = FAILED - self.seqno = self.seqno + 1 return pub_r # Extract Labels - node_labels = label.extract_labels( tosca ) + node_labels = label.extract_labels( tpl ) logging.debug('Node Labels: %s', node_labels) # Map nodes in the template to resource domains @@ -280,13 +286,49 @@ class CommunicationHandler: node_site = label.select_site( site_map ) logging.debug('Selected Sites: %s', node_site) - # Create per-domain Tosca files - file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tosca.tpl) + # Create per-site Tosca files + tpl_site = {} + file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl,tpl_site) logging.debug('Per domain file paths: %s', file_paths) - - # Create work-flow + logging.debug('Per domain topologies: %s', tpl_site) + + # Detect boundary links + boundary_VLs, VL_sites = partitioner.return_boundarylinks(tpl_site) + logging.debug('Boundary VLs: %s', boundary_VLs) + logging.debug('VL sites: %s', VL_sites) + # Create work-flow + # Assign template UUID if no UUID specified + # Otherwise update the existing domains subscribed to TUID + unsuccessful_updates = [] + if pub_msg.template_UUID is None: + pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID + else: + logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID) + if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject + logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID]) + pub_r.responseCode = FAILED + return pub_r + else: #Template exists, we need to find clients that are no longer in the subscription list list + TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys())) + if len(TUID_unsubscribed_list) > 0: + logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID) + # Send empty bodied templates to domains which no longer has any assigned resource + template_lines = [] + for i in range(len(TUID_unsubscribed_list)): + domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr + domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport + try: + self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID) + except: + logging.error('Error in pushing template: %s', sys.exc_info()[0]) + unsuccessful_updates.append(TUID_unsubscribed_list[i]) + + # The following template distribution is not transactional, meaning that some domains + # might be successfull receiving their sub-templates while some other might not + # The function returns FAILED code to the publisher in such situations, meaning that + # publisher must republish to safely orchestrate/manage NS or VNF # Send domain templates to each domain agent/client # FOR NOW: send untranslated but partitioned tosca files to scheduled sites @@ -303,18 +345,53 @@ class CommunicationHandler: template_lines = [ output ] else: template_lines = miscutil.read_templatefile(file_paths[site]) - self.push_template(template_lines, domino_client_ip, domino_client_port) + self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID) except IOError as e: logging.error('I/O error(%d): %s' , e.errno, e.strerror) + pub_r.responseCode = FAILED except: logging.error('Error: %s', sys.exc_info()[0]) + pub_r.responseCode = FAILED + + # Check if any file is generated for distribution, if not + # return FAILED as responseCode, we should also send description for + # reason + if len(file_paths) == 0: + pub_r.responseCode = FAILED + + + dbconn = sqlite3.connect(SERVER_DBFILE) + c = dbconn.cursor() + + if pub_r.responseCode == SUCCESS: + # update in memory database + self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid + try: + c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) ) + dbconn.commit() + except sqlite3.OperationalError as ex1: + logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message) + except: + logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid) + logging.error('Unexpected error: %s', sys.exc_info()[0]) + else: + self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid + + # update in memory database + self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys() + logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID) + try: + newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) + c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) ) + dbconn.commit() + except sqlite3.OperationalError as ex1: + logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message) + except: + logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid) + logging.error('Unexpected error: %s', sys.exc_info()[0]) + + dbconn.close() - #Fill in the details - pub_r = PublishResponseMessage() - pub_r.domino_udid = SERVER_UDID - pub_r.seq_no = self.seqno - pub_r.responseCode = SUCCESS - self.seqno = self.seqno + 1 return pub_r #Query from Domino Client is received @@ -324,7 +401,16 @@ class CommunicationHandler: def d_query(self, qu_msg): #Fill in the details qu_r = QueryResponseMessage() + qu_r.domino_udid = SERVER_UDID + qu_r.seq_no = self.seqno + qu_r.responseCode = SUCCESS + qu_r.queryResponse = [] + + for i in range(len(qu_msg.queryString)): + if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client + qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid]) + self.seqno = self.seqno + 1 return qu_r @@ -334,6 +420,9 @@ class DominoServer: self.subscribed_labels = dict() self.subscribed_templateformats = dict() self.registration_record = dict() + self.assignedTUIDs = list() + self.TUID2Publisher = dict() + self.TUID2Subscribers = dict() self.communicationHandler = CommunicationHandler(self) self.processor = Communication.Processor(self.communicationHandler) self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT) @@ -353,17 +442,19 @@ class DominoServer: #If not assigned, assign it #If assigned, offer a new random id def assign_udid(self, udid_desired): - if udid_desired in self.assignedUUIDs: - new_udid = random.getrandbits(63) - while new_udid in self.assignedUUIDs: - new_udid = random.getrandbits(63) - - self.assignedUUIDs.append(new_udid) - return new_udid - else: - self.assignedUUIDs.append(udid_desired) - return udid_desired - + new_udid = udid_desired + while new_udid in self.assignedUUIDs: + new_udid = uuid.uuid4().hex + self.assignedUUIDs.append(new_udid) + return new_udid + + def assign_tuid(self): + new_TUID = uuid.uuid4().hex + while new_TUID in self.assignedTUIDs: + new_TUID = uuid.uuid4().hex + self.assignedTUIDs.append(new_TUID) + return new_TUID + def handle_RPC_timeout(self, RPCmessage): if RPCmessage.messageType == PUSH: logging.debug('RPC Timeout for message type: PUSH') @@ -400,20 +491,30 @@ def main(argv): dbconn = sqlite3.connect(SERVER_DBFILE) c = dbconn.cursor() try: - c.execute('''CREATE TABLE labels (udid INTEGER PRIMARY KEY, label_list TEXT)''') + c.execute('''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''') except sqlite3.OperationalError as ex: logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) try: - c.execute('''CREATE TABLE ttypes (udid INTEGER PRIMARY KEY, ttype_list TEXT)''') + c.execute('''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''') except sqlite3.OperationalError as ex: logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) try: - c.execute('''CREATE TABLE clients (udid INTEGER PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''') + c.execute('''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''') except sqlite3.OperationalError as ex: logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) + try: + c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''') + except sqlite3.OperationalError as ex: + logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) + + try: + c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''') + except sqlite3.OperationalError as ex: + logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) + dbconn.commit() dbconn.close()