3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 import sys, os, glob, random, errno
18 #sys.path.append('gen-py')
19 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
20 sys.path.insert(0, glob.glob('./lib')[0])
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
33 from toscaparser.tosca_template import ToscaTemplate
34 #from toscaparser.utils.gettextutils import _
35 #import toscaparser.utils.urlutils
36 from translator.hot.tosca_translator import TOSCATranslator
40 from partitioner import *
41 from util import miscutil
43 #Load configuration parameters
44 from domino_conf import *
47 class CommunicationHandler:
def __init__(self, dominoserver):
    """Bind this RPC handler to the Domino server that owns it.

    dominoserver -- server object whose registration and subscription
                    tables are updated by the RPC callbacks below.
    """
    # Counter stamped on every message this handler sends out; it starts
    # from the configured SERVER_SEQNO.
    self.seqno = SERVER_SEQNO
    self.dominoServer = dominoserver
def openconnection(self, ipaddr, tcpport):
    """Open a buffered Thrift connection to a Domino client.

    On success ``self.transport``, ``self.protocol`` and ``self.sender``
    refer to the new connection.  Thrift errors are logged instead of
    being raised, so a failed connection only surfaces when the caller
    later tries to use ``self.sender``.

    ipaddr  -- address of the remote Domino client
    tcpport -- TCP port of the remote Domino client
    """
    try:
        # Make the raw socket and apply the configured RPC timeout
        sock = TSocket.TSocket(ipaddr, tcpport)
        sock.setTimeout(THRIFT_RPC_TIMEOUT_MS)
        # Add buffering to compensate for slow raw sockets
        self.transport = TTransport.TBufferedTransport(sock)
        # Wrap the transport in the binary protocol
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create a client to use the protocol encoder
        self.sender = Communication.Client(self.protocol)
        # The transport has to be open before the first RPC is issued
        self.transport.open()
    except Thrift.TException as tx:
        logging.error('%s', tx.message)
def closeconnection(self):
    """Close the transport that openconnection() stored on this handler."""
    self.transport.close()
def push_template(self, template, ipaddr, tcpport):
    """Push a template to one Domino client over a fresh connection.

    The connection is opened for this single RPC and always closed again.
    Failures are logged (or handed to the server's timeout handler) and
    never propagate to the caller; the sequence number is consumed
    whether or not the push succeeded.

    template -- template content to place in the push message
    ipaddr   -- address of the destination Domino client
    tcpport  -- TCP port of the destination Domino client
    """
    self.openconnection(ipaddr, tcpport)

    # NOTE(review): PushMessage is assumed to be the dominoRPC.ttypes
    # request type accepted by d_push -- confirm against the IDL.
    pushm = PushMessage()
    pushm.domino_udid = SERVER_UDID
    pushm.seq_no = self.seqno
    pushm.template_type = 'tosca-nfv-v1.0'
    pushm.template = template

    try:
        push_r = self.sender.d_push(pushm)
        logging.info('Push Response received from %d' , push_r.domino_udid)
    except (Thrift.TException, TSocket.TTransportException) as tx:
        logging.error('%s', tx.message)
    except socket.timeout:
        self.dominoServer.handle_RPC_timeout(pushm)
    except Exception:
        # Also reached when openconnection() failed and left no usable
        # self.sender behind.
        logging.error('Unexpected error: %s', sys.exc_info()[0])
    finally:
        # Consume the sequence number and release the socket on every path
        self.seqno = self.seqno + 1
        self.closeconnection()
97 #Heartbeat from Domino Client is received
99 # - Respond Back with a heartbeat
def d_heartbeat(self, hb_msg):
    """Answer a heartbeat from a Domino client with a server heartbeat.

    hb_msg -- received heartbeat; only its ``domino_udid`` is read.

    Returns a HeartBeatMessage carrying the server UDID and the current
    sequence number, which is then advanced.
    """
    logging.info('heartbeat received from %d' , hb_msg.domino_udid)

    hb_r = HeartBeatMessage()
    hb_r.domino_udid = SERVER_UDID
    hb_r.seq_no = self.seqno

    self.seqno = self.seqno + 1

    # The reply only reaches the client if it is returned to the RPC layer
    return hb_r
112 #Registration from Domino Client is received
115 # - Respond Back with Registration Response
def d_register(self, reg_msg):
    """Register a Domino client and persist its registration record.

    A UDID is obtained from the server (the desired one when it is free),
    the request is remembered in ``registration_record`` under that UDID
    and a row is inserted into the ``clients`` table of SERVER_DBFILE.
    Database failures are logged only; the in-memory registration and the
    response are unaffected by them.

    reg_msg -- registration request; ``domino_udid_desired``, ``ipaddr``,
               ``tcpport``, ``supported_templates`` and ``seq_no`` are read.

    Returns a RegisterResponseMessage holding the assigned UDID.
    """
    logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)

    # Prepare the Registration Response
    reg_r = RegisterResponseMessage()
    reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
    reg_r.seq_no = self.seqno
    reg_r.domino_udid = SERVER_UDID
    # Unconditional success for now.
    # To be implemented: conditions for unsuccessful registration
    # (e.g., unsupported mapping) and the logic for a new UDID assignment.
    reg_r.responseCode = SUCCESS

    self.seqno = self.seqno + 1

    assigned = reg_r.domino_udid_assigned
    self.dominoServer.registration_record[assigned] = reg_msg

    # Commit to the database
    dbconn = sqlite3.connect(SERVER_DBFILE)
    try:
        c = dbconn.cursor()
        newrow = (assigned, reg_msg.ipaddr, reg_msg.tcpport,
                  ','.join(reg_msg.supported_templates or []), reg_msg.seq_no)
        c.execute('INSERT INTO clients VALUES (?,?,?,?,?)', newrow)
        dbconn.commit()
    except sqlite3.OperationalError as ex:
        logging.error('Could not add the new registration record into %s for Domino Client %d : %s', SERVER_DBFILE, assigned, ex)
    except Exception:
        logging.error('Could not add the new registration record into %s for Domino Client %d', SERVER_DBFILE, assigned)
        logging.error('Unexpected error: %s', sys.exc_info()[0])
    finally:
        # Release the database handle on every path
        dbconn.close()

    return reg_r
156 #Subscription from Domino Client is received
158 # - Save the templates & labels
159 # - Respond Back with Subscription Response
def d_subscribe(self, sub_msg):
    """Update a client's subscribed template types and labels.

    ``template_op`` and ``label_op`` each select APPEND, OVERWRITE or
    DELETE for the corresponding set kept on the server; any other value
    leaves that set unchanged.  A client that is not known yet starts
    from an empty set, so DELETE on an unknown UDID is a no-op instead of
    a KeyError.  The resulting sets are written to the ``labels`` and
    ``ttypes`` tables of SERVER_DBFILE; database errors are logged only.

    sub_msg -- subscription request; ``domino_udid``, ``template_op``,
               ``supported_template_types``, ``label_op`` and ``labels``
               are read.

    Returns a SubscribeResponseMessage with responseCode SUCCESS.
    """
    udid = sub_msg.domino_udid
    logging.info('Subscribe Request received from %d' , udid)

    def _apply(table, op, values):
        """Apply one set operation to table[udid] and return the result."""
        values = set(values or [])
        current = table.setdefault(udid, set())
        if op == APPEND:
            current.update(values)
        elif op == OVERWRITE:
            table[udid] = values
        elif op == DELETE:
            current.difference_update(values)
        return table[udid]

    def _store(cursor, statement, values, what):
        """Write one comma-joined set to the database, logging failures."""
        try:
            # Bound parameters: label text comes from the client and must
            # never be spliced into the SQL statement itself.
            cursor.execute(statement, (udid, ','.join(sorted(values))))
        except sqlite3.OperationalError as ex1:
            logging.error('Could not add the new %s to %s for Domino Client %d : %s', what, SERVER_DBFILE, udid, ex1)
        except Exception:
            logging.error('Could not add the new %s to %s for Domino Client %d', what, SERVER_DBFILE, udid)
            logging.error('Unexpected error: %s', sys.exc_info()[0])

    newttypeset = _apply(self.dominoServer.subscribed_templateformats,
                         sub_msg.template_op, sub_msg.supported_template_types)

    if sub_msg.label_op == APPEND:
        logging.debug('APPENDING Labels...')
    elif sub_msg.label_op == OVERWRITE:
        logging.debug('OVERWRITING Labels...')
    elif sub_msg.label_op == DELETE:
        logging.debug('DELETING Labels...')
    newlabelset = _apply(self.dominoServer.subscribed_labels,
                         sub_msg.label_op, sub_msg.labels)

    logging.debug('Supported Template: %s Supported Labels: %s' , newttypeset , newlabelset)

    # Commit to the database
    dbconn = sqlite3.connect(SERVER_DBFILE)
    try:
        c = dbconn.cursor()
        _store(c, "REPLACE INTO labels (udid, label_list) VALUES (?, ?)",
               newlabelset, 'labels')
        _store(c, "REPLACE INTO ttypes (udid, ttype_list) VALUES (?, ?)",
               newttypeset, 'template types')
        dbconn.commit()
    finally:
        dbconn.close()

    sub_r = SubscribeResponseMessage()
    sub_r.domino_udid = SERVER_UDID
    sub_r.seq_no = self.seqno
    sub_r.responseCode = SUCCESS
    self.seqno = self.seqno + 1

    return sub_r
226 #Template Publication from Domino Client is received
228 # - Parse the template, perform mapping, partition the template
229 # - Launch Push service
230 # - Respond Back with Publication Response
def d_publish(self, pub_msg):
    """Store, partition and distribute a template published by a client.

    Steps:
      - write the published template to TOSCADIR + TOSCA_DEFAULT_FNAME
      - load it, extract node labels, map nodes to subscribed sites and
        select one site per node
      - partition the template into per-site files
      - push each per-site file to its client, translated to HOT when the
        site subscribed to the 'hot' template type

    pub_msg -- publish request; ``domino_udid`` and ``template`` are read.

    Returns a PublishResponseMessage whose responseCode is FAILED when the
    template could not be written or loaded, when no per-site file was
    produced, or when preparing the template for any site raised; SUCCESS
    otherwise.
    """
    logging.info('Publish Request received from %d' , pub_msg.domino_udid)
    logging.debug(pub_msg.template)

    def _response(code):
        """Build a response with the given code and consume one seq no."""
        resp = PublishResponseMessage()
        resp.domino_udid = SERVER_UDID
        resp.seq_no = self.seqno
        resp.responseCode = code
        self.seqno = self.seqno + 1
        return resp

    tosca_path = TOSCADIR + TOSCA_DEFAULT_FNAME

    try:
        os.makedirs(TOSCADIR)
    except OSError as exception:
        if exception.errno == errno.EEXIST:
            logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, tosca_path)
        else:
            # Both placeholders need an argument, otherwise the logging
            # module reports a formatting error instead of this message.
            logging.error('IGNORING error occurred in creating %s. Err no: %d', TOSCADIR, exception.errno)

    # Risking a race condition if another process is attempting to write
    # to the same file
    try:
        miscutil.write_templatefile(tosca_path, pub_msg.template)
    except Exception:
        # Some sort of race condition should have occurred that prevented
        # the write operation; treat it as a failure
        logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
        return _response(FAILED)

    # Load tosca object from file into memory
    try:
        with open(tosca_path, 'r') as tosca_file:
            # NOTE(review): yaml.load on a client-supplied template can
            # construct arbitrary objects; consider yaml.safe_load.
            tpl = yaml.load(tosca_file)
    except Exception:
        # The tosca file could not be read or parsed
        logging.error('Tosca Parser error: %s', sys.exc_info()[0])
        return _response(FAILED)

    # Extract the labels attached to the template nodes
    node_labels = label.extract_labels(tpl)
    logging.debug('Node Labels: %s', node_labels)

    # Map nodes in the template to resource domains
    site_map = label.map_nodes(self.dominoServer.subscribed_labels, node_labels)
    logging.debug('Site Maps: %s' , site_map)

    # Select a site for each VNF
    node_site = label.select_site(site_map)
    logging.debug('Selected Sites: %s', node_site)

    # Create per-domain Tosca files
    file_paths = partitioner.partition_tosca('./toscafiles/template', node_site, tpl)
    logging.debug('Per domain file paths: %s', file_paths)

    # Create the response with SUCCESS by default, before any push consumes
    # further sequence numbers; the response code is overwritten if a
    # partial or full failure occurs.
    pub_r = _response(SUCCESS)

    # Send domain templates to each domain agent/client
    # TBD: read from work-flow
    for site in file_paths:
        try:
            client = self.dominoServer.registration_record[site]
            if 'hot' in self.dominoServer.subscribed_templateformats[site]:
                tosca = ToscaTemplate(file_paths[site])
                translator = TOSCATranslator(tosca, {}, False)
                output = translator.translate()
                logging.debug('HOT translation: \n %s', output)
                template_lines = [output]
            else:
                template_lines = miscutil.read_templatefile(file_paths[site])
            self.push_template(template_lines, client.ipaddr, client.tcpport)
        except IOError as e:
            logging.error('I/O error(%d): %s' , e.errno, e.strerror)
            pub_r.responseCode = FAILED
        except Exception:
            logging.error('Error: %s', sys.exc_info()[0])
            pub_r.responseCode = FAILED

    # If no file was generated for distribution, report FAILED
    if not file_paths:
        pub_r.responseCode = FAILED

    return pub_r
329 #Query from Domino Client is received
332 # - Respond Back with Query Response
333 def d_query(self, qu_msg):
335 qu_r = QueryResponseMessage()
# UDIDs that have already been handed out; consulted by assign_udid()
self.assignedUUIDs = list()
# Per-client subscription state, keyed by client UDID; the values are the
# sets maintained by CommunicationHandler.d_subscribe()
self.subscribed_labels = dict()
self.subscribed_templateformats = dict()
# UDID -> registration message, filled in by CommunicationHandler.d_register()
self.registration_record = dict()
# The RPC handler gets a back-reference to this server so its callbacks
# can update the tables above
self.communicationHandler = CommunicationHandler(self)
self.processor = Communication.Processor(self.communicationHandler)
# Listening socket plus the transport/protocol factories for the server
self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
self.tfactory = TTransport.TBufferedTransportFactory()
self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
#Use TThreadedServer or TThreadPoolServer for a multithreaded server
#self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
def start_communicationService(self):
    """Start serving RPC requests on the Thrift server built for this object."""
    self.communicationServer.serve()
359 #For now assign the desired UDID
361 #Check if ID is already assigned and in use
362 #If not assigned, assign it
363 #If assigned, offer a new random id
def assign_udid(self, udid_desired):
    """Reserve a UDID for a client and return it.

    The desired UDID is granted when it is not in use yet; otherwise
    random 63-bit candidates are drawn until an unused one is found.
    The granted UDID is recorded in ``self.assignedUUIDs``.

    udid_desired -- UDID the client asked for

    Returns the UDID that was actually assigned.
    """
    udid = udid_desired
    # Only fall back to random ids while the candidate is already taken
    while udid in self.assignedUUIDs:
        udid = random.getrandbits(63)

    # Record exactly one id per call and hand it back to the caller
    self.assignedUUIDs.append(udid)
    return udid
def handle_RPC_timeout(self, RPCmessage):
    """React to an RPC that timed out.

    RPCmessage -- the message whose RPC timed out; only its
                  ``messageType`` is inspected.

    Only PUSH messages are acted on, and only by a debug log entry.
    """
    # TBD: handle each RPC timeout separately
    if RPCmessage.messageType != PUSH:
        return
    logging.debug('RPC Timeout for message type: PUSH')
382 server = DominoServer()
384 #process input arguments
386 opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
387 except getopt.GetoptError:
388 print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
390 for opt, arg in opts:
392 print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
394 elif opt in ("-c", "--conf"):
396 elif opt in ("-l", "--log"):
399 numeric_level = getattr(logging, loglevel.upper(), None)
401 if not isinstance(numeric_level, int):
402 raise ValueError('Invalid log level: %s' % loglevel)
403 logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
404 except ValueError, ex:
408 #start the database with schemas
409 dbconn = sqlite3.connect(SERVER_DBFILE)
412 c.execute('''CREATE TABLE labels (udid INTEGER PRIMARY KEY, label_list TEXT)''')
413 except sqlite3.OperationalError as ex:
414 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
417 c.execute('''CREATE TABLE ttypes (udid INTEGER PRIMARY KEY, ttype_list TEXT)''')
418 except sqlite3.OperationalError as ex:
419 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
422 c.execute('''CREATE TABLE clients (udid INTEGER PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
423 except sqlite3.OperationalError as ex:
424 logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
429 logging.debug('Domino Server Starting...')
430 server.start_communicationService()
433 if __name__ == "__main__":