VNFM against Fenix API schema validation fix
[doctor.git] / doctor_tests / app_manager / vnfm.py
##############################################################################
# Copyright (c) 2018 Nokia Corporation and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
from flask import Flask
from flask import request
import json
import requests
from threading import Thread
import time
import uuid
import yaml

from doctor_tests.app_manager.base import BaseAppManager
from doctor_tests.identity_auth import get_identity_auth
from doctor_tests.identity_auth import get_session
from doctor_tests.os_clients import neutron_client
from doctor_tests.os_clients import nova_client
from doctor_tests.os_clients import keystone_client


class VNFM(BaseAppManager):
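    """Application manager wrapper used by the Doctor maintenance test.

    Starts the VNFManager thread and, on stop, deletes its constraints
    and asks the Flask server to shut down.
    """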

    def __init__(self, stack, conf, log):
        super(VNFM, self).__init__(conf, log)
        self.stack = stack
        self.app = None

    def start(self):
        self.log.info('VNFM start......')
        self.app = VNFManager(self.stack, self.conf, self, self.log)
        self.app.start()

    def stop(self):
        self.log.info('VNFM stop......')
        if not self.app:
            return
        self.app.delete_constraints()
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        url = 'http://%s:%d/shutdown'\
              % (self.conf.app_manager.ip,
                 self.conf.app_manager.port)
        requests.post(url, data='', headers=headers)


class VNFManager(Thread):
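    """VNF manager thread implementing the Fenix maintenance workflow.

    Serves a small Flask application: maintenance event notifications
    arrive as POSTs on /maintenance and are answered through the
    reply_url carried in each notification payload.
    """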

    def __init__(self, stack, conf, app_manager, log):
        Thread.__init__(self)
        self.stack = stack
        self.conf = conf
        self.port = self.conf.app_manager.port
        self.app_manager = app_manager
        self.log = log
        self.instance_ids = None
        self.auth = get_identity_auth(project=self.conf.doctor_project)
        self.session = get_session(auth=self.auth)
        self.keystone = keystone_client(
            self.conf.keystone_version, self.session)
        self.nova = nova_client(self.conf.nova_version,
                                self.session)
        self.neutron = neutron_client(session=self.session)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        if self.conf.admin_tool.type == 'fenix':
            self.headers['X-Auth-Token'] = self.session.get_token()
        self.orig_number_of_instances = self.number_of_instances()
        # Lists of HA and non-HA instances
        self.ha_instances = []
        self.nonha_instances = []
        # Instance specific constraints {instance_id: {...}, ...}
        self.instance_constraints = None
        # Update existing instances to the instance lists
        self.update_instances()
        nonha_instances = len(self.nonha_instances)
        if nonha_instances < 7:
            self.scale = 2
            self.max_impacted = 2
        else:
            self.scale = int(nonha_instances / 2)
            self.max_impacted = self.scale - 1
        self.log.info('Init nonha_instances: %s scale: %s max_impacted: %s' %
                      (nonha_instances, self.scale, self.max_impacted))
        # Constraints dicts for the different instance groups
        self.ha_group = None
        self.nonha_group = None
        # Floating IP used by the active HA instance
        self.floating_ip = None
        # VNF project_id
        self.project_id = None
        # HA instance_id that is active / has the floating IP
        self.active_instance_id = self.active_instance_id()

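        # Find the public endpoint of the maintenance service from the
        # Keystone service catalog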
        services = self.keystone.services.list()
        for service in services:
            if service.type == 'maintenance':
                self.log.info('maintenance service: %s:%s type %s'
                              % (service.name, service.id, service.type))
                maint_id = service.id
        self.maint_endpoint = [ep.url for ep in self.keystone.endpoints.list()
                               if ep.service_id == maint_id and
                               ep.interface == 'public'][0]
        self.log.info('maintenance endpoint: %s' % self.maint_endpoint)
        self.update_constraints_lock = False
        self.update_constraints()

    def delete_remote_instance_constraints(self, instance_id):
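        """Delete the constraints of a single instance."""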
        url = "%s/instance/%s" % (self.maint_endpoint, instance_id)
        self.log.info('DELETE: %s' % url)
        ret = requests.delete(url, data=None, headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def update_remote_instance_constraints(self, instance):
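        """Create or update the constraints of a single instance."""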
        url = "%s/instance/%s" % (self.maint_endpoint, instance["instance_id"])
        self.log.info('PUT: %s' % url)
        ret = requests.put(url, data=json.dumps(instance),
                           headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def delete_remote_group_constraints(self, instance_group):
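        """Delete the constraints of an instance group."""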
        url = "%s/instance_group/%s" % (self.maint_endpoint,
                                        instance_group["group_id"])
        self.log.info('DELETE: %s' % url)
        ret = requests.delete(url, data=None, headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def update_remote_group_constraints(self, instance_group):
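        """Create or update the constraints of an instance group."""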
        url = "%s/instance_group/%s" % (self.maint_endpoint,
                                        instance_group["group_id"])
        self.log.info('PUT: %s' % url)
        ret = requests.put(url, data=json.dumps(instance_group),
                           headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def delete_constraints(self):
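        """Delete all instance and group constraints.

        Called from VNFM.stop().
        """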
        if self.conf.admin_tool.type == 'fenix':
            self.headers['X-Auth-Token'] = self.session.get_token()
        for instance_id in self.instance_constraints:
            self.delete_remote_instance_constraints(instance_id)
        self.delete_remote_group_constraints(self.nonha_group)
        self.delete_remote_group_constraints(self.ha_group)

    def update_constraints(self):
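        """Synchronize instance and group constraints to the service.

        On the first call creates the HA and non-HA group constraints;
        on later calls only pushes added, modified or deleted instance
        constraints. A simple busy-wait flag serializes concurrent
        calls.
        """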
        while self.update_constraints_lock:
            self.log.info('Waiting update_constraints_lock...')
            time.sleep(1)
        self.update_constraints_lock = True
        self.log.info('Update constraints')
        if self.project_id is None:
            self.project_id = self.keystone.projects.list(
                name=self.conf.doctor_project)[0].id
        if self.nonha_group is None:
            # Nova does not support grouping instances that do not belong to
            # an anti-affinity server group, but all instances still need
            # grouping
            self.nonha_group = {
                "group_id": str(uuid.uuid4()),
                "project_id": self.project_id,
                "group_name": "doctor_nonha_app_group",
                "anti_affinity_group": False,
                "max_instances_per_host": 0,
                "max_impacted_members": self.max_impacted,
                "recovery_time": 2,
                "resource_mitigation": True}
            self.log.info('create doctor_nonha_app_group constraints: %s'
                          % self.nonha_group)
            self.update_remote_group_constraints(self.nonha_group)
        if self.ha_group is None:
            group_id = [sg.id for sg in self.nova.server_groups.list()
                        if sg.name == "doctor_ha_app_group"][0]
            self.ha_group = {
                "group_id": group_id,
                "project_id": self.project_id,
                "group_name": "doctor_ha_app_group",
                "anti_affinity_group": True,
                "max_instances_per_host": 1,
                "max_impacted_members": 1,
                "recovery_time": 4,
                "resource_mitigation": True}
            self.log.info('create doctor_ha_app_group constraints: %s'
                          % self.ha_group)
            self.update_remote_group_constraints(self.ha_group)
        instance_constraints = {}
        for ha_instance in self.ha_instances:
            instance = {
                "instance_id": ha_instance.id,
                "project_id": self.project_id,
                "group_id": self.ha_group["group_id"],
                "instance_name": ha_instance.name,
                "max_interruption_time": 120,
                "migration_type": "MIGRATE",
                "resource_mitigation": True,
                "lead_time": 40}
            self.log.info('create ha instance constraints: %s'
                          % instance)
            instance_constraints[ha_instance.id] = instance
        for nonha_instance in self.nonha_instances:
            instance = {
                "instance_id": nonha_instance.id,
                "project_id": self.project_id,
                "group_id": self.nonha_group["group_id"],
                "instance_name": nonha_instance.name,
                "max_interruption_time": 120,
                "migration_type": "MIGRATE",
                "resource_mitigation": True,
                "lead_time": 40}
            self.log.info('create nonha instance constraints: %s'
                          % instance)
            instance_constraints[nonha_instance.id] = instance
        if not self.instance_constraints:
            # Initial instance constraints
            self.log.info('create initial instances constraints...')
            for instance in [instance_constraints[i] for i
                             in instance_constraints]:
                self.update_remote_instance_constraints(instance)
            self.instance_constraints = instance_constraints.copy()
        else:
            self.log.info('check instances constraints changes...')
            added = [i for i in instance_constraints.keys()
                     if i not in self.instance_constraints]
            deleted = [i for i in self.instance_constraints.keys()
                       if i not in instance_constraints]
            modified = [i for i in instance_constraints.keys()
                        if (i not in added and i not in deleted and
                            instance_constraints[i] !=
                            self.instance_constraints[i])]
            for instance_id in deleted:
                self.delete_remote_instance_constraints(instance_id)
            updated = added + modified
            for instance in [instance_constraints[i] for i in updated]:
                self.update_remote_instance_constraints(instance)
            if updated or deleted:
                # Some instance constraints have changed
                self.instance_constraints = instance_constraints.copy()
        self.update_constraints_lock = False

    def active_instance_id(self):
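        """Return the id of the HA instance holding the floating IP."""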
        # Retry needed, as it takes a while after the Heat template has
        # completed before the floating IP is in place
        retry = 5
        while retry > 0:
            for instance in self.ha_instances:
                network_interfaces = next(iter(instance.addresses.values()))
                for network_interface in network_interfaces:
                    _type = network_interface.get('OS-EXT-IPS:type')
                    if _type == "floating":
                        if not self.floating_ip:
                            self.floating_ip = network_interface.get('addr')
                        self.log.debug('active_instance: %s %s' %
                                       (instance.name, instance.id))
                        return instance.id
            time.sleep(2)
            self.update_instances()
            retry -= 1
        raise Exception("No active instance found")

    def switch_over_ha_instance(self):
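        """Switch over to the standby HA instance.

        Moves the floating IP to the standby instance and makes it the
        active one.
        """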
        for instance in self.ha_instances:
            if instance.id != self.active_instance_id:
                self.log.info('Switch over to: %s %s' % (instance.name,
                                                         instance.id))
                # Nova add_floating_ip is deprecated; use Neutron instead
                # instance.add_floating_ip(self.floating_ip)
                port = self.neutron.list_ports(
                    device_id=instance.id)['ports'][0]['id']
                floating_id = self.neutron.list_floatingips(
                    floating_ip_address=self.floating_ip
                )['floatingips'][0]['id']
                self.neutron.update_floatingip(
                    floating_id, {'floatingip': {'port_id': port}})
                # Have to update ha_instances as floating_ip changed
                self.update_instances()
                self.active_instance_id = instance.id
                break

    def get_instance_ids(self):
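        """Return the ids of all servers in the project."""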
        ret = list()
        for instance in self.nova.servers.list(detailed=False):
            ret.append(instance.id)
        return ret

    def update_instances(self):
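        """Refresh the HA and non-HA instance lists from Nova."""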
        instances = self.nova.servers.list(detailed=True)
        self.ha_instances = [i for i in instances
                             if "doctor_ha_app_" in i.name]
        self.nonha_instances = [i for i in instances
                                if "doctor_nonha_app_" in i.name]

    def _alarm_data_decoder(self, data):
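        """Decode a trait value that encodes a list or a dict."""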
        if "[" in data or "{" in data:
            # decode string to list or dict, removing unicode prefixes
            data = yaml.safe_load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
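        """Decode event alarm traits into a dict.

        Each trait is a [name, type, value] triple; the result maps
        trait names to decoded values.
        """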
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']})

    def get_session_instance_ids(self, url, session_id):
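        """Get the instance ids belonging to a maintenance session."""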
        ret = requests.get(url, data=None, headers=self.headers)
        if ret.status_code != 200:
            raise Exception(ret.text)
        self.log.info('get_instance_ids %s' % ret.json())
        return ret.json()['instance_ids']

    def scale_instances(self, number_of_instances):
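        """Scale the number of non-HA instances via a stack update.

        A negative number_of_instances scales down.
        """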
        number_of_instances_before = self.number_of_instances()

        parameters = self.stack.parameters
        # NB: the misspelled key matches the Heat template parameter name
        parameters['nonha_intances'] += number_of_instances
        self.stack.update(self.stack.stack_name,
                          self.stack.stack_id,
                          self.stack.template,
                          parameters=parameters,
                          files=self.stack.files)

        number_of_instances_after = self.number_of_instances()
        if (number_of_instances_before + number_of_instances !=
           number_of_instances_after):
            self.log.error('scale_instances with: %d from: %d ends up to: %d'
                           % (number_of_instances, number_of_instances_before,
                              number_of_instances_after))
            raise Exception('scale_instances failed')

        self.log.info('scaled instances from %d to %d' %
                      (number_of_instances_before,
                       number_of_instances_after))

    def number_of_instances(self):
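        """Return the current number of servers in the project."""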
        return len(self.nova.servers.list(detailed=False))

    def run(self):
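        """Serve the VNFM HTTP API until shutdown is requested."""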
        app = Flask('VNFM')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
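            """Handle a maintenance workflow event.

            Decodes the alarm payload, acts according to the state it
            carries and, when required, replies with an ACK/NACK state
            to the reply_url given in the payload.
            """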
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                payload = ({t[0]: t[2] for t in
                           data['reason_data']['event']['traits']})
                self.log.error('cannot parse alarm data: %s' % payload)
                raise Exception('VNFM cannot parse alarm. '
                                'Possibly trait data over 256 char')

            self.log.info('VNFM received data = %s' % payload)

            state = payload['state']
            reply_state = None
            reply = dict()

            self.log.info('VNFM state: %s' % state)

            if state == 'MAINTENANCE':
                instance_ids = (self.get_session_instance_ids(
                                payload['instance_ids'],
                                payload['session_id']))
                my_instance_ids = self.get_instance_ids()
                invalid_instances = (
                    [instance_id for instance_id in instance_ids
                     if instance_id not in my_instance_ids])
                if invalid_instances:
                    self.log.error('Invalid instances: %s' % invalid_instances)
                    reply_state = 'NACK_MAINTENANCE'
                else:
                    reply_state = 'ACK_MAINTENANCE'

            elif state == 'SCALE_IN':
                # scale down by "self.scale" instances, that is, by VCPUs
                # equaling at least a single compute node
                self.scale_instances(-self.scale)
                reply_state = 'ACK_SCALE_IN'

            elif state == 'MAINTENANCE_COMPLETE':
                # possibly need to scale back up
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    scale_instances = (self.orig_number_of_instances -
                                       number_of_instances)
                    self.scale_instances(scale_instances)
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif state == 'PREPARE_MAINTENANCE':
                # TBD: get the action from constraints
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')
                instance_id = payload['instance_ids'][0]
                self.log.info('VNFM got instance: %s' % instance_id)
                if instance_id == self.active_instance_id:
                    self.switch_over_ha_instance()
                # optional, also defined in constraints
                reply['instance_action'] = "MIGRATE"
                reply_state = 'ACK_PREPARE_MAINTENANCE'

            elif state == 'PLANNED_MAINTENANCE':
                # TBD: get the action from constraints
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')
                instance_id = payload['instance_ids'][0]
                self.log.info('VNFM got instance: %s' % instance_id)
                if instance_id == self.active_instance_id:
                    self.switch_over_ha_instance()
                # optional, also defined in constraints
                reply['instance_action'] = "MIGRATE"
                reply_state = 'ACK_PLANNED_MAINTENANCE'

            elif state == 'INSTANCE_ACTION_DONE':
                # TBD: was the action done within the allowed window
                self.log.info('%s' % payload['instance_ids'])
            else:
                raise Exception('VNFM received event with'
                                ' unknown state %s' % state)

            if reply_state:
                if self.conf.admin_tool.type == 'fenix':
                    self.headers['X-Auth-Token'] = self.session.get_token()
                reply['state'] = reply_state
                url = payload['reply_url']
                self.log.info('VNFM reply: %s' % reply)
                requests.put(url, data=json.dumps(reply), headers=self.headers)

            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown VNFM server at %s' % time.time())
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'VNFM shutting down...'

        app.run(host="0.0.0.0", port=self.port)