X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fosdc%2FObjecter.h;fp=src%2Fceph%2Fsrc%2Fosdc%2FObjecter.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=527022b5d2ac830318965a8723bef33111e4cc45;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/osdc/Objecter.h b/src/ceph/src/osdc/Objecter.h deleted file mode 100644 index 527022b..0000000 --- a/src/ceph/src/osdc/Objecter.h +++ /dev/null @@ -1,3049 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_OBJECTER_H -#define CEPH_OBJECTER_H - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "include/assert.h" -#include "include/buffer.h" -#include "include/types.h" -#include "include/rados/rados_types.hpp" - -#include "common/admin_socket.h" -#include "common/ceph_time.h" -#include "common/ceph_timer.h" -#include "common/Finisher.h" -#include "common/shunique_lock.h" -#include "common/zipkin_trace.h" - -#include "messages/MOSDOp.h" -#include "osd/OSDMap.h" - -using namespace std; - -class Context; -class Messenger; -class OSDMap; -class MonClient; -class Message; -class Finisher; - -class MPoolOpReply; - -class MGetPoolStatsReply; -class MStatfsReply; -class MCommandReply; -class MWatchNotify; - -class PerfCounters; - -// ----------------------------------------- - -struct ObjectOperation { - vector ops; - int flags; - int priority; - - vector out_bl; - vector out_handler; - vector out_rval; - - ObjectOperation() : flags(0), priority(0) {} - ~ObjectOperation() { - while (!out_handler.empty()) { - delete out_handler.back(); - out_handler.pop_back(); - } - } - - size_t size() { - return ops.size(); - } - - void set_last_op_flags(int flags) { - assert(!ops.empty()); - ops.rbegin()->op.flags = flags; - } - - class C_TwoContexts; - /** - * Add a callback to run when this operation completes, - * after any other callbacks for it. - */ - void add_handler(Context *extra); - - OSDOp& add_op(int op) { - int s = ops.size(); - ops.resize(s+1); - ops[s].op.op = op; - out_bl.resize(s+1); - out_bl[s] = NULL; - out_handler.resize(s+1); - out_handler[s] = NULL; - out_rval.resize(s+1); - out_rval[s] = NULL; - return ops[s]; - } - void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) { - OSDOp& osd_op = add_op(op); - osd_op.op.extent.offset = off; - osd_op.op.extent.length = len; - osd_op.indata.claim_append(bl); - } - void add_writesame(int op, uint64_t off, uint64_t write_len, - bufferlist& bl) { - OSDOp& osd_op = add_op(op); - osd_op.op.writesame.offset = off; - osd_op.op.writesame.length = write_len; - osd_op.op.writesame.data_length = bl.length(); - osd_op.indata.claim_append(bl); - } - void add_xattr(int op, const char *name, const bufferlist& data) { - OSDOp& osd_op = add_op(op); - osd_op.op.xattr.name_len = (name ? strlen(name) : 0); - osd_op.op.xattr.value_len = data.length(); - if (name) - osd_op.indata.append(name); - osd_op.indata.append(data); - } - void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, - uint8_t cmp_mode, const bufferlist& data) { - OSDOp& osd_op = add_op(op); - osd_op.op.xattr.name_len = (name ? strlen(name) : 0); - osd_op.op.xattr.value_len = data.length(); - osd_op.op.xattr.cmp_op = cmp_op; - osd_op.op.xattr.cmp_mode = cmp_mode; - if (name) - osd_op.indata.append(name); - osd_op.indata.append(data); - } - void add_call(int op, const char *cname, const char *method, - bufferlist &indata, - bufferlist *outbl, Context *ctx, int *prval) { - OSDOp& osd_op = add_op(op); - - unsigned p = ops.size() - 1; - out_handler[p] = ctx; - out_bl[p] = outbl; - out_rval[p] = prval; - - osd_op.op.cls.class_len = strlen(cname); - osd_op.op.cls.method_len = strlen(method); - osd_op.op.cls.indata_len = indata.length(); - osd_op.indata.append(cname, osd_op.op.cls.class_len); - osd_op.indata.append(method, osd_op.op.cls.method_len); - osd_op.indata.append(indata); - } - void add_pgls(int op, uint64_t count, collection_list_handle_t cookie, - epoch_t start_epoch) { - OSDOp& osd_op = add_op(op); - osd_op.op.pgls.count = count; - osd_op.op.pgls.start_epoch = start_epoch; - ::encode(cookie, osd_op.indata); - } - void add_pgls_filter(int op, uint64_t count, const bufferlist& filter, - collection_list_handle_t cookie, epoch_t start_epoch) { - OSDOp& osd_op = add_op(op); - osd_op.op.pgls.count = count; - osd_op.op.pgls.start_epoch = start_epoch; - string cname = "pg"; - string mname = "filter"; - ::encode(cname, osd_op.indata); - ::encode(mname, osd_op.indata); - osd_op.indata.append(filter); - ::encode(cookie, osd_op.indata); - } - void add_alloc_hint(int op, uint64_t expected_object_size, - uint64_t expected_write_size, - uint32_t flags) { - OSDOp& osd_op = add_op(op); - osd_op.op.alloc_hint.expected_object_size = expected_object_size; - osd_op.op.alloc_hint.expected_write_size = expected_write_size; - osd_op.op.alloc_hint.flags = flags; - } - - // ------ - - // pg - void pg_ls(uint64_t count, bufferlist& filter, - collection_list_handle_t cookie, epoch_t start_epoch) { - if (filter.length() == 0) - add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch); - else - add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie, - start_epoch); - flags |= CEPH_OSD_FLAG_PGOP; - } - - void pg_nls(uint64_t count, const bufferlist& filter, - collection_list_handle_t cookie, epoch_t start_epoch) { - if (filter.length() == 0) - add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch); - else - add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie, - start_epoch); - flags |= CEPH_OSD_FLAG_PGOP; - } - - void scrub_ls(const librados::object_id_t& start_after, - uint64_t max_to_get, - std::vector *objects, - uint32_t *interval, - int *rval); - void scrub_ls(const librados::object_id_t& start_after, - uint64_t max_to_get, - std::vector *objects, - uint32_t *interval, - int *rval); - - void create(bool excl) { - OSDOp& o = add_op(CEPH_OSD_OP_CREATE); - o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); - } - - struct C_ObjectOperation_stat : public Context { - bufferlist bl; - uint64_t *psize; - ceph::real_time *pmtime; - time_t *ptime; - struct timespec *pts; - int *prval; - C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts, - int *prval) - : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {} - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - try { - uint64_t size; - ceph::real_time mtime; - ::decode(size, p); - ::decode(mtime, p); - if (psize) - *psize = size; - if (pmtime) - *pmtime = mtime; - if (ptime) - *ptime = ceph::real_clock::to_time_t(mtime); - if (pts) - *pts = ceph::real_clock::to_timespec(mtime); - } catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - } - }; - void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) { - add_op(CEPH_OSD_OP_STAT); - unsigned p = ops.size() - 1; - C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL, - prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - out_rval[p] = prval; - } - void stat(uint64_t *psize, time_t *ptime, int *prval) { - add_op(CEPH_OSD_OP_STAT); - unsigned p = ops.size() - 1; - C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL, - prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - out_rval[p] = prval; - } - void stat(uint64_t *psize, struct timespec *pts, int *prval) { - add_op(CEPH_OSD_OP_STAT); - unsigned p = ops.size() - 1; - C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts, - prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - out_rval[p] = prval; - } - // object cmpext - struct C_ObjectOperation_cmpext : public Context { - int *prval; - C_ObjectOperation_cmpext(int *prval) - : prval(prval) {} - - void finish(int r) { - if (prval) - *prval = r; - } - }; - - void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) { - add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl); - unsigned p = ops.size() - 1; - C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval); - out_handler[p] = h; - out_rval[p] = prval; - } - - // Used by C API - void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) { - bufferlist cmp_bl; - cmp_bl.append(cmp_buf, cmp_len); - add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl); - unsigned p = ops.size() - 1; - C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval); - out_handler[p] = h; - out_rval[p] = prval; - } - - void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval, - Context* ctx) { - bufferlist bl; - add_data(CEPH_OSD_OP_READ, off, len, bl); - unsigned p = ops.size() - 1; - out_bl[p] = pbl; - out_rval[p] = prval; - out_handler[p] = ctx; - } - - struct C_ObjectOperation_sparse_read : public Context { - bufferlist bl; - bufferlist *data_bl; - std::map *extents; - int *prval; - C_ObjectOperation_sparse_read(bufferlist *data_bl, - std::map *extents, - int *prval) - : data_bl(data_bl), extents(extents), prval(prval) {} - void finish(int r) override { - bufferlist::iterator iter = bl.begin(); - if (r >= 0) { - try { - ::decode(*extents, iter); - ::decode(*data_bl, iter); - } catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - } - }; - void sparse_read(uint64_t off, uint64_t len, std::map *m, - bufferlist *data_bl, int *prval) { - bufferlist bl; - add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl); - unsigned p = ops.size() - 1; - C_ObjectOperation_sparse_read *h = - new C_ObjectOperation_sparse_read(data_bl, m, prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - out_rval[p] = prval; - } - void write(uint64_t off, bufferlist& bl, - uint64_t truncate_size, - uint32_t truncate_seq) { - add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); - OSDOp& o = *ops.rbegin(); - o.op.extent.truncate_size = truncate_size; - o.op.extent.truncate_seq = truncate_seq; - } - void write(uint64_t off, bufferlist& bl) { - write(off, bl, 0, 0); - } - void write_full(bufferlist& bl) { - add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl); - } - void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) { - add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl); - } - void append(bufferlist& bl) { - add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl); - } - void zero(uint64_t off, uint64_t len) { - bufferlist bl; - add_data(CEPH_OSD_OP_ZERO, off, len, bl); - } - void truncate(uint64_t off) { - bufferlist bl; - add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl); - } - void remove() { - bufferlist bl; - add_data(CEPH_OSD_OP_DELETE, 0, 0, bl); - } - void mapext(uint64_t off, uint64_t len) { - bufferlist bl; - add_data(CEPH_OSD_OP_MAPEXT, off, len, bl); - } - void sparse_read(uint64_t off, uint64_t len) { - bufferlist bl; - add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl); - } - - void checksum(uint8_t type, const bufferlist &init_value_bl, - uint64_t off, uint64_t len, size_t chunk_size, - bufferlist *pbl, int *prval, Context *ctx) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM); - osd_op.op.checksum.offset = off; - osd_op.op.checksum.length = len; - osd_op.op.checksum.type = type; - osd_op.op.checksum.chunk_size = chunk_size; - osd_op.indata.append(init_value_bl); - - unsigned p = ops.size() - 1; - out_bl[p] = pbl; - out_rval[p] = prval; - out_handler[p] = ctx; - } - - // object attrs - void getxattr(const char *name, bufferlist *pbl, int *prval) { - bufferlist bl; - add_xattr(CEPH_OSD_OP_GETXATTR, name, bl); - unsigned p = ops.size() - 1; - out_bl[p] = pbl; - out_rval[p] = prval; - } - struct C_ObjectOperation_decodevals : public Context { - uint64_t max_entries; - bufferlist bl; - std::map *pattrs; - bool *ptruncated; - int *prval; - C_ObjectOperation_decodevals(uint64_t m, std::map *pa, - bool *pt, int *pr) - : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) { - if (ptruncated) { - *ptruncated = false; - } - } - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - try { - if (pattrs) - ::decode(*pattrs, p); - if (ptruncated) { - std::map ignore; - if (!pattrs) { - ::decode(ignore, p); - pattrs = &ignore; - } - if (!p.end()) { - ::decode(*ptruncated, p); - } else { - // the OSD did not provide this. since old OSDs do not - // enfoce omap result limits either, we can infer it from - // the size of the result - *ptruncated = (pattrs->size() == max_entries); - } - } - } - catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - } - }; - struct C_ObjectOperation_decodekeys : public Context { - uint64_t max_entries; - bufferlist bl; - std::set *pattrs; - bool *ptruncated; - int *prval; - C_ObjectOperation_decodekeys(uint64_t m, std::set *pa, bool *pt, - int *pr) - : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) { - if (ptruncated) { - *ptruncated = false; - } - } - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - try { - if (pattrs) - ::decode(*pattrs, p); - if (ptruncated) { - std::set ignore; - if (!pattrs) { - ::decode(ignore, p); - pattrs = &ignore; - } - if (!p.end()) { - ::decode(*ptruncated, p); - } else { - // the OSD did not provide this. since old OSDs do not - // enfoce omap result limits either, we can infer it from - // the size of the result - *ptruncated = (pattrs->size() == max_entries); - } - } - } - catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - } - }; - struct C_ObjectOperation_decodewatchers : public Context { - bufferlist bl; - list *pwatchers; - int *prval; - C_ObjectOperation_decodewatchers(list *pw, int *pr) - : pwatchers(pw), prval(pr) {} - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - try { - obj_list_watch_response_t resp; - ::decode(resp, p); - if (pwatchers) { - for (list::iterator i = resp.entries.begin() ; - i != resp.entries.end() ; ++i) { - obj_watch_t ow; - ostringstream sa; - sa << i->addr; - strncpy(ow.addr, sa.str().c_str(), 256); - ow.watcher_id = i->name.num(); - ow.cookie = i->cookie; - ow.timeout_seconds = i->timeout_seconds; - pwatchers->push_back(ow); - } - } - } - catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - } - }; - struct C_ObjectOperation_decodesnaps : public Context { - bufferlist bl; - librados::snap_set_t *psnaps; - int *prval; - C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr) - : psnaps(ps), prval(pr) {} - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - try { - obj_list_snap_response_t resp; - ::decode(resp, p); - if (psnaps) { - psnaps->clones.clear(); - for (vector::iterator ci = resp.clones.begin(); - ci != resp.clones.end(); - ++ci) { - librados::clone_info_t clone; - - clone.cloneid = ci->cloneid; - clone.snaps.reserve(ci->snaps.size()); - clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(), - ci->snaps.end()); - clone.overlap = ci->overlap; - clone.size = ci->size; - - psnaps->clones.push_back(clone); - } - psnaps->seq = resp.seq; - } - } catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - } - }; - void getxattrs(std::map *pattrs, int *prval) { - add_op(CEPH_OSD_OP_GETXATTRS); - if (pattrs || prval) { - unsigned p = ops.size() - 1; - C_ObjectOperation_decodevals *h - = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval); - out_handler[p] = h; - out_bl[p] = &h->bl; - out_rval[p] = prval; - } - } - void setxattr(const char *name, const bufferlist& bl) { - add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); - } - void setxattr(const char *name, const string& s) { - bufferlist bl; - bl.append(s); - add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); - } - void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, - const bufferlist& bl) { - add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl); - } - void rmxattr(const char *name) { - bufferlist bl; - add_xattr(CEPH_OSD_OP_RMXATTR, name, bl); - } - void setxattrs(map& attrs) { - bufferlist bl; - ::encode(attrs, bl); - add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length()); - } - void resetxattrs(const char *prefix, map& attrs) { - bufferlist bl; - ::encode(attrs, bl); - add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl); - } - - // trivialmap - void tmap_update(bufferlist& bl) { - add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl); - } - void tmap_put(bufferlist& bl) { - add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl); - } - void tmap_get(bufferlist *pbl, int *prval) { - add_op(CEPH_OSD_OP_TMAPGET); - unsigned p = ops.size() - 1; - out_bl[p] = pbl; - out_rval[p] = prval; - } - void tmap_get() { - add_op(CEPH_OSD_OP_TMAPGET); - } - void tmap_to_omap(bool nullok=false) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP); - if (nullok) - osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK; - } - - // objectmap - void omap_get_keys(const string &start_after, - uint64_t max_to_get, - std::set *out_set, - bool *ptruncated, - int *prval) { - OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS); - bufferlist bl; - ::encode(start_after, bl); - ::encode(max_to_get, bl); - op.op.extent.offset = 0; - op.op.extent.length = bl.length(); - op.indata.claim_append(bl); - if (prval || ptruncated || out_set) { - unsigned p = ops.size() - 1; - C_ObjectOperation_decodekeys *h = - new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval); - out_handler[p] = h; - out_bl[p] = &h->bl; - out_rval[p] = prval; - } - } - - void omap_get_vals(const string &start_after, - const string &filter_prefix, - uint64_t max_to_get, - std::map *out_set, - bool *ptruncated, - int *prval) { - OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS); - bufferlist bl; - ::encode(start_after, bl); - ::encode(max_to_get, bl); - ::encode(filter_prefix, bl); - op.op.extent.offset = 0; - op.op.extent.length = bl.length(); - op.indata.claim_append(bl); - if (prval || out_set || ptruncated) { - unsigned p = ops.size() - 1; - C_ObjectOperation_decodevals *h = - new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval); - out_handler[p] = h; - out_bl[p] = &h->bl; - out_rval[p] = prval; - } - } - - void omap_get_vals_by_keys(const std::set &to_get, - std::map *out_set, - int *prval) { - OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS); - bufferlist bl; - ::encode(to_get, bl); - op.op.extent.offset = 0; - op.op.extent.length = bl.length(); - op.indata.claim_append(bl); - if (prval || out_set) { - unsigned p = ops.size() - 1; - C_ObjectOperation_decodevals *h = - new C_ObjectOperation_decodevals(0, out_set, nullptr, prval); - out_handler[p] = h; - out_bl[p] = &h->bl; - out_rval[p] = prval; - } - } - - void omap_cmp(const std::map > &assertions, - int *prval) { - OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP); - bufferlist bl; - ::encode(assertions, bl); - op.op.extent.offset = 0; - op.op.extent.length = bl.length(); - op.indata.claim_append(bl); - if (prval) { - unsigned p = ops.size() - 1; - out_rval[p] = prval; - } - } - - struct C_ObjectOperation_copyget : public Context { - bufferlist bl; - object_copy_cursor_t *cursor; - uint64_t *out_size; - ceph::real_time *out_mtime; - std::map *out_attrs; - bufferlist *out_data, *out_omap_header, *out_omap_data; - vector *out_snaps; - snapid_t *out_snap_seq; - uint32_t *out_flags; - uint32_t *out_data_digest; - uint32_t *out_omap_digest; - mempool::osd_pglog::vector > *out_reqids; - uint64_t *out_truncate_seq; - uint64_t *out_truncate_size; - int *prval; - C_ObjectOperation_copyget(object_copy_cursor_t *c, - uint64_t *s, - ceph::real_time *m, - std::map *a, - bufferlist *d, bufferlist *oh, - bufferlist *o, - std::vector *osnaps, - snapid_t *osnap_seq, - uint32_t *flags, - uint32_t *dd, - uint32_t *od, - mempool::osd_pglog::vector > *oreqids, - uint64_t *otseq, - uint64_t *otsize, - int *r) - : cursor(c), - out_size(s), out_mtime(m), - out_attrs(a), out_data(d), out_omap_header(oh), - out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq), - out_flags(flags), out_data_digest(dd), out_omap_digest(od), - out_reqids(oreqids), - out_truncate_seq(otseq), - out_truncate_size(otsize), - prval(r) {} - void finish(int r) override { - // reqids are copied on ENOENT - if (r < 0 && r != -ENOENT) - return; - try { - bufferlist::iterator p = bl.begin(); - object_copy_data_t copy_reply; - ::decode(copy_reply, p); - if (r == -ENOENT) { - if (out_reqids) - *out_reqids = copy_reply.reqids; - return; - } - if (out_size) - *out_size = copy_reply.size; - if (out_mtime) - *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime); - if (out_attrs) - *out_attrs = copy_reply.attrs; - if (out_data) - out_data->claim_append(copy_reply.data); - if (out_omap_header) - out_omap_header->claim_append(copy_reply.omap_header); - if (out_omap_data) - *out_omap_data = copy_reply.omap_data; - if (out_snaps) - *out_snaps = copy_reply.snaps; - if (out_snap_seq) - *out_snap_seq = copy_reply.snap_seq; - if (out_flags) - *out_flags = copy_reply.flags; - if (out_data_digest) - *out_data_digest = copy_reply.data_digest; - if (out_omap_digest) - *out_omap_digest = copy_reply.omap_digest; - if (out_reqids) - *out_reqids = copy_reply.reqids; - if (out_truncate_seq) - *out_truncate_seq = copy_reply.truncate_seq; - if (out_truncate_size) - *out_truncate_size = copy_reply.truncate_size; - *cursor = copy_reply.cursor; - } catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - }; - - void copy_get(object_copy_cursor_t *cursor, - uint64_t max, - uint64_t *out_size, - ceph::real_time *out_mtime, - std::map *out_attrs, - bufferlist *out_data, - bufferlist *out_omap_header, - bufferlist *out_omap_data, - vector *out_snaps, - snapid_t *out_snap_seq, - uint32_t *out_flags, - uint32_t *out_data_digest, - uint32_t *out_omap_digest, - mempool::osd_pglog::vector > *out_reqids, - uint64_t *truncate_seq, - uint64_t *truncate_size, - int *prval) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET); - osd_op.op.copy_get.max = max; - ::encode(*cursor, osd_op.indata); - ::encode(max, osd_op.indata); - unsigned p = ops.size() - 1; - out_rval[p] = prval; - C_ObjectOperation_copyget *h = - new C_ObjectOperation_copyget(cursor, out_size, out_mtime, - out_attrs, out_data, out_omap_header, - out_omap_data, out_snaps, out_snap_seq, - out_flags, out_data_digest, - out_omap_digest, out_reqids, truncate_seq, - truncate_size, prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - } - - void undirty() { - add_op(CEPH_OSD_OP_UNDIRTY); - } - - struct C_ObjectOperation_isdirty : public Context { - bufferlist bl; - bool *pisdirty; - int *prval; - C_ObjectOperation_isdirty(bool *p, int *r) - : pisdirty(p), prval(r) {} - void finish(int r) override { - if (r < 0) - return; - try { - bufferlist::iterator p = bl.begin(); - bool isdirty; - ::decode(isdirty, p); - if (pisdirty) - *pisdirty = isdirty; - } catch (buffer::error& e) { - if (prval) - *prval = -EIO; - } - } - }; - - void is_dirty(bool *pisdirty, int *prval) { - add_op(CEPH_OSD_OP_ISDIRTY); - unsigned p = ops.size() - 1; - out_rval[p] = prval; - C_ObjectOperation_isdirty *h = - new C_ObjectOperation_isdirty(pisdirty, prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - } - - struct C_ObjectOperation_hit_set_ls : public Context { - bufferlist bl; - std::list< std::pair > *ptls; - std::list< std::pair > *putls; - int *prval; - C_ObjectOperation_hit_set_ls(std::list< std::pair > *t, - std::list< std::pair > *ut, - int *r) - : ptls(t), putls(ut), prval(r) {} - void finish(int r) override { - if (r < 0) - return; - try { - bufferlist::iterator p = bl.begin(); - std::list< std::pair > ls; - ::decode(ls, p); - if (ptls) { - ptls->clear(); - for (auto p = ls.begin(); p != ls.end(); ++p) - // round initial timestamp up to the next full second to - // keep this a valid interval. - ptls->push_back( - make_pair(ceph::real_clock::to_time_t( - ceph::ceil(p->first, - // Sadly, no time literals until C++14. - std::chrono::seconds(1))), - ceph::real_clock::to_time_t(p->second))); - } - if (putls) - putls->swap(ls); - } catch (buffer::error& e) { - r = -EIO; - } - if (prval) - *prval = r; - } - }; - - /** - * list available HitSets. - * - * We will get back a list of time intervals. Note that the most - * recent range may have an empty end timestamp if it is still - * accumulating. - * - * @param pls [out] list of time intervals - * @param prval [out] return value - */ - void hit_set_ls(std::list< std::pair > *pls, int *prval) { - add_op(CEPH_OSD_OP_PG_HITSET_LS); - unsigned p = ops.size() - 1; - out_rval[p] = prval; - C_ObjectOperation_hit_set_ls *h = - new C_ObjectOperation_hit_set_ls(pls, NULL, prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - } - void hit_set_ls(std::list > *pls, - int *prval) { - add_op(CEPH_OSD_OP_PG_HITSET_LS); - unsigned p = ops.size() - 1; - out_rval[p] = prval; - C_ObjectOperation_hit_set_ls *h = - new C_ObjectOperation_hit_set_ls(NULL, pls, prval); - out_bl[p] = &h->bl; - out_handler[p] = h; - } - - /** - * get HitSet - * - * Return an encoded HitSet that includes the provided time - * interval. - * - * @param stamp [in] timestamp - * @param pbl [out] target buffer for encoded HitSet - * @param prval [out] return value - */ - void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) { - OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET); - op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp); - unsigned p = ops.size() - 1; - out_rval[p] = prval; - out_bl[p] = pbl; - } - - void omap_get_header(bufferlist *bl, int *prval) { - add_op(CEPH_OSD_OP_OMAPGETHEADER); - unsigned p = ops.size() - 1; - out_bl[p] = bl; - out_rval[p] = prval; - } - - void omap_set(const map &map) { - bufferlist bl; - ::encode(map, bl); - add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl); - } - - void omap_set_header(bufferlist &bl) { - add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl); - } - - void omap_clear() { - add_op(CEPH_OSD_OP_OMAPCLEAR); - } - - void omap_rm_keys(const std::set &to_remove) { - bufferlist bl; - ::encode(to_remove, bl); - add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl); - } - - // object classes - void call(const char *cname, const char *method, bufferlist &indata) { - add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL); - } - - void call(const char *cname, const char *method, bufferlist &indata, - bufferlist *outdata, Context *ctx, int *prval) { - add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval); - } - - // watch/notify - void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH); - osd_op.op.watch.cookie = cookie; - osd_op.op.watch.op = op; - osd_op.op.watch.timeout = timeout; - } - - void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout, - bufferlist &bl, bufferlist *inbl) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY); - osd_op.op.notify.cookie = cookie; - ::encode(prot_ver, *inbl); - ::encode(timeout, *inbl); - ::encode(bl, *inbl); - osd_op.indata.append(*inbl); - } - - void notify_ack(uint64_t notify_id, uint64_t cookie, - bufferlist& reply_bl) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK); - bufferlist bl; - ::encode(notify_id, bl); - ::encode(cookie, bl); - ::encode(reply_bl, bl); - osd_op.indata.append(bl); - } - - void list_watchers(list *out, - int *prval) { - (void)add_op(CEPH_OSD_OP_LIST_WATCHERS); - if (prval || out) { - unsigned p = ops.size() - 1; - C_ObjectOperation_decodewatchers *h = - new C_ObjectOperation_decodewatchers(out, prval); - out_handler[p] = h; - out_bl[p] = &h->bl; - out_rval[p] = prval; - } - } - - void list_snaps(librados::snap_set_t *out, int *prval) { - (void)add_op(CEPH_OSD_OP_LIST_SNAPS); - if (prval || out) { - unsigned p = ops.size() - 1; - C_ObjectOperation_decodesnaps *h = - new C_ObjectOperation_decodesnaps(out, prval); - out_handler[p] = h; - out_bl[p] = &h->bl; - out_rval[p] = prval; - } - } - - void assert_version(uint64_t ver) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER); - osd_op.op.assert_ver.ver = ver; - } - - void cmpxattr(const char *name, const bufferlist& val, - int op, int mode) { - add_xattr(CEPH_OSD_OP_CMPXATTR, name, val); - OSDOp& o = *ops.rbegin(); - o.op.xattr.cmp_op = op; - o.op.xattr.cmp_mode = mode; - } - - void rollback(uint64_t snapid) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK); - osd_op.op.snap.snapid = snapid; - } - - void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc, - version_t src_version, unsigned flags, - unsigned src_fadvise_flags) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM); - osd_op.op.copy_from.snapid = snapid; - osd_op.op.copy_from.src_version = src_version; - osd_op.op.copy_from.flags = flags; - osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags; - ::encode(src, osd_op.indata); - ::encode(src_oloc, osd_op.indata); - } - - /** - * writeback content to backing tier - * - * If object is marked dirty in the cache tier, write back content - * to backing tier. If the object is clean this is a no-op. - * - * If writeback races with an update, the update will block. - * - * use with IGNORE_CACHE to avoid triggering promote. - */ - void cache_flush() { - add_op(CEPH_OSD_OP_CACHE_FLUSH); - } - - /** - * writeback content to backing tier - * - * If object is marked dirty in the cache tier, write back content - * to backing tier. If the object is clean this is a no-op. - * - * If writeback races with an update, return EAGAIN. Requires that - * the SKIPRWLOCKS flag be set. - * - * use with IGNORE_CACHE to avoid triggering promote. - */ - void cache_try_flush() { - add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH); - } - - /** - * evict object from cache tier - * - * If object is marked clean, remove the object from the cache tier. - * Otherwise, return EBUSY. - * - * use with IGNORE_CACHE to avoid triggering promote. - */ - void cache_evict() { - add_op(CEPH_OSD_OP_CACHE_EVICT); - } - - /* - * Extensible tier - */ - void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc, - version_t tgt_version) { - OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT); - osd_op.op.copy_from.snapid = snapid; - osd_op.op.copy_from.src_version = tgt_version; - ::encode(tgt, osd_op.indata); - ::encode(tgt_oloc, osd_op.indata); - } - - void set_alloc_hint(uint64_t expected_object_size, - uint64_t expected_write_size, - uint32_t flags) { - add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size, - expected_write_size, flags); - - // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed - // not worth a feature bit. Set FAILOK per-op flag to make - // sure older osds don't trip over an unsupported opcode. - set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK); - } - - void dup(vector& sops) { - ops = sops; - out_bl.resize(sops.size()); - out_handler.resize(sops.size()); - out_rval.resize(sops.size()); - for (uint32_t i = 0; i < sops.size(); i++) { - out_bl[i] = &sops[i].outdata; - out_handler[i] = NULL; - out_rval[i] = &sops[i].rval; - } - } - - /** - * Pin/unpin an object in cache tier - */ - void cache_pin() { - add_op(CEPH_OSD_OP_CACHE_PIN); - } - - void cache_unpin() { - add_op(CEPH_OSD_OP_CACHE_UNPIN); - } -}; - - -// ---------------- - - -class Objecter : public md_config_obs_t, public Dispatcher { -public: - // config observer bits - const char** get_tracked_conf_keys() const override; - void handle_conf_change(const struct md_config_t *conf, - const std::set &changed) override; - -public: - Messenger *messenger; - MonClient *monc; - Finisher *finisher; - ZTracer::Endpoint trace_endpoint; -private: - OSDMap *osdmap; -public: - using Dispatcher::cct; - std::multimap crush_location; - - std::atomic initialized{false}; - -private: - std::atomic last_tid{0}; - std::atomic inflight_ops{0}; - std::atomic client_inc{-1}; - uint64_t max_linger_id; - std::atomic num_in_flight{0}; - std::atomic global_op_flags{0}; // flags which are applied to each IO op - bool keep_balanced_budget; - bool honor_osdmap_full; - bool osdmap_full_try; - - // If this is true, accumulate a set of blacklisted entities - // to be drained by consume_blacklist_events. - bool blacklist_events_enabled; - std::set blacklist_events; - -public: - void maybe_request_map(); - - void enable_blacklist_events(); -private: - - void _maybe_request_map(); - - version_t last_seen_osdmap_version; - version_t last_seen_pgmap_version; - - mutable boost::shared_mutex rwlock; - using lock_guard = std::unique_lock; - using unique_lock = std::unique_lock; - using shared_lock = boost::shared_lock; - using shunique_lock = ceph::shunique_lock; - ceph::timer timer; - - PerfCounters *logger; - - uint64_t tick_event; - - void start_tick(); - void tick(); - void update_crush_location(); - - class RequestStateHook; - - RequestStateHook *m_request_state_hook; - -public: - /*** track pending operations ***/ - // read - public: - - struct OSDSession; - - struct op_target_t { - int flags = 0; - - epoch_t epoch = 0; ///< latest epoch we calculated the mapping - - object_t base_oid; - object_locator_t base_oloc; - object_t target_oid; - object_locator_t target_oloc; - - ///< true if we are directed at base_pgid, not base_oid - bool precalc_pgid = false; - - ///< true if we have ever mapped to a valid pool - bool pool_ever_existed = false; - - ///< explcit pg target, if any - pg_t base_pgid; - - pg_t pgid; ///< last (raw) pg we mapped to - spg_t actual_pgid; ///< last (actual) spg_t we mapped to - unsigned pg_num = 0; ///< last pg_num we mapped to - unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to - vector up; ///< set of up osds for last pg we mapped to - vector acting; ///< set of acting osds for last pg we mapped to - int up_primary = -1; ///< last up_primary we mapped to - int acting_primary = -1; ///< last acting_primary we mapped to - int size = -1; ///< the size of the pool when were were last mapped - int min_size = -1; ///< the min size of the pool when were were last mapped - bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise - bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering - - bool used_replica = false; - bool paused = false; - - int osd = -1; ///< the final target osd, or -1 - - epoch_t last_force_resend = 0; - - op_target_t(object_t oid, object_locator_t oloc, int flags) - : flags(flags), - base_oid(oid), - base_oloc(oloc) - {} - - op_target_t(pg_t pgid) - : base_oloc(pgid.pool(), pgid.ps()), - precalc_pgid(true), - base_pgid(pgid) - {} - - op_target_t() = default; - - hobject_t get_hobj() { - return hobject_t(target_oid, - target_oloc.key, - CEPH_NOSNAP, - target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(), - target_oloc.pool, - target_oloc.nspace); - } - - bool contained_by(const hobject_t& begin, const hobject_t& end) { - hobject_t h = get_hobj(); - int r = cmp(h, begin); - return r == 0 || (r > 0 && h < end); - } - - void dump(Formatter *f) const; - }; - - struct Op : public RefCountedObject { - OSDSession *session; - int incarnation; - - op_target_t target; - - ConnectionRef con; // for rx buffer only - uint64_t features; // explicitly specified op features - - vector ops; - - snapid_t snapid; - SnapContext snapc; - ceph::real_time mtime; - - bufferlist *outbl; - vector out_bl; - vector out_handler; - vector out_rval; - - int priority; - Context *onfinish; - uint64_t ontimeout; - - ceph_tid_t tid; - int attempts; - - version_t *objver; - epoch_t *reply_epoch; - - ceph::mono_time stamp; - - epoch_t map_dne_bound; - - bool budgeted; - - /// true if we should resend this message on failure - bool should_resend; - - /// true if the throttle budget is get/put on a series of OPs, - /// instead of per OP basis, when this flag is set, the budget is - /// acquired before sending the very first OP of the series and - /// released upon receiving the last OP reply. - bool ctx_budgeted; - - int *data_offset; - - osd_reqid_t reqid; // explicitly setting reqid - ZTracer::Trace trace; - - Op(const object_t& o, const object_locator_t& ol, vector& op, - int f, Context *fin, version_t *ov, int *offset = NULL, - ZTracer::Trace *parent_trace = nullptr) : - session(NULL), incarnation(0), - target(o, ol, f), - con(NULL), - features(CEPH_FEATURES_SUPPORTED_DEFAULT), - snapid(CEPH_NOSNAP), - outbl(NULL), - priority(0), - onfinish(fin), - ontimeout(0), - tid(0), - attempts(0), - objver(ov), - reply_epoch(NULL), - map_dne_bound(0), - budgeted(false), - should_resend(true), - ctx_budgeted(false), - data_offset(offset) { - ops.swap(op); - - /* initialize out_* to match op vector */ - out_bl.resize(ops.size()); - out_rval.resize(ops.size()); - out_handler.resize(ops.size()); - for (unsigned i = 0; i < ops.size(); i++) { - out_bl[i] = NULL; - out_handler[i] = NULL; - out_rval[i] = NULL; - } - - if (target.base_oloc.key == o) - target.base_oloc.key.clear(); - - if (parent_trace && parent_trace->valid()) { - trace.init("op", nullptr, parent_trace); - trace.event("start"); - } - } - - bool operator<(const Op& other) const { - return tid < other.tid; - } - - bool respects_full() const { - return - (target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) && - !(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE)); - } - - private: - ~Op() override { - while (!out_handler.empty()) { - delete out_handler.back(); - out_handler.pop_back(); - } - trace.event("finish"); - } - }; - - struct C_Op_Map_Latest : public Context { - Objecter *objecter; - ceph_tid_t tid; - version_t latest; - C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), - latest(0) {} - void finish(int r) override; - }; - - struct C_Command_Map_Latest : public Context { - Objecter *objecter; - uint64_t tid; - version_t latest; - C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), - latest(0) {} - void finish(int r) override; - }; - - struct C_Stat : public Context { - bufferlist bl; - uint64_t *psize; - ceph::real_time *pmtime; - Context *fin; - C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) : - psize(ps), pmtime(pm), fin(c) {} - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - uint64_t s; - ceph::real_time m; - ::decode(s, p); - ::decode(m, p); - if (psize) - *psize = s; - if (pmtime) - *pmtime = m; - } - fin->complete(r); - } - }; - - struct C_GetAttrs : public Context { - bufferlist bl; - map& attrset; - Context *fin; - C_GetAttrs(map& set, Context *c) : attrset(set), - fin(c) {} - void finish(int r) override { - if (r >= 0) { - bufferlist::iterator p = bl.begin(); - ::decode(attrset, p); - } - fin->complete(r); - } - }; - - - // Pools and statistics - struct NListContext { - collection_list_handle_t pos; - - // these are for !sortbitwise compat only - int current_pg = 0; - int starting_pg_num = 0; - bool sort_bitwise = false; - - bool at_end_of_pool = false; ///< publicly visible end flag - - int64_t pool_id = -1; - int pool_snap_seq = 0; - uint64_t max_entries = 0; - string nspace; - - bufferlist bl; // raw data read to here - std::list list; - - bufferlist filter; - - bufferlist extra_info; - - // The budget associated with this context, once it is set (>= 0), - // the budget is not get/released on OP basis, instead the budget - // is acquired before sending the first OP and released upon receiving - // the last op reply. - int ctx_budget = -1; - - bool at_end() const { - return at_end_of_pool; - } - - uint32_t get_pg_hash_position() const { - return pos.get_hash(); - } - }; - - struct C_NList : public Context { - NListContext *list_context; - Context *final_finish; - Objecter *objecter; - epoch_t epoch; - C_NList(NListContext *lc, Context * finish, Objecter *ob) : - list_context(lc), final_finish(finish), objecter(ob), epoch(0) {} - void finish(int r) override { - if (r >= 0) { - objecter->_nlist_reply(list_context, r, final_finish, epoch); - } else { - final_finish->complete(r); - } - } - }; - - struct PoolStatOp { - ceph_tid_t tid; - list pools; - - map *pool_stats; - Context *onfinish; - uint64_t ontimeout; - - ceph::mono_time last_submit; - }; - - struct StatfsOp { - ceph_tid_t tid; - struct ceph_statfs *stats; - boost::optional data_pool; - Context *onfinish; - uint64_t ontimeout; - - ceph::mono_time last_submit; - }; - - struct PoolOp { - ceph_tid_t tid; - int64_t pool; - string name; - Context *onfinish; - uint64_t ontimeout; - int pool_op; - uint64_t auid; - int16_t crush_rule; - snapid_t snapid; - bufferlist *blp; - - ceph::mono_time last_submit; - PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0), - auid(0), crush_rule(0), snapid(0), blp(NULL) {} - }; - - // -- osd commands -- - struct CommandOp : public RefCountedObject { - OSDSession *session = nullptr; - ceph_tid_t tid = 0; - vector cmd; - bufferlist inbl; - bufferlist *poutbl = nullptr; - string *prs = nullptr; - - // target_osd == -1 means target_pg is valid - const int target_osd = -1; - const pg_t target_pg; - - op_target_t target; - - epoch_t map_dne_bound = 0; - int map_check_error = 0; // error to return if map check fails - const char *map_check_error_str = nullptr; - - Context *onfinish = nullptr; - uint64_t ontimeout = 0; - ceph::mono_time last_submit; - - CommandOp( - int target_osd, - const vector &cmd, - bufferlist inbl, - bufferlist *poutbl, - string *prs, - Context *onfinish) - : cmd(cmd), - inbl(inbl), - poutbl(poutbl), - prs(prs), - target_osd(target_osd), - onfinish(onfinish) {} - - CommandOp( - pg_t pgid, - const vector &cmd, - bufferlist inbl, - bufferlist *poutbl, - string *prs, - Context *onfinish) - : cmd(cmd), - inbl(inbl), - poutbl(poutbl), - prs(prs), - target_pg(pgid), - target(pgid), - onfinish(onfinish) {} - - }; - - void submit_command(CommandOp *c, ceph_tid_t *ptid); - int _calc_command_target(CommandOp *c, shunique_lock &sul); - void _assign_command_session(CommandOp *c, shunique_lock &sul); - void _send_command(CommandOp *c); - int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r); - void _finish_command(CommandOp *c, int r, string rs); - void handle_command_reply(MCommandReply *m); - - - // -- lingering ops -- - - struct WatchContext { - // this simply mirrors librados WatchCtx2 - virtual void handle_notify(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) = 0; - virtual void handle_error(uint64_t cookie, int err) = 0; - virtual ~WatchContext() {} - }; - - struct LingerOp : public RefCountedObject { - uint64_t linger_id; - - op_target_t target; - - snapid_t snap; - SnapContext snapc; - ceph::real_time mtime; - - vector ops; - bufferlist inbl; - bufferlist *poutbl; - version_t *pobjver; - - bool is_watch; - ceph::mono_time watch_valid_thru; ///< send time for last acked ping - int last_error; ///< error from last failed ping|reconnect, if any - boost::shared_mutex watch_lock; - using lock_guard = std::unique_lock; - using unique_lock = std::unique_lock; - using shared_lock = boost::shared_lock; - using shunique_lock = ceph::shunique_lock; - - // queue of pending async operations, with the timestamp of - // when they were queued. - list watch_pending_async; - - uint32_t register_gen; - bool registered; - bool canceled; - Context *on_reg_commit; - - // we trigger these from an async finisher - Context *on_notify_finish; - bufferlist *notify_result_bl; - uint64_t notify_id; - - WatchContext *watch_context; - - OSDSession *session; - - ceph_tid_t register_tid; - ceph_tid_t ping_tid; - epoch_t map_dne_bound; - - void _queued_async() { - // watch_lock ust be locked unique - watch_pending_async.push_back(ceph::mono_clock::now()); - } - void finished_async() { - unique_lock l(watch_lock); - assert(!watch_pending_async.empty()); - watch_pending_async.pop_front(); - } - - LingerOp() : linger_id(0), - target(object_t(), object_locator_t(), 0), - snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), - is_watch(false), last_error(0), - register_gen(0), - registered(false), - canceled(false), - on_reg_commit(NULL), - on_notify_finish(NULL), - notify_result_bl(NULL), - notify_id(0), - watch_context(NULL), - session(NULL), - register_tid(0), - ping_tid(0), - map_dne_bound(0) {} - - // no copy! - const LingerOp &operator=(const LingerOp& r); - LingerOp(const LingerOp& o); - - uint64_t get_cookie() { - return reinterpret_cast(this); - } - - private: - ~LingerOp() override { - delete watch_context; - } - }; - - struct C_Linger_Commit : public Context { - Objecter *objecter; - LingerOp *info; - bufferlist outbl; // used for notify only - C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) { - info->get(); - } - ~C_Linger_Commit() override { - info->put(); - } - void finish(int r) override { - objecter->_linger_commit(info, r, outbl); - } - }; - - struct C_Linger_Reconnect : public Context { - Objecter *objecter; - LingerOp *info; - C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) { - info->get(); - } - ~C_Linger_Reconnect() override { - info->put(); - } - void finish(int r) override { - objecter->_linger_reconnect(info, r); - } - }; - - struct C_Linger_Ping : public Context { - Objecter *objecter; - LingerOp *info; - ceph::mono_time sent; - uint32_t register_gen; - C_Linger_Ping(Objecter *o, LingerOp *l) - : objecter(o), info(l), register_gen(info->register_gen) { - info->get(); - } - ~C_Linger_Ping() override { - info->put(); - } - void finish(int r) override { - objecter->_linger_ping(info, r, sent, register_gen); - } - }; - - struct C_Linger_Map_Latest : public Context { - Objecter *objecter; - uint64_t linger_id; - version_t latest; - C_Linger_Map_Latest(Objecter *o, uint64_t id) : - objecter(o), linger_id(id), latest(0) {} - void finish(int r) override; - }; - - // -- osd sessions -- - struct OSDBackoff { - spg_t pgid; - uint64_t id; - hobject_t begin, end; - }; - - struct OSDSession : public RefCountedObject { - boost::shared_mutex lock; - using lock_guard = std::lock_guard; - using unique_lock = std::unique_lock; - using shared_lock = boost::shared_lock; - using shunique_lock = ceph::shunique_lock; - - // pending ops - map ops; - map linger_ops; - map command_ops; - - // backoffs - map> backoffs; - map backoffs_by_id; - - int osd; - int incarnation; - ConnectionRef con; - int num_locks; - std::unique_ptr completion_locks; - using unique_completion_lock = std::unique_lock< - decltype(completion_locks)::element_type>; - - - OSDSession(CephContext *cct, int o) : - osd(o), incarnation(0), con(NULL), - num_locks(cct->_conf->objecter_completion_locks_per_session), - completion_locks(new std::mutex[num_locks]) {} - - ~OSDSession() override; - - bool is_homeless() { return (osd == -1); } - - unique_completion_lock get_lock(object_t& oid); - }; - map osd_sessions; - - bool osdmap_full_flag() const; - bool osdmap_pool_full(const int64_t pool_id) const; - - private: - - /** - * Test pg_pool_t::FLAG_FULL on a pool - * - * @return true if the pool exists and has the flag set, or - * the global full flag is set, else false - */ - bool _osdmap_pool_full(const int64_t pool_id) const; - bool _osdmap_pool_full(const pg_pool_t &p) const; - void update_pool_full_map(map& pool_full_map); - - map linger_ops; - // we use this just to confirm a cookie is valid before dereferencing the ptr - set linger_ops_set; - - map poolstat_ops; - map statfs_ops; - map pool_ops; - std::atomic num_homeless_ops{0}; - - OSDSession *homeless_session; - - // ops waiting for an osdmap with a new pool or confirmation that - // the pool does not exist (may be expanded to other uses later) - map check_latest_map_lingers; - map check_latest_map_ops; - map check_latest_map_commands; - - map > > waiting_for_map; - - ceph::timespan mon_timeout; - ceph::timespan osd_timeout; - - MOSDOp *_prepare_osd_op(Op *op); - void _send_op(Op *op, MOSDOp *m = NULL); - void _send_op_account(Op *op); - void _cancel_linger_op(Op *op); - void finish_op(OSDSession *session, ceph_tid_t tid); - void _finish_op(Op *op, int r); - static bool is_pg_changed( - int oldprimary, - const vector& oldacting, - int newprimary, - const vector& newacting, - bool any_change=false); - enum recalc_op_target_result { - RECALC_OP_TARGET_NO_ACTION = 0, - RECALC_OP_TARGET_NEED_RESEND, - RECALC_OP_TARGET_POOL_DNE, - RECALC_OP_TARGET_OSD_DNE, - RECALC_OP_TARGET_OSD_DOWN, - }; - bool _osdmap_full_flag() const; - bool _osdmap_has_pool_full() const; - - bool target_should_be_paused(op_target_t *op); - int _calc_target(op_target_t *t, Connection *con, - bool any_change = false); - int _map_session(op_target_t *op, OSDSession **s, - shunique_lock& lc); - - void _session_op_assign(OSDSession *s, Op *op); - void _session_op_remove(OSDSession *s, Op *op); - void _session_linger_op_assign(OSDSession *to, LingerOp *op); - void _session_linger_op_remove(OSDSession *from, LingerOp *op); - void _session_command_op_assign(OSDSession *to, CommandOp *op); - void _session_command_op_remove(OSDSession *from, CommandOp *op); - - int _assign_op_target_session(Op *op, shunique_lock& lc, - bool src_session_locked, - bool dst_session_locked); - int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc); - - void _linger_submit(LingerOp *info, shunique_lock& sul); - void _send_linger(LingerOp *info, shunique_lock& sul); - void _linger_commit(LingerOp *info, int r, bufferlist& outbl); - void _linger_reconnect(LingerOp *info, int r); - void _send_linger_ping(LingerOp *info); - void _linger_ping(LingerOp *info, int r, ceph::mono_time sent, - uint32_t register_gen); - int _normalize_watch_error(int r); - - friend class C_DoWatchError; -public: - void linger_callback_flush(Context *ctx) { - finisher->queue(ctx); - } - -private: - void _check_op_pool_dne(Op *op, unique_lock *sl); - void _send_op_map_check(Op *op); - void _op_cancel_map_check(Op *op); - void _check_linger_pool_dne(LingerOp *op, bool *need_unregister); - void _send_linger_map_check(LingerOp *op); - void _linger_cancel_map_check(LingerOp *op); - void _check_command_map_dne(CommandOp *op); - void _send_command_map_check(CommandOp *op); - void _command_cancel_map_check(CommandOp *op); - - void kick_requests(OSDSession *session); - void _kick_requests(OSDSession *session, map& lresend); - void _linger_ops_resend(map& lresend, unique_lock& ul); - - int _get_session(int osd, OSDSession **session, shunique_lock& sul); - void put_session(OSDSession *s); - void get_session(OSDSession *s); - void _reopen_session(OSDSession *session); - void close_session(OSDSession *session); - - void _nlist_reply(NListContext *list_context, int r, Context *final_finish, - epoch_t reply_epoch); - - void resend_mon_ops(); - - /** - * handle a budget for in-flight ops - * budget is taken whenever an op goes into the ops map - * and returned whenever an op is removed from the map - * If throttle_op needs to throttle it will unlock client_lock. - */ - int calc_op_budget(Op *op); - void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0); - int _take_op_budget(Op *op, shunique_lock& sul) { - assert(sul && sul.mutex() == &rwlock); - int op_budget = calc_op_budget(op); - if (keep_balanced_budget) { - _throttle_op(op, sul, op_budget); - } else { - op_throttle_bytes.take(op_budget); - op_throttle_ops.take(1); - } - op->budgeted = true; - return op_budget; - } - void put_op_budget_bytes(int op_budget) { - assert(op_budget >= 0); - op_throttle_bytes.put(op_budget); - op_throttle_ops.put(1); - } - void put_op_budget(Op *op) { - assert(op->budgeted); - int op_budget = calc_op_budget(op); - put_op_budget_bytes(op_budget); - } - void put_nlist_context_budget(NListContext *list_context); - Throttle op_throttle_bytes, op_throttle_ops; - - public: - Objecter(CephContext *cct_, Messenger *m, MonClient *mc, - Finisher *fin, - double mon_timeout, - double osd_timeout) : - Dispatcher(cct_), messenger(m), monc(mc), finisher(fin), - trace_endpoint("0.0.0.0", 0, "Objecter"), - osdmap(new OSDMap), - max_linger_id(0), - keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false), - blacklist_events_enabled(false), - last_seen_osdmap_version(0), last_seen_pgmap_version(0), - logger(NULL), tick_event(0), m_request_state_hook(NULL), - homeless_session(new OSDSession(cct, -1)), - mon_timeout(ceph::make_timespan(mon_timeout)), - osd_timeout(ceph::make_timespan(osd_timeout)), - op_throttle_bytes(cct, "objecter_bytes", - cct->_conf->objecter_inflight_op_bytes), - op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops), - epoch_barrier(0), - retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply) - { } - ~Objecter() override; - - void init(); - void start(const OSDMap *o = nullptr); - void shutdown(); - - // These two templates replace osdmap_(get)|(put)_read. Simply wrap - // whatever functionality you want to use the OSDMap in a lambda like: - // - // with_osdmap([](const OSDMap& o) { o.do_stuff(); }); - // - // or - // - // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); }); - // - // Do not call into something that will try to lock the OSDMap from - // here or you will have great woe and misery. - - template - auto with_osdmap(Callback&& cb, Args&&... args) const -> - decltype(cb(*osdmap, std::forward(args)...)) { - shared_lock l(rwlock); - return std::forward(cb)(*osdmap, std::forward(args)...); - } - - - /** - * Tell the objecter to throttle outgoing ops according to its - * budget (in _conf). If you do this, ops can block, in - * which case it will unlock client_lock and sleep until - * incoming messages reduce the used budget low enough for - * the ops to continue going; then it will lock client_lock again. - */ - void set_balanced_budget() { keep_balanced_budget = true; } - void unset_balanced_budget() { keep_balanced_budget = false; } - - void set_honor_osdmap_full() { honor_osdmap_full = true; } - void unset_honor_osdmap_full() { honor_osdmap_full = false; } - - void set_osdmap_full_try() { osdmap_full_try = true; } - void unset_osdmap_full_try() { osdmap_full_try = false; } - - void _scan_requests(OSDSession *s, - bool force_resend, - bool cluster_full, - map *pool_full_map, - map& need_resend, - list& need_resend_linger, - map& need_resend_command, - shunique_lock& sul); - - int64_t get_object_hash_position(int64_t pool, const string& key, - const string& ns); - int64_t get_object_pg_hash_position(int64_t pool, const string& key, - const string& ns); - - // messages - public: - bool ms_dispatch(Message *m) override; - bool ms_can_fast_dispatch_any() const override { - return true; - } - bool ms_can_fast_dispatch(const Message *m) const override { - switch (m->get_type()) { - case CEPH_MSG_OSD_OPREPLY: - case CEPH_MSG_WATCH_NOTIFY: - return true; - default: - return false; - } - } - void ms_fast_dispatch(Message *m) override { - if (!ms_dispatch(m)) { - m->put(); - } - } - - void handle_osd_op_reply(class MOSDOpReply *m); - void handle_osd_backoff(class MOSDBackoff *m); - void handle_watch_notify(class MWatchNotify *m); - void handle_osd_map(class MOSDMap *m); - void wait_for_osd_map(); - - /** - * Get list of entities blacklisted since this was last called, - * and reset the list. - * - * Uses a std::set because typical use case is to compare some - * other list of clients to see which overlap with the blacklisted - * addrs. - * - */ - void consume_blacklist_events(std::set *events); - - int pool_snap_by_name(int64_t poolid, - const char *snap_name, - snapid_t *snap) const; - int pool_snap_get_info(int64_t poolid, snapid_t snap, - pool_snap_info_t *info) const; - int pool_snap_list(int64_t poolid, vector *snaps); -private: - - void emit_blacklist_events(const OSDMap::Incremental &inc); - void emit_blacklist_events(const OSDMap &old_osd_map, - const OSDMap &new_osd_map); - - // low-level - void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid); - void _op_submit_with_budget(Op *op, shunique_lock& lc, - ceph_tid_t *ptid, - int *ctx_budget = NULL); - inline void unregister_op(Op *op); - - // public interface -public: - void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL); - bool is_active() { - shared_lock l(rwlock); - return !((!inflight_ops) && linger_ops.empty() && - poolstat_ops.empty() && statfs_ops.empty()); - } - - /** - * Output in-flight requests - */ - void _dump_active(OSDSession *s); - void _dump_active(); - void dump_active(); - void dump_requests(Formatter *fmt); - void _dump_ops(const OSDSession *s, Formatter *fmt); - void dump_ops(Formatter *fmt); - void _dump_linger_ops(const OSDSession *s, Formatter *fmt); - void dump_linger_ops(Formatter *fmt); - void _dump_command_ops(const OSDSession *s, Formatter *fmt); - void dump_command_ops(Formatter *fmt); - void dump_pool_ops(Formatter *fmt) const; - void dump_pool_stat_ops(Formatter *fmt) const; - void dump_statfs_ops(Formatter *fmt) const; - - int get_client_incarnation() const { return client_inc; } - void set_client_incarnation(int inc) { client_inc = inc; } - - bool have_map(epoch_t epoch); - /// wait for epoch; true if we already have it - bool wait_for_map(epoch_t epoch, Context *c, int err=0); - void _wait_for_new_map(Context *c, epoch_t epoch, int err=0); - void wait_for_latest_osdmap(Context *fin); - void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin); - void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin); - - /** Get the current set of global op flags */ - int get_global_op_flags() const { return global_op_flags; } - /** Add a flag to the global op flags, not really atomic operation */ - void add_global_op_flags(int flag) { - global_op_flags.fetch_or(flag); - } - /** Clear the passed flags from the global op flag set */ - void clear_global_op_flag(int flags) { - global_op_flags.fetch_and(~flags); - } - - /// cancel an in-progress request with the given return code -private: - int op_cancel(OSDSession *s, ceph_tid_t tid, int r); - int _op_cancel(ceph_tid_t tid, int r); -public: - int op_cancel(ceph_tid_t tid, int r); - - /** - * Any write op which is in progress at the start of this call shall no - * longer be in progress when this call ends. Operations started after the - * start of this call may still be in progress when this call ends. - * - * @return the latest possible epoch in which a cancelled op could have - * existed, or -1 if nothing was cancelled. - */ - epoch_t op_cancel_writes(int r, int64_t pool=-1); - - // commands - void osd_command(int osd, const std::vector& cmd, - const bufferlist& inbl, ceph_tid_t *ptid, - bufferlist *poutbl, string *prs, Context *onfinish) { - assert(osd >= 0); - CommandOp *c = new CommandOp( - osd, - cmd, - inbl, - poutbl, - prs, - onfinish); - submit_command(c, ptid); - } - void pg_command(pg_t pgid, const vector& cmd, - const bufferlist& inbl, ceph_tid_t *ptid, - bufferlist *poutbl, string *prs, Context *onfinish) { - CommandOp *c = new CommandOp( - pgid, - cmd, - inbl, - poutbl, - prs, - onfinish); - submit_command(c, ptid); - } - - // mid-level helpers - Op *prepare_mutate_op( - const object_t& oid, const object_locator_t& oloc, - ObjectOperation& op, const SnapContext& snapc, - ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - osd_reqid_t reqid = osd_reqid_t(), - ZTracer::Trace *parent_trace = nullptr) { - Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace); - o->priority = op.priority; - o->mtime = mtime; - o->snapc = snapc; - o->out_rval.swap(op.out_rval); - o->reqid = reqid; - return o; - } - ceph_tid_t mutate( - const object_t& oid, const object_locator_t& oloc, - ObjectOperation& op, const SnapContext& snapc, - ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - osd_reqid_t reqid = osd_reqid_t()) { - Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags, - oncommit, objver, reqid); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_read_op( - const object_t& oid, const object_locator_t& oloc, - ObjectOperation& op, - snapid_t snapid, bufferlist *pbl, int flags, - Context *onack, version_t *objver = NULL, - int *data_offset = NULL, - uint64_t features = 0, - ZTracer::Trace *parent_trace = nullptr) { - Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace); - o->priority = op.priority; - o->snapid = snapid; - o->outbl = pbl; - if (!o->outbl && op.size() == 1 && op.out_bl[0]->length()) - o->outbl = op.out_bl[0]; - o->out_bl.swap(op.out_bl); - o->out_handler.swap(op.out_handler); - o->out_rval.swap(op.out_rval); - return o; - } - ceph_tid_t read( - const object_t& oid, const object_locator_t& oloc, - ObjectOperation& op, - snapid_t snapid, bufferlist *pbl, int flags, - Context *onack, version_t *objver = NULL, - int *data_offset = NULL, - uint64_t features = 0) { - Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, - data_offset); - if (features) - o->features = features; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_pg_read_op( - uint32_t hash, object_locator_t oloc, - ObjectOperation& op, bufferlist *pbl, int flags, - Context *onack, epoch_t *reply_epoch, - int *ctx_budget) { - Op *o = new Op(object_t(), oloc, - op.ops, - flags | global_op_flags | CEPH_OSD_FLAG_READ | - CEPH_OSD_FLAG_IGNORE_OVERLAY, - onack, NULL); - o->target.precalc_pgid = true; - o->target.base_pgid = pg_t(hash, oloc.pool); - o->priority = op.priority; - o->snapid = CEPH_NOSNAP; - o->outbl = pbl; - o->out_bl.swap(op.out_bl); - o->out_handler.swap(op.out_handler); - o->out_rval.swap(op.out_rval); - o->reply_epoch = reply_epoch; - if (ctx_budget) { - // budget is tracked by listing context - o->ctx_budgeted = true; - } - return o; - } - ceph_tid_t pg_read( - uint32_t hash, object_locator_t oloc, - ObjectOperation& op, bufferlist *pbl, int flags, - Context *onack, epoch_t *reply_epoch, - int *ctx_budget) { - Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags, - onack, reply_epoch, ctx_budget); - ceph_tid_t tid; - op_submit(o, &tid, ctx_budget); - return tid; - } - - // caller owns a ref - LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc, - int flags); - ceph_tid_t linger_watch(LingerOp *info, - ObjectOperation& op, - const SnapContext& snapc, ceph::real_time mtime, - bufferlist& inbl, - Context *onfinish, - version_t *objver); - ceph_tid_t linger_notify(LingerOp *info, - ObjectOperation& op, - snapid_t snap, bufferlist& inbl, - bufferlist *poutbl, - Context *onack, - version_t *objver); - int linger_check(LingerOp *info); - void linger_cancel(LingerOp *info); // releases a reference - void _linger_cancel(LingerOp *info); - - void _do_watch_notify(LingerOp *info, MWatchNotify *m); - - /** - * set up initial ops in the op vector, and allocate a final op slot. - * - * The caller is responsible for filling in the final ops_count ops. - * - * @param ops op vector - * @param ops_count number of final ops the caller will fill in - * @param extra_ops pointer to [array of] initial op[s] - * @return index of final op (for caller to fill in) - */ - int init_ops(vector& ops, int ops_count, ObjectOperation *extra_ops) { - int i; - int extra = 0; - - if (extra_ops) - extra = extra_ops->ops.size(); - - ops.resize(ops_count + extra); - - for (i=0; iops[i]; - } - - return i; - } - - - // high-level helpers - Op *prepare_stat_op( - const object_t& oid, const object_locator_t& oloc, - snapid_t snap, uint64_t *psize, ceph::real_time *pmtime, - int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_STAT; - C_Stat *fin = new C_Stat(psize, pmtime, onfinish); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, fin, objver); - o->snapid = snap; - o->outbl = &fin->bl; - return o; - } - ceph_tid_t stat( - const object_t& oid, const object_locator_t& oloc, - snapid_t snap, uint64_t *psize, ceph::real_time *pmtime, - int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags, - onfinish, objver, extra_ops); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - Op *prepare_read_op( - const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, - int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0, - ZTracer::Trace *parent_trace = nullptr) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_READ; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = len; - ops[i].op.extent.truncate_size = 0; - ops[i].op.extent.truncate_seq = 0; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace); - o->snapid = snap; - o->outbl = pbl; - return o; - } - ceph_tid_t read( - const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, - int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags, - onfinish, objver, extra_ops, op_flags); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - Op *prepare_cmpext_op( - const object_t& oid, const object_locator_t& oloc, - uint64_t off, bufferlist &cmp_bl, - snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_CMPEXT; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = cmp_bl.length(); - ops[i].op.extent.truncate_size = 0; - ops[i].op.extent.truncate_seq = 0; - ops[i].indata = cmp_bl; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onfinish, objver); - o->snapid = snap; - return o; - } - - ceph_tid_t cmpext( - const object_t& oid, const object_locator_t& oloc, - uint64_t off, bufferlist &cmp_bl, - snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap, - flags, onfinish, objver, extra_ops, op_flags); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, - bufferlist *pbl, int flags, uint64_t trunc_size, - __u32 trunc_seq, Context *onfinish, - version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_READ; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = len; - ops[i].op.extent.truncate_size = trunc_size; - ops[i].op.extent.truncate_seq = trunc_seq; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onfinish, objver); - o->snapid = snap; - o->outbl = pbl; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, - int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_MAPEXT; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = len; - ops[i].op.extent.truncate_size = 0; - ops[i].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onfinish, objver); - o->snapid = snap; - o->outbl = pbl; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc, - const char *name, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_GETXATTR; - ops[i].op.xattr.name_len = (name ? strlen(name) : 0); - ops[i].op.xattr.value_len = 0; - if (name) - ops[i].indata.append(name); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onfinish, objver); - o->snapid = snap; - o->outbl = pbl; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, - snapid_t snap, map& attrset, - int flags, Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_GETXATTRS; - C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_READ, fin, objver); - o->snapid = snap; - o->outbl = &fin->bl; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc, - snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags | - CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops); - } - - - // writes - ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc, - vector& ops, ceph::real_time mtime, - const SnapContext& snapc, int flags, - Context *oncommit, - version_t *objver = NULL) { - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_write_op( - const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, - const bufferlist &bl, ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0, - ZTracer::Trace *parent_trace = nullptr) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_WRITE; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = len; - ops[i].op.extent.truncate_size = 0; - ops[i].op.extent.truncate_seq = 0; - ops[i].indata = bl; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver, - nullptr, parent_trace); - o->mtime = mtime; - o->snapc = snapc; - return o; - } - ceph_tid_t write( - const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, - const bufferlist &bl, ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags, - oncommit, objver, extra_ops, op_flags); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_append_op( - const object_t& oid, const object_locator_t& oloc, - uint64_t len, const SnapContext& snapc, - const bufferlist &bl, ceph::real_time mtime, int flags, - Context *oncommit, - version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_APPEND; - ops[i].op.extent.offset = 0; - ops[i].op.extent.length = len; - ops[i].op.extent.truncate_size = 0; - ops[i].op.extent.truncate_seq = 0; - ops[i].indata = bl; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - return o; - } - ceph_tid_t append( - const object_t& oid, const object_locator_t& oloc, - uint64_t len, const SnapContext& snapc, - const bufferlist &bl, ceph::real_time mtime, int flags, - Context *oncommit, - version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags, - oncommit, objver, extra_ops); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, - const bufferlist &bl, ceph::real_time mtime, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *oncommit, - version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_WRITE; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = len; - ops[i].op.extent.truncate_size = trunc_size; - ops[i].op.extent.truncate_seq = trunc_seq; - ops[i].indata = bl; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_write_full_op( - const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, const bufferlist &bl, - ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_WRITEFULL; - ops[i].op.extent.offset = 0; - ops[i].op.extent.length = bl.length(); - ops[i].indata = bl; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - return o; - } - ceph_tid_t write_full( - const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, const bufferlist &bl, - ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags, - oncommit, objver, extra_ops, op_flags); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_writesame_op( - const object_t& oid, const object_locator_t& oloc, - uint64_t write_len, uint64_t off, - const SnapContext& snapc, const bufferlist &bl, - ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_WRITESAME; - ops[i].op.writesame.offset = off; - ops[i].op.writesame.length = write_len; - ops[i].op.writesame.data_length = bl.length(); - ops[i].indata = bl; - ops[i].op.flags = op_flags; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - return o; - } - ceph_tid_t writesame( - const object_t& oid, const object_locator_t& oloc, - uint64_t write_len, uint64_t off, - const SnapContext& snapc, const bufferlist &bl, - ceph::real_time mtime, int flags, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL, int op_flags = 0) { - - Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl, - mtime, flags, oncommit, objver, - extra_ops, op_flags); - - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, ceph::real_time mtime, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *oncommit, version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_TRUNCATE; - ops[i].op.extent.offset = trunc_size; - ops[i].op.extent.truncate_size = trunc_size; - ops[i].op.extent.truncate_seq = trunc_seq; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, const SnapContext& snapc, - ceph::real_time mtime, int flags, Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_ZERO; - ops[i].op.extent.offset = off; - ops[i].op.extent.length = len; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, snapid_t snapid, - ceph::real_time mtime, Context *oncommit, - version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_ROLLBACK; - ops[i].op.snap.snapid = snapid; - Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t create(const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, ceph::real_time mtime, int global_flags, - int create_flags, Context *oncommit, - version_t *objver = NULL, - ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_CREATE; - ops[i].op.flags = create_flags; - Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - Op *prepare_remove_op( - const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, ceph::real_time mtime, int flags, - Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_DELETE; - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - return o; - } - ceph_tid_t remove( - const object_t& oid, const object_locator_t& oloc, - const SnapContext& snapc, ceph::real_time mtime, int flags, - Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags, - oncommit, objver, extra_ops); - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc, - const char *name, const SnapContext& snapc, const bufferlist &bl, - ceph::real_time mtime, int flags, - Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_SETXATTR; - ops[i].op.xattr.name_len = (name ? strlen(name) : 0); - ops[i].op.xattr.value_len = bl.length(); - if (name) - ops[i].indata.append(name); - ops[i].indata.append(bl); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc, - const char *name, const SnapContext& snapc, - ceph::real_time mtime, int flags, - Context *oncommit, - version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { - vector ops; - int i = init_ops(ops, 1, extra_ops); - ops[i].op.op = CEPH_OSD_OP_RMXATTR; - ops[i].op.xattr.name_len = (name ? strlen(name) : 0); - ops[i].op.xattr.value_len = 0; - if (name) - ops[i].indata.append(name); - Op *o = new Op(oid, oloc, ops, flags | global_op_flags | - CEPH_OSD_FLAG_WRITE, oncommit, objver); - o->mtime = mtime; - o->snapc = snapc; - ceph_tid_t tid; - op_submit(o, &tid); - return tid; - } - - void list_nobjects(NListContext *p, Context *onfinish); - uint32_t list_nobjects_seek(NListContext *p, uint32_t pos); - uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c); - void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c); - - hobject_t enumerate_objects_begin(); - hobject_t enumerate_objects_end(); - //hobject_t enumerate_objects_begin(int n, int m); - void enumerate_objects( - int64_t pool_id, - const std::string &ns, - const hobject_t &start, - const hobject_t &end, - const uint32_t max, - const bufferlist &filter_bl, - std::list *result, - hobject_t *next, - Context *on_finish); - - void _enumerate_reply( - bufferlist &bl, - int r, - const hobject_t &end, - const int64_t pool_id, - int budget, - epoch_t reply_epoch, - std::list *result, - hobject_t *next, - Context *on_finish); - friend class C_EnumerateReply; - - // ------------------------- - // pool ops -private: - void pool_op_submit(PoolOp *op); - void _pool_op_submit(PoolOp *op); - void _finish_pool_op(PoolOp *op, int r); - void _do_delete_pool(int64_t pool, Context *onfinish); -public: - int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); - int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, - Context *onfinish); - int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish); - int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish); - - int create_pool(string& name, Context *onfinish, uint64_t auid=0, - int crush_rule=-1); - int delete_pool(int64_t pool, Context *onfinish); - int delete_pool(const string& name, Context *onfinish); - int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid); - - void handle_pool_op_reply(MPoolOpReply *m); - int pool_op_cancel(ceph_tid_t tid, int r); - - // -------------------------- - // pool stats -private: - void _poolstat_submit(PoolStatOp *op); -public: - void handle_get_pool_stats_reply(MGetPoolStatsReply *m); - void get_pool_stats(list& pools, map *result, - Context *onfinish); - int pool_stat_op_cancel(ceph_tid_t tid, int r); - void _finish_pool_stat_op(PoolStatOp *op, int r); - - // --------------------------- - // df stats -private: - void _fs_stats_submit(StatfsOp *op); -public: - void handle_fs_stats_reply(MStatfsReply *m); - void get_fs_stats(struct ceph_statfs& result, boost::optional poolid, - Context *onfinish); - int statfs_op_cancel(ceph_tid_t tid, int r); - void _finish_statfs_op(StatfsOp *op, int r); - - // --------------------------- - // some scatter/gather hackery - - void _sg_read_finish(vector& extents, - vector& resultbl, - bufferlist *bl, Context *onfinish); - - struct C_SGRead : public Context { - Objecter *objecter; - vector extents; - vector resultbl; - bufferlist *bl; - Context *onfinish; - C_SGRead(Objecter *ob, - vector& e, vector& r, bufferlist *b, - Context *c) : - objecter(ob), bl(b), onfinish(c) { - extents.swap(e); - resultbl.swap(r); - } - void finish(int r) override { - objecter->_sg_read_finish(extents, resultbl, bl, onfinish); - } - }; - - void sg_read_trunc(vector& extents, snapid_t snap, - bufferlist *bl, int flags, uint64_t trunc_size, - __u32 trunc_seq, Context *onfinish, int op_flags = 0) { - if (extents.size() == 1) { - read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, - extents[0].length, snap, bl, flags, extents[0].truncate_size, - trunc_seq, onfinish, 0, 0, op_flags); - } else { - C_GatherBuilder gather(cct); - vector resultbl(extents.size()); - int i=0; - for (vector::iterator p = extents.begin(); - p != extents.end(); - ++p) { - read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++], - flags, p->truncate_size, trunc_seq, gather.new_sub(), - 0, 0, op_flags); - } - gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish)); - gather.activate(); - } - } - - void sg_read(vector& extents, snapid_t snap, bufferlist *bl, - int flags, Context *onfinish, int op_flags = 0) { - sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags); - } - - void sg_write_trunc(vector& extents, const SnapContext& snapc, - const bufferlist& bl, ceph::real_time mtime, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *oncommit, int op_flags = 0) { - if (extents.size() == 1) { - write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, - extents[0].length, snapc, bl, mtime, flags, - extents[0].truncate_size, trunc_seq, oncommit, - 0, 0, op_flags); - } else { - C_GatherBuilder gcom(cct, oncommit); - for (vector::iterator p = extents.begin(); - p != extents.end(); - ++p) { - bufferlist cur; - for (vector >::iterator bit - = p->buffer_extents.begin(); - bit != p->buffer_extents.end(); - ++bit) - bl.copy(bit->first, bit->second, cur); - assert(cur.length() == p->length); - write_trunc(p->oid, p->oloc, p->offset, p->length, - snapc, cur, mtime, flags, p->truncate_size, trunc_seq, - oncommit ? gcom.new_sub():0, - 0, 0, op_flags); - } - gcom.activate(); - } - } - - void sg_write(vector& extents, const SnapContext& snapc, - const bufferlist& bl, ceph::real_time mtime, int flags, - Context *oncommit, int op_flags = 0) { - sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit, - op_flags); - } - - void ms_handle_connect(Connection *con) override; - bool ms_handle_reset(Connection *con) override; - void ms_handle_remote_reset(Connection *con) override; - bool ms_handle_refused(Connection *con) override; - bool ms_get_authorizer(int dest_type, - AuthAuthorizer **authorizer, - bool force_new) override; - - void blacklist_self(bool set); - -private: - epoch_t epoch_barrier; - bool retry_writes_after_first_reply; -public: - void set_epoch_barrier(epoch_t epoch); - - PerfCounters *get_logger() { - return logger; - } -}; - -#endif