// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include #include #include "rgw_period_pusher.h" #include "rgw_cr_rest.h" #include "common/errno.h" #include #define dout_subsys ceph_subsys_rgw #undef dout_prefix #define dout_prefix (*_dout << "rgw period pusher: ") /// A coroutine to post the period over the given connection. using PushCR = RGWPostRESTResourceCR; /// A coroutine that calls PushCR, and retries with backoff until success. class PushAndRetryCR : public RGWCoroutine { const std::string& zone; RGWRESTConn *const conn; RGWHTTPManager *const http; RGWPeriod& period; const std::string epoch; //< epoch string for params double timeout; //< current interval between retries const double timeout_max; //< maximum interval between retries uint32_t counter; //< number of failures since backoff increased public: PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn, RGWHTTPManager* http, RGWPeriod& period) : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period), epoch(std::to_string(period.get_epoch())), timeout(cct->_conf->rgw_period_push_interval), timeout_max(cct->_conf->rgw_period_push_interval_max), counter(0) {} int operate() override; }; int PushAndRetryCR::operate() { reenter(this) { for (;;) { yield { ldout(cct, 10) << "pushing period " << period.get_id() << " to " << zone << dendl; // initialize the http params rgw_http_param_pair params[] = { { "period", period.get_id().c_str() }, { "epoch", epoch.c_str() }, { nullptr, nullptr } }; call(new PushCR(cct, conn, http, "/admin/realm/period", params, period, nullptr)); } // stop on success if (get_ret_status() == 0) { ldout(cct, 10) << "push to " << zone << " succeeded" << dendl; return set_cr_done(); } // try each endpoint in the connection before waiting if (++counter < conn->get_endpoint_count()) continue; counter = 0; // wait with exponential backoff up to timeout_max yield { utime_t dur; dur.set_from_double(timeout); ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl; wait(dur); timeout *= 2; if (timeout > timeout_max) timeout = timeout_max; } } } return 0; } /** * PushAllCR is a coroutine that sends the period over all of the given * connections, retrying until they are all marked as completed. */ class PushAllCR : public RGWCoroutine { RGWHTTPManager *const http; RGWPeriod period; //< period object to push std::map conns; //< zones that need the period public: PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period, std::map&& conns) : RGWCoroutine(cct), http(http), period(std::move(period)), conns(std::move(conns)) {} int operate() override; }; int PushAllCR::operate() { reenter(this) { // spawn a coroutine to push the period over each connection yield { ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl; for (auto& c : conns) spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false); } // wait for all to complete drain_all(); return set_cr_done(); } return 0; } /// A background thread to run the PushAllCR coroutine and exit. class RGWPeriodPusher::CRThread { RGWCoroutinesManager coroutines; RGWHTTPManager http; boost::intrusive_ptr push_all; std::thread thread; public: CRThread(CephContext* cct, RGWPeriod&& period, std::map&& conns) : coroutines(cct, NULL), http(cct, coroutines.get_completion_mgr()), push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns))) { http.set_threaded(); // must spawn the CR thread after set_threaded thread = std::thread([this] { coroutines.run(push_all.get()); }); } ~CRThread() { push_all.reset(); coroutines.stop(); http.stop(); if (thread.joinable()) thread.join(); } }; RGWPeriodPusher::RGWPeriodPusher(RGWRados* store) : cct(store->ctx()), store(store) { const auto& realm = store->realm; auto& realm_id = realm.get_id(); if (realm_id.empty()) // no realm configuration return; // always send out the current period on startup RGWPeriod period; int r = period.init(cct, store, realm_id, realm.get_name()); if (r < 0) { lderr(cct) << "failed to load period for realm " << realm_id << dendl; return; } std::lock_guard lock(mutex); handle_notify(std::move(period)); } // destructor is here because CRThread is incomplete in the header RGWPeriodPusher::~RGWPeriodPusher() = default; void RGWPeriodPusher::handle_notify(RGWRealmNotify type, bufferlist::iterator& p) { // decode the period RGWZonesNeedPeriod info; try { ::decode(info, p); } catch (buffer::error& e) { lderr(cct) << "Failed to decode the period: " << e.what() << dendl; return; } std::lock_guard lock(mutex); // we can't process this notification without access to our current realm // configuration. queue it until resume() if (store == nullptr) { pending_periods.emplace_back(std::move(info)); return; } handle_notify(std::move(info)); } // expects the caller to hold a lock on mutex void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period) { if (period.get_realm_epoch() < realm_epoch) { ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch() << " is not newer than current realm epoch " << realm_epoch << ", discarding update" << dendl; return; } if (period.get_realm_epoch() == realm_epoch && period.get_epoch() <= period_epoch) { ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer " "than current epoch " << period_epoch << ", discarding update" << dendl; return; } // find our zonegroup in the new period auto& zonegroups = period.get_map().zonegroups; auto i = zonegroups.find(store->get_zonegroup().get_id()); if (i == zonegroups.end()) { lderr(cct) << "The new period does not contain my zonegroup!" << dendl; return; } auto& my_zonegroup = i->second; // if we're not a master zone, we're not responsible for pushing any updates if (my_zonegroup.master_zone != store->get_zone_params().get_id()) return; // construct a map of the zones that need this period. the map uses the same // keys/ordering as the zone[group] map, so we can use a hint for insertions std::map conns; auto hint = conns.end(); // are we the master zonegroup in this period? if (period.get_map().master_zonegroup == store->get_zonegroup().get_id()) { // update other zonegroup endpoints for (auto& zg : zonegroups) { auto& zonegroup = zg.second; if (zonegroup.get_id() == store->get_zonegroup().get_id()) continue; if (zonegroup.endpoints.empty()) continue; hint = conns.emplace_hint( hint, std::piecewise_construct, std::forward_as_tuple(zonegroup.get_id()), std::forward_as_tuple(cct, store, zonegroup.get_id(), zonegroup.endpoints)); } } // update other zone endpoints for (auto& z : my_zonegroup.zones) { auto& zone = z.second; if (zone.id == store->get_zone_params().get_id()) continue; if (zone.endpoints.empty()) continue; hint = conns.emplace_hint( hint, std::piecewise_construct, std::forward_as_tuple(zone.id), std::forward_as_tuple(cct, store, zone.id, zone.endpoints)); } if (conns.empty()) { ldout(cct, 4) << "No zones to update" << dendl; return; } realm_epoch = period.get_realm_epoch(); period_epoch = period.get_epoch(); ldout(cct, 4) << "Zone master pushing period " << period.get_id() << " epoch " << period_epoch << " to " << conns.size() << " other zones" << dendl; // spawn a new coroutine thread, destroying the previous one cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns))); } void RGWPeriodPusher::pause() { ldout(cct, 4) << "paused for realm update" << dendl; std::lock_guard lock(mutex); store = nullptr; } void RGWPeriodPusher::resume(RGWRados* store) { std::lock_guard lock(mutex); this->store = store; ldout(cct, 4) << "resume with " << pending_periods.size() << " periods pending" << dendl; // process notification queue for (auto& info : pending_periods) { handle_notify(std::move(info)); } pending_periods.clear(); }