1 ##############################################################################
2 # Copyright (c) 2018 Nokia Corporation and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
import json
import time
from threading import Thread

from flask import Flask
from flask import request
import requests
import yaml

from doctor_tests.app_manager.base import BaseAppManager
from doctor_tests.identity_auth import get_identity_auth
from doctor_tests.identity_auth import get_session
from doctor_tests.os_clients import nova_client
class SampleAppManager(BaseAppManager):
    """Sample application manager used by the Doctor maintenance test.

    start() launches an AppManager thread that serves the REST endpoints;
    stop() asks that server to shut down via its /shutdown endpoint.
    """

    def __init__(self, stack, conf, log):
        super(SampleAppManager, self).__init__(conf, log)
        # Stack wrapper handed to AppManager (used there for scaling).
        self.stack = stack
        # AppManager thread; stays None until start() is called.
        self.app = None

    def start(self):
        """Create the AppManager server thread and start it."""
        self.log.info('sample app manager start......')
        self.app = AppManager(self.stack, self.conf, self, self.log)
        self.app.start()

    def stop(self):
        """Request shutdown of the AppManager server.

        Only logs when start() was never called, since no server is then
        listening on the configured address.
        """
        self.log.info('sample app manager stop......')
        if not self.app:
            return
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        url = 'http://%s:%d/shutdown' % (self.conf.app_manager.ip,
                                         self.conf.app_manager.port)
        requests.post(url, data='', headers=headers)
class AppManager(Thread):
    """Thread running the sample app manager's Flask REST server.

    Receives maintenance notifications on POST /maintenance, reacts to them
    (HA switch-over, Heat scaling), and replies by PUT to the notification's
    reply_url. POST /shutdown stops the server.
    """

    def __init__(self, stack, conf, app_manager, log):
        Thread.__init__(self)
        self.stack = stack
        self.conf = conf
        self.port = self.conf.app_manager.port
        self.app_manager = app_manager
        self.log = log
        # Not used inside this class; attribute name kept unchanged.
        self.intance_ids = None
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        self.auth = get_identity_auth(project=self.conf.doctor_project)
        self.nova = nova_client(self.conf.nova_version,
                                get_session(auth=self.auth))
        # Baseline used to scale back up on MAINTENANCE_COMPLETE.
        self.orig_number_of_instances = self.number_of_instances()
        self.ha_instances = self.get_ha_instances()
        self.floating_ip = None
        # Replaces the active_instance_id() method on this instance with its
        # result (an id); the rest of the class treats it as an attribute.
        self.active_instance_id = self.active_instance_id()

    def active_instance_id(self):
        """Return the id of the first HA instance that has a floating IP.

        Also stores that floating IP in self.floating_ip if none is stored
        yet. Only one address list per instance is inspected: the first one
        yielded by instance.addresses.values().

        Raises:
            Exception: if no HA instance has a floating address.
        """
        for instance in self.ha_instances:
            # Default [] so an instance with no addresses is skipped instead
            # of leaking StopIteration out of next().
            network_interfaces = next(iter(instance.addresses.values()), [])
            for network_interface in network_interfaces:
                _type = network_interface.get('OS-EXT-IPS:type')
                if _type == "floating":
                    if not self.floating_ip:
                        self.floating_ip = network_interface.get('addr')
                    self.log.debug('active_instance: %s %s' %
                                   (instance.name, instance.id))
                    return instance.id
        raise Exception("No active instance found")

    def switch_over_ha_instance(self):
        """Give the floating IP to the first HA instance that is not active.

        Sets self.active_instance_id to that instance. Does nothing if no
        other HA instance exists.
        """
        for instance in self.ha_instances:
            if instance.id != self.active_instance_id:
                self.log.info('Switch over to: %s %s' % (instance.name,
                                                         instance.id))
                # NOTE(review): Server.add_floating_ip is deprecated in newer
                # python-novaclient releases -- confirm the pinned version.
                instance.add_floating_ip(self.floating_ip)
                self.active_instance_id = instance.id
                # Stop here: continuing would hand the IP straight back to
                # the previously active instance.
                break

    def get_instance_ids(self):
        """Return the ids of all servers listed by self.nova."""
        return [instance.id
                for instance in self.nova.servers.list(detailed=False)]

    def get_ha_instances(self):
        """Return the servers whose name contains "doctor_ha_app_"."""
        ha_instances = [instance
                        for instance in self.nova.servers.list(detailed=True)
                        if "doctor_ha_app_" in instance.name]
        for instance in ha_instances:
            self.log.debug('ha_instances: %s' % instance.name)
        return ha_instances

    def _alarm_data_decoder(self, data):
        """Turn a stringified list/dict trait value back into an object.

        Strings containing '[' or '{' are parsed as YAML after dropping the
        "u'" prefixes of Python 2 unicode reprs. Other strings are returned
        unchanged.
        """
        if "[" in data or "{" in data:
            # The text originates from the HTTP request body, so use
            # safe_load: it cannot instantiate arbitrary Python objects, and
            # yaml.load() without a Loader is a TypeError on PyYAML >= 6.
            data = yaml.safe_load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
        """Map each event trait name to its decoded value.

        Reads data['reason_data']['event']['traits'] as a sequence of items
        whose [0] is the trait name and [2] its value; [1] (presumably the
        trait type) is ignored.
        """
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']})

    def get_session_instance_ids(self, url, session_id):
        """GET the instance ids of a maintenance session from url.

        Raises:
            Exception: carrying the response text if the status is not 200.
        """
        data = {'session_id': session_id}
        ret = requests.get(url, data=json.dumps(data), headers=self.headers)
        if ret.status_code != 200:
            raise Exception(ret.text)
        body = ret.json()
        self.log.info('get_instance_ids %s' % body)
        return body['instance_ids']

    def _migrate_actions(self, payload):
        """Build the MIGRATE instance_actions reply for a session.

        Shared by PREPARE_MAINTENANCE and PLANNED_MAINTENANCE. Whenever a
        listed instance is the active HA instance, the floating IP is
        switched over before continuing.

        Raises:
            Exception: if "MIGRATE" is not in payload['allowed_actions'].
        """
        if "MIGRATE" not in payload['allowed_actions']:
            raise Exception('MIGRATE not supported')

        instance_ids = self.get_session_instance_ids(payload['instance_ids'],
                                                     payload['session_id'])
        self.log.info('sample app manager got instances: %s' %
                      instance_ids)
        instance_actions = dict()
        for instance_id in instance_ids:
            instance_actions[instance_id] = "MIGRATE"
            if instance_id == self.active_instance_id:
                self.switch_over_ha_instance()
        return instance_actions

    def scale_instances(self, number_of_instances):
        """Change the non-HA instance count by number_of_instances (+/-).

        Note: this mutates self.stack.parameters in place.

        Raises:
            Exception: if the server count did not change by exactly
                number_of_instances.
        """
        number_of_instances_before = self.number_of_instances()

        parameters = self.stack.parameters
        # Key spelling kept as-is; presumably it must match the Heat
        # template's parameter name.
        parameters['nonha_intances'] += number_of_instances
        # NOTE(review): assumes Stack.update(stack_name, stack_id, template,
        # parameters=..., files=...) -- verify against the Stack helper.
        self.stack.update(self.stack.stack_name,
                          self.stack.stack_id,
                          self.stack.template,
                          parameters=parameters,
                          files=self.stack.files)

        number_of_instances_after = self.number_of_instances()
        if (number_of_instances_before + number_of_instances !=
                number_of_instances_after):
            self.log.error('scale_instances with: %d from: %d ends up to: %d'
                           % (number_of_instances, number_of_instances_before,
                              number_of_instances_after))
            raise Exception('scale_instances failed')

        self.log.info('scaled instances from %d to %d' %
                      (number_of_instances_before,
                       number_of_instances_after))

    def number_of_instances(self):
        """Return how many servers self.nova currently lists."""
        return len(self.nova.servers.list(detailed=False))

    def run(self):
        """Serve the REST API on 0.0.0.0:self.port (blocks; Thread body)."""
        app = Flask('app_manager')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
            """Handle one maintenance notification and send the reply.

            A reply is PUT to payload['reply_url'] for every known state
            except INSTANCE_ACTION_DONE, which is only logged.
            """
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                payload = ({t[0]: t[2] for t in
                            data['reason_data']['event']['traits']})
                self.log.error('cannot parse alarm data: %s' % payload)
                raise Exception('sample app manager cannot parse alarm. '
                                'Possibly trait data over 256 char')

            self.log.info('sample app manager received data = %s' % payload)

            state = payload['state']
            reply = dict()

            self.log.info('sample app manager state: %s' % state)

            if state == 'MAINTENANCE':
                # payload['instance_ids'] is passed as the URL to query.
                reply['instance_ids'] = self.get_session_instance_ids(
                    payload['instance_ids'], payload['session_id'])
                reply_state = 'ACK_MAINTENANCE'

            elif state == 'DOWN_SCALE':
                # Scale down by 2 instances (per the original comment, their
                # VCPUs equal those of a single compute node).
                self.scale_instances(-2)
                reply['instance_ids'] = self.get_instance_ids()
                reply_state = 'ACK_DOWN_SCALE'

            elif state == 'MAINTENANCE_COMPLETE':
                # Scale back up to the count seen at start-up if needed.
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    missing = (self.orig_number_of_instances -
                               number_of_instances)
                    self.scale_instances(missing)
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif state == 'PREPARE_MAINTENANCE':
                reply['instance_actions'] = self._migrate_actions(payload)
                reply_state = 'ACK_PREPARE_MAINTENANCE'

            elif state == 'PLANNED_MAINTENANCE':
                reply['instance_actions'] = self._migrate_actions(payload)
                reply_state = 'ACK_PLANNED_MAINTENANCE'

            elif state == 'INSTANCE_ACTION_DONE':
                self.log.info('%s' % payload['instance_ids'])
                return 'OK'

            else:
                raise Exception('sample app manager received event with'
                                ' unknown state %s' % state)

            reply['session_id'] = payload['session_id']
            reply['state'] = reply_state
            url = payload['reply_url']
            self.log.info('sample app manager reply: %s' % reply)
            requests.put(url, data=json.dumps(reply), headers=self.headers)

            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            """Stop the development server via Werkzeug's shutdown hook."""
            self.log.info('shutdown app manager server at %s' % time.time())
            # NOTE(review): Werkzeug >= 2.1 no longer provides this environ
            # key, so this raises there -- confirm the pinned Werkzeug.
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'app manager shutting down...'

        app.run(host="0.0.0.0", port=self.port)