1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectPlayer.h"
5 #include "journal/Utils.h"
6 #include "common/Timer.h"
9 #define dout_subsys ceph_subsys_journaler
11 #define dout_prefix *_dout << "ObjectPlayer: " << this << " "
// Constructs an ObjectPlayer for the object numbered object_num.
//
// The object name (m_oid) is built from object_oid_prefix and the object
// number via utils::get_object_name(). A max_fetch_bytes of 0 selects a
// default fetch size of (2 << order); any positive value is used as given.
// The player starts with no fetch in progress, no watch task scheduled and
// a watch interval of 0.
//
// m_oid's initializer reads m_object_num, so it relies on m_object_num being
// declared (and therefore initialized) first -- confirm against the header.
//
// NOTE(review): `2 << order` is evaluated in int before being widened to
// uint64_t; with a 32-bit int the result no longer fits once order reaches
// 30. handle_fetch_complete() compares against `2U << m_order` (unsigned),
// so the two expressions can disagree for large orders -- confirm the valid
// range of `order`.
15 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
16 const std::string &object_oid_prefix,
17 uint64_t object_num, SafeTimer &timer,
18 Mutex &timer_lock, uint8_t order,
19 uint64_t max_fetch_bytes)
20 : RefCountedObject(NULL, 0), m_object_num(object_num),
21 m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
22 m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
23 m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
24 m_watch_interval(0), m_watch_task(NULL),
25 m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
26 m_fetch_in_progress(false) {
// m_cct is NULL in the initializer list and is filled in here from the
// context reported by m_ioctx.
28 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
// Destructor: checks that the player is idle when it is destroyed.
//
// m_timer_lock is acquired first, then m_lock; with both held it asserts
// that no fetch is in progress and that no watch callback is registered.
31 ObjectPlayer::~ObjectPlayer() {
33 Mutex::Locker timer_locker(m_timer_lock);
34 Mutex::Locker locker(m_lock);
35 assert(!m_fetch_in_progress);
36 assert(m_watch_ctx == nullptr);
// Submits an asynchronous read of the object m_oid.
//
// The read starts at m_read_off and asks for at most m_max_fetch_bytes; the
// data is delivered into the read_bl of a newly allocated C_Fetch context,
// which also carries on_finish.
//
// @param on_finish  callback stored in the C_Fetch context; C_Fetch::finish()
//                   is where it is completed or passed to a follow-up fetch.
//
// Only one fetch may be outstanding at a time: the function asserts that
// m_fetch_in_progress is clear and then sets it, under m_lock.
40 void ObjectPlayer::fetch(Context *on_finish) {
41 ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
43 Mutex::Locker locker(m_lock);
44 assert(!m_fetch_in_progress);
45 m_fetch_in_progress = true;
// build a single read op that fills the context's buffer
47 C_Fetch *context = new C_Fetch(this, on_finish);
48 librados::ObjectReadOperation op;
49 op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
50 op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
// the completion is created with utils::rados_ctx_callback and the C_Fetch
// context as its argument
52 librados::AioCompletion *rados_completion =
53 librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
55 int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
// the local completion handle is released once the operation is submitted
57 rados_completion->release();
// Registers a watch callback on this object.
//
// @param on_fetch  callback stored in m_watch_ctx.
// @param interval  stored in m_watch_interval (presumably the delay, in
//                  seconds, used when scheduling the watch timer -- confirm
//                  against schedule_watch()).
//
// The state is updated under m_timer_lock. Only one watch may be registered
// at a time: the function asserts that m_watch_ctx is currently null.
60 void ObjectPlayer::watch(Context *on_fetch, double interval) {
61 ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
63 Mutex::Locker timer_locker(m_timer_lock);
64 m_watch_interval = interval;
66 assert(m_watch_ctx == nullptr);
67 m_watch_ctx = on_fetch;
// Tears down a watch registered through watch().
//
// Under m_timer_lock it calls cancel_watch() and branches on a false result.
// On the path that reaches the swap, the registered callback is moved out of
// m_watch_ctx (leaving it null) and, if there was one, it is completed with
// -ECANCELED.
72 void ObjectPlayer::unwatch() {
73 ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
74 Context *watch_ctx = nullptr;
76 Mutex::Locker timer_locker(m_timer_lock);
80 if (!cancel_watch()) {
// take ownership of the callback so m_watch_ctx is left null
84 std::swap(watch_ctx, m_watch_ctx);
87 if (watch_ctx != nullptr) {
88 watch_ctx->complete(-ECANCELED);
// Copies the first decoded entry into *entry without removing it.
//
// @param entry  output; receives a copy of m_entries.front().
//
// Runs under m_lock. The entry list must not be empty (asserted).
92 void ObjectPlayer::front(Entry *entry) const {
93 Mutex::Locker locker(m_lock);
94 assert(!m_entries.empty());
95 *entry = m_entries.front();
// Removes the first decoded entry.
//
// Runs under m_lock. The entry list must not be empty (asserted).
98 void ObjectPlayer::pop_front() {
99 Mutex::Locker locker(m_lock);
100 assert(!m_entries.empty());
// drop the (tag tid, entry tid) index record before the entry itself, while
// the reference to the front element is still valid
102 auto &entry = m_entries.front();
103 m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
104 m_entries.pop_front();
// Processes the result of a read issued by fetch().
//
// The newly read bytes are appended to m_read_bl, and the buffer is then
// decoded entry by entry into m_entries. Byte ranges that cannot be decoded
// are recorded in m_invalid_ranges.
//
// @param r   result code of the read.
// @param bl  data returned by the read.
//
// Must be called while a fetch is marked in progress (asserted under m_lock).
107 int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
109 ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
110 << bl.length() << dendl;
// a zero-length read is handled separately from the decode path below
117 } else if (bl.length() == 0) {
121 Mutex::Locker locker(m_lock);
122 assert(m_fetch_in_progress);
// account for the bytes just received and queue them for decoding
123 m_read_off += bl.length();
124 m_read_bl.append(bl);
125 m_refetch_state = REFETCH_STATE_REQUIRED;
// full_fetch: the configured fetch size equals (2 << m_order), which is the
// default the constructor picks when no explicit max_fetch_bytes is given
127 bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
128 bool partial_entry = false;
129 bool invalid = false;
130 uint32_t invalid_start_off = 0;
// the buffered range is about to be decoded again, so any invalid range
// previously recorded inside it is forgotten first
132 clear_invalid_range(m_read_bl_off, m_read_bl.length());
133 bufferlist::iterator iter(&m_read_bl, 0);
134 while (!iter.end()) {
135 uint32_t bytes_needed;
136 uint32_t bl_off = iter.get_off();
137 if (!Entry::is_readable(iter, &bytes_needed)) {
// a non-zero bytes_needed is treated as a partial (truncated) record
138 if (bytes_needed != 0) {
// offsets are reported as m_read_bl_off plus the position inside m_read_bl
139 invalid_start_off = m_read_bl_off + bl_off;
141 partial_entry = true;
143 lderr(m_cct) << ": partial record at offset " << invalid_start_off
146 ldout(m_cct, 20) << ": partial record detected, will re-fetch"
// start of a region that could not be read as an entry
153 invalid_start_off = m_read_bl_off + bl_off;
155 lderr(m_cct) << ": detected corrupt journal entry at offset "
156 << invalid_start_off << dendl;
// readable entry: decode it and measure how many bytes it consumed
163 ::decode(entry, iter);
164 ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
166 uint32_t entry_len = iter.get_off() - bl_off;
168 // new corrupt region detected
// the corrupt region ends where this entry begins
169 uint32_t invalid_end_off = m_read_bl_off + bl_off;
170 lderr(m_cct) << ": corruption range [" << invalid_start_off
171 << ", " << invalid_end_off << ")" << dendl;
172 m_invalid_ranges.insert(invalid_start_off,
173 invalid_end_off - invalid_start_off);
// entries are indexed by (tag tid, entry tid): a new key appends the entry
// and remembers its list position, an existing key overwrites the stored
// entry in place
177 EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
178 entry.get_entry_tid()));
179 if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
180 m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
182 ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
183 *m_entry_keys[entry_key] = entry;
186 // prune decoded / corrupted journal entries from front of bl
// m_read_bl is replaced by its own tail, so the iterator must be rebuilt at
// offset 0 of the new buffer
188 sub_bl.substr_of(m_read_bl, iter.get_off(),
189 m_read_bl.length() - iter.get_off());
190 sub_bl.swap(m_read_bl);
191 iter = bufferlist::iterator(&m_read_bl, 0);
193 // advance the decoded entry offset
194 m_read_bl_off += entry_len;
// after the loop: the invalid range is extended to the end of the data that
// is still buffered; the error is only logged when it is not a partial entry
198 uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
199 if (!partial_entry) {
200 lderr(m_cct) << ": corruption range [" << invalid_start_off
201 << ", " << invalid_end_off << ")" << dendl;
203 m_invalid_ranges.insert(invalid_start_off,
204 invalid_end_off - invalid_start_off);
// final classification: recorded invalid ranges without a partial entry,
// versus a partial entry seen on a full-size fetch or with nothing decoded
207 if (!m_invalid_ranges.empty() && !partial_entry) {
209 } else if (partial_entry && (full_fetch || m_entries.empty())) {
// Removes from m_invalid_ranges whatever overlaps the range that starts at
// off and is len long.
//
// @param off  start offset of the range being cleared.
// @param len  length of the range being cleared.
//
// The overlap is computed as the intersection of m_invalid_ranges with a
// one-range set; m_invalid_ranges is only modified (and a message logged)
// when that intersection is non-empty.
217 void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
218 // possibly remove previously partial record region
219 InvalidRanges decode_range;
220 decode_range.insert(off, len);
221 InvalidRanges intersect_range;
222 intersect_range.intersection_of(m_invalid_ranges, decode_range);
223 if (!intersect_range.empty()) {
224 ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
226 m_invalid_ranges.subtract(intersect_range);
// Arms the watch timer.
//
// The caller must hold m_timer_lock (asserted). The case where no watch
// callback is registered (m_watch_ctx is NULL) is handled separately up
// front. Otherwise a new event is added to m_timer and the returned handle
// is kept in m_watch_task; no task may already be scheduled (asserted).
230 void ObjectPlayer::schedule_watch() {
231 assert(m_timer_lock.is_locked());
232 if (m_watch_ctx == NULL) {
236 ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
237 assert(m_watch_task == nullptr);
// the timer event is a FunctionContext wrapping a lambda that captures `this`
238 m_watch_task = m_timer.add_event_after(
240 new FunctionContext([this](int) {
// Cancels the scheduled watch timer event, if there is one.
//
// The caller must hold m_timer_lock (asserted). When m_watch_task is set,
// m_timer is asked to cancel it and m_watch_task is reset to nullptr.
245 bool ObjectPlayer::cancel_watch() {
246 assert(m_timer_lock.is_locked());
247 ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
248 if (m_watch_task != nullptr) {
249 bool canceled = m_timer.cancel_event(m_watch_task);
252 m_watch_task = nullptr;
// Performs one watch poll: clears the pending task handle and starts a fetch
// whose completion is delivered to handle_watch_fetched() via C_WatchFetch.
//
// The caller must hold m_timer_lock, and both a watch callback and a watch
// task must be set on entry (all asserted). Presumably invoked from the
// timer event created in schedule_watch() -- confirm.
258 void ObjectPlayer::handle_watch_task() {
259 assert(m_timer_lock.is_locked());
261 ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
262 assert(m_watch_ctx != nullptr);
263 assert(m_watch_task != nullptr);
// the task handle is forgotten before the fetch is issued
265 m_watch_task = nullptr;
266 fetch(new C_WatchFetch(this));
// Called when the fetch started by a watch poll has finished.
//
// @param r  result of the fetch.
//
// Under m_timer_lock the registered callback is moved out of m_watch_ctx
// (leaving it null); if a callback was present it is completed with r.
269 void ObjectPlayer::handle_watch_fetched(int r) {
270 ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
273 Context *watch_ctx = nullptr;
275 Mutex::Locker timer_locker(m_timer_lock);
276 std::swap(watch_ctx, m_watch_ctx);
284 if (watch_ctx != nullptr) {
285 watch_ctx->complete(r);
// Completion handler for the read issued by ObjectPlayer::fetch().
//
// @param r  result of the read; replaced by the value returned from
//           handle_fetch_complete() before it is passed on.
//
// The data in read_bl is handed to handle_fetch_complete(), which can also
// set the local `refetch` flag. The in-progress flag is then cleared under
// the player's m_lock.
// NOTE(review): two follow-up actions appear below, a re-issued
// fetch(on_finish) and completing on_finish with r; presumably the refetch
// flag selects between them -- confirm.
289 void ObjectPlayer::C_Fetch::finish(int r) {
290 bool refetch = false;
291 r = object_player->handle_fetch_complete(r, read_bl, &refetch);
294 Mutex::Locker locker(object_player->m_lock);
295 object_player->m_fetch_in_progress = false;
299 object_player->fetch(on_finish);
// the object_player handle is released before the callback is invoked
303 object_player.reset();
304 on_finish->complete(r);
// Completion handler for a watch-triggered fetch: forwards the result r to
// ObjectPlayer::handle_watch_fetched().
307 void ObjectPlayer::C_WatchFetch::finish(int r) {
308 object_player->handle_watch_fetched(r);
311 } // namespace journal