X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Flibrados%2FIoCtxImpl.cc;fp=src%2Fceph%2Fsrc%2Flibrados%2FIoCtxImpl.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=2805ed8a03f24ab5c6ddfe5c8f0767f76f0cd770;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/librados/IoCtxImpl.cc b/src/ceph/src/librados/IoCtxImpl.cc deleted file mode 100644 index 2805ed8..0000000 --- a/src/ceph/src/librados/IoCtxImpl.cc +++ /dev/null @@ -1,2247 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2012 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include - -#include "IoCtxImpl.h" - -#include "librados/AioCompletionImpl.h" -#include "librados/PoolAsyncCompletionImpl.h" -#include "librados/RadosClient.h" -#include "include/assert.h" -#include "common/valgrind.h" -#include "common/EventTrace.h" - -#define dout_subsys ceph_subsys_rados -#undef dout_prefix -#define dout_prefix *_dout << "librados: " - -namespace librados { -namespace { - -struct C_notify_Finish : public Context { - CephContext *cct; - Context *ctx; - Objecter *objecter; - Objecter::LingerOp *linger_op; - bufferlist reply_bl; - bufferlist *preply_bl; - char **preply_buf; - size_t *preply_buf_len; - - C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter, - Objecter::LingerOp *_linger_op, bufferlist *_preply_bl, - char **_preply_buf, size_t *_preply_buf_len) - : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op), - preply_bl(_preply_bl), preply_buf(_preply_buf), - preply_buf_len(_preply_buf_len) - { - linger_op->on_notify_finish = this; - linger_op->notify_result_bl = &reply_bl; - } - - void finish(int r) override - { - ldout(cct, 10) << __func__ << " completed notify (linger op " - << linger_op << "), r = " << r << dendl; - - // pass result back to user - // NOTE: we do this regardless of what error code we return - if (preply_buf) { - if (reply_bl.length()) { - *preply_buf = (char*)malloc(reply_bl.length()); - memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length()); - } else { - *preply_buf = NULL; - } - } - if (preply_buf_len) - *preply_buf_len = reply_bl.length(); - if (preply_bl) - preply_bl->claim(reply_bl); - - ctx->complete(r); - } -}; - -struct C_aio_linger_cancel : public Context { - Objecter *objecter; - Objecter::LingerOp *linger_op; - - C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op) - : objecter(_objecter), linger_op(_linger_op) - { - } - - void finish(int r) override - { - objecter->linger_cancel(linger_op); - } -}; - -struct C_aio_linger_Complete : public Context { - AioCompletionImpl *c; - Objecter::LingerOp *linger_op; - bool cancel; - - C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel) - : c(_c), linger_op(_linger_op), cancel(_cancel) - { - c->get(); - } - - void finish(int r) override { - if (cancel || r < 0) - c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter, - linger_op)); - - c->lock.Lock(); - c->rval = r; - c->complete = true; - c->cond.Signal(); - - if (c->callback_complete || - c->callback_safe) { - c->io->client->finisher.queue(new C_AioComplete(c)); - } - c->put_unlock(); - } -}; - -struct C_aio_notify_Complete : public C_aio_linger_Complete { - Mutex lock; - bool acked = false; - bool finished = false; - int ret_val = 0; - - C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op) - : C_aio_linger_Complete(_c, _linger_op, false), - lock("C_aio_notify_Complete::lock") { - } - - void handle_ack(int r) { - // invoked by C_aio_notify_Ack - lock.Lock(); - acked = true; - complete_unlock(r); - } - - void complete(int r) override { - // invoked by C_notify_Finish (or C_aio_notify_Ack on failure) - lock.Lock(); - finished = true; - complete_unlock(r); - } - - void complete_unlock(int r) { - if (ret_val == 0 && r < 0) { - ret_val = r; - } - - if (acked && finished) { - lock.Unlock(); - cancel = true; - C_aio_linger_Complete::complete(ret_val); - } else { - lock.Unlock(); - } - } -}; - -struct C_aio_notify_Ack : public Context { - CephContext *cct; - C_notify_Finish *onfinish; - C_aio_notify_Complete *oncomplete; - - C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish, - C_aio_notify_Complete *_oncomplete) - : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete) - { - } - - void finish(int r) override - { - ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " " - << "acked (" << r << ")" << dendl; - oncomplete->handle_ack(r); - if (r < 0) { - // on failure, we won't expect to see a notify_finish callback - onfinish->complete(r); - } - } -}; - -struct C_aio_selfmanaged_snap_op_Complete : public Context { - librados::RadosClient *client; - librados::AioCompletionImpl *c; - - C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client, - librados::AioCompletionImpl *c) - : client(client), c(c) { - c->get(); - } - - void finish(int r) override { - c->lock.Lock(); - c->rval = r; - c->complete = true; - c->cond.Signal(); - - if (c->callback_complete || c->callback_safe) { - client->finisher.queue(new librados::C_AioComplete(c)); - } - c->put_unlock(); - } -}; - -struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete { - snapid_t snapid; - uint64_t *dest_snapid; - - C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client, - librados::AioCompletionImpl *c, - uint64_t *dest_snapid) - : C_aio_selfmanaged_snap_op_Complete(client, c), - dest_snapid(dest_snapid) { - } - - void finish(int r) override { - if (r >= 0) { - *dest_snapid = snapid; - } - C_aio_selfmanaged_snap_op_Complete::finish(r); - } -}; - -} // anonymous namespace -} // namespace librados - -librados::IoCtxImpl::IoCtxImpl() : - ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0), - notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), - aio_write_seq(0), objecter(NULL) -{ -} - -librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter, - int64_t poolid, snapid_t s) - : ref_cnt(0), client(c), poolid(poolid), snap_seq(s), - assert_ver(0), last_objver(0), - notify_timeout(c->cct->_conf->client_notify_timeout), - oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), - aio_write_seq(0), objecter(objecter) -{ -} - -void librados::IoCtxImpl::set_snap_read(snapid_t s) -{ - if (!s) - s = CEPH_NOSNAP; - ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl; - snap_seq = s; -} - -int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector& snaps) -{ - ::SnapContext n; - ldout(client->cct, 10) << "set snap write context: seq = " << seq - << " and snaps = " << snaps << dendl; - n.seq = seq; - n.snaps = snaps; - if (!n.is_valid()) - return -EINVAL; - snapc = n; - return 0; -} - -int librados::IoCtxImpl::get_object_hash_position( - const std::string& oid, uint32_t *hash_position) -{ - int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace); - if (r < 0) - return r; - *hash_position = (uint32_t)r; - return 0; -} - -int librados::IoCtxImpl::get_object_pg_hash_position( - const std::string& oid, uint32_t *pg_hash_position) -{ - int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace); - if (r < 0) - return r; - *pg_hash_position = (uint32_t)r; - return 0; -} - -void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) -{ - get(); - aio_write_list_lock.Lock(); - assert(c->io == this); - c->aio_write_seq = ++aio_write_seq; - ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c - << " write_seq " << aio_write_seq << dendl; - aio_write_list.push_back(&c->aio_write_list_item); - aio_write_list_lock.Unlock(); -} - -void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) -{ - ldout(client->cct, 20) << "complete_aio_write " << c << dendl; - aio_write_list_lock.Lock(); - assert(c->io == this); - c->aio_write_list_item.remove_myself(); - - map >::iterator waiters = aio_write_waiters.begin(); - while (waiters != aio_write_waiters.end()) { - if (!aio_write_list.empty() && - aio_write_list.front()->aio_write_seq <= waiters->first) { - ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq - << " <= waiter " << waiters->first - << ", stopping" << dendl; - break; - } - ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl; - for (std::list::iterator it = waiters->second.begin(); - it != waiters->second.end(); ++it) { - client->finisher.queue(new C_AioCompleteAndSafe(*it)); - (*it)->put(); - } - aio_write_waiters.erase(waiters++); - } - - aio_write_cond.Signal(); - aio_write_list_lock.Unlock(); - put(); -} - -void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c) -{ - ldout(client->cct, 20) << "flush_aio_writes_async " << this - << " completion " << c << dendl; - Mutex::Locker l(aio_write_list_lock); - ceph_tid_t seq = aio_write_seq; - if (aio_write_list.empty()) { - ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid " - << seq << ")" << dendl; - client->finisher.queue(new C_AioCompleteAndSafe(c)); - } else { - ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size() - << " writes in flight; waiting on tid " << seq << dendl; - c->get(); - aio_write_waiters[seq].push_back(c); - } -} - -void librados::IoCtxImpl::flush_aio_writes() -{ - ldout(client->cct, 20) << "flush_aio_writes" << dendl; - aio_write_list_lock.Lock(); - ceph_tid_t seq = aio_write_seq; - while (!aio_write_list.empty() && - aio_write_list.front()->aio_write_seq <= seq) - aio_write_cond.Wait(aio_write_list_lock); - aio_write_list_lock.Unlock(); -} - -string librados::IoCtxImpl::get_cached_pool_name() -{ - std::string pn; - client->pool_get_name(get_id(), &pn); - return pn; -} - -// SNAPS - -int librados::IoCtxImpl::snap_create(const char *snapName) -{ - int reply; - string sName(snapName); - - Mutex mylock ("IoCtxImpl::snap_create::mylock"); - Cond cond; - bool done; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply); - reply = objecter->create_pool_snap(poolid, sName, onfinish); - - if (reply < 0) { - delete onfinish; - } else { - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - } - return reply; -} - -int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid) -{ - int reply; - - Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock"); - Cond cond; - bool done; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply); - snapid_t snapid; - reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish); - - if (reply < 0) { - delete onfinish; - } else { - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - if (reply == 0) - *psnapid = snapid; - } - return reply; -} - -void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid, - AioCompletionImpl *c) -{ - C_aio_selfmanaged_snap_create_Complete *onfinish = - new C_aio_selfmanaged_snap_create_Complete(client, c, snapid); - int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid, - onfinish); - if (r < 0) { - onfinish->complete(r); - } -} - -int librados::IoCtxImpl::snap_remove(const char *snapName) -{ - int reply; - string sName(snapName); - - Mutex mylock ("IoCtxImpl::snap_remove::mylock"); - Cond cond; - bool done; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply); - reply = objecter->delete_pool_snap(poolid, sName, onfinish); - - if (reply < 0) { - delete onfinish; - } else { - mylock.Lock(); - while(!done) - cond.Wait(mylock); - mylock.Unlock(); - } - return reply; -} - -int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid, - ::SnapContext& snapc, - uint64_t snapid) -{ - int reply; - - Mutex mylock("IoCtxImpl::snap_rollback::mylock"); - Cond cond; - bool done; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); - - ::ObjectOperation op; - prepare_assert_ops(&op); - op.rollback(snapid); - objecter->mutate(oid, oloc, - op, snapc, ceph::real_clock::now(), 0, - onack, NULL); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName) -{ - snapid_t snap; - - int r = objecter->pool_snap_by_name(poolid, snapName, &snap); - if (r < 0) { - return r; - } - - return selfmanaged_snap_rollback_object(oid, snapc, snap); -} - -int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid) -{ - int reply; - - Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock"); - Cond cond; - bool done; - objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid), - new C_SafeCond(&mylock, &cond, &done, &reply)); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return (int)reply; -} - -void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid, - AioCompletionImpl *c) -{ - Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c); - objecter->delete_selfmanaged_snap(poolid, snapid, onfinish); -} - -int librados::IoCtxImpl::pool_change_auid(unsigned long long auid) -{ - int reply; - - Mutex mylock("IoCtxImpl::pool_change_auid::mylock"); - Cond cond; - bool done; - objecter->change_pool_auid(poolid, - new C_SafeCond(&mylock, &cond, &done, &reply), - auid); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid, - PoolAsyncCompletionImpl *c) -{ - objecter->change_pool_auid(poolid, - new C_PoolAsync_Safe(c), - auid); - return 0; -} - -int librados::IoCtxImpl::snap_list(vector *snaps) -{ - return objecter->pool_snap_list(poolid, snaps); -} - -int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid) -{ - return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid); -} - -int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s) -{ - pool_snap_info_t info; - int ret = objecter->pool_snap_get_info(poolid, snapid, &info); - if (ret < 0) { - return ret; - } - *s = info.name.c_str(); - return 0; -} - -int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t) -{ - pool_snap_info_t info; - int ret = objecter->pool_snap_get_info(poolid, snapid, &info); - if (ret < 0) { - return ret; - } - *t = info.stamp.sec(); - return 0; -} - - -// IO - -int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries) -{ - Cond cond; - bool done; - int r = 0; - Mutex mylock("IoCtxImpl::nlist::mylock"); - - if (context->at_end()) - return 0; - - context->max_entries = max_entries; - context->nspace = oloc.nspace; - - objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r)); - - mylock.Lock(); - while(!done) - cond.Wait(mylock); - mylock.Unlock(); - - return r; -} - -uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context, - uint32_t pos) -{ - context->list.clear(); - return objecter->list_nobjects_seek(context, pos); -} - -uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context, - const rados_object_list_cursor& cursor) -{ - context->list.clear(); - return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor); -} - -rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context) -{ - hobject_t *c = new hobject_t; - - objecter->list_nobjects_get_cursor(context, c); - return (rados_object_list_cursor)c; -} - -int librados::IoCtxImpl::create(const object_t& oid, bool exclusive) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.create(exclusive); - return operate(oid, &op, NULL); -} - -/* - * add any version assert operations that are appropriate given the - * stat in the IoCtx, either the target version assert or any src - * object asserts. these affect a single ioctx operation, so clear - * the ioctx state when we're doing. - * - * return a pointer to the ObjectOperation if we added any events; - * this is convenient for passing the extra_ops argument into Objecter - * methods. - */ -::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op) -{ - ::ObjectOperation *pop = NULL; - if (assert_ver) { - op->assert_version(assert_ver); - assert_ver = 0; - pop = op; - } - return pop; -} - -int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl, - size_t len, uint64_t off) -{ - if (len > UINT_MAX/2) - return -E2BIG; - ::ObjectOperation op; - prepare_assert_ops(&op); - bufferlist mybl; - mybl.substr_of(bl, 0, len); - op.write(off, mybl); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len) -{ - if (len > UINT_MAX/2) - return -E2BIG; - ::ObjectOperation op; - prepare_assert_ops(&op); - bufferlist mybl; - mybl.substr_of(bl, 0, len); - op.append(mybl); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl) -{ - if (bl.length() > UINT_MAX/2) - return -E2BIG; - ::ObjectOperation op; - prepare_assert_ops(&op); - op.write_full(bl); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl, - size_t write_len, uint64_t off) -{ - if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2)) - return -E2BIG; - if ((bl.length() == 0) || (write_len % bl.length())) - return -EINVAL; - ::ObjectOperation op; - prepare_assert_ops(&op); - bufferlist mybl; - mybl.substr_of(bl, 0, bl.length()); - op.writesame(off, write_len, mybl); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o, - ceph::real_time *pmtime, int flags) -{ - ceph::real_time ut = (pmtime ? *pmtime : - ceph::real_clock::now()); - - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - if (!o->size()) - return 0; - - Mutex mylock("IoCtxImpl::operate::mylock"); - Cond cond; - bool done; - int r; - version_t ver; - - Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r); - - int op = o->ops[0].op.op; - ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid - << " nspace=" << oloc.nspace << dendl; - Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc, - *o, snapc, ut, flags, - oncommit, &ver); - objecter->op_submit(objecter_op); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(client->cct, 10) << "Objecter returned from " - << ceph_osd_op_name(op) << " r=" << r << dendl; - - set_sync_op_version(ver); - - return r; -} - -int librados::IoCtxImpl::operate_read(const object_t& oid, - ::ObjectOperation *o, - bufferlist *pbl, - int flags) -{ - if (!o->size()) - return 0; - - Mutex mylock("IoCtxImpl::operate_read::mylock"); - Cond cond; - bool done; - int r; - version_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - int op = o->ops[0].op.op; - ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl; - Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc, - *o, snap_seq, pbl, flags, - onack, &ver); - objecter->op_submit(objecter_op); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(client->cct, 10) << "Objecter returned from " - << ceph_osd_op_name(op) << " r=" << r << dendl; - - set_sync_op_version(ver); - - return r; -} - -int librados::IoCtxImpl::aio_operate_read(const object_t &oid, - ::ObjectOperation *o, - AioCompletionImpl *c, - int flags, - bufferlist *pbl, - const blkin_trace_info *trace_info) -{ - FUNCTRACE(); - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->is_read = true; - c->io = this; - - ZTracer::Trace trace; - if (trace_info) { - ZTracer::Trace parent_trace("", nullptr, trace_info); - trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace); - } - - trace.event("init root span"); - Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc, - *o, snap_seq, pbl, flags, - oncomplete, &c->objver, nullptr, 0, &trace); - objecter->op_submit(objecter_op, &c->tid); - trace.event("rados operate read submitted"); - - return 0; -} - -int librados::IoCtxImpl::aio_operate(const object_t& oid, - ::ObjectOperation *o, AioCompletionImpl *c, - const SnapContext& snap_context, int flags, - const blkin_trace_info *trace_info) -{ - FUNCTRACE(); - OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN"); - auto ut = ceph::real_clock::now(); - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *oncomplete = new C_aio_Complete(c); -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - - c->io = this; - queue_aio_write(c); - - ZTracer::Trace trace; - if (trace_info) { - ZTracer::Trace parent_trace("", nullptr, trace_info); - trace.init("rados operate", &objecter->trace_endpoint, &parent_trace); - } - - trace.event("init root span"); - Objecter::Op *op = objecter->prepare_mutate_op( - oid, oloc, *o, snap_context, ut, flags, - oncomplete, &c->objver, osd_reqid_t(), &trace); - objecter->op_submit(op, &c->tid); - trace.event("rados operate op submitted"); - - return 0; -} - -int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c, - bufferlist *pbl, size_t len, uint64_t off, - uint64_t snapid, const blkin_trace_info *info) -{ - FUNCTRACE(); - if (len > (size_t) INT_MAX) - return -EDOM; - - OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN"); - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->is_read = true; - c->io = this; - c->blp = pbl; - - ZTracer::Trace trace; - if (info) - trace.init("rados read", &objecter->trace_endpoint, info); - - Objecter::Op *o = objecter->prepare_read_op( - oid, oloc, - off, len, snapid, pbl, 0, - oncomplete, &c->objver, nullptr, 0, &trace); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c, - char *buf, size_t len, uint64_t off, - uint64_t snapid, const blkin_trace_info *info) -{ - FUNCTRACE(); - if (len > (size_t) INT_MAX) - return -EDOM; - - OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN"); - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->is_read = true; - c->io = this; - c->bl.clear(); - c->bl.push_back(buffer::create_static(len, buf)); - c->blp = &c->bl; - c->out_buf = buf; - - ZTracer::Trace trace; - if (info) - trace.init("rados read", &objecter->trace_endpoint, info); - - Objecter::Op *o = objecter->prepare_read_op( - oid, oloc, - off, len, snapid, &c->bl, 0, - oncomplete, &c->objver, nullptr, 0, &trace); - objecter->op_submit(o, &c->tid); - return 0; -} - -class C_ObjectOperation : public Context { -public: - ::ObjectOperation m_ops; - explicit C_ObjectOperation(Context *c) : m_ctx(c) {} - void finish(int r) override { - m_ctx->complete(r); - } -private: - Context *m_ctx; -}; - -int librados::IoCtxImpl::aio_sparse_read(const object_t oid, - AioCompletionImpl *c, - std::map *m, - bufferlist *data_bl, size_t len, - uint64_t off, uint64_t snapid) -{ - FUNCTRACE(); - if (len > (size_t) INT_MAX) - return -EDOM; - - Context *nested = new C_aio_Complete(c); - C_ObjectOperation *onack = new C_ObjectOperation(nested); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) nested)->oid = oid; -#endif - c->is_read = true; - c->io = this; - - onack->m_ops.sparse_read(off, len, m, data_bl, NULL); - - Objecter::Op *o = objecter->prepare_read_op( - oid, oloc, - onack->m_ops, snapid, NULL, 0, - onack, &c->objver); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::aio_cmpext(const object_t& oid, - AioCompletionImpl *c, - uint64_t off, - bufferlist& cmp_bl) -{ - if (cmp_bl.length() > UINT_MAX/2) - return -E2BIG; - - Context *onack = new C_aio_Complete(c); - - c->is_read = true; - c->io = this; - - Objecter::Op *o = objecter->prepare_cmpext_op( - oid, oloc, off, cmp_bl, snap_seq, 0, - onack, &c->objver); - objecter->op_submit(o, &c->tid); - - return 0; -} - -/* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */ -int librados::IoCtxImpl::aio_cmpext(const object_t& oid, - AioCompletionImpl *c, - const char *cmp_buf, - size_t cmp_len, - uint64_t off) -{ - if (cmp_len > UINT_MAX/2) - return -E2BIG; - - bufferlist cmp_bl; - cmp_bl.append(cmp_buf, cmp_len); - - Context *nested = new C_aio_Complete(c); - C_ObjectOperation *onack = new C_ObjectOperation(nested); - - c->is_read = true; - c->io = this; - - onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL); - - Objecter::Op *o = objecter->prepare_read_op( - oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len, - uint64_t off, const blkin_trace_info *info) -{ - FUNCTRACE(); - auto ut = ceph::real_clock::now(); - ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl; - OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN"); - - if (len > UINT_MAX/2) - return -E2BIG; - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - ZTracer::Trace trace; - if (info) - trace.init("rados write", &objecter->trace_endpoint, info); - - c->io = this; - queue_aio_write(c); - - Objecter::Op *o = objecter->prepare_write_op( - oid, oloc, - off, len, snapc, bl, ut, 0, - oncomplete, &c->objver, nullptr, 0, &trace); - objecter->op_submit(o, &c->tid); - - return 0; -} - -int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len) -{ - FUNCTRACE(); - auto ut = ceph::real_clock::now(); - - if (len > UINT_MAX/2) - return -E2BIG; - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *oncomplete = new C_aio_Complete(c); -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - - c->io = this; - queue_aio_write(c); - - Objecter::Op *o = objecter->prepare_append_op( - oid, oloc, - len, snapc, bl, ut, 0, - oncomplete, &c->objver); - objecter->op_submit(o, &c->tid); - - return 0; -} - -int librados::IoCtxImpl::aio_write_full(const object_t &oid, - AioCompletionImpl *c, - const bufferlist& bl) -{ - FUNCTRACE(); - auto ut = ceph::real_clock::now(); - - if (bl.length() > UINT_MAX/2) - return -E2BIG; - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *oncomplete = new C_aio_Complete(c); -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - - c->io = this; - queue_aio_write(c); - - Objecter::Op *o = objecter->prepare_write_full_op( - oid, oloc, - snapc, bl, ut, 0, - oncomplete, &c->objver); - objecter->op_submit(o, &c->tid); - - return 0; -} - -int librados::IoCtxImpl::aio_writesame(const object_t &oid, - AioCompletionImpl *c, - const bufferlist& bl, - size_t write_len, - uint64_t off) -{ - FUNCTRACE(); - auto ut = ceph::real_clock::now(); - - if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2)) - return -E2BIG; - if ((bl.length() == 0) || (write_len % bl.length())) - return -EINVAL; - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->io = this; - queue_aio_write(c); - - Objecter::Op *o = objecter->prepare_writesame_op( - oid, oloc, - write_len, off, - snapc, bl, ut, 0, - oncomplete, &c->objver); - objecter->op_submit(o, &c->tid); - - return 0; -} - -int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags) -{ - FUNCTRACE(); - auto ut = ceph::real_clock::now(); - - /* can't write to a snapshot */ - if (snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->io = this; - queue_aio_write(c); - - Objecter::Op *o = objecter->prepare_remove_op( - oid, oloc, - snapc, ut, flags, - oncomplete, &c->objver); - objecter->op_submit(o, &c->tid); - - return 0; -} - - -int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c, - uint64_t *psize, time_t *pmtime) -{ - C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime); - c->is_read = true; - c->io = this; - Objecter::Op *o = objecter->prepare_stat_op( - oid, oloc, - snap_seq, psize, &onack->mtime, 0, - onack, &c->objver); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c, - uint64_t *psize, struct timespec *pts) -{ - C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts); - c->is_read = true; - c->io = this; - Objecter::Op *o = objecter->prepare_stat_op( - oid, oloc, - snap_seq, psize, &onack->mtime, 0, - onack, &c->objver); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c, - const char *name, bufferlist& bl) -{ - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.getxattr(name, &bl, NULL); - int r = aio_operate_read(oid, &rd, c, 0, &bl); - return r; -} - -int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c, - const char *name) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.rmxattr(name); - return aio_operate(oid, &op, c, snapc, 0); -} - -int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c, - const char *name, bufferlist& bl) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.setxattr(name, bl); - return aio_operate(oid, &op, c, snapc, 0); -} - -namespace { -struct AioGetxattrsData { - AioGetxattrsData(librados::AioCompletionImpl *c, map* attrset, - librados::RadosClient *_client) : - user_completion(c), user_attrset(attrset), client(_client) {} - struct librados::C_AioCompleteAndSafe user_completion; - map result_attrset; - map* user_attrset; - librados::RadosClient *client; -}; -} - -static void aio_getxattrs_complete(rados_completion_t c, void *arg) { - AioGetxattrsData *cdata = reinterpret_cast(arg); - int rc = rados_aio_get_return_value(c); - cdata->user_attrset->clear(); - if (rc >= 0) { - for (map::iterator p = cdata->result_attrset.begin(); - p != cdata->result_attrset.end(); - ++p) { - ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl; - (*cdata->user_attrset)[p->first] = p->second; - } - } - cdata->user_completion.finish(rc); - ((librados::AioCompletionImpl*)c)->put(); - delete cdata; -} - -int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c, - map& attrset) -{ - AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client); - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.getxattrs(&cdata->result_attrset, NULL); - librados::AioCompletionImpl *comp = new librados::AioCompletionImpl; - comp->set_complete_callback(cdata, aio_getxattrs_complete); - return aio_operate_read(oid, &rd, comp, 0, NULL); -} - -int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c) -{ - return objecter->op_cancel(c->tid, -ECANCELED); -} - - -int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c, - std::list< std::pair > *pls) -{ - Context *oncomplete = new C_aio_Complete(c); - c->is_read = true; - c->io = this; - - ::ObjectOperation rd; - rd.hit_set_ls(pls, NULL); - object_locator_t oloc(poolid); - Objecter::Op *o = objecter->prepare_pg_read_op( - hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c, - time_t stamp, - bufferlist *pbl) -{ - Context *oncomplete = new C_aio_Complete(c); - c->is_read = true; - c->io = this; - - ::ObjectOperation rd; - rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0); - object_locator_t oloc(poolid); - Objecter::Op *o = objecter->prepare_pg_read_op( - hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::remove(const object_t& oid) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.remove(); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::remove(const object_t& oid, int flags) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.remove(); - return operate(oid, &op, NULL, flags); -} - -int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.truncate(size); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg, - const librados::object_id_t& start_after, - uint64_t max_to_get, - AioCompletionImpl *c, - std::vector* objects, - uint32_t* interval) -{ - Context *oncomplete = new C_aio_Complete(c); - c->is_read = true; - c->io = this; - - ::ObjectOperation op; - op.scrub_ls(start_after, max_to_get, objects, interval, nullptr); - object_locator_t oloc{poolid, pg.ps()}; - Objecter::Op *o = objecter->prepare_pg_read_op( - oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete, - nullptr, nullptr); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg, - const librados::object_id_t& start_after, - uint64_t max_to_get, - AioCompletionImpl *c, - std::vector* snapsets, - uint32_t* interval) -{ - Context *oncomplete = new C_aio_Complete(c); - c->is_read = true; - c->io = this; - - ::ObjectOperation op; - op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr); - object_locator_t oloc{poolid, pg.ps()}; - Objecter::Op *o = objecter->prepare_pg_read_op( - oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete, - nullptr, nullptr); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl) -{ - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.tmap_update(cmdbl); - return operate(oid, &wr, NULL); -} - -int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl) -{ - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.tmap_put(bl); - return operate(oid, &wr, NULL); -} - -int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl) -{ - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.tmap_get(&bl, NULL); - return operate_read(oid, &rd, NULL); -} - -int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok) -{ - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.tmap_to_omap(nullok); - return operate(oid, &wr, NULL); -} - -int librados::IoCtxImpl::exec(const object_t& oid, - const char *cls, const char *method, - bufferlist& inbl, bufferlist& outbl) -{ - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.call(cls, method, inbl); - return operate_read(oid, &rd, &outbl); -} - -int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c, - const char *cls, const char *method, - bufferlist& inbl, bufferlist *outbl) -{ - FUNCTRACE(); - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->is_read = true; - c->io = this; - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.call(cls, method, inbl); - Objecter::Op *o = objecter->prepare_read_op( - oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c, - const char *cls, const char *method, - bufferlist& inbl, char *buf, size_t out_len) -{ - FUNCTRACE(); - Context *oncomplete = new C_aio_Complete(c); - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - ((C_aio_Complete *) oncomplete)->oid = oid; -#endif - c->is_read = true; - c->io = this; - c->bl.clear(); - c->bl.push_back(buffer::create_static(out_len, buf)); - c->blp = &c->bl; - c->out_buf = buf; - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.call(cls, method, inbl); - Objecter::Op *o = objecter->prepare_read_op( - oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver); - objecter->op_submit(o, &c->tid); - return 0; -} - -int librados::IoCtxImpl::read(const object_t& oid, - bufferlist& bl, size_t len, uint64_t off) -{ - if (len > (size_t) INT_MAX) - return -EDOM; - OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN"); - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.read(off, len, &bl, NULL, NULL); - int r = operate_read(oid, &rd, &bl); - if (r < 0) - return r; - - if (bl.length() < len) { - ldout(client->cct, 10) << "Returned length " << bl.length() - << " less than original length "<< len << dendl; - } - - return bl.length(); -} - -int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off, - bufferlist& cmp_bl) -{ - if (cmp_bl.length() > UINT_MAX/2) - return -E2BIG; - - ::ObjectOperation op; - prepare_assert_ops(&op); - op.cmpext(off, cmp_bl, NULL); - return operate_read(oid, &op, NULL); -} - -int librados::IoCtxImpl::mapext(const object_t& oid, - uint64_t off, size_t len, - std::map& m) -{ - bufferlist bl; - - Mutex mylock("IoCtxImpl::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - objecter->mapext(oid, oloc, - off, len, snap_seq, &bl, 0, - onack); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl; - - if (r < 0) - return r; - - bufferlist::iterator iter = bl.begin(); - ::decode(m, iter); - - return m.size(); -} - -int librados::IoCtxImpl::sparse_read(const object_t& oid, - std::map& m, - bufferlist& data_bl, size_t len, - uint64_t off) -{ - if (len > (size_t) INT_MAX) - return -EDOM; - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.sparse_read(off, len, &m, &data_bl, NULL); - - int r = operate_read(oid, &rd, NULL); - if (r < 0) - return r; - - return m.size(); -} - -int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type, - const bufferlist &init_value, size_t len, - uint64_t off, size_t chunk_size, - bufferlist *pbl) -{ - if (len > (size_t) INT_MAX) { - return -EDOM; - } - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr); - - int r = operate_read(oid, &rd, nullptr); - if (r < 0) { - return r; - } - - return 0; -} - -int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime) -{ - uint64_t size; - real_time mtime; - - if (!psize) - psize = &size; - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.stat(psize, &mtime, NULL); - int r = operate_read(oid, &rd, NULL); - - if (r >= 0 && pmtime) { - *pmtime = real_clock::to_time_t(mtime); - } - - return r; -} - -int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts) -{ - uint64_t size; - ceph::real_time mtime; - - if (!psize) - psize = &size; - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.stat(psize, &mtime, NULL); - int r = operate_read(oid, &rd, NULL); - if (r < 0) { - return r; - } - - if (pts) { - *pts = ceph::real_clock::to_timespec(mtime); - } - - return 0; -} - -int librados::IoCtxImpl::getxattr(const object_t& oid, - const char *name, bufferlist& bl) -{ - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.getxattr(name, &bl, NULL); - int r = operate_read(oid, &rd, &bl); - if (r < 0) - return r; - - return bl.length(); -} - -int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.rmxattr(name); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::setxattr(const object_t& oid, - const char *name, bufferlist& bl) -{ - ::ObjectOperation op; - prepare_assert_ops(&op); - op.setxattr(name, bl); - return operate(oid, &op, NULL); -} - -int librados::IoCtxImpl::getxattrs(const object_t& oid, - map& attrset) -{ - map aset; - - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.getxattrs(&aset, NULL); - int r = operate_read(oid, &rd, NULL); - - attrset.clear(); - if (r >= 0) { - for (map::iterator p = aset.begin(); p != aset.end(); ++p) { - ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl; - attrset[p->first.c_str()] = p->second; - } - } - - return r; -} - -void librados::IoCtxImpl::set_sync_op_version(version_t ver) -{ - ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver), - "IoCtxImpl last_objver"); - last_objver = ver; -} - -struct WatchInfo : public Objecter::WatchContext { - librados::IoCtxImpl *ioctx; - object_t oid; - librados::WatchCtx *ctx; - librados::WatchCtx2 *ctx2; - bool internal = false; - - WatchInfo(librados::IoCtxImpl *io, object_t o, - librados::WatchCtx *c, librados::WatchCtx2 *c2, - bool inter) - : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) { - ioctx->get(); - } - ~WatchInfo() override { - ioctx->put(); - if (internal) { - delete ctx; - delete ctx2; - } - } - - void handle_notify(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) override { - ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id - << " cookie " << cookie - << " notifier_id " << notifier_id - << " len " << bl.length() - << dendl; - - if (ctx2) - ctx2->handle_notify(notify_id, cookie, notifier_id, bl); - if (ctx) { - ctx->notify(0, 0, bl); - - // send ACK back to OSD if using legacy protocol - bufferlist empty; - ioctx->notify_ack(oid, notify_id, cookie, empty); - } - } - void handle_error(uint64_t cookie, int err) override { - ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie - << " err " << err - << dendl; - if (ctx2) - ctx2->handle_error(cookie, err); - } -}; - -int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, - librados::WatchCtx *ctx, - librados::WatchCtx2 *ctx2, - bool internal) -{ - return watch(oid, handle, ctx, ctx2, 0, internal); -} - -int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle, - librados::WatchCtx *ctx, - librados::WatchCtx2 *ctx2, - uint32_t timeout, - bool internal) -{ - ::ObjectOperation wr; - version_t objver; - C_SaferCond onfinish; - - Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); - *handle = linger_op->get_cookie(); - linger_op->watch_context = new WatchInfo(this, - oid, ctx, ctx2, internal); - - prepare_assert_ops(&wr); - wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); - bufferlist bl; - objecter->linger_watch(linger_op, wr, - snapc, ceph::real_clock::now(), bl, - &onfinish, - &objver); - - int r = onfinish.wait(); - - set_sync_op_version(objver); - - if (r < 0) { - objecter->linger_cancel(linger_op); - *handle = 0; - } - - return r; -} - -int librados::IoCtxImpl::aio_watch(const object_t& oid, - AioCompletionImpl *c, - uint64_t *handle, - librados::WatchCtx *ctx, - librados::WatchCtx2 *ctx2, - bool internal) { - return aio_watch(oid, c, handle, ctx, ctx2, 0, internal); -} - -int librados::IoCtxImpl::aio_watch(const object_t& oid, - AioCompletionImpl *c, - uint64_t *handle, - librados::WatchCtx *ctx, - librados::WatchCtx2 *ctx2, - uint32_t timeout, - bool internal) -{ - Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); - c->io = this; - Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false); - - ::ObjectOperation wr; - *handle = linger_op->get_cookie(); - linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal); - - prepare_assert_ops(&wr); - wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout); - bufferlist bl; - objecter->linger_watch(linger_op, wr, - snapc, ceph::real_clock::now(), bl, - oncomplete, &c->objver); - - return 0; -} - - -int librados::IoCtxImpl::notify_ack( - const object_t& oid, - uint64_t notify_id, - uint64_t cookie, - bufferlist& bl) -{ - ::ObjectOperation rd; - prepare_assert_ops(&rd); - rd.notify_ack(notify_id, cookie, bl); - objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); - return 0; -} - -int librados::IoCtxImpl::watch_check(uint64_t cookie) -{ - Objecter::LingerOp *linger_op = reinterpret_cast(cookie); - return objecter->linger_check(linger_op); -} - -int librados::IoCtxImpl::unwatch(uint64_t cookie) -{ - Objecter::LingerOp *linger_op = reinterpret_cast(cookie); - C_SaferCond onfinish; - version_t ver = 0; - - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); - objecter->mutate(linger_op->target.base_oid, oloc, wr, - snapc, ceph::real_clock::now(), 0, - &onfinish, &ver); - objecter->linger_cancel(linger_op); - - int r = onfinish.wait(); - set_sync_op_version(ver); - return r; -} - -int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c) -{ - c->io = this; - Objecter::LingerOp *linger_op = reinterpret_cast(cookie); - Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true); - - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); - objecter->mutate(linger_op->target.base_oid, oloc, wr, - snapc, ceph::real_clock::now(), 0, - oncomplete, &c->objver); - return 0; -} - -int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, - uint64_t timeout_ms, - bufferlist *preply_bl, - char **preply_buf, size_t *preply_buf_len) -{ - Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); - - C_SaferCond notify_finish_cond; - Context *notify_finish = new C_notify_Finish(client->cct, ¬ify_finish_cond, - objecter, linger_op, preply_bl, - preply_buf, preply_buf_len); - - uint32_t timeout = notify_timeout; - if (timeout_ms) - timeout = timeout_ms / 1000; - - // Construct RADOS op - ::ObjectOperation rd; - prepare_assert_ops(&rd); - bufferlist inbl; - rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); - - // Issue RADOS op - C_SaferCond onack; - version_t objver; - objecter->linger_notify(linger_op, - rd, snap_seq, inbl, NULL, - &onack, &objver); - - ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl; - int r = onack.wait(); - ldout(client->cct, 10) << __func__ << " linger op " << linger_op - << " acked (" << r << ")" << dendl; - - if (r == 0) { - ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish " - << linger_op << dendl; - r = notify_finish_cond.wait(); - - } else { - ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = " - << r << dendl; - notify_finish->complete(r); - } - - objecter->linger_cancel(linger_op); - - set_sync_op_version(objver); - return r; -} - -int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c, - bufferlist& bl, uint64_t timeout_ms, - bufferlist *preply_bl, char **preply_buf, - size_t *preply_buf_len) -{ - Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); - - c->io = this; - - C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op); - C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete, - objecter, linger_op, - preply_bl, preply_buf, - preply_buf_len); - Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete); - - uint32_t timeout = notify_timeout; - if (timeout_ms) - timeout = timeout_ms / 1000; - - // Construct RADOS op - ::ObjectOperation rd; - prepare_assert_ops(&rd); - bufferlist inbl; - rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); - - // Issue RADOS op - objecter->linger_notify(linger_op, - rd, snap_seq, inbl, NULL, - onack, &c->objver); - return 0; -} - -int librados::IoCtxImpl::set_alloc_hint(const object_t& oid, - uint64_t expected_object_size, - uint64_t expected_write_size, - uint32_t flags) -{ - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.set_alloc_hint(expected_object_size, expected_write_size, flags); - return operate(oid, &wr, NULL); -} - -version_t librados::IoCtxImpl::last_version() -{ - return last_objver; -} - -void librados::IoCtxImpl::set_assert_version(uint64_t ver) -{ - assert_ver = ver; -} - -void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout) -{ - notify_timeout = timeout; -} - -int librados::IoCtxImpl::cache_pin(const object_t& oid) -{ - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.cache_pin(); - return operate(oid, &wr, NULL); -} - -int librados::IoCtxImpl::cache_unpin(const object_t& oid) -{ - ::ObjectOperation wr; - prepare_assert_ops(&wr); - wr.cache_unpin(); - return operate(oid, &wr, NULL); -} - - -///////////////////////////// C_aio_stat_Ack //////////////////////////// - -librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c, - time_t *pm) - : c(_c), pmtime(pm) -{ - assert(!c->io); - c->get(); -} - -void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r) -{ - c->lock.Lock(); - c->rval = r; - c->complete = true; - c->cond.Signal(); - - if (r >= 0 && pmtime) { - *pmtime = real_clock::to_time_t(mtime); - } - - if (c->callback_complete) { - c->io->client->finisher.queue(new C_AioComplete(c)); - } - - c->put_unlock(); -} - -///////////////////////////// C_aio_stat2_Ack //////////////////////////// - -librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c, - struct timespec *pt) - : c(_c), pts(pt) -{ - assert(!c->io); - c->get(); -} - -void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r) -{ - c->lock.Lock(); - c->rval = r; - c->complete = true; - c->cond.Signal(); - - if (r >= 0 && pts) { - *pts = real_clock::to_timespec(mtime); - } - - if (c->callback_complete) { - c->io->client->finisher.queue(new C_AioComplete(c)); - } - - c->put_unlock(); -} - -//////////////////////////// C_aio_Complete //////////////////////////////// - -librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c) - : c(_c) -{ - c->get(); -} - -void librados::IoCtxImpl::C_aio_Complete::finish(int r) -{ - c->lock.Lock(); - c->rval = r; - c->complete = true; - c->cond.Signal(); - - if (r == 0 && c->blp && c->blp->length() > 0) { - if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf)) - c->blp->copy(0, c->blp->length(), c->out_buf); - c->rval = c->blp->length(); - } - - if (c->callback_complete || - c->callback_safe) { - c->io->client->finisher.queue(new C_AioComplete(c)); - } - - if (c->aio_write_seq) { - c->io->complete_aio_write(c); - } - -#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) - OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE"); -#endif - c->put_unlock(); -} - -void librados::IoCtxImpl::object_list_slice( - const hobject_t start, - const hobject_t finish, - const size_t n, - const size_t m, - hobject_t *split_start, - hobject_t *split_finish) -{ - if (start.is_max()) { - *split_start = hobject_t::get_max(); - *split_finish = hobject_t::get_max(); - return; - } - - uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash()); - uint64_t finish_hash = - finish.is_max() ? 0x100000000 : - hobject_t::_reverse_bits(finish.get_hash()); - - uint64_t diff = finish_hash - start_hash; - uint64_t rev_start = start_hash + (diff * n / m); - uint64_t rev_finish = start_hash + (diff * (n + 1) / m); - if (n == 0) { - *split_start = start; - } else { - *split_start = hobject_t( - object_t(), string(), CEPH_NOSNAP, - hobject_t::_reverse_bits(rev_start), poolid, string()); - } - - if (n == m - 1) - *split_finish = finish; - else if (rev_finish >= 0x100000000) - *split_finish = hobject_t::get_max(); - else - *split_finish = hobject_t( - object_t(), string(), CEPH_NOSNAP, - hobject_t::_reverse_bits(rev_finish), poolid, string()); -} - -int librados::IoCtxImpl::application_enable(const std::string& app_name, - bool force) -{ - auto c = new PoolAsyncCompletionImpl(); - application_enable_async(app_name, force, c); - - int r = c->wait(); - assert(r == 0); - - r = c->get_return_value(); - c->release(); - if (r < 0) { - return r; - } - - return client->wait_for_latest_osdmap(); -} - -void librados::IoCtxImpl::application_enable_async(const std::string& app_name, - bool force, - PoolAsyncCompletionImpl *c) -{ - // pre-Luminous clusters will return -EINVAL and application won't be - // preserved until Luminous is configured as minimim version. - if (!client->get_required_monitor_features().contains_all( - ceph::features::mon::FEATURE_LUMINOUS)) { - client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP); - return; - } - - std::stringstream cmd; - cmd << "{" - << "\"prefix\": \"osd pool application enable\"," - << "\"pool\": \"" << get_cached_pool_name() << "\"," - << "\"app\": \"" << app_name << "\""; - if (force) { - cmd << ",\"force\":\"--yes-i-really-mean-it\""; - } - cmd << "}"; - - std::vector cmds; - cmds.push_back(cmd.str()); - bufferlist inbl; - client->mon_command_async(cmds, inbl, nullptr, nullptr, - new C_PoolAsync_Safe(c)); -} - -int librados::IoCtxImpl::application_list(std::set *app_names) -{ - int r = 0; - app_names->clear(); - objecter->with_osdmap([&](const OSDMap& o) { - auto pg_pool = o.get_pg_pool(poolid); - if (pg_pool == nullptr) { - r = -ENOENT; - return; - } - - for (auto &pair : pg_pool->application_metadata) { - app_names->insert(pair.first); - } - }); - return r; -} - -int librados::IoCtxImpl::application_metadata_get(const std::string& app_name, - const std::string &key, - std::string* value) -{ - int r = 0; - objecter->with_osdmap([&](const OSDMap& o) { - auto pg_pool = o.get_pg_pool(poolid); - if (pg_pool == nullptr) { - r = -ENOENT; - return; - } - - auto app_it = pg_pool->application_metadata.find(app_name); - if (app_it == pg_pool->application_metadata.end()) { - r = -ENOENT; - return; - } - - auto it = app_it->second.find(key); - if (it == app_it->second.end()) { - r = -ENOENT; - return; - } - - *value = it->second; - }); - return r; -} - -int librados::IoCtxImpl::application_metadata_set(const std::string& app_name, - const std::string &key, - const std::string& value) -{ - std::stringstream cmd; - cmd << "{" - << "\"prefix\":\"osd pool application set\"," - << "\"pool\":\"" << get_cached_pool_name() << "\"," - << "\"app\":\"" << app_name << "\"," - << "\"key\":\"" << key << "\"," - << "\"value\":\"" << value << "\"" - << "}"; - - std::vector cmds; - cmds.push_back(cmd.str()); - bufferlist inbl; - int r = client->mon_command(cmds, inbl, nullptr, nullptr); - if (r < 0) { - return r; - } - - // ensure we have the latest osd map epoch before proceeding - return client->wait_for_latest_osdmap(); -} - -int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name, - const std::string &key) -{ - std::stringstream cmd; - cmd << "{" - << "\"prefix\":\"osd pool application rm\"," - << "\"pool\":\"" << get_cached_pool_name() << "\"," - << "\"app\":\"" << app_name << "\"," - << "\"key\":\"" << key << "\"" - << "}"; - - std::vector cmds; - cmds.push_back(cmd.str()); - bufferlist inbl; - int r = client->mon_command(cmds, inbl, nullptr, nullptr); - if (r < 0) { - return r; - } - - // ensure we have the latest osd map epoch before proceeding - return client->wait_for_latest_osdmap(); -} - -int librados::IoCtxImpl::application_metadata_list(const std::string& app_name, - std::map *values) -{ - int r = 0; - values->clear(); - objecter->with_osdmap([&](const OSDMap& o) { - auto pg_pool = o.get_pg_pool(poolid); - if (pg_pool == nullptr) { - r = -ENOENT; - return; - } - - auto it = pg_pool->application_metadata.find(app_name); - if (it == pg_pool->application_metadata.end()) { - r = -ENOENT; - return; - } - - *values = it->second; - }); - return r; -} -