Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osdc / Objecter.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
17
18 #include <condition_variable>
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <memory>
23 #include <sstream>
24 #include <type_traits>
25
26 #include <boost/thread/shared_mutex.hpp>
27
28 #include "include/assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
32
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/Finisher.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
39
40 #include "messages/MOSDOp.h"
41 #include "osd/OSDMap.h"
42
43 using namespace std;
44
45 class Context;
46 class Messenger;
47 class OSDMap;
48 class MonClient;
49 class Message;
50 class Finisher;
51
52 class MPoolOpReply;
53
54 class MGetPoolStatsReply;
55 class MStatfsReply;
56 class MCommandReply;
57 class MWatchNotify;
58
59 class PerfCounters;
60
61 // -----------------------------------------
62
63 struct ObjectOperation {
64   vector<OSDOp> ops;
65   int flags;
66   int priority;
67
68   vector<bufferlist*> out_bl;
69   vector<Context*> out_handler;
70   vector<int*> out_rval;
71
72   ObjectOperation() : flags(0), priority(0) {}
73   ~ObjectOperation() {
74     while (!out_handler.empty()) {
75       delete out_handler.back();
76       out_handler.pop_back();
77     }
78   }
79
80   size_t size() {
81     return ops.size();
82   }
83
84   void set_last_op_flags(int flags) {
85     assert(!ops.empty());
86     ops.rbegin()->op.flags = flags;
87   }
88
89   class C_TwoContexts;
90   /**
91    * Add a callback to run when this operation completes,
92    * after any other callbacks for it.
93    */
94   void add_handler(Context *extra);
95
96   OSDOp& add_op(int op) {
97     int s = ops.size();
98     ops.resize(s+1);
99     ops[s].op.op = op;
100     out_bl.resize(s+1);
101     out_bl[s] = NULL;
102     out_handler.resize(s+1);
103     out_handler[s] = NULL;
104     out_rval.resize(s+1);
105     out_rval[s] = NULL;
106     return ops[s];
107   }
108   void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
109     OSDOp& osd_op = add_op(op);
110     osd_op.op.extent.offset = off;
111     osd_op.op.extent.length = len;
112     osd_op.indata.claim_append(bl);
113   }
114   void add_writesame(int op, uint64_t off, uint64_t write_len,
115                      bufferlist& bl) {
116     OSDOp& osd_op = add_op(op);
117     osd_op.op.writesame.offset = off;
118     osd_op.op.writesame.length = write_len;
119     osd_op.op.writesame.data_length = bl.length();
120     osd_op.indata.claim_append(bl);
121   }
122   void add_xattr(int op, const char *name, const bufferlist& data) {
123     OSDOp& osd_op = add_op(op);
124     osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
125     osd_op.op.xattr.value_len = data.length();
126     if (name)
127       osd_op.indata.append(name);
128     osd_op.indata.append(data);
129   }
130   void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
131                      uint8_t cmp_mode, const bufferlist& data) {
132     OSDOp& osd_op = add_op(op);
133     osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
134     osd_op.op.xattr.value_len = data.length();
135     osd_op.op.xattr.cmp_op = cmp_op;
136     osd_op.op.xattr.cmp_mode = cmp_mode;
137     if (name)
138       osd_op.indata.append(name);
139     osd_op.indata.append(data);
140   }
141   void add_call(int op, const char *cname, const char *method,
142                 bufferlist &indata,
143                 bufferlist *outbl, Context *ctx, int *prval) {
144     OSDOp& osd_op = add_op(op);
145
146     unsigned p = ops.size() - 1;
147     out_handler[p] = ctx;
148     out_bl[p] = outbl;
149     out_rval[p] = prval;
150
151     osd_op.op.cls.class_len = strlen(cname);
152     osd_op.op.cls.method_len = strlen(method);
153     osd_op.op.cls.indata_len = indata.length();
154     osd_op.indata.append(cname, osd_op.op.cls.class_len);
155     osd_op.indata.append(method, osd_op.op.cls.method_len);
156     osd_op.indata.append(indata);
157   }
158   void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
159                 epoch_t start_epoch) {
160     OSDOp& osd_op = add_op(op);
161     osd_op.op.pgls.count = count;
162     osd_op.op.pgls.start_epoch = start_epoch;
163     ::encode(cookie, osd_op.indata);
164   }
165   void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
166                        collection_list_handle_t cookie, epoch_t start_epoch) {
167     OSDOp& osd_op = add_op(op);
168     osd_op.op.pgls.count = count;
169     osd_op.op.pgls.start_epoch = start_epoch;
170     string cname = "pg";
171     string mname = "filter";
172     ::encode(cname, osd_op.indata);
173     ::encode(mname, osd_op.indata);
174     osd_op.indata.append(filter);
175     ::encode(cookie, osd_op.indata);
176   }
177   void add_alloc_hint(int op, uint64_t expected_object_size,
178                       uint64_t expected_write_size,
179                       uint32_t flags) {
180     OSDOp& osd_op = add_op(op);
181     osd_op.op.alloc_hint.expected_object_size = expected_object_size;
182     osd_op.op.alloc_hint.expected_write_size = expected_write_size;
183     osd_op.op.alloc_hint.flags = flags;
184   }
185
186   // ------
187
188   // pg
189   void pg_ls(uint64_t count, bufferlist& filter,
190              collection_list_handle_t cookie, epoch_t start_epoch) {
191     if (filter.length() == 0)
192       add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
193     else
194       add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
195                       start_epoch);
196     flags |= CEPH_OSD_FLAG_PGOP;
197   }
198
199   void pg_nls(uint64_t count, const bufferlist& filter,
200               collection_list_handle_t cookie, epoch_t start_epoch) {
201     if (filter.length() == 0)
202       add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
203     else
204       add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
205                       start_epoch);
206     flags |= CEPH_OSD_FLAG_PGOP;
207   }
208
209   void scrub_ls(const librados::object_id_t& start_after,
210                 uint64_t max_to_get,
211                 std::vector<librados::inconsistent_obj_t> *objects,
212                 uint32_t *interval,
213                 int *rval);
214   void scrub_ls(const librados::object_id_t& start_after,
215                 uint64_t max_to_get,
216                 std::vector<librados::inconsistent_snapset_t> *objects,
217                 uint32_t *interval,
218                 int *rval);
219
220   void create(bool excl) {
221     OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
222     o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
223   }
224
225   struct C_ObjectOperation_stat : public Context {
226     bufferlist bl;
227     uint64_t *psize;
228     ceph::real_time *pmtime;
229     time_t *ptime;
230     struct timespec *pts;
231     int *prval;
232     C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
233                            int *prval)
234       : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
235     void finish(int r) override {
236       if (r >= 0) {
237         bufferlist::iterator p = bl.begin();
238         try {
239           uint64_t size;
240           ceph::real_time mtime;
241           ::decode(size, p);
242           ::decode(mtime, p);
243           if (psize)
244             *psize = size;
245           if (pmtime)
246             *pmtime = mtime;
247           if (ptime)
248             *ptime = ceph::real_clock::to_time_t(mtime);
249           if (pts)
250             *pts = ceph::real_clock::to_timespec(mtime);
251         } catch (buffer::error& e) {
252           if (prval)
253             *prval = -EIO;
254         }
255       }
256     }
257   };
258   void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
259     add_op(CEPH_OSD_OP_STAT);
260     unsigned p = ops.size() - 1;
261     C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
262                                                            prval);
263     out_bl[p] = &h->bl;
264     out_handler[p] = h;
265     out_rval[p] = prval;
266   }
267   void stat(uint64_t *psize, time_t *ptime, int *prval) {
268     add_op(CEPH_OSD_OP_STAT);
269     unsigned p = ops.size() - 1;
270     C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
271                                                            prval);
272     out_bl[p] = &h->bl;
273     out_handler[p] = h;
274     out_rval[p] = prval;
275   }
276   void stat(uint64_t *psize, struct timespec *pts, int *prval) {
277     add_op(CEPH_OSD_OP_STAT);
278     unsigned p = ops.size() - 1;
279     C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
280                                                            prval);
281     out_bl[p] = &h->bl;
282     out_handler[p] = h;
283     out_rval[p] = prval;
284   }
285   // object cmpext
286   struct C_ObjectOperation_cmpext : public Context {
287     int *prval;
288     C_ObjectOperation_cmpext(int *prval)
289       : prval(prval) {}
290
291     void finish(int r) {
292       if (prval)
293         *prval = r;
294     }
295   };
296
297   void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
298     add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
299     unsigned p = ops.size() - 1;
300     C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
301     out_handler[p] = h;
302     out_rval[p] = prval;
303   }
304
305   // Used by C API
306   void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
307     bufferlist cmp_bl;
308     cmp_bl.append(cmp_buf, cmp_len);
309     add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
310     unsigned p = ops.size() - 1;
311     C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
312     out_handler[p] = h;
313     out_rval[p] = prval;
314   }
315
316   void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
317             Context* ctx) {
318     bufferlist bl;
319     add_data(CEPH_OSD_OP_READ, off, len, bl);
320     unsigned p = ops.size() - 1;
321     out_bl[p] = pbl;
322     out_rval[p] = prval;
323     out_handler[p] = ctx;
324   }
325
326   struct C_ObjectOperation_sparse_read : public Context {
327     bufferlist bl;
328     bufferlist *data_bl;
329     std::map<uint64_t, uint64_t> *extents;
330     int *prval;
331     C_ObjectOperation_sparse_read(bufferlist *data_bl,
332                                   std::map<uint64_t, uint64_t> *extents,
333                                   int *prval)
334       : data_bl(data_bl), extents(extents), prval(prval) {}
335     void finish(int r) override {
336       bufferlist::iterator iter = bl.begin();
337       if (r >= 0) {
338         try {
339           ::decode(*extents, iter);
340           ::decode(*data_bl, iter);
341         } catch (buffer::error& e) {
342           if (prval)
343             *prval = -EIO;
344         }
345       }
346     }
347   };
348   void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
349                    bufferlist *data_bl, int *prval) {
350     bufferlist bl;
351     add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
352     unsigned p = ops.size() - 1;
353     C_ObjectOperation_sparse_read *h =
354       new C_ObjectOperation_sparse_read(data_bl, m, prval);
355     out_bl[p] = &h->bl;
356     out_handler[p] = h;
357     out_rval[p] = prval;
358   }
359   void write(uint64_t off, bufferlist& bl,
360              uint64_t truncate_size,
361              uint32_t truncate_seq) {
362     add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
363     OSDOp& o = *ops.rbegin();
364     o.op.extent.truncate_size = truncate_size;
365     o.op.extent.truncate_seq = truncate_seq;
366   }
367   void write(uint64_t off, bufferlist& bl) {
368     write(off, bl, 0, 0);
369   }
370   void write_full(bufferlist& bl) {
371     add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
372   }
373   void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
374     add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
375   }
376   void append(bufferlist& bl) {
377     add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
378   }
379   void zero(uint64_t off, uint64_t len) {
380     bufferlist bl;
381     add_data(CEPH_OSD_OP_ZERO, off, len, bl);
382   }
383   void truncate(uint64_t off) {
384     bufferlist bl;
385     add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
386   }
387   void remove() {
388     bufferlist bl;
389     add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
390   }
391   void mapext(uint64_t off, uint64_t len) {
392     bufferlist bl;
393     add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
394   }
395   void sparse_read(uint64_t off, uint64_t len) {
396     bufferlist bl;
397     add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
398   }
399
400   void checksum(uint8_t type, const bufferlist &init_value_bl,
401                 uint64_t off, uint64_t len, size_t chunk_size,
402                 bufferlist *pbl, int *prval, Context *ctx) {
403     OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
404     osd_op.op.checksum.offset = off;
405     osd_op.op.checksum.length = len;
406     osd_op.op.checksum.type = type;
407     osd_op.op.checksum.chunk_size = chunk_size;
408     osd_op.indata.append(init_value_bl);
409
410     unsigned p = ops.size() - 1;
411     out_bl[p] = pbl;
412     out_rval[p] = prval;
413     out_handler[p] = ctx;
414   }
415
416   // object attrs
417   void getxattr(const char *name, bufferlist *pbl, int *prval) {
418     bufferlist bl;
419     add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
420     unsigned p = ops.size() - 1;
421     out_bl[p] = pbl;
422     out_rval[p] = prval;
423   }
424   struct C_ObjectOperation_decodevals : public Context {
425     uint64_t max_entries;
426     bufferlist bl;
427     std::map<std::string,bufferlist> *pattrs;
428     bool *ptruncated;
429     int *prval;
430     C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
431                                  bool *pt, int *pr)
432       : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
433       if (ptruncated) {
434         *ptruncated = false;
435       }
436     }
437     void finish(int r) override {
438       if (r >= 0) {
439         bufferlist::iterator p = bl.begin();
440         try {
441           if (pattrs)
442             ::decode(*pattrs, p);
443           if (ptruncated) {
444             std::map<std::string,bufferlist> ignore;
445             if (!pattrs) {
446               ::decode(ignore, p);
447               pattrs = &ignore;
448             }
449             if (!p.end()) {
450               ::decode(*ptruncated, p);
451             } else {
452               // the OSD did not provide this.  since old OSDs do not
453               // enfoce omap result limits either, we can infer it from
454               // the size of the result
455               *ptruncated = (pattrs->size() == max_entries);
456             }
457           }
458         }
459         catch (buffer::error& e) {
460           if (prval)
461             *prval = -EIO;
462         }
463       }
464     }
465   };
466   struct C_ObjectOperation_decodekeys : public Context {
467     uint64_t max_entries;
468     bufferlist bl;
469     std::set<std::string> *pattrs;
470     bool *ptruncated;
471     int *prval;
472     C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
473                                  int *pr)
474       : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
475       if (ptruncated) {
476         *ptruncated = false;
477       }
478     }
479     void finish(int r) override {
480       if (r >= 0) {
481         bufferlist::iterator p = bl.begin();
482         try {
483           if (pattrs)
484             ::decode(*pattrs, p);
485           if (ptruncated) {
486             std::set<std::string> ignore;
487             if (!pattrs) {
488               ::decode(ignore, p);
489               pattrs = &ignore;
490             }
491             if (!p.end()) {
492               ::decode(*ptruncated, p);
493             } else {
494               // the OSD did not provide this.  since old OSDs do not
495               // enfoce omap result limits either, we can infer it from
496               // the size of the result
497               *ptruncated = (pattrs->size() == max_entries);
498             }
499           }
500         }
501         catch (buffer::error& e) {
502           if (prval)
503             *prval = -EIO;
504         }
505       }
506     }
507   };
508   struct C_ObjectOperation_decodewatchers : public Context {
509     bufferlist bl;
510     list<obj_watch_t> *pwatchers;
511     int *prval;
512     C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
513       : pwatchers(pw), prval(pr) {}
514     void finish(int r) override {
515       if (r >= 0) {
516         bufferlist::iterator p = bl.begin();
517         try {
518           obj_list_watch_response_t resp;
519           ::decode(resp, p);
520           if (pwatchers) {
521             for (list<watch_item_t>::iterator i = resp.entries.begin() ;
522                  i != resp.entries.end() ; ++i) {
523               obj_watch_t ow;
524               ostringstream sa;
525               sa << i->addr;
526               strncpy(ow.addr, sa.str().c_str(), 256);
527               ow.watcher_id = i->name.num();
528               ow.cookie = i->cookie;
529               ow.timeout_seconds = i->timeout_seconds;
530               pwatchers->push_back(ow);
531             }
532           }
533         }
534         catch (buffer::error& e) {
535           if (prval)
536             *prval = -EIO;
537         }
538       }
539     }
540   };
541   struct C_ObjectOperation_decodesnaps : public Context {
542     bufferlist bl;
543     librados::snap_set_t *psnaps;
544     int *prval;
545     C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
546       : psnaps(ps), prval(pr) {}
547     void finish(int r) override {
548       if (r >= 0) {
549         bufferlist::iterator p = bl.begin();
550         try {
551           obj_list_snap_response_t resp;
552           ::decode(resp, p);
553           if (psnaps) {
554             psnaps->clones.clear();
555             for (vector<clone_info>::iterator ci = resp.clones.begin();
556                  ci != resp.clones.end();
557                  ++ci) {
558               librados::clone_info_t clone;
559
560               clone.cloneid = ci->cloneid;
561               clone.snaps.reserve(ci->snaps.size());
562               clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
563                                  ci->snaps.end());
564               clone.overlap = ci->overlap;
565               clone.size = ci->size;
566
567               psnaps->clones.push_back(clone);
568             }
569             psnaps->seq = resp.seq;
570           }
571         } catch (buffer::error& e) {
572           if (prval)
573             *prval = -EIO;
574         }
575       }
576     }
577   };
578   void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
579     add_op(CEPH_OSD_OP_GETXATTRS);
580     if (pattrs || prval) {
581       unsigned p = ops.size() - 1;
582       C_ObjectOperation_decodevals *h
583         = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
584       out_handler[p] = h;
585       out_bl[p] = &h->bl;
586       out_rval[p] = prval;
587     }
588   }
589   void setxattr(const char *name, const bufferlist& bl) {
590     add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
591   }
592   void setxattr(const char *name, const string& s) {
593     bufferlist bl;
594     bl.append(s);
595     add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
596   }
597   void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
598                 const bufferlist& bl) {
599     add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
600   }
601   void rmxattr(const char *name) {
602     bufferlist bl;
603     add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
604   }
605   void setxattrs(map<string, bufferlist>& attrs) {
606     bufferlist bl;
607     ::encode(attrs, bl);
608     add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
609   }
610   void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
611     bufferlist bl;
612     ::encode(attrs, bl);
613     add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
614   }
615
616   // trivialmap
617   void tmap_update(bufferlist& bl) {
618     add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
619   }
620   void tmap_put(bufferlist& bl) {
621     add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
622   }
623   void tmap_get(bufferlist *pbl, int *prval) {
624     add_op(CEPH_OSD_OP_TMAPGET);
625     unsigned p = ops.size() - 1;
626     out_bl[p] = pbl;
627     out_rval[p] = prval;
628   }
629   void tmap_get() {
630     add_op(CEPH_OSD_OP_TMAPGET);
631   }
632   void tmap_to_omap(bool nullok=false) {
633      OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
634      if (nullok)
635        osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
636   }
637
638   // objectmap
639   void omap_get_keys(const string &start_after,
640                      uint64_t max_to_get,
641                      std::set<std::string> *out_set,
642                      bool *ptruncated,
643                      int *prval) {
644     OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
645     bufferlist bl;
646     ::encode(start_after, bl);
647     ::encode(max_to_get, bl);
648     op.op.extent.offset = 0;
649     op.op.extent.length = bl.length();
650     op.indata.claim_append(bl);
651     if (prval || ptruncated || out_set) {
652       unsigned p = ops.size() - 1;
653       C_ObjectOperation_decodekeys *h =
654         new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
655       out_handler[p] = h;
656       out_bl[p] = &h->bl;
657       out_rval[p] = prval;
658     }
659   }
660
661   void omap_get_vals(const string &start_after,
662                      const string &filter_prefix,
663                      uint64_t max_to_get,
664                      std::map<std::string, bufferlist> *out_set,
665                      bool *ptruncated,
666                      int *prval) {
667     OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
668     bufferlist bl;
669     ::encode(start_after, bl);
670     ::encode(max_to_get, bl);
671     ::encode(filter_prefix, bl);
672     op.op.extent.offset = 0;
673     op.op.extent.length = bl.length();
674     op.indata.claim_append(bl);
675     if (prval || out_set || ptruncated) {
676       unsigned p = ops.size() - 1;
677       C_ObjectOperation_decodevals *h =
678         new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
679       out_handler[p] = h;
680       out_bl[p] = &h->bl;
681       out_rval[p] = prval;
682     }
683   }
684
685   void omap_get_vals_by_keys(const std::set<std::string> &to_get,
686                             std::map<std::string, bufferlist> *out_set,
687                             int *prval) {
688     OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
689     bufferlist bl;
690     ::encode(to_get, bl);
691     op.op.extent.offset = 0;
692     op.op.extent.length = bl.length();
693     op.indata.claim_append(bl);
694     if (prval || out_set) {
695       unsigned p = ops.size() - 1;
696       C_ObjectOperation_decodevals *h =
697         new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
698       out_handler[p] = h;
699       out_bl[p] = &h->bl;
700       out_rval[p] = prval;
701     }
702   }
703
704   void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
705                 int *prval) {
706     OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
707     bufferlist bl;
708     ::encode(assertions, bl);
709     op.op.extent.offset = 0;
710     op.op.extent.length = bl.length();
711     op.indata.claim_append(bl);
712     if (prval) {
713       unsigned p = ops.size() - 1;
714       out_rval[p] = prval;
715     }
716   }
717
718   struct C_ObjectOperation_copyget : public Context {
719     bufferlist bl;
720     object_copy_cursor_t *cursor;
721     uint64_t *out_size;
722     ceph::real_time *out_mtime;
723     std::map<std::string,bufferlist> *out_attrs;
724     bufferlist *out_data, *out_omap_header, *out_omap_data;
725     vector<snapid_t> *out_snaps;
726     snapid_t *out_snap_seq;
727     uint32_t *out_flags;
728     uint32_t *out_data_digest;
729     uint32_t *out_omap_digest;
730     mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
731     uint64_t *out_truncate_seq;
732     uint64_t *out_truncate_size;
733     int *prval;
734     C_ObjectOperation_copyget(object_copy_cursor_t *c,
735                               uint64_t *s,
736                               ceph::real_time *m,
737                               std::map<std::string,bufferlist> *a,
738                               bufferlist *d, bufferlist *oh,
739                               bufferlist *o,
740                               std::vector<snapid_t> *osnaps,
741                               snapid_t *osnap_seq,
742                               uint32_t *flags,
743                               uint32_t *dd,
744                               uint32_t *od,
745                               mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
746                               uint64_t *otseq,
747                               uint64_t *otsize,
748                               int *r)
749       : cursor(c),
750         out_size(s), out_mtime(m),
751         out_attrs(a), out_data(d), out_omap_header(oh),
752         out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
753         out_flags(flags), out_data_digest(dd), out_omap_digest(od),
754         out_reqids(oreqids),
755         out_truncate_seq(otseq),
756         out_truncate_size(otsize),
757         prval(r) {}
758     void finish(int r) override {
759       // reqids are copied on ENOENT
760       if (r < 0 && r != -ENOENT)
761         return;
762       try {
763         bufferlist::iterator p = bl.begin();
764         object_copy_data_t copy_reply;
765         ::decode(copy_reply, p);
766         if (r == -ENOENT) {
767           if (out_reqids)
768             *out_reqids = copy_reply.reqids;
769           return;
770         }
771         if (out_size)
772           *out_size = copy_reply.size;
773         if (out_mtime)
774           *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
775         if (out_attrs)
776           *out_attrs = copy_reply.attrs;
777         if (out_data)
778           out_data->claim_append(copy_reply.data);
779         if (out_omap_header)
780           out_omap_header->claim_append(copy_reply.omap_header);
781         if (out_omap_data)
782           *out_omap_data = copy_reply.omap_data;
783         if (out_snaps)
784           *out_snaps = copy_reply.snaps;
785         if (out_snap_seq)
786           *out_snap_seq = copy_reply.snap_seq;
787         if (out_flags)
788           *out_flags = copy_reply.flags;
789         if (out_data_digest)
790           *out_data_digest = copy_reply.data_digest;
791         if (out_omap_digest)
792           *out_omap_digest = copy_reply.omap_digest;
793         if (out_reqids)
794           *out_reqids = copy_reply.reqids;
795         if (out_truncate_seq)
796           *out_truncate_seq = copy_reply.truncate_seq;
797         if (out_truncate_size)
798           *out_truncate_size = copy_reply.truncate_size;
799         *cursor = copy_reply.cursor;
800       } catch (buffer::error& e) {
801         if (prval)
802           *prval = -EIO;
803       }
804     }
805   };
806
807   void copy_get(object_copy_cursor_t *cursor,
808                 uint64_t max,
809                 uint64_t *out_size,
810                 ceph::real_time *out_mtime,
811                 std::map<std::string,bufferlist> *out_attrs,
812                 bufferlist *out_data,
813                 bufferlist *out_omap_header,
814                 bufferlist *out_omap_data,
815                 vector<snapid_t> *out_snaps,
816                 snapid_t *out_snap_seq,
817                 uint32_t *out_flags,
818                 uint32_t *out_data_digest,
819                 uint32_t *out_omap_digest,
820                 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
821                 uint64_t *truncate_seq,
822                 uint64_t *truncate_size,
823                 int *prval) {
824     OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
825     osd_op.op.copy_get.max = max;
826     ::encode(*cursor, osd_op.indata);
827     ::encode(max, osd_op.indata);
828     unsigned p = ops.size() - 1;
829     out_rval[p] = prval;
830     C_ObjectOperation_copyget *h =
831       new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
832                                     out_attrs, out_data, out_omap_header,
833                                     out_omap_data, out_snaps, out_snap_seq,
834                                     out_flags, out_data_digest,
835                                     out_omap_digest, out_reqids, truncate_seq,
836                                     truncate_size, prval);
837     out_bl[p] = &h->bl;
838     out_handler[p] = h;
839   }
840
841   void undirty() {
842     add_op(CEPH_OSD_OP_UNDIRTY);
843   }
844
845   struct C_ObjectOperation_isdirty : public Context {
846     bufferlist bl;
847     bool *pisdirty;
848     int *prval;
849     C_ObjectOperation_isdirty(bool *p, int *r)
850       : pisdirty(p), prval(r) {}
851     void finish(int r) override {
852       if (r < 0)
853         return;
854       try {
855         bufferlist::iterator p = bl.begin();
856         bool isdirty;
857         ::decode(isdirty, p);
858         if (pisdirty)
859           *pisdirty = isdirty;
860       } catch (buffer::error& e) {
861         if (prval)
862           *prval = -EIO;
863       }
864     }
865   };
866
867   void is_dirty(bool *pisdirty, int *prval) {
868     add_op(CEPH_OSD_OP_ISDIRTY);
869     unsigned p = ops.size() - 1;
870     out_rval[p] = prval;
871     C_ObjectOperation_isdirty *h =
872       new C_ObjectOperation_isdirty(pisdirty, prval);
873     out_bl[p] = &h->bl;
874     out_handler[p] = h;
875   }
876
877   struct C_ObjectOperation_hit_set_ls : public Context {
878     bufferlist bl;
879     std::list< std::pair<time_t, time_t> > *ptls;
880     std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
881     int *prval;
882     C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
883                                  std::list< std::pair<ceph::real_time,
884                                                       ceph::real_time> > *ut,
885                                  int *r)
886       : ptls(t), putls(ut), prval(r) {}
887     void finish(int r) override {
888       if (r < 0)
889         return;
890       try {
891         bufferlist::iterator p = bl.begin();
892         std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
893         ::decode(ls, p);
894         if (ptls) {
895           ptls->clear();
896           for (auto p = ls.begin(); p != ls.end(); ++p)
897             // round initial timestamp up to the next full second to
898             // keep this a valid interval.
899             ptls->push_back(
900               make_pair(ceph::real_clock::to_time_t(
901                           ceph::ceil(p->first,
902                                      // Sadly, no time literals until C++14.
903                                      std::chrono::seconds(1))),
904                         ceph::real_clock::to_time_t(p->second)));
905         }
906         if (putls)
907           putls->swap(ls);
908       } catch (buffer::error& e) {
909         r = -EIO;
910       }
911       if (prval)
912         *prval = r;
913     }
914   };
915
916   /**
917    * list available HitSets.
918    *
919    * We will get back a list of time intervals.  Note that the most
920    * recent range may have an empty end timestamp if it is still
921    * accumulating.
922    *
923    * @param pls [out] list of time intervals
924    * @param prval [out] return value
925    */
926   void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
927     add_op(CEPH_OSD_OP_PG_HITSET_LS);
928     unsigned p = ops.size() - 1;
929     out_rval[p] = prval;
930     C_ObjectOperation_hit_set_ls *h =
931       new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
932     out_bl[p] = &h->bl;
933     out_handler[p] = h;
934   }
935   void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
936                   int *prval) {
937     add_op(CEPH_OSD_OP_PG_HITSET_LS);
938     unsigned p = ops.size() - 1;
939     out_rval[p] = prval;
940     C_ObjectOperation_hit_set_ls *h =
941       new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
942     out_bl[p] = &h->bl;
943     out_handler[p] = h;
944   }
945
946   /**
947    * get HitSet
948    *
949    * Return an encoded HitSet that includes the provided time
950    * interval.
951    *
952    * @param stamp [in] timestamp
953    * @param pbl [out] target buffer for encoded HitSet
954    * @param prval [out] return value
955    */
956   void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) {
957     OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
958     op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
959     unsigned p = ops.size() - 1;
960     out_rval[p] = prval;
961     out_bl[p] = pbl;
962   }
963
964   void omap_get_header(bufferlist *bl, int *prval) {
965     add_op(CEPH_OSD_OP_OMAPGETHEADER);
966     unsigned p = ops.size() - 1;
967     out_bl[p] = bl;
968     out_rval[p] = prval;
969   }
970
971   void omap_set(const map<string, bufferlist> &map) {
972     bufferlist bl;
973     ::encode(map, bl);
974     add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
975   }
976
977   void omap_set_header(bufferlist &bl) {
978     add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
979   }
980
981   void omap_clear() {
982     add_op(CEPH_OSD_OP_OMAPCLEAR);
983   }
984
985   void omap_rm_keys(const std::set<std::string> &to_remove) {
986     bufferlist bl;
987     ::encode(to_remove, bl);
988     add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
989   }
990
991   // object classes
992   void call(const char *cname, const char *method, bufferlist &indata) {
993     add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
994   }
995
996   void call(const char *cname, const char *method, bufferlist &indata,
997             bufferlist *outdata, Context *ctx, int *prval) {
998     add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
999   }
1000
1001   // watch/notify
1002   void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
1003     OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
1004     osd_op.op.watch.cookie = cookie;
1005     osd_op.op.watch.op = op;
1006     osd_op.op.watch.timeout = timeout;
1007   }
1008
1009   void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
1010               bufferlist &bl, bufferlist *inbl) {
1011     OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
1012     osd_op.op.notify.cookie = cookie;
1013     ::encode(prot_ver, *inbl);
1014     ::encode(timeout, *inbl);
1015     ::encode(bl, *inbl);
1016     osd_op.indata.append(*inbl);
1017   }
1018
1019   void notify_ack(uint64_t notify_id, uint64_t cookie,
1020                   bufferlist& reply_bl) {
1021     OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
1022     bufferlist bl;
1023     ::encode(notify_id, bl);
1024     ::encode(cookie, bl);
1025     ::encode(reply_bl, bl);
1026     osd_op.indata.append(bl);
1027   }
1028
1029   void list_watchers(list<obj_watch_t> *out,
1030                      int *prval) {
1031     (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
1032     if (prval || out) {
1033       unsigned p = ops.size() - 1;
1034       C_ObjectOperation_decodewatchers *h =
1035         new C_ObjectOperation_decodewatchers(out, prval);
1036       out_handler[p] = h;
1037       out_bl[p] = &h->bl;
1038       out_rval[p] = prval;
1039     }
1040   }
1041
1042   void list_snaps(librados::snap_set_t *out, int *prval) {
1043     (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
1044     if (prval || out) {
1045       unsigned p = ops.size() - 1;
1046       C_ObjectOperation_decodesnaps *h =
1047         new C_ObjectOperation_decodesnaps(out, prval);
1048       out_handler[p] = h;
1049       out_bl[p] = &h->bl;
1050       out_rval[p] = prval;
1051     }
1052   }
1053
1054   void assert_version(uint64_t ver) {
1055     OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
1056     osd_op.op.assert_ver.ver = ver;
1057   }
1058
1059   void cmpxattr(const char *name, const bufferlist& val,
1060                 int op, int mode) {
1061     add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
1062     OSDOp& o = *ops.rbegin();
1063     o.op.xattr.cmp_op = op;
1064     o.op.xattr.cmp_mode = mode;
1065   }
1066
1067   void rollback(uint64_t snapid) {
1068     OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
1069     osd_op.op.snap.snapid = snapid;
1070   }
1071
1072   void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
1073                  version_t src_version, unsigned flags,
1074                  unsigned src_fadvise_flags) {
1075     OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
1076     osd_op.op.copy_from.snapid = snapid;
1077     osd_op.op.copy_from.src_version = src_version;
1078     osd_op.op.copy_from.flags = flags;
1079     osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
1080     ::encode(src, osd_op.indata);
1081     ::encode(src_oloc, osd_op.indata);
1082   }
1083
1084   /**
1085    * writeback content to backing tier
1086    *
1087    * If object is marked dirty in the cache tier, write back content
1088    * to backing tier. If the object is clean this is a no-op.
1089    *
1090    * If writeback races with an update, the update will block.
1091    *
1092    * use with IGNORE_CACHE to avoid triggering promote.
1093    */
1094   void cache_flush() {
1095     add_op(CEPH_OSD_OP_CACHE_FLUSH);
1096   }
1097
1098   /**
1099    * writeback content to backing tier
1100    *
1101    * If object is marked dirty in the cache tier, write back content
1102    * to backing tier. If the object is clean this is a no-op.
1103    *
1104    * If writeback races with an update, return EAGAIN.  Requires that
1105    * the SKIPRWLOCKS flag be set.
1106    *
1107    * use with IGNORE_CACHE to avoid triggering promote.
1108    */
1109   void cache_try_flush() {
1110     add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
1111   }
1112
1113   /**
1114    * evict object from cache tier
1115    *
1116    * If object is marked clean, remove the object from the cache tier.
1117    * Otherwise, return EBUSY.
1118    *
1119    * use with IGNORE_CACHE to avoid triggering promote.
1120    */
1121   void cache_evict() {
1122     add_op(CEPH_OSD_OP_CACHE_EVICT);
1123   }
1124
1125   /*
1126    * Extensible tier
1127    */
1128   void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc, 
1129                     version_t tgt_version) {
1130     OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
1131     osd_op.op.copy_from.snapid = snapid;
1132     osd_op.op.copy_from.src_version = tgt_version;
1133     ::encode(tgt, osd_op.indata);
1134     ::encode(tgt_oloc, osd_op.indata);
1135   }
1136
1137   void set_alloc_hint(uint64_t expected_object_size,
1138                       uint64_t expected_write_size,
1139                       uint32_t flags) {
1140     add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
1141                    expected_write_size, flags);
1142
1143     // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1144     // not worth a feature bit.  Set FAILOK per-op flag to make
1145     // sure older osds don't trip over an unsupported opcode.
1146     set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1147   }
1148
1149   void dup(vector<OSDOp>& sops) {
1150     ops = sops;
1151     out_bl.resize(sops.size());
1152     out_handler.resize(sops.size());
1153     out_rval.resize(sops.size());
1154     for (uint32_t i = 0; i < sops.size(); i++) {
1155       out_bl[i] = &sops[i].outdata;
1156       out_handler[i] = NULL;
1157       out_rval[i] = &sops[i].rval;
1158     }
1159   }
1160
1161   /**
1162    * Pin/unpin an object in cache tier
1163    */
1164   void cache_pin() {
1165     add_op(CEPH_OSD_OP_CACHE_PIN);
1166   }
1167
1168   void cache_unpin() {
1169     add_op(CEPH_OSD_OP_CACHE_UNPIN);
1170   }
1171 };
1172
1173
1174 // ----------------
1175
1176
1177 class Objecter : public md_config_obs_t, public Dispatcher {
1178 public:
1179   // config observer bits
1180   const char** get_tracked_conf_keys() const override;
1181   void handle_conf_change(const struct md_config_t *conf,
1182                           const std::set <std::string> &changed) override;
1183
1184 public:
1185   Messenger *messenger;
1186   MonClient *monc;
1187   Finisher *finisher;
1188   ZTracer::Endpoint trace_endpoint;
1189 private:
1190   OSDMap    *osdmap;
1191 public:
1192   using Dispatcher::cct;
1193   std::multimap<string,string> crush_location;
1194
1195   std::atomic<bool> initialized{false};
1196
1197 private:
1198   std::atomic<uint64_t> last_tid{0};
1199   std::atomic<unsigned> inflight_ops{0};
1200   std::atomic<int> client_inc{-1};
1201   uint64_t max_linger_id;
1202   std::atomic<unsigned> num_in_flight{0};
1203   std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
1204   bool keep_balanced_budget;
1205   bool honor_osdmap_full;
1206   bool osdmap_full_try;
1207
1208   // If this is true, accumulate a set of blacklisted entities
1209   // to be drained by consume_blacklist_events.
1210   bool blacklist_events_enabled;
1211   std::set<entity_addr_t> blacklist_events;
1212
1213 public:
1214   void maybe_request_map();
1215
1216   void enable_blacklist_events();
1217 private:
1218
1219   void _maybe_request_map();
1220
1221   version_t last_seen_osdmap_version;
1222   version_t last_seen_pgmap_version;
1223
1224   mutable boost::shared_mutex rwlock;
1225   using lock_guard = std::unique_lock<decltype(rwlock)>;
1226   using unique_lock = std::unique_lock<decltype(rwlock)>;
1227   using shared_lock = boost::shared_lock<decltype(rwlock)>;
1228   using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
1229   ceph::timer<ceph::mono_clock> timer;
1230
1231   PerfCounters *logger;
1232
1233   uint64_t tick_event;
1234
1235   void start_tick();
1236   void tick();
1237   void update_crush_location();
1238
1239   class RequestStateHook;
1240
1241   RequestStateHook *m_request_state_hook;
1242
1243 public:
1244   /*** track pending operations ***/
1245   // read
1246  public:
1247
1248   struct OSDSession;
1249
1250   struct op_target_t {
1251     int flags = 0;
1252
1253     epoch_t epoch = 0;  ///< latest epoch we calculated the mapping
1254
1255     object_t base_oid;
1256     object_locator_t base_oloc;
1257     object_t target_oid;
1258     object_locator_t target_oloc;
1259
1260     ///< true if we are directed at base_pgid, not base_oid
1261     bool precalc_pgid = false;
1262
1263     ///< true if we have ever mapped to a valid pool
1264     bool pool_ever_existed = false;
1265
1266     ///< explcit pg target, if any
1267     pg_t base_pgid;
1268
1269     pg_t pgid; ///< last (raw) pg we mapped to
1270     spg_t actual_pgid; ///< last (actual) spg_t we mapped to
1271     unsigned pg_num = 0; ///< last pg_num we mapped to
1272     unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
1273     vector<int> up; ///< set of up osds for last pg we mapped to
1274     vector<int> acting; ///< set of acting osds for last pg we mapped to
1275     int up_primary = -1; ///< last up_primary we mapped to
1276     int acting_primary = -1;  ///< last acting_primary we mapped to
1277     int size = -1; ///< the size of the pool when were were last mapped
1278     int min_size = -1; ///< the min size of the pool when were were last mapped
1279     bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
1280     bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering
1281
1282     bool used_replica = false;
1283     bool paused = false;
1284
1285     int osd = -1;      ///< the final target osd, or -1
1286
1287     epoch_t last_force_resend = 0;
1288
1289     op_target_t(object_t oid, object_locator_t oloc, int flags)
1290       : flags(flags),
1291         base_oid(oid),
1292         base_oloc(oloc)
1293       {}
1294
1295     op_target_t(pg_t pgid)
1296       : base_oloc(pgid.pool(), pgid.ps()),
1297         precalc_pgid(true),
1298         base_pgid(pgid)
1299       {}
1300
1301     op_target_t() = default;
1302
1303     hobject_t get_hobj() {
1304       return hobject_t(target_oid,
1305                        target_oloc.key,
1306                        CEPH_NOSNAP,
1307                        target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
1308                        target_oloc.pool,
1309                        target_oloc.nspace);
1310     }
1311
1312     bool contained_by(const hobject_t& begin, const hobject_t& end) {
1313       hobject_t h = get_hobj();
1314       int r = cmp(h, begin);
1315       return r == 0 || (r > 0 && h < end);
1316     }
1317
1318     void dump(Formatter *f) const;
1319   };
1320
1321   struct Op : public RefCountedObject {
1322     OSDSession *session;
1323     int incarnation;
1324
1325     op_target_t target;
1326
1327     ConnectionRef con;  // for rx buffer only
1328     uint64_t features;  // explicitly specified op features
1329
1330     vector<OSDOp> ops;
1331
1332     snapid_t snapid;
1333     SnapContext snapc;
1334     ceph::real_time mtime;
1335
1336     bufferlist *outbl;
1337     vector<bufferlist*> out_bl;
1338     vector<Context*> out_handler;
1339     vector<int*> out_rval;
1340
1341     int priority;
1342     Context *onfinish;
1343     uint64_t ontimeout;
1344
1345     ceph_tid_t tid;
1346     int attempts;
1347
1348     version_t *objver;
1349     epoch_t *reply_epoch;
1350
1351     ceph::mono_time stamp;
1352
1353     epoch_t map_dne_bound;
1354
1355     bool budgeted;
1356
1357     /// true if we should resend this message on failure
1358     bool should_resend;
1359
1360     /// true if the throttle budget is get/put on a series of OPs,
1361     /// instead of per OP basis, when this flag is set, the budget is
1362     /// acquired before sending the very first OP of the series and
1363     /// released upon receiving the last OP reply.
1364     bool ctx_budgeted;
1365
1366     int *data_offset;
1367
1368     osd_reqid_t reqid; // explicitly setting reqid
1369     ZTracer::Trace trace;
1370
1371     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
1372        int f, Context *fin, version_t *ov, int *offset = NULL,
1373        ZTracer::Trace *parent_trace = nullptr) :
1374       session(NULL), incarnation(0),
1375       target(o, ol, f),
1376       con(NULL),
1377       features(CEPH_FEATURES_SUPPORTED_DEFAULT),
1378       snapid(CEPH_NOSNAP),
1379       outbl(NULL),
1380       priority(0),
1381       onfinish(fin),
1382       ontimeout(0),
1383       tid(0),
1384       attempts(0),
1385       objver(ov),
1386       reply_epoch(NULL),
1387       map_dne_bound(0),
1388       budgeted(false),
1389       should_resend(true),
1390       ctx_budgeted(false),
1391       data_offset(offset) {
1392       ops.swap(op);
1393
1394       /* initialize out_* to match op vector */
1395       out_bl.resize(ops.size());
1396       out_rval.resize(ops.size());
1397       out_handler.resize(ops.size());
1398       for (unsigned i = 0; i < ops.size(); i++) {
1399         out_bl[i] = NULL;
1400         out_handler[i] = NULL;
1401         out_rval[i] = NULL;
1402       }
1403
1404       if (target.base_oloc.key == o)
1405         target.base_oloc.key.clear();
1406
1407       if (parent_trace && parent_trace->valid()) {
1408         trace.init("op", nullptr, parent_trace);
1409         trace.event("start");
1410       }
1411     }
1412
1413     bool operator<(const Op& other) const {
1414       return tid < other.tid;
1415     }
1416
1417     bool respects_full() const {
1418       return
1419         (target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
1420         !(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
1421     }
1422
1423   private:
1424     ~Op() override {
1425       while (!out_handler.empty()) {
1426         delete out_handler.back();
1427         out_handler.pop_back();
1428       }
1429       trace.event("finish");
1430     }
1431   };
1432
1433   struct C_Op_Map_Latest : public Context {
1434     Objecter *objecter;
1435     ceph_tid_t tid;
1436     version_t latest;
1437     C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
1438                                                  latest(0) {}
1439     void finish(int r) override;
1440   };
1441
1442   struct C_Command_Map_Latest : public Context {
1443     Objecter *objecter;
1444     uint64_t tid;
1445     version_t latest;
1446     C_Command_Map_Latest(Objecter *o, ceph_tid_t t) :  objecter(o), tid(t),
1447                                                        latest(0) {}
1448     void finish(int r) override;
1449   };
1450
1451   struct C_Stat : public Context {
1452     bufferlist bl;
1453     uint64_t *psize;
1454     ceph::real_time *pmtime;
1455     Context *fin;
1456     C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
1457       psize(ps), pmtime(pm), fin(c) {}
1458     void finish(int r) override {
1459       if (r >= 0) {
1460         bufferlist::iterator p = bl.begin();
1461         uint64_t s;
1462         ceph::real_time m;
1463         ::decode(s, p);
1464         ::decode(m, p);
1465         if (psize)
1466           *psize = s;
1467         if (pmtime)
1468           *pmtime = m;
1469       }
1470       fin->complete(r);
1471     }
1472   };
1473
1474   struct C_GetAttrs : public Context {
1475     bufferlist bl;
1476     map<string,bufferlist>& attrset;
1477     Context *fin;
1478     C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set),
1479                                                            fin(c) {}
1480     void finish(int r) override {
1481       if (r >= 0) {
1482         bufferlist::iterator p = bl.begin();
1483         ::decode(attrset, p);
1484       }
1485       fin->complete(r);
1486     }
1487   };
1488
1489
1490   // Pools and statistics
1491   struct NListContext {
1492     collection_list_handle_t pos;
1493
1494     // these are for !sortbitwise compat only
1495     int current_pg = 0;
1496     int starting_pg_num = 0;
1497     bool sort_bitwise = false;
1498
1499     bool at_end_of_pool = false; ///< publicly visible end flag
1500
1501     int64_t pool_id = -1;
1502     int pool_snap_seq = 0;
1503     uint64_t max_entries = 0;
1504     string nspace;
1505
1506     bufferlist bl;   // raw data read to here
1507     std::list<librados::ListObjectImpl> list;
1508
1509     bufferlist filter;
1510
1511     bufferlist extra_info;
1512
1513     // The budget associated with this context, once it is set (>= 0),
1514     // the budget is not get/released on OP basis, instead the budget
1515     // is acquired before sending the first OP and released upon receiving
1516     // the last op reply.
1517     int ctx_budget = -1;
1518
1519     bool at_end() const {
1520       return at_end_of_pool;
1521     }
1522
1523     uint32_t get_pg_hash_position() const {
1524       return pos.get_hash();
1525     }
1526   };
1527
1528   struct C_NList : public Context {
1529     NListContext *list_context;
1530     Context *final_finish;
1531     Objecter *objecter;
1532     epoch_t epoch;
1533     C_NList(NListContext *lc, Context * finish, Objecter *ob) :
1534       list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
1535     void finish(int r) override {
1536       if (r >= 0) {
1537         objecter->_nlist_reply(list_context, r, final_finish, epoch);
1538       } else {
1539         final_finish->complete(r);
1540       }
1541     }
1542   };
1543
1544   struct PoolStatOp {
1545     ceph_tid_t tid;
1546     list<string> pools;
1547
1548     map<string,pool_stat_t> *pool_stats;
1549     Context *onfinish;
1550     uint64_t ontimeout;
1551
1552     ceph::mono_time last_submit;
1553   };
1554
1555   struct StatfsOp {
1556     ceph_tid_t tid;
1557     struct ceph_statfs *stats;
1558     boost::optional<int64_t> data_pool;
1559     Context *onfinish;
1560     uint64_t ontimeout;
1561
1562     ceph::mono_time last_submit;
1563   };
1564
1565   struct PoolOp {
1566     ceph_tid_t tid;
1567     int64_t pool;
1568     string name;
1569     Context *onfinish;
1570     uint64_t ontimeout;
1571     int pool_op;
1572     uint64_t auid;
1573     int16_t crush_rule;
1574     snapid_t snapid;
1575     bufferlist *blp;
1576
1577     ceph::mono_time last_submit;
1578     PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
1579                auid(0), crush_rule(0), snapid(0), blp(NULL) {}
1580   };
1581
1582   // -- osd commands --
1583   struct CommandOp : public RefCountedObject {
1584     OSDSession *session = nullptr;
1585     ceph_tid_t tid = 0;
1586     vector<string> cmd;
1587     bufferlist inbl;
1588     bufferlist *poutbl = nullptr;
1589     string *prs = nullptr;
1590
1591     // target_osd == -1 means target_pg is valid
1592     const int target_osd = -1;
1593     const pg_t target_pg;
1594
1595     op_target_t target;
1596
1597     epoch_t map_dne_bound = 0;
1598     int map_check_error = 0; // error to return if map check fails
1599     const char *map_check_error_str = nullptr;
1600
1601     Context *onfinish = nullptr;
1602     uint64_t ontimeout = 0;
1603     ceph::mono_time last_submit;
1604
1605     CommandOp(
1606       int target_osd,
1607       const vector<string> &cmd,
1608       bufferlist inbl,
1609       bufferlist *poutbl,
1610       string *prs,
1611       Context *onfinish)
1612       : cmd(cmd),
1613         inbl(inbl),
1614         poutbl(poutbl),
1615         prs(prs),
1616         target_osd(target_osd),
1617         onfinish(onfinish) {}
1618
1619     CommandOp(
1620       pg_t pgid,
1621       const vector<string> &cmd,
1622       bufferlist inbl,
1623       bufferlist *poutbl,
1624       string *prs,
1625       Context *onfinish)
1626       : cmd(cmd),
1627         inbl(inbl),
1628         poutbl(poutbl),
1629         prs(prs),
1630         target_pg(pgid),
1631         target(pgid),
1632         onfinish(onfinish) {}
1633
1634   };
1635
1636   void submit_command(CommandOp *c, ceph_tid_t *ptid);
1637   int _calc_command_target(CommandOp *c, shunique_lock &sul);
1638   void _assign_command_session(CommandOp *c, shunique_lock &sul);
1639   void _send_command(CommandOp *c);
1640   int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
1641   void _finish_command(CommandOp *c, int r, string rs);
1642   void handle_command_reply(MCommandReply *m);
1643
1644
1645   // -- lingering ops --
1646
1647   struct WatchContext {
1648     // this simply mirrors librados WatchCtx2
1649     virtual void handle_notify(uint64_t notify_id,
1650                                uint64_t cookie,
1651                                uint64_t notifier_id,
1652                                bufferlist& bl) = 0;
1653     virtual void handle_error(uint64_t cookie, int err) = 0;
1654     virtual ~WatchContext() {}
1655   };
1656
1657   struct LingerOp : public RefCountedObject {
1658     uint64_t linger_id;
1659
1660     op_target_t target;
1661
1662     snapid_t snap;
1663     SnapContext snapc;
1664     ceph::real_time mtime;
1665
1666     vector<OSDOp> ops;
1667     bufferlist inbl;
1668     bufferlist *poutbl;
1669     version_t *pobjver;
1670
1671     bool is_watch;
1672     ceph::mono_time watch_valid_thru; ///< send time for last acked ping
1673     int last_error;  ///< error from last failed ping|reconnect, if any
1674     boost::shared_mutex watch_lock;
1675     using lock_guard = std::unique_lock<decltype(watch_lock)>;
1676     using unique_lock = std::unique_lock<decltype(watch_lock)>;
1677     using shared_lock = boost::shared_lock<decltype(watch_lock)>;
1678     using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;
1679
1680     // queue of pending async operations, with the timestamp of
1681     // when they were queued.
1682     list<ceph::mono_time> watch_pending_async;
1683
1684     uint32_t register_gen;
1685     bool registered;
1686     bool canceled;
1687     Context *on_reg_commit;
1688
1689     // we trigger these from an async finisher
1690     Context *on_notify_finish;
1691     bufferlist *notify_result_bl;
1692     uint64_t notify_id;
1693
1694     WatchContext *watch_context;
1695
1696     OSDSession *session;
1697
1698     ceph_tid_t register_tid;
1699     ceph_tid_t ping_tid;
1700     epoch_t map_dne_bound;
1701
1702     void _queued_async() {
1703       // watch_lock ust be locked unique
1704       watch_pending_async.push_back(ceph::mono_clock::now());
1705     }
1706     void finished_async() {
1707       unique_lock l(watch_lock);
1708       assert(!watch_pending_async.empty());
1709       watch_pending_async.pop_front();
1710     }
1711
1712     LingerOp() : linger_id(0),
1713                  target(object_t(), object_locator_t(), 0),
1714                  snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
1715                  is_watch(false), last_error(0),
1716                  register_gen(0),
1717                  registered(false),
1718                  canceled(false),
1719                  on_reg_commit(NULL),
1720                  on_notify_finish(NULL),
1721                  notify_result_bl(NULL),
1722                  notify_id(0),
1723                  watch_context(NULL),
1724                  session(NULL),
1725                  register_tid(0),
1726                  ping_tid(0),
1727                  map_dne_bound(0) {}
1728
1729     // no copy!
1730     const LingerOp &operator=(const LingerOp& r);
1731     LingerOp(const LingerOp& o);
1732
1733     uint64_t get_cookie() {
1734       return reinterpret_cast<uint64_t>(this);
1735     }
1736
1737   private:
1738     ~LingerOp() override {
1739       delete watch_context;
1740     }
1741   };
1742
1743   struct C_Linger_Commit : public Context {
1744     Objecter *objecter;
1745     LingerOp *info;
1746     bufferlist outbl;  // used for notify only
1747     C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1748       info->get();
1749     }
1750     ~C_Linger_Commit() override {
1751       info->put();
1752     }
1753     void finish(int r) override {
1754       objecter->_linger_commit(info, r, outbl);
1755     }
1756   };
1757
1758   struct C_Linger_Reconnect : public Context {
1759     Objecter *objecter;
1760     LingerOp *info;
1761     C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1762       info->get();
1763     }
1764     ~C_Linger_Reconnect() override {
1765       info->put();
1766     }
1767     void finish(int r) override {
1768       objecter->_linger_reconnect(info, r);
1769     }
1770   };
1771
1772   struct C_Linger_Ping : public Context {
1773     Objecter *objecter;
1774     LingerOp *info;
1775     ceph::mono_time sent;
1776     uint32_t register_gen;
1777     C_Linger_Ping(Objecter *o, LingerOp *l)
1778       : objecter(o), info(l), register_gen(info->register_gen) {
1779       info->get();
1780     }
1781     ~C_Linger_Ping() override {
1782       info->put();
1783     }
1784     void finish(int r) override {
1785       objecter->_linger_ping(info, r, sent, register_gen);
1786     }
1787   };
1788
1789   struct C_Linger_Map_Latest : public Context {
1790     Objecter *objecter;
1791     uint64_t linger_id;
1792     version_t latest;
1793     C_Linger_Map_Latest(Objecter *o, uint64_t id) :
1794       objecter(o), linger_id(id), latest(0) {}
1795     void finish(int r) override;
1796   };
1797
1798   // -- osd sessions --
1799   struct OSDBackoff {
1800     spg_t pgid;
1801     uint64_t id;
1802     hobject_t begin, end;
1803   };
1804
1805   struct OSDSession : public RefCountedObject {
1806     boost::shared_mutex lock;
1807     using lock_guard = std::lock_guard<decltype(lock)>;
1808     using unique_lock = std::unique_lock<decltype(lock)>;
1809     using shared_lock = boost::shared_lock<decltype(lock)>;
1810     using shunique_lock = ceph::shunique_lock<decltype(lock)>;
1811
1812     // pending ops
1813     map<ceph_tid_t,Op*> ops;
1814     map<uint64_t, LingerOp*> linger_ops;
1815     map<ceph_tid_t,CommandOp*> command_ops;
1816
1817     // backoffs
1818     map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
1819     map<uint64_t,OSDBackoff*> backoffs_by_id;
1820
1821     int osd;
1822     int incarnation;
1823     ConnectionRef con;
1824     int num_locks;
1825     std::unique_ptr<std::mutex[]> completion_locks;
1826     using unique_completion_lock = std::unique_lock<
1827       decltype(completion_locks)::element_type>;
1828
1829
1830     OSDSession(CephContext *cct, int o) :
1831       osd(o), incarnation(0), con(NULL),
1832       num_locks(cct->_conf->objecter_completion_locks_per_session),
1833       completion_locks(new std::mutex[num_locks]) {}
1834
1835     ~OSDSession() override;
1836
1837     bool is_homeless() { return (osd == -1); }
1838
1839     unique_completion_lock get_lock(object_t& oid);
1840   };
1841   map<int,OSDSession*> osd_sessions;
1842
1843   bool osdmap_full_flag() const;
1844   bool osdmap_pool_full(const int64_t pool_id) const;
1845
1846  private:
1847
1848   /**
1849    * Test pg_pool_t::FLAG_FULL on a pool
1850    *
1851    * @return true if the pool exists and has the flag set, or
1852    *         the global full flag is set, else false
1853    */
1854   bool _osdmap_pool_full(const int64_t pool_id) const;
1855   bool _osdmap_pool_full(const pg_pool_t &p) const;
1856   void update_pool_full_map(map<int64_t, bool>& pool_full_map);
1857
1858   map<uint64_t, LingerOp*> linger_ops;
1859   // we use this just to confirm a cookie is valid before dereferencing the ptr
1860   set<LingerOp*> linger_ops_set;
1861
1862   map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1863   map<ceph_tid_t,StatfsOp*> statfs_ops;
1864   map<ceph_tid_t,PoolOp*> pool_ops;
1865   std::atomic<unsigned> num_homeless_ops{0};
1866
1867   OSDSession *homeless_session;
1868
1869   // ops waiting for an osdmap with a new pool or confirmation that
1870   // the pool does not exist (may be expanded to other uses later)
1871   map<uint64_t, LingerOp*> check_latest_map_lingers;
1872   map<ceph_tid_t, Op*> check_latest_map_ops;
1873   map<ceph_tid_t, CommandOp*> check_latest_map_commands;
1874
1875   map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
1876
1877   ceph::timespan mon_timeout;
1878   ceph::timespan osd_timeout;
1879
1880   MOSDOp *_prepare_osd_op(Op *op);
1881   void _send_op(Op *op, MOSDOp *m = NULL);
1882   void _send_op_account(Op *op);
1883   void _cancel_linger_op(Op *op);
1884   void finish_op(OSDSession *session, ceph_tid_t tid);
1885   void _finish_op(Op *op, int r);
1886   static bool is_pg_changed(
1887     int oldprimary,
1888     const vector<int>& oldacting,
1889     int newprimary,
1890     const vector<int>& newacting,
1891     bool any_change=false);
1892   enum recalc_op_target_result {
1893     RECALC_OP_TARGET_NO_ACTION = 0,
1894     RECALC_OP_TARGET_NEED_RESEND,
1895     RECALC_OP_TARGET_POOL_DNE,
1896     RECALC_OP_TARGET_OSD_DNE,
1897     RECALC_OP_TARGET_OSD_DOWN,
1898   };
1899   bool _osdmap_full_flag() const;
1900   bool _osdmap_has_pool_full() const;
1901
1902   bool target_should_be_paused(op_target_t *op);
1903   int _calc_target(op_target_t *t, Connection *con,
1904                    bool any_change = false);
1905   int _map_session(op_target_t *op, OSDSession **s,
1906                    shunique_lock& lc);
1907
1908   void _session_op_assign(OSDSession *s, Op *op);
1909   void _session_op_remove(OSDSession *s, Op *op);
1910   void _session_linger_op_assign(OSDSession *to, LingerOp *op);
1911   void _session_linger_op_remove(OSDSession *from, LingerOp *op);
1912   void _session_command_op_assign(OSDSession *to, CommandOp *op);
1913   void _session_command_op_remove(OSDSession *from, CommandOp *op);
1914
1915   int _assign_op_target_session(Op *op, shunique_lock& lc,
1916                                 bool src_session_locked,
1917                                 bool dst_session_locked);
1918   int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
1919
1920   void _linger_submit(LingerOp *info, shunique_lock& sul);
1921   void _send_linger(LingerOp *info, shunique_lock& sul);
1922   void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
1923   void _linger_reconnect(LingerOp *info, int r);
1924   void _send_linger_ping(LingerOp *info);
1925   void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
1926                     uint32_t register_gen);
1927   int _normalize_watch_error(int r);
1928
1929   friend class C_DoWatchError;
1930 public:
1931   void linger_callback_flush(Context *ctx) {
1932     finisher->queue(ctx);
1933   }
1934
1935 private:
1936   void _check_op_pool_dne(Op *op, unique_lock *sl);
1937   void _send_op_map_check(Op *op);
1938   void _op_cancel_map_check(Op *op);
1939   void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
1940   void _send_linger_map_check(LingerOp *op);
1941   void _linger_cancel_map_check(LingerOp *op);
1942   void _check_command_map_dne(CommandOp *op);
1943   void _send_command_map_check(CommandOp *op);
1944   void _command_cancel_map_check(CommandOp *op);
1945
1946   void kick_requests(OSDSession *session);
1947   void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
1948   void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
1949
1950   int _get_session(int osd, OSDSession **session, shunique_lock& sul);
1951   void put_session(OSDSession *s);
1952   void get_session(OSDSession *s);
1953   void _reopen_session(OSDSession *session);
1954   void close_session(OSDSession *session);
1955
1956   void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
1957                    epoch_t reply_epoch);
1958
1959   void resend_mon_ops();
1960
1961   /**
1962    * handle a budget for in-flight ops
1963    * budget is taken whenever an op goes into the ops map
1964    * and returned whenever an op is removed from the map
1965    * If throttle_op needs to throttle it will unlock client_lock.
1966    */
1967   int calc_op_budget(Op *op);
1968   void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
1969   int _take_op_budget(Op *op, shunique_lock& sul) {
1970     assert(sul && sul.mutex() == &rwlock);
1971     int op_budget = calc_op_budget(op);
1972     if (keep_balanced_budget) {
1973       _throttle_op(op, sul, op_budget);
1974     } else {
1975       op_throttle_bytes.take(op_budget);
1976       op_throttle_ops.take(1);
1977     }
1978     op->budgeted = true;
1979     return op_budget;
1980   }
1981   void put_op_budget_bytes(int op_budget) {
1982     assert(op_budget >= 0);
1983     op_throttle_bytes.put(op_budget);
1984     op_throttle_ops.put(1);
1985   }
1986   void put_op_budget(Op *op) {
1987     assert(op->budgeted);
1988     int op_budget = calc_op_budget(op);
1989     put_op_budget_bytes(op_budget);
1990   }
1991   void put_nlist_context_budget(NListContext *list_context);
1992   Throttle op_throttle_bytes, op_throttle_ops;
1993
1994  public:
1995   Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
1996            Finisher *fin,
1997            double mon_timeout,
1998            double osd_timeout) :
1999     Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
2000     trace_endpoint("0.0.0.0", 0, "Objecter"),
2001     osdmap(new OSDMap),
2002     max_linger_id(0),
2003     keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2004     blacklist_events_enabled(false),
2005     last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2006     logger(NULL), tick_event(0), m_request_state_hook(NULL),
2007     homeless_session(new OSDSession(cct, -1)),
2008     mon_timeout(ceph::make_timespan(mon_timeout)),
2009     osd_timeout(ceph::make_timespan(osd_timeout)),
2010     op_throttle_bytes(cct, "objecter_bytes",
2011                       cct->_conf->objecter_inflight_op_bytes),
2012     op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
2013     epoch_barrier(0),
2014     retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
2015   { }
2016   ~Objecter() override;
2017
2018   void init();
2019   void start(const OSDMap *o = nullptr);
2020   void shutdown();
2021
2022   // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2023   // whatever functionality you want to use the OSDMap in a lambda like:
2024   //
2025   // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2026   //
2027   // or
2028   //
2029   // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2030   //
2031   // Do not call into something that will try to lock the OSDMap from
2032   // here or you will have great woe and misery.
2033
2034   template<typename Callback, typename...Args>
2035   auto with_osdmap(Callback&& cb, Args&&... args) const ->
2036     decltype(cb(*osdmap, std::forward<Args>(args)...)) {
2037     shared_lock l(rwlock);
2038     return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
2039   }
2040
2041
2042   /**
2043    * Tell the objecter to throttle outgoing ops according to its
2044    * budget (in _conf). If you do this, ops can block, in
2045    * which case it will unlock client_lock and sleep until
2046    * incoming messages reduce the used budget low enough for
2047    * the ops to continue going; then it will lock client_lock again.
2048    */
2049   void set_balanced_budget() { keep_balanced_budget = true; }
2050   void unset_balanced_budget() { keep_balanced_budget = false; }
2051
2052   void set_honor_osdmap_full() { honor_osdmap_full = true; }
2053   void unset_honor_osdmap_full() { honor_osdmap_full = false; }
2054
2055   void set_osdmap_full_try() { osdmap_full_try = true; }
2056   void unset_osdmap_full_try() { osdmap_full_try = false; }
2057
2058   void _scan_requests(OSDSession *s,
2059                       bool force_resend,
2060                       bool cluster_full,
2061                       map<int64_t, bool> *pool_full_map,
2062                       map<ceph_tid_t, Op*>& need_resend,
2063                       list<LingerOp*>& need_resend_linger,
2064                       map<ceph_tid_t, CommandOp*>& need_resend_command,
2065                       shunique_lock& sul);
2066
2067   int64_t get_object_hash_position(int64_t pool, const string& key,
2068                                    const string& ns);
2069   int64_t get_object_pg_hash_position(int64_t pool, const string& key,
2070                                       const string& ns);
2071
2072   // messages
2073  public:
2074   bool ms_dispatch(Message *m) override;
2075   bool ms_can_fast_dispatch_any() const override {
2076     return true;
2077   }
2078   bool ms_can_fast_dispatch(const Message *m) const override {
2079     switch (m->get_type()) {
2080     case CEPH_MSG_OSD_OPREPLY:
2081     case CEPH_MSG_WATCH_NOTIFY:
2082       return true;
2083     default:
2084       return false;
2085     }
2086   }
2087   void ms_fast_dispatch(Message *m) override {
2088     if (!ms_dispatch(m)) {
2089       m->put();
2090     }
2091   }
2092
2093   void handle_osd_op_reply(class MOSDOpReply *m);
2094   void handle_osd_backoff(class MOSDBackoff *m);
2095   void handle_watch_notify(class MWatchNotify *m);
2096   void handle_osd_map(class MOSDMap *m);
2097   void wait_for_osd_map();
2098
2099   /**
2100    * Get list of entities blacklisted since this was last called,
2101    * and reset the list.
2102    *
2103    * Uses a std::set because typical use case is to compare some
2104    * other list of clients to see which overlap with the blacklisted
2105    * addrs.
2106    *
2107    */
2108   void consume_blacklist_events(std::set<entity_addr_t> *events);
2109
2110   int pool_snap_by_name(int64_t poolid,
2111                         const char *snap_name,
2112                         snapid_t *snap) const;
2113   int pool_snap_get_info(int64_t poolid, snapid_t snap,
2114                          pool_snap_info_t *info) const;
2115   int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
2116 private:
2117
2118   void emit_blacklist_events(const OSDMap::Incremental &inc);
2119   void emit_blacklist_events(const OSDMap &old_osd_map,
2120                              const OSDMap &new_osd_map);
2121
2122   // low-level
2123   void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
2124   void _op_submit_with_budget(Op *op, shunique_lock& lc,
2125                               ceph_tid_t *ptid,
2126                               int *ctx_budget = NULL);
2127   inline void unregister_op(Op *op);
2128
2129   // public interface
2130 public:
2131   void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2132   bool is_active() {
2133     shared_lock l(rwlock);
2134     return !((!inflight_ops) && linger_ops.empty() &&
2135              poolstat_ops.empty() && statfs_ops.empty());
2136   }
2137
2138   /**
2139    * Output in-flight requests
2140    */
2141   void _dump_active(OSDSession *s);
2142   void _dump_active();
2143   void dump_active();
2144   void dump_requests(Formatter *fmt);
2145   void _dump_ops(const OSDSession *s, Formatter *fmt);
2146   void dump_ops(Formatter *fmt);
2147   void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
2148   void dump_linger_ops(Formatter *fmt);
2149   void _dump_command_ops(const OSDSession *s, Formatter *fmt);
2150   void dump_command_ops(Formatter *fmt);
2151   void dump_pool_ops(Formatter *fmt) const;
2152   void dump_pool_stat_ops(Formatter *fmt) const;
2153   void dump_statfs_ops(Formatter *fmt) const;
2154
2155   int get_client_incarnation() const { return client_inc; }
2156   void set_client_incarnation(int inc) { client_inc = inc; }
2157
2158   bool have_map(epoch_t epoch);
2159   /// wait for epoch; true if we already have it
2160   bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2161   void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2162   void wait_for_latest_osdmap(Context *fin);
2163   void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2164   void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2165
2166   /** Get the current set of global op flags */
2167   int get_global_op_flags() const { return global_op_flags; }
2168   /** Add a flag to the global op flags, not really atomic operation */
2169   void add_global_op_flags(int flag) {
2170     global_op_flags.fetch_or(flag);
2171   }
2172   /** Clear the passed flags from the global op flag set */
2173   void clear_global_op_flag(int flags) {
2174     global_op_flags.fetch_and(~flags);
2175   }
2176
2177   /// cancel an in-progress request with the given return code
2178 private:
2179   int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
2180   int _op_cancel(ceph_tid_t tid, int r);
2181 public:
2182   int op_cancel(ceph_tid_t tid, int r);
2183
2184   /**
2185    * Any write op which is in progress at the start of this call shall no
2186    * longer be in progress when this call ends.  Operations started after the
2187    * start of this call may still be in progress when this call ends.
2188    *
2189    * @return the latest possible epoch in which a cancelled op could have
2190    *         existed, or -1 if nothing was cancelled.
2191    */
2192   epoch_t op_cancel_writes(int r, int64_t pool=-1);
2193
2194   // commands
2195   void osd_command(int osd, const std::vector<string>& cmd,
2196                   const bufferlist& inbl, ceph_tid_t *ptid,
2197                   bufferlist *poutbl, string *prs, Context *onfinish) {
2198     assert(osd >= 0);
2199     CommandOp *c = new CommandOp(
2200       osd,
2201       cmd,
2202       inbl,
2203       poutbl,
2204       prs,
2205       onfinish);
2206     submit_command(c, ptid);
2207   }
2208   void pg_command(pg_t pgid, const vector<string>& cmd,
2209                  const bufferlist& inbl, ceph_tid_t *ptid,
2210                  bufferlist *poutbl, string *prs, Context *onfinish) {
2211     CommandOp *c = new CommandOp(
2212       pgid,
2213       cmd,
2214       inbl,
2215       poutbl,
2216       prs,
2217       onfinish);
2218     submit_command(c, ptid);
2219   }
2220
2221   // mid-level helpers
2222   Op *prepare_mutate_op(
2223     const object_t& oid, const object_locator_t& oloc,
2224     ObjectOperation& op, const SnapContext& snapc,
2225     ceph::real_time mtime, int flags,
2226     Context *oncommit, version_t *objver = NULL,
2227     osd_reqid_t reqid = osd_reqid_t(),
2228     ZTracer::Trace *parent_trace = nullptr) {
2229     Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2230                    CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
2231     o->priority = op.priority;
2232     o->mtime = mtime;
2233     o->snapc = snapc;
2234     o->out_rval.swap(op.out_rval);
2235     o->reqid = reqid;
2236     return o;
2237   }
2238   ceph_tid_t mutate(
2239     const object_t& oid, const object_locator_t& oloc,
2240     ObjectOperation& op, const SnapContext& snapc,
2241     ceph::real_time mtime, int flags,
2242     Context *oncommit, version_t *objver = NULL,
2243     osd_reqid_t reqid = osd_reqid_t()) {
2244     Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2245                               oncommit, objver, reqid);
2246     ceph_tid_t tid;
2247     op_submit(o, &tid);
2248     return tid;
2249   }
2250   Op *prepare_read_op(
2251     const object_t& oid, const object_locator_t& oloc,
2252     ObjectOperation& op,
2253     snapid_t snapid, bufferlist *pbl, int flags,
2254     Context *onack, version_t *objver = NULL,
2255     int *data_offset = NULL,
2256     uint64_t features = 0,
2257     ZTracer::Trace *parent_trace = nullptr) {
2258     Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2259                    CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
2260     o->priority = op.priority;
2261     o->snapid = snapid;
2262     o->outbl = pbl;
2263     if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
2264         o->outbl = op.out_bl[0];
2265     o->out_bl.swap(op.out_bl);
2266     o->out_handler.swap(op.out_handler);
2267     o->out_rval.swap(op.out_rval);
2268     return o;
2269   }
2270   ceph_tid_t read(
2271     const object_t& oid, const object_locator_t& oloc,
2272     ObjectOperation& op,
2273     snapid_t snapid, bufferlist *pbl, int flags,
2274     Context *onack, version_t *objver = NULL,
2275     int *data_offset = NULL,
2276     uint64_t features = 0) {
2277     Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2278                             data_offset);
2279     if (features)
2280       o->features = features;
2281     ceph_tid_t tid;
2282     op_submit(o, &tid);
2283     return tid;
2284   }
2285   Op *prepare_pg_read_op(
2286     uint32_t hash, object_locator_t oloc,
2287     ObjectOperation& op, bufferlist *pbl, int flags,
2288     Context *onack, epoch_t *reply_epoch,
2289     int *ctx_budget) {
2290     Op *o = new Op(object_t(), oloc,
2291                    op.ops,
2292                    flags | global_op_flags | CEPH_OSD_FLAG_READ |
2293                    CEPH_OSD_FLAG_IGNORE_OVERLAY,
2294                    onack, NULL);
2295     o->target.precalc_pgid = true;
2296     o->target.base_pgid = pg_t(hash, oloc.pool);
2297     o->priority = op.priority;
2298     o->snapid = CEPH_NOSNAP;
2299     o->outbl = pbl;
2300     o->out_bl.swap(op.out_bl);
2301     o->out_handler.swap(op.out_handler);
2302     o->out_rval.swap(op.out_rval);
2303     o->reply_epoch = reply_epoch;
2304     if (ctx_budget) {
2305       // budget is tracked by listing context
2306       o->ctx_budgeted = true;
2307     }
2308     return o;
2309   }
2310   ceph_tid_t pg_read(
2311     uint32_t hash, object_locator_t oloc,
2312     ObjectOperation& op, bufferlist *pbl, int flags,
2313     Context *onack, epoch_t *reply_epoch,
2314     int *ctx_budget) {
2315     Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2316                                onack, reply_epoch, ctx_budget);
2317     ceph_tid_t tid;
2318     op_submit(o, &tid, ctx_budget);
2319     return tid;
2320   }
2321
2322   // caller owns a ref
2323   LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
2324                             int flags);
2325   ceph_tid_t linger_watch(LingerOp *info,
2326                           ObjectOperation& op,
2327                           const SnapContext& snapc, ceph::real_time mtime,
2328                           bufferlist& inbl,
2329                           Context *onfinish,
2330                           version_t *objver);
2331   ceph_tid_t linger_notify(LingerOp *info,
2332                            ObjectOperation& op,
2333                            snapid_t snap, bufferlist& inbl,
2334                            bufferlist *poutbl,
2335                            Context *onack,
2336                            version_t *objver);
2337   int linger_check(LingerOp *info);
2338   void linger_cancel(LingerOp *info);  // releases a reference
2339   void _linger_cancel(LingerOp *info);
2340
2341   void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2342
2343   /**
2344    * set up initial ops in the op vector, and allocate a final op slot.
2345    *
2346    * The caller is responsible for filling in the final ops_count ops.
2347    *
2348    * @param ops op vector
2349    * @param ops_count number of final ops the caller will fill in
2350    * @param extra_ops pointer to [array of] initial op[s]
2351    * @return index of final op (for caller to fill in)
2352    */
2353   int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2354     int i;
2355     int extra = 0;
2356
2357     if (extra_ops)
2358       extra = extra_ops->ops.size();
2359
2360     ops.resize(ops_count + extra);
2361
2362     for (i=0; i<extra; i++) {
2363       ops[i] = extra_ops->ops[i];
2364     }
2365
2366     return i;
2367   }
2368
2369
2370   // high-level helpers
2371   Op *prepare_stat_op(
2372     const object_t& oid, const object_locator_t& oloc,
2373     snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2374     int flags, Context *onfinish, version_t *objver = NULL,
2375     ObjectOperation *extra_ops = NULL) {
2376     vector<OSDOp> ops;
2377     int i = init_ops(ops, 1, extra_ops);
2378     ops[i].op.op = CEPH_OSD_OP_STAT;
2379     C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
2380     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2381                    CEPH_OSD_FLAG_READ, fin, objver);
2382     o->snapid = snap;
2383     o->outbl = &fin->bl;
2384     return o;
2385   }
2386   ceph_tid_t stat(
2387     const object_t& oid, const object_locator_t& oloc,
2388     snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2389     int flags, Context *onfinish, version_t *objver = NULL,
2390     ObjectOperation *extra_ops = NULL) {
2391     Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
2392                             onfinish, objver, extra_ops);
2393     ceph_tid_t tid;
2394     op_submit(o, &tid);
2395     return tid;
2396   }
2397
2398   Op *prepare_read_op(
2399     const object_t& oid, const object_locator_t& oloc,
2400     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2401     int flags, Context *onfinish, version_t *objver = NULL,
2402     ObjectOperation *extra_ops = NULL, int op_flags = 0,
2403     ZTracer::Trace *parent_trace = nullptr) {
2404     vector<OSDOp> ops;
2405     int i = init_ops(ops, 1, extra_ops);
2406     ops[i].op.op = CEPH_OSD_OP_READ;
2407     ops[i].op.extent.offset = off;
2408     ops[i].op.extent.length = len;
2409     ops[i].op.extent.truncate_size = 0;
2410     ops[i].op.extent.truncate_seq = 0;
2411     ops[i].op.flags = op_flags;
2412     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2413                    CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
2414     o->snapid = snap;
2415     o->outbl = pbl;
2416     return o;
2417   }
2418   ceph_tid_t read(
2419     const object_t& oid, const object_locator_t& oloc,
2420     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2421     int flags, Context *onfinish, version_t *objver = NULL,
2422     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2423     Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
2424                             onfinish, objver, extra_ops, op_flags);
2425     ceph_tid_t tid;
2426     op_submit(o, &tid);
2427     return tid;
2428   }
2429
2430   Op *prepare_cmpext_op(
2431     const object_t& oid, const object_locator_t& oloc,
2432     uint64_t off, bufferlist &cmp_bl,
2433     snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2434     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2435     vector<OSDOp> ops;
2436     int i = init_ops(ops, 1, extra_ops);
2437     ops[i].op.op = CEPH_OSD_OP_CMPEXT;
2438     ops[i].op.extent.offset = off;
2439     ops[i].op.extent.length = cmp_bl.length();
2440     ops[i].op.extent.truncate_size = 0;
2441     ops[i].op.extent.truncate_seq = 0;
2442     ops[i].indata = cmp_bl;
2443     ops[i].op.flags = op_flags;
2444     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2445                    CEPH_OSD_FLAG_READ, onfinish, objver);
2446     o->snapid = snap;
2447     return o;
2448   }
2449
2450   ceph_tid_t cmpext(
2451     const object_t& oid, const object_locator_t& oloc,
2452     uint64_t off, bufferlist &cmp_bl,
2453     snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2454     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2455     Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
2456                               flags, onfinish, objver, extra_ops, op_flags);
2457     ceph_tid_t tid;
2458     op_submit(o, &tid);
2459     return tid;
2460   }
2461
2462   ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
2463                         uint64_t off, uint64_t len, snapid_t snap,
2464                         bufferlist *pbl, int flags, uint64_t trunc_size,
2465                         __u32 trunc_seq, Context *onfinish,
2466                         version_t *objver = NULL,
2467                         ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2468     vector<OSDOp> ops;
2469     int i = init_ops(ops, 1, extra_ops);
2470     ops[i].op.op = CEPH_OSD_OP_READ;
2471     ops[i].op.extent.offset = off;
2472     ops[i].op.extent.length = len;
2473     ops[i].op.extent.truncate_size = trunc_size;
2474     ops[i].op.extent.truncate_seq = trunc_seq;
2475     ops[i].op.flags = op_flags;
2476     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2477                    CEPH_OSD_FLAG_READ, onfinish, objver);
2478     o->snapid = snap;
2479     o->outbl = pbl;
2480     ceph_tid_t tid;
2481     op_submit(o, &tid);
2482     return tid;
2483   }
2484   ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
2485                     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2486                     int flags, Context *onfinish, version_t *objver = NULL,
2487                     ObjectOperation *extra_ops = NULL) {
2488     vector<OSDOp> ops;
2489     int i = init_ops(ops, 1, extra_ops);
2490     ops[i].op.op = CEPH_OSD_OP_MAPEXT;
2491     ops[i].op.extent.offset = off;
2492     ops[i].op.extent.length = len;
2493     ops[i].op.extent.truncate_size = 0;
2494     ops[i].op.extent.truncate_seq = 0;
2495     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2496                    CEPH_OSD_FLAG_READ, onfinish, objver);
2497     o->snapid = snap;
2498     o->outbl = pbl;
2499     ceph_tid_t tid;
2500     op_submit(o, &tid);
2501     return tid;
2502   }
2503   ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
2504              const char *name, snapid_t snap, bufferlist *pbl, int flags,
2505              Context *onfinish,
2506              version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2507     vector<OSDOp> ops;
2508     int i = init_ops(ops, 1, extra_ops);
2509     ops[i].op.op = CEPH_OSD_OP_GETXATTR;
2510     ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2511     ops[i].op.xattr.value_len = 0;
2512     if (name)
2513       ops[i].indata.append(name);
2514     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2515                    CEPH_OSD_FLAG_READ, onfinish, objver);
2516     o->snapid = snap;
2517     o->outbl = pbl;
2518     ceph_tid_t tid;
2519     op_submit(o, &tid);
2520     return tid;
2521   }
2522
2523   ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
2524                        snapid_t snap, map<string,bufferlist>& attrset,
2525                        int flags, Context *onfinish, version_t *objver = NULL,
2526                        ObjectOperation *extra_ops = NULL) {
2527     vector<OSDOp> ops;
2528     int i = init_ops(ops, 1, extra_ops);
2529     ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
2530     C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
2531     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2532                    CEPH_OSD_FLAG_READ, fin, objver);
2533     o->snapid = snap;
2534     o->outbl = &fin->bl;
2535     ceph_tid_t tid;
2536     op_submit(o, &tid);
2537     return tid;
2538   }
2539
2540   ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
2541                        snapid_t snap, bufferlist *pbl, int flags,
2542                        Context *onfinish, version_t *objver = NULL,
2543                        ObjectOperation *extra_ops = NULL) {
2544     return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
2545                 CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
2546   }
2547
2548
2549   // writes
2550   ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
2551                      vector<OSDOp>& ops, ceph::real_time mtime,
2552                      const SnapContext& snapc, int flags,
2553                      Context *oncommit,
2554                      version_t *objver = NULL) {
2555     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2556                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2557     o->mtime = mtime;
2558     o->snapc = snapc;
2559     ceph_tid_t tid;
2560     op_submit(o, &tid);
2561     return tid;
2562   }
2563   Op *prepare_write_op(
2564     const object_t& oid, const object_locator_t& oloc,
2565     uint64_t off, uint64_t len, const SnapContext& snapc,
2566     const bufferlist &bl, ceph::real_time mtime, int flags,
2567     Context *oncommit, version_t *objver = NULL,
2568     ObjectOperation *extra_ops = NULL, int op_flags = 0,
2569     ZTracer::Trace *parent_trace = nullptr) {
2570     vector<OSDOp> ops;
2571     int i = init_ops(ops, 1, extra_ops);
2572     ops[i].op.op = CEPH_OSD_OP_WRITE;
2573     ops[i].op.extent.offset = off;
2574     ops[i].op.extent.length = len;
2575     ops[i].op.extent.truncate_size = 0;
2576     ops[i].op.extent.truncate_seq = 0;
2577     ops[i].indata = bl;
2578     ops[i].op.flags = op_flags;
2579     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2580                    CEPH_OSD_FLAG_WRITE, oncommit, objver,
2581                    nullptr, parent_trace);
2582     o->mtime = mtime;
2583     o->snapc = snapc;
2584     return o;
2585   }
2586   ceph_tid_t write(
2587     const object_t& oid, const object_locator_t& oloc,
2588     uint64_t off, uint64_t len, const SnapContext& snapc,
2589     const bufferlist &bl, ceph::real_time mtime, int flags,
2590     Context *oncommit, version_t *objver = NULL,
2591     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2592     Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
2593                              oncommit, objver, extra_ops, op_flags);
2594     ceph_tid_t tid;
2595     op_submit(o, &tid);
2596     return tid;
2597   }
2598   Op *prepare_append_op(
2599     const object_t& oid, const object_locator_t& oloc,
2600     uint64_t len, const SnapContext& snapc,
2601     const bufferlist &bl, ceph::real_time mtime, int flags,
2602     Context *oncommit,
2603     version_t *objver = NULL,
2604     ObjectOperation *extra_ops = NULL) {
2605     vector<OSDOp> ops;
2606     int i = init_ops(ops, 1, extra_ops);
2607     ops[i].op.op = CEPH_OSD_OP_APPEND;
2608     ops[i].op.extent.offset = 0;
2609     ops[i].op.extent.length = len;
2610     ops[i].op.extent.truncate_size = 0;
2611     ops[i].op.extent.truncate_seq = 0;
2612     ops[i].indata = bl;
2613     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2614                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2615     o->mtime = mtime;
2616     o->snapc = snapc;
2617     return o;
2618   }
2619   ceph_tid_t append(
2620     const object_t& oid, const object_locator_t& oloc,
2621     uint64_t len, const SnapContext& snapc,
2622     const bufferlist &bl, ceph::real_time mtime, int flags,
2623     Context *oncommit,
2624     version_t *objver = NULL,
2625     ObjectOperation *extra_ops = NULL) {
2626     Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
2627                               oncommit, objver, extra_ops);
2628     ceph_tid_t tid;
2629     op_submit(o, &tid);
2630     return tid;
2631   }
2632   ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
2633                          uint64_t off, uint64_t len, const SnapContext& snapc,
2634                          const bufferlist &bl, ceph::real_time mtime, int flags,
2635                          uint64_t trunc_size, __u32 trunc_seq,
2636                          Context *oncommit,
2637                          version_t *objver = NULL,
2638                          ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2639     vector<OSDOp> ops;
2640     int i = init_ops(ops, 1, extra_ops);
2641     ops[i].op.op = CEPH_OSD_OP_WRITE;
2642     ops[i].op.extent.offset = off;
2643     ops[i].op.extent.length = len;
2644     ops[i].op.extent.truncate_size = trunc_size;
2645     ops[i].op.extent.truncate_seq = trunc_seq;
2646     ops[i].indata = bl;
2647     ops[i].op.flags = op_flags;
2648     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2649                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2650     o->mtime = mtime;
2651     o->snapc = snapc;
2652     ceph_tid_t tid;
2653     op_submit(o, &tid);
2654     return tid;
2655   }
2656   Op *prepare_write_full_op(
2657     const object_t& oid, const object_locator_t& oloc,
2658     const SnapContext& snapc, const bufferlist &bl,
2659     ceph::real_time mtime, int flags,
2660     Context *oncommit, version_t *objver = NULL,
2661     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2662     vector<OSDOp> ops;
2663     int i = init_ops(ops, 1, extra_ops);
2664     ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
2665     ops[i].op.extent.offset = 0;
2666     ops[i].op.extent.length = bl.length();
2667     ops[i].indata = bl;
2668     ops[i].op.flags = op_flags;
2669     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2670                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2671     o->mtime = mtime;
2672     o->snapc = snapc;
2673     return o;
2674   }
2675   ceph_tid_t write_full(
2676     const object_t& oid, const object_locator_t& oloc,
2677     const SnapContext& snapc, const bufferlist &bl,
2678     ceph::real_time mtime, int flags,
2679     Context *oncommit, version_t *objver = NULL,
2680     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2681     Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
2682                                   oncommit, objver, extra_ops, op_flags);
2683     ceph_tid_t tid;
2684     op_submit(o, &tid);
2685     return tid;
2686   }
2687   Op *prepare_writesame_op(
2688     const object_t& oid, const object_locator_t& oloc,
2689     uint64_t write_len, uint64_t off,
2690     const SnapContext& snapc, const bufferlist &bl,
2691     ceph::real_time mtime, int flags,
2692     Context *oncommit, version_t *objver = NULL,
2693     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2694
2695     vector<OSDOp> ops;
2696     int i = init_ops(ops, 1, extra_ops);
2697     ops[i].op.op = CEPH_OSD_OP_WRITESAME;
2698     ops[i].op.writesame.offset = off;
2699     ops[i].op.writesame.length = write_len;
2700     ops[i].op.writesame.data_length = bl.length();
2701     ops[i].indata = bl;
2702     ops[i].op.flags = op_flags;
2703     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2704                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2705     o->mtime = mtime;
2706     o->snapc = snapc;
2707     return o;
2708   }
2709   ceph_tid_t writesame(
2710     const object_t& oid, const object_locator_t& oloc,
2711     uint64_t write_len, uint64_t off,
2712     const SnapContext& snapc, const bufferlist &bl,
2713     ceph::real_time mtime, int flags,
2714     Context *oncommit, version_t *objver = NULL,
2715     ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2716
2717     Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
2718                                  mtime, flags, oncommit, objver,
2719                                  extra_ops, op_flags);
2720
2721     ceph_tid_t tid;
2722     op_submit(o, &tid);
2723     return tid;
2724   }
2725   ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
2726                    const SnapContext& snapc, ceph::real_time mtime, int flags,
2727                    uint64_t trunc_size, __u32 trunc_seq,
2728                    Context *oncommit, version_t *objver = NULL,
2729                    ObjectOperation *extra_ops = NULL) {
2730     vector<OSDOp> ops;
2731     int i = init_ops(ops, 1, extra_ops);
2732     ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
2733     ops[i].op.extent.offset = trunc_size;
2734     ops[i].op.extent.truncate_size = trunc_size;
2735     ops[i].op.extent.truncate_seq = trunc_seq;
2736     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2737                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2738     o->mtime = mtime;
2739     o->snapc = snapc;
2740     ceph_tid_t tid;
2741     op_submit(o, &tid);
2742     return tid;
2743   }
2744   ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
2745                   uint64_t off, uint64_t len, const SnapContext& snapc,
2746                   ceph::real_time mtime, int flags, Context *oncommit,
2747              version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2748     vector<OSDOp> ops;
2749     int i = init_ops(ops, 1, extra_ops);
2750     ops[i].op.op = CEPH_OSD_OP_ZERO;
2751     ops[i].op.extent.offset = off;
2752     ops[i].op.extent.length = len;
2753     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2754                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2755     o->mtime = mtime;
2756     o->snapc = snapc;
2757     ceph_tid_t tid;
2758     op_submit(o, &tid);
2759     return tid;
2760   }
2761   ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
2762                              const SnapContext& snapc, snapid_t snapid,
2763                              ceph::real_time mtime, Context *oncommit,
2764                              version_t *objver = NULL,
2765                              ObjectOperation *extra_ops = NULL) {
2766     vector<OSDOp> ops;
2767     int i = init_ops(ops, 1, extra_ops);
2768     ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
2769     ops[i].op.snap.snapid = snapid;
2770     Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
2771     o->mtime = mtime;
2772     o->snapc = snapc;
2773     ceph_tid_t tid;
2774     op_submit(o, &tid);
2775     return tid;
2776   }
2777   ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
2778                     const SnapContext& snapc, ceph::real_time mtime, int global_flags,
2779                     int create_flags, Context *oncommit,
2780                     version_t *objver = NULL,
2781                     ObjectOperation *extra_ops = NULL) {
2782     vector<OSDOp> ops;
2783     int i = init_ops(ops, 1, extra_ops);
2784     ops[i].op.op = CEPH_OSD_OP_CREATE;
2785     ops[i].op.flags = create_flags;
2786     Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
2787                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2788     o->mtime = mtime;
2789     o->snapc = snapc;
2790     ceph_tid_t tid;
2791     op_submit(o, &tid);
2792     return tid;
2793   }
2794   Op *prepare_remove_op(
2795     const object_t& oid, const object_locator_t& oloc,
2796     const SnapContext& snapc, ceph::real_time mtime, int flags,
2797     Context *oncommit,
2798     version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2799     vector<OSDOp> ops;
2800     int i = init_ops(ops, 1, extra_ops);
2801     ops[i].op.op = CEPH_OSD_OP_DELETE;
2802     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2803                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2804     o->mtime = mtime;
2805     o->snapc = snapc;
2806     return o;
2807   }
2808   ceph_tid_t remove(
2809     const object_t& oid, const object_locator_t& oloc,
2810     const SnapContext& snapc, ceph::real_time mtime, int flags,
2811     Context *oncommit,
2812     version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2813     Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
2814                               oncommit, objver, extra_ops);
2815     ceph_tid_t tid;
2816     op_submit(o, &tid);
2817     return tid;
2818   }
2819
2820   ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
2821               const char *name, const SnapContext& snapc, const bufferlist &bl,
2822               ceph::real_time mtime, int flags,
2823               Context *oncommit,
2824               version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2825     vector<OSDOp> ops;
2826     int i = init_ops(ops, 1, extra_ops);
2827     ops[i].op.op = CEPH_OSD_OP_SETXATTR;
2828     ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2829     ops[i].op.xattr.value_len = bl.length();
2830     if (name)
2831       ops[i].indata.append(name);
2832     ops[i].indata.append(bl);
2833     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2834                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2835     o->mtime = mtime;
2836     o->snapc = snapc;
2837     ceph_tid_t tid;
2838     op_submit(o, &tid);
2839     return tid;
2840   }
2841   ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
2842               const char *name, const SnapContext& snapc,
2843               ceph::real_time mtime, int flags,
2844               Context *oncommit,
2845               version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2846     vector<OSDOp> ops;
2847     int i = init_ops(ops, 1, extra_ops);
2848     ops[i].op.op = CEPH_OSD_OP_RMXATTR;
2849     ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2850     ops[i].op.xattr.value_len = 0;
2851     if (name)
2852       ops[i].indata.append(name);
2853     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2854                    CEPH_OSD_FLAG_WRITE, oncommit, objver);
2855     o->mtime = mtime;
2856     o->snapc = snapc;
2857     ceph_tid_t tid;
2858     op_submit(o, &tid);
2859     return tid;
2860   }
2861
2862   void list_nobjects(NListContext *p, Context *onfinish);
2863   uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
2864   uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
2865   void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);
2866
2867   hobject_t enumerate_objects_begin();
2868   hobject_t enumerate_objects_end();
2869   //hobject_t enumerate_objects_begin(int n, int m);
2870   void enumerate_objects(
2871     int64_t pool_id,
2872     const std::string &ns,
2873     const hobject_t &start,
2874     const hobject_t &end,
2875     const uint32_t max,
2876     const bufferlist &filter_bl,
2877     std::list<librados::ListObjectImpl> *result, 
2878     hobject_t *next,
2879     Context *on_finish);
2880
2881   void _enumerate_reply(
2882       bufferlist &bl,
2883       int r,
2884       const hobject_t &end,
2885       const int64_t pool_id,
2886       int budget,
2887       epoch_t reply_epoch,
2888       std::list<librados::ListObjectImpl> *result, 
2889       hobject_t *next,
2890       Context *on_finish);
2891   friend class C_EnumerateReply;
2892
2893   // -------------------------
2894   // pool ops
2895 private:
2896   void pool_op_submit(PoolOp *op);
2897   void _pool_op_submit(PoolOp *op);
2898   void _finish_pool_op(PoolOp *op, int r);
2899   void _do_delete_pool(int64_t pool, Context *onfinish);
2900 public:
2901   int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
2902   int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
2903                                 Context *onfinish);
2904   int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
2905   int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);
2906
2907   int create_pool(string& name, Context *onfinish, uint64_t auid=0,
2908                   int crush_rule=-1);
2909   int delete_pool(int64_t pool, Context *onfinish);
2910   int delete_pool(const string& name, Context *onfinish);
2911   int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
2912
2913   void handle_pool_op_reply(MPoolOpReply *m);
2914   int pool_op_cancel(ceph_tid_t tid, int r);
2915
2916   // --------------------------
2917   // pool stats
2918 private:
2919   void _poolstat_submit(PoolStatOp *op);
2920 public:
2921   void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
2922   void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
2923                       Context *onfinish);
2924   int pool_stat_op_cancel(ceph_tid_t tid, int r);
2925   void _finish_pool_stat_op(PoolStatOp *op, int r);
2926
2927   // ---------------------------
2928   // df stats
2929 private:
2930   void _fs_stats_submit(StatfsOp *op);
2931 public:
2932   void handle_fs_stats_reply(MStatfsReply *m);
2933   void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
2934                     Context *onfinish);
2935   int statfs_op_cancel(ceph_tid_t tid, int r);
2936   void _finish_statfs_op(StatfsOp *op, int r);
2937
2938   // ---------------------------
2939   // some scatter/gather hackery
2940
2941   void _sg_read_finish(vector<ObjectExtent>& extents,
2942                        vector<bufferlist>& resultbl,
2943                        bufferlist *bl, Context *onfinish);
2944
2945   struct C_SGRead : public Context {
2946     Objecter *objecter;
2947     vector<ObjectExtent> extents;
2948     vector<bufferlist> resultbl;
2949     bufferlist *bl;
2950     Context *onfinish;
2951     C_SGRead(Objecter *ob,
2952              vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b,
2953              Context *c) :
2954       objecter(ob), bl(b), onfinish(c) {
2955       extents.swap(e);
2956       resultbl.swap(r);
2957     }
2958     void finish(int r) override {
2959       objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
2960     }
2961   };
2962
2963   void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap,
2964                      bufferlist *bl, int flags, uint64_t trunc_size,
2965                      __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
2966     if (extents.size() == 1) {
2967       read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
2968                  extents[0].length, snap, bl, flags, extents[0].truncate_size,
2969                  trunc_seq, onfinish, 0, 0, op_flags);
2970     } else {
2971       C_GatherBuilder gather(cct);
2972       vector<bufferlist> resultbl(extents.size());
2973       int i=0;
2974       for (vector<ObjectExtent>::iterator p = extents.begin();
2975            p != extents.end();
2976            ++p) {
2977         read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
2978                    flags, p->truncate_size, trunc_seq, gather.new_sub(),
2979                    0, 0, op_flags);
2980       }
2981       gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
2982       gather.activate();
2983     }
2984   }
2985
2986   void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl,
2987                int flags, Context *onfinish, int op_flags = 0) {
2988     sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
2989   }
2990
2991   void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
2992                       const bufferlist& bl, ceph::real_time mtime, int flags,
2993                       uint64_t trunc_size, __u32 trunc_seq,
2994                       Context *oncommit, int op_flags = 0) {
2995     if (extents.size() == 1) {
2996       write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
2997                   extents[0].length, snapc, bl, mtime, flags,
2998                   extents[0].truncate_size, trunc_seq, oncommit,
2999                   0, 0, op_flags);
3000     } else {
3001       C_GatherBuilder gcom(cct, oncommit);
3002       for (vector<ObjectExtent>::iterator p = extents.begin();
3003            p != extents.end();
3004            ++p) {
3005         bufferlist cur;
3006         for (vector<pair<uint64_t,uint64_t> >::iterator bit
3007                = p->buffer_extents.begin();
3008              bit != p->buffer_extents.end();
3009              ++bit)
3010           bl.copy(bit->first, bit->second, cur);
3011         assert(cur.length() == p->length);
3012         write_trunc(p->oid, p->oloc, p->offset, p->length,
3013               snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
3014               oncommit ? gcom.new_sub():0,
3015               0, 0, op_flags);
3016       }
3017       gcom.activate();
3018     }
3019   }
3020
3021   void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
3022                 const bufferlist& bl, ceph::real_time mtime, int flags,
3023                 Context *oncommit, int op_flags = 0) {
3024     sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
3025                    op_flags);
3026   }
3027
3028   void ms_handle_connect(Connection *con) override;
3029   bool ms_handle_reset(Connection *con) override;
3030   void ms_handle_remote_reset(Connection *con) override;
3031   bool ms_handle_refused(Connection *con) override;
3032   bool ms_get_authorizer(int dest_type,
3033                          AuthAuthorizer **authorizer,
3034                          bool force_new) override;
3035
3036   void blacklist_self(bool set);
3037
3038 private:
3039   epoch_t epoch_barrier;
3040   bool retry_writes_after_first_reply;
3041 public:
3042   void set_epoch_barrier(epoch_t epoch);
3043
3044   PerfCounters *get_logger() {
3045     return logger;
3046   }
3047 };
3048
3049 #endif