// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "include/int_types.h" #include "common/Mutex.h" #include "common/Cond.h" #include "include/rados/librados.hpp" #include #include #include #include #include #include #include #include #include #include #include "Object.h" #include "TestOpStat.h" #include "test/librados/test.h" #include "include/memory.h" #include "common/sharedptr_registry.hpp" #include "common/errno.h" #include "osd/HitSet.h" #ifndef RADOSMODEL_H #define RADOSMODEL_H using namespace std; class RadosTestContext; class TestOpStat; template typename T::iterator rand_choose(T &cont) { if (cont.size() == 0) { return cont.end(); } int index = rand() % cont.size(); typename T::iterator retval = cont.begin(); for (; index > 0; --index) ++retval; return retval; } enum TestOpType { TEST_OP_READ, TEST_OP_WRITE, TEST_OP_WRITE_EXCL, TEST_OP_WRITESAME, TEST_OP_DELETE, TEST_OP_SNAP_CREATE, TEST_OP_SNAP_REMOVE, TEST_OP_ROLLBACK, TEST_OP_SETATTR, TEST_OP_RMATTR, TEST_OP_WATCH, TEST_OP_COPY_FROM, TEST_OP_HIT_SET_LIST, TEST_OP_UNDIRTY, TEST_OP_IS_DIRTY, TEST_OP_CACHE_FLUSH, TEST_OP_CACHE_TRY_FLUSH, TEST_OP_CACHE_EVICT, TEST_OP_APPEND, TEST_OP_APPEND_EXCL, TEST_OP_SET_REDIRECT, TEST_OP_UNSET_REDIRECT }; class TestWatchContext : public librados::WatchCtx2 { TestWatchContext(const TestWatchContext&); public: Cond cond; uint64_t handle; bool waiting; Mutex lock; TestWatchContext() : handle(0), waiting(false), lock("watch lock") {} void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, bufferlist &bl) override { Mutex::Locker l(lock); waiting = false; cond.SignalAll(); } void handle_error(uint64_t cookie, int err) override { Mutex::Locker l(lock); cout << "watch handle_error " << err << std::endl; } void start() { Mutex::Locker l(lock); waiting = true; } void wait() { Mutex::Locker l(lock); while (waiting) cond.Wait(lock); } uint64_t &get_handle() { return handle; } }; class TestOp { public: int num; RadosTestContext *context; TestOpStat *stat; bool done; TestOp(int n, RadosTestContext *context, TestOpStat *stat = 0) : num(n), context(context), stat(stat), done(false) {} virtual ~TestOp() {}; /** * This struct holds data to be passed by a callback * to a TestOp::finish method. */ struct CallbackInfo { uint64_t id; explicit CallbackInfo(uint64_t id) : id(id) {} virtual ~CallbackInfo() {}; }; virtual void _begin() = 0; /** * Called when the operation completes. * This should be overridden by asynchronous operations. * * @param info information stored by a callback, or NULL - * useful for multi-operation TestOps */ virtual void _finish(CallbackInfo *info) { return; } virtual string getType() = 0; virtual bool finished() { return true; } void begin(); void finish(CallbackInfo *info); virtual bool must_quiesce_other_ops() { return false; } }; class TestOpGenerator { public: virtual ~TestOpGenerator() {}; virtual TestOp *next(RadosTestContext &context) = 0; }; class RadosTestContext { public: Mutex state_lock; Cond wait_cond; map > pool_obj_cont; set oid_in_use; set oid_not_in_use; set oid_flushing; set oid_not_flushing; set oid_redirect_not_in_use; set oid_redirect_in_use; SharedPtrRegistry snaps_in_use; int current_snap; string pool_name; librados::IoCtx io_ctx; librados::Rados rados; int next_oid; string prefix; int errors; int max_in_flight; int seq_num; map snaps; uint64_t seq; const char *rados_id; bool initialized; map watches; const uint64_t max_size; const uint64_t min_stride_size; const uint64_t max_stride_size; AttrGenerator attr_gen; const bool no_omap; const bool no_sparse; bool pool_snaps; bool write_fadvise_dontneed; int snapname_num; map redirect_objs; RadosTestContext(const string &pool_name, int max_in_flight, uint64_t max_size, uint64_t min_stride_size, uint64_t max_stride_size, bool no_omap, bool no_sparse, bool pool_snaps, bool write_fadvise_dontneed, const char *id = 0) : state_lock("Context Lock"), pool_obj_cont(), current_snap(0), pool_name(pool_name), next_oid(0), errors(0), max_in_flight(max_in_flight), seq_num(0), seq(0), rados_id(id), initialized(false), max_size(max_size), min_stride_size(min_stride_size), max_stride_size(max_stride_size), attr_gen(2000, 20000), no_omap(no_omap), no_sparse(no_sparse), pool_snaps(pool_snaps), write_fadvise_dontneed(write_fadvise_dontneed), snapname_num(0) { } int init() { int r = rados.init(rados_id); if (r < 0) return r; r = rados.conf_read_file(NULL); if (r < 0) return r; r = rados.conf_parse_env(NULL); if (r < 0) return r; r = rados.connect(); if (r < 0) return r; r = rados.ioctx_create(pool_name.c_str(), io_ctx); if (r < 0) { rados.shutdown(); return r; } bufferlist inbl; r = rados.mon_command( "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool_name + "\", \"var\": \"write_fadvise_dontneed\", \"val\": \"" + (write_fadvise_dontneed ? "true" : "false") + "\"}", inbl, NULL, NULL); if (r < 0) { rados.shutdown(); return r; } char hostname_cstr[100]; gethostname(hostname_cstr, 100); stringstream hostpid; hostpid << hostname_cstr << getpid() << "-"; prefix = hostpid.str(); assert(!initialized); initialized = true; return 0; } void shutdown() { if (initialized) { rados.shutdown(); } } void loop(TestOpGenerator *gen) { assert(initialized); list inflight; state_lock.Lock(); TestOp *next = gen->next(*this); TestOp *waiting = NULL; while (next || !inflight.empty()) { if (next && next->must_quiesce_other_ops() && !inflight.empty()) { waiting = next; next = NULL; // Force to wait for inflight to drain } if (next) { inflight.push_back(next); } state_lock.Unlock(); if (next) { (*inflight.rbegin())->begin(); } state_lock.Lock(); while (1) { for (list::iterator i = inflight.begin(); i != inflight.end();) { if ((*i)->finished()) { cout << (*i)->num << ": done (" << (inflight.size()-1) << " left)" << std::endl; delete *i; inflight.erase(i++); } else { ++i; } } if (inflight.size() >= (unsigned) max_in_flight || (!next && !inflight.empty())) { cout << " waiting on " << inflight.size() << std::endl; wait(); } else { break; } } if (waiting) { next = waiting; waiting = NULL; } else { next = gen->next(*this); } } state_lock.Unlock(); } void wait() { wait_cond.Wait(state_lock); } void kick() { wait_cond.Signal(); } TestWatchContext *get_watch_context(const string &oid) { return watches.count(oid) ? watches[oid] : 0; } TestWatchContext *watch(const string &oid) { assert(!watches.count(oid)); return (watches[oid] = new TestWatchContext); } void unwatch(const string &oid) { assert(watches.count(oid)); delete watches[oid]; watches.erase(oid); } ObjectDesc get_most_recent(const string &oid) { ObjectDesc new_obj; for (map >::reverse_iterator i = pool_obj_cont.rbegin(); i != pool_obj_cont.rend(); ++i) { map::iterator j = i->second.find(oid); if (j != i->second.end()) { new_obj = j->second; break; } } return new_obj; } void rm_object_attrs(const string &oid, const set &attrs) { ObjectDesc new_obj = get_most_recent(oid); for (set::const_iterator i = attrs.begin(); i != attrs.end(); ++i) { new_obj.attrs.erase(*i); } new_obj.dirty = true; pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } void remove_object_header(const string &oid) { ObjectDesc new_obj = get_most_recent(oid); new_obj.header = bufferlist(); new_obj.dirty = true; pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } void update_object_header(const string &oid, const bufferlist &bl) { ObjectDesc new_obj = get_most_recent(oid); new_obj.header = bl; new_obj.exists = true; new_obj.dirty = true; pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } void update_object_attrs(const string &oid, const map &attrs) { ObjectDesc new_obj = get_most_recent(oid); for (map::const_iterator i = attrs.begin(); i != attrs.end(); ++i) { new_obj.attrs[i->first] = i->second; } new_obj.exists = true; new_obj.dirty = true; pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } void update_object(ContentsGenerator *cont_gen, const string &oid, const ContDesc &contents) { ObjectDesc new_obj = get_most_recent(oid); new_obj.exists = true; new_obj.dirty = true; new_obj.update(cont_gen, contents); pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } void update_object_full(const string &oid, const ObjectDesc &contents) { pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, contents)); pool_obj_cont[current_snap][oid].dirty = true; } void update_object_undirty(const string &oid) { ObjectDesc new_obj = get_most_recent(oid); new_obj.dirty = false; pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } void update_object_version(const string &oid, uint64_t version, int snap = -1) { for (map >::reverse_iterator i = pool_obj_cont.rbegin(); i != pool_obj_cont.rend(); ++i) { if (snap != -1 && snap < i->first) continue; map::iterator j = i->second.find(oid); if (j != i->second.end()) { if (version) j->second.version = version; cout << __func__ << " oid " << oid << " v " << version << " " << j->second.most_recent() << " " << (j->second.dirty ? "dirty" : "clean") << " " << (j->second.exists ? "exists" : "dne") << std::endl; break; } } } void remove_object(const string &oid) { assert(!get_watch_context(oid)); ObjectDesc new_obj; pool_obj_cont[current_snap].erase(oid); pool_obj_cont[current_snap].insert(pair(oid, new_obj)); } bool find_object(const string &oid, ObjectDesc *contents, int snap = -1) const { for (map >::const_reverse_iterator i = pool_obj_cont.rbegin(); i != pool_obj_cont.rend(); ++i) { if (snap != -1 && snap < i->first) continue; if (i->second.count(oid) != 0) { *contents = i->second.find(oid)->second; return true; } } return false; } void update_object_redirect_target(const string &oid, const string &target) { redirect_objs[oid] = target; } bool object_existed_at(const string &oid, int snap = -1) const { ObjectDesc contents; bool found = find_object(oid, &contents, snap); return found && contents.exists; } void remove_snap(int snap) { map >::iterator next_iter = pool_obj_cont.find(snap); assert(next_iter != pool_obj_cont.end()); map >::iterator current_iter = next_iter++; assert(current_iter != pool_obj_cont.end()); map ¤t = current_iter->second; map &next = next_iter->second; for (map::iterator i = current.begin(); i != current.end(); ++i) { if (next.count(i->first) == 0) { next.insert(pair(i->first, i->second)); } } pool_obj_cont.erase(current_iter); snaps.erase(snap); } void add_snap(uint64_t snap) { snaps[current_snap] = snap; current_snap++; pool_obj_cont[current_snap]; seq = snap; } void roll_back(const string &oid, int snap) { assert(!get_watch_context(oid)); ObjectDesc contents; find_object(oid, &contents, snap); contents.dirty = true; pool_obj_cont.rbegin()->second.erase(oid); pool_obj_cont.rbegin()->second.insert(pair(oid, contents)); } }; void read_callback(librados::completion_t comp, void *arg); void write_callback(librados::completion_t comp, void *arg); class RemoveAttrsOp : public TestOp { public: string oid; librados::ObjectWriteOperation op; librados::AioCompletion *comp; RemoveAttrsOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat) : TestOp(n, context, stat), oid(oid), comp(NULL) {} void _begin() override { ContDesc cont; set to_remove; { Mutex::Locker l(context->state_lock); ObjectDesc obj; if (!context->find_object(oid, &obj)) { context->kick(); done = true; return; } cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, ""); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); if (rand() % 30) { ContentsGenerator::iterator iter = context->attr_gen.get_iterator(cont); for (map::iterator i = obj.attrs.begin(); i != obj.attrs.end(); ++i, ++iter) { if (!(*iter % 3)) { to_remove.insert(i->first); op.rmxattr(i->first.c_str()); } } if (to_remove.empty()) { context->kick(); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); done = true; return; } if (!context->no_omap) { op.omap_rm_keys(to_remove); } } else { if (!context->no_omap) { op.omap_clear(); } for (map::iterator i = obj.attrs.begin(); i != obj.attrs.end(); ++i) { op.rmxattr(i->first.c_str()); to_remove.insert(i->first); } context->remove_object_header(oid); } context->rm_object_attrs(oid, to_remove); } pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comp = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate(context->prefix+oid, comp, &op); } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); done = true; context->update_object_version(oid, comp->get_version64()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); } bool finished() override { return done; } string getType() override { return "RemoveAttrsOp"; } }; class SetAttrsOp : public TestOp { public: string oid; librados::ObjectWriteOperation op; librados::AioCompletion *comp; SetAttrsOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat) : TestOp(n, context, stat), oid(oid), comp(NULL) {} void _begin() override { ContDesc cont; { Mutex::Locker l(context->state_lock); cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, ""); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); } map omap_contents; map omap; bufferlist header; ContentsGenerator::iterator keygen = context->attr_gen.get_iterator(cont); op.create(false); while (!*keygen) ++keygen; while (*keygen) { if (*keygen != '_') header.append(*keygen); ++keygen; } for (int i = 0; i < 20; ++i) { string key; while (!*keygen) ++keygen; while (*keygen && key.size() < 40) { key.push_back((*keygen % 20) + 'a'); ++keygen; } ContDesc val(cont); val.seqnum += (unsigned)(*keygen); val.prefix = ("oid: " + oid); omap[key] = val; bufferlist val_buffer = context->attr_gen.gen_bl(val); omap_contents[key] = val_buffer; op.setxattr(key.c_str(), val_buffer); } if (!context->no_omap) { op.omap_set_header(header); op.omap_set(omap_contents); } { Mutex::Locker l(context->state_lock); context->update_object_header(oid, header); context->update_object_attrs(oid, omap); } pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comp = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate(context->prefix+oid, comp, &op); } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); int r; if ((r = comp->get_return_value())) { cerr << "err " << r << std::endl; ceph_abort(); } done = true; context->update_object_version(oid, comp->get_version64()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); } bool finished() override { return done; } string getType() override { return "SetAttrsOp"; } }; class WriteOp : public TestOp { public: string oid; ContDesc cont; set waiting; librados::AioCompletion *rcompletion; uint64_t waiting_on; uint64_t last_acked_tid; librados::ObjectReadOperation read_op; librados::ObjectWriteOperation write_op; bufferlist rbuffer; bool do_append; bool do_excl; WriteOp(int n, RadosTestContext *context, const string &oid, bool do_append, bool do_excl, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(oid), rcompletion(NULL), waiting_on(0), last_acked_tid(0), do_append(do_append), do_excl(do_excl) {} void _begin() override { context->state_lock.Lock(); done = 0; stringstream acc; acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl; string prefix = acc.str(); cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix); ContentsGenerator *cont_gen; if (do_append) { ObjectDesc old_value; bool found = context->find_object(oid, &old_value); uint64_t prev_length = found && old_value.has_contents() ? old_value.most_recent_gen()->get_length(old_value.most_recent()) : 0; bool requires; int r = context->io_ctx.pool_requires_alignment2(&requires); assert(r == 0); uint64_t alignment = 0; if (requires) { r = context->io_ctx.pool_required_alignment2(&alignment); assert(r == 0); assert(alignment != 0); } cont_gen = new AppendGenerator( prev_length, alignment, context->min_stride_size, context->max_stride_size, 3); } else { cont_gen = new VarLenGenerator( context->max_size, context->min_stride_size, context->max_stride_size); } context->update_object(cont_gen, oid, cont); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); map ranges; cont_gen->get_ranges_map(cont, ranges); std::cout << num << ": seq_num " << context->seq_num << " ranges " << ranges << std::endl; context->seq_num++; waiting_on = ranges.size(); ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont); uint64_t tid = 1; for (map::iterator i = ranges.begin(); i != ranges.end(); ++i, ++tid) { gen_pos.seek(i->first); bufferlist to_write = gen_pos.gen_bl_advance(i->second); assert(to_write.length() == i->second); assert(to_write.length() > 0); std::cout << num << ": writing " << context->prefix+oid << " from " << i->first << " to " << i->first + i->second << " tid " << tid << std::endl; pair *cb_arg = new pair(this, new TestOp::CallbackInfo(tid)); librados::AioCompletion *completion = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); waiting.insert(completion); librados::ObjectWriteOperation op; if (do_append) { op.append(to_write); } else { op.write(i->first, to_write); } if (do_excl && tid == 1) op.assert_exists(); context->io_ctx.aio_operate( context->prefix+oid, completion, &op); } bufferlist contbl; ::encode(cont, contbl); pair *cb_arg = new pair( this, new TestOp::CallbackInfo(++tid)); librados::AioCompletion *completion = context->rados.aio_create_completion( (void*) cb_arg, NULL, &write_callback); waiting.insert(completion); waiting_on++; write_op.setxattr("_header", contbl); if (!do_append) { write_op.truncate(cont_gen->get_length(cont)); } context->io_ctx.aio_operate( context->prefix+oid, completion, &write_op); cb_arg = new pair( this, new TestOp::CallbackInfo(++tid)); rcompletion = context->rados.aio_create_completion( (void*) cb_arg, NULL, &write_callback); waiting_on++; read_op.read(0, 1, &rbuffer, 0); context->io_ctx.aio_operate( context->prefix+oid, rcompletion, &read_op, librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update 0); context->state_lock.Unlock(); } void _finish(CallbackInfo *info) override { assert(info); context->state_lock.Lock(); uint64_t tid = info->id; cout << num << ": finishing write tid " << tid << " to " << context->prefix + oid << std::endl; if (tid <= last_acked_tid) { cerr << "Error: finished tid " << tid << " when last_acked_tid was " << last_acked_tid << std::endl; ceph_abort(); } last_acked_tid = tid; assert(!done); waiting_on--; if (waiting_on == 0) { uint64_t version = 0; for (set::iterator i = waiting.begin(); i != waiting.end(); ) { assert((*i)->is_complete()); if (int err = (*i)->get_return_value()) { cerr << "Error: oid " << oid << " write returned error code " << err << std::endl; } if ((*i)->get_version64() > version) version = (*i)->get_version64(); (*i)->release(); waiting.erase(i++); } context->update_object_version(oid, version); if (rcompletion->get_version64() != version) { cerr << "Error: racing read on " << oid << " returned version " << rcompletion->get_version64() << " rather than version " << version << std::endl; assert(0 == "racing read got wrong version"); } { ObjectDesc old_value; assert(context->find_object(oid, &old_value, -1)); if (old_value.deleted()) std::cout << num << ": left oid " << oid << " deleted" << std::endl; else std::cout << num << ": left oid " << oid << " " << old_value.most_recent() << std::endl; } rcompletion->release(); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); done = true; } context->state_lock.Unlock(); } bool finished() override { return done; } string getType() override { return "WriteOp"; } }; class WriteSameOp : public TestOp { public: string oid; ContDesc cont; set waiting; librados::AioCompletion *rcompletion; uint64_t waiting_on; uint64_t last_acked_tid; librados::ObjectReadOperation read_op; librados::ObjectWriteOperation write_op; bufferlist rbuffer; WriteSameOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(oid), rcompletion(NULL), waiting_on(0), last_acked_tid(0) {} void _begin() override { context->state_lock.Lock(); done = 0; stringstream acc; acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl; string prefix = acc.str(); cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix); ContentsGenerator *cont_gen; cont_gen = new VarLenGenerator( context->max_size, context->min_stride_size, context->max_stride_size); context->update_object(cont_gen, oid, cont); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); map ranges; cont_gen->get_ranges_map(cont, ranges); std::cout << num << ": seq_num " << context->seq_num << " ranges " << ranges << std::endl; context->seq_num++; waiting_on = ranges.size(); ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont); uint64_t tid = 1; for (map::iterator i = ranges.begin(); i != ranges.end(); ++i, ++tid) { gen_pos.seek(i->first); bufferlist to_write = gen_pos.gen_bl_advance(i->second); assert(to_write.length() == i->second); assert(to_write.length() > 0); std::cout << num << ": writing " << context->prefix+oid << " from " << i->first << " to " << i->first + i->second << " tid " << tid << std::endl; pair *cb_arg = new pair(this, new TestOp::CallbackInfo(tid)); librados::AioCompletion *completion = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); waiting.insert(completion); librados::ObjectWriteOperation op; /* no writesame multiplication factor for now */ op.writesame(i->first, to_write.length(), to_write); context->io_ctx.aio_operate( context->prefix+oid, completion, &op); } bufferlist contbl; ::encode(cont, contbl); pair *cb_arg = new pair( this, new TestOp::CallbackInfo(++tid)); librados::AioCompletion *completion = context->rados.aio_create_completion( (void*) cb_arg, NULL, &write_callback); waiting.insert(completion); waiting_on++; write_op.setxattr("_header", contbl); write_op.truncate(cont_gen->get_length(cont)); context->io_ctx.aio_operate( context->prefix+oid, completion, &write_op); cb_arg = new pair( this, new TestOp::CallbackInfo(++tid)); rcompletion = context->rados.aio_create_completion( (void*) cb_arg, NULL, &write_callback); waiting_on++; read_op.read(0, 1, &rbuffer, 0); context->io_ctx.aio_operate( context->prefix+oid, rcompletion, &read_op, librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update 0); context->state_lock.Unlock(); } void _finish(CallbackInfo *info) override { assert(info); context->state_lock.Lock(); uint64_t tid = info->id; cout << num << ": finishing writesame tid " << tid << " to " << context->prefix + oid << std::endl; if (tid <= last_acked_tid) { cerr << "Error: finished tid " << tid << " when last_acked_tid was " << last_acked_tid << std::endl; ceph_abort(); } last_acked_tid = tid; assert(!done); waiting_on--; if (waiting_on == 0) { uint64_t version = 0; for (set::iterator i = waiting.begin(); i != waiting.end(); ) { assert((*i)->is_complete()); if (int err = (*i)->get_return_value()) { cerr << "Error: oid " << oid << " writesame returned error code " << err << std::endl; } if ((*i)->get_version64() > version) version = (*i)->get_version64(); (*i)->release(); waiting.erase(i++); } context->update_object_version(oid, version); if (rcompletion->get_version64() != version) { cerr << "Error: racing read on " << oid << " returned version " << rcompletion->get_version64() << " rather than version " << version << std::endl; assert(0 == "racing read got wrong version"); } { ObjectDesc old_value; assert(context->find_object(oid, &old_value, -1)); if (old_value.deleted()) std::cout << num << ": left oid " << oid << " deleted" << std::endl; else std::cout << num << ": left oid " << oid << " " << old_value.most_recent() << std::endl; } rcompletion->release(); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); done = true; } context->state_lock.Unlock(); } bool finished() override { return done; } string getType() override { return "WriteSameOp"; } }; class DeleteOp : public TestOp { public: string oid; DeleteOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(oid) {} void _begin() override { context->state_lock.Lock(); if (context->get_watch_context(oid)) { context->kick(); context->state_lock.Unlock(); return; } ObjectDesc contents; context->find_object(oid, &contents); bool present = !contents.deleted(); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); context->seq_num++; context->remove_object(oid); interval_set ranges; context->state_lock.Unlock(); int r = 0; if (rand() % 2) { librados::ObjectWriteOperation op; op.assert_exists(); op.remove(); r = context->io_ctx.operate(context->prefix+oid, &op); } else { r = context->io_ctx.remove(context->prefix+oid); } if (r && !(r == -ENOENT && !present)) { cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl; ceph_abort(); } context->state_lock.Lock(); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); context->state_lock.Unlock(); } string getType() override { return "DeleteOp"; } }; class ReadOp : public TestOp { public: vector completions; librados::ObjectReadOperation op; string oid; ObjectDesc old_value; int snap; bool balance_reads; ceph::shared_ptr in_use; vector results; vector retvals; vector> extent_results; vector is_sparse_read; uint64_t waiting_on; vector checksums; vector checksum_retvals; map attrs; int attrretval; set omap_requested_keys; map omap_returned_values; set omap_keys; map omap; bufferlist header; map xattrs; ReadOp(int n, RadosTestContext *context, const string &oid, bool balance_reads, TestOpStat *stat = 0) : TestOp(n, context, stat), completions(3), oid(oid), snap(0), balance_reads(balance_reads), results(3), retvals(3), extent_results(3), is_sparse_read(3, false), waiting_on(0), checksums(3), checksum_retvals(3), attrretval(0) {} void _do_read(librados::ObjectReadOperation& read_op, int index) { uint64_t len = 0; if (old_value.has_contents()) len = old_value.most_recent_gen()->get_length(old_value.most_recent()); if (context->no_sparse || rand() % 2) { is_sparse_read[index] = false; read_op.read(0, len, &results[index], &retvals[index]); bufferlist init_value_bl; ::encode(static_cast(-1), init_value_bl); read_op.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl, 0, len, 0, &checksums[index], &checksum_retvals[index]); } else { is_sparse_read[index] = true; read_op.sparse_read(0, len, &extent_results[index], &results[index], &retvals[index]); } } void _begin() override { context->state_lock.Lock(); if (!(rand() % 4) && !context->snaps.empty()) { snap = rand_choose(context->snaps)->first; in_use = context->snaps_in_use.lookup_or_create(snap, snap); } else { snap = -1; } std::cout << num << ": read oid " << oid << " snap " << snap << std::endl; done = 0; for (uint32_t i = 0; i < 3; i++) { completions[i] = context->rados.aio_create_completion((void *) this, &read_callback, 0); } context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); assert(context->find_object(oid, &old_value, snap)); if (old_value.deleted()) std::cout << num << ": expect deleted" << std::endl; else std::cout << num << ": expect " << old_value.most_recent() << std::endl; TestWatchContext *ctx = context->get_watch_context(oid); context->state_lock.Unlock(); if (ctx) { assert(old_value.exists); TestAlarm alarm; std::cerr << num << ": about to start" << std::endl; ctx->start(); std::cerr << num << ": started" << std::endl; bufferlist bl; context->io_ctx.set_notify_timeout(600); int r = context->io_ctx.notify2(context->prefix+oid, bl, 0, NULL); if (r < 0) { std::cerr << "r is " << r << std::endl; ceph_abort(); } std::cerr << num << ": notified, waiting" << std::endl; ctx->wait(); } context->state_lock.Lock(); if (snap >= 0) { context->io_ctx.snap_set_read(context->snaps[snap]); } _do_read(op, 0); for (map::iterator i = old_value.attrs.begin(); i != old_value.attrs.end(); ++i) { if (rand() % 2) { string key = i->first; if (rand() % 2) key.push_back((rand() % 26) + 'a'); omap_requested_keys.insert(key); } } if (!context->no_omap) { op.omap_get_vals_by_keys(omap_requested_keys, &omap_returned_values, 0); // NOTE: we're ignore pmore here, which assumes the OSD limit is high // enough for us. op.omap_get_keys2("", -1, &omap_keys, nullptr, nullptr); op.omap_get_vals2("", -1, &omap, nullptr, nullptr); op.omap_get_header(&header, 0); } op.getxattrs(&xattrs, 0); unsigned flags = 0; if (balance_reads) flags |= librados::OPERATION_BALANCE_READS; assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op, flags, NULL)); waiting_on++; // send 2 pipelined reads on the same object/snap. This can help testing // OSD's read behavior in some scenarios for (uint32_t i = 1; i < 3; ++i) { librados::ObjectReadOperation pipeline_op; _do_read(pipeline_op, i); assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[i], &pipeline_op, 0)); waiting_on++; } if (snap >= 0) { context->io_ctx.snap_set_read(0); } context->state_lock.Unlock(); } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); assert(!done); assert(waiting_on > 0); if (--waiting_on) { return; } context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); int retval = completions[0]->get_return_value(); for (vector::iterator it = completions.begin(); it != completions.end(); ++it) { assert((*it)->is_complete()); uint64_t version = (*it)->get_version64(); int err = (*it)->get_return_value(); if (err != retval) { cerr << num << ": Error: oid " << oid << " read returned different error codes: " << retval << " and " << err << std::endl; ceph_abort(); } if (err) { if (!(err == -ENOENT && old_value.deleted())) { cerr << num << ": Error: oid " << oid << " read returned error code " << err << std::endl; ceph_abort(); } } else if (version != old_value.version) { cerr << num << ": oid " << oid << " version is " << version << " and expected " << old_value.version << std::endl; assert(version == old_value.version); } } if (!retval) { map::iterator iter = xattrs.find("_header"); bufferlist headerbl; if (iter == xattrs.end()) { if (old_value.has_contents()) { cerr << num << ": Error: did not find header attr, has_contents: " << old_value.has_contents() << std::endl; assert(!old_value.has_contents()); } } else { headerbl = iter->second; xattrs.erase(iter); } if (old_value.deleted()) { std::cout << num << ": expect deleted" << std::endl; assert(0 == "expected deleted"); } else { std::cout << num << ": expect " << old_value.most_recent() << std::endl; } if (old_value.has_contents()) { ContDesc to_check; bufferlist::iterator p = headerbl.begin(); ::decode(to_check, p); if (to_check != old_value.most_recent()) { cerr << num << ": oid " << oid << " found incorrect object contents " << to_check << ", expected " << old_value.most_recent() << std::endl; context->errors++; } for (unsigned i = 0; i < results.size(); i++) { if (is_sparse_read[i]) { if (!old_value.check_sparse(extent_results[i], results[i])) { cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl; context->errors++; } } else { if (!old_value.check(results[i])) { cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl; context->errors++; } uint32_t checksum = 0; if (checksum_retvals[i] == 0) { try { auto bl_it = checksums[i].begin(); uint32_t csum_count; ::decode(csum_count, bl_it); ::decode(checksum, bl_it); } catch (const buffer::error &err) { checksum_retvals[i] = -EBADMSG; } } if (checksum_retvals[i] != 0 || checksum != results[i].crc32c(-1)) { cerr << num << ": oid " << oid << " checksum " << checksums[i] << " incorrect, expecting " << results[i].crc32c(-1) << std::endl; context->errors++; } } } if (context->errors) ceph_abort(); } // Attributes if (!context->no_omap) { if (!(old_value.header == header)) { cerr << num << ": oid " << oid << " header does not match, old size: " << old_value.header.length() << " new size " << header.length() << std::endl; assert(old_value.header == header); } if (omap.size() != old_value.attrs.size()) { cerr << num << ": oid " << oid << " omap.size() is " << omap.size() << " and old is " << old_value.attrs.size() << std::endl; assert(omap.size() == old_value.attrs.size()); } if (omap_keys.size() != old_value.attrs.size()) { cerr << num << ": oid " << oid << " omap.size() is " << omap_keys.size() << " and old is " << old_value.attrs.size() << std::endl; assert(omap_keys.size() == old_value.attrs.size()); } } if (xattrs.size() != old_value.attrs.size()) { cerr << num << ": oid " << oid << " xattrs.size() is " << xattrs.size() << " and old is " << old_value.attrs.size() << std::endl; assert(xattrs.size() == old_value.attrs.size()); } for (map::iterator iter = old_value.attrs.begin(); iter != old_value.attrs.end(); ++iter) { bufferlist bl = context->attr_gen.gen_bl( iter->second); if (!context->no_omap) { map::iterator omap_iter = omap.find(iter->first); assert(omap_iter != omap.end()); assert(bl.length() == omap_iter->second.length()); bufferlist::iterator k = bl.begin(); for(bufferlist::iterator l = omap_iter->second.begin(); !k.end() && !l.end(); ++k, ++l) { assert(*l == *k); } } map::iterator xattr_iter = xattrs.find(iter->first); assert(xattr_iter != xattrs.end()); assert(bl.length() == xattr_iter->second.length()); bufferlist::iterator k = bl.begin(); for (bufferlist::iterator j = xattr_iter->second.begin(); !k.end() && !j.end(); ++j, ++k) { assert(*j == *k); } } if (!context->no_omap) { for (set::iterator i = omap_requested_keys.begin(); i != omap_requested_keys.end(); ++i) { if (!omap_returned_values.count(*i)) assert(!old_value.attrs.count(*i)); if (!old_value.attrs.count(*i)) assert(!omap_returned_values.count(*i)); } for (map::iterator i = omap_returned_values.begin(); i != omap_returned_values.end(); ++i) { assert(omap_requested_keys.count(i->first)); assert(omap.count(i->first)); assert(old_value.attrs.count(i->first)); assert(i->second == omap[i->first]); } } } for (vector::iterator it = completions.begin(); it != completions.end(); ++it) { (*it)->release(); } context->kick(); done = true; } bool finished() override { return done; } string getType() override { return "ReadOp"; } }; class SnapCreateOp : public TestOp { public: SnapCreateOp(int n, RadosTestContext *context, TestOpStat *stat = 0) : TestOp(n, context, stat) {} void _begin() override { uint64_t snap; string snapname; if (context->pool_snaps) { stringstream ss; ss << context->prefix << "snap" << ++context->snapname_num; snapname = ss.str(); int ret = context->io_ctx.snap_create(snapname.c_str()); if (ret) { cerr << "snap_create returned " << ret << std::endl; ceph_abort(); } assert(!context->io_ctx.snap_lookup(snapname.c_str(), &snap)); } else { assert(!context->io_ctx.selfmanaged_snap_create(&snap)); } context->state_lock.Lock(); context->add_snap(snap); if (context->pool_snaps) { context->state_lock.Unlock(); } else { vector snapset(context->snaps.size()); int j = 0; for (map::reverse_iterator i = context->snaps.rbegin(); i != context->snaps.rend(); ++i, ++j) { snapset[j] = i->second; } context->state_lock.Unlock(); int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset); if (r) { cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl; ceph_abort(); } } } string getType() override { return "SnapCreateOp"; } bool must_quiesce_other_ops() override { return context->pool_snaps; } }; class SnapRemoveOp : public TestOp { public: int to_remove; SnapRemoveOp(int n, RadosTestContext *context, int snap, TestOpStat *stat = 0) : TestOp(n, context, stat), to_remove(snap) {} void _begin() override { context->state_lock.Lock(); uint64_t snap = context->snaps[to_remove]; context->remove_snap(to_remove); if (context->pool_snaps) { string snapname; assert(!context->io_ctx.snap_get_name(snap, &snapname)); assert(!context->io_ctx.snap_remove(snapname.c_str())); } else { assert(!context->io_ctx.selfmanaged_snap_remove(snap)); vector snapset(context->snaps.size()); int j = 0; for (map::reverse_iterator i = context->snaps.rbegin(); i != context->snaps.rend(); ++i, ++j) { snapset[j] = i->second; } int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset); if (r) { cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl; ceph_abort(); } } context->state_lock.Unlock(); } string getType() override { return "SnapRemoveOp"; } }; class WatchOp : public TestOp { string oid; public: WatchOp(int n, RadosTestContext *context, const string &_oid, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(_oid) {} void _begin() override { context->state_lock.Lock(); ObjectDesc contents; context->find_object(oid, &contents); if (contents.deleted()) { context->kick(); context->state_lock.Unlock(); return; } context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); TestWatchContext *ctx = context->get_watch_context(oid); context->state_lock.Unlock(); int r; if (!ctx) { { Mutex::Locker l(context->state_lock); ctx = context->watch(oid); } r = context->io_ctx.watch2(context->prefix+oid, &ctx->get_handle(), ctx); } else { r = context->io_ctx.unwatch2(ctx->get_handle()); { Mutex::Locker l(context->state_lock); context->unwatch(oid); } } if (r) { cerr << "r is " << r << std::endl; ceph_abort(); } { Mutex::Locker l(context->state_lock); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); } } string getType() override { return "WatchOp"; } }; class RollbackOp : public TestOp { public: string oid; int roll_back_to; librados::ObjectWriteOperation zero_write_op1; librados::ObjectWriteOperation zero_write_op2; librados::ObjectWriteOperation op; vector comps; ceph::shared_ptr in_use; int last_finished; int outstanding; RollbackOp(int n, RadosTestContext *context, const string &_oid, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(_oid), roll_back_to(-1), comps(3, NULL), last_finished(-1), outstanding(3) {} void _begin() override { context->state_lock.Lock(); if (context->get_watch_context(oid)) { context->kick(); context->state_lock.Unlock(); return; } if (context->snaps.empty()) { context->kick(); context->state_lock.Unlock(); done = true; return; } context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); roll_back_to = rand_choose(context->snaps)->first; in_use = context->snaps_in_use.lookup_or_create( roll_back_to, roll_back_to); cout << "rollback oid " << oid << " to " << roll_back_to << std::endl; bool existed_before = context->object_existed_at(oid); bool existed_after = context->object_existed_at(oid, roll_back_to); context->roll_back(oid, roll_back_to); uint64_t snap = context->snaps[roll_back_to]; outstanding -= (!existed_before) + (!existed_after); context->state_lock.Unlock(); bufferlist bl, bl2; zero_write_op1.append(bl); zero_write_op2.append(bl2); if (context->pool_snaps) { op.snap_rollback(snap); } else { op.selfmanaged_snap_rollback(snap); } if (existed_before) { pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comps[0] = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate( context->prefix+oid, comps[0], &zero_write_op1); } { pair *cb_arg = new pair(this, new TestOp::CallbackInfo(1)); comps[1] = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate( context->prefix+oid, comps[1], &op); } if (existed_after) { pair *cb_arg = new pair(this, new TestOp::CallbackInfo(2)); comps[2] = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate( context->prefix+oid, comps[2], &zero_write_op2); } } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); uint64_t tid = info->id; cout << num << ": finishing rollback tid " << tid << " to " << context->prefix + oid << std::endl; assert((int)(info->id) > last_finished); last_finished = info->id; int r; if ((r = comps[last_finished]->get_return_value()) != 0) { cerr << "err " << r << std::endl; ceph_abort(); } if (--outstanding == 0) { done = true; context->update_object_version(oid, comps[tid]->get_version64()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); in_use = ceph::shared_ptr(); context->kick(); } } bool finished() override { return done; } string getType() override { return "RollBackOp"; } }; class CopyFromOp : public TestOp { public: string oid, oid_src; ObjectDesc src_value; librados::ObjectWriteOperation op; librados::ObjectReadOperation rd_op; librados::AioCompletion *comp; librados::AioCompletion *comp_racing_read; ceph::shared_ptr in_use; int snap; int done; uint64_t version; int r; CopyFromOp(int n, RadosTestContext *context, const string &oid, const string &oid_src, TestOpStat *stat) : TestOp(n, context, stat), oid(oid), oid_src(oid_src), comp(NULL), snap(-1), done(0), version(0), r(0) {} void _begin() override { ContDesc cont; { Mutex::Locker l(context->state_lock); cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, ""); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); context->oid_in_use.insert(oid_src); context->oid_not_in_use.erase(oid_src); // choose source snap if (0 && !(rand() % 4) && !context->snaps.empty()) { snap = rand_choose(context->snaps)->first; in_use = context->snaps_in_use.lookup_or_create(snap, snap); } else { snap = -1; } context->find_object(oid_src, &src_value, snap); if (!src_value.deleted()) context->update_object_full(oid, src_value); } string src = context->prefix+oid_src; op.copy_from(src.c_str(), context->io_ctx, src_value.version); pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comp = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate(context->prefix+oid, comp, &op); // queue up a racing read, too. pair *read_cb_arg = new pair(this, new TestOp::CallbackInfo(1)); comp_racing_read = context->rados.aio_create_completion((void*) read_cb_arg, NULL, &write_callback); rd_op.stat(NULL, NULL, NULL); context->io_ctx.aio_operate(context->prefix+oid, comp_racing_read, &rd_op, librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update NULL); } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); // note that the read can (and atm will) come back before the // write reply, but will reflect the update and the versions will // match. if (info->id == 0) { // copy_from assert(comp->is_complete()); cout << num << ": finishing copy_from to " << context->prefix + oid << std::endl; if ((r = comp->get_return_value())) { if (r == -ENOENT && src_value.deleted()) { cout << num << ": got expected ENOENT (src dne)" << std::endl; } else { cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code " << r << std::endl; ceph_abort(); } } else { assert(!version || comp->get_version64() == version); version = comp->get_version64(); context->update_object_version(oid, comp->get_version64()); } } else if (info->id == 1) { // racing read assert(comp_racing_read->is_complete()); cout << num << ": finishing copy_from racing read to " << context->prefix + oid << std::endl; if ((r = comp_racing_read->get_return_value())) { if (!(r == -ENOENT && src_value.deleted())) { cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code " << r << std::endl; } } else { assert(comp_racing_read->get_return_value() == 0); assert(!version || comp_racing_read->get_version64() == version); version = comp_racing_read->get_version64(); } } if (++done == 2) { context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->oid_in_use.erase(oid_src); context->oid_not_in_use.insert(oid_src); context->kick(); } } bool finished() override { return done == 2; } string getType() override { return "CopyFromOp"; } }; class SetRedirectOp : public TestOp { public: string oid, oid_tgt, tgt_pool_name; ObjectDesc src_value, tgt_value; librados::ObjectWriteOperation op; librados::ObjectReadOperation rd_op; librados::AioCompletion *comp; ceph::shared_ptr in_use; int done; int r; SetRedirectOp(int n, RadosTestContext *context, const string &oid, const string &oid_tgt, const string &tgt_pool_name, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(oid), oid_tgt(oid_tgt), tgt_pool_name(tgt_pool_name), comp(NULL), done(0), r(0) {} void _begin() override { Mutex::Locker l(context->state_lock); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); context->oid_redirect_in_use.insert(oid_tgt); context->oid_redirect_not_in_use.erase(oid_tgt); context->find_object(oid, &src_value); if(!context->redirect_objs[oid].empty()) { /* update target's user_version */ rd_op.stat(NULL, NULL, NULL); comp = context->rados.aio_create_completion(); context->io_ctx.aio_operate(context->prefix+oid_tgt, comp, &rd_op, librados::OPERATION_ORDER_READS_WRITES, NULL); comp->wait_for_safe(); context->update_object_version(oid_tgt, comp->get_version64()); comp->release(); /* unset redirect target */ comp = context->rados.aio_create_completion(); bool present = !src_value.deleted(); context->remove_object(oid); op.remove(); context->io_ctx.aio_operate(context->prefix+oid, comp, &op, librados::OPERATION_ORDER_READS_WRITES | librados::OPERATION_IGNORE_REDIRECT); comp->wait_for_safe(); if ((r = comp->get_return_value())) { if (!(r == -ENOENT && !present)) { cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl; ceph_abort(); } } comp->release(); context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]); context->oid_redirect_in_use.erase(context->redirect_objs[oid]); /* copy_from oid_tgt --> oid */ comp = context->rados.aio_create_completion(); context->find_object(oid_tgt, &tgt_value); string src = context->prefix+oid_tgt; op.copy_from(src.c_str(), context->io_ctx, tgt_value.version); context->io_ctx.aio_operate(context->prefix+oid, comp, &op, librados::OPERATION_ORDER_READS_WRITES); comp->wait_for_safe(); if ((r = comp->get_return_value())) { cerr << "Error: oid " << oid << " copy_from " << oid_tgt << " returned error code " << r << std::endl; ceph_abort(); } context->update_object_full(oid, tgt_value); context->update_object_version(oid, comp->get_version64()); comp->release(); } comp = context->rados.aio_create_completion(); rd_op.stat(NULL, NULL, NULL); context->io_ctx.aio_operate(context->prefix+oid, comp, &rd_op, librados::OPERATION_ORDER_READS_WRITES | librados::OPERATION_IGNORE_REDIRECT, NULL); comp->wait_for_safe(); if ((r = comp->get_return_value()) && !src_value.deleted()) { cerr << "Error: oid " << oid << " stat returned error code " << r << std::endl; ceph_abort(); } context->update_object_version(oid, comp->get_version64()); comp->release(); context->find_object(oid, &src_value); context->find_object(oid_tgt, &tgt_value); if (!src_value.deleted() && !tgt_value.deleted()) context->update_object_full(oid, tgt_value); if (src_value.version != 0 && !src_value.deleted()) op.assert_version(src_value.version); op.set_redirect(context->prefix+oid_tgt, context->io_ctx, tgt_value.version); pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comp = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); context->io_ctx.aio_operate(context->prefix+oid, comp, &op, librados::OPERATION_ORDER_READS_WRITES); } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); if (info->id == 0) { assert(comp->is_complete()); cout << num << ": finishing set_redirect to oid " << oid << std::endl; if ((r = comp->get_return_value())) { if (r == -ENOENT && src_value.deleted()) { cout << num << ": got expected ENOENT (src dne)" << std::endl; } else { cerr << "Error: oid " << oid << " set_redirect " << oid_tgt << " returned error code " << r << std::endl; ceph_abort(); } } else { context->update_object_redirect_target(oid, oid_tgt); context->update_object_version(oid, comp->get_version64()); } } if (++done == 1) { context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->kick(); } } bool finished() override { return done == 1; } string getType() override { return "SetRedirectOp"; } }; class UnsetRedirectOp : public TestOp { public: string oid; librados::ObjectWriteOperation op; librados::AioCompletion *completion; librados::AioCompletion *comp; UnsetRedirectOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat = 0) : TestOp(n, context, stat), oid(oid) {} void _begin() override { context->state_lock.Lock(); if (context->get_watch_context(oid)) { context->kick(); context->state_lock.Unlock(); return; } ObjectDesc contents; context->find_object(oid, &contents); bool present = !contents.deleted(); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); context->seq_num++; context->remove_object(oid); context->state_lock.Unlock(); comp = context->rados.aio_create_completion(); op.remove(); context->io_ctx.aio_operate(context->prefix+oid, comp, &op, librados::OPERATION_ORDER_READS_WRITES | librados::OPERATION_IGNORE_REDIRECT); comp->wait_for_safe(); int r = comp->get_return_value(); if (r && !(r == -ENOENT && !present)) { cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl; ceph_abort(); } context->state_lock.Lock(); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); if(!context->redirect_objs[oid].empty()) { context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]); context->oid_redirect_in_use.erase(context->redirect_objs[oid]); context->update_object_redirect_target(oid, string()); } context->kick(); context->state_lock.Unlock(); } string getType() override { return "UnsetRedirectOp"; } }; class HitSetListOp : public TestOp { librados::AioCompletion *comp1, *comp2; uint32_t hash; std::list< std::pair > ls; bufferlist bl; public: HitSetListOp(int n, RadosTestContext *context, uint32_t hash, TestOpStat *stat = 0) : TestOp(n, context, stat), comp1(NULL), comp2(NULL), hash(hash) {} void _begin() override { pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comp1 = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); int r = context->io_ctx.hit_set_list(hash, comp1, &ls); assert(r == 0); } void _finish(CallbackInfo *info) override { Mutex::Locker l(context->state_lock); if (!comp2) { if (ls.empty()) { cerr << num << ": no hitsets" << std::endl; done = true; } else { cerr << num << ": hitsets are " << ls << std::endl; int r = rand() % ls.size(); std::list >::iterator p = ls.begin(); while (r--) ++p; pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); comp2 = context->rados.aio_create_completion((void*) cb_arg, NULL, &write_callback); r = context->io_ctx.hit_set_get(hash, comp2, p->second, &bl); assert(r == 0); } } else { int r = comp2->get_return_value(); if (r == 0) { HitSet hitset; bufferlist::iterator p = bl.begin(); ::decode(hitset, p); cout << num << ": got hitset of type " << hitset.get_type_name() << " size " << bl.length() << std::endl; } else { // FIXME: we could verify that we did in fact race with a trim... assert(r == -ENOENT); } done = true; } context->kick(); } bool finished() override { return done; } string getType() override { return "HitSetListOp"; } }; class UndirtyOp : public TestOp { public: librados::AioCompletion *completion; librados::ObjectWriteOperation op; string oid; UndirtyOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat = 0) : TestOp(n, context, stat), completion(NULL), oid(oid) {} void _begin() override { context->state_lock.Lock(); pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); completion = context->rados.aio_create_completion((void *) cb_arg, NULL, &write_callback); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); context->update_object_undirty(oid); context->state_lock.Unlock(); op.undirty(); int r = context->io_ctx.aio_operate(context->prefix+oid, completion, &op, 0); assert(!r); } void _finish(CallbackInfo *info) override { context->state_lock.Lock(); assert(!done); assert(completion->is_complete()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); context->update_object_version(oid, completion->get_version64()); context->kick(); done = true; context->state_lock.Unlock(); } bool finished() override { return done; } string getType() override { return "UndirtyOp"; } }; class IsDirtyOp : public TestOp { public: librados::AioCompletion *completion; librados::ObjectReadOperation op; string oid; bool dirty; ObjectDesc old_value; int snap; ceph::shared_ptr in_use; IsDirtyOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat = 0) : TestOp(n, context, stat), completion(NULL), oid(oid), dirty(false) {} void _begin() override { context->state_lock.Lock(); if (!(rand() % 4) && !context->snaps.empty()) { snap = rand_choose(context->snaps)->first; in_use = context->snaps_in_use.lookup_or_create(snap, snap); } else { snap = -1; } std::cout << num << ": is_dirty oid " << oid << " snap " << snap << std::endl; pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); completion = context->rados.aio_create_completion((void *) cb_arg, NULL, &write_callback); context->oid_in_use.insert(oid); context->oid_not_in_use.erase(oid); context->state_lock.Unlock(); if (snap >= 0) { context->io_ctx.snap_set_read(context->snaps[snap]); } op.is_dirty(&dirty, NULL); int r = context->io_ctx.aio_operate(context->prefix+oid, completion, &op, 0); assert(!r); if (snap >= 0) { context->io_ctx.snap_set_read(0); } } void _finish(CallbackInfo *info) override { context->state_lock.Lock(); assert(!done); assert(completion->is_complete()); context->oid_in_use.erase(oid); context->oid_not_in_use.insert(oid); assert(context->find_object(oid, &old_value, snap)); int r = completion->get_return_value(); if (r == 0) { cout << num << ": " << (dirty ? "dirty" : "clean") << std::endl; assert(!old_value.deleted()); assert(dirty == old_value.dirty); } else { cout << num << ": got " << r << std::endl; assert(r == -ENOENT); assert(old_value.deleted()); } context->kick(); done = true; context->state_lock.Unlock(); } bool finished() override { return done; } string getType() override { return "IsDirtyOp"; } }; class CacheFlushOp : public TestOp { public: librados::AioCompletion *completion; librados::ObjectReadOperation op; string oid; bool blocking; int snap; bool can_fail; ceph::shared_ptr in_use; CacheFlushOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat, bool b) : TestOp(n, context, stat), completion(NULL), oid(oid), blocking(b), snap(0), can_fail(false) {} void _begin() override { context->state_lock.Lock(); if (!(rand() % 4) && !context->snaps.empty()) { snap = rand_choose(context->snaps)->first; in_use = context->snaps_in_use.lookup_or_create(snap, snap); } else { snap = -1; } // not being particularly specific here about knowing which // flushes are on the oldest clean snap and which ones are not. can_fail = !blocking || !context->snaps.empty(); // FIXME: we could fail if we've ever removed a snap due to // the async snap trimming. can_fail = true; cout << num << ": " << (blocking ? "cache_flush" : "cache_try_flush") << " oid " << oid << " snap " << snap << std::endl; if (snap >= 0) { context->io_ctx.snap_set_read(context->snaps[snap]); } pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); completion = context->rados.aio_create_completion((void *) cb_arg, NULL, &write_callback); context->oid_flushing.insert(oid); context->oid_not_flushing.erase(oid); context->state_lock.Unlock(); unsigned flags = librados::OPERATION_IGNORE_CACHE; if (blocking) { op.cache_flush(); } else { op.cache_try_flush(); flags = librados::OPERATION_SKIPRWLOCKS; } int r = context->io_ctx.aio_operate(context->prefix+oid, completion, &op, flags, NULL); assert(!r); if (snap >= 0) { context->io_ctx.snap_set_read(0); } } void _finish(CallbackInfo *info) override { context->state_lock.Lock(); assert(!done); assert(completion->is_complete()); context->oid_flushing.erase(oid); context->oid_not_flushing.insert(oid); int r = completion->get_return_value(); cout << num << ": got " << cpp_strerror(r) << std::endl; if (r == 0) { context->update_object_version(oid, 0, snap); } else if (r == -EBUSY) { assert(can_fail); } else if (r == -EINVAL) { // caching not enabled? } else if (r == -ENOENT) { // may have raced with a remove? } else { assert(0 == "shouldn't happen"); } context->kick(); done = true; context->state_lock.Unlock(); } bool finished() override { return done; } string getType() override { return "CacheFlushOp"; } }; class CacheEvictOp : public TestOp { public: librados::AioCompletion *completion; librados::ObjectReadOperation op; string oid; ceph::shared_ptr in_use; CacheEvictOp(int n, RadosTestContext *context, const string &oid, TestOpStat *stat) : TestOp(n, context, stat), completion(NULL), oid(oid) {} void _begin() override { context->state_lock.Lock(); int snap; if (!(rand() % 4) && !context->snaps.empty()) { snap = rand_choose(context->snaps)->first; in_use = context->snaps_in_use.lookup_or_create(snap, snap); } else { snap = -1; } cout << num << ": cache_evict oid " << oid << " snap " << snap << std::endl; if (snap >= 0) { context->io_ctx.snap_set_read(context->snaps[snap]); } pair *cb_arg = new pair(this, new TestOp::CallbackInfo(0)); completion = context->rados.aio_create_completion((void *) cb_arg, NULL, &write_callback); context->state_lock.Unlock(); op.cache_evict(); int r = context->io_ctx.aio_operate(context->prefix+oid, completion, &op, librados::OPERATION_IGNORE_CACHE, NULL); assert(!r); if (snap >= 0) { context->io_ctx.snap_set_read(0); } } void _finish(CallbackInfo *info) override { context->state_lock.Lock(); assert(!done); assert(completion->is_complete()); int r = completion->get_return_value(); cout << num << ": got " << cpp_strerror(r) << std::endl; if (r == 0) { // yay! } else if (r == -EBUSY) { // raced with something that dirtied the object } else if (r == -EINVAL) { // caching not enabled? } else if (r == -ENOENT) { // may have raced with a remove? } else { assert(0 == "shouldn't happen"); } context->kick(); done = true; context->state_lock.Unlock(); } bool finished() override { return done; } string getType() override { return "CacheEvictOp"; } }; #endif