initial code repo
[stor4nfv.git] / src / ceph / src / journal / Journaler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/Journaler.h"
5 #include "include/stringify.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "journal/Entry.h"
10 #include "journal/FutureImpl.h"
11 #include "journal/JournalMetadata.h"
12 #include "journal/JournalPlayer.h"
13 #include "journal/JournalRecorder.h"
14 #include "journal/JournalTrimmer.h"
15 #include "journal/ReplayEntry.h"
16 #include "journal/ReplayHandler.h"
17 #include "cls/journal/cls_journal_client.h"
18 #include "cls/journal/cls_journal_types.h"
19 #include "Utils.h"
20
21 #define dout_subsys ceph_subsys_journaler
22 #undef dout_prefix
23 #define dout_prefix *_dout << "Journaler: " << this << " "
24
25 namespace journal {
26
27 namespace {
28
29 static const std::string JOURNAL_HEADER_PREFIX = "journal.";
30 static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
31
32 } // anonymous namespace
33
34 using namespace cls::journal;
35 using utils::rados_ctx_callback;
36
37 std::string Journaler::header_oid(const std::string &journal_id) {
38   return JOURNAL_HEADER_PREFIX + journal_id;
39 }
40
41 std::string Journaler::object_oid_prefix(int pool_id,
42                                          const std::string &journal_id) {
43   return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + ".";
44 }
45
46 Journaler::Threads::Threads(CephContext *cct)
47     : timer_lock("Journaler::timer_lock") {
48   thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
49   thread_pool->start();
50
51   work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
52
53   timer = new SafeTimer(cct, timer_lock, true);
54   timer->init();
55 }
56
57 Journaler::Threads::~Threads() {
58   {
59     Mutex::Locker timer_locker(timer_lock);
60     timer->shutdown();
61   }
62   delete timer;
63
64   work_queue->drain();
65   delete work_queue;
66
67   thread_pool->stop();
68   delete thread_pool;
69 }
70
71 Journaler::Journaler(librados::IoCtx &header_ioctx,
72                      const std::string &journal_id,
73                      const std::string &client_id, const Settings &settings)
74     : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))),
75       m_client_id(client_id) {
76   set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
77          header_ioctx, journal_id, settings);
78 }
79
80 Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
81                      Mutex *timer_lock, librados::IoCtx &header_ioctx,
82                      const std::string &journal_id,
83                      const std::string &client_id, const Settings &settings)
84     : m_client_id(client_id) {
85   set_up(work_queue, timer, timer_lock, header_ioctx, journal_id,
86          settings);
87 }
88
89 void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
90                        Mutex *timer_lock, librados::IoCtx &header_ioctx,
91                        const std::string &journal_id,
92                        const Settings &settings) {
93   m_header_ioctx.dup(header_ioctx);
94   m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
95
96   m_header_oid = header_oid(journal_id);
97   m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id);
98
99   m_metadata = new JournalMetadata(work_queue, timer, timer_lock,
100                                    m_header_ioctx, m_header_oid, m_client_id,
101                                    settings);
102   m_metadata->get();
103 }
104
105 Journaler::~Journaler() {
106   if (m_metadata != nullptr) {
107     assert(!m_metadata->is_initialized());
108     if (!m_initialized) {
109       // never initialized -- ensure any in-flight ops are complete
110       // since we wouldn't expect shut_down to be invoked
111       m_metadata->wait_for_ops();
112     }
113     m_metadata->put();
114     m_metadata = nullptr;
115   }
116   assert(m_trimmer == nullptr);
117   assert(m_player == nullptr);
118   assert(m_recorder == nullptr);
119
120   delete m_threads;
121 }
122
123 void Journaler::exists(Context *on_finish) const {
124   librados::ObjectReadOperation op;
125   op.stat(NULL, NULL, NULL);
126
127   librados::AioCompletion *comp =
128     librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
129   int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, NULL);
130   assert(r == 0);
131   comp->release();
132 }
133
134 void Journaler::init(Context *on_init) {
135   m_initialized = true;
136   m_metadata->init(new C_InitJournaler(this, on_init));
137 }
138
139 int Journaler::init_complete() {
140   int64_t pool_id = m_metadata->get_pool_id();
141
142   if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) {
143     ldout(m_cct, 20) << "using image pool for journal data" << dendl;
144     m_data_ioctx.dup(m_header_ioctx);
145   } else {
146     ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data"
147                      << dendl;
148     librados::Rados rados(m_header_ioctx);
149     int r = rados.ioctx_create2(pool_id, m_data_ioctx);
150     if (r < 0) {
151       if (r == -ENOENT) {
152         ldout(m_cct, 1) << "pool id=" << pool_id << " no longer exists"
153                         << dendl;
154       }
155       return r;
156     }
157   }
158   m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix,
159                                  m_metadata);
160   return 0;
161 }
162
163 void Journaler::shut_down() {
164   C_SaferCond ctx;
165   shut_down(&ctx);
166   ctx.wait();
167 }
168
169 void Journaler::shut_down(Context *on_finish) {
170   assert(m_player == nullptr);
171   assert(m_recorder == nullptr);
172
173   JournalMetadata *metadata = nullptr;
174   std::swap(metadata, m_metadata);
175   assert(metadata != nullptr);
176
177   on_finish = new FunctionContext([metadata, on_finish](int r) {
178       metadata->put();
179       on_finish->complete(0);
180     });
181
182   JournalTrimmer *trimmer = nullptr;
183   std::swap(trimmer, m_trimmer);
184   if (trimmer == nullptr) {
185     metadata->shut_down(on_finish);
186     return;
187   }
188
189   on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) {
190       delete trimmer;
191       metadata->shut_down(on_finish);
192     });
193   trimmer->shut_down(on_finish);
194 }
195
196 bool Journaler::is_initialized() const {
197   return m_metadata->is_initialized();
198 }
199
200 void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
201                                        int64_t *pool_id, Context *on_finish) {
202   m_metadata->get_immutable_metadata(order, splay_width, pool_id, on_finish);
203 }
204
205 void Journaler::get_mutable_metadata(uint64_t *minimum_set,
206                                      uint64_t *active_set,
207                                      RegisteredClients *clients,
208                                      Context *on_finish) {
209   m_metadata->get_mutable_metadata(minimum_set, active_set, clients, on_finish);
210 }
211
212 void Journaler::create(uint8_t order, uint8_t splay_width,
213                       int64_t pool_id, Context *on_finish) {
214   if (order > 64 || order < 12) {
215     lderr(m_cct) << "order must be in the range [12, 64]" << dendl;
216     on_finish->complete(-EDOM);
217     return;
218   }
219   if (splay_width == 0) {
220     on_finish->complete(-EINVAL);
221     return;
222   }
223
224   ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl;
225
226   librados::ObjectWriteOperation op;
227   client::create(&op, order, splay_width, pool_id);
228
229   librados::AioCompletion *comp =
230     librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
231   int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op);
232   assert(r == 0);
233   comp->release();
234 }
235
236 void Journaler::remove(bool force, Context *on_finish) {
237   // chain journal removal (reverse order)
238   on_finish = new FunctionContext([this, on_finish](int r) {
239       librados::AioCompletion *comp = librados::Rados::aio_create_completion(
240         on_finish, nullptr, utils::rados_ctx_callback);
241       r = m_header_ioctx.aio_remove(m_header_oid, comp);
242       assert(r == 0);
243       comp->release();
244     });
245
246   on_finish = new FunctionContext([this, force, on_finish](int r) {
247       m_trimmer->remove_objects(force, on_finish);
248     });
249
250   m_metadata->shut_down(on_finish);
251 }
252
253 void Journaler::flush_commit_position(Context *on_safe) {
254   m_metadata->flush_commit_position(on_safe);
255 }
256
257 void Journaler::add_listener(JournalMetadataListener *listener) {
258   m_metadata->add_listener(listener);
259 }
260
261 void Journaler::remove_listener(JournalMetadataListener *listener) {
262   m_metadata->remove_listener(listener);
263 }
264
265 int Journaler::register_client(const bufferlist &data) {
266   C_SaferCond cond;
267   register_client(data, &cond);
268   return cond.wait();
269 }
270
271 int Journaler::unregister_client() {
272   C_SaferCond cond;
273   unregister_client(&cond);
274   return cond.wait();
275 }
276
277 void Journaler::register_client(const bufferlist &data, Context *on_finish) {
278   return m_metadata->register_client(data, on_finish);
279 }
280
281 void Journaler::update_client(const bufferlist &data, Context *on_finish) {
282   return m_metadata->update_client(data, on_finish);
283 }
284
285 void Journaler::unregister_client(Context *on_finish) {
286   return m_metadata->unregister_client(on_finish);
287 }
288
289 void Journaler::get_client(const std::string &client_id,
290                            cls::journal::Client *client,
291                            Context *on_finish) {
292   m_metadata->get_client(client_id, client, on_finish);
293 }
294
295 int Journaler::get_cached_client(const std::string &client_id,
296                                  cls::journal::Client *client) {
297   RegisteredClients clients;
298   m_metadata->get_registered_clients(&clients);
299
300   auto it = clients.find({client_id, {}});
301   if (it == clients.end()) {
302     return -ENOENT;
303   }
304
305   *client = *it;
306   return 0;
307 }
308
309 void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
310                              Context *on_finish) {
311   m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag,
312                            on_finish);
313 }
314
315 void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data,
316                              cls::journal::Tag *tag, Context *on_finish) {
317   m_metadata->allocate_tag(tag_class, data, tag, on_finish);
318 }
319
320 void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
321   m_metadata->get_tag(tag_tid, tag, on_finish);
322 }
323
324 void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) {
325   m_metadata->get_tags(0, tag_class, tags, on_finish);
326 }
327
328 void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class,
329                          Tags *tags, Context *on_finish) {
330   m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish);
331 }
332
333 void Journaler::start_replay(ReplayHandler *replay_handler) {
334   create_player(replay_handler);
335   m_player->prefetch();
336 }
337
338 void Journaler::start_live_replay(ReplayHandler *replay_handler,
339                                   double interval) {
340   create_player(replay_handler);
341   m_player->prefetch_and_watch(interval);
342 }
343
344 bool Journaler::try_pop_front(ReplayEntry *replay_entry,
345                               uint64_t *tag_tid) {
346   assert(m_player != NULL);
347
348   Entry entry;
349   uint64_t commit_tid;
350   if (!m_player->try_pop_front(&entry, &commit_tid)) {
351     return false;
352   }
353
354   *replay_entry = ReplayEntry(entry.get_data(), commit_tid);
355   if (tag_tid != nullptr) {
356     *tag_tid = entry.get_tag_tid();
357   }
358   return true;
359 }
360
361 void Journaler::stop_replay() {
362   C_SaferCond ctx;
363   stop_replay(&ctx);
364   ctx.wait();
365 }
366
367 void Journaler::stop_replay(Context *on_finish) {
368   JournalPlayer *player = nullptr;
369   std::swap(player, m_player);
370   assert(player != nullptr);
371
372   on_finish = new FunctionContext([player, on_finish](int r) {
373       delete player;
374       on_finish->complete(r);
375     });
376   player->shut_down(on_finish);
377 }
378
379 void Journaler::committed(const ReplayEntry &replay_entry) {
380   m_trimmer->committed(replay_entry.get_commit_tid());
381 }
382
383 void Journaler::committed(const Future &future) {
384   FutureImplPtr future_impl = future.get_future_impl();
385   m_trimmer->committed(future_impl->get_commit_tid());
386 }
387
388 void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
389                              double flush_age) {
390   assert(m_recorder == NULL);
391
392   // TODO verify active object set >= current replay object set
393
394   m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
395                                    m_metadata, flush_interval, flush_bytes,
396                                    flush_age);
397 }
398
399 void Journaler::stop_append(Context *on_safe) {
400   JournalRecorder *recorder = nullptr;
401   std::swap(recorder, m_recorder);
402   assert(recorder != nullptr);
403
404   on_safe = new FunctionContext([recorder, on_safe](int r) {
405       delete recorder;
406       on_safe->complete(r);
407     });
408   recorder->flush(on_safe);
409 }
410
411 uint64_t Journaler::get_max_append_size() const {
412   uint64_t max_payload_size = m_metadata->get_object_size() -
413                               Entry::get_fixed_size();
414   if (m_metadata->get_settings().max_payload_bytes > 0) {
415     max_payload_size = MIN(max_payload_size,
416                            m_metadata->get_settings().max_payload_bytes);
417   }
418   return max_payload_size;
419 }
420
421 Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) {
422   return m_recorder->append(tag_tid, payload_bl);
423 }
424
425 void Journaler::flush_append(Context *on_safe) {
426   m_recorder->flush(on_safe);
427 }
428
429 void Journaler::create_player(ReplayHandler *replay_handler) {
430   assert(m_player == NULL);
431   m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
432                                replay_handler);
433 }
434
435 void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width,
436                              int64_t *pool_id) {
437   assert(m_metadata != NULL);
438
439   *order = m_metadata->get_order();
440   *splay_width = m_metadata->get_splay_width();
441   *pool_id = m_metadata->get_pool_id();
442 }
443
444 std::ostream &operator<<(std::ostream &os,
445                          const Journaler &journaler) {
446   os << "[metadata=";
447   if (journaler.m_metadata != NULL) {
448     os << *journaler.m_metadata;
449   } else {
450     os << "NULL";
451   }
452   os << "]";
453   return os;
454 }
455
456 } // namespace journal