Auto Generated INFO.yaml file
[domino.git] / DominoClient.py
1 #!/usr/bin/env python
2
3 #Copyright 2016 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14
15 import sys, os, glob, threading
16 import getopt, socket
17 import logging, errno
18 import uuid
19
20 sys.path.insert(0, glob.glob('./lib')[0])
21
22 from dominoRPC import Communication
23 from dominoRPC.ttypes import *
24 from dominoRPC.constants import *
25
26 from dominoCLI import DominoClientCLI
27 from dominoCLI.ttypes import *
28 from dominoCLI.constants import *
29
30 from thrift import Thrift
31 from thrift.transport import TSocket
32 from thrift.transport import TTransport
33 from thrift.protocol import TBinaryProtocol
34 from thrift.server import TServer
35
36 from util import *
37
38 #Load configuration parameters
39 from domino_conf import *
40
41 class CommunicationHandler:
42   def __init__(self):
43     self.log = {}
44
45   def __init__(self, dominoclient):
46     self.log = {}
47     self.dominoClient = dominoclient
48     self.transport = None
49     self.protocol = None
50     self.sender = None
51
52   # Template Push from Domino Server is received
53   # Actions:
54   #       - Depending on Controller Domain, call API
55   #       - Respond Back with Push Response
56   def d_push(self, push_msg):
57     logging.info('%s Received Template File', self.dominoClient.UDID)
58     # Retrieve the template file
59     try:
60       os.makedirs(TOSCA_RX_DIR+str(self.dominoClient.UDID))
61     except OSError as exception:
62       if exception.errno == errno.EEXIST:
63         logging.debug('IGNORING error: ERRNO %d; %s exists.', exception.errno, TOSCA_RX_DIR+str(self.dominoClient.UDID))
64       else:
65         logging.error('IGNORING error in creating %s. Err no: %d', exception.errno)
66
67     try:  
68       miscutil.write_templatefile(TOSCA_RX_DIR+str(self.dominoClient.UDID)+'/'+str(push_msg.template_UUID)+'.yaml' , push_msg.template)
69     except:    
70       logging.error('FAILED to write the pushed file: %s', sys.exc_info()[0])
71       push_r = PushResponseMessage()
72       # Fill response message fields
73       push_r.domino_udid = self.dominoClient.UDID
74       push_r.seq_no = self.dominoClient.seqno
75       push_r.responseCode = FAILED
76       self.dominoClient.seqno = self.dominoClient.seqno + 1
77       return push_r# Any inspection code goes here
78
79     ## End of inspection
80
81     # Call NB API
82     # If heat client, call heat command
83     
84     # If ONOS client, run as shell script
85
86
87     ## End of NB API call
88
89     # Marshall the response message for the Domino Server Fill
90     push_r = PushResponseMessage()
91     # Fill response message fields
92     push_r.domino_udid = self.dominoClient.UDID    
93     push_r.seq_no = self.dominoClient.seqno
94     push_r.responseCode = SUCCESS    
95     ## End of filling
96
97     self.dominoClient.seqno = self.dominoClient.seqno + 1
98
99     return push_r
100
101   
102   def openconnection(self):
103     try:
104       # Make socket
105       transport = TSocket.TSocket(self.dominoClient.dominoserver_IP, DOMINO_SERVER_PORT)
106       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
107       # Add buffering to compensate for slow raw sockets
108       self.transport = TTransport.TBufferedTransport(transport)
109       # Wrap in a protocol
110       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
111       # Create a client to use the protocol encoder
112       self.sender = Communication.Client(self.protocol)
113       self.transport.open()
114     except Thrift.TException, tx:
115       logging.error('%s' , tx.message)
116
117   def closeconnection():
118     self.transport.close()
119
120 class CLIHandler:
121   def __init__(self):
122     self.log = {}
123
124   def __init__(self, dominoclient, CLIservice):
125     self.log = {}
126     self.dominoClient = dominoclient
127     self.CLIservice = CLIservice
128
129   def d_CLI(self, msg):
130     #logging.info('Received CLI %s', msg.CLI_input) #breaks testing due to random TUIDs
131
132     CLIrespmsg = CLIResponse()
133     CLIrespmsg.CLI_response = self.CLIservice.process_input(msg.CLI_input)
134     return CLIrespmsg
135  
136
137 class DominoClientCLIService(threading.Thread):
138   def __init__(self, dominoclient, communicationhandler, interactive):
139     threading.Thread.__init__(self)
140     self.dominoclient = dominoclient
141     self.communicationhandler = communicationhandler
142     self.interactive = interactive
143
144   def process_input(self, args):
145     if len(args) == 0:
146       return 'Empty API body'
147
148     try:
149       if args[0] == 'heartbeat':
150         self.dominoclient.heartbeat()
151
152       elif args[0] == 'publish':
153         opts, args = getopt.getopt(args[1:],"t:k:",["tosca-file=","tuid"])
154         if len(opts) == 0:
155           print '\nUsage: publish -t <toscafile> -k <TUID>'
156           return
157         
158         template_UUID = None
159         toscafile = None
160         for opt, arg in opts:
161           if opt in ('-t', '--tosca-file'):
162             toscafile = arg
163           elif opt in ('-k', '--tuid'):
164             template_UUID = arg
165         if toscafile is not None:
166           self.dominoclient.publish(toscafile,template_UUID)
167         else:
168           print '\nUsage: publish -t <toscafile> -k <TUID>'
169
170       elif args[0] == 'subscribe':
171         labels = []    
172         templateTypes = []
173         labelop = APPEND
174         templateop = APPEND
175         opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
176         for opt, arg in opts:
177           if opt in ('-l', '--labels'):
178             labels = labels + arg.split(',')
179           elif opt in ('-t', '--ttype'):
180             templateTypes = templateTypes + arg.split(',')
181           elif opt in ('--lop'):
182             try:
183               labelop = str2enum[arg.upper()]
184             except KeyError as ex:
185               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
186               return 
187           elif opt in ('--top'):
188             try:
189               templateop = str2enum[arg.upper()]
190             except KeyError as ex:
191               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
192               return
193         
194         #check if labels or supported templates are nonempty
195         if labels != [] or templateTypes != []:
196           self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
197
198       elif args[0] == 'register':
199         self.dominoclient.start()
200
201       elif args[0] == 'list-tuids':
202         return self.dominoclient.query(['list-tuids'])
203
204       else:
205         return 'Command is misentered or not supported!'
206
207     except getopt.GetoptError:
208       print 'Command is misentered or not supported!'
209
210
211   def run(self):
212     global DEFAULT_TOSCA_PUBFILE
213     if self.interactive == "TRUE":
214       flag = True
215     else:
216       flag = False
217
218     if flag: #interactive CLI, loop in while until killed
219       while True:
220          sys.stdout.write('>>')
221          input_string = raw_input()
222          args = input_string.split()
223          if len(args) == 0:
224            continue
225
226          sys.stdout.write('>>')
227          #process input arguments
228          resp_msg = self.process_input(args)
229          if resp_msg is not None:
230            print resp_msg
231     else: #domino cli-client is used, listen for the CLI rpc calls
232       cliHandler = CLIHandler(self.dominoclient, self)
233       processor = DominoClientCLI.Processor(cliHandler)
234       transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
235       tfactory = TTransport.TBufferedTransportFactory()
236       pfactory = TBinaryProtocol.TBinaryProtocolFactory()
237       #Use TThreadedServer or TThreadPoolServer for a multithreaded server
238       CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
239       logging.debug('RPC service for CLI is starting...')
240       CLIServer.serve()       
241
242 class DominoClient:
243   def __init__(self):
244
245
246     self.communicationHandler = CommunicationHandler(self)
247     self.processor = None
248     self.transport = None
249     self.tfactory = None
250     self.pfactory = None
251     self.communicationServer = None
252
253     self.CLIservice = None
254
255     self.serviceport = DOMINO_CLIENT_PORT
256     self.dominoserver_IP = DOMINO_SERVER_IP
257     self.CLIport = DOMINO_CLI_PORT 
258
259     #Start from UNREGISTERED STATE
260     #TO BE DONE: initialize from a saved state
261     self.state = 'UNREGISTERED'
262     self.seqno = 0
263     self.UDID = 1
264
265   def start_communicationService(self):
266     self.processor = Communication.Processor(self.communicationHandler)
267     self.transport = TSocket.TServerSocket(port=int(self.serviceport))
268     self.tfactory = TTransport.TBufferedTransportFactory()
269     self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
270     #Use TThreadedServer or TThreadPoolServer for a multithreaded server
271     #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
272     self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
273
274     self.communicationServer.serve()
275  
276   def start(self):
277     try:
278       self.communicationHandler.openconnection()
279       self.register()
280     except Thrift.TException, tx:
281       print '%s' % (tx.message)
282    
283   def register(self):  
284     if self.state == 'UNREGISTERED':
285       logging.info('%d Sending Registration', self.UDID)
286       #prepare registration message
287       reg_msg = RegisterMessage()
288       reg_msg.domino_udid_desired = UDID_DESIRED
289       reg_msg.seq_no = self.seqno
290       reg_msg.ipaddr = netutil.get_ip()
291       reg_msg.tcpport = self.serviceport
292       reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
293
294       try:
295         reg_msg_r = self.sender().d_register(reg_msg)
296         logging.info('Registration Response: Response Code: %d'  , reg_msg_r.responseCode)
297         if reg_msg_r.comments:
298           logging.debug('Response Comments: %s' ,  reg_msg_r.comments)
299
300         if reg_msg_r.responseCode == SUCCESS:
301           self.state = 'REGISTERED'
302           self.UDID = reg_msg_r.domino_udid_assigned
303         else:
304           #Handle registration failure here (possibly based on reponse comments)
305           pass
306       except (Thrift.TException, TSocket.TTransportException) as tx:
307         logging.error('%s' , tx.message)
308       except (socket.timeout) as tx:
309         self.handle_RPC_timeout(reg_msg)
310       except (socket.error) as tx:
311         logging.error('%s' , tx.message)
312       self.seqno = self.seqno + 1
313
314   def heartbeat(self):
315     if self.state == 'UNREGISTERED':
316       self.start()
317           
318     logging.info('%s Sending heartbeat', self.UDID)
319     hbm = HeartBeatMessage()         
320     hbm.domino_udid = self.UDID        
321     hbm.seq_no = self.seqno         
322
323     try:
324       hbm_r = self.sender().d_heartbeat(hbm)
325       logging.info('heart beat received from: %s ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
326     except (Thrift.TException, TSocket.TTransportException) as tx:
327       logging.error('%s' , tx.message)
328     except (socket.timeout) as tx:
329       self.handle_RPC_timeout(hbm)
330     except:
331       logging.error('Unexpected error: %s', sys.exc_info()[0])
332     
333     self.seqno = self.seqno + 1    
334
335   def publish(self, toscafile, template_UUID=None):
336     if self.state == 'UNREGISTERED':
337       self.start()
338
339     logging.info('Publishing the template file: ' + toscafile)
340     pub_msg = PublishMessage()
341     pub_msg.domino_udid = self.UDID
342     pub_msg.seq_no = self.seqno
343     pub_msg.template_type = 'tosca-nfv-v1.0'
344     if template_UUID is not None:
345       pub_msg.template_UUID = template_UUID
346
347     try:
348       pub_msg.template = miscutil.read_templatefile(toscafile)
349     except IOError as e:
350       logging.error('I/O error(%d): %s' , e.errno, e.strerror)
351       return
352     try:
353       pub_msg_r = self.sender().d_publish(pub_msg)
354       logging.info('Publish Response is received from: %s ,sequence number: %d Status: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no, pub_msg_r.responseCode)
355     except (Thrift.TException, TSocket.TTransportException) as tx:
356       print '%s' % (tx.message)
357     except (socket.timeout) as tx:
358       self.handle_RPC_timeout(pub_msg)
359
360     self.seqno = self.seqno + 1
361
362   def subscribe(self, labels, templateTypes, label_op, template_op):
363      if self.state == 'UNREGISTERED':
364        self.start()
365
366      logging.info('subscribing labels %s and templates %s', labels, templateTypes)
367      #send subscription message
368      sub_msg = SubscribeMessage()
369      sub_msg.domino_udid = self.UDID
370      sub_msg.seq_no = self.seqno
371      sub_msg.template_op = template_op
372      sub_msg.supported_template_types = templateTypes
373      sub_msg.label_op = label_op
374      sub_msg.labels = labels
375      try:
376        sub_msg_r = self.sender().d_subscribe(sub_msg)
377        logging.info('Subscribe Response is received from: %s ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
378      except (Thrift.TException, TSocket.TTransportException) as tx: 
379        logging.error('%s' , tx.message)
380      except (socket.timeout) as tx: 
381        self.handle_RPC_timeout(sub_msg)
382
383      self.seqno = self.seqno + 1 
384
385   def query(self, queryString, template_UUID=None):
386     logging.info('querying Domino Server: %s', queryString)
387     query_msg = QueryMessage()
388     query_msg.domino_udid = self.UDID
389     query_msg.seq_no = self.seqno
390     query_msg.queryString = queryString
391     query_msg.template_UUID = template_UUID 
392     self.seqno = self.seqno + 1
393     try:
394       query_msg_r = self.sender().d_query(query_msg)
395       logging.info('Query Response is received from: %s ,sequence number: %d', query_msg_r.domino_udid,query_msg_r.seq_no)
396       if (query_msg_r.queryResponse is not None) and (len(query_msg_r.queryResponse)>0):
397         return query_msg_r.queryResponse
398     except (Thrift.TException, TSocket.TTransportException) as tx:
399       logging.error('%s' , tx.message)
400     except (socket.timeout) as tx:
401       self.handle_RPC_timeout(query_msg)
402
403   def stop(self):
404     try:
405       self.communicationHandler.closeconnection()
406     except Thrift.TException, tx:
407       logging.error('%s' , tx.message)
408     
409   def sender(self):
410     return self.communicationHandler.sender
411
412   def startCLI(self, interactive):
413     self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
414     logging.info('CLI Service is starting')
415     self.CLIservice.start()
416     #to wait until CLI service is finished
417     #self.CLIservice.join()
418
419   def set_serviceport(self, port):
420     self.serviceport = port
421
422   def set_CLIport(self, cliport):
423     self.CLIport = cliport
424
425   def set_dominoserver_ipaddr(self, ipaddr):
426     self.dominoserver_IP = ipaddr
427
428   def handle_RPC_timeout(self, RPCmessage):
429     # TBD: handle each RPC timeout separately
430     if RPCmessage.messageType == HEART_BEAT:
431       logging.debug('RPC Timeout for message type: HEART_BEAT') 
432     elif RPCmessage.messageType == PUBLISH:
433       logging.debug('RPC Timeout for message type: PUBLISH')
434     elif RPCmessage.messageType == SUBSCRIBE:
435       logging.debug('RPC Timeout for message type: SUBSCRIBE')
436     elif RPCmessage.messageType == REGISTER:
437       logging.debug('RPC Timeout for message type: REGISTER')
438     elif RPCmessage.messageType == QUERY:
439       logging.debug('RPC Timeout for message type: QUERY') 
440
441 def main():
442   client = DominoClient()
443   loglevel = LOGLEVEL
444   interactive = INTERACTIVE
445   #process input arguments
446   try:
447       opts, args = getopt.getopt(sys.argv[1:],"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport=","uuid=","regmod="])
448   except getopt.GetoptError:
449       print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
450       sys.exit(2)
451   for opt, arg in opts:
452       if opt == '-h':
453          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false --cliport <cliport>'
454          sys.exit()
455       elif opt in ("-c", "--conf"):
456          configfile = arg
457       elif opt in ("-p", "--port"):
458          client.set_serviceport(int(arg))
459       elif opt in ("-i", "--ipaddr"):
460          client.set_dominoserver_ipaddr(arg)
461       elif opt in ("-l", "--log"):
462          loglevel = arg
463       elif opt in ("--iac"):
464          interactive = arg.upper()
465       elif opt in ("--cliport"):
466          client.set_CLIport(int(arg))
467       elif opt in ("--uuid"):
468          client.UDID = arg
469       elif opt in ("--regmod"):
470          if arg.upper() == 'REGISTERED': 
471            client.state = 'REGISTERED'
472   #Set logging level
473   numeric_level = getattr(logging, loglevel.upper(), None)
474   try:
475     if not isinstance(numeric_level, int):
476       raise ValueError('Invalid log level: %s' % loglevel)
477     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
478   except ValueError, ex:
479     print ex.message
480     exit()
481  
482   #The client is starting
483   logging.debug('Domino Client Starting...')
484   client.start()
485   client.startCLI(interactive)
486   client.start_communicationService()
487
488 if __name__ == "__main__":
489   sys.exit(main())