1 ##############################################################################
2 # Copyright (c) 2018 Nokia Corporation and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
from flask import Flask
from flask import request
import json
import requests
from threading import Thread
import time
import uuid
import yaml

from doctor_tests.app_manager.base import BaseAppManager
from doctor_tests.identity_auth import get_identity_auth
from doctor_tests.identity_auth import get_session
from doctor_tests.os_clients import neutron_client
from doctor_tests.os_clients import nova_client
from doctor_tests.os_clients import keystone_client
class VNFM(BaseAppManager):
    """App manager wrapper that owns the VNFManager server thread.

    start() spawns the VNFManager Flask server in its own thread;
    stop() removes the remote constraints and POSTs to the server's
    /shutdown endpoint so the thread terminates cleanly.
    """

    def __init__(self, stack, conf, log):
        super(VNFM, self).__init__(conf, log)
        self.stack = stack
        # Set by start(); stays None until the manager thread exists
        self.app = None

    def start(self):
        """Create and start the VNFManager thread."""
        self.log.info('VNFM start......')
        self.app = VNFManager(self.stack, self.conf, self, self.log)
        self.app.start()

    def stop(self):
        """Tear down constraints and shut the VNFM HTTP server down."""
        self.log.info('VNFM stop......')
        if not self.app:
            # start() was never called; nothing to stop
            return
        self.app.delete_constraints()
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        url = 'http://%s:%d/shutdown'\
            % (self.conf.app_manager.ip,
               self.conf.app_manager.port)
        requests.post(url, data='', headers=headers)
class VNFManager(Thread):
    """VNF manager server thread.

    Runs a small Flask application that receives maintenance alarms,
    keeps per-instance and per-group constraints synchronized with the
    remote maintenance (admin tool / Fenix) endpoint, and reacts to the
    maintenance workflow states (scale in, switch over, scale back up).
    """

    def __init__(self, stack, conf, app_manager, log):
        Thread.__init__(self)
        self.stack = stack
        self.conf = conf
        self.port = self.conf.app_manager.port
        self.app_manager = app_manager
        self.log = log
        self.intance_ids = None
        self.auth = get_identity_auth(project=self.conf.doctor_project)
        self.session = get_session(auth=self.auth)
        self.keystone = keystone_client(
            self.conf.keystone_version, self.session)
        self.nova = nova_client(self.conf.nova_version,
                                self.session)
        self.neutron = neutron_client(session=self.session)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        if self.conf.admin_tool.type == 'fenix':
            self.headers['X-Auth-Token'] = self.session.get_token()
        self.orig_number_of_instances = self.number_of_instances()
        # Instance lists kept current via update_instances()
        self.ha_instances = []
        self.nonha_instances = []
        # Different instance_id specific constraints {instanse_id: {},...}
        self.instance_constraints = None
        # Update existing instances to instance lists
        self.update_instances()
        nonha_instances = len(self.nonha_instances)
        if nonha_instances < 7:
            # Too few instances to derive scale from the pool size;
            # fall back to fixed values
            self.scale = 2
            self.max_impacted = 2
        else:
            self.scale = int((nonha_instances) / 2)
            self.max_impacted = self.scale - 1
        self.log.info('Init nonha_instances: %s scale: %s: max_impacted %s' %
                      (nonha_instances, self.scale, self.max_impacted))
        # Different instance groups constraints dict
        self.ha_group = None
        self.nonha_group = None
        # Floating IP used in HA instance
        self.floating_ip = None
        # Need to keep track of project used for the constraints
        self.project_id = None
        # HA instance_id that is active / has floating IP
        # NOTE: this assignment shadows the active_instance_id() method
        # after __init__; later code reads it as a plain attribute.
        self.active_instance_id = self.active_instance_id()

        services = self.keystone.services.list()
        for service in services:
            if service.type == 'maintenance':
                self.log.info('maintenance service: %s:%s type %s'
                              % (service.name, service.id, service.type))
                maint_id = service.id
        self.maint_endpoint = [ep.url for ep in self.keystone.endpoints.list()
                               if ep.service_id == maint_id and
                               ep.interface == 'public'][0]
        self.log.info('maintenance endpoint: %s' % self.maint_endpoint)
        self.update_constraints_lock = False
        self.update_constraints()

    def delete_remote_instance_constraints(self, instance_id):
        """DELETE one instance constraint from the maintenance endpoint."""
        url = "%s/instance/%s" % (self.maint_endpoint, instance_id)
        self.log.info('DELETE: %s' % url)
        ret = requests.delete(url, data=None, headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def update_remote_instance_constraints(self, instance):
        """PUT one instance constraint to the maintenance endpoint."""
        url = "%s/instance/%s" % (self.maint_endpoint, instance["instance_id"])
        self.log.info('PUT: %s' % url)
        ret = requests.put(url, data=json.dumps(instance),
                           headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def delete_remote_group_constraints(self, instance_group):
        """DELETE one instance-group constraint from the endpoint."""
        url = "%s/instance_group/%s" % (self.maint_endpoint,
                                        instance_group["group_id"])
        self.log.info('DELETE: %s' % url)
        ret = requests.delete(url, data=None, headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def update_remote_group_constraints(self, instance_group):
        """PUT one instance-group constraint to the endpoint."""
        url = "%s/instance_group/%s" % (self.maint_endpoint,
                                        instance_group["group_id"])
        self.log.info('PUT: %s' % url)
        ret = requests.put(url, data=json.dumps(instance_group),
                           headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def delete_constraints(self):
        """Remove all instance and group constraints from the endpoint."""
        if self.conf.admin_tool.type == 'fenix':
            # Token may have expired since __init__; refresh it
            self.headers['X-Auth-Token'] = self.session.get_token()
        for instance_id in self.instance_constraints:
            self.delete_remote_instance_constraints(instance_id)
        self.delete_remote_group_constraints(self.nonha_group)
        self.delete_remote_group_constraints(self.ha_group)

    def update_constraints(self):
        """Synchronize group and instance constraints with the endpoint.

        Guarded by a simple boolean lock as both the server thread and
        the controlling code may call this concurrently.
        """
        while self.update_constraints_lock:
            self.log.info('Waiting update_constraints_lock...')
            time.sleep(1)
        self.update_constraints_lock = True
        self.log.info('Update constraints')
        if self.project_id is None:
            self.project_id = self.keystone.projects.list(
                name=self.conf.doctor_project)[0].id
        if self.nonha_group is None:
            # Nova does not support groupping instances that do not belong to
            # anti-affinity server_groups. Anyhow all instances need groupping
            self.nonha_group = {
                "group_id": str(uuid.uuid4()),
                "project_id": self.project_id,
                "group_name": "doctor_nonha_app_group",
                "anti_affinity_group": False,
                "max_instances_per_host": 0,
                "max_impacted_members": self.max_impacted,
                "recovery_time": 2,
                "resource_mitigation": True}
            self.log.info('create doctor_nonha_app_group constraints: %s'
                          % self.nonha_group)
            self.update_remote_group_constraints(self.nonha_group)
        if self.ha_group is None:
            group_id = [sg.id for sg in self.nova.server_groups.list()
                        if sg.name == "doctor_ha_app_group"][0]
            self.ha_group = {
                "group_id": group_id,
                "project_id": self.project_id,
                "group_name": "doctor_ha_app_group",
                "anti_affinity_group": True,
                "max_instances_per_host": 1,
                "max_impacted_members": 1,
                "recovery_time": 4,
                "resource_mitigation": True}
            self.log.info('create doctor_ha_app_group constraints: %s'
                          % self.ha_group)
            self.update_remote_group_constraints(self.ha_group)
        instance_constraints = {}
        for ha_instance in self.ha_instances:
            instance = {
                "instance_id": ha_instance.id,
                "project_id": self.project_id,
                "group_id": self.ha_group["group_id"],
                "instance_name": ha_instance.name,
                "max_interruption_time": 120,
                "migration_type": "MIGRATE",
                "resource_mitigation": True,
                "lead_time": 40}
            self.log.info('create ha instance constraints: %s'
                          % instance)
            instance_constraints[ha_instance.id] = instance
        for nonha_instance in self.nonha_instances:
            instance = {
                "instance_id": nonha_instance.id,
                "project_id": self.project_id,
                "group_id": self.nonha_group["group_id"],
                "instance_name": nonha_instance.name,
                "max_interruption_time": 120,
                "migration_type": "MIGRATE",
                "resource_mitigation": True,
                "lead_time": 40}
            self.log.info('create nonha instance constraints: %s'
                          % instance)
            instance_constraints[nonha_instance.id] = instance
        if not self.instance_constraints:
            # Initial instance constraints
            self.log.info('create initial instances constraints...')
            for instance in [instance_constraints[i] for i
                             in instance_constraints]:
                self.update_remote_instance_constraints(instance)
            self.instance_constraints = instance_constraints.copy()
        else:
            self.log.info('check instances constraints changes...')
            added = [i for i in instance_constraints.keys()
                     if i not in self.instance_constraints]
            deleted = [i for i in self.instance_constraints.keys()
                       if i not in instance_constraints]
            modified = [i for i in instance_constraints.keys()
                        if (i not in added and i not in deleted and
                            instance_constraints[i] !=
                            self.instance_constraints[i])]
            for instance_id in deleted:
                self.delete_remote_instance_constraints(instance_id)
            updated = added + modified
            for instance in [instance_constraints[i] for i in updated]:
                self.update_remote_instance_constraints(instance)
            if updated or deleted:
                # Some instance constraints have changed
                self.instance_constraints = instance_constraints.copy()
        self.update_constraints_lock = False

    def active_instance_id(self):
        """Return the id of the HA instance holding the floating IP.

        Raises Exception if no instance gets a floating IP after
        retrying.
        """
        # Need rertry as it takes time after heat template done before
        # Floating IP in place
        retry = 5
        while retry > 0:
            for instance in self.ha_instances:
                network_interfaces = next(iter(instance.addresses.values()))
                for network_interface in network_interfaces:
                    _type = network_interface.get('OS-EXT-IPS:type')
                    if _type == "floating":
                        if not self.floating_ip:
                            self.floating_ip = network_interface.get('addr')
                        self.log.debug('active_instance: %s %s' %
                                       (instance.name, instance.id))
                        return instance.id
            time.sleep(2)
            self.update_instances()
            retry -= 1
        raise Exception("No active instance found")

    def switch_over_ha_instance(self):
        """Move the floating IP to the standby HA instance."""
        for instance in self.ha_instances:
            if instance.id != self.active_instance_id:
                self.log.info('Switch over to: %s %s' % (instance.name,
                                                         instance.id))
                # Deprecated, need to use neutron instead
                # instance.add_floating_ip(self.floating_ip)
                port = self.neutron.list_ports(device_id=instance.id)['ports'][0]['id']  # noqa
                floating_id = self.neutron.list_floatingips(floating_ip_address=self.floating_ip)['floatingips'][0]['id']  # noqa
                self.neutron.update_floatingip(floating_id, {'floatingip': {'port_id': port}})  # noqa
                # Have to update ha_instances as floating_ip changed
                self.update_instances()
                self.active_instance_id = instance.id
                break

    def get_instance_ids(self):
        """Return ids of all servers visible to this project."""
        return [instance.id
                for instance in self.nova.servers.list(detailed=False)]

    def update_instances(self):
        """Refresh the HA / non-HA instance lists from nova."""
        instances = self.nova.servers.list(detailed=True)
        self.ha_instances = [i for i in instances
                             if "doctor_ha_app_" in i.name]
        self.nonha_instances = [i for i in instances
                                if "doctor_nonha_app_" in i.name]

    def _alarm_data_decoder(self, data):
        """Decode a trait value: parse embedded list/dict strings."""
        if "[" in data or "{" in data:
            # string to list or dict removing unicode
            # NOTE(review): yaml.load without an explicit Loader is
            # deprecated/unsafe on untrusted input
            data = yaml.load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
        """Build a {trait_name: decoded_value} dict from alarm data."""
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']})

    def get_session_instance_ids(self, url, session_id):
        """Fetch the instance ids of a maintenance session from url."""
        ret = requests.get(url, data=None, headers=self.headers)
        if ret.status_code != 200:
            raise Exception(ret.text)
        self.log.info('get_instance_ids %s' % ret.json())
        return ret.json()['instance_ids']

    def scale_instances(self, number_of_instances):
        """Scale the heat stack by number_of_instances (may be negative).

        Raises Exception if the resulting instance count does not match
        the expectation.
        """
        number_of_instances_before = self.number_of_instances()

        parameters = self.stack.parameters
        parameters['nonha_intances'] += number_of_instances
        self.stack.update(self.stack.stack_name,
                          self.stack.stack_id,
                          self.stack.template,
                          parameters=parameters,
                          files=self.stack.files)

        number_of_instances_after = self.number_of_instances()
        if (number_of_instances_before + number_of_instances !=
           number_of_instances_after):
            self.log.error('scale_instances with: %d from: %d ends up to: %d'
                           % (number_of_instances, number_of_instances_before,
                              number_of_instances_after))
            raise Exception('scale_instances failed')

        self.log.info('scaled instances from %d to %d' %
                      (number_of_instances_before,
                       number_of_instances_after))

    def number_of_instances(self):
        """Return the current number of servers in the project."""
        return len(self.nova.servers.list(detailed=False))

    def run(self):
        """Thread entry point: serve /maintenance and /shutdown."""
        app = Flask('VNFM')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                payload = ({t[0]: t[2] for t in
                           data['reason_data']['event']['traits']})
                self.log.error('cannot parse alarm data: %s' % payload)
                raise Exception('VNFM cannot parse alarm.'
                                'Possibly trait data over 256 char')

            self.log.info('VNFM received data = %s' % payload)

            state = payload['state']
            reply_state = None
            reply = dict()

            self.log.info('VNFM state: %s' % state)

            if state == 'MAINTENANCE':
                instance_ids = (self.get_session_instance_ids(
                                payload['instance_ids'],
                                payload['session_id']))
                my_instance_ids = self.get_instance_ids()
                invalid_instances = (
                    [instance_id for instance_id in instance_ids
                     if instance_id not in my_instance_ids])
                if invalid_instances:
                    self.log.error('Invalid instances: %s' % invalid_instances)
                    reply_state = 'NACK_MAINTENANCE'
                else:
                    reply_state = 'ACK_MAINTENANCE'

            elif state == 'SCALE_IN':
                # scale down "self.scale" instances that is VCPUS equaling
                # at least a single compute node
                self.scale_instances(-self.scale)
                reply_state = 'ACK_SCALE_IN'

            elif state == 'MAINTENANCE_COMPLETE':
                # possibly need to upscale
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    scale_instances = (self.orig_number_of_instances -
                                       number_of_instances)
                    self.scale_instances(scale_instances)
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif state == 'PREPARE_MAINTENANCE':
                # TBD from contraints
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')
                instance_ids = payload['instance_ids'][0]
                self.log.info('VNFM got instance: %s' % instance_ids)
                if instance_ids == self.active_instance_id:
                    self.switch_over_ha_instance()
                # optional also in contraints
                reply['instance_action'] = "MIGRATE"
                reply_state = 'ACK_PREPARE_MAINTENANCE'

            elif state == 'PLANNED_MAINTENANCE':
                # TBD from contraints
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')
                instance_ids = payload['instance_ids'][0]
                self.log.info('VNFM got instance: %s' % instance_ids)
                if instance_ids == self.active_instance_id:
                    self.switch_over_ha_instance()
                # optional also in contraints
                reply['instance_action'] = "MIGRATE"
                reply_state = 'ACK_PLANNED_MAINTENANCE'

            elif state == 'INSTANCE_ACTION_DONE':
                # TBD was action done in allowed window
                self.log.info('%s' % payload['instance_ids'])
            else:
                raise Exception('VNFM received event with'
                                ' unknown state %s' % state)

            if reply_state:
                if self.conf.admin_tool.type == 'fenix':
                    self.headers['X-Auth-Token'] = self.session.get_token()
                reply['state'] = reply_state
                url = payload['reply_url']
                self.log.info('VNFM reply: %s' % reply)
                requests.put(url, data=json.dumps(reply),
                             headers=self.headers)
            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown VNFM server at %s' % time.time())
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'VNFM shutting down...'

        app.run(host="0.0.0.0", port=self.port)