3 #Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 import sys, os, glob, random, errno
17 #sys.path.append('gen-py')
18 #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
19 sys.path.insert(0, glob.glob('./lib')[0])
22 from dominoRPC import Communication
23 from dominoRPC.ttypes import *
24 from dominoRPC.constants import *
26 from thrift import Thrift
27 from thrift.transport import TSocket
28 from thrift.transport import TTransport
29 from thrift.protocol import TBinaryProtocol
30 from thrift.server import TServer
32 from toscaparser.tosca_template import ToscaTemplate
33 #from toscaparser.utils.gettextutils import _
34 #import toscaparser.utils.urlutils
37 from partitioner import *
38 from util import miscutil
40 #Load configuration parameters
41 from domino_conf import *
# RPC handler for the Domino server. It keeps a back-reference to the owning
# server object, implements the d_* entry points below and pushes templates
# out to clients.
44 class CommunicationHandler:
# Constructor.
#   dominoserver: the server object whose subscription tables, registration
#                 records and assign_udid() the handlers below use.
# NOTE(review): self.seqno is read and incremented by the handlers, but its
# initialisation is not visible in this view -- confirm it is set here.
48 def __init__(self, dominoserver):
50 self.dominoServer = dominoserver
def openconnection(self, ipaddr, tcpport):
    """Open a buffered Thrift connection to a remote Domino endpoint.

    On success the handler holds the connection in ``self.transport``,
    the wire protocol in ``self.protocol`` and the RPC client in
    ``self.sender``.

    Args:
        ipaddr: address of the remote endpoint.
        tcpport: TCP port of the remote endpoint.

    Thrift errors are logged, not raised; a failed connection therefore
    surfaces when the caller issues its RPC.
    """
    try:
        # Raw socket, with every call bounded by the configured timeout.
        sock = TSocket.TSocket(ipaddr, tcpport)
        sock.setTimeout(THRIFT_RPC_TIMEOUT_MS)
        # Add buffering to compensate for slow raw sockets
        self.transport = TTransport.TBufferedTransport(sock)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create a client to use the protocol encoder
        self.sender = Communication.Client(self.protocol)
        # Nothing can be sent until the transport has been opened.
        self.transport.open()
    except Thrift.TException as tx:
        logging.error('%s' , tx.message)
def closeconnection(self):
    """Close the transport currently stored on the handler."""
    active_transport = self.transport
    active_transport.close()
# Push one template to a Domino client over a freshly opened connection.
#   template: payload stored in the push message's `template` field
#   ipaddr, tcpport: endpoint handed to openconnection()
# The message is stamped with the server UDID, the current sequence number
# and the fixed template type 'tosca-nfv-v1.0'. The sequence number is
# incremented and the connection closed afterwards.
# NOTE(review): the statement that creates `pushm` and the `try:` line (plus
# the clause owning the 'Unexpected error' log) are not visible in this view.
73 def push_template(self,template,ipaddr,tcpport):
74 self.openconnection(ipaddr,tcpport)
76 pushm.domino_udid = SERVER_UDID
77 pushm.seq_no = self.seqno
78 pushm.template_type = 'tosca-nfv-v1.0'
79 pushm.template = template
# Issue the push RPC and log which client answered.
81 push_r = self.sender.d_push(pushm)
82 logging.info('Push Response received from %d' , push_r.domino_udid)
# Thrift/transport failures are only logged.
83 except (Thrift.TException, TSocket.TTransportException) as tx:
84 logging.error('%s' , tx.message)
# Socket timeouts are delegated to the server's timeout hook.
85 except (socket.timeout) as tx:
86 self.dominoServer.handle_RPC_timeout(pushm)
88 logging.error('Unexpected error: %s', sys.exc_info()[0])
90 self.seqno = self.seqno + 1
# NOTE(review): no `finally` is visible around this call; an exception that
# escapes the handlers above would presumably leave the transport open.
92 self.closeconnection()
94 #Heartbeat from Domino Client is received
96 # - Respond Back with a heartbeat
def d_heartbeat(self, hb_msg):
    """Answer a client heartbeat with a heartbeat from the server.

    Args:
        hb_msg: incoming heartbeat; only ``domino_udid`` is read, for the
            log line.

    Returns:
        HeartBeatMessage stamped with the server UDID and the sequence
        number that was current when the request arrived.
    """
    logging.info('heartbeat received from %d' , hb_msg.domino_udid)

    reply = HeartBeatMessage()
    reply.domino_udid = SERVER_UDID
    reply.seq_no = self.seqno
    # Each reply consumes one sequence number.
    self.seqno += 1
    # Hand the reply back to the caller; without this the handler yields None.
    return reply
110 #Registration from Domino Client is received
113 # - Respond Back with Registration Response
# Handle a registration request from a Domino client.
#   reg_msg: request; domino_udid_desired, ipaddr, tcpport,
#            supported_templates and seq_no are read below.
# The client gets a UDID from the server's assign_udid(), the request is kept
# in registration_record under that UDID, and the response code is always
# SUCCESS.
# NOTE(review): the statement creating `data`, the body of the `with` block
# and any return of reg_r are not visible in this view -- confirm against
# the full file.
114 def d_register(self, reg_msg):
117 #Prepare and send Registration Response
118 reg_r = RegisterResponseMessage()
119 logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
# The UDID actually granted may differ from the one requested.
122 reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
123 reg_r.seq_no = self.seqno
124 reg_r.domino_udid = SERVER_UDID
125 #return unconditional success
127 #Define conditions for unsuccessful registration (e.g., unsupported mapping)
128 reg_r.responseCode = SUCCESS
129 #no need to send comments
131 #Logic for a new UDID assignment
133 self.seqno = self.seqno + 1
135 # Store the Domino Client info
136 # TBD: check the sequence number to ensure the most recent record is saved
137 self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg
# Record keyed by the assigned UDID: [ip, port, supported templates, seq no].
139 data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no]
# Append mode: each registration adds to SERVER_DBFILE instead of replacing it.
140 with open(SERVER_DBFILE, 'a') as f:
147 #Subscription from Domino Client is received
149 # - Save the templates & labels
150 # - Respond Back with Subscription Response
def d_subscribe(self, sub_msg):
    """Record a client's template-format and label subscriptions.

    Template formats are always processed according to
    ``sub_msg.template_op``. Labels are processed according to
    ``sub_msg.label_op`` only when ``sub_msg.labels`` is not an empty
    list. APPEND merges into the stored set, OVERWRITE replaces it and
    DELETE removes the given entries; DELETE for a client with no stored
    entry is a no-op.

    Args:
        sub_msg: subscription request; ``domino_udid``, ``template_op``,
            ``supported_template_types``, ``label_op`` and ``labels``
            are read.

    Returns:
        SubscribeResponseMessage carrying the server UDID, the current
        sequence number and response code SUCCESS.
    """
    udid = sub_msg.domino_udid
    logging.info('Subscribe Request received from %d' , udid)

    def apply_op(table, op, values):
        # Apply one APPEND / OVERWRITE / DELETE request to table[udid].
        if op == APPEND:
            incoming = set(values)
            table.setdefault(udid, set()).update(incoming)
        elif op == OVERWRITE:
            table[udid] = set(values)
        elif op == DELETE:
            removed = set(values)
            # A client that never subscribed has nothing to delete; skip
            # it rather than fail with KeyError.
            if udid in table:
                table[udid].difference_update(removed)

    formats = self.dominoServer.subscribed_templateformats
    labels = self.dominoServer.subscribed_labels

    apply_op(formats, sub_msg.template_op, sub_msg.supported_template_types)
    if sub_msg.labels != []:
        apply_op(labels, sub_msg.label_op, sub_msg.labels)

    # .get() keeps the log line safe when either table has no entry for
    # this client yet (e.g. formats subscribed, labels not).
    logging.debug('Supported Template: %s Supported Labels: %s' , formats.get(udid, set()) , labels.get(udid, set()))

    reply = SubscribeResponseMessage()
    reply.domino_udid = SERVER_UDID
    reply.seq_no = self.seqno
    reply.responseCode = SUCCESS
    # Each reply consumes one sequence number.
    self.seqno += 1
    return reply
187 #Template Publication from Domino Client is received
189 # - Parse the template, perform mapping, partition the template
190 # - Launch Push service
191 # - Respond Back with Publication Response
# Handle a template publication from a Domino client.
#   pub_msg: request; domino_udid is logged and template is iterated and
#            written out to TOSCADIR+TOSCA_DEFAULT_FNAME.
# Steps visible below: make sure TOSCADIR exists, store the template, load it
# as a ToscaTemplate, extract node labels, map them onto the subscribed
# labels, select a site per node, partition the template per site, push each
# partition to the registered client of that site, then build a SUCCESS
# response.
# NOTE(review): the `try:`/`else:` lines, the body of the write loop, any
# close of `f` and the return of pub_r are not visible in this view.
192 def d_publish(self, pub_msg):
193 global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME
194 logging.info('Publish Request received from %d' , pub_msg.domino_udid)
195 logging.debug(pub_msg.template)
# Create the template directory; an already existing directory is fine.
199 os.makedirs(TOSCADIR)
200 except OSError as exception:
201 if exception.errno == errno.EEXIST:
202 logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, TOSCADIR+TOSCA_DEFAULT_FNAME)
# NOTE(review): the format string below has two placeholders (%s, %d) but
# only exception.errno is passed -- the directory argument looks missing.
204 logging.error('Error occurred in creating %s. Err no: %d', exception.errno)
206 #Risking a race condition if another process is attempting to write to same file
# NOTE(review): opened without `with`; no close is visible in this view.
207 f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w')
208 for item in pub_msg.template:
212 # Load tosca object from file into memory
213 tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
# Labels found on the template's nodes.
216 node_labels = label.extract_labels( tosca )
217 logging.debug('Node Labels: %s', node_labels)
219 # Map nodes in the template to resource domains
220 site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
221 logging.debug('Site Maps: %s' , site_map)
223 # Select a site for each VNF
224 node_site = label.select_site( site_map )
225 logging.debug('Selected Sites: %s', node_site)
227 # Create per-domain Tosca files
# NOTE(review): the path is hard-coded here while the file above was written
# to TOSCADIR+TOSCA_DEFAULT_FNAME -- confirm the two always coincide.
228 file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl)
230 # Create list of translated template files
234 # Send domain templates to each domain agent/client
235 # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
236 # TBD: read from work-flow
# file_paths is keyed by site; the same key indexes registration_record.
237 for site in file_paths:
238 domino_client_ip = self.dominoServer.registration_record[site].ipaddr
239 domino_client_port = self.dominoServer.registration_record[site].tcpport
240 self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port)
# Build the publication response and consume one sequence number.
243 pub_r = PublishResponseMessage()
244 pub_r.domino_udid = SERVER_UDID
245 pub_r.seq_no = self.seqno
246 pub_r.responseCode = SUCCESS
247 self.seqno = self.seqno + 1
250 #Query from Domino Client is received
253 # - Respond Back with Query Response
# Handle a query from a Domino client.
#   qu_msg: the query request (not read in the visible lines).
# NOTE(review): only the creation of the response object is visible in this
# view; how it is filled in and returned cannot be confirmed from here.
254 def d_query(self, qu_msg):
256 qu_r = QueryResponseMessage()
# NOTE(review): the class/def header owning these assignments is not visible
# in this view; presumably this is the DominoServer constructor (see
# CommunicationHandler(self) below and `server = DominoServer()` further
# down) -- confirm.
# State used by the RPC handlers: UDIDs already handed out, per-client label
# and template-format subscriptions, and the stored registration requests.
263 self.assignedUUIDs = list()
264 self.subscribed_labels = dict()
265 self.subscribed_templateformats = dict()
266 self.registration_record = dict()
# Thrift plumbing: the handler is wrapped in a processor and served on
# DOMINO_SERVER_PORT using buffered transports and the binary protocol.
267 self.communicationHandler = CommunicationHandler(self)
268 self.processor = Communication.Processor(self.communicationHandler)
269 self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
270 self.tfactory = TTransport.TBufferedTransportFactory()
271 self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
272 #Use TThreadedServer or TThreadPoolServer for a multithreaded server
273 #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
274 self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
def start_communicationService(self):
    """Hand control to the serve() loop of the configured Thrift server."""
    rpc_server = self.communicationServer
    rpc_server.serve()
280 #For now assign the desired UDID
282 #Check if ID is already assigned and in use
283 #If not assigned, assign it
284 #If assigned, offer a new random id
def assign_udid(self, udid_desired):
    """Reserve a UDID for a registering client and return it.

    The desired id is granted when it is not yet in ``self.assignedUUIDs``.
    Otherwise random 63-bit ids are drawn until an unused one is found.
    The granted id is appended to ``self.assignedUUIDs``.

    Args:
        udid_desired: the id the client asked for.

    Returns:
        The id that was actually reserved.
    """
    granted = udid_desired
    # Taken ids are replaced by random candidates until one is free.
    while granted in self.assignedUUIDs:
        granted = random.getrandbits(63)
    self.assignedUUIDs.append(granted)
    # The caller stores the client's record under this value, so it must
    # be handed back.
    return granted
def handle_RPC_timeout(self, RPCmessage):
    """Log a timed-out RPC; only PUSH messages are reported for now.

    Args:
        RPCmessage: the message whose call timed out; its ``messageType``
            decides whether anything is logged.
    """
    if RPCmessage.messageType != PUSH:
        return
    # TBD: handle each RPC timeout separately
    logging.debug('RPC Timeout for message type: PUSH')
# Script entry: build the server, parse the command line, configure logging
# and start serving.
# NOTE(review): the enclosing function header, the `try:` lines, the bodies
# of the option branches and the body of the __main__ guard are not visible
# in this view.
303 server = DominoServer()
305 #process input arguments
# Accepted options: -h, -c/--conf <arg>, -l/--log <arg>.
307 opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
308 except getopt.GetoptError:
309 print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
311 for opt, arg in opts:
# NOTE(review): this usage text names DominoClient.py and -p/-i options that
# the getopt spec above does not accept -- looks copied from the client.
313 print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
315 elif opt in ("-c", "--conf"):
# NOTE(review): ("--log") is a plain string, not a tuple, so `in` performs a
# substring test; "-l" only matches because it is a substring of "--log".
317 elif opt in ("--log"):
# Translate the level name (e.g. "debug") into logging's numeric constant.
320 numeric_level = getattr(logging, loglevel.upper(), None)
322 if not isinstance(numeric_level, int):
323 raise ValueError('Invalid log level: %s' % loglevel)
324 logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
325 except ValueError, ex:
329 logging.debug('Domino Server Starting...')
330 server.start_communicationService()
333 if __name__ == "__main__":