1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include "rgw_period_pusher.h"
8 #include "rgw_cr_rest.h"
9 #include "common/errno.h"
10 #include <boost/asio/yield.hpp>
12 #define dout_subsys ceph_subsys_rgw
15 #define dout_prefix (*_dout << "rgw period pusher: ")
/// A coroutine to post the period over the given connection.
/// Template parameters are the request body type (RGWPeriod) and the
/// response type (int) -- see RGWPostRESTResourceCR in rgw_cr_rest.h.
using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>;
20 /// A coroutine that calls PushCR, and retries with backoff until success.
21 class PushAndRetryCR : public RGWCoroutine {
22 const std::string& zone;
23 RGWRESTConn *const conn;
24 RGWHTTPManager *const http;
26 const std::string epoch; //< epoch string for params
27 double timeout; //< current interval between retries
28 const double timeout_max; //< maximum interval between retries
29 uint32_t counter; //< number of failures since backoff increased
32 PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
33 RGWHTTPManager* http, RGWPeriod& period)
34 : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
35 epoch(std::to_string(period.get_epoch())),
36 timeout(cct->_conf->rgw_period_push_interval),
37 timeout_max(cct->_conf->rgw_period_push_interval_max),
41 int operate() override;
44 int PushAndRetryCR::operate()
49 ldout(cct, 10) << "pushing period " << period.get_id()
50 << " to " << zone << dendl;
51 // initialize the http params
52 rgw_http_param_pair params[] = {
53 { "period", period.get_id().c_str() },
54 { "epoch", epoch.c_str() },
57 call(new PushCR(cct, conn, http, "/admin/realm/period",
58 params, period, nullptr));
62 if (get_ret_status() == 0) {
63 ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
67 // try each endpoint in the connection before waiting
68 if (++counter < conn->get_endpoint_count())
72 // wait with exponential backoff up to timeout_max
75 dur.set_from_double(timeout);
77 ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
81 if (timeout > timeout_max)
82 timeout = timeout_max;
90 * PushAllCR is a coroutine that sends the period over all of the given
91 * connections, retrying until they are all marked as completed.
93 class PushAllCR : public RGWCoroutine {
94 RGWHTTPManager *const http;
95 RGWPeriod period; //< period object to push
96 std::map<std::string, RGWRESTConn> conns; //< zones that need the period
99 PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
100 std::map<std::string, RGWRESTConn>&& conns)
101 : RGWCoroutine(cct), http(http),
102 period(std::move(period)),
103 conns(std::move(conns))
106 int operate() override;
109 int PushAllCR::operate()
112 // spawn a coroutine to push the period over each connection
114 ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl;
115 for (auto& c : conns)
116 spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
118 // wait for all to complete
120 return set_cr_done();
125 /// A background thread to run the PushAllCR coroutine and exit.
126 class RGWPeriodPusher::CRThread {
127 RGWCoroutinesManager coroutines;
129 boost::intrusive_ptr<PushAllCR> push_all;
133 CRThread(CephContext* cct, RGWPeriod&& period,
134 std::map<std::string, RGWRESTConn>&& conns)
135 : coroutines(cct, NULL),
136 http(cct, coroutines.get_completion_mgr()),
137 push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
140 // must spawn the CR thread after set_threaded
141 thread = std::thread([this] { coroutines.run(push_all.get()); });
148 if (thread.joinable())
154 RGWPeriodPusher::RGWPeriodPusher(RGWRados* store)
155 : cct(store->ctx()), store(store)
157 const auto& realm = store->realm;
158 auto& realm_id = realm.get_id();
159 if (realm_id.empty()) // no realm configuration
162 // always send out the current period on startup
164 int r = period.init(cct, store, realm_id, realm.get_name());
166 lderr(cct) << "failed to load period for realm " << realm_id << dendl;
170 std::lock_guard<std::mutex> lock(mutex);
171 handle_notify(std::move(period));
// destructor is here because CRThread is incomplete in the header
// (NOTE(review): cr_thread appears to be a smart pointer -- its deleter
// needs the complete CRThread type, which only this .cc file can see)
RGWPeriodPusher::~RGWPeriodPusher() = default;
177 void RGWPeriodPusher::handle_notify(RGWRealmNotify type,
178 bufferlist::iterator& p)
181 RGWZonesNeedPeriod info;
184 } catch (buffer::error& e) {
185 lderr(cct) << "Failed to decode the period: " << e.what() << dendl;
189 std::lock_guard<std::mutex> lock(mutex);
191 // we can't process this notification without access to our current realm
192 // configuration. queue it until resume()
193 if (store == nullptr) {
194 pending_periods.emplace_back(std::move(info));
198 handle_notify(std::move(info));
201 // expects the caller to hold a lock on mutex
202 void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
204 if (period.get_realm_epoch() < realm_epoch) {
205 ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch()
206 << " is not newer than current realm epoch " << realm_epoch
207 << ", discarding update" << dendl;
210 if (period.get_realm_epoch() == realm_epoch &&
211 period.get_epoch() <= period_epoch) {
212 ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
213 "than current epoch " << period_epoch << ", discarding update" << dendl;
217 // find our zonegroup in the new period
218 auto& zonegroups = period.get_map().zonegroups;
219 auto i = zonegroups.find(store->get_zonegroup().get_id());
220 if (i == zonegroups.end()) {
221 lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
224 auto& my_zonegroup = i->second;
226 // if we're not a master zone, we're not responsible for pushing any updates
227 if (my_zonegroup.master_zone != store->get_zone_params().get_id())
230 // construct a map of the zones that need this period. the map uses the same
231 // keys/ordering as the zone[group] map, so we can use a hint for insertions
232 std::map<std::string, RGWRESTConn> conns;
233 auto hint = conns.end();
235 // are we the master zonegroup in this period?
236 if (period.get_map().master_zonegroup == store->get_zonegroup().get_id()) {
237 // update other zonegroup endpoints
238 for (auto& zg : zonegroups) {
239 auto& zonegroup = zg.second;
240 if (zonegroup.get_id() == store->get_zonegroup().get_id())
242 if (zonegroup.endpoints.empty())
245 hint = conns.emplace_hint(
246 hint, std::piecewise_construct,
247 std::forward_as_tuple(zonegroup.get_id()),
248 std::forward_as_tuple(cct, store, zonegroup.get_id(), zonegroup.endpoints));
252 // update other zone endpoints
253 for (auto& z : my_zonegroup.zones) {
254 auto& zone = z.second;
255 if (zone.id == store->get_zone_params().get_id())
257 if (zone.endpoints.empty())
260 hint = conns.emplace_hint(
261 hint, std::piecewise_construct,
262 std::forward_as_tuple(zone.id),
263 std::forward_as_tuple(cct, store, zone.id, zone.endpoints));
267 ldout(cct, 4) << "No zones to update" << dendl;
271 realm_epoch = period.get_realm_epoch();
272 period_epoch = period.get_epoch();
274 ldout(cct, 4) << "Zone master pushing period " << period.get_id()
275 << " epoch " << period_epoch << " to "
276 << conns.size() << " other zones" << dendl;
278 // spawn a new coroutine thread, destroying the previous one
279 cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
282 void RGWPeriodPusher::pause()
284 ldout(cct, 4) << "paused for realm update" << dendl;
285 std::lock_guard<std::mutex> lock(mutex);
289 void RGWPeriodPusher::resume(RGWRados* store)
291 std::lock_guard<std::mutex> lock(mutex);
294 ldout(cct, 4) << "resume with " << pending_periods.size()
295 << " periods pending" << dendl;
297 // process notification queue
298 for (auto& info : pending_periods) {
299 handle_notify(std::move(info));
301 pending_periods.clear();