a7bc412662c55d6319a30dafce04c0f97c5ec496
[doctor.git] / doctor_tests / app_manager / sample.py
1 ##############################################################################
2 # Copyright (c) 2018 Nokia Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from flask import Flask
10 from flask import request
11 import json
12 import yaml
13 import time
14 from threading import Thread
15 import requests
16
17 from doctor_tests.app_manager.base import BaseAppManager
18 from doctor_tests.identity_auth import get_identity_auth
19 from doctor_tests.identity_auth import get_session
20 from doctor_tests.os_clients import nova_client
21
22
class SampleAppManager(BaseAppManager):
    """App manager used by the doctor tests, backed by an AppManager thread.

    start() creates and starts the AppManager thread (which serves the
    REST API); stop() asks that server to exit by POSTing to its
    /shutdown endpoint.
    """

    def __init__(self, stack, conf, log):
        super(SampleAppManager, self).__init__(conf, log)
        self.stack = stack
        # Populated by start(); while it is None, stop() does nothing.
        self.app = None

    def start(self):
        """Spawn the AppManager thread and start serving."""
        self.log.info('sample app manager start......')
        self.app = AppManager(self.stack, self.conf, self, self.log)
        self.app.start()

    def stop(self):
        """Request shutdown of the running server; no-op if never started."""
        self.log.info('sample app manager stop......')
        if self.app:
            shutdown_url = 'http://%s:%d/shutdown' % (
                self.conf.app_manager.ip,
                self.conf.app_manager.port,
            )
            json_headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            }
            requests.post(shutdown_url, data='', headers=json_headers)
47
48
class AppManager(Thread):
    """Thread that serves the sample application manager REST API.

    run() starts a Flask server on conf.app_manager.port with two
    endpoints: POST /maintenance handles maintenance alarm notifications
    and PUTs a reply to the payload's 'reply_url'; POST /shutdown stops
    the server.

    On construction it records the current number of servers (so it can
    scale back up after a scale-in) and which "doctor_ha_app_" instance
    currently owns the floating IP.
    """

    def __init__(self, stack, conf, app_manager, log):
        Thread.__init__(self)
        self.stack = stack
        self.conf = conf
        self.port = self.conf.app_manager.port
        self.app_manager = app_manager
        self.log = log
        # NOTE(review): never read in this class; name kept unchanged in
        # case other code refers to it.
        self.intance_ids = None
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        self.auth = get_identity_auth(project=self.conf.doctor_project)
        self.nova = nova_client(self.conf.nova_version,
                                get_session(auth=self.auth))
        self.orig_number_of_instances = self.number_of_instances()
        self.ha_instances = self.get_ha_instances()
        # Set by active_instance_id() below to the first floating address.
        self.floating_ip = None
        # From here on this instance attribute (an id string) shadows the
        # active_instance_id() method; the method is only called once, here.
        self.active_instance_id = self.active_instance_id()

    def active_instance_id(self):
        """Return the id of the first HA instance that has a floating IP.

        Also stores that floating address in self.floating_ip (if not set
        yet) so switch_over_ha_instance() can move it.

        :raises Exception: if no HA instance has a floating address.
        """
        for instance in self.ha_instances:
            # Scan every attached network. Picking only the "first" one is
            # arbitrary with unordered dicts, and raises StopIteration for
            # a server that has no addresses at all.
            for network_interfaces in instance.addresses.values():
                for network_interface in network_interfaces:
                    _type = network_interface.get('OS-EXT-IPS:type')
                    if _type != "floating":
                        continue
                    if not self.floating_ip:
                        self.floating_ip = network_interface.get('addr')
                    self.log.debug('active_instance: %s %s' %
                                   (instance.name, instance.id))
                    return instance.id
        raise Exception("No active instance found")

    def switch_over_ha_instance(self):
        """Move the floating IP to the first HA instance that is not active.

        Updates self.active_instance_id to the new owner. Logs a warning
        and changes nothing if no other HA instance exists.
        """
        for instance in self.ha_instances:
            if instance.id != self.active_instance_id:
                self.log.info('Switch over to: %s %s' % (instance.name,
                                                         instance.id))
                instance.add_floating_ip(self.floating_ip)
                self.active_instance_id = instance.id
                break
        else:
            self.log.warning('No HA instance to switch over to from: %s'
                             % self.active_instance_id)

    def get_instance_ids(self):
        """Return the ids of all servers visible to the nova client."""
        return [instance.id
                for instance in self.nova.servers.list(detailed=False)]

    def get_ha_instances(self):
        """Return servers whose name contains "doctor_ha_app_"."""
        ha_instances = list()
        for instance in self.nova.servers.list(detailed=True):
            if "doctor_ha_app_" in instance.name:
                ha_instances.append(instance)
                self.log.debug('ha_instances: %s' % instance.name)
        return ha_instances

    def _alarm_data_decoder(self, data):
        """Turn a stringified list/dict trait value back into Python data.

        Strings that contain neither '[' nor '{' are returned unchanged.
        """
        if "[" in data or "{" in data:
            # string to list or dict removing unicode (u'...') prefixes.
            # safe_load: the text arrives in an HTTP request, and
            # yaml.load() without an explicit Loader is deprecated (and
            # rejected outright by PyYAML >= 6).
            data = yaml.safe_load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
        """Map alarm trait names to decoded values.

        Reads data['reason_data']['event']['traits']; each trait is
        indexed as t[0] (name) and t[2] (value) -- presumably
        [name, type, value]; confirm against the alarm producer.
        """
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']})

    def get_session_instance_ids(self, url, session_id):
        """GET *url* and return the 'instance_ids' list from its JSON body.

        :param url: URL taken from the alarm payload's 'instance_ids'.
        :param session_id: currently unused.
        :raises Exception: with the response text on a non-200 status.
        """
        ret = requests.get(url, data=None, headers=self.headers)
        if ret.status_code != 200:
            raise Exception(ret.text)
        self.log.info('get_instance_ids %s' % ret.json())
        return ret.json()['instance_ids']

    def scale_instances(self, number_of_instances):
        """Change the stack's 'nonha_intances' parameter by a delta.

        :param number_of_instances: signed delta (negative scales in).
        :raises Exception: if the server count did not change by exactly
            that delta after the stack update.
        """
        number_of_instances_before = self.number_of_instances()

        # NOTE: this is the stack's own dict, updated in place; the key
        # spelling must match the Heat template parameter name.
        parameters = self.stack.parameters
        parameters['nonha_intances'] += number_of_instances
        self.stack.update(self.stack.stack_name,
                          self.stack.stack_id,
                          self.stack.template,
                          parameters=parameters,
                          files=self.stack.files)

        number_of_instances_after = self.number_of_instances()
        if (number_of_instances_before + number_of_instances !=
           number_of_instances_after):
            self.log.error('scale_instances with: %d from: %d ends up to: %d'
                           % (number_of_instances, number_of_instances_before,
                              number_of_instances_after))
            raise Exception('scale_instances failed')

        self.log.info('scaled instances from %d to %d' %
                      (number_of_instances_before,
                       number_of_instances_after))

    def number_of_instances(self):
        """Return how many servers the nova client can list."""
        return len(self.nova.servers.list(detailed=False))

    def _migrate_instance_actions(self, payload):
        """Build the reply actions for PREPARE/PLANNED_MAINTENANCE events.

        Fetches the session's instance ids, marks every one of them
        "MIGRATE" and, when the active HA instance is among them, switches
        the floating IP over to its peer before the reply is sent.

        :returns: dict mapping each instance id to "MIGRATE".
        :raises Exception: if "MIGRATE" is not in payload['allowed_actions'].
        """
        if "MIGRATE" not in payload['allowed_actions']:
            raise Exception('MIGRATE not supported')

        instance_ids = self.get_session_instance_ids(payload['instance_ids'],
                                                     payload['session_id'])
        self.log.info('sample app manager got instances: %s' %
                      instance_ids)
        instance_actions = dict()
        for instance_id in instance_ids:
            instance_actions[instance_id] = "MIGRATE"
            if instance_id == self.active_instance_id:
                self.switch_over_ha_instance()
        return instance_actions

    def run(self):
        """Serve the REST API on 0.0.0.0:self.port until /shutdown."""
        app = Flask('app_manager')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                # Log the raw trait values so the unparsable one is visible.
                payload = ({t[0]: t[2] for t in
                           data['reason_data']['event']['traits']})
                self.log.error('cannot parse alarm data: %s' % payload)
                raise Exception('sample app manager cannot parse alarm. '
                                'Possibly trait data over 256 char')

            self.log.info('sample app manager received data = %s' % payload)

            state = payload['state']
            reply_state = None
            reply = dict()

            self.log.info('sample app manager state: %s' % state)

            if state == 'MAINTENANCE':
                reply['instance_ids'] = self.get_session_instance_ids(
                    payload['instance_ids'], payload['session_id'])
                reply_state = 'ACK_MAINTENANCE'

            elif state == 'SCALE_IN':
                # scale down 2 instances that is VCPUS equaling to single
                # compute node
                self.scale_instances(-2)
                reply['instance_ids'] = self.get_instance_ids()
                reply_state = 'ACK_SCALE_IN'

            elif state == 'MAINTENANCE_COMPLETE':
                # possibly need to upscale back to the size seen at start-up
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    self.scale_instances(self.orig_number_of_instances -
                                         number_of_instances)
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif state in ('PREPARE_MAINTENANCE', 'PLANNED_MAINTENANCE'):
                reply['instance_actions'] = (
                    self._migrate_instance_actions(payload))
                # ACK_PREPARE_MAINTENANCE / ACK_PLANNED_MAINTENANCE
                reply_state = 'ACK_' + state

            elif state == 'INSTANCE_ACTION_DONE':
                self.log.info('%s' % payload['instance_ids'])

            else:
                raise Exception('sample app manager received event with'
                                ' unknown state %s' % state)

            if reply_state:
                reply['session_id'] = payload['session_id']
                reply['state'] = reply_state
                url = payload['reply_url']
                self.log.info('sample app manager reply: %s' % reply)
                requests.put(url, data=json.dumps(reply), headers=self.headers)

            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown app manager server at %s' % time.time())
            # NOTE(review): 'werkzeug.server.shutdown' only exists on the
            # Werkzeug development server and was removed in Werkzeug 2.1;
            # with newer Werkzeug this ends in the RuntimeError below.
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'app manager shutting down...'

        app.run(host="0.0.0.0", port=self.port)