Added cliport support for domino-cli.py to be able to run multiple domino clients...
[domino.git] / DominoClient.py
1 #!/usr/bin/env python
2
3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14
15 import sys, os, glob, threading
16 import getopt, socket
17 import logging, errno
18
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from dominoCLI import DominoClientCLI
28 from dominoCLI.ttypes import *
29 from dominoCLI.constants import *
30
31 from thrift import Thrift
32 from thrift.transport import TSocket
33 from thrift.transport import TTransport
34 from thrift.protocol import TBinaryProtocol
35 from thrift.server import TServer
36
37 from util import *
38
39 #Load configuration parameters
40 from domino_conf import *
41
42 class CommunicationHandler:
43   def __init__(self):
44     self.log = {}
45
46   def __init__(self, dominoclient):
47     self.log = {}
48     self.dominoClient = dominoclient
49     try:
50       # Make socket
51       transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT)
52       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
53       # Add buffering to compensate for slow raw sockets
54       self.transport = TTransport.TBufferedTransport(transport)
55       # Wrap in a protocol
56       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
57       # Create a client to use the protocol encoder
58       self.sender = Communication.Client(self.protocol)
59     except Thrift.TException, tx: 
60       logging.error('%s' , tx.message)
61
62   # Template Push from Domino Server is received
63   # Actions:
64   #       - Depending on Controller Domain, call API
65   #       - Respond Back with Push Response
66   def d_push(self, push_msg):
67     logging.info('%d Received Template File', self.dominoClient.UDID)
68     # Retrieve the template file
69     try:
70       os.makedirs(TOSCA_RX_DIR+str(self.dominoClient.UDID))
71     except OSError as exception:
72       if exception.errno == errno.EEXIST:
73         logging.debug('IGNORING error: ERRNO %d; %s exists.', exception.errno, TOSCA_RX_DIR+str(self.dominoClient.UDID))
74       else:
75         logging.error('IGNORING error in creating %s. Err no: %d', exception.errno)
76
77     try:  
78       miscutil.write_templatefile(TOSCA_RX_DIR+str(self.dominoClient.UDID)+'/'+str(push_msg.seq_no)+'.yaml' , push_msg.template)
79     except:    
80       logging.error('FAILED to write the pushed file: %s', sys.exc_info()[0])
81       push_r = PushResponseMessage()
82       # Fill response message fields
83       push_r.domino_udid = self.dominoClient.UDID
84       push_r.seq_no = self.dominoClient.seqno
85       push_r.responseCode = FAILED
86       self.dominoClient.seqno = self.dominoClient.seqno + 1
87       return push_r# Any inspection code goes here
88
89     ## End of inspection
90
91     # Call NB API
92     # If heat client, call heat command
93     
94     # If ONOS client, run as shell script
95
96
97     ## End of NB API call
98
99     # Marshall the response message for the Domino Server Fill
100     push_r = PushResponseMessage()
101     # Fill response message fields
102     push_r.domino_udid = self.dominoClient.UDID    
103     push_r.seq_no = self.dominoClient.seqno
104     push_r.responseCode = SUCCESS    
105     ## End of filling
106
107     self.dominoClient.seqno = self.dominoClient.seqno + 1
108
109     return push_r
110
111   
112   def openconnection(self):
113     self.transport.open()
114
115   def closeconnection():
116     self.transport.close()
117
118 class CLIHandler:
119   def __init__(self):
120     self.log = {}
121
122   def __init__(self, dominoclient, CLIservice):
123     self.log = {}
124     self.dominoClient = dominoclient
125     self.CLIservice = CLIservice
126
127   def d_CLI(self, msg):
128     logging.info('Received CLI %s', msg.CLI_input)
129
130     self.CLIservice.process_input(msg.CLI_input)
131     
132     CLIrespmsg = CLIResponse()
133     CLIrespmsg.CLI_response = "Testing..."
134     return CLIrespmsg
135  
136
137 class DominoClientCLIService(threading.Thread):
138   def __init__(self, dominoclient, communicationhandler, interactive):
139     threading.Thread.__init__(self)
140     self.dominoclient = dominoclient
141     self.communicationhandler = communicationhandler
142     self.interactive = interactive
143
144   def process_input(self, args):
145     if len(args) == 0:
146       print 'Empty API body'
147       return
148
149     try:
150       if args[0] == 'heartbeat':
151         self.dominoclient.heartbeat()
152
153       elif args[0] == 'publish':
154         opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
155         if len(opts) == 0:
156           print '\nUsage: publish -t <toscafile>'
157           return
158
159         for opt, arg in opts:
160           if opt in ('-t', '--tosca-file'):
161             toscafile = arg
162        
163         self.dominoclient.publish(toscafile)
164
165       elif args[0] == 'subscribe':
166         labels = []    
167         templateTypes = []
168         labelop = APPEND
169         templateop = APPEND
170         opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
171         for opt, arg in opts:
172           if opt in ('-l', '--labels'):
173             labels = labels + arg.split(',')
174           elif opt in ('-t', '--ttype'):
175             templateTypes = templateTypes + arg.split(',')
176           elif opt in ('--lop'):
177             try:
178               labelop = str2enum[arg.upper()]
179             except KeyError as ex:
180               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
181               return 
182           elif opt in ('--top'):
183             try:
184               templateop = str2enum[arg.upper()]
185             except KeyError as ex:
186               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
187               return
188         
189         #check if labels or supported templates are nonempty
190         if labels != [] or templateTypes != []:
191           self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
192
193       elif args[0] == 'register':
194         self.dominoclient.start()
195
196     except getopt.GetoptError:
197       print 'Command is misentered or not supported!'
198
199
200   def run(self):
201     global DEFAULT_TOSCA_PUBFILE
202     if self.interactive == "TRUE":
203       flag = True
204     else:
205       flag = False
206
207     if flag: #interactive CLI, loop in while until killed
208       while True:
209          sys.stdout.write('>>')
210          input_string = raw_input()
211          args = input_string.split()
212          if len(args) == 0:
213            continue
214
215          sys.stdout.write('>>')
216          #process input arguments
217          self.process_input(args)
218     else: #domino cli-client is used, listen for the CLI rpc calls
219       cliHandler = CLIHandler(self.dominoclient, self)
220       processor = DominoClientCLI.Processor(cliHandler)
221       transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
222       tfactory = TTransport.TBufferedTransportFactory()
223       pfactory = TBinaryProtocol.TBinaryProtocolFactory()
224       #Use TThreadedServer or TThreadPoolServer for a multithreaded server
225       CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
226       logging.debug('RPC service for CLI is starting...')
227       CLIServer.serve()       
228
229 class DominoClient:
230   def __init__(self):
231     self.communicationHandler = CommunicationHandler(self)
232     self.processor = None
233     self.transport = None
234     self.tfactory = None
235     self.pfactory = None
236     self.communicationServer = None
237
238     self.CLIservice = None
239
240     self.serviceport = 9091
241     self.dominoserver_IP = 'localhost'
242
243     self.CLIport = DOMINO_CLI_PORT 
244
245     #Start from UNREGISTERED STATE
246     #TO BE DONE: initialize from a saved state
247     self.state = 'UNREGISTERED'
248     self.seqno = 0
249     self.UDID = 1
250
251   def start_communicationService(self):
252     self.processor = Communication.Processor(self.communicationHandler)
253     self.transport = TSocket.TServerSocket(port=int(self.serviceport))
254     self.tfactory = TTransport.TBufferedTransportFactory()
255     self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
256     #Use TThreadedServer or TThreadPoolServer for a multithreaded server
257     #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
258     self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
259
260     self.communicationServer.serve()
261  
262   def start(self):
263     try:
264       self.communicationHandler.openconnection()
265       self.register()
266     except Thrift.TException, tx:
267       print '%s' % (tx.message)
268    
269   def register(self):  
270     if self.state == 'UNREGISTERED':
271       logging.info('%d Sending Registration', self.UDID)
272       #prepare registration message
273       reg_msg = RegisterMessage()
274       reg_msg.domino_udid_desired = UDID_DESIRED
275       reg_msg.seq_no = self.seqno
276       reg_msg.ipaddr = netutil.get_ip()
277       reg_msg.tcpport = self.serviceport
278       reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
279
280       try:
281         reg_msg_r = self.sender().d_register(reg_msg)
282         logging.info('Registration Response: Response Code: %d'  , reg_msg_r.responseCode)
283         if reg_msg_r.comments:
284           logging.debug('Response Comments: %s' ,  reg_msg_r.comments)
285
286         if reg_msg_r.responseCode == SUCCESS:
287           self.state = 'REGISTERED'
288           self.UDID = reg_msg_r.domino_udid_assigned
289         else:
290           #Handle registration failure here (possibly based on reponse comments)
291           pass
292       except (Thrift.TException, TSocket.TTransportException) as tx:
293         logging.error('%s' , tx.message)
294       except (socket.timeout) as tx:
295         self.dominoclient.handle_RPC_timeout(pub_msg)
296       except (socket.error) as tx:
297         logging.error('%s' , tx.message)
298       self.seqno = self.seqno + 1
299
300   def heartbeat(self):
301     if self.state == 'UNREGISTERED':
302       self.start()
303           
304     logging.info('%d Sending heartbeat', self.UDID)
305     hbm = HeartBeatMessage()         
306     hbm.domino_udid = self.UDID        
307     hbm.seq_no = self.seqno         
308
309     try:
310       hbm_r = self.sender().d_heartbeat(hbm)
311       logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
312     except (Thrift.TException, TSocket.TTransportException) as tx:
313       logging.error('%s' , tx.message)
314     except (socket.timeout) as tx:
315       self.handle_RPC_timeout(hbm)
316     except:
317       logging.error('Unexpected error: %s', sys.exc_info()[0])
318     
319     self.seqno = self.seqno + 1    
320
321   def publish(self, toscafile):
322     if self.state == 'UNREGISTERED':
323       self.start()
324
325     logging.info('Publishing the template file: ' + toscafile)
326     pub_msg = PublishMessage()
327     pub_msg.domino_udid = self.UDID
328     pub_msg.seq_no = self.seqno
329     pub_msg.template_type = 'tosca-nfv-v1.0'
330
331     try:
332       pub_msg.template = miscutil.read_templatefile(toscafile)
333     except IOError as e:
334       logging.error('I/O error(%d): %s' , e.errno, e.strerror)
335       return
336     try:
337       pub_msg_r = self.sender().d_publish(pub_msg)
338       logging.info('Publish Response is received from: %d ,sequence number: %d Op. Status: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no, pub_msg_r.responseCode)
339     except (Thrift.TException, TSocket.TTransportException) as tx:
340       print '%s' % (tx.message)
341     except (socket.timeout) as tx:
342       self.handle_RPC_timeout(pub_msg)
343
344     self.seqno = self.seqno + 1
345
346   def subscribe(self, labels, templateTypes, label_op, template_op):
347      if self.state == 'UNREGISTERED':
348        self.start()
349
350      logging.info('subscribing labels %s and templates %s', labels, templateTypes)
351      #send subscription message
352      sub_msg = SubscribeMessage()
353      sub_msg.domino_udid = self.UDID
354      sub_msg.seq_no = self.seqno
355      sub_msg.template_op = template_op
356      sub_msg.supported_template_types = templateTypes
357      sub_msg.label_op = label_op
358      sub_msg.labels = labels
359      try:
360        sub_msg_r = self.sender().d_subscribe(sub_msg)
361        logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
362      except (Thrift.TException, TSocket.TTransportException) as tx: 
363        logging.error('%s' , tx.message)
364      except (socket.timeout) as tx: 
365        self.handle_RPC_timeout(sub_msg)
366
367      self.seqno = self.seqno + 1 
368
369   def stop(self):
370     try:
371       self.communicationHandler.closeconnection()
372     except Thrift.TException, tx:
373       logging.error('%s' , tx.message)
374     
375   def sender(self):
376     return self.communicationHandler.sender
377
378   def startCLI(self, interactive):
379     self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
380     logging.info('CLI Service is starting')
381     self.CLIservice.start()
382     #to wait until CLI service is finished
383     #self.CLIservice.join()
384
385   def set_serviceport(self, port):
386     self.serviceport = port
387
388   def set_CLIport(self, cliport):
389     self.CLIport = cliport
390
391   def set_dominoserver_ipaddr(self, ipaddr):
392     self.dominoserver_IP = ipaddr
393
394   def handle_RPC_timeout(self, RPCmessage):
395     # TBD: handle each RPC timeout separately
396     if RPCmessage.messageType == HEART_BEAT:
397       logging.debug('RPC Timeout for message type: HEART_BEAT') 
398     elif RPCmessage.messageType == PUBLISH:
399       logging.debug('RPC Timeout for message type: PUBLISH')
400     elif RPCmessage.messageType == SUBSCRIBE:
401       logging.debug('RPC Timeout for message type: SUBSCRIBE')
402     elif RPCmessage.messageType == REGISTER:
403       logging.debug('RPC Timeout for message type: REGISTER')
404     elif RPCmessage.messageType == QUERY:
405       logging.debug('RPC Timeout for message type: QUERY') 
406
407 def main(argv):
408   client = DominoClient()
409   loglevel = LOGLEVEL
410   interactive = INTERACTIVE
411   #process input arguments
412   try:
413       opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport="])
414   except getopt.GetoptError:
415       print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
416       sys.exit(2)
417   for opt, arg in opts:
418       if opt == '-h':
419          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
420          sys.exit()
421       elif opt in ("-c", "--conf"):
422          configfile = arg
423       elif opt in ("-p", "--port"):
424          client.set_serviceport(int(arg))
425       elif opt in ("-i", "--ipaddr"):
426          client.set_dominoserver_ipaddr(arg)
427       elif opt in ("-l", "--log"):
428          loglevel = arg
429       elif opt in ("--iac"):
430          interactive = arg.upper()
431       elif opt in ("--cliport"):
432          client.set_CLIport(int(arg))
433
434   #Set logging level
435   numeric_level = getattr(logging, loglevel.upper(), None)
436   try:
437     if not isinstance(numeric_level, int):
438       raise ValueError('Invalid log level: %s' % loglevel)
439     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
440   except ValueError, ex:
441     print ex.message
442     exit()
443  
444   #The client is starting
445   logging.debug('Domino Client Starting...')
446   client.start()
447   client.startCLI(interactive)
448   client.start_communicationService()
449
450 if __name__ == "__main__":
451    main(sys.argv[1:])
452