#!/usr/bin/env python
-#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+#Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
import sys, os, glob, random, errno
import getopt, socket
import logging, json
-#sys.path.append('gen-py')
-#sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
+import sqlite3, yaml
+import uuid
+
sys.path.insert(0, glob.glob('./lib')[0])
from toscaparser.tosca_template import ToscaTemplate
#from toscaparser.utils.gettextutils import _
#import toscaparser.utils.urlutils
+from translator.hot.tosca_translator import TOSCATranslator
+
from mapper import *
from partitioner import *
def __init__(self, dominoserver):
self.log = {}
self.dominoServer = dominoserver
- self.seqno = 0;
+ self.seqno = SERVER_SEQNO;
def openconnection(self, ipaddr, tcpport):
try:
# Create a client to use the protocol encoder
self.sender = Communication.Client(self.protocol)
self.transport.open()
- except Thrift.TException, tx:
- logging.error('%s' , tx.message)
-
+ except:
+ raise
def closeconnection(self):
self.transport.close()
- def push_template(self,template,ipaddr,tcpport):
- self.openconnection(ipaddr,tcpport)
- pushm = PushMessage()
- pushm.domino_udid = SERVER_UDID
- pushm.seq_no = self.seqno
- pushm.template_type = 'tosca-nfv-v1.0'
- pushm.template = template
+ def push_template(self,template,ipaddr,tcpport,TUID):
try:
+ self.openconnection(ipaddr,tcpport)
+ pushm = PushMessage()
+ pushm.domino_udid = SERVER_UDID
+ pushm.seq_no = self.seqno
+ pushm.template_type = 'tosca-nfv-v1.0'
+ pushm.template = template
+ pushm.template_UUID = TUID
+ self.seqno = self.seqno + 1
+
push_r = self.sender.d_push(pushm)
- logging.info('Push Response received from %d' , push_r.domino_udid)
- except (Thrift.TException, TSocket.TTransportException) as tx:
- logging.error('%s' , tx.message)
+ logging.info('Push Response received from %s' , push_r.domino_udid)
+ self.closeconnection()
except (socket.timeout) as tx:
self.dominoServer.handle_RPC_timeout(pushm)
+ raise tx
except:
logging.error('Unexpected error: %s', sys.exc_info()[0])
-
- self.seqno = self.seqno + 1
-
- self.closeconnection()
+ raise
#Heartbeat from Domino Client is received
#Actions:
# - Respond Back with a heartbeat
def d_heartbeat(self, hb_msg):
- global SERVER_UDID
- logging.info('heartbeat received from %d' , hb_msg.domino_udid)
+ logging.info('heartbeat received from %s' , hb_msg.domino_udid)
hb_r = HeartBeatMessage()
hb_r.domino_udid = SERVER_UDID
#
# - Respond Back with Registration Response
def d_register(self, reg_msg):
- global SERVER_UDID
#Prepare and send Registration Response
reg_r = RegisterResponseMessage()
- logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
+ logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
self.seqno = self.seqno + 1
- # Store the Domino Client info
- # TBD: check the sequence number to ensure the most recent record is saved
- self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg
- data = {}
- data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no]
- with open(SERVER_DBFILE, 'a') as f:
- json.dump(data, f)
- f.close()
+ self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg
+
+ #commit to the database
+ dbconn = sqlite3.connect(SERVER_DBFILE)
+ c = dbconn.cursor()
+ try:
+ newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
+ c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
+ except sqlite3.OperationalError as ex:
+ logging.error('Could not add the new registration record into %s for Domino Client %s : %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
+ except:
+ logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+ dbconn.commit()
+ dbconn.close()
return reg_r
# - Save the templates & labels
# - Respond Back with Subscription Response
def d_subscribe(self, sub_msg):
- global SERVER_UDID, SERVER_SEQNO
- logging.info('Subscribe Request received from %d' , sub_msg.domino_udid)
+ logging.info('Subscribe Request received from %s' , sub_msg.domino_udid)
if sub_msg.template_op == APPEND:
if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
elif sub_msg.template_op == DELETE:
self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
- if sub_msg.labels != []:
- if sub_msg.label_op == APPEND:
- if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
- self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
- else:
- self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
- elif sub_msg.label_op == OVERWRITE:
- self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
- elif sub_msg.label_op == DELETE:
- self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
+# if sub_msg.labels != []:
+ if sub_msg.label_op == APPEND:
+ logging.debug('APPENDING Labels...')
+ if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
+ else:
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
+ elif sub_msg.label_op == OVERWRITE:
+ logging.debug('OVERWRITING Labels...')
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
+ elif sub_msg.label_op == DELETE:
+ logging.debug('DELETING Labels...')
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
+
+ #commit to the database
+ dbconn = sqlite3.connect(SERVER_DBFILE)
+ c = dbconn.cursor()
+ newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
+ try:
+ newvalue=','.join(list(newlabelset))
+ c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
+ except sqlite3.OperationalError as ex1:
+ logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
+ except:
+ logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+ newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
+ try:
+ newvalue=','.join(list(newttypeset))
+ c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
+ except sqlite3.OperationalError as ex1:
+ logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
+ except:
+ logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+
+ dbconn.commit()
+ dbconn.close()
+
#Fill in the details
sub_r = SubscribeResponseMessage()
#Actions:
# - Parse the template, perform mapping, partition the template
# - Launch Push service
- # - Respond Back with Publication Response
+ # - Respond Back with Publication Response
def d_publish(self, pub_msg):
- global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME
- logging.info('Publish Request received from %d' , pub_msg.domino_udid)
- logging.debug(pub_msg.template)
+ logging.info('Publish Request received from %s' , pub_msg.domino_udid)
+ #logging.debug(pub_msg.template)
+ # Create response with response code as SUCCESS by default
+ # Response code will be overwritten if partial or full failure occurs
+ pub_r = PublishResponseMessage()
+ pub_r.domino_udid = SERVER_UDID
+ pub_r.seq_no = self.seqno
+ pub_r.responseCode = SUCCESS
+ pub_r.template_UDID = pub_msg.template_UUID
+ self.seqno = self.seqno + 1
+
+ if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
+ logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
+ pub_r.responseCode = FAILED
+ return pub_r
+
# Save as file
try:
os.makedirs(TOSCADIR)
if exception.errno == errno.EEXIST:
logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, TOSCADIR+TOSCA_DEFAULT_FNAME)
else:
- logging.error('Error occurred in creating %s. Err no: %d', exception.errno)
+ logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
#Risking a race condition if another process is attempting to write to same file
- f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w')
- for item in pub_msg.template:
- print>>f, item
- f.close()
-
- # Load tosca object from file into memory
- tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
+ try:
+ miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
+ except:
+ #Some sort of race condition should have occured that prevented the write operation
+ #treat as failure
+ logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
+ pub_r.responseCode = FAILED
+ return pub_r
+ # Load tosca object from file into memory
+ try:
+ #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
+ tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
+ except:
+ logging.error('Tosca Parser error: %s', sys.exc_info()[0])
+ #tosca file could not be read
+ pub_r.responseCode = FAILED
+ return pub_r
+
# Extract Labels
- node_labels = label.extract_labels( tosca )
+ node_labels = label.extract_labels( tpl )
logging.debug('Node Labels: %s', node_labels)
# Map nodes in the template to resource domains
node_site = label.select_site( site_map )
logging.debug('Selected Sites: %s', node_site)
- # Create per-domain Tosca files
- file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl)
-
- # Create list of translated template files
+ # Create per-site Tosca files
+ tpl_site = {}
+ file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl,tpl_site)
+ logging.debug('Per domain file paths: %s', file_paths)
+ logging.debug('Per domain topologies: %s', tpl_site)
+
+ # Detect boundary links
+ boundary_VLs, VL_sites = partitioner.return_boundarylinks(tpl_site)
+ logging.debug('Boundary VLs: %s', boundary_VLs)
+ logging.debug('VL sites: %s', VL_sites)
# Create work-flow
+ # Assign template UUID if no UUID specified
+ # Otherwise update the existing domains subscribed to TUID
+ unsuccessful_updates = []
+ if pub_msg.template_UUID is None:
+ pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
+ else:
+ logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
+ if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
+ logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID])
+ pub_r.responseCode = FAILED
+ return pub_r
+ else: #Template exists, we need to find clients that are no longer in the subscription list list
+ TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys()))
+ if len(TUID_unsubscribed_list) > 0:
+ logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
+ # Send empty bodied templates to domains which no longer has any assigned resource
+ template_lines = []
+ for i in range(len(TUID_unsubscribed_list)):
+ domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
+ domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport
+ try:
+ self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
+ except:
+ logging.error('Error in pushing template: %s', sys.exc_info()[0])
+ unsuccessful_updates.append(TUID_unsubscribed_list[i])
+
+ # The following template distribution is not transactional, meaning that some domains
+ # might be successfull receiving their sub-templates while some other might not
+ # The function returns FAILED code to the publisher in such situations, meaning that
+ # publisher must republish to safely orchestrate/manage NS or VNF
+
# Send domain templates to each domain agent/client
# FOR NOW: send untranslated but partitioned tosca files to scheduled sites
# TBD: read from work-flow
for site in file_paths:
domino_client_ip = self.dominoServer.registration_record[site].ipaddr
domino_client_port = self.dominoServer.registration_record[site].tcpport
- self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port)
+ try:
+ if 'hot' in self.dominoServer.subscribed_templateformats[site]:
+ tosca = ToscaTemplate(file_paths[site])
+ translator = TOSCATranslator(tosca, {}, False)
+ output = translator.translate()
+ logging.debug('HOT translation: \n %s', output)
+ template_lines = [ output ]
+ else:
+ template_lines = miscutil.read_templatefile(file_paths[site])
+ self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
+ except IOError as e:
+ logging.error('I/O error(%d): %s' , e.errno, e.strerror)
+ pub_r.responseCode = FAILED
+ except:
+ logging.error('Error: %s', sys.exc_info()[0])
+ pub_r.responseCode = FAILED
+
+ # Check if any file is generated for distribution, if not
+ # return FAILED as responseCode, we should also send description for
+ # reason
+ if len(file_paths) == 0:
+ pub_r.responseCode = FAILED
+
+
+ dbconn = sqlite3.connect(SERVER_DBFILE)
+ c = dbconn.cursor()
+
+ if pub_r.responseCode == SUCCESS:
+ # update in memory database
+ self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
+ try:
+ c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) )
+ dbconn.commit()
+ except sqlite3.OperationalError as ex1:
+ logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
+ except:
+ logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+ else:
+ self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
+
+ # update in memory database
+ self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
+ logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID)
+ try:
+ newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID])
+ c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) )
+ dbconn.commit()
+ except sqlite3.OperationalError as ex1:
+ logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
+ except:
+ logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+ dbconn.close()
- #Fill in the details
- pub_r = PublishResponseMessage()
- pub_r.domino_udid = SERVER_UDID
- pub_r.seq_no = self.seqno
- pub_r.responseCode = SUCCESS
- self.seqno = self.seqno + 1
return pub_r
#Query from Domino Client is received
def d_query(self, qu_msg):
#Fill in the details
qu_r = QueryResponseMessage()
+ qu_r.domino_udid = SERVER_UDID
+ qu_r.seq_no = self.seqno
+ qu_r.responseCode = SUCCESS
+ qu_r.queryResponse = []
+
+ for i in range(len(qu_msg.queryString)):
+ if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client
+ qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid])
+ self.seqno = self.seqno + 1
return qu_r
self.subscribed_labels = dict()
self.subscribed_templateformats = dict()
self.registration_record = dict()
+ self.assignedTUIDs = list()
+ self.TUID2Publisher = dict()
+ self.TUID2Subscribers = dict()
self.communicationHandler = CommunicationHandler(self)
self.processor = Communication.Processor(self.communicationHandler)
self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
#If not assigned, assign it
#If assigned, offer a new random id
def assign_udid(self, udid_desired):
- if udid_desired in self.assignedUUIDs:
- new_udid = random.getrandbits(63)
- while new_udid in self.assignedUUIDs:
- new_udid = random.getrandbits(63)
-
- self.assignedUUIDs.append(new_udid)
- return new_udid
- else:
- self.assignedUUIDs.append(udid_desired)
- return udid_desired
-
+ new_udid = udid_desired
+ while new_udid in self.assignedUUIDs:
+ new_udid = uuid.uuid4().hex
+ self.assignedUUIDs.append(new_udid)
+ return new_udid
+
+ def assign_tuid(self):
+ new_TUID = uuid.uuid4().hex
+ while new_TUID in self.assignedTUIDs:
+ new_TUID = uuid.uuid4().hex
+ self.assignedTUIDs.append(new_TUID)
+ return new_TUID
+
def handle_RPC_timeout(self, RPCmessage):
if RPCmessage.messageType == PUSH:
logging.debug('RPC Timeout for message type: PUSH')
def main(argv):
server = DominoServer()
- loglevel = 'WARNING'
+ loglevel = LOGLEVEL
#process input arguments
try:
opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
- print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
+ print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
sys.exit()
elif opt in ("-c", "--conf"):
configfile = arg
- elif opt in ("--log"):
+ elif opt in ("-l", "--log"):
loglevel= arg
#Set logging level
numeric_level = getattr(logging, loglevel.upper(), None)
print ex.message
sys.exit(2)
+ #start the database with schemas
+ dbconn = sqlite3.connect(SERVER_DBFILE)
+ c = dbconn.cursor()
+ try:
+ c.execute('''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+ try:
+ c.execute('''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+ try:
+ c.execute('''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+ try:
+ c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+ try:
+ c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+ dbconn.commit()
+ dbconn.close()
+
logging.debug('Domino Server Starting...')
server.start_communicationService()
print 'done.'