3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 import sys, os, glob, random, errno
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
24 from dominoRPC import Communication
25 from dominoRPC.ttypes import *
26 from dominoRPC.constants import *
28 from thrift import Thrift
29 from thrift.transport import TSocket
30 from thrift.transport import TTransport
31 from thrift.protocol import TBinaryProtocol
32 from thrift.server import TServer
34 from toscaparser.tosca_template import ToscaTemplate
35 #from toscaparser.utils.gettextutils import _
36 #import toscaparser.utils.urlutils
37 from translator.hot.tosca_translator import TOSCATranslator
41 from partitioner import *
42 from util import miscutil
44 #Load configuration parameters
45 from domino_conf import *
class CommunicationHandler:
    """Thrift service handler for the Domino server.

    Implements the server side of the Communication service (d_heartbeat,
    d_register, d_subscribe, d_publish, d_query) and the outbound push of
    templates to Domino clients.  State shared with the owning server
    (registration records, subscriptions, template ownership) lives on
    ``self.dominoServer``; selected parts are mirrored into the sqlite file
    SERVER_DBFILE on a best-effort basis (database failures are logged, they
    do not fail the RPC).

    NOTE(review): one handler instance is handed to the server's Processor,
    so ``seqno`` and the outbound ``transport`` are shared by every request
    the server processes -- confirm whether locking is needed with the
    thread-pool server.
    """

    def __init__(self, dominoserver):
        """Bind the handler to the server object whose tables it maintains."""
        self.dominoServer = dominoserver
        self.seqno = SERVER_SEQNO

    def openconnection(self, ipaddr, tcpport):
        """Open a buffered binary-protocol Thrift client to ipaddr:tcpport.

        Sets ``self.transport``, ``self.protocol`` and ``self.sender``.
        Thrift errors raised while opening are logged and re-raised so the
        caller can treat the push as failed.
        """
        raw_socket = TSocket.TSocket(ipaddr, tcpport)
        raw_socket.setTimeout(THRIFT_RPC_TIMEOUT_MS)
        # Add buffering to compensate for slow raw sockets
        self.transport = TTransport.TBufferedTransport(raw_socket)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create a client to use the protocol encoder
        self.sender = Communication.Client(self.protocol)
        try:
            self.transport.open()
        except Thrift.TException as tx:
            logging.error('%s', tx)
            raise

    def closeconnection(self):
        """Close the outbound transport, if one was ever created."""
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def push_template(self, template, ipaddr, tcpport, TUID):
        """Push a template to the Domino client listening on ipaddr:tcpport.

        template -- template body (list of lines) to deliver
        TUID     -- template UUID the body belongs to

        Any failure is logged and re-raised; d_publish relies on the
        exception to record the destination as unsuccessful.  A socket
        timeout is additionally reported to the server's timeout hook.
        """
        # Build the message before touching the network so the timeout
        # handler below always has a message to report.
        pushm = PushMessage()
        pushm.domino_udid = SERVER_UDID
        pushm.seq_no = self.seqno
        pushm.template_type = 'tosca-nfv-v1.0'
        pushm.template = template
        pushm.template_UUID = TUID
        self.seqno = self.seqno + 1

        try:
            self.openconnection(ipaddr, tcpport)
            push_r = self.sender.d_push(pushm)
            logging.info('Push Response received from %s', push_r.domino_udid)
        except socket.timeout:
            self.dominoServer.handle_RPC_timeout(pushm)
            raise
        except Exception:
            logging.error('Unexpected error: %s', sys.exc_info()[0])
            raise
        finally:
            # Release the socket on the failure paths too.
            self.closeconnection()

    def d_heartbeat(self, hb_msg):
        """Handle a heartbeat from a Domino client: respond with a heartbeat."""
        logging.info('heartbeat received from %s', hb_msg.domino_udid)

        hb_r = HeartBeatMessage()
        hb_r.domino_udid = SERVER_UDID
        hb_r.seq_no = self.seqno
        self.seqno = self.seqno + 1
        return hb_r

    def d_register(self, reg_msg):
        """Handle a registration request and return the registration response.

        Assigns a UDID (the desired one when it is free), remembers the
        registration message in memory and records the client in the
        ``clients`` table.  Registration currently always reports SUCCESS.
        """
        reg_r = RegisterResponseMessage()
        logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)

        reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
        reg_r.seq_no = self.seqno
        reg_r.domino_udid = SERVER_UDID
        # Unconditional success for now; conditions for an unsuccessful
        # registration (e.g., unsupported mapping) are still to be defined.
        reg_r.responseCode = SUCCESS
        self.seqno = self.seqno + 1

        self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg

        # Commit to the database (best effort).
        dbconn = sqlite3.connect(SERVER_DBFILE)
        try:
            try:
                newrow = (reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no)
                dbconn.execute('INSERT INTO clients VALUES (?,?,?,?,?)', newrow)
            except sqlite3.OperationalError as ex:
                logging.error('Could not add the new registration record into %s for Domino Client %s : %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex)
            except Exception:
                logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned)
                logging.error('Unexpected error: %s', sys.exc_info()[0])
            dbconn.commit()
        finally:
            dbconn.close()

        return reg_r

    def d_subscribe(self, sub_msg):
        """Handle a subscription request and return the subscription response.

        Applies the requested APPEND / OVERWRITE / DELETE operation to the
        client's supported template types and to its labels, then stores the
        resulting sets in the ``ttypes`` and ``labels`` tables.
        """
        logging.info('Subscribe Request received from %s', sub_msg.domino_udid)
        udid = sub_msg.domino_udid
        formats = self.dominoServer.subscribed_templateformats
        labels = self.dominoServer.subscribed_labels

        # Make sure both tables hold an entry for this client so that DELETE
        # on a first-time subscriber and the debug line below cannot KeyError.
        known_formats = formats.setdefault(udid, set())
        known_labels = labels.setdefault(udid, set())

        requested_formats = set(sub_msg.supported_template_types or ())
        if sub_msg.template_op == APPEND:
            known_formats.update(requested_formats)
        elif sub_msg.template_op == OVERWRITE:
            formats[udid] = requested_formats
        elif sub_msg.template_op == DELETE:
            known_formats.difference_update(requested_formats)

        requested_labels = set(sub_msg.labels or ())
        if sub_msg.label_op == APPEND:
            logging.debug('APPENDING Labels...')
            known_labels.update(requested_labels)
        elif sub_msg.label_op == OVERWRITE:
            logging.debug('OVERWRITING Labels...')
            labels[udid] = requested_labels
        elif sub_msg.label_op == DELETE:
            logging.debug('DELETING Labels...')
            known_labels.difference_update(requested_labels)

        logging.debug('Supported Template: %s Supported Labels: %s', formats[udid], labels[udid])

        # Commit to the database (best effort).
        dbconn = sqlite3.connect(SERVER_DBFILE)
        try:
            try:
                dbconn.execute("REPLACE INTO labels VALUES (?,?)", (udid, ','.join(labels[udid])))
            except sqlite3.OperationalError as ex1:
                logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, udid, ex1)
            except Exception:
                logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, udid)
                logging.error('Unexpected error: %s', sys.exc_info()[0])

            try:
                dbconn.execute("REPLACE INTO ttypes VALUES (?,?)", (udid, ','.join(formats[udid])))
            except sqlite3.OperationalError as ex1:
                logging.error('Could not add the new template types to %s for Domino Client %s : %s', SERVER_DBFILE, udid, ex1)
            except Exception:
                logging.error('Could not add the new template types to %s for Domino Client %s', SERVER_DBFILE, udid)
                logging.error('Unexpected error: %s', sys.exc_info()[0])
            dbconn.commit()
        finally:
            dbconn.close()

        sub_r = SubscribeResponseMessage()
        sub_r.domino_udid = SERVER_UDID
        sub_r.seq_no = self.seqno
        sub_r.responseCode = SUCCESS
        self.seqno = self.seqno + 1
        return sub_r

    def d_publish(self, pub_msg):
        """Handle a template publication and return the publication response.

        Steps: save the template to disk, load it, map its nodes to the
        subscribed domains, partition it per domain, push each part to its
        domain (translated to HOT for domains that subscribed to 'hot'),
        and record publisher / subscribers for the template UUID.

        The response code starts as SUCCESS and becomes FAILED when the
        template UUID is unknown, the publisher does not own it, the
        template cannot be written or loaded, any push to a scheduled site
        fails, or no per-domain file was produced.  Distribution is not
        transactional: some domains may have received their part when
        FAILED is returned, so the publisher must republish.
        """
        logging.info('Publish Request received from %s', pub_msg.domino_udid)
        server = self.dominoServer

        # SUCCESS by default; overwritten on partial or full failure.
        pub_r = PublishResponseMessage()
        pub_r.domino_udid = SERVER_UDID
        pub_r.seq_no = self.seqno
        pub_r.responseCode = SUCCESS
        pub_r.template_UDID = pub_msg.template_UUID
        self.seqno = self.seqno + 1

        if pub_msg.template_UUID is not None and pub_msg.template_UUID not in server.TUID2Publisher:
            logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
            pub_r.responseCode = FAILED
            return pub_r

        # Save the published template as a file.
        template_file = TOSCADIR + TOSCA_DEFAULT_FNAME
        try:
            os.makedirs(TOSCADIR)
        except OSError as exception:
            if exception.errno == errno.EEXIST:
                logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, template_file)
            else:
                logging.error('IGNORING error occurred in creating %s. Err no: %d', TOSCADIR, exception.errno)

        # Risking a race condition if another process is attempting to write to same file
        try:
            miscutil.write_templatefile(template_file, pub_msg.template)
        except Exception:
            logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
            pub_r.responseCode = FAILED
            return pub_r

        # Load the template from the file into memory.
        try:
            with open(template_file, 'r') as stream:
                # NOTE(review): yaml.load on a client-supplied template can
                # construct arbitrary objects; consider yaml.safe_load.
                tpl = yaml.load(stream)
        except Exception:
            logging.error('Tosca Parser error: %s', sys.exc_info()[0])
            pub_r.responseCode = FAILED
            return pub_r

        node_labels = label.extract_labels(tpl)
        logging.debug('Node Labels: %s', node_labels)

        # Map nodes in the template to resource domains
        site_map = label.map_nodes(server.subscribed_labels, node_labels)
        logging.debug('Site Maps: %s', site_map)

        # Select a site for each VNF
        node_site = label.select_site(site_map)
        logging.debug('Selected Sites: %s', node_site)

        # Create per-domain Tosca files
        file_paths = partitioner.partition_tosca('./toscafiles/template', node_site, tpl)
        logging.debug('Per domain file paths: %s', file_paths)

        # Assign a template UUID if none was specified; otherwise update the
        # domains already subscribed to the existing one.
        unsuccessful_updates = []
        if pub_msg.template_UUID is None:
            pub_r.template_UDID = server.assign_tuid()
        else:
            logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
            owner = server.TUID2Publisher[pub_msg.template_UUID]
            if owner != pub_msg.domino_udid:
                # Publisher is not the owner: reject.
                logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, owner)
                pub_r.responseCode = FAILED
                return pub_r

            # Find clients that are no longer in the subscription list.
            previous = server.TUID2Subscribers.get(pub_r.template_UDID, [])
            TUID_unsubscribed_list = list(set(previous) - set(file_paths.keys()))
            if TUID_unsubscribed_list:
                logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
            # Send empty bodied templates to domains which no longer have any
            # assigned resource; keep the ones we could not reach on record.
            for stale_udid in TUID_unsubscribed_list:
                try:
                    record = server.registration_record[stale_udid]
                    self.push_template([], record.ipaddr, record.tcpport, pub_r.template_UDID)
                except Exception:
                    logging.error('Error in pushing template: %s', sys.exc_info()[0])
                    unsuccessful_updates.append(stale_udid)

        # Send domain templates to each domain agent/client.
        # FOR NOW: send untranslated but partitioned tosca files to scheduled
        # sites, except for sites that subscribed to 'hot'.
        for site in file_paths:
            try:
                record = server.registration_record[site]
                if 'hot' in server.subscribed_templateformats[site]:
                    tosca = ToscaTemplate(file_paths[site])
                    translator = TOSCATranslator(tosca, {}, False)
                    output = translator.translate()
                    logging.debug('HOT translation: \n %s', output)
                    template_lines = [output]
                else:
                    template_lines = miscutil.read_templatefile(file_paths[site])
                self.push_template(template_lines, record.ipaddr, record.tcpport, pub_r.template_UDID)
            except IOError as e:
                logging.error('I/O error(%d): %s', e.errno, e.strerror)
                pub_r.responseCode = FAILED
            except Exception:
                logging.error('Error: %s', sys.exc_info()[0])
                pub_r.responseCode = FAILED

        # Nothing generated for distribution counts as a failure.
        if len(file_paths) == 0:
            pub_r.responseCode = FAILED

        dbconn = sqlite3.connect(SERVER_DBFILE)
        try:
            if pub_r.responseCode == SUCCESS:
                # Update the in-memory table, then mirror it to the database.
                server.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
                try:
                    dbconn.execute("REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID, pub_msg.domino_udid))
                    dbconn.commit()
                except sqlite3.OperationalError as ex1:
                    logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1)
                except Exception:
                    logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
                    logging.error('Unexpected error: %s', sys.exc_info()[0])

            # Subscribers are the scheduled sites plus any stale site that
            # could not be told it no longer hosts nodes.
            subscribers = list(set(unsuccessful_updates).union(set(file_paths.keys())))
            server.TUID2Subscribers[pub_r.template_UDID] = subscribers
            logging.debug('Subscribers: %s for TUID: %s', subscribers, pub_r.template_UDID)
            try:
                dbconn.execute("REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID, ','.join(subscribers)))
                dbconn.commit()
            except sqlite3.OperationalError as ex1:
                logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1)
            except Exception:
                logging.error('Could not add new subscribers for TUID %s for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
                logging.error('Unexpected error: %s', sys.exc_info()[0])
        finally:
            dbconn.close()

        return pub_r

    def d_query(self, qu_msg):
        """Handle a query from a Domino client and return the query response.

        Only 'list-tuids' is answered: the response lists the template UUIDs
        whose publisher on record is the querying client.
        """
        qu_r = QueryResponseMessage()
        qu_r.domino_udid = SERVER_UDID
        qu_r.seq_no = self.seqno
        qu_r.responseCode = SUCCESS
        qu_r.queryResponse = []

        publishers = self.dominoServer.TUID2Publisher
        for query in qu_msg.queryString or ():
            if query == 'list-tuids':
                # Limit the response to TUIDs that belong to this client.
                qu_r.queryResponse.extend([tuid for tuid in publishers if publishers[tuid] == qu_msg.domino_udid])

        self.seqno = self.seqno + 1
        return qu_r
class DominoServer:
    """In-memory state of the Domino server plus the Thrift server serving it.

    Tables kept here (all filled in by CommunicationHandler):
      assignedUUIDs / assignedTUIDs  -- ids already handed out
      subscribed_labels              -- client UDID -> set of labels
      subscribed_templateformats     -- client UDID -> set of template types
      registration_record            -- client UDID -> registration message
      TUID2Publisher                 -- template UUID -> publisher UDID
      TUID2Subscribers               -- template UUID -> list of client UDIDs
    """

    def __init__(self):
        """Create empty tables and a thread-pool Thrift server on DOMINO_SERVER_PORT."""
        self.assignedUUIDs = list()
        self.subscribed_labels = dict()
        self.subscribed_templateformats = dict()
        self.registration_record = dict()
        self.assignedTUIDs = list()
        self.TUID2Publisher = dict()
        self.TUID2Subscribers = dict()
        self.communicationHandler = CommunicationHandler(self)
        self.processor = Communication.Processor(self.communicationHandler)
        self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
        self.tfactory = TTransport.TBufferedTransportFactory()
        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
        # Use TThreadedServer or TThreadPoolServer for a multithreaded server
        self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)

    def start_communicationService(self):
        """Serve RPC requests; delegates to the Thrift server's serve()."""
        self.communicationServer.serve()

    def assign_udid(self, udid_desired):
        """Return the UDID assigned to a registering client and record it.

        The desired id is granted when it is not already in use; otherwise
        (or when no id was requested) a random 32-character hex id is
        offered instead.
        """
        new_udid = udid_desired
        if new_udid is None:
            # No preference given: never register the key None.
            new_udid = uuid.uuid4().hex
        while new_udid in self.assignedUUIDs:
            new_udid = uuid.uuid4().hex
        self.assignedUUIDs.append(new_udid)
        return new_udid

    def assign_tuid(self):
        """Return a new, previously unassigned template UUID and record it."""
        new_TUID = uuid.uuid4().hex
        while new_TUID in self.assignedTUIDs:
            new_TUID = uuid.uuid4().hex
        self.assignedTUIDs.append(new_TUID)
        return new_TUID

    def handle_RPC_timeout(self, RPCmessage):
        """Hook called when an outbound RPC timed out; currently only logs PUSH timeouts."""
        if RPCmessage.messageType == PUSH:
            logging.debug('RPC Timeout for message type: PUSH')
        # TBD: handle each RPC timeout separately
def main(argv):
    """Start the Domino server.

    argv -- command-line arguments without the program name.  Supported:
            -h, -c/--conf <configfile>, -l/--log <loglevel>.

    Configures logging, makes sure the sqlite schema exists in
    SERVER_DBFILE, then serves RPC requests.  Exits with status 2 on a bad
    option or an invalid log level.
    """
    usage = 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
    server = DominoServer()

    # NOTE(review): the default log level is not visible in this part of the
    # file; use a LOGLEVEL setting when the configuration import provides
    # one, otherwise fall back to WARNING.
    loglevel = globals().get('LOGLEVEL', 'WARNING')

    # Process input arguments
    try:
        opts, args = getopt.getopt(argv, "hc:l:", ["conf=", "log="])
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print(usage)
            sys.exit()
        elif opt in ("-c", "--conf"):
            configfile = arg  # accepted; not used by the code visible here
        elif opt in ("-l", "--log"):
            loglevel = arg

    # Set the logging level
    numeric_level = getattr(logging, loglevel.upper(), None)
    try:
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        # `logfile` is not defined in the visible code; presumably it comes
        # from the configuration import -- verify.
        logging.basicConfig(filename=logfile, level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    except ValueError as ex:
        print(ex)
        sys.exit(2)

    # Start the database with schemas.  A table that already exists makes
    # CREATE TABLE fail with OperationalError, which is only logged.
    schemas = (
        '''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''',
        '''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''',
        '''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''',
        '''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''',
        '''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''',
    )
    dbconn = sqlite3.connect(SERVER_DBFILE)
    try:
        c = dbconn.cursor()
        for ddl in schemas:
            try:
                c.execute(ddl)
            except sqlite3.OperationalError as ex:
                logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex)
        dbconn.commit()
    finally:
        dbconn.close()

    logging.debug('Domino Server Starting...')
    server.start_communicationService()


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))