Bug fix in max-packing scheduler logic
[domino.git] / DominoServer.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14 import sys, os, glob, random, errno
15 import getopt, socket
16 import logging, json
17 import sqlite3, yaml
18 #sys.path.append('gen-py')
19 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
20 sys.path.insert(0, glob.glob('./lib')[0])
21
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
32
33 from toscaparser.tosca_template import ToscaTemplate
34 #from toscaparser.utils.gettextutils import _
35 #import toscaparser.utils.urlutils
36 from translator.hot.tosca_translator import TOSCATranslator
37
38
39 from mapper import *
40 from partitioner import *
41 from util import miscutil
42
43 #Load configuration parameters
44 from domino_conf import *
45
46
47 class CommunicationHandler:
48   def __init__(self):
49     self.log = {}
50
51   def __init__(self, dominoserver):
52     self.log = {}
53     self.dominoServer = dominoserver
54     self.seqno = SERVER_SEQNO;
55    
56   def openconnection(self, ipaddr, tcpport):
57     try:
58       # Make socket
59       transport = TSocket.TSocket(ipaddr, tcpport)
60       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
61       # Add buffering to compensate for slow raw sockets
62       self.transport = TTransport.TBufferedTransport(transport)
63       # Wrap in a protocol
64       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
65       # Create a client to use the protocol encoder
66       self.sender = Communication.Client(self.protocol)
67       self.transport.open()
68     except Thrift.TException, tx:
69       logging.error('%s' , tx.message) 
70
71
72
73   def closeconnection(self):
74     self.transport.close()
75
76   def push_template(self,template,ipaddr,tcpport):
77     self.openconnection(ipaddr,tcpport)
78     pushm = PushMessage()
79     pushm.domino_udid = SERVER_UDID 
80     pushm.seq_no = self.seqno
81     pushm.template_type = 'tosca-nfv-v1.0'
82     pushm.template = template
83     try:
84       push_r = self.sender.d_push(pushm)  
85       logging.info('Push Response received from %d' , push_r.domino_udid)
86     except (Thrift.TException, TSocket.TTransportException) as tx:
87       logging.error('%s' , tx.message)
88     except (socket.timeout) as tx:
89       self.dominoServer.handle_RPC_timeout(pushm)
90     except:       
91       logging.error('Unexpected error: %s', sys.exc_info()[0])
92
93     self.seqno = self.seqno + 1
94
95     self.closeconnection()
96  
97   #Heartbeat from Domino Client is received
98   #Actions:
99   #     - Respond Back with a heartbeat
100
101   def d_heartbeat(self, hb_msg):
102     logging.info('heartbeat received from %d' , hb_msg.domino_udid)
103
104     hb_r = HeartBeatMessage()
105     hb_r.domino_udid = SERVER_UDID
106     hb_r.seq_no = self.seqno
107
108     self.seqno = self.seqno + 1 
109
110     return hb_r
111
112   #Registration from Domino Client is received
113   #Actions:
114   #
115   #       - Respond Back with Registration Response
116   def d_register(self, reg_msg):
117
118     #Prepare and send Registration Response
119     reg_r = RegisterResponseMessage()
120     logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
121
122    
123     reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
124     reg_r.seq_no = self.seqno
125     reg_r.domino_udid = SERVER_UDID
126     #return unconditional success 
127     #To be implemented:
128     #Define conditions for unsuccessful registration (e.g., unsupported mapping)
129     reg_r.responseCode = SUCCESS 
130     #no need to send comments
131     #To be implemented:
132     #Logic for a new UDID assignment
133  
134     self.seqno = self.seqno + 1
135
136     self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg    
137
138     #commit to the database
139     dbconn = sqlite3.connect(SERVER_DBFILE)
140     c = dbconn.cursor()
141     try:
142       newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
143       c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
144     except sqlite3.OperationalError as ex:
145       logging.error('Could not add the new registration record into %s for Domino Client %d :  %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
146     except:
147       logging.error('Could not add the new registration record into %s for Domino Client %d', SERVER_DBFILE, reg_r.domino_udid_assigned)
148       logging.error('Unexpected error: %s', sys.exc_info()[0])
149  
150     dbconn.commit()
151     dbconn.close()
152
153     return reg_r
154
155
156   #Subscription from Domino Client is received
157   #Actions:
158   #       - Save the templates  & labels
159   #       - Respond Back with Subscription Response
160   def d_subscribe(self, sub_msg):
161     logging.info('Subscribe Request received from %d' , sub_msg.domino_udid)
162
163     if sub_msg.template_op == APPEND:
164       if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
165         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
166       else:
167         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
168     elif sub_msg.template_op == OVERWRITE:
169       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
170     elif sub_msg.template_op == DELETE:
171       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
172
173 #    if sub_msg.labels != []:
174     if sub_msg.label_op == APPEND:
175       logging.debug('APPENDING Labels...')
176       if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
177         self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
178       else:
179         self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
180     elif sub_msg.label_op == OVERWRITE:
181       logging.debug('OVERWRITING Labels...')
182       self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
183     elif sub_msg.label_op == DELETE:
184       logging.debug('DELETING Labels...')
185       self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
186
187     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
188
189     #commit to the database
190     dbconn = sqlite3.connect(SERVER_DBFILE)
191     c = dbconn.cursor()
192     newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
193     try:
194       c.execute("REPLACE INTO labels (udid, label_list) VALUES ({udid}, '{newvalue}')".\
195                format(udid=sub_msg.domino_udid, newvalue=','.join(list(newlabelset)) ))
196     except sqlite3.OperationalError as ex1:
197       logging.error('Could not add the new labels to %s for Domino Client %d :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
198     except:
199       logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid)
200       logging.error('Unexpected error: %s', sys.exc_info()[0])
201
202     newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
203     try:
204       c.execute("REPLACE INTO ttypes (udid, ttype_list) VALUES ({udid}, '{newvalue}')".\
205                format(udid=sub_msg.domino_udid, newvalue=','.join(list(newttypeset)) ))
206     except sqlite3.OperationalError as ex1:
207       logging.error('Could not add the new labels to %s for Domino Client %d :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
208     except:
209       logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid)
210       logging.error('Unexpected error: %s', sys.exc_info()[0])
211
212
213     dbconn.commit()
214     dbconn.close()
215
216  
217     #Fill in the details
218     sub_r = SubscribeResponseMessage()
219     sub_r.domino_udid = SERVER_UDID
220     sub_r.seq_no = self.seqno
221     sub_r.responseCode = SUCCESS
222     self.seqno = self.seqno + 1
223
224     return sub_r
225
226   #Template Publication from Domino Client is received
227   #Actions:
228   #       - Parse the template, perform mapping, partition the template
229   #       - Launch Push service
230   #       - Respond Back with Publication Response
231   def d_publish(self, pub_msg):
232     logging.info('Publish Request received from %d' , pub_msg.domino_udid)
233     logging.debug(pub_msg.template)
234
235     # Save as file
236     try:
237       os.makedirs(TOSCADIR)
238     except OSError as exception:
239       if exception.errno == errno.EEXIST:
240         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
241       else:
242         logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
243
244     #Risking a race condition if another process is attempting to write to same file
245     try:
246       miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
247     except:
248       #Some sort of race condition should have occured that prevented the write operation
249       #treat as failure
250       logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
251       pub_r = PublishResponseMessage()
252       pub_r.domino_udid = SERVER_UDID
253       pub_r.seq_no = self.seqno
254       pub_r.responseCode = FAILED
255       self.seqno = self.seqno + 1
256       return pub_r
257
258     # Load tosca object from file into memory
259     try:
260       #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
261       tpl = yaml.load(file(TOSCADIR+TOSCA_DEFAULT_FNAME,'r'))
262     except:
263       logging.error('Tosca Parser error: %s', sys.exc_info()[0])
264       #tosca file could not be read
265       pub_r = PublishResponseMessage()
266       pub_r.domino_udid = SERVER_UDID
267       pub_r.seq_no = self.seqno
268       pub_r.responseCode = FAILED
269       self.seqno = self.seqno + 1 
270       return pub_r 
271
272     # Extract Labels
273     node_labels = label.extract_labels( tpl )
274     logging.debug('Node Labels: %s', node_labels)
275
276     # Map nodes in the template to resource domains
277     site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
278     logging.debug('Site Maps: %s' , site_map)
279
280     # Select a site for each VNF
281     node_site = label.select_site( site_map ) 
282     logging.debug('Selected Sites: %s', node_site)
283
284     # Create per-domain Tosca files
285     file_paths = partitioner.partition_tosca('./toscafiles/template',node_site,tpl)
286     logging.debug('Per domain file paths: %s', file_paths)
287  
288     # Create work-flow
289
290     # Create response with response code as SUCCESS by default
291     # Response code will be overwrittent if partial or full failure occurs
292     pub_r = PublishResponseMessage()
293     pub_r.domino_udid = SERVER_UDID
294     pub_r.seq_no = self.seqno
295     pub_r.responseCode = SUCCESS
296     self.seqno = self.seqno + 1
297
298     # Send domain templates to each domain agent/client 
299     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
300     # TBD: read from work-flow
301     for site in file_paths:
302       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
303       domino_client_port = self.dominoServer.registration_record[site].tcpport
304       try:
305         if 'hot' in self.dominoServer.subscribed_templateformats[site]:
306           tosca = ToscaTemplate(file_paths[site])
307           translator = TOSCATranslator(tosca, {}, False)
308           output = translator.translate()
309           logging.debug('HOT translation: \n %s', output)
310           template_lines = [ output ]
311         else: 
312           template_lines = miscutil.read_templatefile(file_paths[site]) 
313         self.push_template(template_lines, domino_client_ip, domino_client_port)
314       except IOError as e:
315         logging.error('I/O error(%d): %s' , e.errno, e.strerror)
316         pub_r.responseCode = FAILED
317       except:
318         logging.error('Error: %s', sys.exc_info()[0])
319         pub_r.responseCode = FAILED
320
321     # Check if any file is generated for distribution, if not
322     # return FAILED as responseCode, we should also send description for
323     # reason
324     if len(file_paths) == 0:
325       pub_r.responseCode = FAILED
326
327     return pub_r
328     
329   #Query from Domino Client is received
330   #Actions:
331   #
332   #       - Respond Back with Query Response
333   def d_query(self, qu_msg):
334     #Fill in the details
335     qu_r = QueryResponseMessage()
336
337     return qu_r
338
339
340 class DominoServer:
341    def __init__(self):
342      self.assignedUUIDs = list()
343      self.subscribed_labels = dict()
344      self.subscribed_templateformats = dict()
345      self.registration_record = dict() 
346      self.communicationHandler = CommunicationHandler(self)
347      self.processor = Communication.Processor(self.communicationHandler)
348      self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
349      self.tfactory = TTransport.TBufferedTransportFactory()
350      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
351      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
352      #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
353      self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
354
355
356    def start_communicationService(self):
357      self.communicationServer.serve()
358
359    #For now assign the desired UDID
360    #To be implemented:
361    #Check if ID is already assigned and in use
362    #If not assigned, assign it
363    #If assigned, offer a new random id
364    def assign_udid(self, udid_desired):
365      if udid_desired in self.assignedUUIDs:
366        new_udid = random.getrandbits(63)
367        while new_udid in self.assignedUUIDs:
368          new_udid = random.getrandbits(63)
369  
370        self.assignedUUIDs.append(new_udid)
371        return new_udid
372      else:
373        self.assignedUUIDs.append(udid_desired)
374        return udid_desired
375      
376    def handle_RPC_timeout(self, RPCmessage):
377      if RPCmessage.messageType == PUSH:
378       logging.debug('RPC Timeout for message type: PUSH')
379       # TBD: handle each RPC timeout separately
380
381 def main(argv):
382   server = DominoServer()
383   loglevel = LOGLEVEL
384   #process input arguments
385   try:
386       opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
387   except getopt.GetoptError:
388       print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
389       sys.exit(2)
390   for opt, arg in opts:
391       if opt == '-h':
392          print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
393          sys.exit()
394       elif opt in ("-c", "--conf"):
395          configfile = arg
396       elif opt in ("-l", "--log"):
397          loglevel= arg
398   #Set logging level
399   numeric_level = getattr(logging, loglevel.upper(), None)
400   try:
401     if not isinstance(numeric_level, int):
402       raise ValueError('Invalid log level: %s' % loglevel)
403     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
404   except ValueError, ex:
405     print ex.message
406     sys.exit(2)
407
408   #start the database with schemas
409   dbconn = sqlite3.connect(SERVER_DBFILE)
410   c = dbconn.cursor()
411   try:
412     c.execute('''CREATE TABLE labels (udid INTEGER PRIMARY KEY, label_list TEXT)''')
413   except sqlite3.OperationalError as ex:
414     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
415
416   try:
417     c.execute('''CREATE TABLE ttypes (udid INTEGER PRIMARY KEY, ttype_list TEXT)''')
418   except sqlite3.OperationalError as ex:
419     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
420
421   try:
422     c.execute('''CREATE TABLE clients (udid INTEGER PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
423   except sqlite3.OperationalError as ex:
424     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
425
426   dbconn.commit()
427   dbconn.close()
428
429   logging.debug('Domino Server Starting...')
430   server.start_communicationService()
431   print 'done.'
432
433 if __name__ == "__main__":
434    main(sys.argv[1:])