added new tosca samples and fixed some small bugs
[domino.git] / DominoClient.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14
15 import sys, os, glob, threading
16 import getopt, socket
17 import logging, errno
18
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from dominoCLI import DominoClientCLI
28 from dominoCLI.ttypes import *
29 from dominoCLI.constants import *
30
31 from thrift import Thrift
32 from thrift.transport import TSocket
33 from thrift.transport import TTransport
34 from thrift.protocol import TBinaryProtocol
35 from thrift.server import TServer
36
37 from util import *
38
39 #Load configuration parameters
40 from domino_conf import *
41
42 class CommunicationHandler:
43   def __init__(self):
44     self.log = {}
45
46   def __init__(self, dominoclient):
47     self.log = {}
48     self.dominoClient = dominoclient
49     self.transport = None
50     self.protocol = None
51     self.sender = None
52
53   # Template Push from Domino Server is received
54   # Actions:
55   #       - Depending on Controller Domain, call API
56   #       - Respond Back with Push Response
57   def d_push(self, push_msg):
58     logging.info('%d Received Template File', self.dominoClient.UDID)
59     # Retrieve the template file
60     try:
61       os.makedirs(TOSCA_RX_DIR+str(self.dominoClient.UDID))
62     except OSError as exception:
63       if exception.errno == errno.EEXIST:
64         logging.debug('IGNORING error: ERRNO %d; %s exists.', exception.errno, TOSCA_RX_DIR+str(self.dominoClient.UDID))
65       else:
66         logging.error('IGNORING error in creating %s. Err no: %d', exception.errno)
67
68     try:  
69       miscutil.write_templatefile(TOSCA_RX_DIR+str(self.dominoClient.UDID)+'/'+str(push_msg.seq_no)+'.yaml' , push_msg.template)
70     except:    
71       logging.error('FAILED to write the pushed file: %s', sys.exc_info()[0])
72       push_r = PushResponseMessage()
73       # Fill response message fields
74       push_r.domino_udid = self.dominoClient.UDID
75       push_r.seq_no = self.dominoClient.seqno
76       push_r.responseCode = FAILED
77       self.dominoClient.seqno = self.dominoClient.seqno + 1
78       return push_r# Any inspection code goes here
79
80     ## End of inspection
81
82     # Call NB API
83     # If heat client, call heat command
84     
85     # If ONOS client, run as shell script
86
87
88     ## End of NB API call
89
90     # Marshall the response message for the Domino Server Fill
91     push_r = PushResponseMessage()
92     # Fill response message fields
93     push_r.domino_udid = self.dominoClient.UDID    
94     push_r.seq_no = self.dominoClient.seqno
95     push_r.responseCode = SUCCESS    
96     ## End of filling
97
98     self.dominoClient.seqno = self.dominoClient.seqno + 1
99
100     return push_r
101
102   
103   def openconnection(self):
104     try:
105       # Make socket
106       transport = TSocket.TSocket(self.dominoClient.dominoserver_IP, DOMINO_SERVER_PORT)
107       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
108       # Add buffering to compensate for slow raw sockets
109       self.transport = TTransport.TBufferedTransport(transport)
110       # Wrap in a protocol
111       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
112       # Create a client to use the protocol encoder
113       self.sender = Communication.Client(self.protocol)
114       self.transport.open()
115     except Thrift.TException, tx:
116       logging.error('%s' , tx.message)
117
118   def closeconnection():
119     self.transport.close()
120
121 class CLIHandler:
122   def __init__(self):
123     self.log = {}
124
125   def __init__(self, dominoclient, CLIservice):
126     self.log = {}
127     self.dominoClient = dominoclient
128     self.CLIservice = CLIservice
129
130   def d_CLI(self, msg):
131     logging.info('Received CLI %s', msg.CLI_input)
132
133     self.CLIservice.process_input(msg.CLI_input)
134     
135     CLIrespmsg = CLIResponse()
136     CLIrespmsg.CLI_response = "Testing..."
137     return CLIrespmsg
138  
139
140 class DominoClientCLIService(threading.Thread):
141   def __init__(self, dominoclient, communicationhandler, interactive):
142     threading.Thread.__init__(self)
143     self.dominoclient = dominoclient
144     self.communicationhandler = communicationhandler
145     self.interactive = interactive
146
147   def process_input(self, args):
148     if len(args) == 0:
149       print 'Empty API body'
150       return
151
152     try:
153       if args[0] == 'heartbeat':
154         self.dominoclient.heartbeat()
155
156       elif args[0] == 'publish':
157         opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
158         if len(opts) == 0:
159           print '\nUsage: publish -t <toscafile>'
160           return
161
162         for opt, arg in opts:
163           if opt in ('-t', '--tosca-file'):
164             toscafile = arg
165        
166         self.dominoclient.publish(toscafile)
167
168       elif args[0] == 'subscribe':
169         labels = []    
170         templateTypes = []
171         labelop = APPEND
172         templateop = APPEND
173         opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
174         for opt, arg in opts:
175           if opt in ('-l', '--labels'):
176             labels = labels + arg.split(',')
177           elif opt in ('-t', '--ttype'):
178             templateTypes = templateTypes + arg.split(',')
179           elif opt in ('--lop'):
180             try:
181               labelop = str2enum[arg.upper()]
182             except KeyError as ex:
183               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
184               return 
185           elif opt in ('--top'):
186             try:
187               templateop = str2enum[arg.upper()]
188             except KeyError as ex:
189               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
190               return
191         
192         #check if labels or supported templates are nonempty
193         if labels != [] or templateTypes != []:
194           self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
195
196       elif args[0] == 'register':
197         self.dominoclient.start()
198
199     except getopt.GetoptError:
200       print 'Command is misentered or not supported!'
201
202
203   def run(self):
204     global DEFAULT_TOSCA_PUBFILE
205     if self.interactive == "TRUE":
206       flag = True
207     else:
208       flag = False
209
210     if flag: #interactive CLI, loop in while until killed
211       while True:
212          sys.stdout.write('>>')
213          input_string = raw_input()
214          args = input_string.split()
215          if len(args) == 0:
216            continue
217
218          sys.stdout.write('>>')
219          #process input arguments
220          self.process_input(args)
221     else: #domino cli-client is used, listen for the CLI rpc calls
222       cliHandler = CLIHandler(self.dominoclient, self)
223       processor = DominoClientCLI.Processor(cliHandler)
224       transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
225       tfactory = TTransport.TBufferedTransportFactory()
226       pfactory = TBinaryProtocol.TBinaryProtocolFactory()
227       #Use TThreadedServer or TThreadPoolServer for a multithreaded server
228       CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
229       logging.debug('RPC service for CLI is starting...')
230       CLIServer.serve()       
231
232 class DominoClient:
233   def __init__(self):
234
235
236     self.communicationHandler = CommunicationHandler(self)
237     self.processor = None
238     self.transport = None
239     self.tfactory = None
240     self.pfactory = None
241     self.communicationServer = None
242
243     self.CLIservice = None
244
245     self.serviceport = 9091
246     self.dominoserver_IP = DOMINO_SERVER_IP
247     self.CLIport = DOMINO_CLI_PORT 
248
249     #Start from UNREGISTERED STATE
250     #TO BE DONE: initialize from a saved state
251     self.state = 'UNREGISTERED'
252     self.seqno = 0
253     self.UDID = 1
254
255   def start_communicationService(self):
256     self.processor = Communication.Processor(self.communicationHandler)
257     self.transport = TSocket.TServerSocket(port=int(self.serviceport))
258     self.tfactory = TTransport.TBufferedTransportFactory()
259     self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
260     #Use TThreadedServer or TThreadPoolServer for a multithreaded server
261     #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
262     self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
263
264     self.communicationServer.serve()
265  
266   def start(self):
267     try:
268       self.communicationHandler.openconnection()
269       self.register()
270     except Thrift.TException, tx:
271       print '%s' % (tx.message)
272    
273   def register(self):  
274     if self.state == 'UNREGISTERED':
275       logging.info('%d Sending Registration', self.UDID)
276       #prepare registration message
277       reg_msg = RegisterMessage()
278       reg_msg.domino_udid_desired = UDID_DESIRED
279       reg_msg.seq_no = self.seqno
280       reg_msg.ipaddr = netutil.get_ip()
281       reg_msg.tcpport = self.serviceport
282       reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
283
284       try:
285         reg_msg_r = self.sender().d_register(reg_msg)
286         logging.info('Registration Response: Response Code: %d'  , reg_msg_r.responseCode)
287         if reg_msg_r.comments:
288           logging.debug('Response Comments: %s' ,  reg_msg_r.comments)
289
290         if reg_msg_r.responseCode == SUCCESS:
291           self.state = 'REGISTERED'
292           self.UDID = reg_msg_r.domino_udid_assigned
293         else:
294           #Handle registration failure here (possibly based on reponse comments)
295           pass
296       except (Thrift.TException, TSocket.TTransportException) as tx:
297         logging.error('%s' , tx.message)
298       except (socket.timeout) as tx:
299         self.handle_RPC_timeout(reg_msg)
300       except (socket.error) as tx:
301         logging.error('%s' , tx.message)
302       self.seqno = self.seqno + 1
303
304   def heartbeat(self):
305     if self.state == 'UNREGISTERED':
306       self.start()
307           
308     logging.info('%d Sending heartbeat', self.UDID)
309     hbm = HeartBeatMessage()         
310     hbm.domino_udid = self.UDID        
311     hbm.seq_no = self.seqno         
312
313     try:
314       hbm_r = self.sender().d_heartbeat(hbm)
315       logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
316     except (Thrift.TException, TSocket.TTransportException) as tx:
317       logging.error('%s' , tx.message)
318     except (socket.timeout) as tx:
319       self.handle_RPC_timeout(hbm)
320     except:
321       logging.error('Unexpected error: %s', sys.exc_info()[0])
322     
323     self.seqno = self.seqno + 1    
324
325   def publish(self, toscafile):
326     if self.state == 'UNREGISTERED':
327       self.start()
328
329     logging.info('Publishing the template file: ' + toscafile)
330     pub_msg = PublishMessage()
331     pub_msg.domino_udid = self.UDID
332     pub_msg.seq_no = self.seqno
333     pub_msg.template_type = 'tosca-nfv-v1.0'
334
335     try:
336       pub_msg.template = miscutil.read_templatefile(toscafile)
337     except IOError as e:
338       logging.error('I/O error(%d): %s' , e.errno, e.strerror)
339       return
340     try:
341       pub_msg_r = self.sender().d_publish(pub_msg)
342       logging.info('Publish Response is received from: %d ,sequence number: %d Op. Status: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no, pub_msg_r.responseCode)
343     except (Thrift.TException, TSocket.TTransportException) as tx:
344       print '%s' % (tx.message)
345     except (socket.timeout) as tx:
346       self.handle_RPC_timeout(pub_msg)
347
348     self.seqno = self.seqno + 1
349
350   def subscribe(self, labels, templateTypes, label_op, template_op):
351      if self.state == 'UNREGISTERED':
352        self.start()
353
354      logging.info('subscribing labels %s and templates %s', labels, templateTypes)
355      #send subscription message
356      sub_msg = SubscribeMessage()
357      sub_msg.domino_udid = self.UDID
358      sub_msg.seq_no = self.seqno
359      sub_msg.template_op = template_op
360      sub_msg.supported_template_types = templateTypes
361      sub_msg.label_op = label_op
362      sub_msg.labels = labels
363      try:
364        sub_msg_r = self.sender().d_subscribe(sub_msg)
365        logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
366      except (Thrift.TException, TSocket.TTransportException) as tx: 
367        logging.error('%s' , tx.message)
368      except (socket.timeout) as tx: 
369        self.handle_RPC_timeout(sub_msg)
370
371      self.seqno = self.seqno + 1 
372
373   def stop(self):
374     try:
375       self.communicationHandler.closeconnection()
376     except Thrift.TException, tx:
377       logging.error('%s' , tx.message)
378     
379   def sender(self):
380     return self.communicationHandler.sender
381
382   def startCLI(self, interactive):
383     self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
384     logging.info('CLI Service is starting')
385     self.CLIservice.start()
386     #to wait until CLI service is finished
387     #self.CLIservice.join()
388
389   def set_serviceport(self, port):
390     self.serviceport = port
391
392   def set_CLIport(self, cliport):
393     self.CLIport = cliport
394
395   def set_dominoserver_ipaddr(self, ipaddr):
396     self.dominoserver_IP = ipaddr
397
398   def handle_RPC_timeout(self, RPCmessage):
399     # TBD: handle each RPC timeout separately
400     if RPCmessage.messageType == HEART_BEAT:
401       logging.debug('RPC Timeout for message type: HEART_BEAT') 
402     elif RPCmessage.messageType == PUBLISH:
403       logging.debug('RPC Timeout for message type: PUBLISH')
404     elif RPCmessage.messageType == SUBSCRIBE:
405       logging.debug('RPC Timeout for message type: SUBSCRIBE')
406     elif RPCmessage.messageType == REGISTER:
407       logging.debug('RPC Timeout for message type: REGISTER')
408     elif RPCmessage.messageType == QUERY:
409       logging.debug('RPC Timeout for message type: QUERY') 
410
411 def main(argv):
412   client = DominoClient()
413   loglevel = LOGLEVEL
414   interactive = INTERACTIVE
415   #process input arguments
416   try:
417       opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport="])
418   except getopt.GetoptError:
419       print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
420       sys.exit(2)
421   for opt, arg in opts:
422       if opt == '-h':
423          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
424          sys.exit()
425       elif opt in ("-c", "--conf"):
426          configfile = arg
427       elif opt in ("-p", "--port"):
428          client.set_serviceport(int(arg))
429       elif opt in ("-i", "--ipaddr"):
430          client.set_dominoserver_ipaddr(arg)
431       elif opt in ("-l", "--log"):
432          loglevel = arg
433       elif opt in ("--iac"):
434          interactive = arg.upper()
435       elif opt in ("--cliport"):
436          client.set_CLIport(int(arg))
437
438   #Set logging level
439   numeric_level = getattr(logging, loglevel.upper(), None)
440   try:
441     if not isinstance(numeric_level, int):
442       raise ValueError('Invalid log level: %s' % loglevel)
443     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
444   except ValueError, ex:
445     print ex.message
446     exit()
447  
448   #The client is starting
449   logging.debug('Domino Client Starting...')
450   client.start()
451   client.startCLI(interactive)
452   client.start_communicationService()
453
454 if __name__ == "__main__":
455    main(sys.argv[1:])
456