Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / JournalPlayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalPlayer.h"
5 #include "journal/Entry.h"
6 #include "journal/ReplayHandler.h"
7 #include "journal/Utils.h"
8
9 #define dout_subsys ceph_subsys_journaler
10 #undef dout_prefix
11 #define dout_prefix *_dout << "JournalPlayer: " << this << " "
12
13 namespace journal {
14
15 namespace {
16
17 struct C_HandleComplete : public Context {
18   ReplayHandler *replay_handler;
19
20   explicit C_HandleComplete(ReplayHandler *_replay_handler)
21     : replay_handler(_replay_handler) {
22     replay_handler->get();
23   }
24   ~C_HandleComplete() override {
25     replay_handler->put();
26   }
27   void finish(int r) override {
28     replay_handler->handle_complete(r);
29   }
30 };
31
32 struct C_HandleEntriesAvailable : public Context {
33   ReplayHandler *replay_handler;
34
35   explicit C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
36       : replay_handler(_replay_handler) {
37     replay_handler->get();
38   }
39   ~C_HandleEntriesAvailable() override {
40     replay_handler->put();
41   }
42   void finish(int r) override {
43     replay_handler->handle_entries_available();
44   }
45 };
46
47 } // anonymous namespace
48
49 JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
50                              const std::string &object_oid_prefix,
51                              const JournalMetadataPtr& journal_metadata,
52                              ReplayHandler *replay_handler)
53   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
54     m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
55     m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
56     m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) {
57   m_replay_handler->get();
58   m_ioctx.dup(ioctx);
59   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
60
61   ObjectSetPosition commit_position;
62   m_journal_metadata->get_commit_position(&commit_position);
63
64   if (!commit_position.object_positions.empty()) {
65     ldout(m_cct, 5) << "commit position: " << commit_position << dendl;
66
67     // start replay after the last committed entry's object
68     uint8_t splay_width = m_journal_metadata->get_splay_width();
69     auto &active_position = commit_position.object_positions.front();
70     m_active_tag_tid = active_position.tag_tid;
71     m_commit_position_valid = true;
72     m_commit_position = active_position;
73     m_splay_offset = active_position.object_number % splay_width;
74     for (auto &position : commit_position.object_positions) {
75       uint8_t splay_offset = position.object_number % splay_width;
76       m_commit_positions[splay_offset] = position;
77     }
78   }
79 }
80
81 JournalPlayer::~JournalPlayer() {
82   assert(m_async_op_tracker.empty());
83   {
84     Mutex::Locker locker(m_lock);
85     assert(m_shut_down);
86     assert(m_fetch_object_numbers.empty());
87     assert(!m_watch_scheduled);
88   }
89   m_replay_handler->put();
90 }
91
92 void JournalPlayer::prefetch() {
93   Mutex::Locker locker(m_lock);
94   assert(m_state == STATE_INIT);
95   m_state = STATE_PREFETCH;
96
97   m_active_set = m_journal_metadata->get_active_set();
98   uint8_t splay_width = m_journal_metadata->get_splay_width();
99   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
100     m_prefetch_splay_offsets.insert(splay_offset);
101   }
102
103   // compute active object for each splay offset (might be before
104   // active set)
105   std::map<uint8_t, uint64_t> splay_offset_to_objects;
106   for (auto &position : m_commit_positions) {
107     assert(splay_offset_to_objects.count(position.first) == 0);
108     splay_offset_to_objects[position.first] = position.second.object_number;
109   }
110
111   // prefetch the active object for each splay offset
112   std::set<uint64_t> prefetch_object_numbers;
113   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
114     uint64_t object_number = splay_offset;
115     if (splay_offset_to_objects.count(splay_offset) != 0) {
116       object_number = splay_offset_to_objects[splay_offset];
117     }
118
119     prefetch_object_numbers.insert(object_number);
120   }
121
122   ldout(m_cct, 10) << __func__ << ": prefetching "
123                    << prefetch_object_numbers.size() << " " << "objects"
124                    << dendl;
125   for (auto object_number : prefetch_object_numbers) {
126     fetch(object_number);
127   }
128 }
129
130 void JournalPlayer::prefetch_and_watch(double interval) {
131   {
132     Mutex::Locker locker(m_lock);
133     m_watch_enabled = true;
134     m_watch_interval = interval;
135     m_watch_step = WATCH_STEP_FETCH_CURRENT;
136   }
137   prefetch();
138 }
139
140 void JournalPlayer::shut_down(Context *on_finish) {
141   ldout(m_cct, 20) << __func__ << dendl;
142   Mutex::Locker locker(m_lock);
143
144   assert(!m_shut_down);
145   m_shut_down = true;
146   m_watch_enabled = false;
147
148   on_finish = utils::create_async_context_callback(
149       m_journal_metadata, on_finish);
150
151   if (m_watch_scheduled) {
152     ObjectPlayerPtr object_player = get_object_player();
153     switch (m_watch_step) {
154     case WATCH_STEP_FETCH_FIRST:
155       object_player = m_object_players.begin()->second;
156       // fallthrough
157     case WATCH_STEP_FETCH_CURRENT:
158       object_player->unwatch();
159       break;
160     case WATCH_STEP_ASSERT_ACTIVE:
161       break;
162     }
163   }
164
165   m_async_op_tracker.wait_for_ops(on_finish);
166 }
167
// Attempts to pop the next in-sequence entry from the active object player.
//
// On success, *entry receives the popped entry, *commit_tid receives a
// newly allocated commit tid for it, and true is returned.  Returns false
// when the player is not in playback state, when no entry is currently
// ready, or when a gap in the entry sequence is detected (in which case the
// player moves to STATE_ERROR and completion is signalled with -ENOMSG).
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
  ldout(m_cct, 20) << __func__ << dendl;
  Mutex::Locker locker(m_lock);

  if (m_state != STATE_PLAYBACK) {
    // clear the flag so a later notify_entries_available() is not skipped
    m_handler_notified = false;
    return false;
  }

  if (!verify_playback_ready()) {
    if (!is_object_set_ready()) {
      // a fetch/watch is still in flight; just re-arm the notification
      m_handler_notified = false;
    } else {
      // nothing in flight -- request an immediate refetch
      refetch(true);
    }
    return false;
  }

  ObjectPlayerPtr object_player = get_object_player();
  assert(object_player && !object_player->empty());

  object_player->front(entry);
  object_player->pop_front();

  // the entry tid must directly follow the last allocated tid for its tag
  // (only checked when the metadata already knows a tid for that tag)
  uint64_t last_entry_tid;
  if (m_journal_metadata->get_last_allocated_entry_tid(
        entry->get_tag_tid(), &last_entry_tid) &&
      entry->get_entry_tid() != last_entry_tid + 1) {
    lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;

    m_state = STATE_ERROR;
    notify_complete(-ENOMSG);
    return false;
  }

  // move on to the next splay offset and, if the popped-from player is now
  // empty, let it be replaced by a fetch of its successor object
  advance_splay_object();
  remove_empty_object_player(object_player);

  // object_player is a local copy, so it is still usable here even if the
  // map entry was replaced above
  m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
                                        entry->get_entry_tid());
  *commit_tid = m_journal_metadata->allocate_commit_tid(
    object_player->get_object_number(), entry->get_tag_tid(),
    entry->get_entry_tid());
  return true;
}
213
214 void JournalPlayer::process_state(uint64_t object_number, int r) {
215   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
216                    << "r=" << r << dendl;
217
218   assert(m_lock.is_locked());
219   if (r >= 0) {
220     switch (m_state) {
221     case STATE_PREFETCH:
222       ldout(m_cct, 10) << "PREFETCH" << dendl;
223       r = process_prefetch(object_number);
224       break;
225     case STATE_PLAYBACK:
226       ldout(m_cct, 10) << "PLAYBACK" << dendl;
227       r = process_playback(object_number);
228       break;
229     case STATE_ERROR:
230       ldout(m_cct, 10) << "ERROR" << dendl;
231       break;
232     default:
233       lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl;
234       assert(false);
235       break;
236     }
237   }
238
239   if (r < 0) {
240     m_state = STATE_ERROR;
241     notify_complete(r);
242   }
243 }
244
// Handles a completed fetch while in STATE_PREFETCH.
//
// For the splay offset that object_number maps to, entries up to and
// including the recorded commit position are skipped.  Once every splay
// offset has finished prefetching, the player switches to STATE_PLAYBACK,
// positions itself after the commit position, and either notifies the
// handler that entries are available or triggers a refetch.
//
// Always returns 0.  Caller must hold m_lock.
int JournalPlayer::process_prefetch(uint64_t object_number) {
  ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
  assert(m_lock.is_locked());

  uint8_t splay_width = m_journal_metadata->get_splay_width();
  uint8_t splay_offset = object_number % splay_width;

  // nothing to do if this splay offset already completed its prefetch
  PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find(
    splay_offset);
  if (it == m_prefetch_splay_offsets.end()) {
    return 0;
  }

  bool prefetch_complete = false;
  assert(m_object_players.count(splay_offset) == 1);
  ObjectPlayerPtr object_player = m_object_players[splay_offset];

  // prefetch in-order since a newer splay object could prefetch first
  // (i.e. only proceed once the player currently registered for this
  // offset has no fetch outstanding)
  if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) {
    // skip past known committed records
    if (m_commit_positions.count(splay_offset) != 0 &&
        !object_player->empty()) {
      ObjectPosition &position = m_commit_positions[splay_offset];

      ldout(m_cct, 15) << "seeking known commit position " << position << " in "
                       << object_player->get_oid() << dendl;

      bool found_commit = false;
      Entry entry;
      while (!object_player->empty()) {
        object_player->front(&entry);

        if (entry.get_tag_tid() == position.tag_tid &&
            entry.get_entry_tid() == position.entry_tid) {
          // the committed entry itself is still skipped below
          found_commit = true;
        } else if (found_commit) {
          // first entry after the commit position -- stop skipping
          ldout(m_cct, 10) << "located next uncommitted entry: " << entry
                           << dendl;
          break;
        }

        ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
        m_journal_metadata->reserve_entry_tid(entry.get_tag_tid(),
                                              entry.get_entry_tid());
        object_player->pop_front();
      }

      // do not search for commit position for this object
      // if we've already seen it
      if (found_commit) {
        m_commit_positions.erase(splay_offset);
      }
    }

    // if the object is empty, pre-fetch the next splay object
    if (object_player->empty() && object_player->refetch_required()) {
      ldout(m_cct, 10) << "refetching potentially partially decoded object"
                       << dendl;
      object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
      fetch(object_player);
    } else if (!remove_empty_object_player(object_player)) {
      // player was kept (no successor fetch started) -- this offset is done
      ldout(m_cct, 10) << "prefetch of object complete" << dendl;
      prefetch_complete = true;
    }
  }

  if (!prefetch_complete) {
    return 0;
  }

  // wait until every splay offset has completed before switching state
  m_prefetch_splay_offsets.erase(it);
  if (!m_prefetch_splay_offsets.empty()) {
    return 0;
  }

  ldout(m_cct, 10) << "switching to playback mode" << dendl;
  m_state = STATE_PLAYBACK;

  // if we have a valid commit position, our read should start with
  // the next consistent journal entry in the sequence
  if (m_commit_position_valid) {
    splay_offset = m_commit_position.object_number % splay_width;
    object_player = m_object_players[splay_offset];

    if (object_player->empty()) {
      // only step past the committed object if it will not be re-read
      if (!object_player->refetch_required()) {
        advance_splay_object();
      }
    } else {
      Entry entry;
      object_player->front(&entry);
      if (entry.get_tag_tid() == m_commit_position.tag_tid) {
        advance_splay_object();
      }
    }
  }

  if (verify_playback_ready()) {
    notify_entries_available();
  } else if (is_object_set_ready()) {
    refetch(false);
  }
  return 0;
}
349
350 int JournalPlayer::process_playback(uint64_t object_number) {
351   ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
352   assert(m_lock.is_locked());
353
354   if (verify_playback_ready()) {
355     notify_entries_available();
356   } else if (is_object_set_ready()) {
357     refetch(false);
358   }
359   return 0;
360 }
361
362 bool JournalPlayer::is_object_set_ready() const {
363   assert(m_lock.is_locked());
364   if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
365     ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
366     return false;
367   }
368
369   return true;
370 }
371
// Determines whether the active object player has an entry that can be
// handed to the caller next.
//
// Loops because pruning a stale/finished tag can change which entry (or
// which object player) is at the current playback position; each
// `continue` re-evaluates from the top.  Returns true when the front entry
// of the active player belongs to the (possibly newly promoted) active tag,
// and false when playback has to wait for a fetch, a watch or more data.
// Caller must hold m_lock.
bool JournalPlayer::verify_playback_ready() {
  assert(m_lock.is_locked());

  while (true) {
    if (!is_object_set_ready()) {
      ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
      return false;
    }

    ObjectPlayerPtr object_player = get_object_player();
    assert(object_player);
    uint64_t object_num = object_player->get_object_number();

    // Verify if the active object player has another entry available
    // in the sequence
    // NOTE: replay currently does not check tag class to playback multiple tags
    // from different classes (issue #14909).  When a new tag is discovered, it
    // is assumed that the previous tag was closed at the last replayable entry.
    Entry entry;
    if (!object_player->empty()) {
      // an entry is present, so any pending "prune active tag" hint is moot
      m_watch_prune_active_tag = false;
      object_player->front(&entry);

      if (!m_active_tag_tid) {
        // no active tag yet -- adopt the tag of the first entry seen
        ldout(m_cct, 10) << __func__ << ": "
                         << "object_num=" << object_num << ", "
                         << "initial tag=" << entry.get_tag_tid()
                         << dendl;
        m_active_tag_tid = entry.get_tag_tid();
        return true;
      } else if (entry.get_tag_tid() < *m_active_tag_tid ||
                 (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) {
        // entry occurred before the current active tag
        ldout(m_cct, 10) << __func__ << ": detected stale entry: "
                         << "object_num=" << object_num << ", "
                         << "entry=" << entry << dendl;
        prune_tag(entry.get_tag_tid());
        continue;
      } else if (entry.get_tag_tid() > *m_active_tag_tid) {
        // new tag at current playback position -- implies that previous
        // tag ended abruptly without flushing out all records
        // search for the start record for the next tag
        ldout(m_cct, 10) << __func__ << ": new tag detected: "
                         << "object_num=" << object_num << ", "
                         << "active_tag=" << *m_active_tag_tid << ", "
                         << "new_tag=" << entry.get_tag_tid() << dendl;
        if (entry.get_entry_tid() == 0) {
          // first entry in new tag -- can promote to active
          prune_active_tag(entry.get_tag_tid());
          return true;
        } else {
          // prune current active and wait for initial entry for new tag
          prune_active_tag(boost::none);
          continue;
        }
      } else {
        // front entry belongs to the active tag -- ready for playback
        ldout(m_cct, 20) << __func__ << ": "
                         << "object_num=" << object_num << ", "
                         << "entry: " << entry << dendl;
        assert(entry.get_tag_tid() == *m_active_tag_tid);
        return true;
      }
    } else {
      if (!m_active_tag_tid) {
        // waiting for our first entry
        ldout(m_cct, 10) << __func__ << ": waiting for first entry: "
                         << "object_num=" << object_num << dendl;
        return false;
      } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) {
        // the active tag has already been pruned and no newer tag was
        // promoted
        ldout(m_cct, 10) << __func__ << ": no more entries" << dendl;
        return false;
      } else if (m_watch_enabled && m_watch_prune_active_tag) {
        // detected current tag is no longer active and we have re-read the
        // current object but it's still empty, so this tag is done
        ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: "
                         << "object_num=" << object_num << ", "
                         << "active_tag " << *m_active_tag_tid << dendl;
        prune_active_tag(boost::none);
        continue;
      } else if (object_player->refetch_required()) {
        // if the active object requires a refetch, don't proceed looking for a
        // new tag before this process completes
        ldout(m_cct, 10) << __func__ << ": refetch required: "
                         << "object_num=" << object_num << dendl;
        return false;
      } else if (!m_watch_enabled) {
        // current playback position is empty so this tag is done
        ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
                         << "object_num=" << object_num << ", "
                         << "active_tag=" << *m_active_tag_tid << dendl;
        prune_active_tag(boost::none);
        continue;
      } else if (!m_watch_scheduled) {
        // no more entries and we don't have an active watch in-progress
        // (is_object_set_ready() above already established that no watch is
        // scheduled, so this branch is taken whenever it is reached)
        ldout(m_cct, 10) << __func__ << ": no more entries -- watch required"
                         << dendl;
        return false;
      }
    }
  }
  // not reached: every path through the loop returns or continues
  return false;
}
474
475 void JournalPlayer::prune_tag(uint64_t tag_tid) {
476   assert(m_lock.is_locked());
477   ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
478                    << tag_tid << dendl;
479
480   // prune records that are at or below the largest prune tag tid
481   if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) {
482     m_prune_tag_tid = tag_tid;
483   }
484
485   bool pruned = false;
486   for (auto &player_pair : m_object_players) {
487     ObjectPlayerPtr object_player(player_pair.second);
488     ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
489                      << dendl;
490     while (!object_player->empty()) {
491       Entry entry;
492       object_player->front(&entry);
493       if (entry.get_tag_tid() == tag_tid) {
494         ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl;
495         object_player->pop_front();
496         pruned = true;
497       } else {
498         break;
499       }
500     }
501   }
502
503   // avoid watch delay when pruning stale tags from journal objects
504   if (pruned) {
505     ldout(m_cct, 15) << __func__ << ": reseting refetch state to immediate"
506                      << dendl;
507     for (auto &player_pair : m_object_players) {
508       ObjectPlayerPtr object_player(player_pair.second);
509       object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
510     }
511   }
512
513   // trim empty player to prefetch the next available object
514   for (auto &player_pair : m_object_players) {
515     remove_empty_object_player(player_pair.second);
516   }
517 }
518
519 void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
520   assert(m_lock.is_locked());
521   assert(m_active_tag_tid);
522
523   uint64_t active_tag_tid = *m_active_tag_tid;
524   if (tag_tid) {
525     m_active_tag_tid = tag_tid;
526   }
527   m_splay_offset = 0;
528   m_watch_step = WATCH_STEP_FETCH_CURRENT;
529
530   prune_tag(active_tag_tid);
531 }
532
533 ObjectPlayerPtr JournalPlayer::get_object_player() const {
534   assert(m_lock.is_locked());
535
536   SplayedObjectPlayers::const_iterator it = m_object_players.find(
537     m_splay_offset);
538   assert(it != m_object_players.end());
539   return it->second;
540 }
541
542 ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
543   assert(m_lock.is_locked());
544
545   uint8_t splay_width = m_journal_metadata->get_splay_width();
546   uint8_t splay_offset = object_number % splay_width;
547   auto splay_it = m_object_players.find(splay_offset);
548   assert(splay_it != m_object_players.end());
549
550   ObjectPlayerPtr object_player = splay_it->second;
551   assert(object_player->get_object_number() == object_number);
552   return object_player;
553 }
554
555 void JournalPlayer::advance_splay_object() {
556   assert(m_lock.is_locked());
557   ++m_splay_offset;
558   m_splay_offset %= m_journal_metadata->get_splay_width();
559   m_watch_step = WATCH_STEP_FETCH_CURRENT;
560   ldout(m_cct, 20) << __func__ << ": new offset "
561                    << static_cast<uint32_t>(m_splay_offset) << dendl;
562 }
563
564 bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
565   assert(m_lock.is_locked());
566   assert(!m_watch_scheduled);
567
568   uint8_t splay_width = m_journal_metadata->get_splay_width();
569   uint64_t object_set = player->get_object_number() / splay_width;
570   uint64_t active_set = m_journal_metadata->get_active_set();
571   if (!player->empty() || object_set == active_set) {
572     return false;
573   } else if (player->refetch_required()) {
574     ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires "
575                      << "a refetch" << dendl;
576     return false;
577   } else if (m_active_set != active_set) {
578     ldout(m_cct, 20) << __func__ << ": new active set detected, all players "
579                      << "require refetch" << dendl;
580     m_active_set = active_set;
581     for (auto &pair : m_object_players) {
582       pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
583     }
584     return false;
585   }
586
587   ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
588                    << dendl;
589
590   m_watch_prune_active_tag = false;
591   m_watch_step = WATCH_STEP_FETCH_CURRENT;
592
593   uint64_t next_object_num = player->get_object_number() + splay_width;
594   fetch(next_object_num);
595   return true;
596 }
597
598 void JournalPlayer::fetch(uint64_t object_num) {
599   assert(m_lock.is_locked());
600
601   ObjectPlayerPtr object_player(new ObjectPlayer(
602     m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
603     m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
604     m_journal_metadata->get_settings().max_fetch_bytes));
605
606   uint8_t splay_width = m_journal_metadata->get_splay_width();
607   m_object_players[object_num % splay_width] = object_player;
608   fetch(object_player);
609 }
610
611 void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
612   assert(m_lock.is_locked());
613
614   uint64_t object_num = object_player->get_object_number();
615   std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
616   assert(m_fetch_object_numbers.count(object_num) == 0);
617   m_fetch_object_numbers.insert(object_num);
618
619   ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
620   C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
621
622   object_player->fetch(fetch_ctx);
623 }
624
625 void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
626   ldout(m_cct, 10) << __func__ << ": "
627                    << utils::get_object_name(m_object_oid_prefix, object_num)
628                    << ": r=" << r << dendl;
629
630   Mutex::Locker locker(m_lock);
631   assert(m_fetch_object_numbers.count(object_num) == 1);
632   m_fetch_object_numbers.erase(object_num);
633
634   if (m_shut_down) {
635     return;
636   }
637
638   if (r == 0) {
639     ObjectPlayerPtr object_player = get_object_player(object_num);
640     remove_empty_object_player(object_player);
641   }
642   process_state(object_num, r);
643 }
644
645 void JournalPlayer::refetch(bool immediate) {
646   ldout(m_cct, 10) << __func__ << dendl;
647   assert(m_lock.is_locked());
648   m_handler_notified = false;
649
650   // if watching the object, handle the periodic re-fetch
651   if (m_watch_enabled) {
652     schedule_watch(immediate);
653     return;
654   }
655
656   ObjectPlayerPtr object_player = get_object_player();
657   if (object_player->refetch_required()) {
658     object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
659     fetch(object_player);
660     return;
661   }
662
663   notify_complete(0);
664 }
665
// Schedules the next step of the watch sequence, unless one is already
// scheduled.
//
// Depending on m_watch_step this either asks the journal metadata to assert
// that the active tag is still active (WATCH_STEP_ASSERT_ACTIVE), or places
// a watch on the object player at the current splay offset
// (WATCH_STEP_FETCH_CURRENT) or on the first object player in the map
// (WATCH_STEP_FETCH_FIRST).  `immediate` forces a zero watch interval for
// the FETCH_CURRENT step.  Caller must hold m_lock.
void JournalPlayer::schedule_watch(bool immediate) {
  ldout(m_cct, 10) << __func__ << dendl;
  assert(m_lock.is_locked());
  if (m_watch_scheduled) {
    return;
  }

  m_watch_scheduled = true;

  if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
    // detect if a new tag has been created in case we are blocked
    // by an incomplete tag sequence
    ldout(m_cct, 20) << __func__ << ": asserting active tag="
                     << *m_active_tag_tid << dendl;

    // tracked as an async op; the matching finish_op() is issued by
    // handle_watch_assert_active()
    m_async_op_tracker.start_op();
    FunctionContext *ctx = new FunctionContext([this](int r) {
        handle_watch_assert_active(r);
      });
    m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx);
    return;
  }

  ObjectPlayerPtr object_player;
  double watch_interval = m_watch_interval;

  switch (m_watch_step) {
  case WATCH_STEP_FETCH_CURRENT:
    {
      object_player = get_object_player();

      uint8_t splay_width = m_journal_metadata->get_splay_width();
      uint64_t active_set = m_journal_metadata->get_active_set();
      uint64_t object_set = object_player->get_object_number() / splay_width;
      // skip the watch delay when the caller asked for it, when the player
      // was flagged for an immediate refetch, or when the object belongs to
      // an older object set and still requires a refetch
      if (immediate ||
          (object_player->get_refetch_state() ==
             ObjectPlayer::REFETCH_STATE_IMMEDIATE) ||
          (object_set < active_set && object_player->refetch_required())) {
        ldout(m_cct, 20) << __func__ << ": immediately refetching "
                         << object_player->get_oid()
                         << dendl;
        object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
        watch_interval = 0;
      }
    }
    break;
  case WATCH_STEP_FETCH_FIRST:
    // the first player in the map is always watched with a zero interval
    object_player = m_object_players.begin()->second;
    watch_interval = 0;
    break;
  default:
    assert(false);
  }

  ldout(m_cct, 20) << __func__ << ": scheduling watch on "
                   << object_player->get_oid() << dendl;
  // completion is routed to C_Watch through an async context callback
  Context *ctx = utils::create_async_context_callback(
    m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
  object_player->watch(ctx, watch_interval);
}
726
727 void JournalPlayer::handle_watch(uint64_t object_num, int r) {
728   ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
729   Mutex::Locker locker(m_lock);
730   assert(m_watch_scheduled);
731   m_watch_scheduled = false;
732
733   if (m_shut_down || r == -ECANCELED) {
734     // unwatch of object player(s)
735     return;
736   }
737
738   ObjectPlayerPtr object_player = get_object_player(object_num);
739   if (r == 0 && object_player->empty()) {
740     // possibly need to prune this empty object player if we've
741     // already fetched it after the active set was advanced with no
742     // new records
743     remove_empty_object_player(object_player);
744   }
745
746   // determine what object to query on next watch schedule tick
747   uint8_t splay_width = m_journal_metadata->get_splay_width();
748   if (m_watch_step == WATCH_STEP_FETCH_CURRENT &&
749       object_player->get_object_number() % splay_width != 0) {
750     m_watch_step = WATCH_STEP_FETCH_FIRST;
751   } else if (m_active_tag_tid) {
752     m_watch_step = WATCH_STEP_ASSERT_ACTIVE;
753   } else {
754     m_watch_step = WATCH_STEP_FETCH_CURRENT;
755   }
756
757   process_state(object_num, r);
758 }
759
760 void JournalPlayer::handle_watch_assert_active(int r) {
761   ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
762
763   Mutex::Locker locker(m_lock);
764   assert(m_watch_scheduled);
765   m_watch_scheduled = false;
766
767   if (r == -ESTALE) {
768     // newer tag exists -- since we are at this step in the watch sequence,
769     // we know we can prune the active tag if watch fails again
770     ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "
771                      << "no longer active" << dendl;
772     m_watch_prune_active_tag = true;
773   }
774
775   m_watch_step = WATCH_STEP_FETCH_CURRENT;
776   if (!m_shut_down && m_watch_enabled) {
777     schedule_watch(false);
778   }
779   m_async_op_tracker.finish_op();
780 }
781
782 void JournalPlayer::notify_entries_available() {
783   assert(m_lock.is_locked());
784   if (m_handler_notified) {
785     return;
786   }
787   m_handler_notified = true;
788
789   ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
790   m_journal_metadata->queue(new C_HandleEntriesAvailable(
791     m_replay_handler), 0);
792 }
793
794 void JournalPlayer::notify_complete(int r) {
795   assert(m_lock.is_locked());
796   m_handler_notified = true;
797
798   ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
799   m_journal_metadata->queue(new C_HandleComplete(
800     m_replay_handler), r);
801 }
802
803 } // namespace journal