3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
15 import sys, os, glob, threading
20 #sys.path.append('gen-py')
21 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
22 sys.path.insert(0, glob.glob('./lib')[0])
24 from dominoRPC import Communication
25 from dominoRPC.ttypes import *
26 from dominoRPC.constants import *
28 from dominoCLI import DominoClientCLI
29 from dominoCLI.ttypes import *
30 from dominoCLI.constants import *
32 from thrift import Thrift
33 from thrift.transport import TSocket
34 from thrift.transport import TTransport
35 from thrift.protocol import TBinaryProtocol
36 from thrift.server import TServer
40 #Load configuration parameters
41 from domino_conf import *
43 class CommunicationHandler:
47 def __init__(self, dominoclient):
49 self.dominoClient = dominoclient
54 # Template Push from Domino Server is received
56 # - Depending on Controller Domain, call API
57 # - Respond Back with Push Response
58 def d_push(self, push_msg):
59 logging.info('%s Received Template File', self.dominoClient.UDID)
60 # Retrieve the template file
62 os.makedirs(TOSCA_RX_DIR+str(self.dominoClient.UDID))
63 except OSError as exception:
64 if exception.errno == errno.EEXIST:
65 logging.debug('IGNORING error: ERRNO %d; %s exists.', exception.errno, TOSCA_RX_DIR+str(self.dominoClient.UDID))
67 logging.error('IGNORING error in creating %s. Err no: %d', exception.errno)
70 miscutil.write_templatefile(TOSCA_RX_DIR+str(self.dominoClient.UDID)+'/'+str(push_msg.template_UUID)+'.yaml' , push_msg.template)
72 logging.error('FAILED to write the pushed file: %s', sys.exc_info()[0])
73 push_r = PushResponseMessage()
74 # Fill response message fields
75 push_r.domino_udid = self.dominoClient.UDID
76 push_r.seq_no = self.dominoClient.seqno
77 push_r.responseCode = FAILED
78 self.dominoClient.seqno = self.dominoClient.seqno + 1
79 return push_r# Any inspection code goes here
84 # If heat client, call heat command
86 # If ONOS client, run as shell script
91 # Marshall the response message for the Domino Server Fill
92 push_r = PushResponseMessage()
93 # Fill response message fields
94 push_r.domino_udid = self.dominoClient.UDID
95 push_r.seq_no = self.dominoClient.seqno
96 push_r.responseCode = SUCCESS
99 self.dominoClient.seqno = self.dominoClient.seqno + 1
104 def openconnection(self):
107 transport = TSocket.TSocket(self.dominoClient.dominoserver_IP, DOMINO_SERVER_PORT)
108 transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
109 # Add buffering to compensate for slow raw sockets
110 self.transport = TTransport.TBufferedTransport(transport)
112 self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
113 # Create a client to use the protocol encoder
114 self.sender = Communication.Client(self.protocol)
115 self.transport.open()
116 except Thrift.TException, tx:
117 logging.error('%s' , tx.message)
119 def closeconnection():
120 self.transport.close()
126 def __init__(self, dominoclient, CLIservice):
128 self.dominoClient = dominoclient
129 self.CLIservice = CLIservice
131 def d_CLI(self, msg):
132 logging.info('Received CLI %s', msg.CLI_input)
134 CLIrespmsg = CLIResponse()
135 CLIrespmsg.CLI_response = self.CLIservice.process_input(msg.CLI_input)
139 class DominoClientCLIService(threading.Thread):
140 def __init__(self, dominoclient, communicationhandler, interactive):
141 threading.Thread.__init__(self)
142 self.dominoclient = dominoclient
143 self.communicationhandler = communicationhandler
144 self.interactive = interactive
146 def process_input(self, args):
148 return 'Empty API body'
151 if args[0] == 'heartbeat':
152 self.dominoclient.heartbeat()
154 elif args[0] == 'publish':
155 opts, args = getopt.getopt(args[1:],"t:k:",["tosca-file=","tuid"])
157 print '\nUsage: publish -t <toscafile> -k <TUID>'
162 for opt, arg in opts:
163 if opt in ('-t', '--tosca-file'):
165 elif opt in ('-k', '--tuid'):
167 if toscafile is not None:
168 self.dominoclient.publish(toscafile,template_UUID)
170 print '\nUsage: publish -t <toscafile> -k <TUID>'
172 elif args[0] == 'subscribe':
177 opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
178 for opt, arg in opts:
179 if opt in ('-l', '--labels'):
180 labels = labels + arg.split(',')
181 elif opt in ('-t', '--ttype'):
182 templateTypes = templateTypes + arg.split(',')
183 elif opt in ('--lop'):
185 labelop = str2enum[arg.upper()]
186 except KeyError as ex:
187 print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
189 elif opt in ('--top'):
191 templateop = str2enum[arg.upper()]
192 except KeyError as ex:
193 print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
196 #check if labels or supported templates are nonempty
197 if labels != [] or templateTypes != []:
198 self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
200 elif args[0] == 'register':
201 self.dominoclient.start()
203 elif args[0] == 'list-tuids':
204 return self.dominoclient.query(['list-tuids'])
207 return 'Command is misentered or not supported!'
209 except getopt.GetoptError:
210 print 'Command is misentered or not supported!'
214 global DEFAULT_TOSCA_PUBFILE
215 if self.interactive == "TRUE":
220 if flag: #interactive CLI, loop in while until killed
222 sys.stdout.write('>>')
223 input_string = raw_input()
224 args = input_string.split()
228 sys.stdout.write('>>')
229 #process input arguments
230 resp_msg = self.process_input(args)
231 if resp_msg is not None:
233 else: #domino cli-client is used, listen for the CLI rpc calls
234 cliHandler = CLIHandler(self.dominoclient, self)
235 processor = DominoClientCLI.Processor(cliHandler)
236 transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
237 tfactory = TTransport.TBufferedTransportFactory()
238 pfactory = TBinaryProtocol.TBinaryProtocolFactory()
239 #Use TThreadedServer or TThreadPoolServer for a multithreaded server
240 CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
241 logging.debug('RPC service for CLI is starting...')
248 self.communicationHandler = CommunicationHandler(self)
249 self.processor = None
250 self.transport = None
253 self.communicationServer = None
255 self.CLIservice = None
257 self.serviceport = DOMINO_CLIENT_PORT
258 self.dominoserver_IP = DOMINO_SERVER_IP
259 self.CLIport = DOMINO_CLI_PORT
261 #Start from UNREGISTERED STATE
262 #TO BE DONE: initialize from a saved state
263 self.state = 'UNREGISTERED'
267 def start_communicationService(self):
268 self.processor = Communication.Processor(self.communicationHandler)
269 self.transport = TSocket.TServerSocket(port=int(self.serviceport))
270 self.tfactory = TTransport.TBufferedTransportFactory()
271 self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
272 #Use TThreadedServer or TThreadPoolServer for a multithreaded server
273 #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
274 self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
276 self.communicationServer.serve()
280 self.communicationHandler.openconnection()
282 except Thrift.TException, tx:
283 print '%s' % (tx.message)
286 if self.state == 'UNREGISTERED':
287 logging.info('%d Sending Registration', self.UDID)
288 #prepare registration message
289 reg_msg = RegisterMessage()
290 reg_msg.domino_udid_desired = UDID_DESIRED
291 reg_msg.seq_no = self.seqno
292 reg_msg.ipaddr = netutil.get_ip()
293 reg_msg.tcpport = self.serviceport
294 reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
297 reg_msg_r = self.sender().d_register(reg_msg)
298 logging.info('Registration Response: Response Code: %d' , reg_msg_r.responseCode)
299 if reg_msg_r.comments:
300 logging.debug('Response Comments: %s' , reg_msg_r.comments)
302 if reg_msg_r.responseCode == SUCCESS:
303 self.state = 'REGISTERED'
304 self.UDID = reg_msg_r.domino_udid_assigned
306 #Handle registration failure here (possibly based on reponse comments)
308 except (Thrift.TException, TSocket.TTransportException) as tx:
309 logging.error('%s' , tx.message)
310 except (socket.timeout) as tx:
311 self.handle_RPC_timeout(reg_msg)
312 except (socket.error) as tx:
313 logging.error('%s' , tx.message)
314 self.seqno = self.seqno + 1
317 if self.state == 'UNREGISTERED':
320 logging.info('%s Sending heartbeat', self.UDID)
321 hbm = HeartBeatMessage()
322 hbm.domino_udid = self.UDID
323 hbm.seq_no = self.seqno
326 hbm_r = self.sender().d_heartbeat(hbm)
327 logging.info('heart beat received from: %s ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
328 except (Thrift.TException, TSocket.TTransportException) as tx:
329 logging.error('%s' , tx.message)
330 except (socket.timeout) as tx:
331 self.handle_RPC_timeout(hbm)
333 logging.error('Unexpected error: %s', sys.exc_info()[0])
335 self.seqno = self.seqno + 1
337 def publish(self, toscafile, template_UUID=None):
338 if self.state == 'UNREGISTERED':
341 logging.info('Publishing the template file: ' + toscafile)
342 pub_msg = PublishMessage()
343 pub_msg.domino_udid = self.UDID
344 pub_msg.seq_no = self.seqno
345 pub_msg.template_type = 'tosca-nfv-v1.0'
346 if template_UUID is not None:
347 pub_msg.template_UUID = template_UUID
350 pub_msg.template = miscutil.read_templatefile(toscafile)
352 logging.error('I/O error(%d): %s' , e.errno, e.strerror)
355 pub_msg_r = self.sender().d_publish(pub_msg)
356 logging.info('Publish Response is received from: %s ,sequence number: %d Status: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no, pub_msg_r.responseCode)
357 except (Thrift.TException, TSocket.TTransportException) as tx:
358 print '%s' % (tx.message)
359 except (socket.timeout) as tx:
360 self.handle_RPC_timeout(pub_msg)
362 self.seqno = self.seqno + 1
364 def subscribe(self, labels, templateTypes, label_op, template_op):
365 if self.state == 'UNREGISTERED':
368 logging.info('subscribing labels %s and templates %s', labels, templateTypes)
369 #send subscription message
370 sub_msg = SubscribeMessage()
371 sub_msg.domino_udid = self.UDID
372 sub_msg.seq_no = self.seqno
373 sub_msg.template_op = template_op
374 sub_msg.supported_template_types = templateTypes
375 sub_msg.label_op = label_op
376 sub_msg.labels = labels
378 sub_msg_r = self.sender().d_subscribe(sub_msg)
379 logging.info('Subscribe Response is received from: %s ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
380 except (Thrift.TException, TSocket.TTransportException) as tx:
381 logging.error('%s' , tx.message)
382 except (socket.timeout) as tx:
383 self.handle_RPC_timeout(sub_msg)
385 self.seqno = self.seqno + 1
387 def query(self, queryString, template_UUID=None):
388 logging.info('querying Domino Server: %s', queryString)
389 query_msg = QueryMessage()
390 query_msg.domino_udid = self.UDID
391 query_msg.seq_no = self.seqno
392 query_msg.queryString = queryString
393 query_msg.template_UUID = template_UUID
394 self.seqno = self.seqno + 1
396 query_msg_r = self.sender().d_query(query_msg)
397 logging.info('Query Response is received from: %s ,sequence number: %d', query_msg_r.domino_udid,query_msg_r.seq_no)
398 if (query_msg_r.queryResponse is not None) and (len(query_msg_r.queryResponse)>0):
399 return query_msg_r.queryResponse
400 except (Thrift.TException, TSocket.TTransportException) as tx:
401 logging.error('%s' , tx.message)
402 except (socket.timeout) as tx:
403 self.handle_RPC_timeout(query_msg)
407 self.communicationHandler.closeconnection()
408 except Thrift.TException, tx:
409 logging.error('%s' , tx.message)
412 return self.communicationHandler.sender
414 def startCLI(self, interactive):
415 self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
416 logging.info('CLI Service is starting')
417 self.CLIservice.start()
418 #to wait until CLI service is finished
419 #self.CLIservice.join()
421 def set_serviceport(self, port):
422 self.serviceport = port
424 def set_CLIport(self, cliport):
425 self.CLIport = cliport
427 def set_dominoserver_ipaddr(self, ipaddr):
428 self.dominoserver_IP = ipaddr
430 def handle_RPC_timeout(self, RPCmessage):
431 # TBD: handle each RPC timeout separately
432 if RPCmessage.messageType == HEART_BEAT:
433 logging.debug('RPC Timeout for message type: HEART_BEAT')
434 elif RPCmessage.messageType == PUBLISH:
435 logging.debug('RPC Timeout for message type: PUBLISH')
436 elif RPCmessage.messageType == SUBSCRIBE:
437 logging.debug('RPC Timeout for message type: SUBSCRIBE')
438 elif RPCmessage.messageType == REGISTER:
439 logging.debug('RPC Timeout for message type: REGISTER')
440 elif RPCmessage.messageType == QUERY:
441 logging.debug('RPC Timeout for message type: QUERY')
444 client = DominoClient()
446 interactive = INTERACTIVE
447 #process input arguments
449 opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport=","uuid=","regmod="])
450 except getopt.GetoptError:
451 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
453 for opt, arg in opts:
455 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
457 elif opt in ("-c", "--conf"):
459 elif opt in ("-p", "--port"):
460 client.set_serviceport(int(arg))
461 elif opt in ("-i", "--ipaddr"):
462 client.set_dominoserver_ipaddr(arg)
463 elif opt in ("-l", "--log"):
465 elif opt in ("--iac"):
466 interactive = arg.upper()
467 elif opt in ("--cliport"):
468 client.set_CLIport(int(arg))
469 elif opt in ("--uuid"):
471 elif opt in ("--regmod"):
472 if arg.upper() == 'REGISTERED':
473 client.state = 'REGISTERED'
475 numeric_level = getattr(logging, loglevel.upper(), None)
477 if not isinstance(numeric_level, int):
478 raise ValueError('Invalid log level: %s' % loglevel)
479 logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
480 except ValueError, ex:
484 #The client is starting
485 logging.debug('Domino Client Starting...')
487 client.startCLI(interactive)
488 client.start_communicationService()
490 if __name__ == "__main__":