JIRA: DOMINO-25
[domino.git] / DominoServer.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14 import sys, os, glob, random, errno
15 import getopt, socket
16 import logging, json
17 import sqlite3, yaml
18 import uuid
19
20 sys.path.insert(0, glob.glob('./lib')[0])
21
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
32
33 from toscaparser.tosca_template import ToscaTemplate
34 #from toscaparser.utils.gettextutils import _
35 #import toscaparser.utils.urlutils
36 from translator.hot.tosca_translator import TOSCATranslator
37
38
39 from mapper import *
40 from partitioner import *
41 from util import miscutil
42
43 #Load configuration parameters
44 from domino_conf import *
45
46
47 class CommunicationHandler:
48   def __init__(self):
49     self.log = {}
50
51   def __init__(self, dominoserver):
52     self.log = {}
53     self.dominoServer = dominoserver
54     self.seqno = SERVER_SEQNO;
55    
56   def openconnection(self, ipaddr, tcpport):
57     try:
58       # Make socket
59       transport = TSocket.TSocket(ipaddr, tcpport)
60       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
61       # Add buffering to compensate for slow raw sockets
62       self.transport = TTransport.TBufferedTransport(transport)
63       # Wrap in a protocol
64       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
65       # Create a client to use the protocol encoder
66       self.sender = Communication.Client(self.protocol)
67       self.transport.open()
68     except:
69       raise
70
71
72   def closeconnection(self):
73     self.transport.close()
74
75   def push_template(self,template,ipaddr,tcpport,TUID):
76     try:
77       self.openconnection(ipaddr,tcpport)
78       pushm = PushMessage()
79       pushm.domino_udid = SERVER_UDID 
80       pushm.seq_no = self.seqno
81       pushm.template_type = 'tosca-nfv-v1.0'
82       pushm.template = template
83       pushm.template_UUID = TUID
84       self.seqno = self.seqno + 1
85
86       push_r = self.sender.d_push(pushm)  
87       logging.info('Push Response received from %s' , push_r.domino_udid)
88       self.closeconnection()
89     except (socket.timeout) as tx:
90       self.dominoServer.handle_RPC_timeout(pushm)
91       raise tx
92     except:       
93       logging.error('Unexpected error: %s', sys.exc_info()[0])
94       raise
95  
96   #Heartbeat from Domino Client is received
97   #Actions:
98   #     - Respond Back with a heartbeat
99
100   def d_heartbeat(self, hb_msg):
101     logging.info('heartbeat received from %s' , hb_msg.domino_udid)
102
103     hb_r = HeartBeatMessage()
104     hb_r.domino_udid = SERVER_UDID
105     hb_r.seq_no = self.seqno
106
107     self.seqno = self.seqno + 1 
108
109     return hb_r
110
111   #Registration from Domino Client is received
112   #Actions:
113   #
114   #       - Respond Back with Registration Response
115   def d_register(self, reg_msg):
116
117     #Prepare and send Registration Response
118     reg_r = RegisterResponseMessage()
119     logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
120
121    
122     reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
123     reg_r.seq_no = self.seqno
124     reg_r.domino_udid = SERVER_UDID
125     #return unconditional success 
126     #To be implemented:
127     #Define conditions for unsuccessful registration (e.g., unsupported mapping)
128     reg_r.responseCode = SUCCESS 
129     #no need to send comments
130     #To be implemented:
131     #Logic for a new UDID assignment
132  
133     self.seqno = self.seqno + 1
134
135     self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg    
136
137     #commit to the database
138     dbconn = sqlite3.connect(SERVER_DBFILE)
139     c = dbconn.cursor()
140     try:
141       newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
142       c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
143     except sqlite3.OperationalError as ex:
144       logging.error('Could not add the new registration record into %s for Domino Client %s :  %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
145     except:
146       logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned)
147       logging.error('Unexpected error: %s', sys.exc_info()[0])
148  
149     dbconn.commit()
150     dbconn.close()
151
152     return reg_r
153
154
155   #Subscription from Domino Client is received
156   #Actions:
157   #       - Save the templates  & labels
158   #       - Respond Back with Subscription Response
159   def d_subscribe(self, sub_msg):
160     logging.info('Subscribe Request received from %s' , sub_msg.domino_udid)
161
162     if sub_msg.template_op == APPEND:
163       if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
164         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
165       else:
166         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
167     elif sub_msg.template_op == OVERWRITE:
168       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
169     elif sub_msg.template_op == DELETE:
170       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
171
172 #    if sub_msg.labels != []:
173     if sub_msg.label_op == APPEND:
174       logging.debug('APPENDING Labels...')
175       if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
176         self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
177       else:
178         self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
179     elif sub_msg.label_op == OVERWRITE:
180       logging.debug('OVERWRITING Labels...')
181       self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
182     elif sub_msg.label_op == DELETE:
183       logging.debug('DELETING Labels...')
184       self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
185
186     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
187
188     #commit to the database
189     dbconn = sqlite3.connect(SERVER_DBFILE)
190     c = dbconn.cursor()
191     newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
192     try:
193       newvalue=','.join(list(newlabelset))
194       c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
195     except sqlite3.OperationalError as ex1:
196       logging.error('Could not add the new labels to %s for Domino Client %s :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
197     except:
198       logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
199       logging.error('Unexpected error: %s', sys.exc_info()[0])
200
201     newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
202     try:
203       newvalue=','.join(list(newttypeset))
204       c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
205     except sqlite3.OperationalError as ex1:
206       logging.error('Could not add the new labels to %s for Domino Client %s :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
207     except:
208       logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
209       logging.error('Unexpected error: %s', sys.exc_info()[0])
210
211
212     dbconn.commit()
213     dbconn.close()
214
215  
216     #Fill in the details
217     sub_r = SubscribeResponseMessage()
218     sub_r.domino_udid = SERVER_UDID
219     sub_r.seq_no = self.seqno
220     sub_r.responseCode = SUCCESS
221     self.seqno = self.seqno + 1
222
223     return sub_r
224
225   #Template Publication from Domino Client is received
226   #Actions:
227   #       - Parse the template, perform mapping, partition the template
228   #       - Launch Push service
229   #       - Respond Back with Publication Response    
230   def d_publish(self, pub_msg):
231     logging.info('Publish Request received from %s' , pub_msg.domino_udid)
232     #logging.debug(pub_msg.template)
233
234     # Create response with response code as SUCCESS by default
235     # Response code will be overwritten if partial or full failure occurs
236     pub_r = PublishResponseMessage()
237     pub_r.domino_udid = SERVER_UDID
238     pub_r.seq_no = self.seqno
239     pub_r.responseCode = SUCCESS
240     pub_r.template_UDID = pub_msg.template_UUID
241     self.seqno = self.seqno + 1
242
243     if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
244       logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
245       pub_r.responseCode = FAILED
246       return pub_r
247       
248     # Save as file
249     try:
250       os.makedirs(TOSCADIR)
251     except OSError as exception:
252       if exception.errno == errno.EEXIST:
253         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
254       else:
255         logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
256
257     #Risking a race condition if another process is attempting to write to same file
258     try:
259       miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
260     except:
261       #Some sort of race condition should have occured that prevented the write operation
262       #treat as failure
263       logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
264       pub_r.responseCode = FAILED
265       return pub_r
266     
267     # Load tosca object from file into memory
268     try:
269       #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
270       tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
271     except:
272       logging.error('Tosca Parser error: %s', sys.exc_info()[0])
273       #tosca file could not be read
274       pub_r.responseCode = FAILED
275       return pub_r 
276
277     # Extract Labels
278     node_labels = label.extract_labels( tpl )
279     logging.debug('Node Labels: %s', node_labels)
280
281     # Map nodes in the template to resource domains
282     site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
283     logging.debug('Site Maps: %s' , site_map)
284
285     # Select a site for each VNF
286     node_site = label.select_site( site_map ) 
287     logging.debug('Selected Sites: %s', node_site)
288
289     # Create per-site Tosca files
290     tpl_site = {}
291     file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl,tpl_site)
292     logging.debug('Per domain file paths: %s', file_paths)
293     logging.debug('Per domain topologies: %s', tpl_site)
294   
295     # Detect boundary links
296     boundary_VLs, VL_sites = partitioner.return_boundarylinks(tpl_site)
297     logging.debug('Boundary VLs: %s', boundary_VLs)
298     logging.debug('VL sites: %s', VL_sites)
299
300     # Create work-flow
301
302     # Assign template UUID if no UUID specified
303     # Otherwise update the existing domains subscribed to TUID
304     unsuccessful_updates = []
305     if pub_msg.template_UUID is None:
306       pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
307     else:
308       logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
309       if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
310         logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID])
311         pub_r.responseCode = FAILED
312         return pub_r  
313       else: #Template exists, we need to find clients that are no longer in the subscription list list
314         TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys()))
315         if len(TUID_unsubscribed_list) > 0:
316           logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
317         # Send empty bodied templates to domains which no longer has any assigned resource
318         template_lines = []
319         for i in range(len(TUID_unsubscribed_list)):
320           domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
321           domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport  
322           try:
323             self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)        
324           except:       
325             logging.error('Error in pushing template: %s', sys.exc_info()[0]) 
326             unsuccessful_updates.append(TUID_unsubscribed_list[i])
327
328     # The following template distribution is not transactional, meaning that some domains
329     # might be successfull receiving their sub-templates while some other might not
330     # The function returns FAILED code to the publisher in such situations, meaning that
331     # publisher must republish to safely orchestrate/manage NS or VNF
332
333     # Send domain templates to each domain agent/client 
334     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
335     # TBD: read from work-flow
336     for site in file_paths:
337       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
338       domino_client_port = self.dominoServer.registration_record[site].tcpport
339       try:
340         if 'hot' in self.dominoServer.subscribed_templateformats[site]:
341           tosca = ToscaTemplate(file_paths[site])
342           translator = TOSCATranslator(tosca, {}, False)
343           output = translator.translate()
344           logging.debug('HOT translation: \n %s', output)
345           template_lines = [ output ]
346         else: 
347           template_lines = miscutil.read_templatefile(file_paths[site]) 
348         self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
349       except IOError as e:
350         logging.error('I/O error(%d): %s' , e.errno, e.strerror)
351         pub_r.responseCode = FAILED
352       except:
353         logging.error('Error: %s', sys.exc_info()[0])
354         pub_r.responseCode = FAILED
355
356     # Check if any file is generated for distribution, if not
357     # return FAILED as responseCode, we should also send description for
358     # reason
359     if len(file_paths) == 0:
360       pub_r.responseCode = FAILED
361
362
363     dbconn = sqlite3.connect(SERVER_DBFILE)
364     c = dbconn.cursor()
365
366     if pub_r.responseCode == SUCCESS:
367       # update in memory database
368       self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
369       try:
370         c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) )
371         dbconn.commit()
372       except sqlite3.OperationalError as ex1:
373         logging.error('Could not add new TUID %s  DB for Domino Client %s :  %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
374       except:
375         logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
376         logging.error('Unexpected error: %s', sys.exc_info()[0])
377       else:
378         self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
379
380     # update in memory database
381     self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
382     logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID)
383     try:
384       newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID])
385       c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) )
386       dbconn.commit()
387     except sqlite3.OperationalError as ex1:
388       logging.error('Could not add new subscribers for TUID %s for Domino Client %s:  %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
389     except:
390       logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
391       logging.error('Unexpected error: %s', sys.exc_info()[0])
392
393     dbconn.close()
394
395     return pub_r
396     
397   #Query from Domino Client is received
398   #Actions:
399   #
400   #       - Respond Back with Query Response
401   def d_query(self, qu_msg):
402     #Fill in the details
403     qu_r = QueryResponseMessage()
404     qu_r.domino_udid = SERVER_UDID
405     qu_r.seq_no = self.seqno
406     qu_r.responseCode = SUCCESS
407     qu_r.queryResponse = []
408     
409     for i in range(len(qu_msg.queryString)):
410       if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client
411          qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid])
412
413     self.seqno = self.seqno + 1
414     return qu_r
415
416
417 class DominoServer:
418    def __init__(self):
419      self.assignedUUIDs = list()
420      self.subscribed_labels = dict()
421      self.subscribed_templateformats = dict()
422      self.registration_record = dict() 
423      self.assignedTUIDs = list()
424      self.TUID2Publisher = dict()
425      self.TUID2Subscribers = dict()
426      self.communicationHandler = CommunicationHandler(self)
427      self.processor = Communication.Processor(self.communicationHandler)
428      self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
429      self.tfactory = TTransport.TBufferedTransportFactory()
430      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
431      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
432      #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
433      self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
434
435
436    def start_communicationService(self):
437      self.communicationServer.serve()
438
439    #For now assign the desired UDID
440    #To be implemented:
441    #Check if ID is already assigned and in use
442    #If not assigned, assign it
443    #If assigned, offer a new random id
444    def assign_udid(self, udid_desired):
445      new_udid = udid_desired 
446      while new_udid in self.assignedUUIDs:
447        new_udid = uuid.uuid4().hex  
448      self.assignedUUIDs.append(new_udid)
449      return new_udid
450
451    def assign_tuid(self):
452      new_TUID = uuid.uuid4().hex
453      while new_TUID in self.assignedTUIDs:
454        new_TUID = uuid.uuid4().hex
455      self.assignedTUIDs.append(new_TUID)  
456      return new_TUID
457
458    def handle_RPC_timeout(self, RPCmessage):
459      if RPCmessage.messageType == PUSH:
460       logging.debug('RPC Timeout for message type: PUSH')
461       # TBD: handle each RPC timeout separately
462
463 def main(argv):
464   server = DominoServer()
465   loglevel = LOGLEVEL
466   #process input arguments
467   try:
468       opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
469   except getopt.GetoptError:
470       print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
471       sys.exit(2)
472   for opt, arg in opts:
473       if opt == '-h':
474          print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
475          sys.exit()
476       elif opt in ("-c", "--conf"):
477          configfile = arg
478       elif opt in ("-l", "--log"):
479          loglevel= arg
480   #Set logging level
481   numeric_level = getattr(logging, loglevel.upper(), None)
482   try:
483     if not isinstance(numeric_level, int):
484       raise ValueError('Invalid log level: %s' % loglevel)
485     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
486   except ValueError, ex:
487     print ex.message
488     sys.exit(2)
489
490   #start the database with schemas
491   dbconn = sqlite3.connect(SERVER_DBFILE)
492   c = dbconn.cursor()
493   try:
494     c.execute('''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''')
495   except sqlite3.OperationalError as ex:
496     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
497
498   try:
499     c.execute('''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''')
500   except sqlite3.OperationalError as ex:
501     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
502
503   try:
504     c.execute('''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
505   except sqlite3.OperationalError as ex:
506     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
507
508   try:
509     c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''')
510   except sqlite3.OperationalError as ex:
511     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
512
513   try:
514     c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''')
515   except sqlite3.OperationalError as ex: 
516     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
517
518   dbconn.commit()
519   dbconn.close()
520
521   logging.debug('Domino Server Starting...')
522   server.start_communicationService()
523   print 'done.'
524
525 if __name__ == "__main__":
526    main(sys.argv[1:])