Fixed loglevel reference error and modified README.md
[domino.git] / DominoServer.py
1 #!/usr/bin/env python
2
3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14 import sys, os, glob, random, errno
15 import getopt, socket
16 import logging, json
17 #sys.path.append('gen-py')
18 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
19 sys.path.insert(0, glob.glob('./lib')[0])
20
21
22 from dominoRPC import Communication
23 from dominoRPC.ttypes import *
24 from dominoRPC.constants import *
25
26 from thrift import Thrift
27 from thrift.transport import TSocket
28 from thrift.transport import TTransport
29 from thrift.protocol import TBinaryProtocol
30 from thrift.server import TServer
31
32 from toscaparser.tosca_template import ToscaTemplate
33 #from toscaparser.utils.gettextutils import _
34 #import toscaparser.utils.urlutils
35
36 from mapper import *
37 from partitioner import *
38 from util import miscutil
39
40 #Load configuration parameters
41 from domino_conf import *
42
43
44 class CommunicationHandler:
45   def __init__(self):
46     self.log = {}
47
48   def __init__(self, dominoserver):
49     self.log = {}
50     self.dominoServer = dominoserver
51     self.seqno = 0;
52    
53   def openconnection(self, ipaddr, tcpport):
54     try:
55       # Make socket
56       transport = TSocket.TSocket(ipaddr, tcpport)
57       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
58       # Add buffering to compensate for slow raw sockets
59       self.transport = TTransport.TBufferedTransport(transport)
60       # Wrap in a protocol
61       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
62       # Create a client to use the protocol encoder
63       self.sender = Communication.Client(self.protocol)
64       self.transport.open()
65     except Thrift.TException, tx:
66       logging.error('%s' , tx.message) 
67
68
69
70   def closeconnection(self):
71     self.transport.close()
72
73   def push_template(self,template,ipaddr,tcpport):
74     self.openconnection(ipaddr,tcpport)
75     pushm = PushMessage()
76     pushm.domino_udid = SERVER_UDID 
77     pushm.seq_no = self.seqno
78     pushm.template_type = 'tosca-nfv-v1.0'
79     pushm.template = template
80     try:
81       push_r = self.sender.d_push(pushm)  
82       logging.info('Push Response received from %d' , push_r.domino_udid)
83     except (Thrift.TException, TSocket.TTransportException) as tx:
84       logging.error('%s' , tx.message)
85     except (socket.timeout) as tx:
86       self.dominoServer.handle_RPC_timeout(pushm)
87     except:       
88       logging.error('Unexpected error: %s', sys.exc_info()[0])
89
90     self.seqno = self.seqno + 1
91
92     self.closeconnection()
93  
94   #Heartbeat from Domino Client is received
95   #Actions:
96   #     - Respond Back with a heartbeat
97
98   def d_heartbeat(self, hb_msg):
99     global SERVER_UDID
100     logging.info('heartbeat received from %d' , hb_msg.domino_udid)
101
102     hb_r = HeartBeatMessage()
103     hb_r.domino_udid = SERVER_UDID
104     hb_r.seq_no = self.seqno
105
106     self.seqno = self.seqno + 1 
107
108     return hb_r
109
110   #Registration from Domino Client is received
111   #Actions:
112   #
113   #       - Respond Back with Registration Response
114   def d_register(self, reg_msg):
115     global SERVER_UDID
116
117     #Prepare and send Registration Response
118     reg_r = RegisterResponseMessage()
119     logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
120
121    
122     reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
123     reg_r.seq_no = self.seqno
124     reg_r.domino_udid = SERVER_UDID
125     #return unconditional success 
126     #To be implemented:
127     #Define conditions for unsuccessful registration (e.g., unsupported mapping)
128     reg_r.responseCode = SUCCESS 
129     #no need to send comments
130     #To be implemented:
131     #Logic for a new UDID assignment
132  
133     self.seqno = self.seqno + 1
134
135     # Store the Domino Client info
136     # TBD: check the sequence number to ensure the most recent record is saved
137     self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg 
138     data = {}
139     data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no]
140     with open(SERVER_DBFILE, 'a') as f:
141       json.dump(data, f)
142       f.close()
143
144     return reg_r
145
146
147   #Subscription from Domino Client is received
148   #Actions:
149   #       - Save the templates  & labels
150   #       - Respond Back with Subscription Response
151   def d_subscribe(self, sub_msg):
152     global SERVER_UDID, SERVER_SEQNO
153     logging.info('Subscribe Request received from %d' , sub_msg.domino_udid)
154
155     if sub_msg.template_op == APPEND:
156       if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
157         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
158       else:
159         self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
160     elif sub_msg.template_op == OVERWRITE:
161       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
162     elif sub_msg.template_op == DELETE:
163       self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
164
165     if sub_msg.labels != []:
166       if sub_msg.label_op == APPEND:
167         if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
168           self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
169         else:
170           self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
171       elif sub_msg.label_op == OVERWRITE:
172         self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
173       elif sub_msg.label_op == DELETE:
174         self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
175
176     logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
177  
178     #Fill in the details
179     sub_r = SubscribeResponseMessage()
180     sub_r.domino_udid = SERVER_UDID
181     sub_r.seq_no = self.seqno
182     sub_r.responseCode = SUCCESS
183     self.seqno = self.seqno + 1
184
185     return sub_r
186
187   #Template Publication from Domino Client is received
188   #Actions:
189   #       - Parse the template, perform mapping, partition the template
190   #       - Launch Push service
191   #       - Respond Back with Publication Response
192   def d_publish(self, pub_msg):
193     global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME
194     logging.info('Publish Request received from %d' , pub_msg.domino_udid)
195     logging.debug(pub_msg.template)
196
197     # Save as file
198     try:
199       os.makedirs(TOSCADIR)
200     except OSError as exception:
201       if exception.errno == errno.EEXIST:
202         logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR,  TOSCADIR+TOSCA_DEFAULT_FNAME)
203       else:
204         logging.error('Error occurred in creating %s. Err no: %d', exception.errno)
205
206     #Risking a race condition if another process is attempting to write to same file
207     f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w')  
208     for item in pub_msg.template:
209       print>>f, item
210     f.close()
211
212     # Load tosca object from file into memory
213     tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
214     
215     # Extract Labels
216     node_labels = label.extract_labels( tosca )
217     logging.debug('Node Labels: %s', node_labels)
218
219     # Map nodes in the template to resource domains
220     site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
221     logging.debug('Site Maps: %s' , site_map)
222
223     # Select a site for each VNF
224     node_site = label.select_site( site_map ) 
225     logging.debug('Selected Sites: %s', node_site)
226
227     # Create per-domain Tosca files
228     file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl)
229     
230     # Create list of translated template files
231
232     # Create work-flow
233
234     # Send domain templates to each domain agent/client 
235     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
236     # TBD: read from work-flow
237     for site in file_paths:
238       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
239       domino_client_port = self.dominoServer.registration_record[site].tcpport
240       self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port)
241
242     #Fill in the details
243     pub_r = PublishResponseMessage()
244     pub_r.domino_udid = SERVER_UDID
245     pub_r.seq_no = self.seqno
246     pub_r.responseCode = SUCCESS
247     self.seqno = self.seqno + 1 
248     return pub_r
249     
250   #Query from Domino Client is received
251   #Actions:
252   #
253   #       - Respond Back with Query Response
254   def d_query(self, qu_msg):
255     #Fill in the details
256     qu_r = QueryResponseMessage()
257
258     return qu_r
259
260
261 class DominoServer:
262    def __init__(self):
263      self.assignedUUIDs = list()
264      self.subscribed_labels = dict()
265      self.subscribed_templateformats = dict()
266      self.registration_record = dict() 
267      self.communicationHandler = CommunicationHandler(self)
268      self.processor = Communication.Processor(self.communicationHandler)
269      self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
270      self.tfactory = TTransport.TBufferedTransportFactory()
271      self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
272      #Use TThreadedServer or TThreadPoolServer for a multithreaded server
273      #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
274      self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
275
276
277    def start_communicationService(self):
278      self.communicationServer.serve()
279
280    #For now assign the desired UDID
281    #To be implemented:
282    #Check if ID is already assigned and in use
283    #If not assigned, assign it
284    #If assigned, offer a new random id
285    def assign_udid(self, udid_desired):
286      if udid_desired in self.assignedUUIDs:
287        new_udid = random.getrandbits(63)
288        while new_udid in self.assignedUUIDs:
289          new_udid = random.getrandbits(63)
290  
291        self.assignedUUIDs.append(new_udid)
292        return new_udid
293      else:
294        self.assignedUUIDs.append(udid_desired)
295        return udid_desired
296      
297    def handle_RPC_timeout(self, RPCmessage):
298      if RPCmessage.messageType == PUSH:
299       logging.debug('RPC Timeout for message type: PUSH')
300       # TBD: handle each RPC timeout separately
301
302 def main(argv):
303   server = DominoServer()
304   loglevel = 'WARNING'
305   #process input arguments
306   try:
307       opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
308   except getopt.GetoptError:
309       print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
310       sys.exit(2)
311   for opt, arg in opts:
312       if opt == '-h':
313          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
314          sys.exit()
315       elif opt in ("-c", "--conf"):
316          configfile = arg
317       elif opt in ("--log"):
318          loglevel= arg
319   #Set logging level
320   numeric_level = getattr(logging, loglevel.upper(), None)
321   try:
322     if not isinstance(numeric_level, int):
323       raise ValueError('Invalid log level: %s' % loglevel)
324     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
325   except ValueError, ex:
326     print ex.message
327     sys.exit(2)
328
329   logging.debug('Domino Server Starting...')
330   server.start_communicationService()
331   print 'done.'
332
333 if __name__ == "__main__":
334    main(sys.argv[1:])