Auto Generated INFO.yaml file
[domino.git] / DominoServer.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14 import sys, os, glob, random, errno
15 import getopt, socket
16 import logging, json
17 import sqlite3, yaml
18 import uuid
19
20 sys.path.insert(0, glob.glob('./lib')[0])
21
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
32
33 from toscaparser.tosca_template import ToscaTemplate
34 #from toscaparser.utils.gettextutils import _
35 #import toscaparser.utils.urlutils
36 from translator.hot.tosca_translator import TOSCATranslator
37
38
39 from mapper import *
40 from partitioner import *
41 from util import miscutil
42
43 #Load configuration parameters
44 from domino_conf import *
45
46
47 class CommunicationHandler:
48   def __init__(self):
49     self.log = {}
50
51   def __init__(self, dominoserver):
52     self.log = {}
53     self.dominoServer = dominoserver
54     self.seqno = SERVER_SEQNO;
55    
56   def openconnection(self, ipaddr, tcpport):
57     try:
58       # Make socket
59       transport = TSocket.TSocket(ipaddr, tcpport)
60       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
61       # Add buffering to compensate for slow raw sockets
62       self.transport = TTransport.TBufferedTransport(transport)
63       # Wrap in a protocol
64       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
65       # Create a client to use the protocol encoder
66       self.sender = Communication.Client(self.protocol)
67       self.transport.open()
68     except:
69       raise
70
71
72   def closeconnection(self):
73     self.transport.close()
74
75   def push_template(self,template,ipaddr,tcpport,TUID):
76     try:
77       self.openconnection(ipaddr,tcpport)
78       pushm = PushMessage()
79       pushm.domino_udid = SERVER_UDID 
80       pushm.seq_no = self.seqno
81       pushm.template_type = 'tosca-nfv-v1.0'
82       pushm.template = template
83       pushm.template_UUID = TUID
84       self.seqno = self.seqno + 1
85
86       push_r = self.sender.d_push(pushm)  
87       logging.info('Push Response received from %s' , push_r.domino_udid)
88       self.closeconnection()
89     except (socket.timeout) as tx:
90       self.dominoServer.handle_RPC_timeout(pushm)
91       raise tx
92     except:       
93       logging.error('Unexpected error: %s', sys.exc_info()[0])
94       raise
95  
96   #Heartbeat from Domino Client is received
97   #Actions:
98   #     - Respond Back with a heartbeat
99
100   def d_heartbeat(self, hb_msg):
101     logging.info('heartbeat received from %s' , hb_msg.domino_udid)
102
103     hb_r = HeartBeatMessage()
104     hb_r.domino_udid = SERVER_UDID
105     hb_r.seq_no = self.seqno
106
107     self.seqno = self.seqno + 1 
108
109     return hb_r
110
111   #Registration from Domino Client is received
112   #Actions:
113   #
114   #       - Respond Back with Registration Response
115   def d_register(self, reg_msg):
116
117     #Prepare and send Registration Response
118     reg_r = RegisterResponseMessage()
119     logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
120
121    
122     reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
123     reg_r.seq_no = self.seqno
124     reg_r.domino_udid = SERVER_UDID
125     #return unconditional success 
126     #To be implemented:
127     #Define conditions for unsuccessful registration (e.g., unsupported mapping)
128     reg_r.responseCode = SUCCESS 
129     #no need to send comments
130     #To be implemented:
131     #Logic for a new UDID assignment
132  
133     self.seqno = self.seqno + 1
134
135     self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg    
136
137     #commit to the database
138     dbconn = sqlite3.connect(SERVER_DBFILE)
139     c = dbconn.cursor()
140     try:
141       newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
142       c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
143     except sqlite3.OperationalError as ex:
144       logging.error('Could not add the new registration record into %s for Domino Client %s :  %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
145     except:
146       logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned)
147       logging.error('Unexpected error: %s', sys.exc_info()[0])
148  
149     dbconn.commit()
150     dbconn.close()
151
152     return reg_r
153
154
155   #Subscription from Domino Client is received
156   #Actions:
157   #       - Save the templates  & labels
158   #       - Respond Back with Subscription Response
159   def d_subscribe(self, sub_msg):
160     logging.info('Subscribe Request received from %s' , sub_msg.domino_udid)
161
162     if sub_msg.template_op == APPEND:
163       if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
164         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
165       else:
166         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
167     elif sub_msg.template_op == OVERWRITE:
168       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
169     elif sub_msg.template_op == DELETE:
170       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
171
172 #    if sub_msg.labels != []:
173     if sub_msg.label_op == APPEND:
174       logging.debug('APPENDING Labels...')
175       if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
176         self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
177       else:
178         self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
179     elif sub_msg.label_op == OVERWRITE:
180       logging.debug('OVERWRITING Labels...')
181       self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
182     elif sub_msg.label_op == DELETE:
183       logging.debug('DELETING Labels...')
184       self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
185
186     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
187
188     #commit to the database
189     dbconn = sqlite3.connect(SERVER_DBFILE)
190     c = dbconn.cursor()
191     newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
192     try:
193       newvalue=','.join(list(newlabelset))
194       c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
195     except sqlite3.OperationalError as ex1:
196       logging.error('Could not add the new labels to %s for Domino Client %s :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
197     except:
198       logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
199       logging.error('Unexpected error: %s', sys.exc_info()[0])
200
201     newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
202     try:
203       newvalue=','.join(list(newttypeset))
204       c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
205     except sqlite3.OperationalError as ex1:
206       logging.error('Could not add the new labels to %s for Domino Client %s :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
207     except:
208       logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
209       logging.error('Unexpected error: %s', sys.exc_info()[0])
210
211
212     dbconn.commit()
213     dbconn.close()
214
215  
216     #Fill in the details
217     sub_r = SubscribeResponseMessage()
218     sub_r.domino_udid = SERVER_UDID
219     sub_r.seq_no = self.seqno
220     sub_r.responseCode = SUCCESS
221     self.seqno = self.seqno + 1
222
223     return sub_r
224
225   #Template Publication from Domino Client is received
226   #Actions:
227   #       - Parse the template, perform mapping, partition the template
228   #       - Launch Push service
229   #       - Respond Back with Publication Response    
230   def d_publish(self, pub_msg):
231     logging.info('Publish Request received from %s' , pub_msg.domino_udid)
232     #logging.debug(pub_msg.template)
233
234     # Create response with response code as SUCCESS by default
235     # Response code will be overwritten if partial or full failure occurs
236     pub_r = PublishResponseMessage()
237     pub_r.domino_udid = SERVER_UDID
238     pub_r.seq_no = self.seqno
239     pub_r.responseCode = SUCCESS
240     pub_r.template_UUID = pub_msg.template_UUID
241     self.seqno = self.seqno + 1
242
243     if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
244       logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
245       pub_r.responseCode = FAILED
246       return pub_r
247       
248     # Save as file
249     try:
250       os.makedirs(TOSCADIR)
251     except OSError as exception:
252       if exception.errno == errno.EEXIST:
253         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
254       else:
255         logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
256
257     #Risking a race condition if another process is attempting to write to same file
258     try:
259       miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
260     except:
261       #Some sort of race condition should have occured that prevented the write operation
262       #treat as failure
263       logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
264       pub_r.responseCode = FAILED
265       return pub_r
266     
267     # Load tosca object from file into memory
268     try:
269       #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
270       tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
271     except:
272       logging.error('Tosca Parser error: %s', sys.exc_info()[0])
273       #tosca file could not be read
274       pub_r.responseCode = FAILED
275       return pub_r 
276
277     # Extract Labels
278     node_labels = label.extract_labels( tpl )
279     logging.debug('Node Labels: %s', node_labels)
280
281     # Map nodes in the template to resource domains
282     site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
283     logging.debug('Site Maps: %s' , site_map)
284
285     # Select a site for each VNF
286     node_site = label.select_site( site_map ) 
287     logging.debug('Selected Sites: %s', node_site)
288
289     # Create per-site Tosca files
290     tpl_site = {}
291     file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl,tpl_site)
292     logging.debug('Per domain file paths: %s', file_paths)
293     logging.debug('Per domain topologies: %s', tpl_site)
294   
295     # Detect boundary links
296     boundary_VLs, VL_sites = partitioner.return_boundarylinks(tpl_site)
297     logging.debug('Boundary VLs: %s', boundary_VLs)
298     logging.debug('VL sites: %s', VL_sites)
299
300     # Create work-flow
301
302     # Assign template UUID if no UUID specified
303     # Otherwise update the existing domains subscribed to TUID
304     unsuccessful_updates = []
305     if pub_msg.template_UUID is None:
306       pub_r.template_UUID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
307     else:
308       logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
309       if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
310         logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID])
311         pub_r.responseCode = FAILED
312         return pub_r  
313       else: #Template exists, we need to find clients that are no longer in the subscription list list
314         TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UUID]) - set(file_paths.keys()))
315         if len(TUID_unsubscribed_list) > 0:
316           logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UUID)
317         # Send empty bodied templates to domains which no longer has any assigned resource
318         template_lines = []
319         for i in range(len(TUID_unsubscribed_list)):
320           domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
321           domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport  
322           try:
323             self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UUID)        
324           except:       
325             logging.error('Error in pushing template: %s', sys.exc_info()[0]) 
326             unsuccessful_updates.append(TUID_unsubscribed_list[i])
327
328     # The following template distribution is not transactional, meaning that some domains
329     # might be successfull receiving their sub-templates while some other might not
330     # The function returns FAILED code to the publisher in such situations, meaning that
331     # publisher must republish to safely orchestrate/manage NS or VNF
332
333     # Send domain templates to each domain agent/client 
334     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
335     # TBD: read from work-flow
336     domainInfo = []
337     for site in file_paths:
338       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
339       domino_client_port = self.dominoServer.registration_record[site].tcpport
340       domainInfo.append(DomainInfo(ipaddr=domino_client_ip,tcpport=domino_client_port))
341       try:
342         if 'hot' in self.dominoServer.subscribed_templateformats[site]:
343           tosca = ToscaTemplate(file_paths[site])
344           translator = TOSCATranslator(tosca, {}, False)
345           output = translator.translate()
346           logging.debug('HOT translation: \n %s', output)
347           template_lines = [ output ]
348         else: 
349           template_lines = miscutil.read_templatefile(file_paths[site]) 
350         self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UUID)
351       except IOError as e:
352         logging.error('I/O error(%d): %s' , e.errno, e.strerror)
353         pub_r.responseCode = FAILED
354       except:
355         logging.error('Error: %s', sys.exc_info()[0])
356         pub_r.responseCode = FAILED
357
358     # Check if any file is generated for distribution, if not
359     # return FAILED as responseCode, we should also send description for
360     # reason
361     if len(file_paths) == 0:
362       pub_r.responseCode = FAILED
363
364
365     dbconn = sqlite3.connect(SERVER_DBFILE)
366     c = dbconn.cursor()
367
368     if pub_r.responseCode == SUCCESS:
369       # send domain information only if all domains have received the domain templates
370       pub_r.domainInfo = domainInfo
371       # update in memory database
372       self.dominoServer.TUID2Publisher[pub_r.template_UUID] = pub_msg.domino_udid
373       try:
374         c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UUID,pub_msg.domino_udid) )
375         dbconn.commit()
376       except sqlite3.OperationalError as ex1:
377         logging.error('Could not add new TUID %s  DB for Domino Client %s :  %s', pub_r.template_UUID, pub_msg.domino_udid, ex1.message)
378       except:
379         logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UUID, pub_msg.domino_udid)
380         logging.error('Unexpected error: %s', sys.exc_info()[0])
381       else:
382         self.dominoServer.TUID2Publisher[pub_r.template_UUID] = pub_msg.domino_udid
383
384     # update in memory database
385     self.dominoServer.TUID2Subscribers[pub_r.template_UUID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
386     logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UUID], pub_r.template_UUID)
387     try:
388       newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UUID])
389       c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UUID,newvalue) )
390       dbconn.commit()
391     except sqlite3.OperationalError as ex1:
392       logging.error('Could not add new subscribers for TUID %s for Domino Client %s:  %s', pub_r.template_UUID, pub_msg.domino_udid, ex1.message)
393     except:
394       logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UUID, pub_msg.domino_udid)
395       logging.error('Unexpected error: %s', sys.exc_info()[0])
396
397     dbconn.close()
398
399     return pub_r
400     
401   #Query from Domino Client is received
402   #Actions:
403   #
404   #       - Respond Back with Query Response
405   def d_query(self, qu_msg):
406     #Fill in the details
407     qu_r = QueryResponseMessage()
408     qu_r.domino_udid = SERVER_UDID
409     qu_r.seq_no = self.seqno
410     qu_r.responseCode = SUCCESS
411     qu_r.queryResponse = []
412     
413     for i in range(len(qu_msg.queryString)):
414       if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client
415          qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid])
416
417     self.seqno = self.seqno + 1
418     return qu_r
419
420
421 class DominoServer:
422    def __init__(self):
423      self.assignedUUIDs = list()
424      self.subscribed_labels = dict()
425      self.subscribed_templateformats = dict()
426      self.registration_record = dict() 
427      self.assignedTUIDs = list()
428      self.TUID2Publisher = dict()
429      self.TUID2Subscribers = dict()
430      self.communicationHandler = CommunicationHandler(self)
431      self.processor = Communication.Processor(self.communicationHandler)
432      self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
433      self.tfactory = TTransport.TBufferedTransportFactory()
434      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
435      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
436      #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
437      self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
438
439
440    def start_communicationService(self):
441      self.communicationServer.serve()
442
443    #For now assign the desired UDID
444    #To be implemented:
445    #Check if ID is already assigned and in use
446    #If not assigned, assign it
447    #If assigned, offer a new random id
448    def assign_udid(self, udid_desired):
449      new_udid = udid_desired 
450      while new_udid in self.assignedUUIDs:
451        new_udid = uuid.uuid4().hex  
452      self.assignedUUIDs.append(new_udid)
453      return new_udid
454
455    def assign_tuid(self):
456      new_TUID = uuid.uuid4().hex
457      while new_TUID in self.assignedTUIDs:
458        new_TUID = uuid.uuid4().hex
459      self.assignedTUIDs.append(new_TUID)  
460      return new_TUID
461
462    def handle_RPC_timeout(self, RPCmessage):
463      if RPCmessage.messageType == PUSH:
464       logging.debug('RPC Timeout for message type: PUSH')
465       # TBD: handle each RPC timeout separately
466
467 def main():
468   server = DominoServer()
469   loglevel = LOGLEVEL
470   #process input arguments
471   try:
472     opts, args = getopt.getopt(sys.argv[1:],"hc:l:",["conf=","log="])
473   except getopt.GetoptError:
474     print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
475     sys.exit(2)
476   for opt, arg in opts:
477       if opt == '-h':
478          print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
479          sys.exit()
480       elif opt in ("-c", "--conf"):
481          configfile = arg
482       elif opt in ("-l", "--log"):
483          loglevel= arg
484   #Set logging level
485   numeric_level = getattr(logging, loglevel.upper(), None)
486   try:
487     if not isinstance(numeric_level, int):
488       raise ValueError('Invalid log level: %s' % loglevel)
489     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
490   except ValueError, ex:
491     print ex.message
492     sys.exit(2)
493
494   #start the database with schemas
495   dbconn = sqlite3.connect(SERVER_DBFILE)
496   c = dbconn.cursor()
497   try:
498     c.execute('''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''')
499   except sqlite3.OperationalError as ex:
500     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
501
502   try:
503     c.execute('''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''')
504   except sqlite3.OperationalError as ex:
505     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
506
507   try:
508     c.execute('''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
509   except sqlite3.OperationalError as ex:
510     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
511
512   try:
513     c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''')
514   except sqlite3.OperationalError as ex:
515     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
516
517   try:
518     c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''')
519   except sqlite3.OperationalError as ex: 
520     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
521
522   dbconn.commit()
523   dbconn.close()
524
525   logging.debug('Domino Server Starting...')
526   server.start_communicationService()
527   print 'done.'
528
529 if __name__ == "__main__":
530   sys.exit(main())