Merge "Example as code, documentation template for sphinx build"
[domino.git] / DominoClient.py
1 #!/usr/bin/env python
2
3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13
14
15 import sys, glob, threading
16 import getopt, socket
17 import logging
18
19 #sys.path.append('gen-py')
20 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
21 sys.path.insert(0, glob.glob('./lib')[0])
22
23 from dominoRPC import Communication
24 from dominoRPC.ttypes import *
25 from dominoRPC.constants import *
26
27 from thrift import Thrift
28 from thrift.transport import TSocket
29 from thrift.transport import TTransport
30 from thrift.protocol import TBinaryProtocol
31 from thrift.server import TServer
32
33 from util import *
34
35 #Load configuration parameters
36 from domino_conf import *
37
38 class CommunicationHandler:
39   def __init__(self):
40     self.log = {}
41
42   def __init__(self, dominoclient):
43     global DOMINO_SERVER_IP, DOMINO_SERVER_PORT
44     self.log = {}
45     self.dominoClient = dominoclient
46     try:
47       # Make socket
48       transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT)
49       transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
50       # Add buffering to compensate for slow raw sockets
51       self.transport = TTransport.TBufferedTransport(transport)
52       # Wrap in a protocol
53       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
54       # Create a client to use the protocol encoder
55       self.sender = Communication.Client(self.protocol)
56     except Thrift.TException, tx: 
57       logging.error('%s' , tx.message)
58
59   # Template Push from Domino Server is received
60   # Actions:
61   #       - Depending on Controller Domain, call API
62   #       - Respond Back with Push Response
63   def d_push(self, push_msg):
64     logging.info('%d Received Template File', self.dominoClient.UDID)
65     # Retrieve the template file
66
67     ## End of retrieval
68  
69     # Any inspection code goes here
70
71     ## End of inspection
72
73     # Call NB API
74     # If heat client, call heat command
75     
76     # If ONOS client, run as shell script
77
78
79     ## End of NB API call
80
81     # Marshall the response message for the Domino Server Fill
82     push_r = PushResponseMessage()
83     # Fill response message fields
84     push_r.domino_udid = self.dominoClient.UDID    
85     push_r.seq_no = self.dominoClient.seqno
86     push_r.responseCode = SUCCESS    
87     ## End of filling
88
89     self.dominoClient.seqno = self.dominoClient.seqno + 1
90
91     return push_r
92
93   
94   def openconnection(self):
95     self.transport.open()
96
97   def closeconnection():
98     self.transport.close()
99  
100 def read_templatefile(temp_filename): 
101   f = open(temp_filename, 'r')
102   lines = f.read().splitlines()
103
104   return lines
105
106 class DominoClientCLIService(threading.Thread):
107   def __init__(self, dominoclient, communicationhandler):
108     threading.Thread.__init__(self)
109     self.dominoclient = dominoclient
110     self.communicationhandler = communicationhandler
111
112   def run(self):
113     global DEFAULT_TOSCA_PUBFILE
114     while True:
115        sys.stdout.write('>>')
116        input_string = raw_input()
117        args = input_string.split()
118        if len(args) == 0:
119          continue
120
121        labels = []       
122        templateTypes = []
123
124        #process input arguments
125        try:
126          sys.stdout.write('>>')
127          if args[0] == 'heartbeat':
128            logging.info('%d Sending heatbeat', self.dominoclient.UDID)
129            hbm = HeartBeatMessage()
130            hbm.domino_udid = self.dominoclient.UDID
131            hbm.seq_no = self.dominoclient.seqno
132            try:
133              hbm_r = self.communicationhandler.sender.d_heartbeat(hbm)
134              logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
135            except (Thrift.TException, TSocket.TTransportException) as tx:
136              logging.error('%s' , tx.message)
137            except (socket.timeout) as tx:
138              self.dominoclient.handle_RPC_timeout(hbm)
139            except:
140              logging.error('Unexpected error: %s', sys.exc_info()[0])
141            self.dominoclient.seqno = self.dominoclient.seqno + 1
142          
143          elif args[0] == 'publish':
144            opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
145            if len(opts) == 0:
146              print '\nUsage: publish -t <toscafile>'
147              continue
148
149            #toscafile = DEFAULT_TOSCA_PUBFILE
150            for opt, arg in opts:
151              if opt in ('-t', '--tosca-file'):
152                toscafile = arg
153            
154            pub_msg = PublishMessage()
155            pub_msg.domino_udid = self.dominoclient.UDID
156            pub_msg.seq_no = self.dominoclient.seqno
157            pub_msg.template_type = 'tosca-nfv-v1.0'
158            try:
159              pub_msg.template = read_templatefile(toscafile)
160            except IOError as e:
161              logging.error('I/O error(%d): %s' , e.errno, e.strerror)
162              continue
163            logging.info('Publishing the template file: ' + toscafile)
164            try:
165              pub_msg_r = self.communicationhandler.sender.d_publish(pub_msg)
166              logging.info('Publish Response is received from: %d ,sequence number: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no)
167            except (Thrift.TException, TSocket.TTransportException) as tx:
168              print '%s' % (tx.message)
169            except (socket.timeout) as tx:
170              self.dominoclient.handle_RPC_timeout(pub_msg)
171
172            self.dominoclient.seqno = self.dominoclient.seqno + 1
173        
174          elif args[0] == 'subscribe':         
175            opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype="])
176            for opt, arg in opts:
177               if opt in ('-l', '--labels'):
178                  labels = labels + arg.split(',')
179               elif opt in ('-t', '--ttype'):
180                  templateTypes = templateTypes + arg.split(',')
181          
182          elif args[0] == 'register':
183            self.dominoclient.start()
184   
185        except getopt.GetoptError:
186          print 'Command is misentered or not supported!'
187
188
189        #check if labels or supported templates are nonempty
190        if labels != [] or templateTypes != []:
191          #send subscription message
192          sub_msg = SubscribeMessage()
193          sub_msg.domino_udid = self.dominoclient.UDID
194          sub_msg.seq_no = self.dominoclient.seqno
195          sub_msg.template_op = APPEND
196          sub_msg.supported_template_types = templateTypes
197          sub_msg.label_op = APPEND
198          sub_msg.labels = labels
199          logging.info('subscribing labels %s and templates %s', labels, templateTypes)
200          try:
201            sub_msg_r = self.communicationhandler.sender.d_subscribe(sub_msg) 
202            logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
203          except (Thrift.TException, TSocket.TTransportException) as tx:
204            logging.error('%s' , tx.message)
205          except (socket.timeout) as tx:
206            self.dominoclient.handle_RPC_timeout(sub_msg)
207
208          self.dominoclient.seqno = self.dominoclient.seqno + 1
209
210 class DominoClient:
211   def __init__(self):
212     self.communicationHandler = CommunicationHandler(self)
213     self.processor = None
214     self.transport = None
215     self.tfactory = None
216     self.pfactory = None
217     self.communicationServer = None
218
219     self.CLIservice = DominoClientCLIService(self, self.communicationHandler)
220
221     self.serviceport = 9091
222     self.dominoserver_IP = 'localhost'
223
224     #Start from UNREGISTERED STATE
225     #TO BE DONE: initialize from a saved state
226     self.state = 'UNREGISTERED'
227     self.seqno = 0
228     self.UDID = 1
229
230   def start_communicationService(self):
231     self.processor = Communication.Processor(self.communicationHandler)
232     self.transport = TSocket.TServerSocket(port=int(self.serviceport))
233     self.tfactory = TTransport.TBufferedTransportFactory()
234     self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
235     #Use TThreadedServer or TThreadPoolServer for a multithreaded server
236     #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
237     self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
238
239     self.communicationServer.serve()
240  
241   def start(self):
242     try:
243       self.communicationHandler.openconnection()
244       self.register()
245     except Thrift.TException, tx:
246       print '%s' % (tx.message)
247    
248   def register(self):  
249     if self.state == 'UNREGISTERED':
250       #prepare registration message
251       reg_msg = RegisterMessage()
252       reg_msg.domino_udid_desired = UDID_DESIRED
253       reg_msg.seq_no = self.seqno
254       reg_msg.ipaddr = netutil.get_ip()
255       reg_msg.tcpport = self.serviceport
256       reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
257
258       try:
259         reg_msg_r = self.sender().d_register(reg_msg)
260         logging.info('Registration Response: Response Code: %d'  , reg_msg_r.responseCode)
261         if reg_msg_r.comments:
262           logging.debug('Response Comments: %s' ,  reg_msg_r.comments)
263
264         if reg_msg_r.responseCode == SUCCESS:
265           self.state = 'REGISTERED'
266           self.UDID = reg_msg_r.domino_udid_assigned
267         else:
268           #Handle registration failure here (possibly based on reponse comments)
269           pass
270       except (Thrift.TException, TSocket.TTransportException) as tx:
271         logging.error('%s' , tx.message)
272       except (socket.timeout) as tx:
273         self.dominoclient.handle_RPC_timeout(pub_msg)
274       except (socket.error) as tx:
275         logging.error('%s' , tx.message)
276       self.seqno = self.seqno + 1
277
278   def stop(self):
279     try:
280       self.communicationHandler.closeconnection()
281     except Thrift.TException, tx:
282       logging.error('%s' , tx.message)
283     
284   def sender(self):
285     return self.communicationHandler.sender
286
287   def startCLI(self):
288     logging.info('CLI Service is starting')
289     self.CLIservice.start()
290     #to wait until CLI service is finished
291     #self.CLIservice.join()
292
293   def set_serviceport(self, port):
294     self.serviceport = port
295
296   def set_dominoserver_ipaddr(self, ipaddr):
297     self.dominoserver_IP = ipaddr
298
299   def handle_RPC_timeout(self, RPCmessage):
300     # TBD: handle each RPC timeout separately
301     if RPCmessage.messageType == HEART_BEAT:
302       logging.debug('RPC Timeout for message type: HEART_BEAT') 
303     elif RPCmessage.messageType == PUBLISH:
304       logging.debug('RPC Timeout for message type: PUBLISH')
305     elif RPCmessage.messageType == SUBSCRIBE:
306       logging.debug('RPC Timeout for message type: SUBSCRIBE')
307     elif RPCmessage.messageType == REGISTER:
308       logging.debug('RPC Timeout for message type: REGISTER')
309     elif RPCmessage.messageType == QUERY:
310       logging.debug('RPC Timeout for message type: QUERY') 
311
312 def main(argv):
313   client = DominoClient()
314   loglevel = 'WARNING'
315   #process input arguments
316   try:
317       opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log="])
318   except getopt.GetoptError:
319       print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
320       sys.exit(2)
321   for opt, arg in opts:
322       if opt == '-h':
323          print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
324          sys.exit()
325       elif opt in ("-c", "--conf"):
326          configfile = arg
327       elif opt in ("-p", "--port"):
328          client.set_serviceport(int(arg))
329       elif opt in ("-i", "--ipaddr"):
330          client.set_dominoserver_ipaddr(arg)
331       elif opt in ("--log"):
332          loglevel = arg
333
334   #Set logging level
335   numeric_level = getattr(logging, loglevel.upper(), None)
336   try:
337     if not isinstance(numeric_level, int):
338       raise ValueError('Invalid log level: %s' % loglevel)
339     logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
340   except ValueError, ex:
341     print ex.message
342     exit()
343  
344   #The client is starting
345   logging.debug('Domino Client Starting...')
346   client.start()
347   client.startCLI()
348   client.start_communicationService()
349
350 if __name__ == "__main__":
351    main(sys.argv[1:])
352