added sqlite3 support at Domino Server for label subscriptions and client registrations
[domino.git] / DominoClient.py
1 #!/usr/bin/env python
2
3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14
15 import sys, glob, threading
16 import getopt, socket
17 import logging
18
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from dominoCLI import DominoClientCLI
28 from dominoCLI.ttypes import *
29 from dominoCLI.constants import *
30
31 from thrift import Thrift
32 from thrift.transport import TSocket
33 from thrift.transport import TTransport
34 from thrift.protocol import TBinaryProtocol
35 from thrift.server import TServer
36
37 from util import *
38
39 #Load configuration parameters
40 from domino_conf import *
41
42 class CommunicationHandler:
43   def __init__(self):
44     self.log = {}
45
46   def __init__(self, dominoclient):
47     self.log = {}
48     self.dominoClient = dominoclient
49     try:
50       # Make socket
51       transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT)
52       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
53       # Add buffering to compensate for slow raw sockets
54       self.transport = TTransport.TBufferedTransport(transport)
55       # Wrap in a protocol
56       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
57       # Create a client to use the protocol encoder
58       self.sender = Communication.Client(self.protocol)
59     except Thrift.TException, tx: 
60       logging.error('%s' , tx.message)
61
62   # Template Push from Domino Server is received
63   # Actions:
64   #       - Depending on Controller Domain, call API
65   #       - Respond Back with Push Response
66   def d_push(self, push_msg):
67     logging.info('%d Received Template File', self.dominoClient.UDID)
68     # Retrieve the template file
69
70     ## End of retrieval
71  
72     # Any inspection code goes here
73
74     ## End of inspection
75
76     # Call NB API
77     # If heat client, call heat command
78     
79     # If ONOS client, run as shell script
80
81
82     ## End of NB API call
83
84     # Marshall the response message for the Domino Server Fill
85     push_r = PushResponseMessage()
86     # Fill response message fields
87     push_r.domino_udid = self.dominoClient.UDID    
88     push_r.seq_no = self.dominoClient.seqno
89     push_r.responseCode = SUCCESS    
90     ## End of filling
91
92     self.dominoClient.seqno = self.dominoClient.seqno + 1
93
94     return push_r
95
96   
97   def openconnection(self):
98     self.transport.open()
99
100   def closeconnection():
101     self.transport.close()
102
103 class CLIHandler:
104   def __init__(self):
105     self.log = {}
106
107   def __init__(self, dominoclient, CLIservice):
108     self.log = {}
109     self.dominoClient = dominoclient
110     self.CLIservice = CLIservice
111
112   def d_CLI(self, msg):
113     logging.info('Received CLI %s', msg.CLI_input)
114
115     self.CLIservice.process_input(msg.CLI_input)
116     
117     CLIrespmsg = CLIResponse()
118     CLIrespmsg.CLI_response = "Testing..."
119     return CLIrespmsg
120  
121 def read_templatefile(temp_filename): 
122   f = open(temp_filename, 'r')
123   lines = f.read().splitlines()
124
125   return lines
126
127 class DominoClientCLIService(threading.Thread):
128   def __init__(self, dominoclient, communicationhandler, interactive):
129     threading.Thread.__init__(self)
130     self.dominoclient = dominoclient
131     self.communicationhandler = communicationhandler
132     self.interactive = interactive
133
134   def process_input(self, args):
135     try:
136       if args[0] == 'heartbeat':
137         self.dominoclient.heartbeat()
138
139       elif args[0] == 'publish':
140         opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
141         if len(opts) == 0:
142           print '\nUsage: publish -t <toscafile>'
143           return
144
145         for opt, arg in opts:
146           if opt in ('-t', '--tosca-file'):
147             toscafile = arg
148        
149         self.dominoclient.publish(toscafile)
150
151       elif args[0] == 'subscribe':
152         labels = []    
153         templateTypes = []
154         labelop = APPEND
155         templateop = APPEND
156         opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype=","lop=","top="])
157         for opt, arg in opts:
158           if opt in ('-l', '--labels'):
159             labels = labels + arg.split(',')
160           elif opt in ('-t', '--ttype'):
161             templateTypes = templateTypes + arg.split(',')
162           elif opt in ('--lop'):
163             try:
164               labelop = str2enum[arg.upper()]
165             except KeyError as ex:
166               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
167               return 
168           elif opt in ('--top'):
169             try:
170               templateop = str2enum[arg.upper()]
171             except KeyError as ex:
172               print '\nInvalid label option, pick one of: APPEND, OVERWRITE, DELETE'
173               return
174         
175         #check if labels or supported templates are nonempty
176         if labels != [] or templateTypes != []:
177           self.dominoclient.subscribe(labels, templateTypes, labelop, templateop)
178
179       elif args[0] == 'register':
180         self.dominoclient.start()
181
182     except getopt.GetoptError:
183       print 'Command is misentered or not supported!'
184
185
186   def run(self):
187     global DEFAULT_TOSCA_PUBFILE
188     if self.interactive == "TRUE":
189       flag = True
190     else:
191       flag = False
192
193     if flag: #interactive CLI, loop in while until killed
194       while True:
195          sys.stdout.write('>>')
196          input_string = raw_input()
197          args = input_string.split()
198          if len(args) == 0:
199            continue
200
201          sys.stdout.write('>>')
202          #process input arguments
203          self.process_input(args)
204     else: #domino cli-client is used, listen for the CLI rpc calls
205       cliHandler = CLIHandler(self.dominoclient, self)
206       processor = DominoClientCLI.Processor(cliHandler)
207       transport = TSocket.TServerSocket(port=self.dominoclient.CLIport)
208       tfactory = TTransport.TBufferedTransportFactory()
209       pfactory = TBinaryProtocol.TBinaryProtocolFactory()
210       #Use TThreadedServer or TThreadPoolServer for a multithreaded server
211       CLIServer = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
212       logging.debug('RPC service for CLI is starting...')
213       CLIServer.serve()       
214
215 class DominoClient:
216   def __init__(self):
217     self.communicationHandler = CommunicationHandler(self)
218     self.processor = None
219     self.transport = None
220     self.tfactory = None
221     self.pfactory = None
222     self.communicationServer = None
223
224     self.CLIservice = None
225
226     self.serviceport = 9091
227     self.dominoserver_IP = 'localhost'
228
229     self.CLIport = DOMINO_CLI_PORT 
230
231     #Start from UNREGISTERED STATE
232     #TO BE DONE: initialize from a saved state
233     self.state = 'UNREGISTERED'
234     self.seqno = 0
235     self.UDID = 1
236
237   def start_communicationService(self):
238     self.processor = Communication.Processor(self.communicationHandler)
239     self.transport = TSocket.TServerSocket(port=int(self.serviceport))
240     self.tfactory = TTransport.TBufferedTransportFactory()
241     self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
242     #Use TThreadedServer or TThreadPoolServer for a multithreaded server
243     #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
244     self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
245
246     self.communicationServer.serve()
247  
248   def start(self):
249     try:
250       self.communicationHandler.openconnection()
251       self.register()
252     except Thrift.TException, tx:
253       print '%s' % (tx.message)
254    
255   def register(self):  
256     if self.state == 'UNREGISTERED':
257       logging.info('%d Sending Registration', self.UDID)
258       #prepare registration message
259       reg_msg = RegisterMessage()
260       reg_msg.domino_udid_desired = UDID_DESIRED
261       reg_msg.seq_no = self.seqno
262       reg_msg.ipaddr = netutil.get_ip()
263       reg_msg.tcpport = self.serviceport
264       reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
265
266       try:
267         reg_msg_r = self.sender().d_register(reg_msg)
268         logging.info('Registration Response: Response Code: %d'  , reg_msg_r.responseCode)
269         if reg_msg_r.comments:
270           logging.debug('Response Comments: %s' ,  reg_msg_r.comments)
271
272         if reg_msg_r.responseCode == SUCCESS:
273           self.state = 'REGISTERED'
274           self.UDID = reg_msg_r.domino_udid_assigned
275         else:
276           #Handle registration failure here (possibly based on reponse comments)
277           pass
278       except (Thrift.TException, TSocket.TTransportException) as tx:
279         logging.error('%s' , tx.message)
280       except (socket.timeout) as tx:
281         self.dominoclient.handle_RPC_timeout(pub_msg)
282       except (socket.error) as tx:
283         logging.error('%s' , tx.message)
284       self.seqno = self.seqno + 1
285
286   def heartbeat(self):
287     if self.state == 'UNREGISTERED':
288       self.start()
289           
290     logging.info('%d Sending heartbeat', self.UDID)
291     hbm = HeartBeatMessage()         
292     hbm.domino_udid = self.UDID        
293     hbm.seq_no = self.seqno         
294
295     try:
296       hbm_r = self.sender().d_heartbeat(hbm)
297       logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
298     except (Thrift.TException, TSocket.TTransportException) as tx:
299       logging.error('%s' , tx.message)
300     except (socket.timeout) as tx:
301       self.handle_RPC_timeout(hbm)
302     except:
303       logging.error('Unexpected error: %s', sys.exc_info()[0])
304     
305     self.seqno = self.seqno + 1    
306
307   def publish(self, toscafile):
308     if self.state == 'UNREGISTERED':
309       self.start()
310
311     logging.info('Publishing the template file: ' + toscafile)
312     pub_msg = PublishMessage()
313     pub_msg.domino_udid = self.UDID
314     pub_msg.seq_no = self.seqno
315     pub_msg.template_type = 'tosca-nfv-v1.0'
316
317     try:
318       pub_msg.template = read_templatefile(toscafile)
319     except IOError as e:
320       logging.error('I/O error(%d): %s' , e.errno, e.strerror)
321       return
322     try:
323       pub_msg_r = self.sender().d_publish(pub_msg)
324       logging.info('Publish Response is received from: %d ,sequence number: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no)
325     except (Thrift.TException, TSocket.TTransportException) as tx:
326       print '%s' % (tx.message)
327     except (socket.timeout) as tx:
328       self.handle_RPC_timeout(pub_msg)
329
330     self.seqno = self.seqno + 1
331
332   def subscribe(self, labels, templateTypes, label_op, template_op):
333      if self.state == 'UNREGISTERED':
334        self.start()
335
336      logging.info('subscribing labels %s and templates %s', labels, templateTypes)
337      #send subscription message
338      sub_msg = SubscribeMessage()
339      sub_msg.domino_udid = self.UDID
340      sub_msg.seq_no = self.seqno
341      sub_msg.template_op = template_op
342      sub_msg.supported_template_types = templateTypes
343      sub_msg.label_op = label_op
344      sub_msg.labels = labels
345      try:
346        sub_msg_r = self.sender().d_subscribe(sub_msg)
347        logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
348      except (Thrift.TException, TSocket.TTransportException) as tx: 
349        logging.error('%s' , tx.message)
350      except (socket.timeout) as tx: 
351        self.handle_RPC_timeout(sub_msg)
352
353      self.seqno = self.seqno + 1 
354
355   def stop(self):
356     try:
357       self.communicationHandler.closeconnection()
358     except Thrift.TException, tx:
359       logging.error('%s' , tx.message)
360     
361   def sender(self):
362     return self.communicationHandler.sender
363
364   def startCLI(self, interactive):
365     self.CLIservice = DominoClientCLIService(self, self.communicationHandler, interactive)
366     logging.info('CLI Service is starting')
367     self.CLIservice.start()
368     #to wait until CLI service is finished
369     #self.CLIservice.join()
370
371   def set_serviceport(self, port):
372     self.serviceport = port
373
374   def set_CLIport(self, cliport):
375     self.CLIport = cliport
376
377   def set_dominoserver_ipaddr(self, ipaddr):
378     self.dominoserver_IP = ipaddr
379
380   def handle_RPC_timeout(self, RPCmessage):
381     # TBD: handle each RPC timeout separately
382     if RPCmessage.messageType == HEART_BEAT:
383       logging.debug('RPC Timeout for message type: HEART_BEAT') 
384     elif RPCmessage.messageType == PUBLISH:
385       logging.debug('RPC Timeout for message type: PUBLISH')
386     elif RPCmessage.messageType == SUBSCRIBE:
387       logging.debug('RPC Timeout for message type: SUBSCRIBE')
388     elif RPCmessage.messageType == REGISTER:
389       logging.debug('RPC Timeout for message type: REGISTER')
390     elif RPCmessage.messageType == QUERY:
391       logging.debug('RPC Timeout for message type: QUERY') 
392
393 def main(argv):
394   client = DominoClient()
395   loglevel = 'WARNING'
396   interactive = "FALSE"
397   #process input arguments
398   try:
399       opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log=","iac=","cliport="])
400   except getopt.GetoptError:
401       print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false'
402       sys.exit(2)
403   for opt, arg in opts:
404       if opt == '-h':
405          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel> --iac=true/false'
406          sys.exit()
407       elif opt in ("-c", "--conf"):
408          configfile = arg
409       elif opt in ("-p", "--port"):
410          client.set_serviceport(int(arg))
411       elif opt in ("-i", "--ipaddr"):
412          client.set_dominoserver_ipaddr(arg)
413       elif opt in ("-l", "--log"):
414          loglevel = arg
415       elif opt in ("--iac"):
416          interactive = arg.upper()
417       elif opt in ("--cliport"):
418          client.set_CLIport(int(arg))
419
420   #Set logging level
421   numeric_level = getattr(logging, loglevel.upper(), None)
422   try:
423     if not isinstance(numeric_level, int):
424       raise ValueError('Invalid log level: %s' % loglevel)
425     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
426   except ValueError, ex:
427     print ex.message
428     exit()
429  
430   #The client is starting
431   logging.debug('Domino Client Starting...')
432   client.start()
433   client.startCLI(interactive)
434   client.start_communicationService()
435
436 if __name__ == "__main__":
437    main(sys.argv[1:])
438