7a3c9adde13ad2b5acf3ab2a1377eaeb914236f0
[doctor.git] / doctor_tests / app_manager / vnfm.py
1 ##############################################################################
2 # Copyright (c) 2018 Nokia Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from flask import Flask
10 from flask import request
11 import json
12 import requests
13 from threading import Thread
14 import time
15 import uuid
16 import yaml
17
18 from doctor_tests.app_manager.base import BaseAppManager
19 from doctor_tests.identity_auth import get_identity_auth
20 from doctor_tests.identity_auth import get_session
21 from doctor_tests.os_clients import neutron_client
22 from doctor_tests.os_clients import nova_client
23 from doctor_tests.os_clients import keystone_client
24
25
class VNFM(BaseAppManager):
    """Application-manager front end that owns the VNFManager thread.

    Starts the manager worker on demand and stops it by first removing
    the remote constraints and then hitting the worker's own HTTP
    /shutdown endpoint.
    """

    def __init__(self, stack, conf, log):
        super(VNFM, self).__init__(conf, log)
        self.stack = stack
        self.app = None

    def start(self):
        self.log.info('VNFM start......')
        self.app = VNFManager(self.stack, self.conf, self, self.log)
        self.app.start()

    def stop(self):
        self.log.info('VNFM stop......')
        if not self.app:
            # start() was never called; nothing to tear down
            return
        self.app.delete_constraints()
        shutdown_url = 'http://%s:%d/shutdown' % (self.conf.app_manager.ip,
                                                  self.conf.app_manager.port)
        requests.post(shutdown_url,
                      data='',
                      headers={'Content-Type': 'application/json',
                               'Accept': 'application/json'})
51
52
53 class VNFManager(Thread):
54
    def __init__(self, stack, conf, app_manager, log):
        """Initialize the VNF manager worker thread.

        Creates Keystone/Nova/Neutron clients, caches the VNF instances,
        derives scale/impact limits from the number of non-HA instances,
        resolves the public 'maintenance' endpoint from Keystone and
        pushes the initial constraints to it via update_constraints().
        """
        Thread.__init__(self)
        self.stack = stack
        self.conf = conf
        self.port = self.conf.app_manager.port
        self.app_manager = app_manager
        self.log = log
        # NOTE(review): typo for "instance_ids"; assigned here but never
        # read anywhere in this module
        self.intance_ids = None
        self.auth = get_identity_auth(project=self.conf.doctor_project)
        self.session = get_session(auth=self.auth)
        self.keystone = keystone_client(
            self.conf.keystone_version, self.session)
        self.nova = nova_client(self.conf.nova_version,
                                self.session)
        self.neutron = neutron_client(session=self.session)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        if self.conf.admin_tool.type == 'fenix':
            # Fenix requires a Keystone token on every request
            self.headers['X-Auth-Token'] = self.session.get_token()
        # Instance count at start; used to scale back up after maintenance
        self.orig_number_of_instances = self.number_of_instances()
        # List of instances
        self.ha_instances = []
        self.nonha_instances = []
        # Different instance_id specific constraints {instance_id: {},...}
        self.instance_constraints = None
        # Update existing instances to instance lists
        self.update_instances()
        nonha_instances = len(self.nonha_instances)
        if nonha_instances < 7:
            self.scale = 2
            self.max_impacted = 2
        else:
            # Scale down half of the non-HA instances; allow impacting one
            # less than that at a time
            self.scale = int((nonha_instances) / 2)
            self.max_impacted = self.scale - 1
        self.log.info('Init nonha_instances: %s scale: %s: max_impacted %s' %
                      (nonha_instances, self.scale, self.max_impacted))
        # Different instance groups constraints dict
        self.ha_group = None
        self.nonha_group = None
        # Floating IP used in HA instance
        self.floating_ip = None
        # VNF project_id
        self.project_id = None
        # HA instance_id that is active / has floating IP.
        # NOTE: this rebinds the method name active_instance_id to the
        # returned id string for the lifetime of the instance
        self.active_instance_id = self.active_instance_id()

        services = self.keystone.services.list()
        for service in services:
            if service.type == 'maintenance':
                self.log.info('maintenance service: %s:%s type %s'
                              % (service.name, service.id, service.type))
                maint_id = service.id
        # NOTE(review): maint_id stays unbound if no 'maintenance' service
        # exists, which would raise NameError on the next line
        self.maint_endpoint = [ep.url for ep in self.keystone.endpoints.list()
                               if ep.service_id == maint_id and
                               ep.interface == 'public'][0]
        self.log.info('maintenance endpoint: %s' % self.maint_endpoint)

        self.update_constraints()
114
115     def delete_remote_instance_constraints(self, instance_id):
116         url = "%s/instance/%s" % (self.maint_endpoint, instance_id)
117         self.log.info('DELETE: %s' % url)
118         ret = requests.delete(url, data=None, headers=self.headers)
119         if ret.status_code != 200 and ret.status_code != 204:
120             raise Exception(ret.text)
121
122     def update_remote_instance_constraints(self, instance):
123         url = "%s/instance/%s" % (self.maint_endpoint, instance["instance_id"])
124         self.log.info('PUT: %s' % url)
125         ret = requests.put(url, data=json.dumps(instance),
126                            headers=self.headers)
127         if ret.status_code != 200 and ret.status_code != 204:
128             raise Exception(ret.text)
129
130     def delete_remote_group_constraints(self, instance_group):
131         url = "%s/instance_group/%s" % (self.maint_endpoint,
132                                         instance_group["group_id"])
133         self.log.info('DELETE: %s' % url)
134         ret = requests.delete(url, data=None, headers=self.headers)
135         if ret.status_code != 200 and ret.status_code != 204:
136             raise Exception(ret.text)
137
138     def update_remote_group_constraints(self, instance_group):
139         url = "%s/instance_group/%s" % (self.maint_endpoint,
140                                         instance_group["group_id"])
141         self.log.info('PUT: %s' % url)
142         ret = requests.put(url, data=json.dumps(instance_group),
143                            headers=self.headers)
144         if ret.status_code != 200 and ret.status_code != 204:
145             raise Exception(ret.text)
146
147     def delete_constraints(self):
148         if self.conf.admin_tool.type == 'fenix':
149             self.headers['X-Auth-Token'] = self.session.get_token()
150         for instance_id in self.instance_constraints:
151             self.delete_remote_instance_constraints(instance_id)
152         self.delete_remote_group_constraints(self.nonha_group)
153         self.delete_remote_group_constraints(self.ha_group)
154
155     def update_constraints(self):
156         self.log.info('Update constraints')
157         if self.project_id is None:
158             self.project_id = self.keystone.projects.list(
159                 name=self.conf.doctor_project)[0].id
160         if self.nonha_group is None:
161             # Nova does not support groupping instances that do not belong to
162             # anti-affinity server_groups. Anyhow all instances need groupping
163             self.nonha_group = {
164                 "group_id": str(uuid.uuid4()),
165                 "project_id": self.project_id,
166                 "group_name": "doctor_nonha_app_group",
167                 "anti_affinity_group": False,
168                 "max_instances_per_host": 0,
169                 "max_impacted_members": self.max_impacted,
170                 "recovery_time": 2,
171                 "resource_mitigation": True}
172             self.log.info('create doctor_nonha_app_group constraints: %s'
173                           % self.nonha_group)
174             self.update_remote_group_constraints(self.nonha_group)
175         if self.ha_group is None:
176             group_id = [sg.id for sg in self.nova.server_groups.list()
177                         if sg.name == "doctor_ha_app_group"][0]
178             self.ha_group = {
179                 "group_id": group_id,
180                 "project_id": self.project_id,
181                 "group_name": "doctor_ha_app_group",
182                 "anti_affinity_group": True,
183                 "max_instances_per_host": 1,
184                 "max_impacted_members": 1,
185                 "recovery_time": 4,
186                 "resource_mitigation": True}
187             self.log.info('create doctor_ha_app_group constraints: %s'
188                           % self.nonha_group)
189             self.update_remote_group_constraints(self.ha_group)
190         instance_constraints = {}
191         for ha_instance in self.ha_instances:
192             instance = {
193                 "instance_id": ha_instance.id,
194                 "project_id": self.project_id,
195                 "group_id": self.ha_group["group_id"],
196                 "instance_name": ha_instance.name,
197                 "max_interruption_time": 120,
198                 "migration_type": "MIGRATION",
199                 "resource_mitigation": True,
200                 "lead_time": 40}
201             self.log.info('create ha instance constraints: %s'
202                           % instance)
203             instance_constraints[ha_instance.id] = instance
204         for nonha_instance in self.nonha_instances:
205             instance = {
206                 "instance_id": nonha_instance.id,
207                 "project_id": self.project_id,
208                 "group_id": self.nonha_group["group_id"],
209                 "instance_name": nonha_instance.name,
210                 "max_interruption_time": 120,
211                 "migration_type": "MIGRATION",
212                 "resource_mitigation": True,
213                 "lead_time": 40}
214             self.log.info('create nonha instance constraints: %s'
215                           % instance)
216             instance_constraints[nonha_instance.id] = instance
217         if not self.instance_constraints:
218             # Initial instance constraints
219             self.log.info('create initial instances constraints...')
220             for instance in [instance_constraints[i] for i
221                              in instance_constraints]:
222                 self.update_remote_instance_constraints(instance)
223             self.instance_constraints = instance_constraints.copy()
224         else:
225             self.log.info('check instances constraints changes...')
226             added = [i for i in instance_constraints.keys()
227                      if i not in self.instance_constraints]
228             deleted = [i for i in self.instance_constraints.keys()
229                        if i not in instance_constraints]
230             modified = [i for i in instance_constraints.keys()
231                         if (i not in added and i not in deleted and
232                             instance_constraints[i] !=
233                             self.instance_constraints[i])]
234             for instance_id in deleted:
235                 self.delete_remote_instance_constraints(instance_id)
236             updated = added + modified
237             for instance in [instance_constraints[i] in i in updated]:
238                 self.update_remote_instance_constraints(instance)
239             if updated or deleted:
240                 # Some instance constraints have changed
241                 self.instance_constraints = instance_constraints.copy()
242
243     def active_instance_id(self):
244         # Need rertry as it takes time after heat template done before
245         # Floating IP in place
246         retry = 5
247         while retry > 0:
248             for instance in self.ha_instances:
249                 network_interfaces = next(iter(instance.addresses.values()))
250                 for network_interface in network_interfaces:
251                     _type = network_interface.get('OS-EXT-IPS:type')
252                     if _type == "floating":
253                         if not self.floating_ip:
254                             self.floating_ip = network_interface.get('addr')
255                         self.log.debug('active_instance: %s %s' %
256                                        (instance.name, instance.id))
257                         return instance.id
258             time.sleep(2)
259             self.update_instances()
260             retry -= 1
261         raise Exception("No active instance found")
262
    def switch_over_ha_instance(self):
        # Fail over: move the floating IP to the standby HA instance and
        # mark it active. Called when maintenance is about to impact the
        # currently active instance.
        for instance in self.ha_instances:
            if instance.id != self.active_instance_id:
                self.log.info('Switch over to: %s %s' % (instance.name,
                                                         instance.id))
                # Deprecated, need to use neutron instead
                # instance.add_floating_ip(self.floating_ip)
                port = self.neutron.list_ports(device_id=instance.id)['ports'][0]['id']  # noqa
                floating_id = self.neutron.list_floatingips(floating_ip_address=self.floating_ip)['floatingips'][0]['id']  # noqa
                self.neutron.update_floatingip(floating_id, {'floatingip': {'port_id': port}})  # noqa
                # Have to update ha_instances as floating_ip changed
                self.update_instances()
                # NOTE: active_instance_id was rebound from method to id
                # string in __init__, so a plain assignment is consistent
                self.active_instance_id = instance.id
                break
277
278     def get_instance_ids(self):
279         ret = list()
280         for instance in self.nova.servers.list(detailed=False):
281             ret.append(instance.id)
282         return ret
283
284     def update_instances(self):
285         instances = self.nova.servers.list(detailed=True)
286         self.ha_instances = [i for i in instances
287                              if "doctor_ha_app_" in i.name]
288         self.nonha_instances = [i for i in instances
289                                 if "doctor_nonha_app_" in i.name]
290
291     def _alarm_data_decoder(self, data):
292         if "[" in data or "{" in data:
293             # string to list or dict removing unicode
294             data = yaml.load(data.replace("u'", "'"))
295         return data
296
297     def _alarm_traits_decoder(self, data):
298         return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
299                 for t in data['reason_data']['event']['traits']})
300
301     def get_session_instance_ids(self, url, session_id):
302         ret = requests.get(url, data=None, headers=self.headers)
303         if ret.status_code != 200:
304             raise Exception(ret.text)
305         self.log.info('get_instance_ids %s' % ret.json())
306         return ret.json()['instance_ids']
307
308     def scale_instances(self, number_of_instances):
309         number_of_instances_before = self.number_of_instances()
310
311         parameters = self.stack.parameters
312         parameters['nonha_intances'] += number_of_instances
313         self.stack.update(self.stack.stack_name,
314                           self.stack.stack_id,
315                           self.stack.template,
316                           parameters=parameters,
317                           files=self.stack.files)
318
319         number_of_instances_after = self.number_of_instances()
320         if (number_of_instances_before + number_of_instances !=
321            number_of_instances_after):
322             self.log.error('scale_instances with: %d from: %d ends up to: %d'
323                            % (number_of_instances, number_of_instances_before,
324                               number_of_instances_after))
325             raise Exception('scale_instances failed')
326
327         self.log.info('scaled instances from %d to %d' %
328                       (number_of_instances_before,
329                        number_of_instances_after))
330
331     def number_of_instances(self):
332         return len(self.nova.servers.list(detailed=False))
333
    def run(self):
        """Thread entry point: serve the VNFM HTTP API.

        Exposes /maintenance for maintenance-workflow alarms and
        /shutdown to stop the server. Blocks in app.run() until a
        shutdown request arrives.
        """
        app = Flask('VNFM')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
            # Alarm payload arrives as a JSON-encoded event sample
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                # Log the raw traits for debugging, then give up
                payload = ({t[0]: t[2] for t in
                           data['reason_data']['event']['traits']})
                self.log.error('cannot parse alarm data: %s' % payload)
                raise Exception('VNFM cannot parse alarm.'
                                'Possibly trait data over 256 char')

            self.log.info('VNFM received data = %s' % payload)

            state = payload['state']
            reply_state = None
            reply = dict()

            self.log.info('VNFM state: %s' % state)

            if state == 'MAINTENANCE':
                # NOTE(review): payload['instance_ids'] appears to carry
                # the per-session query URL here — confirm against sender
                instance_ids = (self.get_session_instance_ids(
                                payload['instance_ids'],
                                payload['session_id']))
                reply['instance_ids'] = instance_ids
                reply_state = 'ACK_MAINTENANCE'

            elif state == 'SCALE_IN':
                # scale down "self.scale" instances that is VCPUS equaling
                # at least a single compute node
                self.scale_instances(-self.scale)
                reply['instance_ids'] = self.get_instance_ids()
                reply_state = 'ACK_SCALE_IN'

            elif state == 'MAINTENANCE_COMPLETE':
                # possibly need to upscale back to the original count
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    scale_instances = (self.orig_number_of_instances -
                                       number_of_instances)
                    self.scale_instances(scale_instances)
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif state == 'PREPARE_MAINTENANCE':
                # TBD from contraints
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')
                instance_ids = payload['instance_ids'][0]
                self.log.info('VNFM got instance: %s' % instance_ids)
                if instance_ids == self.active_instance_id:
                    # Active HA instance impacted: fail over first
                    self.switch_over_ha_instance()
                # optional also in contraints
                reply['instance_action'] = "MIGRATE"
                reply_state = 'ACK_PREPARE_MAINTENANCE'

            elif state == 'PLANNED_MAINTENANCE':
                # TBD from contraints
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')
                instance_ids = payload['instance_ids'][0]
                self.log.info('VNFM got instance: %s' % instance_ids)
                if instance_ids == self.active_instance_id:
                    # Active HA instance impacted: fail over first
                    self.switch_over_ha_instance()
                # optional also in contraints
                reply['instance_action'] = "MIGRATE"
                reply_state = 'ACK_PLANNED_MAINTENANCE'

            elif state == 'INSTANCE_ACTION_DONE':
                # TBD was action done in allowed window
                self.log.info('%s' % payload['instance_ids'])
            else:
                raise Exception('VNFM received event with'
                                ' unknown state %s' % state)

            if reply_state:
                if self.conf.admin_tool.type == 'fenix':
                    # Refresh token for the reply request
                    self.headers['X-Auth-Token'] = self.session.get_token()
                reply['session_id'] = payload['session_id']
                reply['state'] = reply_state
                url = payload['reply_url']
                self.log.info('VNFM reply: %s' % reply)
                requests.put(url, data=json.dumps(reply), headers=self.headers)

            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            # Werkzeug-specific shutdown hook; only available under the
            # Werkzeug development server
            self.log.info('shutdown VNFM server at %s' % time.time())
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'VNFM shutting down...'

        app.run(host="0.0.0.0", port=self.port)