JIRA DOMINO-22
[domino.git] / DominoClient.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14
15 import sys, os, glob, threading
16 import getopt, socket
17 import logging, errno
18 import uuid
19
20 #sys.path.append('gen-py')
21 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
22 sys.path.insert(0, glob.glob('./lib')[0])
23
24 from dominoRPC import Communication
25 from dominoRPC.ttypes import *
26 from dominoRPC.constants import *
27
28 from dominoCLI import DominoClientCLI
29 from dominoCLI.ttypes import *
30 from dominoCLI.constants import *
31
32 from thrift import Thrift
33 from thrift.transport import TSocket
34 from thrift.transport import TTransport
35 from thrift.protocol import TBinaryProtocol
36 from thrift.server import TServer
37
38 from util import *
39
40 #Load configuration parameters
41 from domino_conf import *
42
43 class CommunicationHandler:
44   def __init__(self):
45     self.log = {}
46
47   def __init__(self, dominoclient):
48     self.log = {}
49     self.dominoClient = dominoclient
50     self.transport = None
51     self.protocol = None
52     self.sender = None
53
54   # Template Push from Domino Server is received
55   # Actions:
56   #       - Depending on Controller Domain, call API
57   #       - Respond Back with Push Response
58   def d_push(self, push_msg):
59     logging.info('%s Received Template File', self.dominoClient.UDID)
60     # Retrieve the template file
61     try:
62       os.makedirs(TOSCA_RX_DIR+str(self.dominoClient.UDID))
63     except OSError as exception:
64       if exception.errno == errno.EEXIST:
65         logging.debug('IGNORING error: ERRNO %d; %s exists.', exception.errno, TOSCA_RX_DIR+str(self.dominoClient.UDID))
66       else:
67         logging.error('IGNORING error in creating %s. Err no: %d', exception.errno)
68
69     try:  
70       miscutil.write_templatefile(TOSCA_RX_DIR+str(self.dominoClient.UDID)+'/'+str(push_msg.template_UUID)+'.yaml' , push_msg.template)
71     except:    
72       logging.error('FAILED to write the pushed file: %s', sys.exc_info()[0])
73       push_r = PushResponseMessage()
74       # Fill response message fields
75       push_r.domino_udid = self.dominoClient.UDID
76       push_r.seq_no = self.dominoClient.seqno
77       push_r.responseCode = FAILED
78       self.dominoClient.seqno = self.dominoClient.seqno + 1
79       return push_r# Any inspection code goes here
80
81     ## End of inspection
82
83     # Call NB API
84     # If heat client, call heat command
85     
86     # If ONOS client, run as shell script
87
88
89     ## End of NB API call
90
91     # Marshall the response message for the Domino Server Fill
92     push_r = PushResponseMessage()
93     # Fill response message fields
94     push_r.domino_udid = self.dominoClient.UDID    
95     push_r.seq_no = self.dominoClient.seqno
96     push_r.responseCode = SUCCESS    
97     ## End of filling
98
99     self.dominoClient.seqno = self.dominoClient.seqno + 1
100
101     return push_r
102
103   
104   def openconnection(self):
105     try:
106       # Make socket
107       transport = TSocket.TSocket(self.dominoClient.dominoserver_IP, DOMINO_SERVER_PORT)
108       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
109       # Add buffering to compensate for slow raw sockets
110       self.transport = TTransport.TBufferedTransport(transport)
111       # Wrap in a protocol
112       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
113       # Create a client to use the protocol encoder
114       self.sender = Communication.Client(self.protocol)
115       self.transport.open()
116     except Thrift.TException, tx:
117       logging.error('%s' , tx.message)
118
119   def closeconnection():
120     self.transport.close()
121
122 class CLIHandler:
123   def __init__(self):
124     self.log = {}
125
126   def __init__(self, dominoclient, CLIservice):
127     self.log = {}
128     self.dominoClient = dominoclient
129     self.CLIservice = CLIservice
130
131   def d_CLI(self, msg):
132     logging.info('Received CLI %s', msg.CLI_input)
133
134     CLIrespmsg = CLIResponse()
135     CLIrespmsg.CLI_response = self.CLIservice.process_input(msg.CLI_input)
136     return CLIrespmsg
137  
138
139 class DominoClientCLIService(threading.Thread):
140   def __init__(self, dominoclient, communicationhandler, interactive):
141     threading.Thread.__init__(self)
142     self.dominoclient = dominoclient
143     self.communicationhandler = communicationhandler
144     self.interactive = interactive
145
146   def process_input(self, args):
147     if len(args) == 0:
148       return 'Empty API body'
149
150     try:
151       if args[0] == 'heartbeat':
152         self.dominoclient.heartbeat()
153
154       elif args[0] == 'publish':
155         opts, args = getopt.getopt(args[1:],"t:k:",["tosca-file=","tuid"])
156         if len(opts) == 0:
157           print '\nUsage: publish -t <toscafile> -k <TUID>'
158           return
159         
160         template_UUID = None
161         toscafile = None
162         for opt, arg in opts:
163           if opt in ('-t', '--tosca-file'):
164             toscafile = arg
165           elif opt in ('-k', '--tuid'):
166             template_UUID = arg
167         if toscafile is not None:
168           self.dominoclient.publish(toscafile,template_UUID)
169         else:
170           print '\nUsage: publish -t <toscafile> -k <TUID>'
171
172       elif args[0] == 'subscribe':
173         labels = []    
174         templateTypes = []
175         labelop = APPEND
176         templateop = APPEND
177         opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
178         for opt, arg in opts:
179           if opt in ('-l', '--labels'):
180             labels = labels + arg.split(',')
181           elif opt in ('-t', '--ttype'):
182             templateTypes = templateTypes + arg.split(',')
183           elif opt in ('--lop'):
184             try:
185               labelop = str2enum[arg.upper()]
186             except KeyError as ex:
187               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
188               return 
189           elif opt in ('--top'):
190             try:
191               templateop = str2enum[arg.upper()]
192             except KeyError as ex:
193               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
194               return
195         
196         #check if labels or supported templates are nonempty
197         if labels != [] or templateTypes != []:
198           self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
199
200       elif args[0] == 'register':
201         self.dominoclient.start()
202
203       elif args[0] == 'list-tuids':
204         return self.dominoclient.query(['list-tuids'])
205
206       else:
207         return 'Command is misentered or not supported!'
208
209     except getopt.GetoptError:
210       print 'Command is misentered or not supported!'
211
212
213   def run(self):
214     global DEFAULT_TOSCA_PUBFILE
215     if self.interactive == "TRUE":
216       flag = True
217     else:
218       flag = False
219
220     if flag: #interactive CLI, loop in while until killed
221       while True:
222          sys.stdout.write('>>')
223          input_string = raw_input()
224          args = input_string.split()
225          if len(args) == 0:
226            continue
227
228          sys.stdout.write('>>')
229          #process input arguments
230          resp_msg = self.process_input(args)
231          if resp_msg is not None:
232            print resp_msg
233     else: #domino cli-client is used, listen for the CLI rpc calls
234       cliHandler = CLIHandler(self.dominoclient, self)
235       processor = DominoClientCLI.Processor(cliHandler)
236       transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
237       tfactory = TTransport.TBufferedTransportFactory()
238       pfactory = TBinaryProtocol.TBinaryProtocolFactory()
239       #Use TThreadedServer or TThreadPoolServer for a multithreaded server
240       CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
241       logging.debug('RPC service for CLI is starting...')
242       CLIServer.serve()       
243
244 class DominoClient:
245   def __init__(self):
246
247
248     self.communicationHandler = CommunicationHandler(self)
249     self.processor = None
250     self.transport = None
251     self.tfactory = None
252     self.pfactory = None
253     self.communicationServer = None
254
255     self.CLIservice = None
256
257     self.serviceport = DOMINO_CLIENT_PORT
258     self.dominoserver_IP = DOMINO_SERVER_IP
259     self.CLIport = DOMINO_CLI_PORT 
260
261     #Start from UNREGISTERED STATE
262     #TO BE DONE: initialize from a saved state
263     self.state = 'UNREGISTERED'
264     self.seqno = 0
265     self.UDID = 1
266
267   def start_communicationService(self):
268     self.processor = Communication.Processor(self.communicationHandler)
269     self.transport = TSocket.TServerSocket(port=int(self.serviceport))
270     self.tfactory = TTransport.TBufferedTransportFactory()
271     self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
272     #Use TThreadedServer or TThreadPoolServer for a multithreaded server
273     #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
274     self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
275
276     self.communicationServer.serve()
277  
278   def start(self):
279     try:
280       self.communicationHandler.openconnection()
281       self.register()
282     except Thrift.TException, tx:
283       print '%s' % (tx.message)
284    
285   def register(self):  
286     if self.state == 'UNREGISTERED':
287       logging.info('%d Sending Registration', self.UDID)
288       #prepare registration message
289       reg_msg = RegisterMessage()
290       reg_msg.domino_udid_desired = UDID_DESIRED
291       reg_msg.seq_no = self.seqno
292       reg_msg.ipaddr = netutil.get_ip()
293       reg_msg.tcpport = self.serviceport
294       reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
295
296       try:
297         reg_msg_r = self.sender().d_register(reg_msg)
298         logging.info('Registration Response: Response Code: %d'  , reg_msg_r.responseCode)
299         if reg_msg_r.comments:
300           logging.debug('Response Comments: %s' ,  reg_msg_r.comments)
301
302         if reg_msg_r.responseCode == SUCCESS:
303           self.state = 'REGISTERED'
304           self.UDID = reg_msg_r.domino_udid_assigned
305         else:
306           #Handle registration failure here (possibly based on reponse comments)
307           pass
308       except (Thrift.TException, TSocket.TTransportException) as tx:
309         logging.error('%s' , tx.message)
310       except (socket.timeout) as tx:
311         self.handle_RPC_timeout(reg_msg)
312       except (socket.error) as tx:
313         logging.error('%s' , tx.message)
314       self.seqno = self.seqno + 1
315
316   def heartbeat(self):
317     if self.state == 'UNREGISTERED':
318       self.start()
319           
320     logging.info('%s Sending heartbeat', self.UDID)
321     hbm = HeartBeatMessage()         
322     hbm.domino_udid = self.UDID        
323     hbm.seq_no = self.seqno         
324
325     try:
326       hbm_r = self.sender().d_heartbeat(hbm)
327       logging.info('heart beat received from: %s ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
328     except (Thrift.TException, TSocket.TTransportException) as tx:
329       logging.error('%s' , tx.message)
330     except (socket.timeout) as tx:
331       self.handle_RPC_timeout(hbm)
332     except:
333       logging.error('Unexpected error: %s', sys.exc_info()[0])
334     
335     self.seqno = self.seqno + 1    
336
337   def publish(self, toscafile, template_UUID=None):
338     if self.state == 'UNREGISTERED':
339       self.start()
340
341     logging.info('Publishing the template file: ' + toscafile)
342     pub_msg = PublishMessage()
343     pub_msg.domino_udid = self.UDID
344     pub_msg.seq_no = self.seqno
345     pub_msg.template_type = 'tosca-nfv-v1.0'
346     if template_UUID is not None:
347       pub_msg.template_UUID = template_UUID
348
349     try:
350       pub_msg.template = miscutil.read_templatefile(toscafile)
351     except IOError as e:
352       logging.error('I/O error(%d): %s' , e.errno, e.strerror)
353       return
354     try:
355       pub_msg_r = self.sender().d_publish(pub_msg)
356       logging.info('Publish Response is received from: %s ,sequence number: %d Status: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no, pub_msg_r.responseCode)
357     except (Thrift.TException, TSocket.TTransportException) as tx:
358       print '%s' % (tx.message)
359     except (socket.timeout) as tx:
360       self.handle_RPC_timeout(pub_msg)
361
362     self.seqno = self.seqno + 1
363
364   def subscribe(self, labels, templateTypes, label_op, template_op):
365      if self.state == 'UNREGISTERED':
366        self.start()
367
368      logging.info('subscribing labels %s and templates %s', labels, templateTypes)
369      #send subscription message
370      sub_msg = SubscribeMessage()
371      sub_msg.domino_udid = self.UDID
372      sub_msg.seq_no = self.seqno
373      sub_msg.template_op = template_op
374      sub_msg.supported_template_types = templateTypes
375      sub_msg.label_op = label_op
376      sub_msg.labels = labels
377      try:
378        sub_msg_r = self.sender().d_subscribe(sub_msg)
379        logging.info('Subscribe Response is received from: %s ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
380      except (Thrift.TException, TSocket.TTransportException) as tx: 
381        logging.error('%s' , tx.message)
382      except (socket.timeout) as tx: 
383        self.handle_RPC_timeout(sub_msg)
384
385      self.seqno = self.seqno + 1 
386
387   def query(self, queryString, template_UUID=None):
388     logging.info('querying Domino Server: %s', queryString)
389     query_msg = QueryMessage()
390     query_msg.domino_udid = self.UDID
391     query_msg.seq_no = self.seqno
392     query_msg.queryString = queryString
393     query_msg.template_UUID = template_UUID 
394     self.seqno = self.seqno + 1
395     try:
396       query_msg_r = self.sender().d_query(query_msg)
397       logging.info('Query Response is received from: %s ,sequence number: %d', query_msg_r.domino_udid,query_msg_r.seq_no)
398       if (query_msg_r.queryResponse is not None) and (len(query_msg_r.queryResponse)>0):
399         return query_msg_r.queryResponse
400     except (Thrift.TException, TSocket.TTransportException) as tx:
401       logging.error('%s' , tx.message)
402     except (socket.timeout) as tx:
403       self.handle_RPC_timeout(query_msg)
404
405   def stop(self):
406     try:
407       self.communicationHandler.closeconnection()
408     except Thrift.TException, tx:
409       logging.error('%s' , tx.message)
410     
411   def sender(self):
412     return self.communicationHandler.sender
413
414   def startCLI(self, interactive):
415     self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
416     logging.info('CLI Service is starting')
417     self.CLIservice.start()
418     #to wait until CLI service is finished
419     #self.CLIservice.join()
420
421   def set_serviceport(self, port):
422     self.serviceport = port
423
424   def set_CLIport(self, cliport):
425     self.CLIport = cliport
426
427   def set_dominoserver_ipaddr(self, ipaddr):
428     self.dominoserver_IP = ipaddr
429
430   def handle_RPC_timeout(self, RPCmessage):
431     # TBD: handle each RPC timeout separately
432     if RPCmessage.messageType == HEART_BEAT:
433       logging.debug('RPC Timeout for message type: HEART_BEAT') 
434     elif RPCmessage.messageType == PUBLISH:
435       logging.debug('RPC Timeout for message type: PUBLISH')
436     elif RPCmessage.messageType == SUBSCRIBE:
437       logging.debug('RPC Timeout for message type: SUBSCRIBE')
438     elif RPCmessage.messageType == REGISTER:
439       logging.debug('RPC Timeout for message type: REGISTER')
440     elif RPCmessage.messageType == QUERY:
441       logging.debug('RPC Timeout for message type: QUERY') 
442
443 def main(argv):
444   client = DominoClient()
445   loglevel = LOGLEVEL
446   interactive = INTERACTIVE
447   #process input arguments
448   try:
449       opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport=","uuid=","regmod="])
450   except getopt.GetoptError:
451       print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
452       sys.exit(2)
453   for opt, arg in opts:
454       if opt == '-h':
455          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
456          sys.exit()
457       elif opt in ("-c", "--conf"):
458          configfile = arg
459       elif opt in ("-p", "--port"):
460          client.set_serviceport(int(arg))
461       elif opt in ("-i", "--ipaddr"):
462          client.set_dominoserver_ipaddr(arg)
463       elif opt in ("-l", "--log"):
464          loglevel = arg
465       elif opt in ("--iac"):
466          interactive = arg.upper()
467       elif opt in ("--cliport"):
468          client.set_CLIport(int(arg))
469       elif opt in ("--uuid"):
470          client.UDID = arg
471       elif opt in ("--regmod"):
472          if arg.upper() == 'REGISTERED': 
473            client.state = 'REGISTERED'
474   #Set logging level
475   numeric_level = getattr(logging, loglevel.upper(), None)
476   try:
477     if not isinstance(numeric_level, int):
478       raise ValueError('Invalid log level: %s' % loglevel)
479     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
480   except ValueError, ex:
481     print ex.message
482     exit()
483  
484   #The client is starting
485   logging.debug('Domino Client Starting...')
486   client.start()
487   client.startCLI(interactive)
488   client.start_communicationService()
489
490 if __name__ == "__main__":
491    main(sys.argv[1:])
492