# DevStack support
# [doctor.git] / doctor_tests / app_manager / sample.py
##############################################################################
# Copyright (c) 2018 Nokia Corporation and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
from flask import Flask
from flask import request
import json
import yaml
import time
from threading import Thread
import requests

from doctor_tests.app_manager.base import BaseAppManager
from doctor_tests.identity_auth import get_identity_auth
from doctor_tests.identity_auth import get_session
from doctor_tests.os_clients import neutron_client
from doctor_tests.os_clients import nova_client


class SampleAppManager(BaseAppManager):
    """Sample application manager.

    Thin wrapper that owns an AppManager server thread: start() spawns
    it and stop() asks it to shut down over its HTTP /shutdown endpoint.
    """

    def __init__(self, stack, conf, log):
        super(SampleAppManager, self).__init__(conf, log)
        self.stack = stack
        self.app = None

    def start(self):
        """Create and start the AppManager server thread."""
        self.log.info('sample app manager start......')
        self.app = AppManager(self.stack, self.conf, self, self.log)
        self.app.start()

    def stop(self):
        """Request shutdown of the AppManager server, if one is running."""
        self.log.info('sample app manager stop......')
        if not self.app:
            return
        shutdown_url = ('http://%s:%d/shutdown'
                        % (self.conf.app_manager.ip,
                           self.conf.app_manager.port))
        requests.post(shutdown_url,
                      data='',
                      headers={'Content-Type': 'application/json',
                               'Accept': 'application/json'})


class AppManager(Thread):
    """HTTP server thread for the sample application manager.

    Runs a small Flask application that receives maintenance workflow
    alarms on /maintenance, answers them with ACK replies, scales the
    Heat stack up/down and moves the HA floating IP between instances
    as the workflow requires. /shutdown stops the server.
    """

    def __init__(self, stack, conf, app_manager, log):
        """Initialize OpenStack clients and cache initial instance state.

        :param stack: Heat stack wrapper used for scaling operations
        :param conf: configuration object
        :param app_manager: the owning SampleAppManager
        :param log: logger
        """
        Thread.__init__(self)
        self.stack = stack
        self.conf = conf
        self.port = self.conf.app_manager.port
        self.app_manager = app_manager
        self.log = log
        # NOTE(review): never read in this module; kept (with its
        # original misspelling) in case external code references it.
        self.intance_ids = None
        self.auth = get_identity_auth(project=self.conf.doctor_project)
        self.session = get_session(auth=self.auth)
        self.nova = nova_client(self.conf.nova_version,
                                self.session)
        self.neutron = neutron_client(session=self.session)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        if self.conf.admin_tool.type == 'fenix':
            self.headers['X-Auth-Token'] = self.session.get_token()
        self.orig_number_of_instances = self.number_of_instances()
        self.ha_instances = self.get_ha_instances()
        self.floating_ip = None
        # From here on the attribute shadows the method of the same
        # name; the method is intentionally called only this once.
        self.active_instance_id = self.active_instance_id()

    def active_instance_id(self):
        """Return the id of the HA instance that holds the floating IP.

        Also caches the floating IP address the first time it is seen.

        :raises Exception: if no HA instance has a floating IP
        """
        for instance in self.ha_instances:
            network_interfaces = next(iter(instance.addresses.values()))
            for network_interface in network_interfaces:
                _type = network_interface.get('OS-EXT-IPS:type')
                if _type == "floating":
                    if not self.floating_ip:
                        self.floating_ip = network_interface.get('addr')
                    self.log.debug('active_instance: %s %s' %
                                   (instance.name, instance.id))
                    return instance.id
        raise Exception("No active instance found")

    def switch_over_ha_instance(self):
        """Move the floating IP over to the standby HA instance."""
        for instance in self.ha_instances:
            if instance.id != self.active_instance_id:
                self.log.info('Switch over to: %s %s' % (instance.name,
                                                         instance.id))
                # nova's instance.add_floating_ip() is deprecated, so the
                # floating IP is re-associated through neutron instead.
                port = self.neutron.list_ports(
                    device_id=instance.id)['ports'][0]['id']
                floating_id = self.neutron.list_floatingips(
                    floating_ip_address=self.floating_ip
                    )['floatingips'][0]['id']
                self.neutron.update_floatingip(
                    floating_id, {'floatingip': {'port_id': port}})
                # Have to update ha_instances as floating_ip changed
                self.ha_instances = self.get_ha_instances()
                self.active_instance_id = instance.id
                break

    def get_instance_ids(self):
        """Return the ids of all servers visible to the project."""
        return [instance.id
                for instance in self.nova.servers.list(detailed=False)]

    def get_ha_instances(self):
        """Return the servers that belong to the HA application."""
        ha_instances = list()
        for instance in self.nova.servers.list(detailed=True):
            if "doctor_ha_app_" in instance.name:
                ha_instances.append(instance)
                self.log.debug('ha_instances: %s' % instance.name)
        return ha_instances

    def _alarm_data_decoder(self, data):
        """Decode a single stringified trait value.

        Strings that look like lists/dicts are parsed; anything else is
        returned unchanged.
        """
        if "[" in data or "{" in data:
            # string to list or dict removing unicode; safe_load avoids
            # arbitrary object construction from untrusted alarm data
            data = yaml.safe_load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
        """Map alarm event trait names to their decoded values."""
        return {str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']}

    def get_session_instance_ids(self, url, session_id):
        """Fetch the instance ids belonging to a maintenance session.

        :param url: instance_ids URL received in the alarm payload
        :param session_id: unused; kept for interface compatibility
        :raises Exception: on any non-200 response
        """
        ret = requests.get(url, data=None, headers=self.headers)
        if ret.status_code != 200:
            raise Exception(ret.text)
        self.log.info('get_instance_ids %s' % ret.json())
        return ret.json()['instance_ids']

    def scale_instances(self, number_of_instances):
        """Scale the stack by *number_of_instances* (negative scales down).

        :raises Exception: if the server count afterwards does not match
        """
        number_of_instances_before = self.number_of_instances()

        parameters = self.stack.parameters
        # NOTE: 'nonha_intances' matches the Heat template parameter
        # name (including its misspelling) and must not be changed here.
        parameters['nonha_intances'] += number_of_instances
        self.stack.update(self.stack.stack_name,
                          self.stack.stack_id,
                          self.stack.template,
                          parameters=parameters,
                          files=self.stack.files)

        number_of_instances_after = self.number_of_instances()
        if (number_of_instances_before + number_of_instances !=
                number_of_instances_after):
            self.log.error('scale_instances with: %d from: %d ends up to: %d'
                           % (number_of_instances, number_of_instances_before,
                              number_of_instances_after))
            raise Exception('scale_instances failed')

        self.log.info('scaled instances from %d to %d' %
                      (number_of_instances_before,
                       number_of_instances_after))

    def number_of_instances(self):
        """Return the current number of servers in the project."""
        return len(self.nova.servers.list(detailed=False))

    def _migration_actions(self, instance_ids):
        """Build the MIGRATE action map for *instance_ids*.

        Switches the HA floating IP away from any instance that is about
        to be migrated while it is the active one.
        """
        instance_actions = dict()
        for instance_id in instance_ids:
            instance_actions[instance_id] = "MIGRATE"
            if instance_id == self.active_instance_id:
                self.switch_over_ha_instance()
        return instance_actions

    def run(self):
        """Serve /maintenance and /shutdown until asked to stop."""
        app = Flask('app_manager')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                payload = {t[0]: t[2] for t in
                           data['reason_data']['event']['traits']}
                self.log.error('cannot parse alarm data: %s' % payload)
                raise Exception('sample app manager cannot parse alarm. '
                                'Possibly trait data over 256 char')

            self.log.info('sample app manager received data = %s' % payload)

            state = payload['state']
            reply_state = None
            reply = dict()

            self.log.info('sample app manager state: %s' % state)

            if state == 'MAINTENANCE':
                instance_ids = self.get_session_instance_ids(
                    payload['instance_ids'],
                    payload['session_id'])
                reply['instance_ids'] = instance_ids
                reply_state = 'ACK_MAINTENANCE'

            elif state == 'SCALE_IN':
                # scale down 2 instances, i.e. the amount of VCPUS
                # equaling a single compute node
                self.scale_instances(-2)
                reply['instance_ids'] = self.get_instance_ids()
                reply_state = 'ACK_SCALE_IN'

            elif state == 'MAINTENANCE_COMPLETE':
                # possibly need to upscale back to the original size
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    missing_instances = (self.orig_number_of_instances -
                                         number_of_instances)
                    self.scale_instances(missing_instances)
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif state == 'PREPARE_MAINTENANCE':
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')

                instance_ids = self.get_session_instance_ids(
                    payload['instance_ids'],
                    payload['session_id'])
                self.log.info('sample app manager got instances: %s' %
                              instance_ids)
                reply['instance_actions'] = (
                    self._migration_actions(instance_ids))
                reply_state = 'ACK_PREPARE_MAINTENANCE'

            elif state == 'PLANNED_MAINTENANCE':
                if "MIGRATE" not in payload['allowed_actions']:
                    raise Exception('MIGRATE not supported')

                instance_ids = self.get_session_instance_ids(
                    payload['instance_ids'],
                    payload['session_id'])
                self.log.info('sample app manager got instances: %s' %
                              instance_ids)
                reply['instance_actions'] = (
                    self._migration_actions(instance_ids))
                reply_state = 'ACK_PLANNED_MAINTENANCE'

            elif state == 'INSTANCE_ACTION_DONE':
                self.log.info('%s' % payload['instance_ids'])

            else:
                raise Exception('sample app manager received event with'
                                ' unknown state %s' % state)

            if reply_state:
                reply['session_id'] = payload['session_id']
                reply['state'] = reply_state
                url = payload['reply_url']
                self.log.info('sample app manager reply: %s' % reply)
                requests.put(url, data=json.dumps(reply), headers=self.headers)

            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown app manager server at %s' % time.time())
            # Werkzeug development-server-specific shutdown hook
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'app manager shutting down...'

        app.run(host="0.0.0.0", port=self.port)