files pushed to the client are written to files under directories named after udid
[domino.git] / DominoServer.py
1 #!/usr/bin/env python
2
3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14 import sys, os, glob, random, errno
15 import getopt, socket
16 import logging, json
17 import sqlite3
18 #sys.path.append('gen-py')
19 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
20 sys.path.insert(0, glob.glob('./lib')[0])
21
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
32
33 from toscaparser.tosca_template import ToscaTemplate
34 #from toscaparser.utils.gettextutils import _
35 #import toscaparser.utils.urlutils
36
37 from mapper import *
38 from partitioner import *
39 from util import miscutil
40
41 #Load configuration parameters
42 from domino_conf import *
43
44
45 class CommunicationHandler:
46   def __init__(self):
47     self.log = {}
48
49   def __init__(self, dominoserver):
50     self.log = {}
51     self.dominoServer = dominoserver
52     self.seqno = SERVER_SEQNO;
53    
54   def openconnection(self, ipaddr, tcpport):
55     try:
56       # Make socket
57       transport = TSocket.TSocket(ipaddr, tcpport)
58       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
59       # Add buffering to compensate for slow raw sockets
60       self.transport = TTransport.TBufferedTransport(transport)
61       # Wrap in a protocol
62       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
63       # Create a client to use the protocol encoder
64       self.sender = Communication.Client(self.protocol)
65       self.transport.open()
66     except Thrift.TException, tx:
67       logging.error('%s' , tx.message) 
68
69
70
71   def closeconnection(self):
72     self.transport.close()
73
74   def push_template(self,template,ipaddr,tcpport):
75     self.openconnection(ipaddr,tcpport)
76     pushm = PushMessage()
77     pushm.domino_udid = SERVER_UDID 
78     pushm.seq_no = self.seqno
79     pushm.template_type = 'tosca-nfv-v1.0'
80     pushm.template = template
81     try:
82       push_r = self.sender.d_push(pushm)  
83       logging.info('Push Response received from %d' , push_r.domino_udid)
84     except (Thrift.TException, TSocket.TTransportException) as tx:
85       logging.error('%s' , tx.message)
86     except (socket.timeout) as tx:
87       self.dominoServer.handle_RPC_timeout(pushm)
88     except:       
89       logging.error('Unexpected error: %s', sys.exc_info()[0])
90
91     self.seqno = self.seqno + 1
92
93     self.closeconnection()
94  
95   #Heartbeat from Domino Client is received
96   #Actions:
97   #     - Respond Back with a heartbeat
98
99   def d_heartbeat(self, hb_msg):
100     logging.info('heartbeat received from %d' , hb_msg.domino_udid)
101
102     hb_r = HeartBeatMessage()
103     hb_r.domino_udid = SERVER_UDID
104     hb_r.seq_no = self.seqno
105
106     self.seqno = self.seqno + 1 
107
108     return hb_r
109
110   #Registration from Domino Client is received
111   #Actions:
112   #
113   #       - Respond Back with Registration Response
114   def d_register(self, reg_msg):
115
116     #Prepare and send Registration Response
117     reg_r = RegisterResponseMessage()
118     logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
119
120    
121     reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
122     reg_r.seq_no = self.seqno
123     reg_r.domino_udid = SERVER_UDID
124     #return unconditional success 
125     #To be implemented:
126     #Define conditions for unsuccessful registration (e.g., unsupported mapping)
127     reg_r.responseCode = SUCCESS 
128     #no need to send comments
129     #To be implemented:
130     #Logic for a new UDID assignment
131  
132     self.seqno = self.seqno + 1
133
134     self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg    
135
136     #commit to the database
137     dbconn = sqlite3.connect(SERVER_DBFILE)
138     c = dbconn.cursor()
139     try:
140       newrow = [(reg_r.domino_udid_assigned, reg_msg.ipaddr, reg_msg.tcpport, ','.join(reg_msg.supported_templates), reg_msg.seq_no),]
141       c.executemany('INSERT INTO clients VALUES (?,?,?,?,?)',newrow)
142     except sqlite3.OperationalError as ex:
143       logging.error('Could not add the new registration record into %s for Domino Client %d :  %s', SERVER_DBFILE, reg_r.domino_udid_assigned, ex.message)
144     except:
145       logging.error('Could not add the new registration record into %s for Domino Client %d', SERVER_DBFILE, reg_r.domino_udid_assigned)
146       logging.error('Unexpected error: %s', sys.exc_info()[0])
147  
148     dbconn.commit()
149     dbconn.close()
150
151     return reg_r
152
153
154   #Subscription from Domino Client is received
155   #Actions:
156   #       - Save the templates  & labels
157   #       - Respond Back with Subscription Response
158   def d_subscribe(self, sub_msg):
159     logging.info('Subscribe Request received from %d' , sub_msg.domino_udid)
160
161     if sub_msg.template_op == APPEND:
162       if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
163         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
164       else:
165         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
166     elif sub_msg.template_op == OVERWRITE:
167       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
168     elif sub_msg.template_op == DELETE:
169       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
170
171 #    if sub_msg.labels != []:
172     if sub_msg.label_op == APPEND:
173       logging.debug('APPENDING Labels...')
174       if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
175         self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
176       else:
177         self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
178     elif sub_msg.label_op == OVERWRITE:
179       logging.debug('OVERWRITING Labels...')
180       self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
181     elif sub_msg.label_op == DELETE:
182       logging.debug('DELETING Labels...')
183       self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
184
185     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
186
187     #commit to the database
188     dbconn = sqlite3.connect(SERVER_DBFILE)
189     c = dbconn.cursor()
190     newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
191     try:
192       c.execute("REPLACE INTO labels (udid, label_list) VALUES ({udid}, '{newvalue}')".\
193                format(udid=sub_msg.domino_udid, newvalue=','.join(list(newlabelset)) ))
194     except sqlite3.OperationalError as ex1:
195       logging.error('Could not add the new labels to %s for Domino Client %d :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
196     except:
197       logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid)
198       logging.error('Unexpected error: %s', sys.exc_info()[0])
199
200     newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
201     try:
202       c.execute("REPLACE INTO ttypes (udid, ttype_list) VALUES ({udid}, '{newvalue}')".\
203                format(udid=sub_msg.domino_udid, newvalue=','.join(list(newttypeset)) ))
204     except sqlite3.OperationalError as ex1:
205       logging.error('Could not add the new labels to %s for Domino Client %d :  %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
206     except:
207       logging.error('Could not add the new labels to %s for Domino Client %d', SERVER_DBFILE, sub_msg.domino_udid)
208       logging.error('Unexpected error: %s', sys.exc_info()[0])
209
210
211     dbconn.commit()
212     dbconn.close()
213
214  
215     #Fill in the details
216     sub_r = SubscribeResponseMessage()
217     sub_r.domino_udid = SERVER_UDID
218     sub_r.seq_no = self.seqno
219     sub_r.responseCode = SUCCESS
220     self.seqno = self.seqno + 1
221
222     return sub_r
223
224   #Template Publication from Domino Client is received
225   #Actions:
226   #       - Parse the template, perform mapping, partition the template
227   #       - Launch Push service
228   #       - Respond Back with Publication Response
229   def d_publish(self, pub_msg):
230     logging.info('Publish Request received from %d' , pub_msg.domino_udid)
231     logging.debug(pub_msg.template)
232
233     # Save as file
234     try:
235       os.makedirs(TOSCADIR)
236     except OSError as exception:
237       if exception.errno == errno.EEXIST:
238         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
239       else:
240         logging.error('IGNORING error occurred in creating %s. Err no: %d', exception.errno)
241
242     #Risking a race condition if another process is attempting to write to same file
243     try:
244       miscutil.write_templatefile(TOSCADIR+TOSCA_DEFAULT_FNAME , pub_msg.template)
245     except:
246       #Some sort of race condition should have occured that prevented the write operation
247       #treat as failure
248       logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
249       pub_r = PublishResponseMessage()
250       pub_r.domino_udid = SERVER_UDID
251       pub_r.seq_no = self.seqno
252       pub_r.responseCode = FAILED
253       self.seqno = self.seqno + 1
254       return pub_r
255
256     # Load tosca object from file into memory
257     try:
258       tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
259     except:
260       logging.error('Tosca Parser error: %s', sys.exc_info()[0])
261       #tosca file could not be read
262       pub_r = PublishResponseMessage()
263       pub_r.domino_udid = SERVER_UDID
264       pub_r.seq_no = self.seqno
265       pub_r.responseCode = FAILED
266       self.seqno = self.seqno + 1 
267       return pub_r 
268
269     # Extract Labels
270     node_labels = label.extract_labels( tosca )
271     logging.debug('Node Labels: %s', node_labels)
272
273     # Map nodes in the template to resource domains
274     site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
275     logging.debug('Site Maps: %s' , site_map)
276
277     # Select a site for each VNF
278     node_site = label.select_site( site_map ) 
279     logging.debug('Selected Sites: %s', node_site)
280
281     # Create per-domain Tosca files
282     file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl)
283     logging.debug('Per domain file paths: %s', file_paths)
284  
285     # Create list of translated template files
286
287     # Create work-flow
288
289     # Send domain templates to each domain agent/client 
290     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
291     # TBD: read from work-flow
292     for site in file_paths:
293       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
294       domino_client_port = self.dominoServer.registration_record[site].tcpport
295       try:
296         template_lines = miscutil.read_templatefile(file_paths[site]) 
297         self.push_template(template_lines, domino_client_ip, domino_client_port)
298       except IOError as e:
299         logging.error('I/O error(%d): %s' , e.errno, e.strerror)
300
301     #Fill in the details
302     pub_r = PublishResponseMessage()
303     pub_r.domino_udid = SERVER_UDID
304     pub_r.seq_no = self.seqno
305     pub_r.responseCode = SUCCESS
306     self.seqno = self.seqno + 1 
307     return pub_r
308     
309   #Query from Domino Client is received
310   #Actions:
311   #
312   #       - Respond Back with Query Response
313   def d_query(self, qu_msg):
314     #Fill in the details
315     qu_r = QueryResponseMessage()
316
317     return qu_r
318
319
320 class DominoServer:
321    def __init__(self):
322      self.assignedUUIDs = list()
323      self.subscribed_labels = dict()
324      self.subscribed_templateformats = dict()
325      self.registration_record = dict() 
326      self.communicationHandler = CommunicationHandler(self)
327      self.processor = Communication.Processor(self.communicationHandler)
328      self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
329      self.tfactory = TTransport.TBufferedTransportFactory()
330      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
331      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
332      #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
333      self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
334
335
336    def start_communicationService(self):
337      self.communicationServer.serve()
338
339    #For now assign the desired UDID
340    #To be implemented:
341    #Check if ID is already assigned and in use
342    #If not assigned, assign it
343    #If assigned, offer a new random id
344    def assign_udid(self, udid_desired):
345      if udid_desired in self.assignedUUIDs:
346        new_udid = random.getrandbits(63)
347        while new_udid in self.assignedUUIDs:
348          new_udid = random.getrandbits(63)
349  
350        self.assignedUUIDs.append(new_udid)
351        return new_udid
352      else:
353        self.assignedUUIDs.append(udid_desired)
354        return udid_desired
355      
356    def handle_RPC_timeout(self, RPCmessage):
357      if RPCmessage.messageType == PUSH:
358       logging.debug('RPC Timeout for message type: PUSH')
359       # TBD: handle each RPC timeout separately
360
361 def main(argv):
362   server = DominoServer()
363   loglevel = LOGLEVEL
364   #process input arguments
365   try:
366       opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
367   except getopt.GetoptError:
368       print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
369       sys.exit(2)
370   for opt, arg in opts:
371       if opt == '-h':
372          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
373          sys.exit()
374       elif opt in ("-c", "--conf"):
375          configfile = arg
376       elif opt in ("-l", "--log"):
377          loglevel= arg
378   #Set logging level
379   numeric_level = getattr(logging, loglevel.upper(), None)
380   try:
381     if not isinstance(numeric_level, int):
382       raise ValueError('Invalid log level: %s' % loglevel)
383     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
384   except ValueError, ex:
385     print ex.message
386     sys.exit(2)
387
388   #start the database with schemas
389   dbconn = sqlite3.connect(SERVER_DBFILE)
390   c = dbconn.cursor()
391   try:
392     c.execute('''CREATE TABLE labels (udid INTEGER PRIMARY KEY, label_list TEXT)''')
393   except sqlite3.OperationalError as ex:
394     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
395
396   try:
397     c.execute('''CREATE TABLE ttypes (udid INTEGER PRIMARY KEY, ttype_list TEXT)''')
398   except sqlite3.OperationalError as ex:
399     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
400
401   try:
402     c.execute('''CREATE TABLE clients (udid INTEGER PRIMARY KEY, ipaddr TEXT, tcpport INTEGER, templatetypes TEXT, seqno INTEGER)''')
403   except sqlite3.OperationalError as ex:
404     logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
405
406   dbconn.commit()
407   dbconn.close()
408
409   logging.debug('Domino Server Starting...')
410   server.start_communicationService()
411   print 'done.'
412
413 if __name__ == "__main__":
414    main(sys.argv[1:])