1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
24 #include <type_traits>
26 #include <boost/thread/shared_mutex.hpp>
28 #include "include/assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/Finisher.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
40 #include "messages/MOSDOp.h"
41 #include "osd/OSDMap.h"
54 class MGetPoolStatsReply;
61 // -----------------------------------------
63 struct ObjectOperation {
// NOTE(review): this listing elides some original source lines; comments
// describe only what is visible here.
// Parallel per-op output bookkeeping: entry i of each vector belongs to
// ops[i].  out_bl receives the op's reply data, out_handler runs a
// completion callback, out_rval receives the per-op return code.
68 vector<bufferlist*> out_bl;
69 vector<Context*> out_handler;
70 vector<int*> out_rval;
72 ObjectOperation() : flags(0), priority(0) {}
// Destructor fragment: completion Contexts that were never fired are
// owned by this object and must be deleted here.
74 while (!out_handler.empty()) {
75 delete out_handler.back();
76 out_handler.pop_back();
// Set per-op flags on the most recently added op.
// NOTE(review): ops.rbegin() requires a non-empty ops vector; any guard
// line is not visible in this listing -- confirm in the full source.
84 void set_last_op_flags(int flags) {
86 ops.rbegin()->op.flags = flags;
91 * Add a callback to run when this operation completes,
92 * after any other callbacks for it.
94 void add_handler(Context *extra);
// Append a new OSDOp with the given opcode, growing the three out_*
// vectors in lockstep so their indices stay aligned with ops.
96 OSDOp& add_op(int op) {
102 out_handler.resize(s+1);
103 out_handler[s] = NULL;
104 out_rval.resize(s+1);
// Generic helper for extent-style ops: records offset/length and moves
// the caller's buffer into the op's input data (claim_append empties bl).
108 void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
109 OSDOp& osd_op = add_op(op);
110 osd_op.op.extent.offset = off;
111 osd_op.op.extent.length = len;
112 osd_op.indata.claim_append(bl);
// WRITESAME: write_len is the total region to fill; data_length is the
// length of the pattern buffer that gets repeated.
114 void add_writesame(int op, uint64_t off, uint64_t write_len,
116 OSDOp& osd_op = add_op(op);
117 osd_op.op.writesame.offset = off;
118 osd_op.op.writesame.length = write_len;
119 osd_op.op.writesame.data_length = bl.length();
120 osd_op.indata.claim_append(bl);
// xattr ops encode the attribute name followed by its value in indata.
// NOTE(review): name_len tolerates a null name, but the append(name)
// call shown below would not; a null-check line may simply be elided in
// this listing -- confirm against the full source.
122 void add_xattr(int op, const char *name, const bufferlist& data) {
123 OSDOp& osd_op = add_op(op);
124 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
125 osd_op.op.xattr.value_len = data.length();
127 osd_op.indata.append(name);
128 osd_op.indata.append(data);
// Same layout as add_xattr, plus the comparison operator/mode used by
// CMPXATTR-style ops.
130 void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
131 uint8_t cmp_mode, const bufferlist& data) {
132 OSDOp& osd_op = add_op(op);
133 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
134 osd_op.op.xattr.value_len = data.length();
135 osd_op.op.xattr.cmp_op = cmp_op;
136 osd_op.op.xattr.cmp_mode = cmp_mode;
138 osd_op.indata.append(name);
139 osd_op.indata.append(data);
// Object-class ("cls") call: records class/method name lengths and packs
// cname, method and indata into the op's input buffer.  ctx/prval are
// wired into this op's out_* slots (some wiring lines elided here).
141 void add_call(int op, const char *cname, const char *method,
143 bufferlist *outbl, Context *ctx, int *prval) {
144 OSDOp& osd_op = add_op(op);
146 unsigned p = ops.size() - 1;
147 out_handler[p] = ctx;
151 osd_op.op.cls.class_len = strlen(cname);
152 osd_op.op.cls.method_len = strlen(method);
153 osd_op.op.cls.indata_len = indata.length();
154 osd_op.indata.append(cname, osd_op.op.cls.class_len);
155 osd_op.indata.append(method, osd_op.op.cls.method_len);
156 osd_op.indata.append(indata);
// PGLS listing op: the resume cursor (cookie) is encoded into indata.
158 void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
159 epoch_t start_epoch) {
160 OSDOp& osd_op = add_op(op);
161 osd_op.op.pgls.count = count;
162 osd_op.op.pgls.start_epoch = start_epoch;
163 ::encode(cookie, osd_op.indata);
// Filtered PGLS: encodes a filter class/method name pair ("filter"),
// the filter payload, then the cursor.
// NOTE(review): cname is used below but its declaration line is elided
// in this listing.
165 void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
166 collection_list_handle_t cookie, epoch_t start_epoch) {
167 OSDOp& osd_op = add_op(op);
168 osd_op.op.pgls.count = count;
169 osd_op.op.pgls.start_epoch = start_epoch;
171 string mname = "filter";
172 ::encode(cname, osd_op.indata);
173 ::encode(mname, osd_op.indata);
174 osd_op.indata.append(filter);
175 ::encode(cookie, osd_op.indata);
// Allocation hint: purely advisory sizing information for the OSD.
177 void add_alloc_hint(int op, uint64_t expected_object_size,
178 uint64_t expected_write_size,
180 OSDOp& osd_op = add_op(op);
181 osd_op.op.alloc_hint.expected_object_size = expected_object_size;
182 osd_op.op.alloc_hint.expected_write_size = expected_write_size;
183 osd_op.op.alloc_hint.flags = flags;
// Object listing: chooses the plain or filtered PGLS opcode depending on
// whether a filter payload was supplied; both mark the op as a PG op.
189 void pg_ls(uint64_t count, bufferlist& filter,
190 collection_list_handle_t cookie, epoch_t start_epoch) {
191 if (filter.length() == 0)
192 add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
194 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
196 flags |= CEPH_OSD_FLAG_PGOP;
// Same as pg_ls but uses the newer PGNLS opcodes.
199 void pg_nls(uint64_t count, const bufferlist& filter,
200 collection_list_handle_t cookie, epoch_t start_epoch) {
201 if (filter.length() == 0)
202 add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
204 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
206 flags |= CEPH_OSD_FLAG_PGOP;
// Scrub inconsistency listing, one overload per result type (object vs
// snapset); declarations only -- bodies are defined elsewhere.
209 void scrub_ls(const librados::object_id_t& start_after,
211 std::vector<librados::inconsistent_obj_t> *objects,
214 void scrub_ls(const librados::object_id_t& start_after,
216 std::vector<librados::inconsistent_snapset_t> *objects,
// Object creation; EXCL makes it fail if the object already exists.
220 void create(bool excl) {
221 OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
222 o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
// Completion for STAT: decodes size and mtime from the reply buffer and
// stores them through whichever out-pointers are non-null.
225 struct C_ObjectOperation_stat : public Context {
228 ceph::real_time *pmtime;
230 struct timespec *pts;
232 C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
234 : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
235 void finish(int r) override {
237 bufferlist::iterator p = bl.begin();
240 ceph::real_time mtime;
// mtime is handed back in whichever representation the caller wanted.
248 *ptime = ceph::real_clock::to_time_t(mtime);
250 *pts = ceph::real_clock::to_timespec(mtime);
// Decode failures are caught here (error propagation lines elided).
251 } catch (buffer::error& e) {
// Three stat() overloads differing only in the mtime type the caller
// wants; each wires a C_ObjectOperation_stat into the op's out slots.
258 void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
259 add_op(CEPH_OSD_OP_STAT);
260 unsigned p = ops.size() - 1;
261 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
267 void stat(uint64_t *psize, time_t *ptime, int *prval) {
268 add_op(CEPH_OSD_OP_STAT);
269 unsigned p = ops.size() - 1;
270 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
276 void stat(uint64_t *psize, struct timespec *pts, int *prval) {
277 add_op(CEPH_OSD_OP_STAT);
278 unsigned p = ops.size() - 1;
279 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
// Completion for CMPEXT (compare extent); forwards the result through
// prval (body largely elided in this listing).
286 struct C_ObjectOperation_cmpext : public Context {
288 C_ObjectOperation_cmpext(int *prval)
// Compare object data against cmp_bl at the given offset.
297 void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
298 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
299 unsigned p = ops.size() - 1;
300 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
// Raw-buffer overload: copies cmp_buf into a bufferlist first.
306 void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
308 cmp_bl.append(cmp_buf, cmp_len);
309 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
310 unsigned p = ops.size() - 1;
311 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
// Plain read: reply data lands in *pbl via the out_bl slot; an optional
// ctx Context runs on completion.
316 void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
319 add_data(CEPH_OSD_OP_READ, off, len, bl);
320 unsigned p = ops.size() - 1;
323 out_handler[p] = ctx;
// Completion for SPARSE_READ: the reply encodes an extent map followed
// by the packed data; both are decoded into caller-provided outputs.
326 struct C_ObjectOperation_sparse_read : public Context {
329 std::map<uint64_t, uint64_t> *extents;
331 C_ObjectOperation_sparse_read(bufferlist *data_bl,
332 std::map<uint64_t, uint64_t> *extents,
334 : data_bl(data_bl), extents(extents), prval(prval) {}
335 void finish(int r) override {
336 bufferlist::iterator iter = bl.begin();
// Decode order matters: extent map first, then the data blob.
339 ::decode(*extents, iter);
340 ::decode(*data_bl, iter);
341 } catch (buffer::error& e) {
// Sparse read with decoded outputs (extent map + data).
348 void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
349 bufferlist *data_bl, int *prval) {
351 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
352 unsigned p = ops.size() - 1;
353 C_ObjectOperation_sparse_read *h =
354 new C_ObjectOperation_sparse_read(data_bl, m, prval);
// Write with file-truncation metadata (truncate_size/seq ordering info).
359 void write(uint64_t off, bufferlist& bl,
360 uint64_t truncate_size,
361 uint32_t truncate_seq) {
362 add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
363 OSDOp& o = *ops.rbegin();
364 o.op.extent.truncate_size = truncate_size;
365 o.op.extent.truncate_seq = truncate_seq;
// Convenience overload: no truncation metadata.
367 void write(uint64_t off, bufferlist& bl) {
368 write(off, bl, 0, 0);
// Replace the whole object with bl.
370 void write_full(bufferlist& bl) {
371 add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
373 void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
374 add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
376 void append(bufferlist& bl) {
377 add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
379 void zero(uint64_t off, uint64_t len) {
381 add_data(CEPH_OSD_OP_ZERO, off, len, bl);
383 void truncate(uint64_t off) {
385 add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
// Object delete (enclosing method signature elided in this listing).
389 add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
391 void mapext(uint64_t off, uint64_t len) {
393 add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
// Sparse read without decoded outputs; raw reply only.
395 void sparse_read(uint64_t off, uint64_t len) {
397 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
// Checksum over an extent: type/chunk size go in the op header, the
// initial value is carried in indata; the result lands in *pbl.
400 void checksum(uint8_t type, const bufferlist &init_value_bl,
401 uint64_t off, uint64_t len, size_t chunk_size,
402 bufferlist *pbl, int *prval, Context *ctx) {
403 OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
404 osd_op.op.checksum.offset = off;
405 osd_op.op.checksum.length = len;
406 osd_op.op.checksum.type = type;
407 osd_op.op.checksum.chunk_size = chunk_size;
408 osd_op.indata.append(init_value_bl);
410 unsigned p = ops.size() - 1;
413 out_handler[p] = ctx;
// Read a single xattr value into *pbl.
417 void getxattr(const char *name, bufferlist *pbl, int *prval) {
419 add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
420 unsigned p = ops.size() - 1;
// Completion that decodes a string->bufferlist map (omap/xattr values).
// If the reply also carries a "truncated" flag it is decoded; otherwise
// truncation is inferred from whether max_entries were returned.
424 struct C_ObjectOperation_decodevals : public Context {
425 uint64_t max_entries;
427 std::map<std::string,bufferlist> *pattrs;
430 C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
432 : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
437 void finish(int r) override {
439 bufferlist::iterator p = bl.begin();
442 ::decode(*pattrs, p);
// Caller did not ask for the values: decode into a throwaway map so the
// iterator still advances past them.
444 std::map<std::string,bufferlist> ignore;
450 ::decode(*ptruncated, p);
452 // the OSD did not provide this. since old OSDs do not
453 // enforce omap result limits either, we can infer it from
454 // the size of the result
455 *ptruncated = (pattrs->size() == max_entries);
459 catch (buffer::error& e) {
// Same shape as decodevals, but for a set of keys only.
466 struct C_ObjectOperation_decodekeys : public Context {
467 uint64_t max_entries;
469 std::set<std::string> *pattrs;
472 C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
474 : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
479 void finish(int r) override {
481 bufferlist::iterator p = bl.begin();
484 ::decode(*pattrs, p);
486 std::set<std::string> ignore;
492 ::decode(*ptruncated, p);
494 // the OSD did not provide this. since old OSDs do not
495 // enforce omap result limits either, we can infer it from
496 // the size of the result
497 *ptruncated = (pattrs->size() == max_entries);
501 catch (buffer::error& e) {
// Completion for LIST_WATCHERS: converts decoded watch_item_t entries
// into public obj_watch_t records.
508 struct C_ObjectOperation_decodewatchers : public Context {
510 list<obj_watch_t> *pwatchers;
512 C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
513 : pwatchers(pw), prval(pr) {}
514 void finish(int r) override {
516 bufferlist::iterator p = bl.begin();
518 obj_list_watch_response_t resp;
521 for (list<watch_item_t>::iterator i = resp.entries.begin() ;
522 i != resp.entries.end() ; ++i) {
// NOTE(review): strncpy does not null-terminate when the formatted
// address is >= 256 bytes; consider forcing ow.addr[255] = '\0' (or
// using snprintf) in the full source.
526 strncpy(ow.addr, sa.str().c_str(), 256);
527 ow.watcher_id = i->name.num();
528 ow.cookie = i->cookie;
529 ow.timeout_seconds = i->timeout_seconds;
530 pwatchers->push_back(ow);
534 catch (buffer::error& e) {
// Completion for LIST_SNAPS: rebuilds librados::snap_set_t from the
// decoded obj_list_snap_response_t.
541 struct C_ObjectOperation_decodesnaps : public Context {
543 librados::snap_set_t *psnaps;
545 C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
546 : psnaps(ps), prval(pr) {}
547 void finish(int r) override {
549 bufferlist::iterator p = bl.begin();
551 obj_list_snap_response_t resp;
554 psnaps->clones.clear();
555 for (vector<clone_info>::iterator ci = resp.clones.begin();
556 ci != resp.clones.end();
558 librados::clone_info_t clone;
560 clone.cloneid = ci->cloneid;
561 clone.snaps.reserve(ci->snaps.size());
562 clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
564 clone.overlap = ci->overlap;
565 clone.size = ci->size;
567 psnaps->clones.push_back(clone);
569 psnaps->seq = resp.seq;
571 } catch (buffer::error& e) {
// Fetch all xattrs; decoded into *pattrs by a decodevals completion.
578 void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
579 add_op(CEPH_OSD_OP_GETXATTRS);
580 if (pattrs || prval) {
581 unsigned p = ops.size() - 1;
582 C_ObjectOperation_decodevals *h
583 = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
589 void setxattr(const char *name, const bufferlist& bl) {
590 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
// string overload; value-to-bufferlist conversion lines are elided here.
592 void setxattr(const char *name, const string& s) {
595 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
597 void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
598 const bufferlist& bl) {
599 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
601 void rmxattr(const char *name) {
603 add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
// Bulk xattr replacement via RESETXATTRS (payload encoding elided).
605 void setxattrs(map<string, bufferlist>& attrs) {
608 add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
610 void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
613 add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
// Trivialmap (tmap) ops -- legacy precursor of omap.
617 void tmap_update(bufferlist& bl) {
618 add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
620 void tmap_put(bufferlist& bl) {
621 add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
623 void tmap_get(bufferlist *pbl, int *prval) {
624 add_op(CEPH_OSD_OP_TMAPGET);
625 unsigned p = ops.size() - 1;
630 add_op(CEPH_OSD_OP_TMAPGET);
// Convert a legacy tmap object to omap; NULLOK tolerates a missing tmap.
632 void tmap_to_omap(bool nullok=false) {
633 OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
635 osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
// List omap keys after start_after, up to max_to_get; results are
// decoded into *out_set with a truncation indicator.
639 void omap_get_keys(const string &start_after,
641 std::set<std::string> *out_set,
644 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
646 ::encode(start_after, bl);
647 ::encode(max_to_get, bl);
648 op.op.extent.offset = 0;
649 op.op.extent.length = bl.length();
650 op.indata.claim_append(bl);
651 if (prval || ptruncated || out_set) {
652 unsigned p = ops.size() - 1;
653 C_ObjectOperation_decodekeys *h =
654 new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
// List omap key/value pairs: start_after + max_to_get + filter_prefix
// form the request payload; the reply is decoded into *out_set with a
// truncation indicator.
661 void omap_get_vals(const string &start_after,
662 const string &filter_prefix,
664 std::map<std::string, bufferlist> *out_set,
667 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
669 ::encode(start_after, bl);
670 ::encode(max_to_get, bl);
671 ::encode(filter_prefix, bl);
672 op.op.extent.offset = 0;
673 op.op.extent.length = bl.length();
674 op.indata.claim_append(bl);
675 if (prval || out_set || ptruncated) {
676 unsigned p = ops.size() - 1;
677 C_ObjectOperation_decodevals *h =
678 new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
// Fetch values for an explicit key set; no truncation handling
// (max_entries 0, ptruncated nullptr).
685 void omap_get_vals_by_keys(const std::set<std::string> &to_get,
686 std::map<std::string, bufferlist> *out_set,
688 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
690 ::encode(to_get, bl);
691 op.op.extent.offset = 0;
692 op.op.extent.length = bl.length();
693 op.indata.claim_append(bl);
694 if (prval || out_set) {
695 unsigned p = ops.size() - 1;
696 C_ObjectOperation_decodevals *h =
697 new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
// Assert omap contents: assertions map key -> (expected value, cmp op).
704 void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
706 OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
708 ::encode(assertions, bl);
709 op.op.extent.offset = 0;
710 op.op.extent.length = bl.length();
711 op.indata.claim_append(bl);
713 unsigned p = ops.size() - 1;
// Completion for COPY_GET: unpacks object_copy_data_t into the many
// optional out-pointers supplied by copy_get() below.
718 struct C_ObjectOperation_copyget : public Context {
720 object_copy_cursor_t *cursor;
722 ceph::real_time *out_mtime;
723 std::map<std::string,bufferlist> *out_attrs;
724 bufferlist *out_data, *out_omap_header, *out_omap_data;
725 vector<snapid_t> *out_snaps;
726 snapid_t *out_snap_seq;
728 uint32_t *out_data_digest;
729 uint32_t *out_omap_digest;
730 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
731 uint64_t *out_truncate_seq;
732 uint64_t *out_truncate_size;
734 C_ObjectOperation_copyget(object_copy_cursor_t *c,
737 std::map<std::string,bufferlist> *a,
738 bufferlist *d, bufferlist *oh,
740 std::vector<snapid_t> *osnaps,
745 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
750 out_size(s), out_mtime(m),
751 out_attrs(a), out_data(d), out_omap_header(oh),
752 out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
753 out_flags(flags), out_data_digest(dd), out_omap_digest(od),
755 out_truncate_seq(otseq),
756 out_truncate_size(otsize),
758 void finish(int r) override {
759 // reqids are copied on ENOENT
760 if (r < 0 && r != -ENOENT)
763 bufferlist::iterator p = bl.begin();
764 object_copy_data_t copy_reply;
765 ::decode(copy_reply, p);
// ENOENT path: only the reqids are propagated (see comment above).
768 *out_reqids = copy_reply.reqids;
// Normal path: scatter each decoded field into its out-pointer.
772 *out_size = copy_reply.size;
774 *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
776 *out_attrs = copy_reply.attrs;
778 out_data->claim_append(copy_reply.data);
780 out_omap_header->claim_append(copy_reply.omap_header);
782 *out_omap_data = copy_reply.omap_data;
784 *out_snaps = copy_reply.snaps;
786 *out_snap_seq = copy_reply.snap_seq;
788 *out_flags = copy_reply.flags;
790 *out_data_digest = copy_reply.data_digest;
792 *out_omap_digest = copy_reply.omap_digest;
794 *out_reqids = copy_reply.reqids;
795 if (out_truncate_seq)
796 *out_truncate_seq = copy_reply.truncate_seq;
797 if (out_truncate_size)
798 *out_truncate_size = copy_reply.truncate_size;
// Advance the caller's cursor so the next COPY_GET resumes after the
// data returned here.
799 *cursor = copy_reply.cursor;
800 } catch (buffer::error& e) {
// Issue a COPY_GET: encodes cursor + max into indata and wires a
// copyget completion to scatter the reply into the out-pointers.
807 void copy_get(object_copy_cursor_t *cursor,
810 ceph::real_time *out_mtime,
811 std::map<std::string,bufferlist> *out_attrs,
812 bufferlist *out_data,
813 bufferlist *out_omap_header,
814 bufferlist *out_omap_data,
815 vector<snapid_t> *out_snaps,
816 snapid_t *out_snap_seq,
818 uint32_t *out_data_digest,
819 uint32_t *out_omap_digest,
820 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
821 uint64_t *truncate_seq,
822 uint64_t *truncate_size,
824 OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
825 osd_op.op.copy_get.max = max;
826 ::encode(*cursor, osd_op.indata);
827 ::encode(max, osd_op.indata);
828 unsigned p = ops.size() - 1;
830 C_ObjectOperation_copyget *h =
831 new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
832 out_attrs, out_data, out_omap_header,
833 out_omap_data, out_snaps, out_snap_seq,
834 out_flags, out_data_digest,
835 out_omap_digest, out_reqids, truncate_seq,
836 truncate_size, prval);
// Clear the cache-tier dirty flag (enclosing signature elided here).
842 add_op(CEPH_OSD_OP_UNDIRTY);
// Completion for ISDIRTY: decodes the boolean dirty flag into *pisdirty.
845 struct C_ObjectOperation_isdirty : public Context {
849 C_ObjectOperation_isdirty(bool *p, int *r)
850 : pisdirty(p), prval(r) {}
851 void finish(int r) override {
855 bufferlist::iterator p = bl.begin();
857 ::decode(isdirty, p);
860 } catch (buffer::error& e) {
// Query the cache-tier dirty state of the object.
867 void is_dirty(bool *pisdirty, int *prval) {
868 add_op(CEPH_OSD_OP_ISDIRTY);
869 unsigned p = ops.size() - 1;
871 C_ObjectOperation_isdirty *h =
872 new C_ObjectOperation_isdirty(pisdirty, prval);
// Completion for PG_HITSET_LS: the reply is a list of real_time
// intervals, optionally also converted to time_t pairs for callers that
// asked via ptls.
877 struct C_ObjectOperation_hit_set_ls : public Context {
879 std::list< std::pair<time_t, time_t> > *ptls;
880 std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
882 C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
883 std::list< std::pair<ceph::real_time,
884 ceph::real_time> > *ut,
886 : ptls(t), putls(ut), prval(r) {}
887 void finish(int r) override {
891 bufferlist::iterator p = bl.begin();
892 std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
896 for (auto p = ls.begin(); p != ls.end(); ++p)
897 // round initial timestamp up to the next full second to
898 // keep this a valid interval.
900 make_pair(ceph::real_clock::to_time_t(
902 // Sadly, no time literals until C++14.
903 std::chrono::seconds(1))),
904 ceph::real_clock::to_time_t(p->second)));
908 } catch (buffer::error& e) {
917 * list available HitSets.
919 * We will get back a list of time intervals. Note that the most
920 * recent range may have an empty end timestamp if it is still
923 * @param pls [out] list of time intervals
924 * @param prval [out] return value
926 void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
927 add_op(CEPH_OSD_OP_PG_HITSET_LS);
928 unsigned p = ops.size() - 1;
930 C_ObjectOperation_hit_set_ls *h =
931 new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
// real_time overload: same op, decoded without time_t conversion.
935 void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
937 add_op(CEPH_OSD_OP_PG_HITSET_LS);
938 unsigned p = ops.size() - 1;
940 C_ObjectOperation_hit_set_ls *h =
941 new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
949 * Return an encoded HitSet that includes the provided time
952 * @param stamp [in] timestamp
953 * @param pbl [out] target buffer for encoded HitSet
954 * @param prval [out] return value
956 void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) {
957 OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
958 op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
959 unsigned p = ops.size() - 1;
// Read the omap header blob into *bl.
964 void omap_get_header(bufferlist *bl, int *prval) {
965 add_op(CEPH_OSD_OP_OMAPGETHEADER);
966 unsigned p = ops.size() - 1;
// Set omap key/value pairs (map encoding lines elided in this listing).
971 void omap_set(const map<string, bufferlist> &map) {
974 add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
977 void omap_set_header(bufferlist &bl) {
978 add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
// Remove all omap entries (enclosing method signature elided here).
982 add_op(CEPH_OSD_OP_OMAPCLEAR);
985 void omap_rm_keys(const std::set<std::string> &to_remove) {
987 ::encode(to_remove, bl);
988 add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
// Fire-and-forget object class call: no output, no completion.
992 void call(const char *cname, const char *method, bufferlist &indata) {
993 add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
// Full-output variant of the object class call.
996 void call(const char *cname, const char *method, bufferlist &indata,
997 bufferlist *outdata, Context *ctx, int *prval) {
998 add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
// watch/notify ---------------------------------------------------------
1002 void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
1003 OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
1004 osd_op.op.watch.cookie = cookie;
1005 osd_op.op.watch.op = op;
1006 osd_op.op.watch.timeout = timeout;
// Notify payload layout: prot_ver, timeout, then the user buffer.
1009 void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
1010 bufferlist &bl, bufferlist *inbl) {
1011 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
1012 osd_op.op.notify.cookie = cookie;
1013 ::encode(prot_ver, *inbl);
1014 ::encode(timeout, *inbl);
1015 ::encode(bl, *inbl);
1016 osd_op.indata.append(*inbl);
1019 void notify_ack(uint64_t notify_id, uint64_t cookie,
1020 bufferlist& reply_bl) {
1021 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
1023 ::encode(notify_id, bl);
1024 ::encode(cookie, bl);
1025 ::encode(reply_bl, bl);
1026 osd_op.indata.append(bl);
// List current watchers; reply converted by decodewatchers.
1029 void list_watchers(list<obj_watch_t> *out,
1031 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
1033 unsigned p = ops.size() - 1;
1034 C_ObjectOperation_decodewatchers *h =
1035 new C_ObjectOperation_decodewatchers(out, prval);
1038 out_rval[p] = prval;
// List snapshots of the object; reply converted by decodesnaps.
1042 void list_snaps(librados::snap_set_t *out, int *prval) {
1043 (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
1045 unsigned p = ops.size() - 1;
1046 C_ObjectOperation_decodesnaps *h =
1047 new C_ObjectOperation_decodesnaps(out, prval);
1050 out_rval[p] = prval;
// Fail the op unless the object's version matches ver.
1054 void assert_version(uint64_t ver) {
1055 OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
1056 osd_op.op.assert_ver.ver = ver;
// cmpxattr convenience wrapper setting cmp op/mode on the last op.
1059 void cmpxattr(const char *name, const bufferlist& val,
1061 add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
1062 OSDOp& o = *ops.rbegin();
1063 o.op.xattr.cmp_op = op;
1064 o.op.xattr.cmp_mode = mode;
// Roll the object back to the given snapshot.
1067 void rollback(uint64_t snapid) {
1068 OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
1069 osd_op.op.snap.snapid = snapid;
// Server-side copy: source object/locator are encoded into indata.
1072 void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
1073 version_t src_version, unsigned flags,
1074 unsigned src_fadvise_flags) {
1075 OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
1076 osd_op.op.copy_from.snapid = snapid;
1077 osd_op.op.copy_from.src_version = src_version;
1078 osd_op.op.copy_from.flags = flags;
1079 osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
1080 ::encode(src, osd_op.indata);
1081 ::encode(src_oloc, osd_op.indata);
1085 * writeback content to backing tier
1087 * If object is marked dirty in the cache tier, write back content
1088 * to backing tier. If the object is clean this is a no-op.
1090 * If writeback races with an update, the update will block.
1092 * use with IGNORE_CACHE to avoid triggering promote.
1094 void cache_flush() {
1095 add_op(CEPH_OSD_OP_CACHE_FLUSH);
1099 * writeback content to backing tier
1101 * If object is marked dirty in the cache tier, write back content
1102 * to backing tier. If the object is clean this is a no-op.
1104 * If writeback races with an update, return EAGAIN. Requires that
1105 * the SKIPRWLOCKS flag be set.
1107 * use with IGNORE_CACHE to avoid triggering promote.
1109 void cache_try_flush() {
1110 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
1114 * evict object from cache tier
1116 * If object is marked clean, remove the object from the cache tier.
1117 * Otherwise, return EBUSY.
1119 * use with IGNORE_CACHE to avoid triggering promote.
1121 void cache_evict() {
1122 add_op(CEPH_OSD_OP_CACHE_EVICT);
// Redirect this object to a target object.  Note it reuses the
// copy_from union fields for the target snapid/version (visible below).
1128 void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
1129 version_t tgt_version) {
1130 OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
1131 osd_op.op.copy_from.snapid = snapid;
1132 osd_op.op.copy_from.src_version = tgt_version;
1133 ::encode(tgt, osd_op.indata);
1134 ::encode(tgt_oloc, osd_op.indata);
1137 void set_alloc_hint(uint64_t expected_object_size,
1138 uint64_t expected_write_size,
1140 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
1141 expected_write_size, flags);
1143 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1144 // not worth a feature bit. Set FAILOK per-op flag to make
1145 // sure older osds don't trip over an unsupported opcode.
1146 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
// Clone another op vector into this operation, re-pointing the out_*
// slots at the source OSDOps' outdata/rval fields.
1149 void dup(vector<OSDOp>& sops) {
1151 out_bl.resize(sops.size());
1152 out_handler.resize(sops.size());
1153 out_rval.resize(sops.size());
1154 for (uint32_t i = 0; i < sops.size(); i++) {
1155 out_bl[i] = &sops[i].outdata;
1156 out_handler[i] = NULL;
1157 out_rval[i] = &sops[i].rval;
1162 * Pin/unpin an object in cache tier
1165 add_op(CEPH_OSD_OP_CACHE_PIN);
1168 void cache_unpin() {
1169 add_op(CEPH_OSD_OP_CACHE_UNPIN);
// Client-side OSD request dispatcher: tracks in-flight ops, osdmap
// state, and observes config changes.
1177 class Objecter : public md_config_obs_t, public Dispatcher {
1179 // config observer bits
1180 const char** get_tracked_conf_keys() const override;
1181 void handle_conf_change(const struct md_config_t *conf,
1182 const std::set <std::string> &changed) override;
1185 Messenger *messenger;
1188 ZTracer::Endpoint trace_endpoint;
1192 using Dispatcher::cct;
1193 std::multimap<string,string> crush_location;
1195 std::atomic<bool> initialized{false};
// tid and in-flight accounting; kept atomic.
1198 std::atomic<uint64_t> last_tid{0};
1199 std::atomic<unsigned> inflight_ops{0};
1200 std::atomic<int> client_inc{-1};
1201 uint64_t max_linger_id;
1202 std::atomic<unsigned> num_in_flight{0};
1203 std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
1204 bool keep_balanced_budget;
1205 bool honor_osdmap_full;
1206 bool osdmap_full_try;
1208 // If this is true, accumulate a set of blacklisted entities
1209 // to be drained by consume_blacklist_events.
1210 bool blacklist_events_enabled;
1211 std::set<entity_addr_t> blacklist_events;
1214 void maybe_request_map();
1216 void enable_blacklist_events();
// NOTE(review): by this file's apparent convention, the _-prefixed
// variant assumes the caller already holds rwlock -- confirm.
1219 void _maybe_request_map();
1221 version_t last_seen_osdmap_version;
1222 version_t last_seen_pgmap_version;
// Main rwlock guarding Objecter state; shunique_lock supports holding
// it in either shared or unique mode via one lock object.
1224 mutable boost::shared_mutex rwlock;
1225 using lock_guard = std::unique_lock<decltype(rwlock)>;
1226 using unique_lock = std::unique_lock<decltype(rwlock)>;
1227 using shared_lock = boost::shared_lock<decltype(rwlock)>;
1228 using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
1229 ceph::timer<ceph::mono_clock> timer;
1231 PerfCounters *logger;
1233 uint64_t tick_event;
1237 void update_crush_location();
// Admin-socket hook for dumping request state.
1239 class RequestStateHook;
1241 RequestStateHook *m_request_state_hook;
1244 /*** track pending operations ***/
// Where an op is currently aimed: the computed pg/osd mapping plus the
// inputs (oid/oloc/flags) it was computed from.
1250 struct op_target_t {
1253 epoch_t epoch = 0; ///< latest epoch we calculated the mapping
1256 object_locator_t base_oloc;
1257 object_t target_oid;
1258 object_locator_t target_oloc;
1260 ///< true if we are directed at base_pgid, not base_oid
1261 bool precalc_pgid = false;
1263 ///< true if we have ever mapped to a valid pool
1264 bool pool_ever_existed = false;
1266 ///< explicit pg target, if any
1269 pg_t pgid; ///< last (raw) pg we mapped to
1270 spg_t actual_pgid; ///< last (actual) spg_t we mapped to
1271 unsigned pg_num = 0; ///< last pg_num we mapped to
1272 unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
1273 vector<int> up; ///< set of up osds for last pg we mapped to
1274 vector<int> acting; ///< set of acting osds for last pg we mapped to
1275 int up_primary = -1; ///< last up_primary we mapped to
1276 int acting_primary = -1; ///< last acting_primary we mapped to
1277 int size = -1; ///< the size of the pool when we were last mapped
1278 int min_size = -1; ///< the min size of the pool when we were last mapped
1279 bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
1280 bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering
1282 bool used_replica = false;
1283 bool paused = false;
1285 int osd = -1; ///< the final target osd, or -1
1287 epoch_t last_force_resend = 0;
1289 op_target_t(object_t oid, object_locator_t oloc, int flags)
1295 op_target_t(pg_t pgid)
1296 : base_oloc(pgid.pool(), pgid.ps()),
1301 op_target_t() = default;
// Build the hobject_t for the current target; an explicit oloc hash
// (>= 0) overrides the pg ps() value.
1303 hobject_t get_hobj() {
1304 return hobject_t(target_oid,
1307 target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
1309 target_oloc.nspace);
// True when the target object sorts within [begin, end).
1312 bool contained_by(const hobject_t& begin, const hobject_t& end) {
1313 hobject_t h = get_hobj();
1314 int r = cmp(h, begin);
1315 return r == 0 || (r > 0 && h < end);
1318 void dump(Formatter *f) const;
// One in-flight OSD request: target, op vector, out-param bookkeeping
// (mirrors ObjectOperation::out_*), and resend/budget state.
1321 struct Op : public RefCountedObject {
1322 OSDSession *session;
1327 ConnectionRef con; // for rx buffer only
1328 uint64_t features; // explicitly specified op features
1334 ceph::real_time mtime;
// Per-op output slots; entry i belongs to ops[i].
1337 vector<bufferlist*> out_bl;
1338 vector<Context*> out_handler;
1339 vector<int*> out_rval;
1349 epoch_t *reply_epoch;
1351 ceph::mono_time stamp;
1353 epoch_t map_dne_bound;
1357 /// true if we should resend this message on failure
1360 /// true if the throttle budget is get/put on a series of OPs,
1361 /// instead of per OP basis, when this flag is set, the budget is
1362 /// acquired before sending the very first OP of the series and
1363 /// released upon receiving the last OP reply.
1368 osd_reqid_t reqid; // explicitly setting reqid
1369 ZTracer::Trace trace;
1371 Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
1372 int f, Context *fin, version_t *ov, int *offset = NULL,
1373 ZTracer::Trace *parent_trace = nullptr) :
1374 session(NULL), incarnation(0),
1377 features(CEPH_FEATURES_SUPPORTED_DEFAULT),
1378 snapid(CEPH_NOSNAP),
1389 should_resend(true),
1390 ctx_budgeted(false),
1391 data_offset(offset) {
1394 /* initialize out_* to match op vector */
1395 out_bl.resize(ops.size());
1396 out_rval.resize(ops.size());
1397 out_handler.resize(ops.size());
1398 for (unsigned i = 0; i < ops.size(); i++) {
1400 out_handler[i] = NULL;
// A locator key equal to the oid itself is redundant; drop it.
1404 if (target.base_oloc.key == o)
1405 target.base_oloc.key.clear();
1407 if (parent_trace && parent_trace->valid()) {
1408 trace.init("op", nullptr, parent_trace);
1409 trace.event("start");
// Order ops by transaction id.
1413 bool operator<(const Op& other) const {
1414 return tid < other.tid;
// Whether this op must respect osdmap full flags: writes that did not
// opt out via FULL_TRY/FULL_FORCE.
1417 bool respects_full() const {
1419 (target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
1420 !(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
// Destructor fragment: delete any unfired completion handlers.
1425 while (!out_handler.empty()) {
1426 delete out_handler.back();
1427 out_handler.pop_back();
1429 trace.event("finish");
// Map-check completion callbacks: fetch the latest osdmap so the
// objecter can decide whether a pool really does not exist for a
// pending op / command (finish() bodies live in Objecter.cc).
1433 struct C_Op_Map_Latest : public Context {
1437 C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
1439 void finish(int r) override;
1442 struct C_Command_Map_Latest : public Context {
1446 C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
1448 void finish(int r) override;
// C_Stat: decodes a STAT reply out of its bufferlist into the
// caller-supplied size/mtime pointers, then completes the wrapped fin.
1451 struct C_Stat : public Context {
1454 ceph::real_time *pmtime;
1456 C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
1457 psize(ps), pmtime(pm), fin(c) {}
1458 void finish(int r) override {
1460 bufferlist::iterator p = bl.begin();
// C_GetAttrs: decodes a GETXATTRS reply into the caller's attrset
// (held by reference) before completing the wrapped context.
1474 struct C_GetAttrs : public Context {
1476 map<string,bufferlist>& attrset;
1478 C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set),
1480 void finish(int r) override {
1482 bufferlist::iterator p = bl.begin();
1483 ::decode(attrset, p);
1490 // Pools and statistics
// NListContext: cursor state for paginated pool object listing
// (nlist); carries the position handle, pool identity, raw reply
// data, and the optional shared throttle budget.
1491 struct NListContext {
1492 collection_list_handle_t pos;
1494 // these are for !sortbitwise compat only
1496 int starting_pg_num = 0;
1497 bool sort_bitwise = false;
1499 bool at_end_of_pool = false; ///< publicly visible end flag
1501 int64_t pool_id = -1;
1502 int pool_snap_seq = 0;
1503 uint64_t max_entries = 0;
1506 bufferlist bl; // raw data read to here
1507 std::list<librados::ListObjectImpl> list;
1511 bufferlist extra_info;
1513 // The budget associated with this context, once it is set (>= 0),
1514 // the budget is not get/released on OP basis, instead the budget
1515 // is acquired before sending the first OP and released upon receiving
1516 // the last op reply.
1517 int ctx_budget = -1;
// True once the listing has reached the end of the pool.
1519 bool at_end() const {
1520 return at_end_of_pool;
// Hash of the current cursor position (PG-hash space).
1523 uint32_t get_pg_hash_position() const {
1524 return pos.get_hash();
// C_NList: completion for one nlist round-trip. On success it feeds
// the reply back into the listing state machine via _nlist_reply();
// otherwise it completes the caller's final_finish with the error.
1528 struct C_NList : public Context {
1529 NListContext *list_context;
1530 Context *final_finish;
1533 C_NList(NListContext *lc, Context * finish, Objecter *ob) :
1534 list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
1535 void finish(int r) override {
1537 objecter->_nlist_reply(list_context, r, final_finish, epoch);
1539 final_finish->complete(r);
// NOTE(review): fragments of the PoolStatOp / StatfsOp / PoolOp
// tracking structs; the struct headers and most members are elided in
// this excerpt (see the gaps in the embedded numbering).
1548 map<string,pool_stat_t> *pool_stats;
1552 ceph::mono_time last_submit;
1557 struct ceph_statfs *stats;
1558 boost::optional<int64_t> data_pool;
1562 ceph::mono_time last_submit;
1577 ceph::mono_time last_submit;
// PoolOp default ctor: zero/null every tracked field.
1578 PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
1579 auid(0), crush_rule(0), snapid(0), blp(NULL) {}
1582 // -- osd commands --
// CommandOp: one in-flight OSD admin command, addressed either to a
// specific OSD (target_osd >= 0) or to a PG's primary.
1583 struct CommandOp : public RefCountedObject {
1584 OSDSession *session = nullptr;
1588 bufferlist *poutbl = nullptr;
1589 string *prs = nullptr;
1591 // target_osd == -1 means target_pg is valid
1592 const int target_osd = -1;
1593 const pg_t target_pg;
1597 epoch_t map_dne_bound = 0;
1598 int map_check_error = 0; // error to return if map check fails
1599 const char *map_check_error_str = nullptr;
1601 Context *onfinish = nullptr;
1602 uint64_t ontimeout = 0;
1603 ceph::mono_time last_submit;
// Two ctors: target-by-osd and target-by-pg (middles elided here).
1607 const vector<string> &cmd,
1616 target_osd(target_osd),
1617 onfinish(onfinish) {}
1621 const vector<string> &cmd,
1632 onfinish(onfinish) {}
// Command plumbing: submit, compute/assign a session for the target,
// send, cancel, and handle the reply (defined in Objecter.cc).
1636 void submit_command(CommandOp *c, ceph_tid_t *ptid);
1637 int _calc_command_target(CommandOp *c, shunique_lock &sul);
1638 void _assign_command_session(CommandOp *c, shunique_lock &sul);
1639 void _send_command(CommandOp *c);
1640 int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
1641 void _finish_command(CommandOp *c, int r, string rs);
1642 void handle_command_reply(MCommandReply *m);
1645 // -- lingering ops --
// WatchContext: abstract notify/error sink for a watch; implementors
// receive notify payloads and watch errors.
1647 struct WatchContext {
1648 // this simply mirrors librados WatchCtx2
1649 virtual void handle_notify(uint64_t notify_id,
1651 uint64_t notifier_id,
1652 bufferlist& bl) = 0;
1653 virtual void handle_error(uint64_t cookie, int err) = 0;
1654 virtual ~WatchContext() {}
// LingerOp: a persistent (re-registerable) operation — a watch or a
// notify — that survives osdmap changes and session resets.
// NOTE(review): several members (linger_id, target, snap, poutbl,
// is_watch, ...) are elided in this excerpt; see upstream Objecter.h.
1657 struct LingerOp : public RefCountedObject {
1664 ceph::real_time mtime;
1672 ceph::mono_time watch_valid_thru; ///< send time for last acked ping
1673 int last_error;  ///< error from last failed ping|reconnect, if any
1674 boost::shared_mutex watch_lock;
// NOTE(review): lock_guard is aliased to std::unique_lock here (not
// std::lock_guard) — matches upstream, but worth confirming intent.
1675 using lock_guard = std::unique_lock<decltype(watch_lock)>;
1676 using unique_lock = std::unique_lock<decltype(watch_lock)>;
1677 using shared_lock = boost::shared_lock<decltype(watch_lock)>;
1678 using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;
1680 // queue of pending async operations, with the timestamp of
1681 // when they were queued.
1682 list<ceph::mono_time> watch_pending_async;
1684 uint32_t register_gen;
1687 Context *on_reg_commit;
1689 // we trigger these from an async finisher
1690 Context *on_notify_finish;
1691 bufferlist *notify_result_bl;
1694 WatchContext *watch_context;
1696 OSDSession *session;
1698 ceph_tid_t register_tid;
1699 ceph_tid_t ping_tid;
1700 epoch_t map_dne_bound;
// Record that an async callback has been queued (caller holds lock).
1702 void _queued_async() {
1703 // watch_lock must be locked unique
1704 watch_pending_async.push_back(ceph::mono_clock::now());
// Pop the oldest pending-async entry once its callback has run.
1706 void finished_async() {
1707 unique_lock l(watch_lock);
1708 assert(!watch_pending_async.empty());
1709 watch_pending_async.pop_front();
1712 LingerOp() : linger_id(0),
1713 target(object_t(), object_locator_t(), 0),
1714 snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
1715 is_watch(false), last_error(0),
1719 on_reg_commit(NULL),
1720 on_notify_finish(NULL),
1721 notify_result_bl(NULL),
1723 watch_context(NULL),
// Non-copyable: declared but not defined.
1730 const LingerOp &operator=(const LingerOp& r);
1731 LingerOp(const LingerOp& o);
// The watch cookie is simply this object's address.
1733 uint64_t get_cookie() {
1734 return reinterpret_cast<uint64_t>(this);
// Owns watch_context; deleted on destruction.
1738 ~LingerOp() override {
1739 delete watch_context;
// Completion callbacks for the linger state machine: registration
// commit, reconnect after session reset, periodic ping, and the
// latest-map check. Each forwards into the corresponding Objecter
// _linger_* handler.
1743 struct C_Linger_Commit : public Context {
1746 bufferlist outbl; // used for notify only
1747 C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1750 ~C_Linger_Commit() override {
1753 void finish(int r) override {
1754 objecter->_linger_commit(info, r, outbl);
1758 struct C_Linger_Reconnect : public Context {
1761 C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1764 ~C_Linger_Reconnect() override {
1767 void finish(int r) override {
1768 objecter->_linger_reconnect(info, r);
// Ping callback snapshots register_gen at construction so a stale ping
// (from before a re-registration) can be recognized in _linger_ping.
1772 struct C_Linger_Ping : public Context {
1775 ceph::mono_time sent;
1776 uint32_t register_gen;
1777 C_Linger_Ping(Objecter *o, LingerOp *l)
1778 : objecter(o), info(l), register_gen(info->register_gen) {
1781 ~C_Linger_Ping() override {
1784 void finish(int r) override {
1785 objecter->_linger_ping(info, r, sent, register_gen);
1789 struct C_Linger_Map_Latest : public Context {
1793 C_Linger_Map_Latest(Objecter *o, uint64_t id) :
1794 objecter(o), linger_id(id), latest(0) {}
1795 void finish(int r) override;
1798 // -- osd sessions --
// NOTE(review): the line below is a fragment of the OSDBackoff struct,
// whose header is elided in this excerpt.
1802 hobject_t begin, end;
// OSDSession: per-OSD connection state — the ops/linger/command maps
// live here, guarded by the session's shared_mutex, plus a small pool
// of per-object completion locks.
1805 struct OSDSession : public RefCountedObject {
1806 boost::shared_mutex lock;
1807 using lock_guard = std::lock_guard<decltype(lock)>;
1808 using unique_lock = std::unique_lock<decltype(lock)>;
1809 using shared_lock = boost::shared_lock<decltype(lock)>;
1810 using shunique_lock = ceph::shunique_lock<decltype(lock)>;
1813 map<ceph_tid_t,Op*> ops;
1814 map<uint64_t, LingerOp*> linger_ops;
1815 map<ceph_tid_t,CommandOp*> command_ops;
// Backoffs indexed both by (pg, hobject range) and by id.
1818 map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
1819 map<uint64_t,OSDBackoff*> backoffs_by_id;
1825 std::unique_ptr<std::mutex[]> completion_locks;
1826 using unique_completion_lock = std::unique_lock<
1827 decltype(completion_locks)::element_type>;
// Completion-lock count comes from objecter_completion_locks_per_session.
1830 OSDSession(CephContext *cct, int o) :
1831 osd(o), incarnation(0), con(NULL),
1832 num_locks(cct->_conf->objecter_completion_locks_per_session),
1833 completion_locks(new std::mutex[num_locks]) {}
1835 ~OSDSession() override;
// The "homeless" session (osd == -1) parks ops with no mapped OSD.
1837 bool is_homeless() { return (osd == -1); }
// Pick (and lock) the completion lock for this object.
1839 unique_completion_lock get_lock(object_t& oid);
// Objecter-wide tracking state: sessions, full-flag queries, and the
// maps of outstanding linger/poolstat/statfs/pool ops.
1841 map<int,OSDSession*> osd_sessions;
1843 bool osdmap_full_flag() const;
1844 bool osdmap_pool_full(const int64_t pool_id) const;
1849 * Test pg_pool_t::FLAG_FULL on a pool
1851 * @return true if the pool exists and has the flag set, or
1852 * the global full flag is set, else false
1854 bool _osdmap_pool_full(const int64_t pool_id) const;
1855 bool _osdmap_pool_full(const pg_pool_t &p) const;
1856 void update_pool_full_map(map<int64_t, bool>& pool_full_map);
1858 map<uint64_t, LingerOp*> linger_ops;
1859 // we use this just to confirm a cookie is valid before dereferencing the ptr
1860 set<LingerOp*> linger_ops_set;
1862 map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1863 map<ceph_tid_t,StatfsOp*> statfs_ops;
1864 map<ceph_tid_t,PoolOp*> pool_ops;
1865 std::atomic<unsigned> num_homeless_ops{0};
1867 OSDSession *homeless_session;
1869 // ops waiting for an osdmap with a new pool or confirmation that
1870 // the pool does not exist (may be expanded to other uses later)
1871 map<uint64_t, LingerOp*> check_latest_map_lingers;
1872 map<ceph_tid_t, Op*> check_latest_map_ops;
1873 map<ceph_tid_t, CommandOp*> check_latest_map_commands;
// Contexts (with error codes) to complete once we have the given epoch.
1875 map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
1877 ceph::timespan mon_timeout;
1878 ceph::timespan osd_timeout;
// Internal op machinery (declarations; bodies in Objecter.cc):
// message preparation/sending, op teardown, target recalculation,
// session bookkeeping, linger handling, map-DNE checks, session
// lifecycle, and throttle-budget accounting.
1880 MOSDOp *_prepare_osd_op(Op *op);
1881 void _send_op(Op *op, MOSDOp *m = NULL);
1882 void _send_op_account(Op *op);
1883 void _cancel_linger_op(Op *op);
1884 void finish_op(OSDSession *session, ceph_tid_t tid);
1885 void _finish_op(Op *op, int r);
1886 static bool is_pg_changed(
1888 const vector<int>& oldacting,
1890 const vector<int>& newacting,
1891 bool any_change=false);
// Outcome of recalculating an op's target after an osdmap change.
1892 enum recalc_op_target_result {
1893 RECALC_OP_TARGET_NO_ACTION = 0,
1894 RECALC_OP_TARGET_NEED_RESEND,
1895 RECALC_OP_TARGET_POOL_DNE,
1896 RECALC_OP_TARGET_OSD_DNE,
1897 RECALC_OP_TARGET_OSD_DOWN,
1899 bool _osdmap_full_flag() const;
1900 bool _osdmap_has_pool_full() const;
1902 bool target_should_be_paused(op_target_t *op);
1903 int _calc_target(op_target_t *t, Connection *con,
1904 bool any_change = false);
1905 int _map_session(op_target_t *op, OSDSession **s,
// Move ops/linger-ops/command-ops between sessions.
1908 void _session_op_assign(OSDSession *s, Op *op);
1909 void _session_op_remove(OSDSession *s, Op *op);
1910 void _session_linger_op_assign(OSDSession *to, LingerOp *op);
1911 void _session_linger_op_remove(OSDSession *from, LingerOp *op);
1912 void _session_command_op_assign(OSDSession *to, CommandOp *op);
1913 void _session_command_op_remove(OSDSession *from, CommandOp *op);
1915 int _assign_op_target_session(Op *op, shunique_lock& lc,
1916 bool src_session_locked,
1917 bool dst_session_locked);
1918 int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
1920 void _linger_submit(LingerOp *info, shunique_lock& sul);
1921 void _send_linger(LingerOp *info, shunique_lock& sul);
1922 void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
1923 void _linger_reconnect(LingerOp *info, int r);
1924 void _send_linger_ping(LingerOp *info);
1925 void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
1926 uint32_t register_gen);
1927 int _normalize_watch_error(int r);
1929 friend class C_DoWatchError;
// Queue a linger callback on the finisher thread.
1931 void linger_callback_flush(Context *ctx) {
1932 finisher->queue(ctx);
// Pool-does-not-exist checks for ops, lingers and commands.
1936 void _check_op_pool_dne(Op *op, unique_lock *sl);
1937 void _send_op_map_check(Op *op);
1938 void _op_cancel_map_check(Op *op);
1939 void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
1940 void _send_linger_map_check(LingerOp *op);
1941 void _linger_cancel_map_check(LingerOp *op);
1942 void _check_command_map_dne(CommandOp *op);
1943 void _send_command_map_check(CommandOp *op);
1944 void _command_cancel_map_check(CommandOp *op);
1946 void kick_requests(OSDSession *session);
1947 void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
1948 void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
1950 int _get_session(int osd, OSDSession **session, shunique_lock& sul);
1951 void put_session(OSDSession *s);
1952 void get_session(OSDSession *s);
1953 void _reopen_session(OSDSession *session);
1954 void close_session(OSDSession *session);
1956 void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
1957 epoch_t reply_epoch);
1959 void resend_mon_ops();
1962 * handle a budget for in-flight ops
1963 * budget is taken whenever an op goes into the ops map
1964 * and returned whenever an op is removed from the map
1965 * If throttle_op needs to throttle it will unlock client_lock.
1967 int calc_op_budget(Op *op);
1968 void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
// Acquire this op's throttle budget; may block (and drop rwlock) when
// balanced budgeting is enabled.
1969 int _take_op_budget(Op *op, shunique_lock& sul) {
1970 assert(sul && sul.mutex() == &rwlock);
1971 int op_budget = calc_op_budget(op);
1972 if (keep_balanced_budget) {
1973 _throttle_op(op, sul, op_budget);
1975 op_throttle_bytes.take(op_budget);
1976 op_throttle_ops.take(1);
1978 op->budgeted = true;
// Return a previously-taken budget (bytes plus one op slot).
1981 void put_op_budget_bytes(int op_budget) {
1982 assert(op_budget >= 0);
1983 op_throttle_bytes.put(op_budget);
1984 op_throttle_ops.put(1);
1986 void put_op_budget(Op *op) {
1987 assert(op->budgeted);
1988 int op_budget = calc_op_budget(op);
1989 put_op_budget_bytes(op_budget);
1991 void put_nlist_context_budget(NListContext *list_context);
1992 Throttle op_throttle_bytes, op_throttle_ops;
// Objecter ctor: wires up messenger/monclient/finisher, seeds the
// homeless session, converts the double timeouts to ceph::timespan,
// and sizes the byte/op throttles from configuration.
1995 Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
1998 double osd_timeout) :
1999 Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
2000 trace_endpoint("0.0.0.0", 0, "Objecter"),
2003 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2004 blacklist_events_enabled(false),
2005 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2006 logger(NULL), tick_event(0), m_request_state_hook(NULL),
2007 homeless_session(new OSDSession(cct, -1)),
2008 mon_timeout(ceph::make_timespan(mon_timeout)),
2009 osd_timeout(ceph::make_timespan(osd_timeout)),
2010 op_throttle_bytes(cct, "objecter_bytes",
2011 cct->_conf->objecter_inflight_op_bytes),
2012 op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
2014 retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
2016 ~Objecter() override;
2019 void start(const OSDMap *o = nullptr);
2022 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2023 // whatever functionality you want to use the OSDMap in a lambda like:
2025 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2029 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2031 // Do not call into something that will try to lock the OSDMap from
2032 // here or you will have great woe and misery.
// Runs the callback with rwlock held shared for the duration.
2034 template<typename Callback, typename...Args>
2035 auto with_osdmap(Callback&& cb, Args&&... args) const ->
2036 decltype(cb(*osdmap, std::forward<Args>(args)...)) {
2037 shared_lock l(rwlock);
2038 return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
2043 * Tell the objecter to throttle outgoing ops according to its
2044 * budget (in _conf). If you do this, ops can block, in
2045 * which case it will unlock client_lock and sleep until
2046 * incoming messages reduce the used budget low enough for
2047 * the ops to continue going; then it will lock client_lock again.
2049 void set_balanced_budget() { keep_balanced_budget = true; }
2050 void unset_balanced_budget() { keep_balanced_budget = false; }
// Toggle whether writes respect the osdmap full flag, and whether
// ops are marked FULL_TRY.
2052 void set_honor_osdmap_full() { honor_osdmap_full = true; }
2053 void unset_honor_osdmap_full() { honor_osdmap_full = false; }
2055 void set_osdmap_full_try() { osdmap_full_try = true; }
2056 void unset_osdmap_full_try() { osdmap_full_try = false; }
// Scan a session's ops after an osdmap change and collect everything
// that needs to be resent (ops, lingers, commands).
2058 void _scan_requests(OSDSession *s,
2061 map<int64_t, bool> *pool_full_map,
2062 map<ceph_tid_t, Op*>& need_resend,
2063 list<LingerOp*>& need_resend_linger,
2064 map<ceph_tid_t, CommandOp*>& need_resend_command,
2065 shunique_lock& sul);
2067 int64_t get_object_hash_position(int64_t pool, const string& key,
2069 int64_t get_object_pg_hash_position(int64_t pool, const string& key,
// Messenger Dispatcher interface; op replies and watch-notifies are
// eligible for fast dispatch.
2074 bool ms_dispatch(Message *m) override;
2075 bool ms_can_fast_dispatch_any() const override {
2078 bool ms_can_fast_dispatch(const Message *m) const override {
2079 switch (m->get_type()) {
2080 case CEPH_MSG_OSD_OPREPLY:
2081 case CEPH_MSG_WATCH_NOTIFY:
2087 void ms_fast_dispatch(Message *m) override {
2088 if (!ms_dispatch(m)) {
2093 void handle_osd_op_reply(class MOSDOpReply *m);
2094 void handle_osd_backoff(class MOSDBackoff *m);
2095 void handle_watch_notify(class MWatchNotify *m);
2096 void handle_osd_map(class MOSDMap *m);
2097 void wait_for_osd_map();
2100 * Get list of entities blacklisted since this was last called,
2101 * and reset the list.
2103 * Uses a std::set because typical use case is to compare some
2104 * other list of clients to see which overlap with the blacklisted
2108 void consume_blacklist_events(std::set<entity_addr_t> *events);
// Pool snapshot queries (read from the cached osdmap).
2110 int pool_snap_by_name(int64_t poolid,
2111 const char *snap_name,
2112 snapid_t *snap) const;
2113 int pool_snap_get_info(int64_t poolid, snapid_t snap,
2114 pool_snap_info_t *info) const;
2115 int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
2118 void emit_blacklist_events(const OSDMap::Incremental &inc);
2119 void emit_blacklist_events(const OSDMap &old_osd_map,
2120 const OSDMap &new_osd_map);
// Op submission entry points; _op_submit_with_budget also acquires
// the throttle budget (or charges the shared ctx_budget).
2123 void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
2124 void _op_submit_with_budget(Op *op, shunique_lock& lc,
2126 int *ctx_budget = NULL);
2127 inline void unregister_op(Op *op);
2131 void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
// True while any op, linger, poolstat or statfs request is in flight.
2133 shared_lock l(rwlock);
2134 return !((!inflight_ops) && linger_ops.empty() &&
2135 poolstat_ops.empty() && statfs_ops.empty());
2139 * Output in-flight requests
2141 void _dump_active(OSDSession *s);
2142 void _dump_active();
2144 void dump_requests(Formatter *fmt);
2145 void _dump_ops(const OSDSession *s, Formatter *fmt);
2146 void dump_ops(Formatter *fmt);
2147 void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
2148 void dump_linger_ops(Formatter *fmt);
2149 void _dump_command_ops(const OSDSession *s, Formatter *fmt);
2150 void dump_command_ops(Formatter *fmt);
2151 void dump_pool_ops(Formatter *fmt) const;
2152 void dump_pool_stat_ops(Formatter *fmt) const;
2153 void dump_statfs_ops(Formatter *fmt) const;
2155 int get_client_incarnation() const { return client_inc; }
2156 void set_client_incarnation(int inc) { client_inc = inc; }
2158 bool have_map(epoch_t epoch);
2159 /// wait for epoch; true if we already have it
2160 bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2161 void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2162 void wait_for_latest_osdmap(Context *fin);
// NOTE(review): "neweset" looks like a typo for "newest" — parameter
// name only, so behavior is unaffected; fix alongside Objecter.cc.
2163 void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2164 void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2166 /** Get the current set of global op flags */
2167 int get_global_op_flags() const { return global_op_flags; }
2168 /** Add a flag to the global op flags, not really atomic operation */
2169 void add_global_op_flags(int flag) {
2170 global_op_flags.fetch_or(flag);
2172 /** Clear the passed flags from the global op flag set */
2173 void clear_global_op_flag(int flags) {
2174 global_op_flags.fetch_and(~flags);
2177 /// cancel an in-progress request with the given return code
2179 int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
2180 int _op_cancel(ceph_tid_t tid, int r);
2182 int op_cancel(ceph_tid_t tid, int r);
2185 * Any write op which is in progress at the start of this call shall no
2186 * longer be in progress when this call ends. Operations started after the
2187 * start of this call may still be in progress when this call ends.
2189 * @return the latest possible epoch in which a cancelled op could have
2190 * existed, or -1 if nothing was cancelled.
2192 epoch_t op_cancel_writes(int r, int64_t pool=-1);
// Build a CommandOp targeted at an OSD (or a PG's primary) and submit.
2195 void osd_command(int osd, const std::vector<string>& cmd,
2196 const bufferlist& inbl, ceph_tid_t *ptid,
2197 bufferlist *poutbl, string *prs, Context *onfinish) {
2199 CommandOp *c = new CommandOp(
2206 submit_command(c, ptid);
2208 void pg_command(pg_t pgid, const vector<string>& cmd,
2209 const bufferlist& inbl, ceph_tid_t *ptid,
2210 bufferlist *poutbl, string *prs, Context *onfinish) {
2211 CommandOp *c = new CommandOp(
2218 submit_command(c, ptid);
2221 // mid-level helpers
// Build a WRITE Op from a compound ObjectOperation; the op's output
// slots (out_rval etc.) are swapped out of the ObjectOperation, which
// is consumed by this call.
2222 Op *prepare_mutate_op(
2223 const object_t& oid, const object_locator_t& oloc,
2224 ObjectOperation& op, const SnapContext& snapc,
2225 ceph::real_time mtime, int flags,
2226 Context *oncommit, version_t *objver = NULL,
2227 osd_reqid_t reqid = osd_reqid_t(),
2228 ZTracer::Trace *parent_trace = nullptr) {
2229 Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2230 CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
2231 o->priority = op.priority;
2234 o->out_rval.swap(op.out_rval);
// mutate(): prepare + submit, returning the tid (body partly elided).
2239 const object_t& oid, const object_locator_t& oloc,
2240 ObjectOperation& op, const SnapContext& snapc,
2241 ceph::real_time mtime, int flags,
2242 Context *oncommit, version_t *objver = NULL,
2243 osd_reqid_t reqid = osd_reqid_t()) {
2244 Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2245 oncommit, objver, reqid);
// Build a READ Op from a compound ObjectOperation. For a single-op
// read with a caller-filled buffer, reuse that buffer as the reply
// target to avoid a copy.
2250 Op *prepare_read_op(
2251 const object_t& oid, const object_locator_t& oloc,
2252 ObjectOperation& op,
2253 snapid_t snapid, bufferlist *pbl, int flags,
2254 Context *onack, version_t *objver = NULL,
2255 int *data_offset = NULL,
2256 uint64_t features = 0,
2257 ZTracer::Trace *parent_trace = nullptr) {
2258 Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2259 CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
2260 o->priority = op.priority;
2263 if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
2264 o->outbl = op.out_bl[0];
2265 o->out_bl.swap(op.out_bl);
2266 o->out_handler.swap(op.out_handler);
2267 o->out_rval.swap(op.out_rval);
// read(): prepare + set features + submit (body partly elided).
2271 const object_t& oid, const object_locator_t& oloc,
2272 ObjectOperation& op,
2273 snapid_t snapid, bufferlist *pbl, int flags,
2274 Context *onack, version_t *objver = NULL,
2275 int *data_offset = NULL,
2276 uint64_t features = 0) {
2277 Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2280 o->features = features;
// PG-targeted read (used for pool listing): the PG is precomputed
// from the hash rather than derived from an object name, and the
// throttle budget is tracked by the listing context.
2285 Op *prepare_pg_read_op(
2286 uint32_t hash, object_locator_t oloc,
2287 ObjectOperation& op, bufferlist *pbl, int flags,
2288 Context *onack, epoch_t *reply_epoch,
2290 Op *o = new Op(object_t(), oloc,
2292 flags | global_op_flags | CEPH_OSD_FLAG_READ |
2293 CEPH_OSD_FLAG_IGNORE_OVERLAY,
2295 o->target.precalc_pgid = true;
2296 o->target.base_pgid = pg_t(hash, oloc.pool);
2297 o->priority = op.priority;
2298 o->snapid = CEPH_NOSNAP;
2300 o->out_bl.swap(op.out_bl);
2301 o->out_handler.swap(op.out_handler);
2302 o->out_rval.swap(op.out_rval);
2303 o->reply_epoch = reply_epoch;
2305 // budget is tracked by listing context
2306 o->ctx_budgeted = true;
2311 uint32_t hash, object_locator_t oloc,
2312 ObjectOperation& op, bufferlist *pbl, int flags,
2313 Context *onack, epoch_t *reply_epoch,
2315 Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2316 onack, reply_epoch, ctx_budget);
2318 op_submit(o, &tid, ctx_budget);
// Linger (watch/notify) public API.
2322 // caller owns a ref
2323 LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
2325 ceph_tid_t linger_watch(LingerOp *info,
2326 ObjectOperation& op,
2327 const SnapContext& snapc, ceph::real_time mtime,
2331 ceph_tid_t linger_notify(LingerOp *info,
2332 ObjectOperation& op,
2333 snapid_t snap, bufferlist& inbl,
2337 int linger_check(LingerOp *info);
2338 void linger_cancel(LingerOp *info); // releases a reference
2339 void _linger_cancel(LingerOp *info);
2341 void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2344 * set up initial ops in the op vector, and allocate a final op slot.
2346 * The caller is responsible for filling in the final ops_count ops.
2348 * @param ops op vector
2349 * @param ops_count number of final ops the caller will fill in
2350 * @param extra_ops pointer to [array of] initial op[s]
2351 * @return index of final op (for caller to fill in)
2353 int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2358 extra = extra_ops->ops.size();
2360 ops.resize(ops_count + extra);
2362 for (i=0; i<extra; i++) {
2363 ops[i] = extra_ops->ops[i];
2370 // high-level helpers
2371 Op *prepare_stat_op(
2372 const object_t& oid, const object_locator_t& oloc,
2373 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2374 int flags, Context *onfinish, version_t *objver = NULL,
2375 ObjectOperation *extra_ops = NULL) {
2377 int i = init_ops(ops, 1, extra_ops);
2378 ops[i].op.op = CEPH_OSD_OP_STAT;
2379 C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
2380 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2381 CEPH_OSD_FLAG_READ, fin, objver);
2383 o->outbl = &fin->bl;
2387 const object_t& oid, const object_locator_t& oloc,
2388 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2389 int flags, Context *onfinish, version_t *objver = NULL,
2390 ObjectOperation *extra_ops = NULL) {
2391 Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
2392 onfinish, objver, extra_ops);
2398 Op *prepare_read_op(
2399 const object_t& oid, const object_locator_t& oloc,
2400 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2401 int flags, Context *onfinish, version_t *objver = NULL,
2402 ObjectOperation *extra_ops = NULL, int op_flags = 0,
2403 ZTracer::Trace *parent_trace = nullptr) {
2405 int i = init_ops(ops, 1, extra_ops);
2406 ops[i].op.op = CEPH_OSD_OP_READ;
2407 ops[i].op.extent.offset = off;
2408 ops[i].op.extent.length = len;
2409 ops[i].op.extent.truncate_size = 0;
2410 ops[i].op.extent.truncate_seq = 0;
2411 ops[i].op.flags = op_flags;
2412 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2413 CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
2419 const object_t& oid, const object_locator_t& oloc,
2420 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2421 int flags, Context *onfinish, version_t *objver = NULL,
2422 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2423 Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
2424 onfinish, objver, extra_ops, op_flags);
2430 Op *prepare_cmpext_op(
2431 const object_t& oid, const object_locator_t& oloc,
2432 uint64_t off, bufferlist &cmp_bl,
2433 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2434 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2436 int i = init_ops(ops, 1, extra_ops);
2437 ops[i].op.op = CEPH_OSD_OP_CMPEXT;
2438 ops[i].op.extent.offset = off;
2439 ops[i].op.extent.length = cmp_bl.length();
2440 ops[i].op.extent.truncate_size = 0;
2441 ops[i].op.extent.truncate_seq = 0;
2442 ops[i].indata = cmp_bl;
2443 ops[i].op.flags = op_flags;
2444 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2445 CEPH_OSD_FLAG_READ, onfinish, objver);
2451 const object_t& oid, const object_locator_t& oloc,
2452 uint64_t off, bufferlist &cmp_bl,
2453 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2454 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2455 Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
2456 flags, onfinish, objver, extra_ops, op_flags);
2462 ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
2463 uint64_t off, uint64_t len, snapid_t snap,
2464 bufferlist *pbl, int flags, uint64_t trunc_size,
2465 __u32 trunc_seq, Context *onfinish,
2466 version_t *objver = NULL,
2467 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2469 int i = init_ops(ops, 1, extra_ops);
2470 ops[i].op.op = CEPH_OSD_OP_READ;
2471 ops[i].op.extent.offset = off;
2472 ops[i].op.extent.length = len;
2473 ops[i].op.extent.truncate_size = trunc_size;
2474 ops[i].op.extent.truncate_seq = trunc_seq;
2475 ops[i].op.flags = op_flags;
2476 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2477 CEPH_OSD_FLAG_READ, onfinish, objver);
2484 ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
2485 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2486 int flags, Context *onfinish, version_t *objver = NULL,
2487 ObjectOperation *extra_ops = NULL) {
2489 int i = init_ops(ops, 1, extra_ops);
2490 ops[i].op.op = CEPH_OSD_OP_MAPEXT;
2491 ops[i].op.extent.offset = off;
2492 ops[i].op.extent.length = len;
2493 ops[i].op.extent.truncate_size = 0;
2494 ops[i].op.extent.truncate_seq = 0;
2495 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2496 CEPH_OSD_FLAG_READ, onfinish, objver);
2503 ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
2504 const char *name, snapid_t snap, bufferlist *pbl, int flags,
2506 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2508 int i = init_ops(ops, 1, extra_ops);
2509 ops[i].op.op = CEPH_OSD_OP_GETXATTR;
2510 ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2511 ops[i].op.xattr.value_len = 0;
2513 ops[i].indata.append(name);
2514 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2515 CEPH_OSD_FLAG_READ, onfinish, objver);
2523 ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
2524 snapid_t snap, map<string,bufferlist>& attrset,
2525 int flags, Context *onfinish, version_t *objver = NULL,
2526 ObjectOperation *extra_ops = NULL) {
2528 int i = init_ops(ops, 1, extra_ops);
2529 ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
2530 C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
2531 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2532 CEPH_OSD_FLAG_READ, fin, objver);
2534 o->outbl = &fin->bl;
2540 ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
2541 snapid_t snap, bufferlist *pbl, int flags,
2542 Context *onfinish, version_t *objver = NULL,
2543 ObjectOperation *extra_ops = NULL) {
2544 return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
2545 CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
2550 ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
2551 vector<OSDOp>& ops, ceph::real_time mtime,
2552 const SnapContext& snapc, int flags,
2554 version_t *objver = NULL) {
2555 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2556 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2563 Op *prepare_write_op(
2564 const object_t& oid, const object_locator_t& oloc,
2565 uint64_t off, uint64_t len, const SnapContext& snapc,
2566 const bufferlist &bl, ceph::real_time mtime, int flags,
2567 Context *oncommit, version_t *objver = NULL,
2568 ObjectOperation *extra_ops = NULL, int op_flags = 0,
2569 ZTracer::Trace *parent_trace = nullptr) {
2571 int i = init_ops(ops, 1, extra_ops);
2572 ops[i].op.op = CEPH_OSD_OP_WRITE;
2573 ops[i].op.extent.offset = off;
2574 ops[i].op.extent.length = len;
2575 ops[i].op.extent.truncate_size = 0;
2576 ops[i].op.extent.truncate_seq = 0;
2578 ops[i].op.flags = op_flags;
2579 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2580 CEPH_OSD_FLAG_WRITE, oncommit, objver,
2581 nullptr, parent_trace);
2587 const object_t& oid, const object_locator_t& oloc,
2588 uint64_t off, uint64_t len, const SnapContext& snapc,
2589 const bufferlist &bl, ceph::real_time mtime, int flags,
2590 Context *oncommit, version_t *objver = NULL,
2591 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2592 Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
2593 oncommit, objver, extra_ops, op_flags);
2598 Op *prepare_append_op(
2599 const object_t& oid, const object_locator_t& oloc,
2600 uint64_t len, const SnapContext& snapc,
2601 const bufferlist &bl, ceph::real_time mtime, int flags,
2603 version_t *objver = NULL,
2604 ObjectOperation *extra_ops = NULL) {
2606 int i = init_ops(ops, 1, extra_ops);
2607 ops[i].op.op = CEPH_OSD_OP_APPEND;
2608 ops[i].op.extent.offset = 0;
2609 ops[i].op.extent.length = len;
2610 ops[i].op.extent.truncate_size = 0;
2611 ops[i].op.extent.truncate_seq = 0;
2613 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2614 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2620 const object_t& oid, const object_locator_t& oloc,
2621 uint64_t len, const SnapContext& snapc,
2622 const bufferlist &bl, ceph::real_time mtime, int flags,
2624 version_t *objver = NULL,
2625 ObjectOperation *extra_ops = NULL) {
2626 Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
2627 oncommit, objver, extra_ops);
// write_trunc(): a WRITE of [off, off+len) that also carries truncate
// metadata (truncate_size/truncate_seq) in the extent.  Presumably this
// lets the OSD reconcile a write that raced with a client-side truncate
// (used by the file layer) -- confirm against osd extent semantics.
2632 ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
2633 uint64_t off, uint64_t len, const SnapContext& snapc,
2634 const bufferlist &bl, ceph::real_time mtime, int flags,
2635 uint64_t trunc_size, __u32 trunc_seq,
2637 version_t *objver = NULL,
2638 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2640 int i = init_ops(ops, 1, extra_ops);
2641 ops[i].op.op = CEPH_OSD_OP_WRITE;
2642 ops[i].op.extent.offset = off;
2643 ops[i].op.extent.length = len;
// Unlike write()/prepare_write_op(), the truncate fields are taken from
// the caller rather than zeroed.
2644 ops[i].op.extent.truncate_size = trunc_size;
2645 ops[i].op.extent.truncate_seq = trunc_seq;
2647 ops[i].op.flags = op_flags;
2648 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2649 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// Build (but do not submit) a WRITEFULL op: atomically replace the entire
// object's contents with `bl`.  Offset is therefore 0 and the length is
// taken from the buffer itself.
2656 Op *prepare_write_full_op(
2657 const object_t& oid, const object_locator_t& oloc,
2658 const SnapContext& snapc, const bufferlist &bl,
2659 ceph::real_time mtime, int flags,
2660 Context *oncommit, version_t *objver = NULL,
2661 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2663 int i = init_ops(ops, 1, extra_ops);
2664 ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
2665 ops[i].op.extent.offset = 0;
2666 ops[i].op.extent.length = bl.length();
2668 ops[i].op.flags = op_flags;
2669 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2670 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// write_full(): submit-side wrapper for prepare_write_full_op(); replaces
// the whole object with `bl` and returns the tid (elided tail).
2675 ceph_tid_t write_full(
2676 const object_t& oid, const object_locator_t& oloc,
2677 const SnapContext& snapc, const bufferlist &bl,
2678 ceph::real_time mtime, int flags,
2679 Context *oncommit, version_t *objver = NULL,
2680 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2681 Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
2682 oncommit, objver, extra_ops, op_flags);
// Build (but do not submit) a WRITESAME op: tile the pattern in `bl`
// (data_length = bl.length()) across [off, off+write_len) of the object.
// Presumably write_len must be a multiple of bl.length() -- enforced by
// the OSD, confirm before relying on it here.
2687 Op *prepare_writesame_op(
2688 const object_t& oid, const object_locator_t& oloc,
2689 uint64_t write_len, uint64_t off,
2690 const SnapContext& snapc, const bufferlist &bl,
2691 ceph::real_time mtime, int flags,
2692 Context *oncommit, version_t *objver = NULL,
2693 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2696 int i = init_ops(ops, 1, extra_ops);
2697 ops[i].op.op = CEPH_OSD_OP_WRITESAME;
2698 ops[i].op.writesame.offset = off;
// length = total span written; data_length = size of one pattern copy.
2699 ops[i].op.writesame.length = write_len;
2700 ops[i].op.writesame.data_length = bl.length();
2702 ops[i].op.flags = op_flags;
2703 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2704 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// writesame(): submit-side wrapper for prepare_writesame_op(); returns
// the tid of the submitted op (elided tail).
2709 ceph_tid_t writesame(
2710 const object_t& oid, const object_locator_t& oloc,
2711 uint64_t write_len, uint64_t off,
2712 const SnapContext& snapc, const bufferlist &bl,
2713 ceph::real_time mtime, int flags,
2714 Context *oncommit, version_t *objver = NULL,
2715 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2717 Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
2718 mtime, flags, oncommit, objver,
2719 extra_ops, op_flags);
// trunc(): truncate `oid` to trunc_size bytes.  Note the wire encoding:
// for CEPH_OSD_OP_TRUNCATE the new size travels in extent.offset (as well
// as in extent.truncate_size); extent.length is left unused.
2725 ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
2726 const SnapContext& snapc, ceph::real_time mtime, int flags,
2727 uint64_t trunc_size, __u32 trunc_seq,
2728 Context *oncommit, version_t *objver = NULL,
2729 ObjectOperation *extra_ops = NULL) {
2731 int i = init_ops(ops, 1, extra_ops);
2732 ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
2733 ops[i].op.extent.offset = trunc_size;
2734 ops[i].op.extent.truncate_size = trunc_size;
2735 ops[i].op.extent.truncate_seq = trunc_seq;
2736 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2737 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// zero(): punch a hole -- zero out [off, off+len) of `oid` without
// changing the object's size beyond that range.
2744 ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
2745 uint64_t off, uint64_t len, const SnapContext& snapc,
2746 ceph::real_time mtime, int flags, Context *oncommit,
2747 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2749 int i = init_ops(ops, 1, extra_ops);
2750 ops[i].op.op = CEPH_OSD_OP_ZERO;
2751 ops[i].op.extent.offset = off;
2752 ops[i].op.extent.length = len;
2753 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2754 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// rollback_object(): revert `oid` to its state as of snapshot `snapid`.
// NOTE(review): unlike the sibling write helpers, the Op here is built
// with CEPH_OSD_FLAG_WRITE only -- neither the caller's flags nor
// global_op_flags are OR'd in.  Looks intentional (matches upstream) but
// worth confirming.
2761 ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
2762 const SnapContext& snapc, snapid_t snapid,
2763 ceph::real_time mtime, Context *oncommit,
2764 version_t *objver = NULL,
2765 ObjectOperation *extra_ops = NULL) {
2767 int i = init_ops(ops, 1, extra_ops);
2768 ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
2769 ops[i].op.snap.snapid = snapid;
2770 Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
// create(): create `oid`.  Two flag namespaces are in play:
//   - create_flags go into the per-op field (ops[i].op.flags), e.g.
//     CEPH_OSD_OP_FLAG_EXCL to fail if the object already exists;
//   - global_flags are message-level flags merged into the Op itself.
2777 ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
2778 const SnapContext& snapc, ceph::real_time mtime, int global_flags,
2779 int create_flags, Context *oncommit,
2780 version_t *objver = NULL,
2781 ObjectOperation *extra_ops = NULL) {
2783 int i = init_ops(ops, 1, extra_ops);
2784 ops[i].op.op = CEPH_OSD_OP_CREATE;
2785 ops[i].op.flags = create_flags;
2786 Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
2787 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// Build (but do not submit) a DELETE op for `oid`.  Caller owns the
// returned Op; remove() below is the submitting wrapper.
2794 Op *prepare_remove_op(
2795 const object_t& oid, const object_locator_t& oloc,
2796 const SnapContext& snapc, ceph::real_time mtime, int flags,
2798 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2800 int i = init_ops(ops, 1, extra_ops);
2801 ops[i].op.op = CEPH_OSD_OP_DELETE;
2802 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2803 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// remove(): delete `oid`; delegates op construction to prepare_remove_op()
// and (in the elided tail) submits it, returning the tid.
2809 const object_t& oid, const object_locator_t& oloc,
2810 const SnapContext& snapc, ceph::real_time mtime, int flags,
2812 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2813 Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
2814 oncommit, objver, extra_ops);
// setxattr(): set extended attribute `name` on `oid` to the bytes in
// `bl`.  Wire format: the xattr header records name/value lengths, and
// indata carries the name immediately followed by the value.
2820 ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
2821 const char *name, const SnapContext& snapc, const bufferlist &bl,
2822 ceph::real_time mtime, int flags,
2824 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2826 int i = init_ops(ops, 1, extra_ops);
2827 ops[i].op.op = CEPH_OSD_OP_SETXATTR;
2828 ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2829 ops[i].op.xattr.value_len = bl.length();
// NOTE(review): name_len tolerates name == NULL, but append(name) below
// does not -- callers presumably always pass a non-NULL name; confirm.
2831 ops[i].indata.append(name);
2832 ops[i].indata.append(bl);
2833 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2834 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// removexattr(): remove extended attribute `name` from `oid`.  Only the
// attribute name travels in indata (value_len is 0).
2841 ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
2842 const char *name, const SnapContext& snapc,
2843 ceph::real_time mtime, int flags,
2845 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2847 int i = init_ops(ops, 1, extra_ops);
2848 ops[i].op.op = CEPH_OSD_OP_RMXATTR;
2849 ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2850 ops[i].op.xattr.value_len = 0;
// NOTE(review): same NULL-name asymmetry as setxattr -- name_len guards
// against NULL but append(name) does not.
2852 ops[i].indata.append(name);
2853 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2854 CEPH_OSD_FLAG_WRITE, oncommit, objver);
// ---- object listing / enumeration -------------------------------------
// Paged listing of objects in a pool; NListContext carries the cursor
// state between calls.  enumerate_objects() is the filtered variant used
// by librados; _enumerate_reply is its internal completion path.
2862 void list_nobjects(NListContext *p, Context *onfinish);
2863 uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
2864 uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
2865 void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);
2867 hobject_t enumerate_objects_begin();
2868 hobject_t enumerate_objects_end();
2869 //hobject_t enumerate_objects_begin(int n, int m);
2870 void enumerate_objects(
2872 const std::string &ns,
2873 const hobject_t &start,
2874 const hobject_t &end,
2876 const bufferlist &filter_bl,
2877 std::list<librados::ListObjectImpl> *result,
2879 Context *on_finish);
2881 void _enumerate_reply(
2884 const hobject_t &end,
2885 const int64_t pool_id,
2887 epoch_t reply_epoch,
2888 std::list<librados::ListObjectImpl> *result,
2890 Context *on_finish);
2891 friend class C_EnumerateReply;
2893 // -------------------------
// ---- pool operations --------------------------------------------------
// Pool create/delete, pool snaps and self-managed snaps.  Presumably the
// leading-underscore variants assume the objecter lock is already held
// (usual Ceph convention) -- confirm at the call sites.
2896 void pool_op_submit(PoolOp *op);
2897 void _pool_op_submit(PoolOp *op);
2898 void _finish_pool_op(PoolOp *op, int r);
2899 void _do_delete_pool(int64_t pool, Context *onfinish);
2901 int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
2902 int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
2904 int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
2905 int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);
2907 int create_pool(string& name, Context *onfinish, uint64_t auid=0,
2909 int delete_pool(int64_t pool, Context *onfinish);
2910 int delete_pool(const string& name, Context *onfinish);
2911 int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
2913 void handle_pool_op_reply(MPoolOpReply *m);
2914 int pool_op_cancel(ceph_tid_t tid, int r);
2916 // --------------------------
// ---- pool statistics --------------------------------------------------
2919 void _poolstat_submit(PoolStatOp *op);
2921 void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
2922 void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
2924 int pool_stat_op_cancel(ceph_tid_t tid, int r);
2925 void _finish_pool_stat_op(PoolStatOp *op, int r);
2927 // ---------------------------
// ---- cluster statfs ---------------------------------------------------
2930 void _fs_stats_submit(StatfsOp *op);
2932 void handle_fs_stats_reply(MStatfsReply *m);
2933 void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
2935 int statfs_op_cancel(ceph_tid_t tid, int r);
2936 void _finish_statfs_op(StatfsOp *op, int r);
2938 // ---------------------------
2939 // some scatter/gather hackery
// Stitch the per-object read results back into a single caller buffer.
2941 void _sg_read_finish(vector<ObjectExtent>& extents,
2942 vector<bufferlist>& resultbl,
2943 bufferlist *bl, Context *onfinish);
// Completion context for a scatter/gather read: once every per-object
// read has finished, finish() hands the extent map and the collected
// per-object buffers to _sg_read_finish() for reassembly into *bl.
2945 struct C_SGRead : public Context {
// Copies of the extent layout and result buffers, kept alive for the
// duration of the gather.
2947 vector<ObjectExtent> extents;
2948 vector<bufferlist> resultbl;
2951 C_SGRead(Objecter *ob,
2952 vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b,
2954 objecter(ob), bl(b), onfinish(c) {
2958 void finish(int r) override {
2959 objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
// Scatter/gather read with truncate metadata.  Fast path: a single
// extent issues one read_trunc() straight into the caller's buffer.
// Otherwise each extent gets its own read into resultbl[i], and a
// C_GatherBuilder fires C_SGRead (above) to reassemble the pieces once
// all sub-reads complete.
2963 void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap,
2964 bufferlist *bl, int flags, uint64_t trunc_size,
2965 __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
2966 if (extents.size() == 1) {
2967 read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
2968 extents[0].length, snap, bl, flags, extents[0].truncate_size,
2969 trunc_seq, onfinish, 0, 0, op_flags);
2971 C_GatherBuilder gather(cct);
2972 vector<bufferlist> resultbl(extents.size());
2974 for (vector<ObjectExtent>::iterator p = extents.begin();
// One sub-read per extent; gather.new_sub() ties its completion into
// the gather.
2977 read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
2978 flags, p->truncate_size, trunc_seq, gather.new_sub(),
2981 gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
// sg_read(): scatter/gather read without truncate handling -- simply
// forwards to sg_read_trunc() with trunc_size = 0, trunc_seq = 0.
2986 void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl,
2987 int flags, Context *onfinish, int op_flags = 0) {
2988 sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
// Scatter/gather write with truncate metadata.  Fast path: a single
// extent issues one write_trunc() of the whole buffer.  Otherwise, for
// each extent, the relevant byte ranges (buffer_extents) are copied out
// of `bl` into a per-object bufferlist and written individually; a
// gather (gcom) fires `oncommit` once every sub-write commits.
2991 void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
2992 const bufferlist& bl, ceph::real_time mtime, int flags,
2993 uint64_t trunc_size, __u32 trunc_seq,
2994 Context *oncommit, int op_flags = 0) {
2995 if (extents.size() == 1) {
2996 write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
2997 extents[0].length, snapc, bl, mtime, flags,
2998 extents[0].truncate_size, trunc_seq, oncommit,
3001 C_GatherBuilder gcom(cct, oncommit);
3002 for (vector<ObjectExtent>::iterator p = extents.begin();
// Gather this object's slices of the source buffer into `cur`.
3006 for (vector<pair<uint64_t,uint64_t> >::iterator bit
3007 = p->buffer_extents.begin();
3008 bit != p->buffer_extents.end();
3010 bl.copy(bit->first, bit->second, cur);
// Sanity: the assembled slice must exactly cover this extent.
3011 assert(cur.length() == p->length);
3012 write_trunc(p->oid, p->oloc, p->offset, p->length,
3013 snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
// No per-write callback when the caller passed no oncommit.
3014 oncommit ? gcom.new_sub():0,
// sg_write(): scatter/gather write without truncate handling -- forwards
// to sg_write_trunc() with trunc_size = 0, trunc_seq = 0.
3021 void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
3022 const bufferlist& bl, ceph::real_time mtime, int flags,
3023 Context *oncommit, int op_flags = 0) {
3024 sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
// ---- messenger (Dispatcher) hooks -------------------------------------
// Connection lifecycle callbacks inherited from the messenger layer.
3028 void ms_handle_connect(Connection *con) override;
3029 bool ms_handle_reset(Connection *con) override;
3030 void ms_handle_remote_reset(Connection *con) override;
3031 bool ms_handle_refused(Connection *con) override;
3032 bool ms_get_authorizer(int dest_type,
3033 AuthAuthorizer **authorizer,
3034 bool force_new) override;
// Mark (or clear) this client as blacklisted in the cluster.
3036 void blacklist_self(bool set);
// Minimum osdmap epoch required before ops may be sent; see
// set_epoch_barrier() below.
3039 epoch_t epoch_barrier;
// Presumably controls whether writes are retried after the first reply
// (config-driven) -- confirm where it is read.
3040 bool retry_writes_after_first_reply;
3042 void set_epoch_barrier(epoch_t epoch);
3044 PerfCounters *get_logger() {