3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
15 import sys, glob, threading
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
27 from dominoCLI import DominoClientCLI
28 from dominoCLI.ttypes import *
29 from dominoCLI.constants import *
31 from thrift import Thrift
32 from thrift.transport import TSocket
33 from thrift.transport import TTransport
34 from thrift.protocol import TBinaryProtocol
35 from thrift.server import TServer
39 #Load configuration parameters
40 from domino_conf import *
42 class CommunicationHandler:
def __init__(self, dominoclient):
    """Bind the handler to its Domino client and build the RPC sender.

    Builds a buffered Thrift socket transport towards
    DOMINO_SERVER_IP:DOMINO_SERVER_PORT with a THRIFT_RPC_TIMEOUT_MS
    timeout, wraps it in a binary protocol and stores a
    Communication.Client for that protocol in ``self.sender``.

    Thrift errors raised while building this stack are logged and not
    propagated.

    :param dominoclient: the owning Domino client, kept as
        ``self.dominoClient``.
    """
    self.dominoClient = dominoclient
    # NOTE(review): the opening ``try:`` is not visible in the reviewed text;
    # it is placed before the transport setup, the only statements here that
    # touch Thrift -- confirm against the full file.
    try:
        transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT)
        transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
        # Add buffering to compensate for slow raw sockets
        self.transport = TTransport.TBufferedTransport(transport)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create a client to use the protocol encoder
        self.sender = Communication.Client(self.protocol)
    except Thrift.TException as tx:
        # "as" binding matches the other handlers in this file and is also
        # valid on Python 3, unlike the comma form.
        logging.error('%s', tx.message)
62 # Template Push from Domino Server is received
64 # - Depending on Controller Domain, call API
65 # - Respond Back with Push Response
def d_push(self, push_msg):
    """Handle a template push from the Domino Server and acknowledge it.

    Logs the reception, builds a PushResponseMessage carrying this client's
    UDID, its current sequence number and a SUCCESS response code, then
    advances the client's sequence number.

    :param push_msg: the pushed message. It is not inspected yet; only
        comments mark where template retrieval, inspection and the
        heat / ONOS calls would go.
    :returns: the populated PushResponseMessage.
    """
    client = self.dominoClient
    logging.info('%d Received Template File', client.UDID)
    # Retrieve the template file
    # Any inspection code goes here
    # If heat client, call heat command
    # If ONOS client, run as shell script

    # Marshal the response message for the Domino Server
    push_r = PushResponseMessage()
    # Fill response message fields
    push_r.domino_udid = client.UDID
    push_r.seq_no = client.seqno
    push_r.responseCode = SUCCESS

    client.seqno = client.seqno + 1

    # The response has to be handed back to the caller; without this the
    # message built above is discarded and the handler yields None.
    return push_r
97 def openconnection(self):
def closeconnection(self):
    """Close the buffered transport held in ``self.transport``.

    The method must accept ``self``: it reads ``self.transport`` and is
    invoked as a bound method, so a zero-argument signature fails with
    TypeError before the transport is ever closed.
    """
    self.transport.close()
def __init__(self, dominoclient, CLIservice):
    """Remember the Domino client and the CLI service this handler serves.

    :param dominoclient: stored as ``self.dominoClient``.
    :param CLIservice: stored as ``self.CLIservice``; its
        ``process_input`` is what incoming CLI calls are forwarded to.
    """
    self.dominoClient, self.CLIservice = dominoclient, CLIservice
def d_CLI(self, msg):
    """Run a CLI command received over RPC and answer the caller.

    Logs ``msg.CLI_input``, forwards it to the CLI service's
    ``process_input`` and builds a CLIResponse whose ``CLI_response`` is
    the fixed text "Testing...".

    :param msg: incoming CLI message; only ``CLI_input`` is read.
    :returns: the populated CLIResponse.
    """
    command = msg.CLI_input
    logging.info('Received CLI %s', command)

    self.CLIservice.process_input(command)

    CLIrespmsg = CLIResponse()
    CLIrespmsg.CLI_response = "Testing..."
    # Return the reply; otherwise the response built above is dropped and
    # the handler yields None.
    return CLIrespmsg
def read_templatefile(temp_filename):
    """Return the content of a template file as a list of lines.

    The file is read in text mode and split with ``str.splitlines``, so the
    returned strings carry no line terminators and an empty file gives an
    empty list. The file is closed before returning, including when reading
    fails.

    :param temp_filename: path of the template file to read.
    :returns: list of the file's lines.
    :raises IOError: if the file cannot be opened or read (OSError on
        Python 3).
    """
    with open(temp_filename, 'r') as template_file:
        return template_file.read().splitlines()
127 class DominoClientCLIService(threading.Thread):
def __init__(self, dominoclient, communicationhandler, interactive):
    """Set up the CLI service thread.

    :param dominoclient: client that CLI commands are executed against.
    :param communicationhandler: the client's communication handler.
    :param interactive: interactive-mode flag; compared against the string
        "TRUE" elsewhere in this class.
    """
    # Initialise the Thread machinery first, then attach our own state.
    threading.Thread.__init__(self)
    self.dominoclient, self.communicationhandler, self.interactive = (
        dominoclient,
        communicationhandler,
        interactive,
    )
134 def process_input(self, args):
136 if args[0] == 'heartbeat':
137 self.dominoclient.heartbeat()
139 elif args[0] == 'publish':
140 opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
142 print '\nUsage: publish -t <toscafile>'
145 for opt, arg in opts:
146 if opt in ('-t', '--tosca-file'):
149 self.dominoclient.publish(toscafile)
151 elif args[0] == 'subscribe':
156 opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
157 for opt, arg in opts:
158 if opt in ('-l', '--labels'):
159 labels = labels + arg.split(',')
160 elif opt in ('-t', '--ttype'):
161 templateTypes = templateTypes + arg.split(',')
162 elif opt in ('--lop'):
164 labelop = str2enum[arg.upper()]
165 except KeyError as ex:
166 print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
168 elif opt in ('--top'):
170 templateop = str2enum[arg.upper()]
171 except KeyError as ex:
172 print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
175 #check if labels or supported templates are nonempty
176 if labels != [] or templateTypes != []:
177 self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
179 elif args[0] == 'register':
180 self.dominoclient.start()
182 except getopt.GetoptError:
183 print 'Command is misentered or not supported!'
187 global DEFAULT_TOSCA_PUBFILE
188 if self.interactive == "TRUE":
193 if flag: #interactive CLI, loop in while until killed
195 sys.stdout.write('>>')
196 input_string = raw_input()
197 args = input_string.split()
201 sys.stdout.write('>>')
202 #process input arguments
203 self.process_input(args)
204 else: #domino cli-client is used, listen for the CLI rpc calls
205 cliHandler = CLIHandler(self.dominoclient, self)
206 processor = DominoClientCLI.Processor(cliHandler)
207 transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
208 tfactory = TTransport.TBufferedTransportFactory()
209 pfactory = TBinaryProtocol.TBinaryProtocolFactory()
210 #Use TThreadedServer or TThreadPoolServer for a multithreaded server
211 CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
212 logging.debug('RPC service for CLI is starting...')
217 self.communicationHandler = CommunicationHandler(self)
218 self.processor = None
219 self.transport = None
222 self.communicationServer = None
224 self.CLIservice = None
226 self.serviceport = 9091
227 self.dominoserver_IP = 'localhost'
229 self.CLIport = DOMINO_CLI_PORT
231 #Start from UNREGISTERED STATE
232 #TO BE DONE: initialize from a saved state
233 self.state = 'UNREGISTERED'
def start_communicationService(self):
    """Build the Thrift server for incoming Domino RPCs and serve on it.

    Creates a Communication.Processor around ``self.communicationHandler``,
    a server socket on ``self.serviceport`` (coerced to int), buffered
    transport and binary protocol factories, and a TThreadPoolServer made
    from them. Each piece is kept on ``self`` before ``serve()`` is called.
    """
    self.processor = Communication.Processor(self.communicationHandler)
    listen_port = int(self.serviceport)
    self.transport = TSocket.TServerSocket(port=listen_port)
    self.tfactory = TTransport.TBufferedTransportFactory()
    self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
    # A multithreaded server is wanted here; TThreadPoolServer is used,
    # TServer.TThreadedServer takes the same four arguments.
    server = TServer.TThreadPoolServer(
        self.processor, self.transport, self.tfactory, self.pfactory)
    self.communicationServer = server
    server.serve()
250 self.communicationHandler.openconnection()
252 except Thrift.TException, tx:
253 print '%s' % (tx.message)
# Registration with the Domino Server, attempted only while UNREGISTERED.
# On a SUCCESS response the client becomes REGISTERED and adopts the UDID
# assigned by the server.
# NOTE(review): the enclosing ``def`` line and the ``try:`` line are not
# visible in the reviewed text and the indentation was lost; the nesting
# below is the minimal one that makes the visible statements valid --
# confirm against the full file, in particular where the sequence-number
# increment belongs.
if self.state == 'UNREGISTERED':
    logging.info('%d Sending Registration', self.UDID)
    # Prepare the registration message
    reg_msg = RegisterMessage()
    reg_msg.domino_udid_desired = UDID_DESIRED
    reg_msg.seq_no = self.seqno
    reg_msg.ipaddr = netutil.get_ip()
    reg_msg.tcpport = self.serviceport
    reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES

    try:
        reg_msg_r = self.sender().d_register(reg_msg)
        logging.info('Registration Response: Response Code: %d' , reg_msg_r.responseCode)
        if reg_msg_r.comments:
            logging.debug('Response Comments: %s' , reg_msg_r.comments)

        if reg_msg_r.responseCode == SUCCESS:
            self.state = 'REGISTERED'
            self.UDID = reg_msg_r.domino_udid_assigned
        # Handle registration failure here (possibly based on response comments)
    except (Thrift.TException, TSocket.TTransportException) as tx:
        logging.error('%s' , tx.message)
    except socket.timeout:
        # Report the timeout through this object's own handler with the
        # message that was actually sent. The previous call went through
        # ``self.dominoclient`` and passed ``pub_msg``; neither is set in
        # this scope, so a timeout ended in AttributeError/NameError.
        self.handle_RPC_timeout(reg_msg)
    except socket.error as tx:
        logging.error('%s' , tx.message)
    self.seqno = self.seqno + 1
287 if self.state == 'UNREGISTERED':
290 logging.info('%d Sending heartbeat', self.UDID)
291 hbm = HeartBeatMessage()
292 hbm.domino_udid = self.UDID
293 hbm.seq_no = self.seqno
296 hbm_r = self.sender().d_heartbeat(hbm)
297 logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
298 except (Thrift.TException, TSocket.TTransportException) as tx:
299 logging.error('%s' , tx.message)
300 except (socket.timeout) as tx:
301 self.handle_RPC_timeout(hbm)
303 logging.error('Unexpected error: %s', sys.exc_info()[0])
305 self.seqno = self.seqno + 1
307 def publish(self, toscafile):
308 if self.state == 'UNREGISTERED':
311 logging.info('Publishing the template file: ' + toscafile)
312 pub_msg = PublishMessage()
313 pub_msg.domino_udid = self.UDID
314 pub_msg.seq_no = self.seqno
315 pub_msg.template_type = 'tosca-nfv-v1.0'
318 pub_msg.template = read_templatefile(toscafile)
320 logging.error('I/O error(%d): %s' , e.errno, e.strerror)
323 pub_msg_r = self.sender().d_publish(pub_msg)
324 logging.info('Publish Response is received from: %d ,sequence number: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no)
325 except (Thrift.TException, TSocket.TTransportException) as tx:
326 print '%s' % (tx.message)
327 except (socket.timeout) as tx:
328 self.handle_RPC_timeout(pub_msg)
330 self.seqno = self.seqno + 1
332 def subscribe(self, labels, templateTypes, label_op, template_op):
333 if self.state == 'UNREGISTERED':
336 logging.info('subscribing labels %s and templates %s', labels, templateTypes)
337 #send subscription message
338 sub_msg = SubscribeMessage()
339 sub_msg.domino_udid = self.UDID
340 sub_msg.seq_no = self.seqno
341 sub_msg.template_op = template_op
342 sub_msg.supported_template_types = templateTypes
343 sub_msg.label_op = label_op
344 sub_msg.labels = labels
346 sub_msg_r = self.sender().d_subscribe(sub_msg)
347 logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
348 except (Thrift.TException, TSocket.TTransportException) as tx:
349 logging.error('%s' , tx.message)
350 except (socket.timeout) as tx:
351 self.handle_RPC_timeout(sub_msg)
353 self.seqno = self.seqno + 1
357 self.communicationHandler.closeconnection()
358 except Thrift.TException, tx:
359 logging.error('%s' , tx.message)
362 return self.communicationHandler.sender
def startCLI(self, interactive):
    """Create the CLI service thread for this client and start it.

    The new DominoClientCLIService is kept in ``self.CLIservice``.

    :param interactive: interactive-mode flag handed to the service.
    """
    cli_thread = DominoClientCLIService(self, self.communicationHandler, interactive)
    self.CLIservice = cli_thread
    logging.info('CLI Service is starting')
    cli_thread.start()
    # The thread is not joined here; call self.CLIservice.join() to wait
    # until the CLI service is finished.
def set_serviceport(self, port):
    """Record the port for this client's RPC service.

    :param port: stored unchanged in ``self.serviceport``; it is coerced
        with ``int()`` when the server socket is created.
    """
    self.serviceport = port
def set_CLIport(self, cliport):
    """Record the port on which the CLI RPC service listens.

    :param cliport: stored unchanged in ``self.CLIport``.
    """
    self.CLIport = cliport
def set_dominoserver_ipaddr(self, ipaddr):
    """Record the Domino Server address.

    :param ipaddr: stored unchanged in ``self.dominoserver_IP``.
    """
    # NOTE(review): the connection setup visible in this file uses the
    # DOMINO_SERVER_IP constant rather than this attribute -- confirm the
    # stored value is consumed somewhere.
    self.dominoserver_IP = ipaddr
def handle_RPC_timeout(self, RPCmessage):
    """Log which kind of RPC message timed out.

    Emits one debug line for HEART_BEAT, PUBLISH, SUBSCRIBE, REGISTER or
    QUERY messages; any other message type is ignored silently.

    :param RPCmessage: the message whose call timed out; only its
        ``messageType`` is read.
    """
    # TBD: handle each RPC timeout separately
    kind = RPCmessage.messageType
    if kind == HEART_BEAT:
        logging.debug('RPC Timeout for message type: HEART_BEAT')
        return
    if kind == PUBLISH:
        logging.debug('RPC Timeout for message type: PUBLISH')
        return
    if kind == SUBSCRIBE:
        logging.debug('RPC Timeout for message type: SUBSCRIBE')
        return
    if kind == REGISTER:
        logging.debug('RPC Timeout for message type: REGISTER')
        return
    if kind == QUERY:
        logging.debug('RPC Timeout for message type: QUERY')
394 client = DominoClient()
396 interactive = "FALSE"
397 #process input arguments
399 opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport="])
400 except getopt.GetoptError:
401 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false'
403 for opt, arg in opts:
405 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false'
407 elif opt in ("-c", "--conf"):
409 elif opt in ("-p", "--port"):
410 client.set_serviceport(int(arg))
411 elif opt in ("-i", "--ipaddr"):
412 client.set_dominoserver_ipaddr(arg)
413 elif opt in ("-l", "--log"):
415 elif opt in ("--iac"):
416 interactive = arg.upper()
417 elif opt in ("--cliport"):
418 client.set_CLIport(int(arg))
421 numeric_level = getattr(logging, loglevel.upper(), None)
423 if not isinstance(numeric_level, int):
424 raise ValueError('Invalid log level: %s' % loglevel)
425 logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
426 except ValueError, ex:
430 #The client is starting
431 logging.debug('Domino Client Starting...')
433 client.startCLI(interactive)
434 client.start_communicationService()
436 if __name__ == "__main__":