Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / JournalMetadata.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalMetadata.h"
5 #include "journal/Utils.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "cls/journal/cls_journal_client.h"
9 #include <functional>
10 #include <set>
11
12 #define dout_subsys ceph_subsys_journaler
13 #undef dout_prefix
14 #define dout_prefix *_dout << "JournalMetadata: " << this << " "
15
16 namespace journal {
17
18 using namespace cls::journal;
19
20 namespace {
21
22 struct C_GetClient : public Context {
23   CephContext *cct;
24   librados::IoCtx &ioctx;
25   const std::string &oid;
26   AsyncOpTracker &async_op_tracker;
27   std::string client_id;
28   cls::journal::Client *client;
29   Context *on_finish;
30
31   bufferlist out_bl;
32
33   C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
34               AsyncOpTracker &async_op_tracker, const std::string &client_id,
35               cls::journal::Client *client, Context *on_finish)
36     : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
37       client_id(client_id), client(client), on_finish(on_finish) {
38     async_op_tracker.start_op();
39   }
40   ~C_GetClient() override {
41     async_op_tracker.finish_op();
42   }
43
44   virtual void send() {
45     send_get_client();
46   }
47
48   void send_get_client() {
49     ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;
50
51     librados::ObjectReadOperation op;
52     client::get_client_start(&op, client_id);
53
54     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
55       this, nullptr, &utils::rados_state_callback<
56         C_GetClient, &C_GetClient::handle_get_client>);
57
58     int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
59     assert(r == 0);
60     comp->release();
61   }
62
63   void handle_get_client(int r) {
64     ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;
65
66     if (r == 0) {
67       bufferlist::iterator it = out_bl.begin();
68       r = client::get_client_finish(&it, client);
69     }
70     complete(r);
71   }
72
73   void finish(int r) override {
74     on_finish->complete(r);
75   }
76 };
77
78 struct C_AllocateTag : public Context {
79   CephContext *cct;
80   librados::IoCtx &ioctx;
81   const std::string &oid;
82   AsyncOpTracker &async_op_tracker;
83   uint64_t tag_class;
84   Tag *tag;
85   Context *on_finish;
86
87   bufferlist out_bl;
88
89   C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
90                 const std::string &oid, AsyncOpTracker &async_op_tracker,
91                 uint64_t tag_class, const bufferlist &data, Tag *tag,
92                 Context *on_finish)
93     : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
94       tag_class(tag_class), tag(tag), on_finish(on_finish) {
95     async_op_tracker.start_op();
96     tag->data = data;
97   }
98   ~C_AllocateTag() override {
99     async_op_tracker.finish_op();
100   }
101
102   void send() {
103     send_get_next_tag_tid();
104   }
105
106   void send_get_next_tag_tid() {
107     ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
108
109     librados::ObjectReadOperation op;
110     client::get_next_tag_tid_start(&op);
111
112     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
113       this, nullptr, &utils::rados_state_callback<
114         C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
115
116     out_bl.clear();
117     int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
118     assert(r == 0);
119     comp->release();
120   }
121
122   void handle_get_next_tag_tid(int r) {
123     ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
124
125     if (r == 0) {
126       bufferlist::iterator iter = out_bl.begin();
127       r = client::get_next_tag_tid_finish(&iter, &tag->tid);
128     }
129     if (r < 0) {
130       complete(r);
131       return;
132     }
133     send_tag_create();
134   }
135
136   void send_tag_create() {
137     ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
138
139     librados::ObjectWriteOperation op;
140     client::tag_create(&op, tag->tid, tag_class, tag->data);
141
142     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
143       this, nullptr, &utils::rados_state_callback<
144         C_AllocateTag, &C_AllocateTag::handle_tag_create>);
145
146     int r = ioctx.aio_operate(oid, comp, &op);
147     assert(r == 0);
148     comp->release();
149   }
150
151   void handle_tag_create(int r) {
152     ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
153
154     if (r == -ESTALE) {
155       send_get_next_tag_tid();
156       return;
157     } else if (r < 0) {
158       complete(r);
159       return;
160     }
161
162     send_get_tag();
163   }
164
165   void send_get_tag() {
166     ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
167
168     librados::ObjectReadOperation op;
169     client::get_tag_start(&op, tag->tid);
170
171     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
172       this, nullptr, &utils::rados_state_callback<
173         C_AllocateTag, &C_AllocateTag::handle_get_tag>);
174
175     out_bl.clear();
176     int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
177     assert(r == 0);
178     comp->release();
179   }
180
181   void handle_get_tag(int r) {
182     ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
183
184     if (r == 0) {
185       bufferlist::iterator iter = out_bl.begin();
186
187       cls::journal::Tag journal_tag;
188       r = client::get_tag_finish(&iter, &journal_tag);
189       if (r == 0) {
190         *tag = journal_tag;
191       }
192     }
193     complete(r);
194   }
195
196   void finish(int r) override {
197     on_finish->complete(r);
198   }
199 };
200
201 struct C_GetTag : public Context {
202   CephContext *cct;
203   librados::IoCtx &ioctx;
204   const std::string &oid;
205   AsyncOpTracker &async_op_tracker;
206   uint64_t tag_tid;
207   JournalMetadata::Tag *tag;
208   Context *on_finish;
209
210   bufferlist out_bl;
211
212   C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
213            AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
214            JournalMetadata::Tag *tag, Context *on_finish)
215     : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
216       tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
217     async_op_tracker.start_op();
218   }
219   ~C_GetTag() override {
220     async_op_tracker.finish_op();
221   }
222
223   void send() {
224     send_get_tag();
225   }
226
227   void send_get_tag() {
228     librados::ObjectReadOperation op;
229     client::get_tag_start(&op, tag_tid);
230
231     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
232       this, nullptr, &utils::rados_state_callback<
233         C_GetTag, &C_GetTag::handle_get_tag>);
234
235     int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
236     assert(r == 0);
237     comp->release();
238   }
239
240   void handle_get_tag(int r) {
241     if (r == 0) {
242       bufferlist::iterator iter = out_bl.begin();
243       r = client::get_tag_finish(&iter, tag);
244     }
245     complete(r);
246   }
247
248   void finish(int r) override {
249     on_finish->complete(r);
250   }
251 };
252
253 struct C_GetTags : public Context {
254   CephContext *cct;
255   librados::IoCtx &ioctx;
256   const std::string &oid;
257   const std::string &client_id;
258   AsyncOpTracker &async_op_tracker;
259   uint64_t start_after_tag_tid;
260   boost::optional<uint64_t> tag_class;
261   JournalMetadata::Tags *tags;
262   Context *on_finish;
263
264   const uint64_t MAX_RETURN = 64;
265   bufferlist out_bl;
266
267   C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
268             const std::string &client_id, AsyncOpTracker &async_op_tracker,
269             uint64_t start_after_tag_tid,
270             const boost::optional<uint64_t> &tag_class,
271             JournalMetadata::Tags *tags, Context *on_finish)
272     : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
273       async_op_tracker(async_op_tracker),
274       start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
275       tags(tags), on_finish(on_finish) {
276     async_op_tracker.start_op();
277   }
278   ~C_GetTags() override {
279     async_op_tracker.finish_op();
280   }
281
282   void send() {
283     send_tag_list();
284   }
285
286   void send_tag_list() {
287     librados::ObjectReadOperation op;
288     client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
289                            tag_class);
290
291     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
292       this, nullptr, &utils::rados_state_callback<
293         C_GetTags, &C_GetTags::handle_tag_list>);
294
295     out_bl.clear();
296     int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
297     assert(r == 0);
298     comp->release();
299   }
300
301   void handle_tag_list(int r) {
302     if (r == 0) {
303       std::set<cls::journal::Tag> journal_tags;
304       bufferlist::iterator iter = out_bl.begin();
305       r = client::tag_list_finish(&iter, &journal_tags);
306       if (r == 0) {
307         for (auto &journal_tag : journal_tags) {
308           tags->push_back(journal_tag);
309           start_after_tag_tid = journal_tag.tid;
310         }
311
312         if (journal_tags.size() == MAX_RETURN) {
313           send_tag_list();
314           return;
315         }
316       }
317     }
318     complete(r);
319   }
320
321   void finish(int r) override {
322     on_finish->complete(r);
323   }
324 };
325
326 struct C_FlushCommitPosition : public Context {
327   Context *commit_position_ctx;
328   Context *on_finish;
329
330   C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
331     : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
332   }
333   void finish(int r) override {
334     if (commit_position_ctx != nullptr) {
335       commit_position_ctx->complete(r);
336     }
337     on_finish->complete(r);
338   }
339 };
340
341 struct C_AssertActiveTag : public Context {
342   CephContext *cct;
343   librados::IoCtx &ioctx;
344   const std::string &oid;
345   AsyncOpTracker &async_op_tracker;
346   std::string client_id;
347   uint64_t tag_tid;
348   Context *on_finish;
349
350   bufferlist out_bl;
351
352   C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
353                     const std::string &oid, AsyncOpTracker &async_op_tracker,
354                     const std::string &client_id, uint64_t tag_tid,
355                     Context *on_finish)
356     : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
357       client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
358     async_op_tracker.start_op();
359   }
360   ~C_AssertActiveTag() override {
361     async_op_tracker.finish_op();
362   }
363
364   void send() {
365     ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
366
367     librados::ObjectReadOperation op;
368     client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
369
370     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
371       this, nullptr, &utils::rados_state_callback<
372         C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
373
374     int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
375     assert(r == 0);
376     comp->release();
377   }
378
379   void handle_send(int r) {
380     ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
381
382     std::set<cls::journal::Tag> tags;
383     if (r == 0) {
384       bufferlist::iterator it = out_bl.begin();
385       r = client::tag_list_finish(&it, &tags);
386     }
387
388     // NOTE: since 0 is treated as an uninitialized list filter, we need to
389     // load to entries and look at the last tid
390     if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
391       r = -ESTALE;
392     }
393     complete(r);
394   }
395
396   void finish(int r) override {
397     on_finish->complete(r);
398   }
399 };
400
401 } // anonymous namespace
402
403 JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
404                                  Mutex *timer_lock, librados::IoCtx &ioctx,
405                                  const std::string &oid,
406                                  const std::string &client_id,
407                                  const Settings &settings)
408     : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
409       m_client_id(client_id), m_settings(settings), m_order(0),
410       m_splay_width(0), m_pool_id(-1), m_initialized(false),
411       m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
412       m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
413       m_watch_handle(0), m_minimum_set(0), m_active_set(0),
414       m_update_notifications(0), m_commit_position_ctx(NULL),
415       m_commit_position_task_ctx(NULL) {
416   m_ioctx.dup(ioctx);
417   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
418 }
419
420 JournalMetadata::~JournalMetadata() {
421   Mutex::Locker locker(m_lock);
422   assert(!m_initialized);
423 }
424
425 void JournalMetadata::init(Context *on_finish) {
426   {
427     Mutex::Locker locker(m_lock);
428     assert(!m_initialized);
429     m_initialized = true;
430   }
431
432   // chain the init sequence (reverse order)
433   on_finish = utils::create_async_context_callback(
434     this, on_finish);
435   on_finish = new C_ImmutableMetadata(this, on_finish);
436   on_finish = new FunctionContext([this, on_finish](int r) {
437       if (r < 0) {
438         lderr(m_cct) << __func__ << ": failed to watch journal"
439                      << cpp_strerror(r) << dendl;
440         Mutex::Locker locker(m_lock);
441         m_watch_handle = 0;
442         on_finish->complete(r);
443         return;
444       }
445
446       get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
447     });
448
449   librados::AioCompletion *comp = librados::Rados::aio_create_completion(
450     on_finish, nullptr, utils::rados_ctx_callback);
451   int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
452   assert(r == 0);
453   comp->release();
454 }
455
456 void JournalMetadata::shut_down(Context *on_finish) {
457
458   ldout(m_cct, 20) << __func__ << dendl;
459
460   uint64_t watch_handle = 0;
461   {
462     Mutex::Locker locker(m_lock);
463     m_initialized = false;
464     std::swap(watch_handle, m_watch_handle);
465   }
466
467   // chain the shut down sequence (reverse order)
468   on_finish = utils::create_async_context_callback(
469     this, on_finish);
470   on_finish = new FunctionContext([this, on_finish](int r) {
471       ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
472       m_async_op_tracker.wait_for_ops(on_finish);
473     });
474   on_finish = new FunctionContext([this, on_finish](int r) {
475       ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
476       librados::Rados rados(m_ioctx);
477       librados::AioCompletion *comp = librados::Rados::aio_create_completion(
478         on_finish, nullptr, utils::rados_ctx_callback);
479       r = rados.aio_watch_flush(comp);
480       assert(r == 0);
481       comp->release();
482     });
483   on_finish = new FunctionContext([this, on_finish](int r) {
484       flush_commit_position(on_finish);
485     });
486   if (watch_handle != 0) {
487     librados::AioCompletion *comp = librados::Rados::aio_create_completion(
488       on_finish, nullptr, utils::rados_ctx_callback);
489     int r = m_ioctx.aio_unwatch(watch_handle, comp);
490     assert(r == 0);
491     comp->release();
492   } else {
493     on_finish->complete(0);
494   }
495 }
496
497 void JournalMetadata::get_immutable_metadata(uint8_t *order,
498                                              uint8_t *splay_width,
499                                              int64_t *pool_id,
500                                              Context *on_finish) {
501   client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
502                                  on_finish);
503 }
504
505 void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
506                                            uint64_t *active_set,
507                                            RegisteredClients *clients,
508                                            Context *on_finish) {
509   client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients,
510                                on_finish);
511 }
512
513 void JournalMetadata::register_client(const bufferlist &data,
514                                       Context *on_finish) {
515   ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
516   librados::ObjectWriteOperation op;
517   client::client_register(&op, m_client_id, data);
518
519   C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
520
521   librados::AioCompletion *comp =
522     librados::Rados::aio_create_completion(ctx, NULL,
523                                            utils::rados_ctx_callback);
524   int r = m_ioctx.aio_operate(m_oid, comp, &op);
525   assert(r == 0);
526   comp->release();
527 }
528
529 void JournalMetadata::update_client(const bufferlist &data,
530                                     Context *on_finish) {
531   ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
532   librados::ObjectWriteOperation op;
533   client::client_update_data(&op, m_client_id, data);
534
535   C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
536
537   librados::AioCompletion *comp =
538     librados::Rados::aio_create_completion(ctx, NULL,
539                                            utils::rados_ctx_callback);
540   int r = m_ioctx.aio_operate(m_oid, comp, &op);
541   assert(r == 0);
542   comp->release();
543 }
544
545 void JournalMetadata::unregister_client(Context *on_finish) {
546   assert(!m_client_id.empty());
547
548   ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
549   librados::ObjectWriteOperation op;
550   client::client_unregister(&op, m_client_id);
551
552   C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
553
554   librados::AioCompletion *comp =
555     librados::Rados::aio_create_completion(ctx, NULL,
556                                            utils::rados_ctx_callback);
557   int r = m_ioctx.aio_operate(m_oid, comp, &op);
558   assert(r == 0);
559   comp->release();
560 }
561
562 void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
563                                    Tag *tag, Context *on_finish) {
564   on_finish = new C_NotifyUpdate(this, on_finish);
565   C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
566                                          m_async_op_tracker, tag_class,
567                                          data, tag, on_finish);
568   ctx->send();
569 }
570
571 void JournalMetadata::get_client(const std::string &client_id,
572                                  cls::journal::Client *client,
573                                  Context *on_finish) {
574   C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker,
575                                      client_id, client, on_finish);
576   ctx->send();
577 }
578
579 void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
580   C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker,
581                                tag_tid, tag, on_finish);
582   ctx->send();
583 }
584
585 void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
586                                const boost::optional<uint64_t> &tag_class,
587                                Tags *tags, Context *on_finish) {
588   C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
589                                  m_async_op_tracker, start_after_tag_tid,
590                                  tag_class, tags, on_finish);
591   ctx->send();
592 }
593
594 void JournalMetadata::add_listener(JournalMetadataListener *listener) {
595   Mutex::Locker locker(m_lock);
596   while (m_update_notifications > 0) {
597     m_update_cond.Wait(m_lock);
598   }
599   m_listeners.push_back(listener);
600 }
601
602 void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
603   Mutex::Locker locker(m_lock);
604   while (m_update_notifications > 0) {
605     m_update_cond.Wait(m_lock);
606   }
607   m_listeners.remove(listener);
608 }
609
610 void JournalMetadata::set_minimum_set(uint64_t object_set) {
611   Mutex::Locker locker(m_lock);
612
613   ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
614                    << ", new=" << object_set << dendl;
615   if (m_minimum_set >= object_set) {
616     return;
617   }
618
619   librados::ObjectWriteOperation op;
620   client::set_minimum_set(&op, object_set);
621
622   C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
623   librados::AioCompletion *comp =
624     librados::Rados::aio_create_completion(ctx, NULL,
625                                            utils::rados_ctx_callback);
626   int r = m_ioctx.aio_operate(m_oid, comp, &op);
627   assert(r == 0);
628   comp->release();
629
630   m_minimum_set = object_set;
631 }
632
633 int JournalMetadata::set_active_set(uint64_t object_set) {
634   C_SaferCond ctx;
635   set_active_set(object_set, &ctx);
636   return ctx.wait();
637 }
638
639 void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
640   Mutex::Locker locker(m_lock);
641
642   ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
643                    << ", new=" << object_set << dendl;
644   if (m_active_set >= object_set) {
645     m_work_queue->queue(on_finish, 0);
646     return;
647   }
648
649   librados::ObjectWriteOperation op;
650   client::set_active_set(&op, object_set);
651
652   C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
653   librados::AioCompletion *comp =
654     librados::Rados::aio_create_completion(ctx, NULL,
655                                            utils::rados_ctx_callback);
656   int r = m_ioctx.aio_operate(m_oid, comp, &op);
657   assert(r == 0);
658   comp->release();
659
660   m_active_set = object_set;
661 }
662
663 void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
664   Mutex::Locker locker(m_lock);
665
666   C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
667                                                  m_async_op_tracker,
668                                                  m_client_id, tag_tid,
669                                                  on_finish);
670   ctx->send();
671 }
672
673 void JournalMetadata::flush_commit_position() {
674   ldout(m_cct, 20) << __func__ << dendl;
675
676   Mutex::Locker timer_locker(*m_timer_lock);
677   Mutex::Locker locker(m_lock);
678   if (m_commit_position_ctx == nullptr) {
679     return;
680   }
681
682   cancel_commit_task();
683   handle_commit_position_task();
684 }
685
686 void JournalMetadata::flush_commit_position(Context *on_safe) {
687   ldout(m_cct, 20) << __func__ << dendl;
688
689   Mutex::Locker timer_locker(*m_timer_lock);
690   Mutex::Locker locker(m_lock);
691   if (m_commit_position_ctx == nullptr) {
692     // nothing to flush
693     if (on_safe != nullptr) {
694       m_work_queue->queue(on_safe, 0);
695     }
696     return;
697   }
698
699   if (on_safe != nullptr) {
700     m_commit_position_ctx = new C_FlushCommitPosition(
701       m_commit_position_ctx, on_safe);
702   }
703   cancel_commit_task();
704   handle_commit_position_task();
705 }
706
707 void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
708   Mutex::Locker locker(m_lock);
709   uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
710   if (allocated_entry_tid <= entry_tid) {
711     allocated_entry_tid = entry_tid + 1;
712   }
713 }
714
715 bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
716                                                    uint64_t *entry_tid) const {
717   Mutex::Locker locker(m_lock);
718
719   AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
720   if (it == m_allocated_entry_tids.end()) {
721     return false;
722   }
723
724   assert(it->second > 0);
725   *entry_tid = it->second - 1;
726   return true;
727 }
728
729 void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
730   if (r < 0) {
731     lderr(m_cct) << "failed to initialize immutable metadata: "
732                  << cpp_strerror(r) << dendl;
733     on_init->complete(r);
734     return;
735   }
736
737   ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
738   refresh(on_init);
739 }
740
741 void JournalMetadata::refresh(Context *on_complete) {
742   ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
743   C_Refresh *refresh = new C_Refresh(this, on_complete);
744   get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
745                        &refresh->registered_clients, refresh);
746 }
747
748 void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
749   ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
750   if (r == 0) {
751     Mutex::Locker locker(m_lock);
752
753     Client client(m_client_id, bufferlist());
754     RegisteredClients::iterator it = refresh->registered_clients.find(client);
755     if (it != refresh->registered_clients.end()) {
756       if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
757         ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
758                         << dendl;
759       }
760       m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
761       m_active_set = MAX(m_active_set, refresh->active_set);
762       m_registered_clients = refresh->registered_clients;
763       m_client = *it;
764
765       ++m_update_notifications;
766       m_lock.Unlock();
767       for (Listeners::iterator it = m_listeners.begin();
768            it != m_listeners.end(); ++it) {
769         (*it)->handle_update(this);
770       }
771       m_lock.Lock();
772       if (--m_update_notifications == 0) {
773         m_update_cond.Signal();
774       }
775     } else {
776       lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
777       r = -ENOENT;
778     }
779   }
780
781   if (refresh->on_finish != NULL) {
782     refresh->on_finish->complete(r);
783   }
784 }
785
786 void JournalMetadata::cancel_commit_task() {
787   ldout(m_cct, 20) << __func__ << dendl;
788
789   assert(m_timer_lock->is_locked());
790   assert(m_lock.is_locked());
791   assert(m_commit_position_ctx != nullptr);
792   assert(m_commit_position_task_ctx != nullptr);
793
794   m_timer->cancel_event(m_commit_position_task_ctx);
795   m_commit_position_task_ctx = NULL;
796 }
797
798 void JournalMetadata::schedule_commit_task() {
799   ldout(m_cct, 20) << __func__ << dendl;
800
801   assert(m_timer_lock->is_locked());
802   assert(m_lock.is_locked());
803   assert(m_commit_position_ctx != nullptr);
804   if (m_commit_position_task_ctx == NULL) {
805     m_commit_position_task_ctx =
806       m_timer->add_event_after(m_settings.commit_interval,
807                                new C_CommitPositionTask(this));
808   }
809 }
810
811 void JournalMetadata::handle_commit_position_task() {
812   assert(m_timer_lock->is_locked());
813   assert(m_lock.is_locked());
814   ldout(m_cct, 20) << __func__ << ": "
815                    << "client_id=" << m_client_id << ", "
816                    << "commit_position=" << m_commit_position << dendl;
817
818   librados::ObjectWriteOperation op;
819   client::client_commit(&op, m_client_id, m_commit_position);
820
821   Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
822   m_commit_position_ctx = NULL;
823
824   ctx = schedule_laggy_clients_disconnect(ctx);
825
826   librados::AioCompletion *comp =
827     librados::Rados::aio_create_completion(ctx, NULL,
828                                            utils::rados_ctx_callback);
829   int r = m_ioctx.aio_operate(m_oid, comp, &op);
830   assert(r == 0);
831   comp->release();
832
833   m_commit_position_task_ctx = NULL;
834 }
835
836 void JournalMetadata::schedule_watch_reset() {
837   assert(m_timer_lock->is_locked());
838   m_timer->add_event_after(1, new C_WatchReset(this));
839 }
840
841 void JournalMetadata::handle_watch_reset() {
842   assert(m_timer_lock->is_locked());
843   if (!m_initialized) {
844     return;
845   }
846
847   int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
848   if (r < 0) {
849     if (r == -ENOENT) {
850       ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
851     } else if (r == -EBLACKLISTED) {
852       ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl;
853     } else {
854       lderr(m_cct) << __func__ << ": failed to watch journal: "
855                    << cpp_strerror(r) << dendl;
856     }
857     schedule_watch_reset();
858   } else {
859     ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
860     refresh(NULL);
861   }
862 }
863
864 void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
865   ldout(m_cct, 10) << "journal header updated" << dendl;
866
867   bufferlist bl;
868   m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
869
870   refresh(NULL);
871 }
872
873 void JournalMetadata::handle_watch_error(int err) {
874   if (err == -ENOTCONN) {
875     ldout(m_cct, 5) << "journal watch error: header removed" << dendl;
876   } else if (err == -EBLACKLISTED) {
877     lderr(m_cct) << "journal watch error: client blacklisted" << dendl;
878   } else {
879     lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
880   }
881
882   Mutex::Locker timer_locker(*m_timer_lock);
883   Mutex::Locker locker(m_lock);
884
885   // release old watch on error
886   if (m_watch_handle != 0) {
887     m_ioctx.unwatch2(m_watch_handle);
888     m_watch_handle = 0;
889   }
890
891   if (m_initialized && err != -ENOENT) {
892     schedule_watch_reset();
893   }
894 }
895
896 uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
897                                               uint64_t tag_tid,
898                                               uint64_t entry_tid) {
899   Mutex::Locker locker(m_lock);
900   uint64_t commit_tid = ++m_commit_tid;
901   m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
902                                                   entry_tid);
903
904   ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
905                    << "object_num=" << object_num << ", "
906                    << "tag_tid=" << tag_tid << ", "
907                    << "entry_tid=" << entry_tid << "]"
908                    << dendl;
909   return commit_tid;
910 }
911
912 void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
913                                           uint64_t object_num) {
914   Mutex::Locker locker(m_lock);
915
916   auto it = m_pending_commit_tids.find(commit_tid);
917   assert(it != m_pending_commit_tids.end());
918   assert(it->second.object_num < object_num);
919
920   ldout(m_cct, 20) << __func__ << ": "
921                    << "commit_tid=" << commit_tid << ", "
922                    << "old_object_num=" << it->second.object_num << ", "
923                    << "new_object_num=" << object_num << dendl;
924   it->second.object_num = object_num;
925 }
926
927 void JournalMetadata::get_commit_entry(uint64_t commit_tid,
928                                        uint64_t *object_num,
929                                        uint64_t *tag_tid, uint64_t *entry_tid) {
930   Mutex::Locker locker(m_lock);
931
932   auto it = m_pending_commit_tids.find(commit_tid);
933   assert(it != m_pending_commit_tids.end());
934
935   *object_num = it->second.object_num;
936   *tag_tid = it->second.tag_tid;
937   *entry_tid = it->second.entry_tid;
938 }
939
940 void JournalMetadata::committed(uint64_t commit_tid,
941                                 const CreateContext &create_context) {
942   ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
943
944   ObjectSetPosition commit_position;
945   Context *stale_ctx = nullptr;
946   {
947     Mutex::Locker timer_locker(*m_timer_lock);
948     Mutex::Locker locker(m_lock);
949     assert(commit_tid > m_commit_position_tid);
950
951     if (!m_commit_position.object_positions.empty()) {
952       // in-flight commit position update
953       commit_position = m_commit_position;
954     } else {
955       // safe commit position
956       commit_position = m_client.commit_position;
957     }
958
959     CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
960     assert(it != m_pending_commit_tids.end());
961
962     CommitEntry &commit_entry = it->second;
963     commit_entry.committed = true;
964
965     bool update_commit_position = false;
966     while (!m_pending_commit_tids.empty()) {
967       CommitTids::iterator it = m_pending_commit_tids.begin();
968       CommitEntry &commit_entry = it->second;
969       if (!commit_entry.committed) {
970         break;
971       }
972
973       commit_position.object_positions.emplace_front(
974         commit_entry.object_num, commit_entry.tag_tid,
975         commit_entry.entry_tid);
976       m_pending_commit_tids.erase(it);
977       update_commit_position = true;
978     }
979
980     if (!update_commit_position) {
981       return;
982     }
983
984     // prune the position to have one position per splay offset
985     std::set<uint8_t> in_use_splay_offsets;
986     ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
987     while (ob_it != commit_position.object_positions.end()) {
988       uint8_t splay_offset = ob_it->object_number % m_splay_width;
989       if (!in_use_splay_offsets.insert(splay_offset).second) {
990         ob_it = commit_position.object_positions.erase(ob_it);
991       } else {
992         ++ob_it;
993       }
994     }
995
996     stale_ctx = m_commit_position_ctx;
997     m_commit_position_ctx = create_context();
998     m_commit_position = commit_position;
999     m_commit_position_tid = commit_tid;
1000
1001     ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
1002                      << "on_safe=" << m_commit_position_ctx << dendl;
1003     schedule_commit_task();
1004   }
1005
1006
1007   if (stale_ctx != nullptr) {
1008     ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
1009                      << dendl;
1010     stale_ctx->complete(-ESTALE);
1011   }
1012 }
1013
1014 void JournalMetadata::notify_update() {
1015   ldout(m_cct, 10) << "notifying journal header update" << dendl;
1016
1017   bufferlist bl;
1018   m_ioctx.notify2(m_oid, bl, 5000, NULL);
1019 }
1020
1021 void JournalMetadata::async_notify_update(Context *on_safe) {
1022   ldout(m_cct, 10) << "async notifying journal header update" << dendl;
1023
1024   C_AioNotify *ctx = new C_AioNotify(this, on_safe);
1025   librados::AioCompletion *comp =
1026     librados::Rados::aio_create_completion(ctx, NULL,
1027                                            utils::rados_ctx_callback);
1028
1029   bufferlist bl;
1030   int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
1031   assert(r == 0);
1032
1033   comp->release();
1034 }
1035
1036 void JournalMetadata::wait_for_ops() {
1037   C_SaferCond ctx;
1038   m_async_op_tracker.wait_for_ops(&ctx);
1039   ctx.wait();
1040 }
1041
1042 void JournalMetadata::handle_notified(int r) {
1043   ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
1044 }
1045
1046 Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
1047   assert(m_lock.is_locked());
1048
1049   ldout(m_cct, 20) << __func__ << dendl;
1050
1051   if (m_settings.max_concurrent_object_sets <= 0) {
1052     return on_finish;
1053   }
1054
1055   Context *ctx = on_finish;
1056
1057   for (auto &c : m_registered_clients) {
1058     if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
1059         c.id == m_client_id ||
1060         m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
1061       continue;
1062     }
1063     const std::string &client_id = c.id;
1064     uint64_t object_set = 0;
1065     if (!c.commit_position.object_positions.empty()) {
1066       auto &position = *(c.commit_position.object_positions.begin());
1067       object_set = position.object_number / m_splay_width;
1068     }
1069
1070     if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
1071       ldout(m_cct, 1) << __func__ << ": " << client_id
1072                       << ": scheduling disconnect" << dendl;
1073
1074       ctx = new FunctionContext([this, client_id, ctx](int r1) {
1075           ldout(m_cct, 10) << __func__ << ": " << client_id
1076                            << ": flagging disconnected" << dendl;
1077
1078           librados::ObjectWriteOperation op;
1079           client::client_update_state(&op, client_id,
1080                                       cls::journal::CLIENT_STATE_DISCONNECTED);
1081
1082           librados::AioCompletion *comp =
1083               librados::Rados::aio_create_completion(ctx, nullptr,
1084                                                      utils::rados_ctx_callback);
1085           int r = m_ioctx.aio_operate(m_oid, comp, &op);
1086           assert(r == 0);
1087           comp->release();
1088         });
1089     }
1090   }
1091
1092   if (ctx == on_finish) {
1093     ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
1094   }
1095
1096   return ctx;
1097 }
1098
1099 std::ostream &operator<<(std::ostream &os,
1100                          const JournalMetadata::RegisteredClients &clients) {
1101   os << "[";
1102   for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin();
1103        c != clients.end(); ++c) {
1104     os << (c == clients.begin() ? "" : ", " ) << *c;
1105   }
1106   os << "]";
1107   return os;
1108 }
1109
1110 std::ostream &operator<<(std::ostream &os,
1111                          const JournalMetadata &jm) {
1112   Mutex::Locker locker(jm.m_lock);
1113   os << "[oid=" << jm.m_oid << ", "
1114      << "initialized=" << jm.m_initialized << ", "
1115      << "order=" << (int)jm.m_order << ", "
1116      << "splay_width=" << (int)jm.m_splay_width << ", "
1117      << "pool_id=" << jm.m_pool_id << ", "
1118      << "minimum_set=" << jm.m_minimum_set << ", "
1119      << "active_set=" << jm.m_active_set << ", "
1120      << "client_id=" << jm.m_client_id << ", "
1121      << "commit_tid=" << jm.m_commit_tid << ", "
1122      << "commit_interval=" << jm.m_settings.commit_interval << ", "
1123      << "commit_position=" << jm.m_commit_position << ", "
1124      << "registered_clients=" << jm.m_registered_clients << "]";
1125   return os;
1126 }
1127
1128 } // namespace journal