Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_period_pusher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <map>
5 #include <thread>
6
7 #include "rgw_period_pusher.h"
8 #include "rgw_cr_rest.h"
9 #include "common/errno.h"
10 #include <boost/asio/yield.hpp>
11
12 #define dout_subsys ceph_subsys_rgw
13
14 #undef dout_prefix
15 #define dout_prefix (*_dout << "rgw period pusher: ")
16
17 /// A coroutine to post the period over the given connection.
18 using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>;
19
20 /// A coroutine that calls PushCR, and retries with backoff until success.
21 class PushAndRetryCR : public RGWCoroutine {
22   const std::string& zone;
23   RGWRESTConn *const conn;
24   RGWHTTPManager *const http;
25   RGWPeriod& period;
26   const std::string epoch; //< epoch string for params
27   double timeout; //< current interval between retries
28   const double timeout_max; //< maximum interval between retries
29   uint32_t counter; //< number of failures since backoff increased
30
31  public:
32   PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
33                  RGWHTTPManager* http, RGWPeriod& period)
34     : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
35       epoch(std::to_string(period.get_epoch())),
36       timeout(cct->_conf->rgw_period_push_interval),
37       timeout_max(cct->_conf->rgw_period_push_interval_max),
38       counter(0)
39   {}
40
41   int operate() override;
42 };
43
44 int PushAndRetryCR::operate()
45 {
46   reenter(this) {
47     for (;;) {
48       yield {
49         ldout(cct, 10) << "pushing period " << period.get_id()
50             << " to " << zone << dendl;
51         // initialize the http params
52         rgw_http_param_pair params[] = {
53           { "period", period.get_id().c_str() },
54           { "epoch", epoch.c_str() },
55           { nullptr, nullptr }
56         };
57         call(new PushCR(cct, conn, http, "/admin/realm/period",
58                         params, period, nullptr));
59       }
60
61       // stop on success
62       if (get_ret_status() == 0) {
63         ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
64         return set_cr_done();
65       }
66
67       // try each endpoint in the connection before waiting
68       if (++counter < conn->get_endpoint_count())
69         continue;
70       counter = 0;
71
72       // wait with exponential backoff up to timeout_max
73       yield {
74         utime_t dur;
75         dur.set_from_double(timeout);
76
77         ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
78         wait(dur);
79
80         timeout *= 2;
81         if (timeout > timeout_max)
82           timeout = timeout_max;
83       }
84     }
85   }
86   return 0;
87 }
88
89 /**
90  * PushAllCR is a coroutine that sends the period over all of the given
91  * connections, retrying until they are all marked as completed.
92  */
93 class PushAllCR : public RGWCoroutine {
94   RGWHTTPManager *const http;
95   RGWPeriod period; //< period object to push
96   std::map<std::string, RGWRESTConn> conns; //< zones that need the period
97
98  public:
99   PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
100             std::map<std::string, RGWRESTConn>&& conns)
101     : RGWCoroutine(cct), http(http),
102       period(std::move(period)),
103       conns(std::move(conns))
104   {}
105
106   int operate() override;
107 };
108
109 int PushAllCR::operate()
110 {
111   reenter(this) {
112     // spawn a coroutine to push the period over each connection
113     yield {
114       ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl;
115       for (auto& c : conns)
116         spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
117     }
118     // wait for all to complete
119     drain_all();
120     return set_cr_done();
121   }
122   return 0;
123 }
124
125 /// A background thread to run the PushAllCR coroutine and exit.
126 class RGWPeriodPusher::CRThread {
127   RGWCoroutinesManager coroutines;
128   RGWHTTPManager http;
129   boost::intrusive_ptr<PushAllCR> push_all;
130   std::thread thread;
131
132  public:
133   CRThread(CephContext* cct, RGWPeriod&& period,
134            std::map<std::string, RGWRESTConn>&& conns)
135     : coroutines(cct, NULL),
136       http(cct, coroutines.get_completion_mgr()),
137       push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
138   {
139     http.set_threaded();
140     // must spawn the CR thread after set_threaded
141     thread = std::thread([this] { coroutines.run(push_all.get()); });
142   }
143   ~CRThread()
144   {
145     push_all.reset();
146     coroutines.stop();
147     http.stop();
148     if (thread.joinable())
149       thread.join();
150   }
151 };
152
153
154 RGWPeriodPusher::RGWPeriodPusher(RGWRados* store)
155   : cct(store->ctx()), store(store)
156 {
157   const auto& realm = store->realm;
158   auto& realm_id = realm.get_id();
159   if (realm_id.empty()) // no realm configuration
160     return;
161
162   // always send out the current period on startup
163   RGWPeriod period;
164   int r = period.init(cct, store, realm_id, realm.get_name());
165   if (r < 0) {
166     lderr(cct) << "failed to load period for realm " << realm_id << dendl;
167     return;
168   }
169
170   std::lock_guard<std::mutex> lock(mutex);
171   handle_notify(std::move(period));
172 }
173
174 // destructor is here because CRThread is incomplete in the header
175 RGWPeriodPusher::~RGWPeriodPusher() = default;
176
177 void RGWPeriodPusher::handle_notify(RGWRealmNotify type,
178                                     bufferlist::iterator& p)
179 {
180   // decode the period
181   RGWZonesNeedPeriod info;
182   try {
183     ::decode(info, p);
184   } catch (buffer::error& e) {
185     lderr(cct) << "Failed to decode the period: " << e.what() << dendl;
186     return;
187   }
188
189   std::lock_guard<std::mutex> lock(mutex);
190
191   // we can't process this notification without access to our current realm
192   // configuration. queue it until resume()
193   if (store == nullptr) {
194     pending_periods.emplace_back(std::move(info));
195     return;
196   }
197
198   handle_notify(std::move(info));
199 }
200
201 // expects the caller to hold a lock on mutex
202 void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
203 {
204   if (period.get_realm_epoch() < realm_epoch) {
205     ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch()
206         << " is not newer than current realm epoch " << realm_epoch
207         << ", discarding update" << dendl;
208     return;
209   }
210   if (period.get_realm_epoch() == realm_epoch &&
211       period.get_epoch() <= period_epoch) {
212     ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
213         "than current epoch " << period_epoch << ", discarding update" << dendl;
214     return;
215   }
216
217   // find our zonegroup in the new period
218   auto& zonegroups = period.get_map().zonegroups;
219   auto i = zonegroups.find(store->get_zonegroup().get_id());
220   if (i == zonegroups.end()) {
221     lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
222     return;
223   }
224   auto& my_zonegroup = i->second;
225
226   // if we're not a master zone, we're not responsible for pushing any updates
227   if (my_zonegroup.master_zone != store->get_zone_params().get_id())
228     return;
229
230   // construct a map of the zones that need this period. the map uses the same
231   // keys/ordering as the zone[group] map, so we can use a hint for insertions
232   std::map<std::string, RGWRESTConn> conns;
233   auto hint = conns.end();
234
235   // are we the master zonegroup in this period?
236   if (period.get_map().master_zonegroup == store->get_zonegroup().get_id()) {
237     // update other zonegroup endpoints
238     for (auto& zg : zonegroups) {
239       auto& zonegroup = zg.second;
240       if (zonegroup.get_id() == store->get_zonegroup().get_id())
241         continue;
242       if (zonegroup.endpoints.empty())
243         continue;
244
245       hint = conns.emplace_hint(
246           hint, std::piecewise_construct,
247           std::forward_as_tuple(zonegroup.get_id()),
248           std::forward_as_tuple(cct, store, zonegroup.get_id(), zonegroup.endpoints));
249     }
250   }
251
252   // update other zone endpoints
253   for (auto& z : my_zonegroup.zones) {
254     auto& zone = z.second;
255     if (zone.id == store->get_zone_params().get_id())
256       continue;
257     if (zone.endpoints.empty())
258       continue;
259
260     hint = conns.emplace_hint(
261         hint, std::piecewise_construct,
262         std::forward_as_tuple(zone.id),
263         std::forward_as_tuple(cct, store, zone.id, zone.endpoints));
264   }
265
266   if (conns.empty()) {
267     ldout(cct, 4) << "No zones to update" << dendl;
268     return;
269   }
270
271   realm_epoch = period.get_realm_epoch();
272   period_epoch = period.get_epoch();
273
274   ldout(cct, 4) << "Zone master pushing period " << period.get_id()
275       << " epoch " << period_epoch << " to "
276       << conns.size() << " other zones" << dendl;
277
278   // spawn a new coroutine thread, destroying the previous one
279   cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
280 }
281
282 void RGWPeriodPusher::pause()
283 {
284   ldout(cct, 4) << "paused for realm update" << dendl;
285   std::lock_guard<std::mutex> lock(mutex);
286   store = nullptr;
287 }
288
289 void RGWPeriodPusher::resume(RGWRados* store)
290 {
291   std::lock_guard<std::mutex> lock(mutex);
292   this->store = store;
293
294   ldout(cct, 4) << "resume with " << pending_periods.size()
295       << " periods pending" << dendl;
296
297   // process notification queue
298   for (auto& info : pending_periods) {
299     handle_notify(std::move(info));
300   }
301   pending_periods.clear();
302 }