Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / JournalPlayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H
5 #define CEPH_JOURNAL_JOURNAL_PLAYER_H
6
#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Mutex.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectPlayer.h"
#include "cls/journal/cls_journal_types.h"
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <map>
#include <set>
#include <string>
18
19 class SafeTimer;
20
21 namespace journal {
22
23 class Entry;
24 class ReplayHandler;
25
26 class JournalPlayer {
27 public:
28   typedef cls::journal::ObjectPosition ObjectPosition;
29   typedef cls::journal::ObjectPositions ObjectPositions;
30   typedef cls::journal::ObjectSetPosition ObjectSetPosition;
31
32   JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
33                 const JournalMetadataPtr& journal_metadata,
34                 ReplayHandler *replay_handler);
35   ~JournalPlayer();
36
37   void prefetch();
38   void prefetch_and_watch(double interval);
39   void shut_down(Context *on_finish);
40
41   bool try_pop_front(Entry *entry, uint64_t *commit_tid);
42
43 private:
44   typedef std::set<uint8_t> PrefetchSplayOffsets;
45   typedef std::map<uint8_t, ObjectPlayerPtr> SplayedObjectPlayers;
46   typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions;
47   typedef std::set<uint64_t> ObjectNumbers;
48
49   enum State {
50     STATE_INIT,
51     STATE_PREFETCH,
52     STATE_PLAYBACK,
53     STATE_ERROR
54   };
55
56   enum WatchStep {
57     WATCH_STEP_FETCH_CURRENT,
58     WATCH_STEP_FETCH_FIRST,
59     WATCH_STEP_ASSERT_ACTIVE
60   };
61
62   struct C_Fetch : public Context {
63     JournalPlayer *player;
64     uint64_t object_num;
65     C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
66       player->m_async_op_tracker.start_op();
67     }
68     ~C_Fetch() override {
69       player->m_async_op_tracker.finish_op();
70     }
71     void finish(int r) override {
72       player->handle_fetched(object_num, r);
73     }
74   };
75
76   struct C_Watch : public Context {
77     JournalPlayer *player;
78     uint64_t object_num;
79     C_Watch(JournalPlayer *player, uint64_t object_num)
80       : player(player), object_num(object_num) {
81       player->m_async_op_tracker.start_op();
82     }
83     ~C_Watch() override {
84       player->m_async_op_tracker.finish_op();
85     }
86
87     void finish(int r) override {
88       player->handle_watch(object_num, r);
89     }
90   };
91
92   librados::IoCtx m_ioctx;
93   CephContext *m_cct;
94   std::string m_object_oid_prefix;
95   JournalMetadataPtr m_journal_metadata;
96
97   ReplayHandler *m_replay_handler;
98
99   AsyncOpTracker m_async_op_tracker;
100
101   mutable Mutex m_lock;
102   State m_state;
103   uint8_t m_splay_offset;
104
105   bool m_watch_enabled;
106   bool m_watch_scheduled;
107   double m_watch_interval;
108   WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
109   bool m_watch_prune_active_tag = false;
110
111   bool m_shut_down = false;
112   bool m_handler_notified = false;
113
114   ObjectNumbers m_fetch_object_numbers;
115
116   PrefetchSplayOffsets m_prefetch_splay_offsets;
117   SplayedObjectPlayers m_object_players;
118
119   bool m_commit_position_valid = false;
120   ObjectPosition m_commit_position;
121   SplayedObjectPositions m_commit_positions;
122   uint64_t m_active_set;
123
124   boost::optional<uint64_t> m_active_tag_tid = boost::none;
125   boost::optional<uint64_t> m_prune_tag_tid = boost::none;
126
127   void advance_splay_object();
128
129   bool is_object_set_ready() const;
130   bool verify_playback_ready();
131   void prune_tag(uint64_t tag_tid);
132   void prune_active_tag(const boost::optional<uint64_t>& tag_tid);
133
134   ObjectPlayerPtr get_object_player() const;
135   ObjectPlayerPtr get_object_player(uint64_t object_number) const;
136   bool remove_empty_object_player(const ObjectPlayerPtr &object_player);
137
138   void process_state(uint64_t object_number, int r);
139   int process_prefetch(uint64_t object_number);
140   int process_playback(uint64_t object_number);
141
142   void fetch(uint64_t object_num);
143   void fetch(const ObjectPlayerPtr &object_player);
144   void handle_fetched(uint64_t object_num, int r);
145   void refetch(bool immediate);
146
147   void schedule_watch(bool immediate);
148   void handle_watch(uint64_t object_num, int r);
149   void handle_watch_assert_active(int r);
150
151   void notify_entries_available();
152   void notify_complete(int r);
153 };
154
155 } // namespace journal
156
157 #endif // CEPH_JOURNAL_JOURNAL_PLAYER_H