// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2012 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include "IoCtxImpl.h" #include "librados/AioCompletionImpl.h" #include "librados/PoolAsyncCompletionImpl.h" #include "librados/RadosClient.h" #include "include/assert.h" #include "common/valgrind.h" #include "common/EventTrace.h" #define dout_subsys ceph_subsys_rados #undef dout_prefix #define dout_prefix *_dout << "librados: " namespace librados { namespace { struct C_notify_Finish : public Context { CephContext *cct; Context *ctx; Objecter *objecter; Objecter::LingerOp *linger_op; bufferlist reply_bl; bufferlist *preply_bl; char **preply_buf; size_t *preply_buf_len; C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter, Objecter::LingerOp *_linger_op, bufferlist *_preply_bl, char **_preply_buf, size_t *_preply_buf_len) : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op), preply_bl(_preply_bl), preply_buf(_preply_buf), preply_buf_len(_preply_buf_len) { linger_op->on_notify_finish = this; linger_op->notify_result_bl = &reply_bl; } void finish(int r) override { ldout(cct, 10) << __func__ << " completed notify (linger op " << linger_op << "), r = " << r << dendl; // pass result back to user // NOTE: we do this regardless of what error code we return if (preply_buf) { if (reply_bl.length()) { *preply_buf = (char*)malloc(reply_bl.length()); memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length()); } else { *preply_buf = NULL; } } if (preply_buf_len) *preply_buf_len = reply_bl.length(); if (preply_bl) preply_bl->claim(reply_bl); ctx->complete(r); } }; struct C_aio_linger_cancel : public Context { Objecter *objecter; Objecter::LingerOp *linger_op; C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op) : objecter(_objecter), linger_op(_linger_op) { } void finish(int r) override { objecter->linger_cancel(linger_op); } }; struct C_aio_linger_Complete : public Context { AioCompletionImpl *c; Objecter::LingerOp *linger_op; bool cancel; C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel) : c(_c), linger_op(_linger_op), cancel(_cancel) { c->get(); } void finish(int r) override { if (cancel || r < 0) c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter, linger_op)); c->lock.Lock(); c->rval = r; c->complete = true; c->cond.Signal(); if (c->callback_complete || c->callback_safe) { c->io->client->finisher.queue(new C_AioComplete(c)); } c->put_unlock(); } }; struct C_aio_notify_Complete : public C_aio_linger_Complete { Mutex lock; bool acked = false; bool finished = false; int ret_val = 0; C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op) : C_aio_linger_Complete(_c, _linger_op, false), lock("C_aio_notify_Complete::lock") { } void handle_ack(int r) { // invoked by C_aio_notify_Ack lock.Lock(); acked = true; complete_unlock(r); } void complete(int r) override { // invoked by C_notify_Finish (or C_aio_notify_Ack on failure) lock.Lock(); finished = true; complete_unlock(r); } void complete_unlock(int r) { if (ret_val == 0 && r < 0) { ret_val = r; } if (acked && finished) { lock.Unlock(); cancel = true; C_aio_linger_Complete::complete(ret_val); } else { lock.Unlock(); } } }; struct C_aio_notify_Ack : public Context { CephContext *cct; C_notify_Finish *onfinish; C_aio_notify_Complete *oncomplete; C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish, C_aio_notify_Complete *_oncomplete) : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete) { } void finish(int r) override { ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " " << "acked (" << r << ")" << dendl; oncomplete->handle_ack(r); if (r < 0) { // on failure, we won't expect to see a notify_finish callback onfinish->complete(r); } } }; struct C_aio_selfmanaged_snap_op_Complete : public Context { librados::RadosClient *client; librados::AioCompletionImpl *c; C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client, librados::AioCompletionImpl *c) : client(client), c(c) { c->get(); } void finish(int r) override { c->lock.Lock(); c->rval = r; c->complete = true; c->cond.Signal(); if (c->callback_complete || c->callback_safe) { client->finisher.queue(new librados::C_AioComplete(c)); } c->put_unlock(); } }; struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete { snapid_t snapid; uint64_t *dest_snapid; C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client, librados::AioCompletionImpl *c, uint64_t *dest_snapid) : C_aio_selfmanaged_snap_op_Complete(client, c), dest_snapid(dest_snapid) { } void finish(int r) override { if (r >= 0) { *dest_snapid = snapid; } C_aio_selfmanaged_snap_op_Complete::finish(r); } }; } // anonymous namespace } // namespace librados librados::IoCtxImpl::IoCtxImpl() : ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0), notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0), objecter(NULL) { } librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter, int64_t poolid, snapid_t s) : ref_cnt(0), client(c), poolid(poolid), snap_seq(s), assert_ver(0), last_objver(0), notify_timeout(c->cct->_conf->client_notify_timeout), oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0), objecter(objecter) { } void librados::IoCtxImpl::set_snap_read(snapid_t s) { if (!s) s = CEPH_NOSNAP; ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl; snap_seq = s; } int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector& snaps) { ::SnapContext n; ldout(client->cct, 10) << "set snap write context: seq = " << seq << " and snaps = " << snaps << dendl; n.seq = seq; n.snaps = snaps; if (!n.is_valid()) return -EINVAL; snapc = n; return 0; } int librados::IoCtxImpl::get_object_hash_position( const std::string& oid, uint32_t *hash_position) { int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace); if (r < 0) return r; *hash_position = (uint32_t)r; return 0; } int librados::IoCtxImpl::get_object_pg_hash_position( const std::string& oid, uint32_t *pg_hash_position) { int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace); if (r < 0) return r; *pg_hash_position = (uint32_t)r; return 0; } void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) { get(); aio_write_list_lock.Lock(); assert(c->io == this); c->aio_write_seq = ++aio_write_seq; ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c << " write_seq " << aio_write_seq << dendl; aio_write_list.push_back(&c->aio_write_list_item); aio_write_list_lock.Unlock(); } void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) { ldout(client->cct, 20) << "complete_aio_write " << c << dendl; aio_write_list_lock.Lock(); assert(c->io == this); c->aio_write_list_item.remove_myself(); map >::iterator waiters = aio_write_waiters.begin(); while (waiters != aio_write_waiters.end()) { if (!aio_write_list.empty() && aio_write_list.front()->aio_write_seq <= waiters->first) { ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq << " <= waiter " << waiters->first << ", stopping" << dendl; break; } ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl; for (std::list::iterator it = waiters->second.begin(); it != waiters->second.end(); ++it) { client->finisher.queue(new C_AioCompleteAndSafe(*it)); (*it)->put(); } aio_write_waiters.erase(waiters++); } aio_write_cond.Signal(); aio_write_list_lock.Unlock(); put(); } void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c) { ldout(client->cct, 20) << "flush_aio_writes_async " << this << " completion " << c << dendl; Mutex::Locker l(aio_write_list_lock); ceph_tid_t seq = aio_write_seq; if (aio_write_list.empty()) { ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid " << seq << ")" << dendl; client->finisher.queue(new C_AioCompleteAndSafe(c)); } else { ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size() << " writes in flight; waiting on tid " << seq << dendl; c->get(); aio_write_waiters[seq].push_back(c); } } void librados::IoCtxImpl::flush_aio_writes() { ldout(client->cct, 20) << "flush_aio_writes" << dendl; aio_write_list_lock.Lock(); ceph_tid_t seq = aio_write_seq; while (!aio_write_list.empty() && aio_write_list.front()->aio_write_seq <= seq) aio_write_cond.Wait(aio_write_list_lock); aio_write_list_lock.Unlock(); } string librados::IoCtxImpl::get_cached_pool_name() { std::string pn; client->pool_get_name(get_id(), &pn); return pn; } // SNAPS int librados::IoCtxImpl::snap_create(const char *snapName) { int reply; string sName(snapName); Mutex mylock ("IoCtxImpl::snap_create::mylock"); Cond cond; bool done; Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply); reply = objecter->create_pool_snap(poolid, sName, onfinish); if (reply < 0) { delete onfinish; } else { mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); } return reply; } int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid) { int reply; Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock"); Cond cond; bool done; Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply); snapid_t snapid; reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish); if (reply < 0) { delete onfinish; } else { mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); if (reply == 0) *psnapid = snapid; } return reply; } void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid, AioCompletionImpl *c) { C_aio_selfmanaged_snap_create_Complete *onfinish = new C_aio_selfmanaged_snap_create_Complete(client, c, snapid); int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid, onfinish); if (r < 0) { onfinish->complete(r); } } int librados::IoCtxImpl::snap_remove(const char *snapName) { int reply; string sName(snapName); Mutex mylock ("IoCtxImpl::snap_remove::mylock"); Cond cond; bool done; Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply); reply = objecter->delete_pool_snap(poolid, sName, onfinish); if (reply < 0) { delete onfinish; } else { mylock.Lock(); while(!done) cond.Wait(mylock); mylock.Unlock(); } return reply; } int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid, ::SnapContext& snapc, uint64_t snapid) { int reply; Mutex mylock("IoCtxImpl::snap_rollback::mylock"); Cond cond; bool done; Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); ::ObjectOperation op; prepare_assert_ops(&op); op.rollback(snapid); objecter->mutate(oid, oloc, op, snapc, ceph::real_clock::now(), 0, onack, NULL); mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); return reply; } int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName) { snapid_t snap; int r = objecter->pool_snap_by_name(poolid, snapName, &snap); if (r < 0) { return r; } return selfmanaged_snap_rollback_object(oid, snapc, snap); } int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid) { int reply; Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock"); Cond cond; bool done; objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid), new C_SafeCond(&mylock, &cond, &done, &reply)); mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); return (int)reply; } void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid, AioCompletionImpl *c) { Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c); objecter->delete_selfmanaged_snap(poolid, snapid, onfinish); } int librados::IoCtxImpl::pool_change_auid(unsigned long long auid) { int reply; Mutex mylock("IoCtxImpl::pool_change_auid::mylock"); Cond cond; bool done; objecter->change_pool_auid(poolid, new C_SafeCond(&mylock, &cond, &done, &reply), auid); mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); return reply; } int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c) { objecter->change_pool_auid(poolid, new C_PoolAsync_Safe(c), auid); return 0; } int librados::IoCtxImpl::snap_list(vector *snaps) { return objecter->pool_snap_list(poolid, snaps); } int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid) { return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid); } int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s) { pool_snap_info_t info; int ret = objecter->pool_snap_get_info(poolid, snapid, &info); if (ret < 0) { return ret; } *s = info.name.c_str(); return 0; } int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t) { pool_snap_info_t info; int ret = objecter->pool_snap_get_info(poolid, snapid, &info); if (ret < 0) { return ret; } *t = info.stamp.sec(); return 0; } // IO int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries) { Cond cond; bool done; int r = 0; Mutex mylock("IoCtxImpl::nlist::mylock"); if (context->at_end()) return 0; context->max_entries = max_entries; context->nspace = oloc.nspace; objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r)); mylock.Lock(); while(!done) cond.Wait(mylock); mylock.Unlock(); return r; } uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context, uint32_t pos) { context->list.clear(); return objecter->list_nobjects_seek(context, pos); } uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context, const rados_object_list_cursor& cursor) { context->list.clear(); return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor); } rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context) { hobject_t *c = new hobject_t; objecter->list_nobjects_get_cursor(context, c); return (rados_object_list_cursor)c; } int librados::IoCtxImpl::create(const object_t& oid, bool exclusive) { ::ObjectOperation op; prepare_assert_ops(&op); op.create(exclusive); return operate(oid, &op, NULL); } /* * add any version assert operations that are appropriate given the * stat in the IoCtx, either the target version assert or any src * object asserts. these affect a single ioctx operation, so clear * the ioctx state when we're doing. * * return a pointer to the ObjectOperation if we added any events; * this is convenient for passing the extra_ops argument into Objecter * methods. */ ::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op) { ::ObjectOperation *pop = NULL; if (assert_ver) { op->assert_version(assert_ver); assert_ver = 0; pop = op; } return pop; } int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl, size_t len, uint64_t off) { if (len > UINT_MAX/2) return -E2BIG; ::ObjectOperation op; prepare_assert_ops(&op); bufferlist mybl; mybl.substr_of(bl, 0, len); op.write(off, mybl); return operate(oid, &op, NULL); } int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len) { if (len > UINT_MAX/2) return -E2BIG; ::ObjectOperation op; prepare_assert_ops(&op); bufferlist mybl; mybl.substr_of(bl, 0, len); op.append(mybl); return operate(oid, &op, NULL); } int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl) { if (bl.length() > UINT_MAX/2) return -E2BIG; ::ObjectOperation op; prepare_assert_ops(&op); op.write_full(bl); return operate(oid, &op, NULL); } int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl, size_t write_len, uint64_t off) { if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2)) return -E2BIG; if ((bl.length() == 0) || (write_len % bl.length())) return -EINVAL; ::ObjectOperation op; prepare_assert_ops(&op); bufferlist mybl; mybl.substr_of(bl, 0, bl.length()); op.writesame(off, write_len, mybl); return operate(oid, &op, NULL); } int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o, ceph::real_time *pmtime, int flags) { ceph::real_time ut = (pmtime ? *pmtime : ceph::real_clock::now()); /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; if (!o->size()) return 0; Mutex mylock("IoCtxImpl::operate::mylock"); Cond cond; bool done; int r; version_t ver; Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r); int op = o->ops[0].op.op; ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl; Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc, *o, snapc, ut, flags, oncommit, &ver); objecter->op_submit(objecter_op); mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); ldout(client->cct, 10) << "Objecter returned from " << ceph_osd_op_name(op) << " r=" << r << dendl; set_sync_op_version(ver); return r; } int librados::IoCtxImpl::operate_read(const object_t& oid, ::ObjectOperation *o, bufferlist *pbl, int flags) { if (!o->size()) return 0; Mutex mylock("IoCtxImpl::operate_read::mylock"); Cond cond; bool done; int r; version_t ver; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); int op = o->ops[0].op.op; ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl; Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc, *o, snap_seq, pbl, flags, onack, &ver); objecter->op_submit(objecter_op); mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); ldout(client->cct, 10) << "Objecter returned from " << ceph_osd_op_name(op) << " r=" << r << dendl; set_sync_op_version(ver); return r; } int librados::IoCtxImpl::aio_operate_read(const object_t &oid, ::ObjectOperation *o, AioCompletionImpl *c, int flags, bufferlist *pbl, const blkin_trace_info *trace_info) { FUNCTRACE(); Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->is_read = true; c->io = this; ZTracer::Trace trace; if (trace_info) { ZTracer::Trace parent_trace("", nullptr, trace_info); trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace); } trace.event("init root span"); Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc, *o, snap_seq, pbl, flags, oncomplete, &c->objver, nullptr, 0, &trace); objecter->op_submit(objecter_op, &c->tid); trace.event("rados operate read submitted"); return 0; } int librados::IoCtxImpl::aio_operate(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, const SnapContext& snap_context, int flags, const blkin_trace_info *trace_info) { FUNCTRACE(); OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN"); auto ut = ceph::real_clock::now(); /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->io = this; queue_aio_write(c); ZTracer::Trace trace; if (trace_info) { ZTracer::Trace parent_trace("", nullptr, trace_info); trace.init("rados operate", &objecter->trace_endpoint, &parent_trace); } trace.event("init root span"); Objecter::Op *op = objecter->prepare_mutate_op( oid, oloc, *o, snap_context, ut, flags, oncomplete, &c->objver, osd_reqid_t(), &trace); objecter->op_submit(op, &c->tid); trace.event("rados operate op submitted"); return 0; } int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c, bufferlist *pbl, size_t len, uint64_t off, uint64_t snapid, const blkin_trace_info *info) { FUNCTRACE(); if (len > (size_t) INT_MAX) return -EDOM; OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN"); Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->is_read = true; c->io = this; c->blp = pbl; ZTracer::Trace trace; if (info) trace.init("rados read", &objecter->trace_endpoint, info); Objecter::Op *o = objecter->prepare_read_op( oid, oloc, off, len, snapid, pbl, 0, oncomplete, &c->objver, nullptr, 0, &trace); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c, char *buf, size_t len, uint64_t off, uint64_t snapid, const blkin_trace_info *info) { FUNCTRACE(); if (len > (size_t) INT_MAX) return -EDOM; OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN"); Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->is_read = true; c->io = this; c->bl.clear(); c->bl.push_back(buffer::create_static(len, buf)); c->blp = &c->bl; c->out_buf = buf; ZTracer::Trace trace; if (info) trace.init("rados read", &objecter->trace_endpoint, info); Objecter::Op *o = objecter->prepare_read_op( oid, oloc, off, len, snapid, &c->bl, 0, oncomplete, &c->objver, nullptr, 0, &trace); objecter->op_submit(o, &c->tid); return 0; } class C_ObjectOperation : public Context { public: ::ObjectOperation m_ops; explicit C_ObjectOperation(Context *c) : m_ctx(c) {} void finish(int r) override { m_ctx->complete(r); } private: Context *m_ctx; }; int librados::IoCtxImpl::aio_sparse_read(const object_t oid, AioCompletionImpl *c, std::map *m, bufferlist *data_bl, size_t len, uint64_t off, uint64_t snapid) { FUNCTRACE(); if (len > (size_t) INT_MAX) return -EDOM; Context *nested = new C_aio_Complete(c); C_ObjectOperation *onack = new C_ObjectOperation(nested); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) nested)->oid = oid; #endif c->is_read = true; c->io = this; onack->m_ops.sparse_read(off, len, m, data_bl, NULL); Objecter::Op *o = objecter->prepare_read_op( oid, oloc, onack->m_ops, snapid, NULL, 0, onack, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_cmpext(const object_t& oid, AioCompletionImpl *c, uint64_t off, bufferlist& cmp_bl) { if (cmp_bl.length() > UINT_MAX/2) return -E2BIG; Context *onack = new C_aio_Complete(c); c->is_read = true; c->io = this; Objecter::Op *o = objecter->prepare_cmpext_op( oid, oloc, off, cmp_bl, snap_seq, 0, onack, &c->objver); objecter->op_submit(o, &c->tid); return 0; } /* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */ int librados::IoCtxImpl::aio_cmpext(const object_t& oid, AioCompletionImpl *c, const char *cmp_buf, size_t cmp_len, uint64_t off) { if (cmp_len > UINT_MAX/2) return -E2BIG; bufferlist cmp_bl; cmp_bl.append(cmp_buf, cmp_len); Context *nested = new C_aio_Complete(c); C_ObjectOperation *onack = new C_ObjectOperation(nested); c->is_read = true; c->io = this; onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL); Objecter::Op *o = objecter->prepare_read_op( oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c, const bufferlist& bl, size_t len, uint64_t off, const blkin_trace_info *info) { FUNCTRACE(); auto ut = ceph::real_clock::now(); ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl; OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN"); if (len > UINT_MAX/2) return -E2BIG; /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif ZTracer::Trace trace; if (info) trace.init("rados write", &objecter->trace_endpoint, info); c->io = this; queue_aio_write(c); Objecter::Op *o = objecter->prepare_write_op( oid, oloc, off, len, snapc, bl, ut, 0, oncomplete, &c->objver, nullptr, 0, &trace); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c, const bufferlist& bl, size_t len) { FUNCTRACE(); auto ut = ceph::real_clock::now(); if (len > UINT_MAX/2) return -E2BIG; /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->io = this; queue_aio_write(c); Objecter::Op *o = objecter->prepare_append_op( oid, oloc, len, snapc, bl, ut, 0, oncomplete, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_write_full(const object_t &oid, AioCompletionImpl *c, const bufferlist& bl) { FUNCTRACE(); auto ut = ceph::real_clock::now(); if (bl.length() > UINT_MAX/2) return -E2BIG; /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->io = this; queue_aio_write(c); Objecter::Op *o = objecter->prepare_write_full_op( oid, oloc, snapc, bl, ut, 0, oncomplete, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_writesame(const object_t &oid, AioCompletionImpl *c, const bufferlist& bl, size_t write_len, uint64_t off) { FUNCTRACE(); auto ut = ceph::real_clock::now(); if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2)) return -E2BIG; if ((bl.length() == 0) || (write_len % bl.length())) return -EINVAL; /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->io = this; queue_aio_write(c); Objecter::Op *o = objecter->prepare_writesame_op( oid, oloc, write_len, off, snapc, bl, ut, 0, oncomplete, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags) { FUNCTRACE(); auto ut = ceph::real_clock::now(); /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->io = this; queue_aio_write(c); Objecter::Op *o = objecter->prepare_remove_op( oid, oloc, snapc, ut, flags, oncomplete, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c, uint64_t *psize, time_t *pmtime) { C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime); c->is_read = true; c->io = this; Objecter::Op *o = objecter->prepare_stat_op( oid, oloc, snap_seq, psize, &onack->mtime, 0, onack, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c, uint64_t *psize, struct timespec *pts) { C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts); c->is_read = true; c->io = this; Objecter::Op *o = objecter->prepare_stat_op( oid, oloc, snap_seq, psize, &onack->mtime, 0, onack, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c, const char *name, bufferlist& bl) { ::ObjectOperation rd; prepare_assert_ops(&rd); rd.getxattr(name, &bl, NULL); int r = aio_operate_read(oid, &rd, c, 0, &bl); return r; } int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c, const char *name) { ::ObjectOperation op; prepare_assert_ops(&op); op.rmxattr(name); return aio_operate(oid, &op, c, snapc, 0); } int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c, const char *name, bufferlist& bl) { ::ObjectOperation op; prepare_assert_ops(&op); op.setxattr(name, bl); return aio_operate(oid, &op, c, snapc, 0); } namespace { struct AioGetxattrsData { AioGetxattrsData(librados::AioCompletionImpl *c, map* attrset, librados::RadosClient *_client) : user_completion(c), user_attrset(attrset), client(_client) {} struct librados::C_AioCompleteAndSafe user_completion; map result_attrset; map* user_attrset; librados::RadosClient *client; }; } static void aio_getxattrs_complete(rados_completion_t c, void *arg) { AioGetxattrsData *cdata = reinterpret_cast(arg); int rc = rados_aio_get_return_value(c); cdata->user_attrset->clear(); if (rc >= 0) { for (map::iterator p = cdata->result_attrset.begin(); p != cdata->result_attrset.end(); ++p) { ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl; (*cdata->user_attrset)[p->first] = p->second; } } cdata->user_completion.finish(rc); ((librados::AioCompletionImpl*)c)->put(); delete cdata; } int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c, map& attrset) { AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client); ::ObjectOperation rd; prepare_assert_ops(&rd); rd.getxattrs(&cdata->result_attrset, NULL); librados::AioCompletionImpl *comp = new librados::AioCompletionImpl; comp->set_complete_callback(cdata, aio_getxattrs_complete); return aio_operate_read(oid, &rd, comp, 0, NULL); } int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c) { return objecter->op_cancel(c->tid, -ECANCELED); } int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c, std::list< std::pair > *pls) { Context *oncomplete = new C_aio_Complete(c); c->is_read = true; c->io = this; ::ObjectOperation rd; rd.hit_set_ls(pls, NULL); object_locator_t oloc(poolid); Objecter::Op *o = objecter->prepare_pg_read_op( hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c, time_t stamp, bufferlist *pbl) { Context *oncomplete = new C_aio_Complete(c); c->is_read = true; c->io = this; ::ObjectOperation rd; rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0); object_locator_t oloc(poolid); Objecter::Op *o = objecter->prepare_pg_read_op( hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::remove(const object_t& oid) { ::ObjectOperation op; prepare_assert_ops(&op); op.remove(); return operate(oid, &op, NULL); } int librados::IoCtxImpl::remove(const object_t& oid, int flags) { ::ObjectOperation op; prepare_assert_ops(&op); op.remove(); return operate(oid, &op, NULL, flags); } int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size) { ::ObjectOperation op; prepare_assert_ops(&op); op.truncate(size); return operate(oid, &op, NULL); } int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg, const librados::object_id_t& start_after, uint64_t max_to_get, AioCompletionImpl *c, std::vector* objects, uint32_t* interval) { Context *oncomplete = new C_aio_Complete(c); c->is_read = true; c->io = this; ::ObjectOperation op; op.scrub_ls(start_after, max_to_get, objects, interval, nullptr); object_locator_t oloc{poolid, pg.ps()}; Objecter::Op *o = objecter->prepare_pg_read_op( oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete, nullptr, nullptr); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg, const librados::object_id_t& start_after, uint64_t max_to_get, AioCompletionImpl *c, std::vector* snapsets, uint32_t* interval) { Context *oncomplete = new C_aio_Complete(c); c->is_read = true; c->io = this; ::ObjectOperation op; op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr); object_locator_t oloc{poolid, pg.ps()}; Objecter::Op *o = objecter->prepare_pg_read_op( oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete, nullptr, nullptr); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl) { ::ObjectOperation wr; prepare_assert_ops(&wr); wr.tmap_update(cmdbl); return operate(oid, &wr, NULL); } int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl) { ::ObjectOperation wr; prepare_assert_ops(&wr); wr.tmap_put(bl); return operate(oid, &wr, NULL); } int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl) { ::ObjectOperation rd; prepare_assert_ops(&rd); rd.tmap_get(&bl, NULL); return operate_read(oid, &rd, NULL); } int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok) { ::ObjectOperation wr; prepare_assert_ops(&wr); wr.tmap_to_omap(nullok); return operate(oid, &wr, NULL); } int librados::IoCtxImpl::exec(const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl) { ::ObjectOperation rd; prepare_assert_ops(&rd); rd.call(cls, method, inbl); return operate_read(oid, &rd, &outbl); } int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls, const char *method, bufferlist& inbl, bufferlist *outbl) { FUNCTRACE(); Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->is_read = true; c->io = this; ::ObjectOperation rd; prepare_assert_ops(&rd); rd.call(cls, method, inbl); Objecter::Op *o = objecter->prepare_read_op( oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls, const char *method, bufferlist& inbl, char *buf, size_t out_len) { FUNCTRACE(); Context *oncomplete = new C_aio_Complete(c); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) ((C_aio_Complete *) oncomplete)->oid = oid; #endif c->is_read = true; c->io = this; c->bl.clear(); c->bl.push_back(buffer::create_static(out_len, buf)); c->blp = &c->bl; c->out_buf = buf; ::ObjectOperation rd; prepare_assert_ops(&rd); rd.call(cls, method, inbl); Objecter::Op *o = objecter->prepare_read_op( oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver); objecter->op_submit(o, &c->tid); return 0; } int librados::IoCtxImpl::read(const object_t& oid, bufferlist& bl, size_t len, uint64_t off) { if (len > (size_t) INT_MAX) return -EDOM; OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN"); ::ObjectOperation rd; prepare_assert_ops(&rd); rd.read(off, len, &bl, NULL, NULL); int r = operate_read(oid, &rd, &bl); if (r < 0) return r; if (bl.length() < len) { ldout(client->cct, 10) << "Returned length " << bl.length() << " less than original length "<< len << dendl; } return bl.length(); } int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off, bufferlist& cmp_bl) { if (cmp_bl.length() > UINT_MAX/2) return -E2BIG; ::ObjectOperation op; prepare_assert_ops(&op); op.cmpext(off, cmp_bl, NULL); return operate_read(oid, &op, NULL); } int librados::IoCtxImpl::mapext(const object_t& oid, uint64_t off, size_t len, std::map& m) { bufferlist bl; Mutex mylock("IoCtxImpl::read::mylock"); Cond cond; bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); objecter->mapext(oid, oloc, off, len, snap_seq, &bl, 0, onack); mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl; if (r < 0) return r; bufferlist::iterator iter = bl.begin(); ::decode(m, iter); return m.size(); } int librados::IoCtxImpl::sparse_read(const object_t& oid, std::map& m, bufferlist& data_bl, size_t len, uint64_t off) { if (len > (size_t) INT_MAX) return -EDOM; ::ObjectOperation rd; prepare_assert_ops(&rd); rd.sparse_read(off, len, &m, &data_bl, NULL); int r = operate_read(oid, &rd, NULL); if (r < 0) return r; return m.size(); } int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type, const bufferlist &init_value, size_t len, uint64_t off, size_t chunk_size, bufferlist *pbl) { if (len > (size_t) INT_MAX) { return -EDOM; } ::ObjectOperation rd; prepare_assert_ops(&rd); rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr); int r = operate_read(oid, &rd, nullptr); if (r < 0) { return r; } return 0; } int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime) { uint64_t size; real_time mtime; if (!psize) psize = &size; ::ObjectOperation rd; prepare_assert_ops(&rd); rd.stat(psize, &mtime, NULL); int r = operate_read(oid, &rd, NULL); if (r >= 0 && pmtime) { *pmtime = real_clock::to_time_t(mtime); } return r; } int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts) { uint64_t size; ceph::real_time mtime; if (!psize) psize = &size; ::ObjectOperation rd; prepare_assert_ops(&rd); rd.stat(psize, &mtime, NULL); int r = operate_read(oid, &rd, NULL); if (r < 0) { return r; } if (pts) { *pts = ceph::real_clock::to_timespec(mtime); } return 0; } int librados::IoCtxImpl::getxattr(const object_t& oid, const char *name, bufferlist& bl) { ::ObjectOperation rd; prepare_assert_ops(&rd); rd.getxattr(name, &bl, NULL); int r = operate_read(oid, &rd, &bl); if (r < 0) return r; return bl.length(); } int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name) { ::ObjectOperation op; prepare_assert_ops(&op); op.rmxattr(name); return operate(oid, &op, NULL); } int librados::IoCtxImpl::setxattr(const object_t& oid, const char *name, bufferlist& bl) { ::ObjectOperation op; prepare_assert_ops(&op); op.setxattr(name, bl); return operate(oid, &op, NULL); } int librados::IoCtxImpl::getxattrs(const object_t& oid, map& attrset) { map aset; ::ObjectOperation rd; prepare_assert_ops(&rd); rd.getxattrs(&aset, NULL); int r = operate_read(oid, &rd, NULL); attrset.clear(); if (r >= 0) { for (map::iterator p = aset.begin(); p != aset.end(); ++p) { ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl; attrset[p->first.c_str()] = p->second; } } return r; } void librados::IoCtxImpl::set_sync_op_version(version_t ver) { ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver), "IoCtxImpl last_objver"); last_objver = ver; } struct WatchInfo : public Objecter::WatchContext { librados::IoCtxImpl *ioctx; object_t oid; librados::WatchCtx *ctx; librados::WatchCtx2 *ctx2; bool internal = false; WatchInfo(librados::IoCtxImpl *io, object_t o, librados::WatchCtx *c, librados::WatchCtx2 *c2, bool inter) : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) { ioctx->get(); } ~WatchInfo() override { ioctx->put(); if (internal) { delete ctx; delete ctx2; } } void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_id, bufferlist& bl) override { ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id << " cookie " << cookie << " notifier_id " << notifier_id << " len " << bl.length() << dendl; if (ctx2) ctx2->handle_notify(notify_id, cookie, notifier_id, bl); if (ctx) { ctx->notify(0, 0, bl); // send ACK back to OSD if using legacy protocol bufferlist empty; ioctx->notify_ack(oid, notify_id, cookie, empty); } } void handle_error(uint64_t cookie, int err) override { ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie << " err " << err << dendl; if (ctx2) ctx2->handle_error(cookie, err); } }; int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, bool internal) { return watch(oid, handle, ctx, ctx2, 0, internal); } int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal) { ::ObjectOperation wr; version_t objver; C_SaferCond onfinish; Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); *handle = linger_op->get_cookie(); linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal); prepare_assert_ops(&wr); wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); bufferlist bl; objecter->linger_watch(linger_op, wr, snapc, ceph::real_clock::now(), bl, &onfinish, &objver); int r = onfinish.wait(); set_sync_op_version(objver); if (r < 0) { objecter->linger_cancel(linger_op); *handle = 0; } return r; } int librados::IoCtxImpl::aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, bool internal) { return aio_watch(oid, c, handle, ctx, ctx2, 0, internal); } int librados::IoCtxImpl::aio_watch(const object_t& oid, AioCompletionImpl *c, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2, uint32_t timeout, bool internal) { Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); c->io = this; Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false); ::ObjectOperation wr; *handle = linger_op->get_cookie(); linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal); prepare_assert_ops(&wr); wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); bufferlist bl; objecter->linger_watch(linger_op, wr, snapc, ceph::real_clock::now(), bl, oncomplete, &c->objver); return 0; } int librados::IoCtxImpl::notify_ack( const object_t& oid, uint64_t notify_id, uint64_t cookie, bufferlist& bl) { ::ObjectOperation rd; prepare_assert_ops(&rd); rd.notify_ack(notify_id, cookie, bl); objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); return 0; } int librados::IoCtxImpl::watch_check(uint64_t cookie) { Objecter::LingerOp *linger_op = reinterpret_cast(cookie); return objecter->linger_check(linger_op); } int librados::IoCtxImpl::unwatch(uint64_t cookie) { Objecter::LingerOp *linger_op = reinterpret_cast(cookie); C_SaferCond onfinish; version_t ver = 0; ::ObjectOperation wr; prepare_assert_ops(&wr); wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); objecter->mutate(linger_op->target.base_oid, oloc, wr, snapc, ceph::real_clock::now(), 0, &onfinish, &ver); objecter->linger_cancel(linger_op); int r = onfinish.wait(); set_sync_op_version(ver); return r; } int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c) { c->io = this; Objecter::LingerOp *linger_op = reinterpret_cast(cookie); Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true); ::ObjectOperation wr; prepare_assert_ops(&wr); wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); objecter->mutate(linger_op->target.base_oid, oloc, wr, snapc, ceph::real_clock::now(), 0, oncomplete, &c->objver); return 0; } int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms, bufferlist *preply_bl, char **preply_buf, size_t *preply_buf_len) { Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); C_SaferCond notify_finish_cond; Context *notify_finish = new C_notify_Finish(client->cct, ¬ify_finish_cond, objecter, linger_op, preply_bl, preply_buf, preply_buf_len); uint32_t timeout = notify_timeout; if (timeout_ms) timeout = timeout_ms / 1000; // Construct RADOS op ::ObjectOperation rd; prepare_assert_ops(&rd); bufferlist inbl; rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); // Issue RADOS op C_SaferCond onack; version_t objver; objecter->linger_notify(linger_op, rd, snap_seq, inbl, NULL, &onack, &objver); ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl; int r = onack.wait(); ldout(client->cct, 10) << __func__ << " linger op " << linger_op << " acked (" << r << ")" << dendl; if (r == 0) { ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish " << linger_op << dendl; r = notify_finish_cond.wait(); } else { ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = " << r << dendl; notify_finish->complete(r); } objecter->linger_cancel(linger_op); set_sync_op_version(objver); return r; } int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c, bufferlist& bl, uint64_t timeout_ms, bufferlist *preply_bl, char **preply_buf, size_t *preply_buf_len) { Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); c->io = this; C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op); C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete, objecter, linger_op, preply_bl, preply_buf, preply_buf_len); Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete); uint32_t timeout = notify_timeout; if (timeout_ms) timeout = timeout_ms / 1000; // Construct RADOS op ::ObjectOperation rd; prepare_assert_ops(&rd); bufferlist inbl; rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); // Issue RADOS op objecter->linger_notify(linger_op, rd, snap_seq, inbl, NULL, onack, &c->objver); return 0; } int librados::IoCtxImpl::set_alloc_hint(const object_t& oid, uint64_t expected_object_size, uint64_t expected_write_size, uint32_t flags) { ::ObjectOperation wr; prepare_assert_ops(&wr); wr.set_alloc_hint(expected_object_size, expected_write_size, flags); return operate(oid, &wr, NULL); } version_t librados::IoCtxImpl::last_version() { return last_objver; } void librados::IoCtxImpl::set_assert_version(uint64_t ver) { assert_ver = ver; } void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout) { notify_timeout = timeout; } int librados::IoCtxImpl::cache_pin(const object_t& oid) { ::ObjectOperation wr; prepare_assert_ops(&wr); wr.cache_pin(); return operate(oid, &wr, NULL); } int librados::IoCtxImpl::cache_unpin(const object_t& oid) { ::ObjectOperation wr; prepare_assert_ops(&wr); wr.cache_unpin(); return operate(oid, &wr, NULL); } ///////////////////////////// C_aio_stat_Ack //////////////////////////// librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c, time_t *pm) : c(_c), pmtime(pm) { assert(!c->io); c->get(); } void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r) { c->lock.Lock(); c->rval = r; c->complete = true; c->cond.Signal(); if (r >= 0 && pmtime) { *pmtime = real_clock::to_time_t(mtime); } if (c->callback_complete) { c->io->client->finisher.queue(new C_AioComplete(c)); } c->put_unlock(); } ///////////////////////////// C_aio_stat2_Ack //////////////////////////// librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c, struct timespec *pt) : c(_c), pts(pt) { assert(!c->io); c->get(); } void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r) { c->lock.Lock(); c->rval = r; c->complete = true; c->cond.Signal(); if (r >= 0 && pts) { *pts = real_clock::to_timespec(mtime); } if (c->callback_complete) { c->io->client->finisher.queue(new C_AioComplete(c)); } c->put_unlock(); } //////////////////////////// C_aio_Complete //////////////////////////////// librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c) : c(_c) { c->get(); } void librados::IoCtxImpl::C_aio_Complete::finish(int r) { c->lock.Lock(); c->rval = r; c->complete = true; c->cond.Signal(); if (r == 0 && c->blp && c->blp->length() > 0) { if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf)) c->blp->copy(0, c->blp->length(), c->out_buf); c->rval = c->blp->length(); } if (c->callback_complete || c->callback_safe) { c->io->client->finisher.queue(new C_AioComplete(c)); } if (c->aio_write_seq) { c->io->complete_aio_write(c); } #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE"); #endif c->put_unlock(); } void librados::IoCtxImpl::object_list_slice( const hobject_t start, const hobject_t finish, const size_t n, const size_t m, hobject_t *split_start, hobject_t *split_finish) { if (start.is_max()) { *split_start = hobject_t::get_max(); *split_finish = hobject_t::get_max(); return; } uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash()); uint64_t finish_hash = finish.is_max() ? 0x100000000 : hobject_t::_reverse_bits(finish.get_hash()); uint64_t diff = finish_hash - start_hash; uint64_t rev_start = start_hash + (diff * n / m); uint64_t rev_finish = start_hash + (diff * (n + 1) / m); if (n == 0) { *split_start = start; } else { *split_start = hobject_t( object_t(), string(), CEPH_NOSNAP, hobject_t::_reverse_bits(rev_start), poolid, string()); } if (n == m - 1) *split_finish = finish; else if (rev_finish >= 0x100000000) *split_finish = hobject_t::get_max(); else *split_finish = hobject_t( object_t(), string(), CEPH_NOSNAP, hobject_t::_reverse_bits(rev_finish), poolid, string()); } int librados::IoCtxImpl::application_enable(const std::string& app_name, bool force) { auto c = new PoolAsyncCompletionImpl(); application_enable_async(app_name, force, c); int r = c->wait(); assert(r == 0); r = c->get_return_value(); c->release(); if (r < 0) { return r; } return client->wait_for_latest_osdmap(); } void librados::IoCtxImpl::application_enable_async(const std::string& app_name, bool force, PoolAsyncCompletionImpl *c) { // pre-Luminous clusters will return -EINVAL and application won't be // preserved until Luminous is configured as minimim version. if (!client->get_required_monitor_features().contains_all( ceph::features::mon::FEATURE_LUMINOUS)) { client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP); return; } std::stringstream cmd; cmd << "{" << "\"prefix\": \"osd pool application enable\"," << "\"pool\": \"" << get_cached_pool_name() << "\"," << "\"app\": \"" << app_name << "\""; if (force) { cmd << ",\"force\":\"--yes-i-really-mean-it\""; } cmd << "}"; std::vector cmds; cmds.push_back(cmd.str()); bufferlist inbl; client->mon_command_async(cmds, inbl, nullptr, nullptr, new C_PoolAsync_Safe(c)); } int librados::IoCtxImpl::application_list(std::set *app_names) { int r = 0; app_names->clear(); objecter->with_osdmap([&](const OSDMap& o) { auto pg_pool = o.get_pg_pool(poolid); if (pg_pool == nullptr) { r = -ENOENT; return; } for (auto &pair : pg_pool->application_metadata) { app_names->insert(pair.first); } }); return r; } int librados::IoCtxImpl::application_metadata_get(const std::string& app_name, const std::string &key, std::string* value) { int r = 0; objecter->with_osdmap([&](const OSDMap& o) { auto pg_pool = o.get_pg_pool(poolid); if (pg_pool == nullptr) { r = -ENOENT; return; } auto app_it = pg_pool->application_metadata.find(app_name); if (app_it == pg_pool->application_metadata.end()) { r = -ENOENT; return; } auto it = app_it->second.find(key); if (it == app_it->second.end()) { r = -ENOENT; return; } *value = it->second; }); return r; } int librados::IoCtxImpl::application_metadata_set(const std::string& app_name, const std::string &key, const std::string& value) { std::stringstream cmd; cmd << "{" << "\"prefix\":\"osd pool application set\"," << "\"pool\":\"" << get_cached_pool_name() << "\"," << "\"app\":\"" << app_name << "\"," << "\"key\":\"" << key << "\"," << "\"value\":\"" << value << "\"" << "}"; std::vector cmds; cmds.push_back(cmd.str()); bufferlist inbl; int r = client->mon_command(cmds, inbl, nullptr, nullptr); if (r < 0) { return r; } // ensure we have the latest osd map epoch before proceeding return client->wait_for_latest_osdmap(); } int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name, const std::string &key) { std::stringstream cmd; cmd << "{" << "\"prefix\":\"osd pool application rm\"," << "\"pool\":\"" << get_cached_pool_name() << "\"," << "\"app\":\"" << app_name << "\"," << "\"key\":\"" << key << "\"" << "}"; std::vector cmds; cmds.push_back(cmd.str()); bufferlist inbl; int r = client->mon_command(cmds, inbl, nullptr, nullptr); if (r < 0) { return r; } // ensure we have the latest osd map epoch before proceeding return client->wait_for_latest_osdmap(); } int librados::IoCtxImpl::application_metadata_list(const std::string& app_name, std::map *values) { int r = 0; values->clear(); objecter->with_osdmap([&](const OSDMap& o) { auto pg_pool = o.get_pg_pool(poolid); if (pg_pool == nullptr) { r = -ENOENT; return; } auto it = pg_pool->application_metadata.find(app_name); if (it == pg_pool->application_metadata.end()) { r = -ENOENT; return; } *values = it->second; }); return r; }