3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
15 import sys, glob, threading
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
35 #Load configuration parameters
36 from domino_conf import *
38 class CommunicationHandler:
42 def __init__(self, dominoclient):
43 global DOMINO_SERVER_IP, DOMINO_SERVER_PORT
45 self.dominoClient = dominoclient
48 transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT)
49 transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
50 # Add buffering to compensate for slow raw sockets
51 self.transport = TTransport.TBufferedTransport(transport)
53 self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
54 # Create a client to use the protocol encoder
55 self.sender = Communication.Client(self.protocol)
56 except Thrift.TException, tx:
57 logging.error('%s' , tx.message)
59 # Template Push from Domino Server is received
61 # - Depending on Controller Domain, call API
62 # - Respond Back with Push Response
63 def d_push(self, push_msg):
64 logging.info('%d Received Template File', self.dominoClient.UDID)
65 # Retrieve the template file
69 # Any inspection code goes here
74 # If heat client, call heat command
76 # If ONOS client, run as shell script
81 # Marshall the response message for the Domino Server Fill
82 push_r = PushResponseMessage()
83 # Fill response message fields
84 push_r.domino_udid = self.dominoClient.UDID
85 push_r.seq_no = self.dominoClient.seqno
86 push_r.responseCode = SUCCESS
89 self.dominoClient.seqno = self.dominoClient.seqno + 1
94 def openconnection(self):
97 def closeconnection():
98 self.transport.close()
100 def read_templatefile(temp_filename):
101 f = open(temp_filename, 'r')
102 lines = f.read().splitlines()
106 class DominoClientCLIService(threading.Thread):
107 def __init__(self, dominoclient, communicationhandler):
108 threading.Thread.__init__(self)
109 self.dominoclient = dominoclient
110 self.communicationhandler = communicationhandler
113 global DEFAULT_TOSCA_PUBFILE
115 sys.stdout.write('>>')
116 input_string = raw_input()
117 args = input_string.split()
124 #process input arguments
126 sys.stdout.write('>>')
127 if args[0] == 'heartbeat':
128 logging.info('%d Sending heatbeat', self.dominoclient.UDID)
129 hbm = HeartBeatMessage()
130 hbm.domino_udid = self.dominoclient.UDID
131 hbm.seq_no = self.dominoclient.seqno
133 hbm_r = self.communicationhandler.sender.d_heartbeat(hbm)
134 logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
135 except (Thrift.TException, TSocket.TTransportException) as tx:
136 logging.error('%s' , tx.message)
137 except (socket.timeout) as tx:
138 self.dominoclient.handle_RPC_timeout(hbm)
140 logging.error('Unexpected error: %s', sys.exc_info()[0])
141 self.dominoclient.seqno = self.dominoclient.seqno + 1
143 elif args[0] == 'publish':
144 opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
146 print '\nUsage: publish -t <toscafile>'
149 #toscafile = DEFAULT_TOSCA_PUBFILE
150 for opt, arg in opts:
151 if opt in ('-t', '--tosca-file'):
154 pub_msg = PublishMessage()
155 pub_msg.domino_udid = self.dominoclient.UDID
156 pub_msg.seq_no = self.dominoclient.seqno
157 pub_msg.template_type = 'tosca-nfv-v1.0'
159 pub_msg.template = read_templatefile(toscafile)
161 logging.error('I/O error(%d): %s' , e.errno, e.strerror)
163 logging.info('Publishing the template file: ' + toscafile)
165 pub_msg_r = self.communicationhandler.sender.d_publish(pub_msg)
166 logging.info('Publish Response is received from: %d ,sequence number: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no)
167 except (Thrift.TException, TSocket.TTransportException) as tx:
168 print '%s' % (tx.message)
169 except (socket.timeout) as tx:
170 self.dominoclient.handle_RPC_timeout(pub_msg)
172 self.dominoclient.seqno = self.dominoclient.seqno + 1
174 elif args[0] == 'subscribe':
175 opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype="])
176 for opt, arg in opts:
177 if opt in ('-l', '--labels'):
178 labels = labels + arg.split(',')
179 elif opt in ('-t', '--ttype'):
180 templateTypes = templateTypes + arg.split(',')
182 elif args[0] == 'register':
183 self.dominoclient.start()
185 except getopt.GetoptError:
186 print 'Command is misentered or not supported!'
189 #check if labels or supported templates are nonempty
190 if labels != [] or templateTypes != []:
191 #send subscription message
192 sub_msg = SubscribeMessage()
193 sub_msg.domino_udid = self.dominoclient.UDID
194 sub_msg.seq_no = self.dominoclient.seqno
195 sub_msg.template_op = APPEND
196 sub_msg.supported_template_types = templateTypes
197 sub_msg.label_op = APPEND
198 sub_msg.labels = labels
199 logging.info('subscribing labels %s and templates %s', labels, templateTypes)
201 sub_msg_r = self.communicationhandler.sender.d_subscribe(sub_msg)
202 logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
203 except (Thrift.TException, TSocket.TTransportException) as tx:
204 logging.error('%s' , tx.message)
205 except (socket.timeout) as tx:
206 self.dominoclient.handle_RPC_timeout(sub_msg)
208 self.dominoclient.seqno = self.dominoclient.seqno + 1
212 self.communicationHandler = CommunicationHandler(self)
213 self.processor = None
214 self.transport = None
217 self.communicationServer = None
219 self.CLIservice = DominoClientCLIService(self, self.communicationHandler)
221 self.serviceport = 9091
222 self.dominoserver_IP = 'localhost'
224 #Start from UNREGISTERED STATE
225 #TO BE DONE: initialize from a saved state
226 self.state = 'UNREGISTERED'
230 def start_communicationService(self):
231 self.processor = Communication.Processor(self.communicationHandler)
232 self.transport = TSocket.TServerSocket(port=int(self.serviceport))
233 self.tfactory = TTransport.TBufferedTransportFactory()
234 self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
235 #Use TThreadedServer or TThreadPoolServer for a multithreaded server
236 #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
237 self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
239 self.communicationServer.serve()
243 self.communicationHandler.openconnection()
245 except Thrift.TException, tx:
246 print '%s' % (tx.message)
249 if self.state == 'UNREGISTERED':
250 #prepare registration message
251 reg_msg = RegisterMessage()
252 reg_msg.domino_udid_desired = UDID_DESIRED
253 reg_msg.seq_no = self.seqno
254 reg_msg.ipaddr = netutil.get_ip()
255 reg_msg.tcpport = self.serviceport
256 reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
259 reg_msg_r = self.sender().d_register(reg_msg)
260 logging.info('Registration Response: Response Code: %d' , reg_msg_r.responseCode)
261 if reg_msg_r.comments:
262 logging.debug('Response Comments: %s' , reg_msg_r.comments)
264 if reg_msg_r.responseCode == SUCCESS:
265 self.state = 'REGISTERED'
266 self.UDID = reg_msg_r.domino_udid_assigned
268 #Handle registration failure here (possibly based on reponse comments)
270 except (Thrift.TException, TSocket.TTransportException) as tx:
271 logging.error('%s' , tx.message)
272 except (socket.timeout) as tx:
273 self.dominoclient.handle_RPC_timeout(pub_msg)
274 except (socket.error) as tx:
275 logging.error('%s' , tx.message)
276 self.seqno = self.seqno + 1
280 self.communicationHandler.closeconnection()
281 except Thrift.TException, tx:
282 logging.error('%s' , tx.message)
285 return self.communicationHandler.sender
288 logging.info('CLI Service is starting')
289 self.CLIservice.start()
290 #to wait until CLI service is finished
291 #self.CLIservice.join()
293 def set_serviceport(self, port):
294 self.serviceport = port
296 def set_dominoserver_ipaddr(self, ipaddr):
297 self.dominoserver_IP = ipaddr
299 def handle_RPC_timeout(self, RPCmessage):
300 # TBD: handle each RPC timeout separately
301 if RPCmessage.messageType == HEART_BEAT:
302 logging.debug('RPC Timeout for message type: HEART_BEAT')
303 elif RPCmessage.messageType == PUBLISH:
304 logging.debug('RPC Timeout for message type: PUBLISH')
305 elif RPCmessage.messageType == SUBSCRIBE:
306 logging.debug('RPC Timeout for message type: SUBSCRIBE')
307 elif RPCmessage.messageType == REGISTER:
308 logging.debug('RPC Timeout for message type: REGISTER')
309 elif RPCmessage.messageType == QUERY:
310 logging.debug('RPC Timeout for message type: QUERY')
313 client = DominoClient()
315 #process input arguments
317 opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log="])
318 except getopt.GetoptError:
319 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
321 for opt, arg in opts:
323 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
325 elif opt in ("-c", "--conf"):
327 elif opt in ("-p", "--port"):
328 client.set_serviceport(int(arg))
329 elif opt in ("-i", "--ipaddr"):
330 client.set_dominoserver_ipaddr(arg)
331 elif opt in ("--log"):
335 numeric_level = getattr(logging, loglevel.upper(), None)
337 if not isinstance(numeric_level, int):
338 raise ValueError('Invalid log level: %s' % loglevel)
339 logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
340 except ValueError, ex:
344 #The client is starting
345 logging.debug('Domino Client Starting...')
348 client.start_communicationService()
350 if __name__ == "__main__":