Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / ObjectPlayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectPlayer.h"
5 #include "journal/Utils.h"
6 #include "common/Timer.h"
7 #include <limits>
8
9 #define dout_subsys ceph_subsys_journaler
10 #undef dout_prefix
11 #define dout_prefix *_dout << "ObjectPlayer: " << this << " "
12
13 namespace journal {
14
15 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
16                            const std::string &object_oid_prefix,
17                            uint64_t object_num, SafeTimer &timer,
18                            Mutex &timer_lock, uint8_t order,
19                            uint64_t max_fetch_bytes)
20   : RefCountedObject(NULL, 0), m_object_num(object_num),
21     m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
22     m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
23     m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
24     m_watch_interval(0), m_watch_task(NULL),
25     m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
26     m_fetch_in_progress(false) {
27   m_ioctx.dup(ioctx);
28   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
29 }
30
31 ObjectPlayer::~ObjectPlayer() {
32   {
33     Mutex::Locker timer_locker(m_timer_lock);
34     Mutex::Locker locker(m_lock);
35     assert(!m_fetch_in_progress);
36     assert(m_watch_ctx == nullptr);
37   }
38 }
39
40 void ObjectPlayer::fetch(Context *on_finish) {
41   ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
42
43   Mutex::Locker locker(m_lock);
44   assert(!m_fetch_in_progress);
45   m_fetch_in_progress = true;
46
47   C_Fetch *context = new C_Fetch(this, on_finish);
48   librados::ObjectReadOperation op;
49   op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
50   op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
51
52   librados::AioCompletion *rados_completion =
53     librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
54                                            NULL);
55   int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
56   assert(r == 0);
57   rados_completion->release();
58 }
59
60 void ObjectPlayer::watch(Context *on_fetch, double interval) {
61   ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
62
63   Mutex::Locker timer_locker(m_timer_lock);
64   m_watch_interval = interval;
65
66   assert(m_watch_ctx == nullptr);
67   m_watch_ctx = on_fetch;
68
69   schedule_watch();
70 }
71
72 void ObjectPlayer::unwatch() {
73   ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
74   Context *watch_ctx = nullptr;
75   {
76     Mutex::Locker timer_locker(m_timer_lock);
77     assert(!m_unwatched);
78     m_unwatched = true;
79
80     if (!cancel_watch()) {
81       return;
82     }
83
84     std::swap(watch_ctx, m_watch_ctx);
85   }
86
87   if (watch_ctx != nullptr) {
88     watch_ctx->complete(-ECANCELED);
89   }
90 }
91
92 void ObjectPlayer::front(Entry *entry) const {
93   Mutex::Locker locker(m_lock);
94   assert(!m_entries.empty());
95   *entry = m_entries.front();
96 }
97
98 void ObjectPlayer::pop_front() {
99   Mutex::Locker locker(m_lock);
100   assert(!m_entries.empty());
101
102   auto &entry = m_entries.front();
103   m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
104   m_entries.pop_front();
105 }
106
107 int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
108                                         bool *refetch) {
109   ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
110                    << bl.length() << dendl;
111
112   *refetch = false;
113   if (r == -ENOENT) {
114     return 0;
115   } else if (r < 0) {
116     return r;
117   } else if (bl.length() == 0) {
118     return 0;
119   }
120
121   Mutex::Locker locker(m_lock);
122   assert(m_fetch_in_progress);
123   m_read_off += bl.length();
124   m_read_bl.append(bl);
125   m_refetch_state = REFETCH_STATE_REQUIRED;
126
127   bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
128   bool partial_entry = false;
129   bool invalid = false;
130   uint32_t invalid_start_off = 0;
131
132   clear_invalid_range(m_read_bl_off, m_read_bl.length());
133   bufferlist::iterator iter(&m_read_bl, 0);
134   while (!iter.end()) {
135     uint32_t bytes_needed;
136     uint32_t bl_off = iter.get_off();
137     if (!Entry::is_readable(iter, &bytes_needed)) {
138       if (bytes_needed != 0) {
139         invalid_start_off = m_read_bl_off + bl_off;
140         invalid = true;
141         partial_entry = true;
142         if (full_fetch) {
143           lderr(m_cct) << ": partial record at offset " << invalid_start_off
144                        << dendl;
145         } else {
146           ldout(m_cct, 20) << ": partial record detected, will re-fetch"
147                            << dendl;
148         }
149         break;
150       }
151
152       if (!invalid) {
153         invalid_start_off = m_read_bl_off + bl_off;
154         invalid = true;
155         lderr(m_cct) << ": detected corrupt journal entry at offset "
156                      << invalid_start_off << dendl;
157       }
158       ++iter;
159       continue;
160     }
161
162     Entry entry;
163     ::decode(entry, iter);
164     ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
165
166     uint32_t entry_len = iter.get_off() - bl_off;
167     if (invalid) {
168       // new corrupt region detected
169       uint32_t invalid_end_off = m_read_bl_off + bl_off;
170       lderr(m_cct) << ": corruption range [" << invalid_start_off
171                    << ", " << invalid_end_off << ")" << dendl;
172       m_invalid_ranges.insert(invalid_start_off,
173                               invalid_end_off - invalid_start_off);
174       invalid = false;
175     }
176
177     EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
178                                       entry.get_entry_tid()));
179     if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
180       m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
181     } else {
182       ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
183       *m_entry_keys[entry_key] = entry;
184     }
185
186     // prune decoded / corrupted journal entries from front of bl
187     bufferlist sub_bl;
188     sub_bl.substr_of(m_read_bl, iter.get_off(),
189                      m_read_bl.length() - iter.get_off());
190     sub_bl.swap(m_read_bl);
191     iter = bufferlist::iterator(&m_read_bl, 0);
192
193     // advance the decoded entry offset
194     m_read_bl_off += entry_len;
195   }
196
197   if (invalid) {
198     uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
199     if (!partial_entry) {
200       lderr(m_cct) << ": corruption range [" << invalid_start_off
201                    << ", " << invalid_end_off << ")" << dendl;
202     }
203     m_invalid_ranges.insert(invalid_start_off,
204                             invalid_end_off - invalid_start_off);
205   }
206
207   if (!m_invalid_ranges.empty() && !partial_entry) {
208     return -EBADMSG;
209   } else if (partial_entry && (full_fetch || m_entries.empty())) {
210     *refetch = true;
211     return -EAGAIN;
212   }
213
214   return 0;
215 }
216
217 void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
218   // possibly remove previously partial record region
219   InvalidRanges decode_range;
220   decode_range.insert(off, len);
221   InvalidRanges intersect_range;
222   intersect_range.intersection_of(m_invalid_ranges, decode_range);
223   if (!intersect_range.empty()) {
224     ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
225                      << dendl;
226     m_invalid_ranges.subtract(intersect_range);
227   }
228 }
229
230 void ObjectPlayer::schedule_watch() {
231   assert(m_timer_lock.is_locked());
232   if (m_watch_ctx == NULL) {
233     return;
234   }
235
236   ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
237   assert(m_watch_task == nullptr);
238   m_watch_task = m_timer.add_event_after(
239     m_watch_interval,
240     new FunctionContext([this](int) {
241         handle_watch_task();
242       }));
243 }
244
245 bool ObjectPlayer::cancel_watch() {
246   assert(m_timer_lock.is_locked());
247   ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
248   if (m_watch_task != nullptr) {
249     bool canceled = m_timer.cancel_event(m_watch_task);
250     assert(canceled);
251
252     m_watch_task = nullptr;
253     return true;
254   }
255   return false;
256 }
257
258 void ObjectPlayer::handle_watch_task() {
259   assert(m_timer_lock.is_locked());
260
261   ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
262   assert(m_watch_ctx != nullptr);
263   assert(m_watch_task != nullptr);
264
265   m_watch_task = nullptr;
266   fetch(new C_WatchFetch(this));
267 }
268
269 void ObjectPlayer::handle_watch_fetched(int r) {
270   ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
271                    << dendl;
272
273   Context *watch_ctx = nullptr;
274   {
275     Mutex::Locker timer_locker(m_timer_lock);
276     std::swap(watch_ctx, m_watch_ctx);
277
278     if (m_unwatched) {
279       m_unwatched = false;
280       r = -ECANCELED;
281     }
282   }
283
284   if (watch_ctx != nullptr) {
285     watch_ctx->complete(r);
286   }
287 }
288
289 void ObjectPlayer::C_Fetch::finish(int r) {
290   bool refetch = false;
291   r = object_player->handle_fetch_complete(r, read_bl, &refetch);
292
293   {
294     Mutex::Locker locker(object_player->m_lock);
295     object_player->m_fetch_in_progress = false;
296   }
297
298   if (refetch) {
299     object_player->fetch(on_finish);
300     return;
301   }
302
303   object_player.reset();
304   on_finish->complete(r);
305 }
306
307 void ObjectPlayer::C_WatchFetch::finish(int r) {
308   object_player->handle_watch_fetched(r);
309 }
310
311 } // namespace journal