3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 import sys, os, glob, random, errno
20 sys.path.insert(0, glob.glob('./lib')[0])
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
33 from toscaparser.tosca_template import ToscaTemplate
34 #from toscaparser.utils.gettextutils import _
35 #import toscaparser.utils.urlutils
36 from translator.hot.tosca_translator import TOSCATranslator
40 from partitioner import *
41 from util import miscutil
43 #Load configuration parameters
44 from domino_conf import *
47 class CommunicationHandler:
51 def __init__(self, dominoserver):
53 self.dominoServer = dominoserver
54 self.seqno = SERVER_SEQNO;
56 def openconnection(self, ipaddr, tcpport):
59 transport = TSocket.TSocket(ipaddr, tcpport)
60 transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
61 # Add buffering to compensate for slow raw sockets
62 self.transport = TTransport.TBufferedTransport(transport)
64 self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
65 # Create a client to use the protocol encoder
66 self.sender = Communication.Client(self.protocol)
72 def closeconnection(self):
73 self.transport.close()
75 def push_template(self,template,ipaddr,tcpport,TUID):
77 self.openconnection(ipaddr,tcpport)
79 pushm.domino_udid = SERVER_UDID
80 pushm.seq_no = self.seqno
81 pushm.template_type = 'tosca-nfv-v1.0'
82 pushm.template = template
83 pushm.template_UUID = TUID
84 self.seqno = self.seqno + 1
86 push_r = self.sender.d_push(pushm)
87 logging.info('Push Response received from %s' , push_r.domino_udid)
88 self.closeconnection()
89 except (socket.timeout) as tx:
90 self.dominoServer.handle_RPC_timeout(pushm)
93 logging.error('Unexpected error: %s', sys.exc_info()[0])
96 #Heartbeat from Domino Client is received
98 # - Respond Back with a heartbeat
100 def d_heartbeat(self, hb_msg):
101 logging.info('heartbeat received from %s' , hb_msg.domino_udid)
103 hb_r = HeartBeatMessage()
104 hb_r.domino_udid = SERVER_UDID
105 hb_r.seq_no = self.seqno
107 self.seqno = self.seqno + 1
111 #Registration from Domino Client is received
114 # - Respond Back with Registration Response
115 def d_register(self, reg_msg):
117 #Prepare and send Registration Response
118 reg_r = RegisterResponseMessage()
119 logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
122 reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
123 reg_r.seq_no = self.seqno
124 reg_r.domino_udid = SERVER_UDID
125 #return unconditional success
127 #Define conditions for unsuccessful registration (e.g., unsupported mapping)
128 reg_r.responseCode = SUCCESS
129 #no need to send comments
131 #Logic for a new UDID assignment
133 self.seqno = self.seqno + 1
135 self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg
137 #commit to the database
138 dbconn = sqlite3.connect(SERVER_DBFILE)
141 newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
142 c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
143 except sqlite3.OperationalError as ex:
144 logging.error('Could not add the new registration record into %s for Domino Client %s : %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
146 logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned)
147 logging.error('Unexpected error: %s', sys.exc_info()[0])
155 #Subscription from Domino Client is received
157 # - Save the templates & labels
158 # - Respond Back with Subscription Response
159 def d_subscribe(self, sub_msg):
160 logging.info('Subscribe Request received from %s' , sub_msg.domino_udid)
162 if sub_msg.template_op == APPEND:
163 if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
164 self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
166 self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
167 elif sub_msg.template_op == OVERWRITE:
168 self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
169 elif sub_msg.template_op == DELETE:
170 self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
172 # if sub_msg.labels != []:
173 if sub_msg.label_op == APPEND:
174 logging.debug('APPENDING Labels...')
175 if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
176 self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
178 self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
179 elif sub_msg.label_op == OVERWRITE:
180 logging.debug('OVERWRITING Labels...')
181 self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
182 elif sub_msg.label_op == DELETE:
183 logging.debug('DELETING Labels...')
184 self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
186 logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
188 #commit to the database
189 dbconn = sqlite3.connect(SERVER_DBFILE)
191 newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
193 newvalue=','.join(list(newlabelset))
194 c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
195 except sqlite3.OperationalError as ex1:
196 logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
198 logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
199 logging.error('Unexpected error: %s', sys.exc_info()[0])
201 newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
203 newvalue=','.join(list(newttypeset))
204 c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
205 except sqlite3.OperationalError as ex1:
206 logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
208 logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
209 logging.error('Unexpected error: %s', sys.exc_info()[0])
217 sub_r = SubscribeResponseMessage()
218 sub_r.domino_udid = SERVER_UDID
219 sub_r.seq_no = self.seqno
220 sub_r.responseCode = SUCCESS
221 self.seqno = self.seqno + 1
225 #Template Publication from Domino Client is received
227 # - Parse the template, perform mapping, partition the template
228 # - Launch Push service
229 # - Respond Back with Publication Response
230 def d_publish(self, pub_msg):
231 logging.info('Publish Request received from %s' , pub_msg.domino_udid)
232 #logging.debug(pub_msg.template)
234 # Create response with response code as SUCCESS by default
235 # Response code will be overwritten if partial or full failure occurs
236 pub_r = PublishResponseMessage()
237 pub_r.domino_udid = SERVER_UDID
238 pub_r.seq_no = self.seqno
239 pub_r.responseCode = SUCCESS
240 pub_r.template_UDID = pub_msg.template_UUID
241 self.seqno = self.seqno + 1
243 if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
244 logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
245 pub_r.responseCode = FAILED
250 os.makedirs(TOSCADIR)
251 except OSError as exception:
252 if exception.errno == errno.EEXIST:
253 logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, TOSCADIR+TOSCA_DEFAULT_FNAME)
255 logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
257 #Risking a race condition if another process is attempting to write to same file
259 miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
261 #Some sort of race condition should have occured that prevented the write operation
263 logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
264 pub_r.responseCode = FAILED
267 # Load tosca object from file into memory
269 #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
270 tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
272 logging.error('Tosca Parser error: %s', sys.exc_info()[0])
273 #tosca file could not be read
274 pub_r.responseCode = FAILED
278 node_labels = label.extract_labels( tpl )
279 logging.debug('Node Labels: %s', node_labels)
281 # Map nodes in the template to resource domains
282 site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
283 logging.debug('Site Maps: %s' , site_map)
285 # Select a site for each VNF
286 node_site = label.select_site( site_map )
287 logging.debug('Selected Sites: %s', node_site)
289 # Create per-site Tosca files
291 file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl,tpl_site)
292 logging.debug('Per domain file paths: %s', file_paths)
293 logging.debug('Per domain topologies: %s', tpl_site)
295 # Detect boundary links
296 boundary_VLs, VL_sites = partitioner.return_boundarylinks(tpl_site)
297 logging.debug('Boundary VLs: %s', boundary_VLs)
298 logging.debug('VL sites: %s', VL_sites)
302 # Assign template UUID if no UUID specified
303 # Otherwise update the existing domains subscribed to TUID
304 unsuccessful_updates = []
305 if pub_msg.template_UUID is None:
306 pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
308 logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
309 if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
310 logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID])
311 pub_r.responseCode = FAILED
313 else: #Template exists, we need to find clients that are no longer in the subscription list list
314 TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys()))
315 if len(TUID_unsubscribed_list) > 0:
316 logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
317 # Send empty bodied templates to domains which no longer has any assigned resource
319 for i in range(len(TUID_unsubscribed_list)):
320 domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
321 domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport
323 self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
325 logging.error('Error in pushing template: %s', sys.exc_info()[0])
326 unsuccessful_updates.append(TUID_unsubscribed_list[i])
328 # The following template distribution is not transactional, meaning that some domains
329 # might be successfull receiving their sub-templates while some other might not
330 # The function returns FAILED code to the publisher in such situations, meaning that
331 # publisher must republish to safely orchestrate/manage NS or VNF
333 # Send domain templates to each domain agent/client
334 # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
335 # TBD: read from work-flow
336 for site in file_paths:
337 domino_client_ip = self.dominoServer.registration_record[site].ipaddr
338 domino_client_port = self.dominoServer.registration_record[site].tcpport
340 if 'hot' in self.dominoServer.subscribed_templateformats[site]:
341 tosca = ToscaTemplate(file_paths[site])
342 translator = TOSCATranslator(tosca, {}, False)
343 output = translator.translate()
344 logging.debug('HOT translation: \n %s', output)
345 template_lines = [ output ]
347 template_lines = miscutil.read_templatefile(file_paths[site])
348 self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
350 logging.error('I/O error(%d): %s' , e.errno, e.strerror)
351 pub_r.responseCode = FAILED
353 logging.error('Error: %s', sys.exc_info()[0])
354 pub_r.responseCode = FAILED
356 # Check if any file is generated for distribution, if not
357 # return FAILED as responseCode, we should also send description for
359 if len(file_paths) == 0:
360 pub_r.responseCode = FAILED
363 dbconn = sqlite3.connect(SERVER_DBFILE)
366 if pub_r.responseCode == SUCCESS:
367 # update in memory database
368 self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
370 c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) )
372 except sqlite3.OperationalError as ex1:
373 logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
375 logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
376 logging.error('Unexpected error: %s', sys.exc_info()[0])
378 self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
380 # update in memory database
381 self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
382 logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID)
384 newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID])
385 c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) )
387 except sqlite3.OperationalError as ex1:
388 logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
390 logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
391 logging.error('Unexpected error: %s', sys.exc_info()[0])
397 #Query from Domino Client is received
400 # - Respond Back with Query Response
401 def d_query(self, qu_msg):
403 qu_r = QueryResponseMessage()
404 qu_r.domino_udid = SERVER_UDID
405 qu_r.seq_no = self.seqno
406 qu_r.responseCode = SUCCESS
407 qu_r.queryResponse = []
409 for i in range(len(qu_msg.queryString)):
410 if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client
411 qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid])
413 self.seqno = self.seqno + 1
419 self.assignedUUIDs = list()
420 self.subscribed_labels = dict()
421 self.subscribed_templateformats = dict()
422 self.registration_record = dict()
423 self.assignedTUIDs = list()
424 self.TUID2Publisher = dict()
425 self.TUID2Subscribers = dict()
426 self.communicationHandler = CommunicationHandler(self)
427 self.processor = Communication.Processor(self.communicationHandler)
428 self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
429 self.tfactory = TTransport.TBufferedTransportFactory()
430 self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
431 #Use TThreadedServer or TThreadPoolServer for a multithreaded server
432 #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
433 self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
436 def start_communicationService(self):
437 self.communicationServer.serve()
439 #For now assign the desired UDID
441 #Check if ID is already assigned and in use
442 #If not assigned, assign it
443 #If assigned, offer a new random id
444 def assign_udid(self, udid_desired):
445 new_udid = udid_desired
446 while new_udid in self.assignedUUIDs:
447 new_udid = uuid.uuid4().hex
448 self.assignedUUIDs.append(new_udid)
451 def assign_tuid(self):
452 new_TUID = uuid.uuid4().hex
453 while new_TUID in self.assignedTUIDs:
454 new_TUID = uuid.uuid4().hex
455 self.assignedTUIDs.append(new_TUID)
458 def handle_RPC_timeout(self, RPCmessage):
459 if RPCmessage.messageType == PUSH:
460 logging.debug('RPC Timeout for message type: PUSH')
461 # TBD: handle each RPC timeout separately
464 server = DominoServer()
466 #process input arguments
468 opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
469 except getopt.GetoptError:
470 print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
472 for opt, arg in opts:
474 print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
476 elif opt in ("-c", "--conf"):
478 elif opt in ("-l", "--log"):
481 numeric_level = getattr(logging, loglevel.upper(), None)
483 if not isinstance(numeric_level, int):
484 raise ValueError('Invalid log level: %s' % loglevel)
485 logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
486 except ValueError, ex:
490 #start the database with schemas
491 dbconn = sqlite3.connect(SERVER_DBFILE)
494 c.execute('''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''')
495 except sqlite3.OperationalError as ex:
496 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
499 c.execute('''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''')
500 except sqlite3.OperationalError as ex:
501 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
504 c.execute('''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
505 except sqlite3.OperationalError as ex:
506 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
509 c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''')
510 except sqlite3.OperationalError as ex:
511 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
514 c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''')
515 except sqlite3.OperationalError as ex:
516 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
521 logging.debug('Domino Server Starting...')
522 server.start_communicationService()
525 if __name__ == "__main__":