// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include #include "Filer.h" #include "osd/OSDMap.h" #include "Striper.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" #include "messages/MOSDMap.h" #include "msg/Messenger.h" #include "include/Context.h" #include "common/Finisher.h" #include "common/config.h" #define dout_subsys ceph_subsys_filer #undef dout_prefix #define dout_prefix *_dout << objecter->messenger->get_myname() << ".filer " class Filer::C_Probe : public Context { public: Filer *filer; Probe *probe; object_t oid; uint64_t size; ceph::real_time mtime; C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o), size(0) {} void finish(int r) override { if (r == -ENOENT) { r = 0; assert(size == 0); } bool probe_complete; { Probe::unique_lock pl(probe->lock); if (r != 0) { probe->err = r; } probe_complete = filer->_probed(probe, oid, size, mtime, pl); assert(!pl.owns_lock()); } if (probe_complete) { probe->onfinish->complete(probe->err); delete probe; } } }; int Filer::probe(inodeno_t ino, file_layout_t *layout, snapid_t snapid, uint64_t start_from, uint64_t *end, // LB, when !fwd ceph::real_time *pmtime, bool fwd, int flags, Context *onfinish) { ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ") << hex << ino << dec << " starting from " << start_from << dendl; assert(snapid); // (until there is a non-NOSNAP write) Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime, flags, fwd, onfinish); return probe_impl(probe, layout, start_from, end); } int Filer::probe(inodeno_t ino, file_layout_t *layout, snapid_t snapid, uint64_t start_from, uint64_t *end, // LB, when !fwd utime_t *pmtime, bool fwd, int flags, Context *onfinish) { ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ") << hex << ino << dec << " starting from " << start_from << dendl; assert(snapid); // (until there is a non-NOSNAP write) Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime, flags, fwd, onfinish); return probe_impl(probe, layout, start_from, end); } int Filer::probe_impl(Probe* probe, file_layout_t *layout, uint64_t start_from, uint64_t *end) // LB, when !fwd { // period (bytes before we jump unto a new set of object(s)) uint64_t period = layout->get_period(); // start with 1+ periods. probe->probing_len = period; if (probe->fwd) { if (start_from % period) probe->probing_len += period - (start_from % period); } else { assert(start_from > *end); if (start_from % period) probe->probing_len -= period - (start_from % period); probe->probing_off -= probe->probing_len; } Probe::unique_lock pl(probe->lock); _probe(probe, pl); assert(!pl.owns_lock()); return 0; } /** * probe->lock must be initially locked, this function will release it */ void Filer::_probe(Probe *probe, Probe::unique_lock& pl) { assert(pl.owns_lock() && pl.mutex() == &probe->lock); ldout(cct, 10) << "_probe " << hex << probe->ino << dec << " " << probe->probing_off << "~" << probe->probing_len << dendl; // map range onto objects probe->known_size.clear(); probe->probing.clear(); Striper::file_to_extents(cct, probe->ino, &probe->layout, probe->probing_off, probe->probing_len, 0, probe->probing); std::vector stat_extents; for (vector::iterator p = probe->probing.begin(); p != probe->probing.end(); ++p) { ldout(cct, 10) << "_probe probing " << p->oid << dendl; probe->ops.insert(p->oid); stat_extents.push_back(*p); } pl.unlock(); for (std::vector::iterator i = stat_extents.begin(); i != stat_extents.end(); ++i) { C_Probe *c = new C_Probe(this, probe, i->oid); objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime, probe->flags | CEPH_OSD_FLAG_RWORDERED, new C_OnFinisher(c, finisher)); } } /** * probe->lock must be initially held, and will be released by this function. * * @return true if probe is complete and Probe object may be freed. */ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, ceph::real_time mtime, Probe::unique_lock& pl) { assert(pl.owns_lock() && pl.mutex() == &probe->lock); ldout(cct, 10) << "_probed " << probe->ino << " object " << oid << " has size " << size << " mtime " << mtime << dendl; probe->known_size[oid] = size; if (mtime > probe->max_mtime) probe->max_mtime = mtime; assert(probe->ops.count(oid)); probe->ops.erase(oid); if (!probe->ops.empty()) { pl.unlock(); return false; // waiting for more! } if (probe->err) { // we hit an error, propagate back up pl.unlock(); return true; } // analyze! uint64_t end = 0; if (!probe->fwd) { std::reverse(probe->probing.begin(), probe->probing.end()); } for (vector::iterator p = probe->probing.begin(); p != probe->probing.end(); ++p) { uint64_t shouldbe = p->length + p->offset; ldout(cct, 10) << "_probed " << probe->ino << " object " << hex << p->oid << dec << " should be " << shouldbe << ", actual is " << probe->known_size[p->oid] << dendl; if (!probe->found_size) { assert(probe->known_size[p->oid] <= shouldbe); if ((probe->fwd && probe->known_size[p->oid] == shouldbe) || (!probe->fwd && probe->known_size[p->oid] == 0 && probe->probing_off > 0)) continue; // keep going // aha, we found the end! // calc offset into buffer_extent to get distance from probe->from. uint64_t oleft = probe->known_size[p->oid] - p->offset; for (vector >::iterator i = p->buffer_extents.begin(); i != p->buffer_extents.end(); ++i) { if (oleft <= (uint64_t)i->second) { end = probe->probing_off + i->first + oleft; ldout(cct, 10) << "_probed end is in buffer_extent " << i->first << "~" << i->second << " off " << oleft << ", from was " << probe->probing_off << ", end is " << end << dendl; probe->found_size = true; ldout(cct, 10) << "_probed found size at " << end << dendl; *probe->psize = end; if (!probe->pmtime && !probe->pumtime) // stop if we don't need mtime too break; } oleft -= i->second; } } break; } if (!probe->found_size || (probe->probing_off && (probe->pmtime || probe->pumtime))) { // keep probing! ldout(cct, 10) << "_probed probing further" << dendl; uint64_t period = probe->layout.get_period(); if (probe->fwd) { probe->probing_off += probe->probing_len; assert(probe->probing_off % period == 0); probe->probing_len = period; } else { // previous period. assert(probe->probing_off % period == 0); probe->probing_len = period; probe->probing_off -= period; } _probe(probe, pl); assert(!pl.owns_lock()); return false; } else if (probe->pmtime) { ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl; *probe->pmtime = probe->max_mtime; } else if (probe->pumtime) { ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl; *probe->pumtime = ceph::real_clock::to_ceph_timespec(probe->max_mtime); } // done! pl.unlock(); return true; } // ----------------------- struct PurgeRange { std::mutex lock; typedef std::lock_guard lock_guard; typedef std::unique_lock unique_lock; inodeno_t ino; file_layout_t layout; SnapContext snapc; uint64_t first, num; ceph::real_time mtime; int flags; Context *oncommit; int uncommitted; PurgeRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc, uint64_t fo, uint64_t no, ceph::real_time t, int fl, Context *fin) : ino(i), layout(l), snapc(sc), first(fo), num(no), mtime(t), flags(fl), oncommit(fin), uncommitted(0) {} }; int Filer::purge_range(inodeno_t ino, const file_layout_t *layout, const SnapContext& snapc, uint64_t first_obj, uint64_t num_obj, ceph::real_time mtime, int flags, Context *oncommit) { assert(num_obj > 0); // single object? easy! if (num_obj == 1) { object_t oid = file_object_t(ino, first_obj); object_locator_t oloc = OSDMap::file_to_object_locator(*layout); objecter->remove(oid, oloc, snapc, mtime, flags, oncommit); return 0; } PurgeRange *pr = new PurgeRange(ino, *layout, snapc, first_obj, num_obj, mtime, flags, oncommit); _do_purge_range(pr, 0); return 0; } struct C_PurgeRange : public Context { Filer *filer; PurgeRange *pr; C_PurgeRange(Filer *f, PurgeRange *p) : filer(f), pr(p) {} void finish(int r) override { filer->_do_purge_range(pr, 1); } }; void Filer::_do_purge_range(PurgeRange *pr, int fin) { PurgeRange::unique_lock prl(pr->lock); pr->uncommitted -= fin; ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first << "~" << pr->num << " uncommitted " << pr->uncommitted << dendl; if (pr->num == 0 && pr->uncommitted == 0) { pr->oncommit->complete(0); prl.unlock(); delete pr; return; } std::vector remove_oids; int max = cct->_conf->filer_max_purge_ops - pr->uncommitted; while (pr->num > 0 && max > 0) { remove_oids.push_back(file_object_t(pr->ino, pr->first)); pr->uncommitted++; pr->first++; pr->num--; max--; } prl.unlock(); // Issue objecter ops outside pr->lock to avoid lock dependency loop for (const auto& oid : remove_oids) { object_locator_t oloc = OSDMap::file_to_object_locator(pr->layout); objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags, new C_OnFinisher(new C_PurgeRange(this, pr), finisher)); } } // ----------------------- struct TruncRange { std::mutex lock; typedef std::lock_guard lock_guard; typedef std::unique_lock unique_lock; inodeno_t ino; file_layout_t layout; SnapContext snapc; ceph::real_time mtime; int flags; Context *oncommit; int uncommitted; uint64_t offset; uint64_t length; uint32_t truncate_seq; TruncRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc, ceph::real_time t, int fl, Context *fin, uint64_t off, uint64_t len, uint32_t ts) : ino(i), layout(l), snapc(sc), mtime(t), flags(fl), oncommit(fin), uncommitted(0), offset(off), length(len), truncate_seq(ts) {} }; void Filer::truncate(inodeno_t ino, file_layout_t *layout, const SnapContext& snapc, uint64_t offset, uint64_t len, __u32 truncate_seq, ceph::real_time mtime, int flags, Context *oncommit) { uint64_t period = layout->get_period(); uint64_t num_objs = Striper::get_num_objects(*layout, len + (offset % period)); if (num_objs == 1) { vector extents; Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents); vector ops(1); ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC; ops[0].op.extent.truncate_seq = truncate_seq; ops[0].op.extent.truncate_size = extents[0].offset; objecter->_modify(extents[0].oid, extents[0].oloc, ops, mtime, snapc, flags, oncommit); return; } if (len > 0 && (offset + len) % period) len += period - ((offset + len) % period); TruncRange *tr = new TruncRange(ino, *layout, snapc, mtime, flags, oncommit, offset, len, truncate_seq); _do_truncate_range(tr, 0); } struct C_TruncRange : public Context { Filer *filer; TruncRange *tr; C_TruncRange(Filer *f, TruncRange *t) : filer(f), tr(t) {} void finish(int r) override { filer->_do_truncate_range(tr, 1); } }; void Filer::_do_truncate_range(TruncRange *tr, int fin) { TruncRange::unique_lock trl(tr->lock); tr->uncommitted -= fin; ldout(cct, 10) << "_do_truncate_range " << tr->ino << " objects " << tr->offset << "~" << tr->length << " uncommitted " << tr->uncommitted << dendl; if (tr->length == 0 && tr->uncommitted == 0) { tr->oncommit->complete(0); trl.unlock(); delete tr; return; } vector extents; int max = cct->_conf->filer_max_truncate_ops - tr->uncommitted; if (max > 0 && tr->length > 0) { uint64_t len = tr->layout.get_period() * max; if (len > tr->length) len = tr->length; uint64_t offset = tr->offset + tr->length - len; Striper::file_to_extents(cct, tr->ino, &tr->layout, offset, len, 0, extents); tr->uncommitted += extents.size(); tr->length -= len; } trl.unlock(); // Issue objecter ops outside tr->lock to avoid lock dependency loop for (const auto& p : extents) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC; ops[0].op.extent.truncate_size = p.offset; ops[0].op.extent.truncate_seq = tr->truncate_seq; objecter->_modify(p.oid, p.oloc, ops, tr->mtime, tr->snapc, tr->flags, new C_OnFinisher(new C_TruncRange(this, tr), finisher)); } }