3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
15 import sys, os, glob, threading
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
27 from dominoCLI import DominoClientCLI
28 from dominoCLI.ttypes import *
29 from dominoCLI.constants import *
31 from thrift import Thrift
32 from thrift.transport import TSocket
33 from thrift.transport import TTransport
34 from thrift.protocol import TBinaryProtocol
35 from thrift.server import TServer
39 #Load configuration parameters
40 from domino_conf import *
class CommunicationHandler:
    """Thrift handler plus outbound connection holder for a Domino client.

    Serves template pushes received from the Domino Server (``d_push``) and
    owns the transport/protocol/sender objects used to call the server.
    """

    def __init__(self, dominoclient):
        """Bind the handler to its owning client.

        Args:
            dominoclient: object whose ``UDID``, ``seqno`` and
                ``dominoserver_IP`` attributes are read (and, for ``seqno``,
                updated) by this handler.
        """
        self.dominoClient = dominoclient
        # Populated by openconnection(); None means "no connection yet" so
        # closeconnection() can be called safely at any time.
        self.transport = None
        self.protocol = None
        self.sender = None

    def _push_response(self, response_code):
        """Build a PushResponseMessage and advance the client's sequence number."""
        push_r = PushResponseMessage()
        # Fill response message fields
        push_r.domino_udid = self.dominoClient.UDID
        push_r.seq_no = self.dominoClient.seqno
        push_r.responseCode = response_code
        self.dominoClient.seqno = self.dominoClient.seqno + 1
        return push_r

    # Template Push from Domino Server is received
    #       - Depending on Controller Domain, call API
    #       - Respond Back with Push Response
    def d_push(self, push_msg):
        """Store a pushed template on disk and answer with a push response.

        Args:
            push_msg: message carrying ``seq_no`` and ``template``.

        Returns:
            A PushResponseMessage whose ``responseCode`` is SUCCESS when the
            template file was written and FAILED otherwise.
        """
        udid = self.dominoClient.UDID
        logging.info('%d Received Template File', udid)

        # Retrieve the template file
        rx_dir = TOSCA_RX_DIR + str(udid)
        try:
            os.makedirs(rx_dir)
        except OSError as exception:
            if exception.errno == errno.EEXIST:
                logging.debug('IGNORING error: ERRNO %d; %s exists.', exception.errno, rx_dir)
            else:
                # The format string needs both the directory and the errno;
                # the write below will report the real failure if any.
                logging.error('IGNORING error in creating %s. Err no: %d', rx_dir, exception.errno)

        try:
            miscutil.write_templatefile(rx_dir + '/' + str(push_msg.seq_no) + '.yaml', push_msg.template)
        except Exception:
            logging.error('FAILED to write the pushed file: %s', sys.exc_info()[0])
            return self._push_response(FAILED)

        # Any inspection code goes here

        # If heat client, call heat command

        # If ONOS client, run as shell script

        # Marshall the response message for the Domino Server
        return self._push_response(SUCCESS)

    def openconnection(self):
        """Open the buffered Thrift connection to the Domino Server.

        Thrift errors are logged and swallowed; in that case ``sender`` may
        not have been set.
        """
        try:
            raw_socket = TSocket.TSocket(self.dominoClient.dominoserver_IP, DOMINO_SERVER_PORT)
            raw_socket.setTimeout(THRIFT_RPC_TIMEOUT_MS)
            # Add buffering to compensate for slow raw sockets
            self.transport = TTransport.TBufferedTransport(raw_socket)
            self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
            # Create a client to use the protocol encoder
            self.sender = Communication.Client(self.protocol)
            self.transport.open()
        except Thrift.TException as tx:
            logging.error('%s', tx.message)

    def closeconnection(self):
        """Close the server transport if one was created; otherwise do nothing."""
        if self.transport is not None:
            self.transport.close()
class CLIHandler:
    """Thrift handler for CLI requests sent to a running Domino client.

    NOTE(review): the class header is not present in the visible text; the
    name and constructor arity are taken from the
    ``CLIHandler(self.dominoclient, self)`` call in the CLI service.
    """

    def __init__(self, dominoclient, CLIservice):
        """Remember the client and the CLI service that executes commands.

        Args:
            dominoclient: the owning Domino client.
            CLIservice: object exposing ``process_input(args)``.
        """
        self.dominoClient = dominoclient
        self.CLIservice = CLIservice

    def d_CLI(self, msg):
        """Run the command carried in ``msg.CLI_input`` and acknowledge it.

        Returns:
            A CLIResponse; its text is currently the fixed placeholder
            "Testing...".
        """
        logging.info('Received CLI %s', msg.CLI_input)

        self.CLIservice.process_input(msg.CLI_input)

        CLIrespmsg = CLIResponse()
        CLIrespmsg.CLI_response = "Testing..."
        # The response must be handed back to the RPC layer; building it
        # without returning it leaves the caller with None.
        return CLIrespmsg
class DominoClientCLIService(threading.Thread):
    """Thread that turns CLI commands into calls on a Domino client.

    Commands arrive either from an interactive prompt on stdin or through
    the DominoClientCLI Thrift service, depending on ``interactive``.
    """

    def __init__(self, dominoclient, communicationhandler, interactive):
        """Store collaborators.

        Args:
            dominoclient: client exposing ``heartbeat``, ``publish``,
                ``subscribe``, ``start`` and ``CLIport``.
            communicationhandler: the client's communication handler
                (stored, not used by the methods in this class).
            interactive: the string "TRUE" selects the stdin prompt; any
                other value selects the RPC listener.
        """
        threading.Thread.__init__(self)
        self.dominoclient = dominoclient
        self.communicationhandler = communicationhandler
        self.interactive = interactive

    def process_input(self, args):
        """Dispatch one tokenised command line.

        Args:
            args: list of tokens; ``args[0]`` is the command
                (heartbeat, publish, subscribe or register). Unknown
                commands are ignored silently.
        """
        if not args:
            print('Empty API body')
            return

        command = args[0]
        try:
            if command == 'heartbeat':
                self.dominoclient.heartbeat()
            elif command == 'publish':
                self._process_publish(args[1:])
            elif command == 'subscribe':
                self._process_subscribe(args[1:])
            elif command == 'register':
                self.dominoclient.start()
        except getopt.GetoptError:
            print('Command is misentered or not supported!')

    def _process_publish(self, argv):
        """Parse ``publish -t <toscafile>`` and publish the named file."""
        opts, _ = getopt.getopt(argv, "t:", ["tosca-file="])
        if not opts:
            print('\nUsage: publish -t <toscafile>')
            return

        for opt, arg in opts:
            if opt in ('-t', '--tosca-file'):
                toscafile = arg

        self.dominoclient.publish(toscafile)

    def _process_subscribe(self, argv):
        """Parse subscribe options and forward a non-empty subscription."""
        labels = []
        templateTypes = []
        # NOTE(review): the default operations are not visible in the source
        # text; APPEND is assumed because the error messages list it as a
        # valid choice. Confirm against the original defaults.
        labelop = str2enum['APPEND']
        templateop = str2enum['APPEND']

        opts, _ = getopt.getopt(argv, "l:t:", ["labels=", "ttype=", "lop=", "top="])
        for opt, arg in opts:
            if opt in ('-l', '--labels'):
                labels = labels + arg.split(',')
            elif opt in ('-t', '--ttype'):
                templateTypes = templateTypes + arg.split(',')
            elif opt == '--lop':
                # getopt always reports the full long-option name, so an
                # equality test is exact (``in ('--lop')`` was a substring test).
                try:
                    labelop = str2enum[arg.upper()]
                except KeyError:
                    print('\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE')
                    return
            elif opt == '--top':
                try:
                    templateop = str2enum[arg.upper()]
                except KeyError:
                    print('\nInvalid template option, pick one of: APPEND, OVERWRITE, DELETE')
                    return

        #check if labels or supported templates are nonempty
        if labels or templateTypes:
            self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)

    def run(self):
        """Thread body: serve the interactive prompt or the CLI RPC endpoint."""
        if self.interactive == "TRUE":
            self._run_interactive()
        else:
            self._serve_cli_rpc()

    def _run_interactive(self):
        """Read commands from stdin forever (interactive CLI, runs until killed)."""
        try:
            read_line = raw_input  # Python 2
        except NameError:
            read_line = input  # Python 3

        while True:
            sys.stdout.write('>>')
            args = read_line().split()
            if not args:
                continue

            sys.stdout.write('>>')
            #process input arguments
            self.process_input(args)

    def _serve_cli_rpc(self):
        """Listen for CLI RPC calls issued by the domino cli-client."""
        cliHandler = CLIHandler(self.dominoclient, self)
        processor = DominoClientCLI.Processor(cliHandler)
        transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
        tfactory = TTransport.TBufferedTransportFactory()
        pfactory = TBinaryProtocol.TBinaryProtocolFactory()
        #Use TThreadedServer or TThreadPoolServer for a multithreaded server
        CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
        logging.debug('RPC service for CLI is starting...')
        CLIServer.serve()
class DominoClient:
    """Domino client: registers with the Domino Server and exchanges messages.

    Holds the registration state, the per-client sequence number and the
    Thrift server that receives pushes from the Domino Server.

    NOTE(review): the class header and the ``def`` lines of ``__init__``,
    ``start``, ``register``, ``heartbeat``, ``stop`` and ``sender`` are not
    present in the visible text. ``heartbeat``, ``start`` and ``sender`` are
    named after their call sites; ``register`` and ``stop`` are inferred
    names - confirm against the original file.
    """

    def __init__(self):
        """Create an unregistered client configured from the module defaults."""
        self.communicationHandler = CommunicationHandler(self)
        self.processor = None
        self.transport = None
        self.tfactory = None
        self.pfactory = None
        self.communicationServer = None

        self.CLIservice = None

        self.serviceport = DOMINO_CLIENT_PORT
        self.dominoserver_IP = DOMINO_SERVER_IP
        self.CLIport = DOMINO_CLI_PORT

        #Start from UNREGISTERED STATE
        #TO BE DONE: initialize from a saved state
        self.state = 'UNREGISTERED'
        # NOTE(review): both attributes are read (and logged with %d) before
        # any visible assignment; the initial values here are assumed.
        self.seqno = 0
        self.UDID = 1

    def start_communicationService(self):
        """Serve pushes from the Domino Server; blocks in ``serve()``."""
        self.processor = Communication.Processor(self.communicationHandler)
        self.transport = TSocket.TServerSocket(port=int(self.serviceport))
        self.tfactory = TTransport.TBufferedTransportFactory()
        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
        #Use TThreadedServer or TThreadPoolServer for a multithreaded server
        #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
        self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)

        self.communicationServer.serve()

    def start(self):
        """Open the connection to the Domino Server and register."""
        try:
            self.communicationHandler.openconnection()
            # NOTE(review): this call is not visible in the source text; it is
            # inferred because the CLI 'register' command invokes start().
            self.register()
        except Thrift.TException as tx:
            print('%s' % (tx.message))

    def _ensure_registered(self):
        """Start (connect and register) when the client is still unregistered."""
        # NOTE(review): the statement guarded by this check is missing from
        # the visible text; calling start() is the assumed behaviour.
        if self.state == 'UNREGISTERED':
            self.start()

    def register(self):
        """Send a registration message; on SUCCESS adopt the assigned UDID."""
        if self.state != 'UNREGISTERED':
            return

        logging.info('%d Sending Registration', self.UDID)
        #prepare registration message
        reg_msg = RegisterMessage()
        reg_msg.domino_udid_desired = UDID_DESIRED
        reg_msg.seq_no = self.seqno
        reg_msg.ipaddr = netutil.get_ip()
        reg_msg.tcpport = self.serviceport
        reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES

        try:
            reg_msg_r = self.sender().d_register(reg_msg)
            logging.info('Registration Response: Response Code: %d', reg_msg_r.responseCode)
            if reg_msg_r.comments:
                logging.debug('Response Comments: %s', reg_msg_r.comments)

            if reg_msg_r.responseCode == SUCCESS:
                self.state = 'REGISTERED'
                self.UDID = reg_msg_r.domino_udid_assigned
            #else: Handle registration failure here (possibly based on reponse comments)
        except (Thrift.TException, TSocket.TTransportException) as tx:
            logging.error('%s', tx.message)
        except socket.timeout:
            self.handle_RPC_timeout(reg_msg)
        except socket.error as tx:
            # socket.error has no usable ``message`` attribute; log the
            # exception itself so errno/strerror are not lost.
            logging.error('%s', tx)
        self.seqno = self.seqno + 1

    def heartbeat(self):
        """Send a heartbeat to the Domino Server and log the reply."""
        self._ensure_registered()

        logging.info('%d Sending heartbeat', self.UDID)
        hbm = HeartBeatMessage()
        hbm.domino_udid = self.UDID
        hbm.seq_no = self.seqno

        try:
            hbm_r = self.sender().d_heartbeat(hbm)
            logging.info('heart beat received from: %d ,sequence number: %d', hbm_r.domino_udid, hbm_r.seq_no)
        except (Thrift.TException, TSocket.TTransportException) as tx:
            logging.error('%s', tx.message)
        except socket.timeout:
            self.handle_RPC_timeout(hbm)
        except Exception:
            logging.error('Unexpected error: %s', sys.exc_info()[0])

        self.seqno = self.seqno + 1

    def publish(self, toscafile):
        """Publish the template stored in ``toscafile`` to the Domino Server.

        Args:
            toscafile: path of the template file to read and send. If the
                file cannot be read the error is logged and nothing is sent.
        """
        self._ensure_registered()

        logging.info('Publishing the template file: %s', toscafile)
        pub_msg = PublishMessage()
        pub_msg.domino_udid = self.UDID
        pub_msg.seq_no = self.seqno
        pub_msg.template_type = 'tosca-nfv-v1.0'

        try:
            pub_msg.template = miscutil.read_templatefile(toscafile)
        except IOError as e:
            logging.error('I/O error(%d): %s', e.errno, e.strerror)
            return

        try:
            pub_msg_r = self.sender().d_publish(pub_msg)
            logging.info('Publish Response is received from: %d ,sequence number: %d Status: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no, pub_msg_r.responseCode)
        except (Thrift.TException, TSocket.TTransportException) as tx:
            print('%s' % (tx.message))
        except socket.timeout:
            self.handle_RPC_timeout(pub_msg)

        self.seqno = self.seqno + 1

    def subscribe(self, labels, templateTypes, label_op, template_op):
        """Send a subscription for the given labels and template types.

        Args:
            labels: list of label strings.
            templateTypes: list of supported template type strings.
            label_op: operation applied to ``labels``.
            template_op: operation applied to ``templateTypes``.
        """
        self._ensure_registered()

        logging.info('subscribing labels %s and templates %s', labels, templateTypes)
        #send subscription message
        sub_msg = SubscribeMessage()
        sub_msg.domino_udid = self.UDID
        sub_msg.seq_no = self.seqno
        sub_msg.template_op = template_op
        sub_msg.supported_template_types = templateTypes
        sub_msg.label_op = label_op
        sub_msg.labels = labels

        try:
            sub_msg_r = self.sender().d_subscribe(sub_msg)
            logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid, sub_msg_r.seq_no)
        except (Thrift.TException, TSocket.TTransportException) as tx:
            logging.error('%s', tx.message)
        except socket.timeout:
            self.handle_RPC_timeout(sub_msg)

        self.seqno = self.seqno + 1

    def stop(self):
        """Close the connection to the Domino Server, logging Thrift errors."""
        try:
            self.communicationHandler.closeconnection()
        except Thrift.TException as tx:
            logging.error('%s', tx.message)

    def sender(self):
        """Return the RPC client object held by the communication handler."""
        return self.communicationHandler.sender

    def startCLI(self, interactive):
        """Create and start the CLI service thread.

        Args:
            interactive: passed through to DominoClientCLIService.
        """
        self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
        logging.info('CLI Service is starting')
        self.CLIservice.start()
        #to wait until CLI service is finished
        #self.CLIservice.join()

    def set_serviceport(self, port):
        """Set the port on which this client serves Domino Server pushes."""
        self.serviceport = port

    def set_CLIport(self, cliport):
        """Set the port on which the CLI RPC service listens."""
        self.CLIport = cliport

    def set_dominoserver_ipaddr(self, ipaddr):
        """Set the address of the Domino Server to connect to."""
        self.dominoserver_IP = ipaddr

    def handle_RPC_timeout(self, RPCmessage):
        """Log which kind of RPC timed out, based on ``RPCmessage.messageType``.

        Message types outside the known set are ignored.
        """
        # TBD: handle each RPC timeout separately
        known_types = (
            (HEART_BEAT, 'HEART_BEAT'),
            (PUBLISH, 'PUBLISH'),
            (SUBSCRIBE, 'SUBSCRIBE'),
            (REGISTER, 'REGISTER'),
            (QUERY, 'QUERY'),
        )
        for message_type, label in known_types:
            if RPCmessage.messageType == message_type:
                logging.debug('RPC Timeout for message type: %s', label)
                return
def main(argv):
    """Parse command-line options, configure logging and run the client.

    NOTE(review): the ``def`` line is not present in the visible text; the
    name ``main`` is assumed, the parameter name comes from the
    ``getopt.getopt(argv, ...)`` call.

    Args:
        argv: command-line arguments without the program name.
    """
    usage = 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'

    client = DominoClient()
    # NOTE(review): the default log level is not visible in the source text;
    # LOGLEVEL is assumed by analogy with INTERACTIVE - confirm.
    loglevel = LOGLEVEL
    interactive = INTERACTIVE

    #process input arguments
    try:
        opts, args = getopt.getopt(argv, "hc:p:i:l:", ["conf=", "port=", "ipaddr=", "log=", "iac=", "cliport="])
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            print(usage)
            sys.exit()
        elif opt in ("-c", "--conf"):
            # Accepted so existing invocations keep working; nothing in the
            # visible code consumes the configuration file name.
            pass
        elif opt in ("-p", "--port"):
            client.set_serviceport(int(arg))
        elif opt in ("-i", "--ipaddr"):
            client.set_dominoserver_ipaddr(arg)
        elif opt in ("-l", "--log"):
            loglevel = arg
        elif opt == "--iac":
            # Equality, not ``in ("--iac")``: the latter is a substring test.
            interactive = arg.upper()
        elif opt == "--cliport":
            client.set_CLIport(int(arg))

    #Set logging level
    numeric_level = getattr(logging, loglevel.upper(), None)
    try:
        if not isinstance(numeric_level, int):
            raise ValueError('Invalid log level: %s' % loglevel)
        logging.basicConfig(filename=logfile, level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    except ValueError as ex:
        print(ex)
        sys.exit(2)

    #The client is starting
    logging.debug('Domino Client Starting...')
    client.startCLI(interactive)
    client.start_communicationService()
454 if __name__ == "__main__":