JIRA DOMINO-22
[domino.git] / DominoServer.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14 import sys, os, glob, random, errno
15 import getopt, socket
16 import logging, json
17 import sqlite3, yaml
18 import uuid
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
22
23
24 from dominoRPC import Communication
25 from dominoRPC.ttypes import *
26 from dominoRPC.constants import *
27
28 from thrift import Thrift
29 from thrift.transport import TSocket
30 from thrift.transport import TTransport
31 from thrift.protocol import TBinaryProtocol
32 from thrift.server import TServer
33
34 from toscaparser.tosca_template import ToscaTemplate
35 #from toscaparser.utils.gettextutils import _
36 #import toscaparser.utils.urlutils
37 from translator.hot.tosca_translator import TOSCATranslator
38
39
40 from mapper import *
41 from partitioner import *
42 from util import miscutil
43
44 #Load configuration parameters
45 from domino_conf import *
46
47
48 class CommunicationHandler:
49   def __init__(self):
50     self.log = {}
51
52   def __init__(self, dominoserver):
53     self.log = {}
54     self.dominoServer = dominoserver
55     self.seqno = SERVER_SEQNO;
56    
57   def openconnection(self, ipaddr, tcpport):
58     try:
59       # Make socket
60       transport = TSocket.TSocket(ipaddr, tcpport)
61       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
62       # Add buffering to compensate for slow raw sockets
63       self.transport = TTransport.TBufferedTransport(transport)
64       # Wrap in a protocol
65       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
66       # Create a client to use the protocol encoder
67       self.sender = Communication.Client(self.protocol)
68       self.transport.open()
69     except:
70       raise
71
72
73   def closeconnection(self):
74     self.transport.close()
75
76   def push_template(self,template,ipaddr,tcpport,TUID):
77     try:
78       self.openconnection(ipaddr,tcpport)
79       pushm = PushMessage()
80       pushm.domino_udid = SERVER_UDID 
81       pushm.seq_no = self.seqno
82       pushm.template_type = 'tosca-nfv-v1.0'
83       pushm.template = template
84       pushm.template_UUID = TUID
85       self.seqno = self.seqno + 1
86
87       push_r = self.sender.d_push(pushm)  
88       logging.info('Push Response received from %s' , push_r.domino_udid)
89       self.closeconnection()
90     except (socket.timeout) as tx:
91       self.dominoServer.handle_RPC_timeout(pushm)
92       raise tx
93     except:       
94       logging.error('Unexpected error: %s', sys.exc_info()[0])
95       raise
96  
97   #Heartbeat from Domino Client is received
98   #Actions:
99   #     - Respond Back with a heartbeat
100
101   def d_heartbeat(self, hb_msg):
102     logging.info('heartbeat received from %s' , hb_msg.domino_udid)
103
104     hb_r = HeartBeatMessage()
105     hb_r.domino_udid = SERVER_UDID
106     hb_r.seq_no = self.seqno
107
108     self.seqno = self.seqno + 1 
109
110     return hb_r
111
112   #Registration from Domino Client is received
113   #Actions:
114   #
115   #       - Respond Back with Registration Response
116   def d_register(self, reg_msg):
117
118     #Prepare and send Registration Response
119     reg_r = RegisterResponseMessage()
120     logging.info('Registration Request received for UUID %s from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
121
122    
123     reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
124     reg_r.seq_no = self.seqno
125     reg_r.domino_udid = SERVER_UDID
126     #return unconditional success 
127     #To be implemented:
128     #Define conditions for unsuccessful registration (e.g., unsupported mapping)
129     reg_r.responseCode = SUCCESS 
130     #no need to send comments
131     #To be implemented:
132     #Logic for a new UDID assignment
133  
134     self.seqno = self.seqno + 1
135
136     self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg    
137
138     #commit to the database
139     dbconn = sqlite3.connect(SERVER_DBFILE)
140     c = dbconn.cursor()
141     try:
142       newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
143       c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
144     except sqlite3.OperationalError as ex:
145       logging.error('Could not add the new registration record into %s for Domino Client %s :  %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
146     except:
147       logging.error('Could not add the new registration record into %s for Domino Client %s', SERVER_DBFILE, reg_r.domino_udid_assigned)
148       logging.error('Unexpected error: %s', sys.exc_info()[0])
149  
150     dbconn.commit()
151     dbconn.close()
152
153     return reg_r
154
155
156   #Subscription from Domino Client is received
157   #Actions:
158   #       - Save the templates  & labels
159   #       - Respond Back with Subscription Response
160   def d_subscribe(self, sub_msg):
161     logging.info('Subscribe Request received from %s' , sub_msg.domino_udid)
162
163     if sub_msg.template_op == APPEND:
164       if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
165         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
166       else:
167         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
168     elif sub_msg.template_op == OVERWRITE:
169       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
170     elif sub_msg.template_op == DELETE:
171       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
172
173 #    if sub_msg.labels != []:
174     if sub_msg.label_op == APPEND:
175       logging.debug('APPENDING Labels...')
176       if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
177         self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
178       else:
179         self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
180     elif sub_msg.label_op == OVERWRITE:
181       logging.debug('OVERWRITING Labels...')
182       self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
183     elif sub_msg.label_op == DELETE:
184       logging.debug('DELETING Labels...')
185       self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
186
187     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
188
189     #commit to the database
190     dbconn = sqlite3.connect(SERVER_DBFILE)
191     c = dbconn.cursor()
192     newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
193     try:
194       newvalue=','.join(list(newlabelset))
195       c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
196     except sqlite3.OperationalError as ex1:
197       logging.error('Could not add the new labels to %s for Domino Client %s :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
198     except:
199       logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
200       logging.error('Unexpected error: %s', sys.exc_info()[0])
201
202     newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
203     try:
204       newvalue=','.join(list(newttypeset))
205       c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
206     except sqlite3.OperationalError as ex1:
207       logging.error('Could not add the new labels to %s for Domino Client %s :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
208     except:
209       logging.error('Could not add the new labels to %s for Domino Client %s', SERVER_DBFILE, sub_msg.domino_udid)
210       logging.error('Unexpected error: %s', sys.exc_info()[0])
211
212
213     dbconn.commit()
214     dbconn.close()
215
216  
217     #Fill in the details
218     sub_r = SubscribeResponseMessage()
219     sub_r.domino_udid = SERVER_UDID
220     sub_r.seq_no = self.seqno
221     sub_r.responseCode = SUCCESS
222     self.seqno = self.seqno + 1
223
224     return sub_r
225
226   #Template Publication from Domino Client is received
227   #Actions:
228   #       - Parse the template, perform mapping, partition the template
229   #       - Launch Push service
230   #       - Respond Back with Publication Response    
231   def d_publish(self, pub_msg):
232     logging.info('Publish Request received from %s' , pub_msg.domino_udid)
233     #logging.debug(pub_msg.template)
234
235     # Create response with response code as SUCCESS by default
236     # Response code will be overwritten if partial or full failure occurs
237     pub_r = PublishResponseMessage()
238     pub_r.domino_udid = SERVER_UDID
239     pub_r.seq_no = self.seqno
240     pub_r.responseCode = SUCCESS
241     pub_r.template_UDID = pub_msg.template_UUID
242     self.seqno = self.seqno + 1
243
244     if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
245       logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
246       pub_r.responseCode = FAILED
247       return pub_r
248       
249     # Save as file
250     try:
251       os.makedirs(TOSCADIR)
252     except OSError as exception:
253       if exception.errno == errno.EEXIST:
254         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
255       else:
256         logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
257
258     #Risking a race condition if another process is attempting to write to same file
259     try:
260       miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
261     except:
262       #Some sort of race condition should have occured that prevented the write operation
263       #treat as failure
264       logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
265       pub_r.responseCode = FAILED
266       return pub_r
267     
268     # Load tosca object from file into memory
269     try:
270       #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
271       tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
272     except:
273       logging.error('Tosca Parser error: %s', sys.exc_info()[0])
274       #tosca file could not be read
275       pub_r.responseCode = FAILED
276       return pub_r 
277
278     # Extract Labels
279     node_labels = label.extract_labels( tpl )
280     logging.debug('Node Labels: %s', node_labels)
281
282     # Map nodes in the template to resource domains
283     site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
284     logging.debug('Site Maps: %s' , site_map)
285
286     # Select a site for each VNF
287     node_site = label.select_site( site_map ) 
288     logging.debug('Selected Sites: %s', node_site)
289
290     # Create per-domain Tosca files
291     file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl)
292     logging.debug('Per domain file paths: %s', file_paths)
293  
294     # Create work-flow
295
296     # Assign template UUID if no UUID specified
297     # Otherwise update the existing domains subscribed to TUID
298     unsuccessful_updates = []
299     if pub_msg.template_UUID is None:
300       pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
301     else:
302       logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
303       if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
304         logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID])
305         pub_r.responseCode = FAILED
306         return pub_r  
307       else: #Template exists, we need to find clients that are no longer in the subscription list list
308         TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys()))
309         if len(TUID_unsubscribed_list) > 0:
310           logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
311         # Send empty bodied templates to domains which no longer has any assigned resource
312         template_lines = []
313         for i in range(len(TUID_unsubscribed_list)):
314           domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
315           domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport  
316           try:
317             self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)        
318           except:       
319             logging.error('Error in pushing template: %s', sys.exc_info()[0]) 
320             unsuccessful_updates.append(TUID_unsubscribed_list[i])
321
322     # The following template distribution is not transactional, meaning that some domains
323     # might be successfull receiving their sub-templates while some other might not
324     # The function returns FAILED code to the publisher in such situations, meaning that
325     # publisher must republish to safely orchestrate/manage NS or VNF
326
327     # Send domain templates to each domain agent/client 
328     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
329     # TBD: read from work-flow
330     for site in file_paths:
331       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
332       domino_client_port = self.dominoServer.registration_record[site].tcpport
333       try:
334         if 'hot' in self.dominoServer.subscribed_templateformats[site]:
335           tosca = ToscaTemplate(file_paths[site])
336           translator = TOSCATranslator(tosca, {}, False)
337           output = translator.translate()
338           logging.debug('HOT translation: \n %s', output)
339           template_lines = [ output ]
340         else: 
341           template_lines = miscutil.read_templatefile(file_paths[site]) 
342         self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
343       except IOError as e:
344         logging.error('I/O error(%d): %s' , e.errno, e.strerror)
345         pub_r.responseCode = FAILED
346       except:
347         logging.error('Error: %s', sys.exc_info()[0])
348         pub_r.responseCode = FAILED
349
350     # Check if any file is generated for distribution, if not
351     # return FAILED as responseCode, we should also send description for
352     # reason
353     if len(file_paths) == 0:
354       pub_r.responseCode = FAILED
355
356
357     dbconn = sqlite3.connect(SERVER_DBFILE)
358     c = dbconn.cursor()
359
360     if pub_r.responseCode == SUCCESS:
361       # update in memory database
362       self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
363       try:
364         c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) )
365         dbconn.commit()
366       except sqlite3.OperationalError as ex1:
367         logging.error('Could not add new TUID %s  DB for Domino Client %s :  %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
368       except:
369         logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
370         logging.error('Unexpected error: %s', sys.exc_info()[0])
371       else:
372         self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
373
374     # update in memory database
375     self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
376     logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID)
377     try:
378       newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID])
379       c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) )
380       dbconn.commit()
381     except sqlite3.OperationalError as ex1:
382       logging.error('Could not add new subscribers for TUID %s for Domino Client %s:  %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
383     except:
384       logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
385       logging.error('Unexpected error: %s', sys.exc_info()[0])
386
387     dbconn.close()
388
389     return pub_r
390     
391   #Query from Domino Client is received
392   #Actions:
393   #
394   #       - Respond Back with Query Response
395   def d_query(self, qu_msg):
396     #Fill in the details
397     qu_r = QueryResponseMessage()
398     qu_r.domino_udid = SERVER_UDID
399     qu_r.seq_no = self.seqno
400     qu_r.responseCode = SUCCESS
401     qu_r.queryResponse = []
402     
403     for i in range(len(qu_msg.queryString)):
404       if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client
405          qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid])
406
407     self.seqno = self.seqno + 1
408     return qu_r
409
410
411 class DominoServer:
412    def __init__(self):
413      self.assignedUUIDs = list()
414      self.subscribed_labels = dict()
415      self.subscribed_templateformats = dict()
416      self.registration_record = dict() 
417      self.assignedTUIDs = list()
418      self.TUID2Publisher = dict()
419      self.TUID2Subscribers = dict()
420      self.communicationHandler = CommunicationHandler(self)
421      self.processor = Communication.Processor(self.communicationHandler)
422      self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
423      self.tfactory = TTransport.TBufferedTransportFactory()
424      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
425      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
426      #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
427      self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
428
429
430    def start_communicationService(self):
431      self.communicationServer.serve()
432
433    #For now assign the desired UDID
434    #To be implemented:
435    #Check if ID is already assigned and in use
436    #If not assigned, assign it
437    #If assigned, offer a new random id
438    def assign_udid(self, udid_desired):
439      new_udid = udid_desired 
440      while new_udid in self.assignedUUIDs:
441        new_udid = uuid.uuid4().hex  
442      self.assignedUUIDs.append(new_udid)
443      return new_udid
444
445    def assign_tuid(self):
446      new_TUID = uuid.uuid4().hex
447      while new_TUID in self.assignedTUIDs:
448        new_TUID = uuid.uuid4().hex
449      self.assignedTUIDs.append(new_TUID)  
450      return new_TUID
451
452    def handle_RPC_timeout(self, RPCmessage):
453      if RPCmessage.messageType == PUSH:
454       logging.debug('RPC Timeout for message type: PUSH')
455       # TBD: handle each RPC timeout separately
456
457 def main(argv):
458   server = DominoServer()
459   loglevel = LOGLEVEL
460   #process input arguments
461   try:
462       opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
463   except getopt.GetoptError:
464       print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
465       sys.exit(2)
466   for opt, arg in opts:
467       if opt == '-h':
468          print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
469          sys.exit()
470       elif opt in ("-c", "--conf"):
471          configfile = arg
472       elif opt in ("-l", "--log"):
473          loglevel= arg
474   #Set logging level
475   numeric_level = getattr(logging, loglevel.upper(), None)
476   try:
477     if not isinstance(numeric_level, int):
478       raise ValueError('Invalid log level: %s' % loglevel)
479     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
480   except ValueError, ex:
481     print ex.message
482     sys.exit(2)
483
484   #start the database with schemas
485   dbconn = sqlite3.connect(SERVER_DBFILE)
486   c = dbconn.cursor()
487   try:
488     c.execute('''CREATE TABLE labels (udid TEXT PRIMARY KEY, label_list TEXT)''')
489   except sqlite3.OperationalError as ex:
490     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
491
492   try:
493     c.execute('''CREATE TABLE ttypes (udid TEXT PRIMARY KEY, ttype_list TEXT)''')
494   except sqlite3.OperationalError as ex:
495     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
496
497   try:
498     c.execute('''CREATE TABLE clients (udid TEXT PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
499   except sqlite3.OperationalError as ex:
500     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
501
502   try:
503     c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''')
504   except sqlite3.OperationalError as ex:
505     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
506
507   try:
508     c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''')
509   except sqlite3.OperationalError as ex: 
510     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
511
512   dbconn.commit()
513   dbconn.close()
514
515   logging.debug('Domino Server Starting...')
516   server.start_communicationService()
517   print 'done.'
518
519 if __name__ == "__main__":
520    main(sys.argv[1:])