Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osdc / Objecter.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "Objecter.h"
16 #include "osd/OSDMap.h"
17 #include "Filer.h"
18
19 #include "mon/MonClient.h"
20
21 #include "msg/Messenger.h"
22 #include "msg/Message.h"
23
24 #include "messages/MPing.h"
25 #include "messages/MOSDOp.h"
26 #include "messages/MOSDOpReply.h"
27 #include "messages/MOSDBackoff.h"
28 #include "messages/MOSDMap.h"
29
30 #include "messages/MPoolOp.h"
31 #include "messages/MPoolOpReply.h"
32
33 #include "messages/MGetPoolStats.h"
34 #include "messages/MGetPoolStatsReply.h"
35 #include "messages/MStatfs.h"
36 #include "messages/MStatfsReply.h"
37
38 #include "messages/MMonCommand.h"
39
40 #include "messages/MCommand.h"
41 #include "messages/MCommandReply.h"
42
43 #include "messages/MWatchNotify.h"
44
45 #include <errno.h>
46
47 #include "common/config.h"
48 #include "common/perf_counters.h"
49 #include "common/scrub_types.h"
50 #include "include/str_list.h"
51 #include "common/errno.h"
52 #include "common/EventTrace.h"
53
54 using ceph::real_time;
55 using ceph::real_clock;
56
57 using ceph::mono_clock;
58 using ceph::mono_time;
59
60 using ceph::timespan;
61
62
63 #define dout_subsys ceph_subsys_objecter
64 #undef dout_prefix
65 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
66
67
68 enum {
69   l_osdc_first = 123200,
70   l_osdc_op_active,
71   l_osdc_op_laggy,
72   l_osdc_op_send,
73   l_osdc_op_send_bytes,
74   l_osdc_op_resend,
75   l_osdc_op_reply,
76
77   l_osdc_op,
78   l_osdc_op_r,
79   l_osdc_op_w,
80   l_osdc_op_rmw,
81   l_osdc_op_pg,
82
83   l_osdc_osdop_stat,
84   l_osdc_osdop_create,
85   l_osdc_osdop_read,
86   l_osdc_osdop_write,
87   l_osdc_osdop_writefull,
88   l_osdc_osdop_writesame,
89   l_osdc_osdop_append,
90   l_osdc_osdop_zero,
91   l_osdc_osdop_truncate,
92   l_osdc_osdop_delete,
93   l_osdc_osdop_mapext,
94   l_osdc_osdop_sparse_read,
95   l_osdc_osdop_clonerange,
96   l_osdc_osdop_getxattr,
97   l_osdc_osdop_setxattr,
98   l_osdc_osdop_cmpxattr,
99   l_osdc_osdop_rmxattr,
100   l_osdc_osdop_resetxattrs,
101   l_osdc_osdop_tmap_up,
102   l_osdc_osdop_tmap_put,
103   l_osdc_osdop_tmap_get,
104   l_osdc_osdop_call,
105   l_osdc_osdop_watch,
106   l_osdc_osdop_notify,
107   l_osdc_osdop_src_cmpxattr,
108   l_osdc_osdop_pgls,
109   l_osdc_osdop_pgls_filter,
110   l_osdc_osdop_other,
111
112   l_osdc_linger_active,
113   l_osdc_linger_send,
114   l_osdc_linger_resend,
115   l_osdc_linger_ping,
116
117   l_osdc_poolop_active,
118   l_osdc_poolop_send,
119   l_osdc_poolop_resend,
120
121   l_osdc_poolstat_active,
122   l_osdc_poolstat_send,
123   l_osdc_poolstat_resend,
124
125   l_osdc_statfs_active,
126   l_osdc_statfs_send,
127   l_osdc_statfs_resend,
128
129   l_osdc_command_active,
130   l_osdc_command_send,
131   l_osdc_command_resend,
132
133   l_osdc_map_epoch,
134   l_osdc_map_full,
135   l_osdc_map_inc,
136
137   l_osdc_osd_sessions,
138   l_osdc_osd_session_open,
139   l_osdc_osd_session_close,
140   l_osdc_osd_laggy,
141
142   l_osdc_osdop_omap_wr,
143   l_osdc_osdop_omap_rd,
144   l_osdc_osdop_omap_del,
145
146   l_osdc_last,
147 };
148
149
150 // config obs ----------------------------
151
152 static const char *config_keys[] = {
153   "crush_location",
154   NULL
155 };
156
157 class Objecter::RequestStateHook : public AdminSocketHook {
158   Objecter *m_objecter;
159 public:
160   explicit RequestStateHook(Objecter *objecter);
161   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
162             bufferlist& out) override;
163 };
164
165 /**
166  * This is a more limited form of C_Contexts, but that requires
167  * a ceph_context which we don't have here.
168  */
169 class ObjectOperation::C_TwoContexts : public Context {
170   Context *first;
171   Context *second;
172 public:
173   C_TwoContexts(Context *first, Context *second) :
174     first(first), second(second) {}
175   void finish(int r) override {
176     first->complete(r);
177     second->complete(r);
178     first = NULL;
179     second = NULL;
180   }
181
182   ~C_TwoContexts() override {
183     delete first;
184     delete second;
185   }
186 };
187
188 void ObjectOperation::add_handler(Context *extra) {
189   size_t last = out_handler.size() - 1;
190   Context *orig = out_handler[last];
191   if (orig) {
192     Context *wrapper = new C_TwoContexts(orig, extra);
193     out_handler[last] = wrapper;
194   } else {
195     out_handler[last] = extra;
196   }
197 }
198
199 Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
200   object_t& oid)
201 {
202   if (oid.name.empty())
203     return unique_completion_lock();
204
205   static constexpr uint32_t HASH_PRIME = 1021;
206   uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
207     % HASH_PRIME;
208
209   return unique_completion_lock(completion_locks[h % num_locks],
210                                 std::defer_lock);
211 }
212
213 const char** Objecter::get_tracked_conf_keys() const
214 {
215   return config_keys;
216 }
217
218
219 void Objecter::handle_conf_change(const struct md_config_t *conf,
220                                   const std::set <std::string> &changed)
221 {
222   if (changed.count("crush_location")) {
223     update_crush_location();
224   }
225 }
226
227 void Objecter::update_crush_location()
228 {
229   unique_lock wl(rwlock);
230   crush_location = cct->crush_location.get_location();
231 }
232
233 // messages ------------------------------
234
235 /*
236  * initialize only internal data structures, don't initiate cluster interaction
237  */
238 void Objecter::init()
239 {
240   assert(!initialized);
241
242   if (!logger) {
243     PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
244
245     pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
246                 PerfCountersBuilder::PRIO_CRITICAL);
247     pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
248     pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
249     pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
250     pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
251     pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
252
253     pcb.add_u64_counter(l_osdc_op, "op", "Operations");
254     pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
255                         PerfCountersBuilder::PRIO_CRITICAL);
256     pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
257                         PerfCountersBuilder::PRIO_CRITICAL);
258     pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
259                         "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
260     pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
261
262     pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
263     pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
264                         "Create object operations");
265     pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
266     pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
267     pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
268                         "Write full object operations");
269     pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
270                         "Write same operations");
271     pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
272                         "Append operation");
273     pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
274                         "Set object to zero operations");
275     pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
276                         "Truncate object operations");
277     pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
278                         "Delete object operations");
279     pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
280                         "Map extent operations");
281     pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
282                         "Sparse read operations");
283     pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
284                         "Clone range operations");
285     pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
286                         "Get xattr operations");
287     pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
288                         "Set xattr operations");
289     pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
290                         "Xattr comparison operations");
291     pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
292                         "Remove xattr operations");
293     pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
294                         "Reset xattr operations");
295     pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
296                         "TMAP update operations");
297     pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
298                         "TMAP put operations");
299     pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
300                         "TMAP get operations");
301     pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
302                         "Call (execute) operations");
303     pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
304                         "Watch by object operations");
305     pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
306                         "Notify about object operations");
307     pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
308                         "Extended attribute comparison in multi operations");
309     pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
310     pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
311     pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
312
313     pcb.add_u64(l_osdc_linger_active, "linger_active",
314                 "Active lingering operations");
315     pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
316                         "Sent lingering operations");
317     pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
318                         "Resent lingering operations");
319     pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
320                         "Sent pings to lingering operations");
321
322     pcb.add_u64(l_osdc_poolop_active, "poolop_active",
323                 "Active pool operations");
324     pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
325                         "Sent pool operations");
326     pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
327                         "Resent pool operations");
328
329     pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
330                 "Active get pool stat operations");
331     pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
332                         "Pool stat operations sent");
333     pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
334                         "Resent pool stats");
335
336     pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
337     pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
338     pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
339                         "Resent FS stats");
340
341     pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
342     pcb.add_u64_counter(l_osdc_command_send, "command_send",
343                         "Sent commands");
344     pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
345                         "Resent commands");
346
347     pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
348     pcb.add_u64_counter(l_osdc_map_full, "map_full",
349                         "Full OSD maps received");
350     pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
351                         "Incremental OSD maps received");
352
353     pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
354                 "Open sessions");  // open sessions
355     pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
356                         "Sessions opened");
357     pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
358                         "Sessions closed");
359     pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
360
361     pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
362                         "OSD OMAP write operations");
363     pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
364                         "OSD OMAP read operations");
365     pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
366                         "OSD OMAP delete operations");
367
368     logger = pcb.create_perf_counters();
369     cct->get_perfcounters_collection()->add(logger);
370   }
371
372   m_request_state_hook = new RequestStateHook(this);
373   AdminSocket* admin_socket = cct->get_admin_socket();
374   int ret = admin_socket->register_command("objecter_requests",
375                                            "objecter_requests",
376                                            m_request_state_hook,
377                                            "show in-progress osd requests");
378
379   /* Don't warn on EEXIST, happens if multiple ceph clients
380    * are instantiated from one process */
381   if (ret < 0 && ret != -EEXIST) {
382     lderr(cct) << "error registering admin socket command: "
383                << cpp_strerror(ret) << dendl;
384   }
385
386   update_crush_location();
387
388   cct->_conf->add_observer(this);
389
390   initialized = true;
391 }
392
393 /*
394  * ok, cluster interaction can happen
395  */
396 void Objecter::start(const OSDMap* o)
397 {
398   shared_lock rl(rwlock);
399
400   start_tick();
401   if (o) {
402     osdmap->deepish_copy_from(*o);
403   } else if (osdmap->get_epoch() == 0) {
404     _maybe_request_map();
405   }
406 }
407
408 void Objecter::shutdown()
409 {
410   assert(initialized);
411
412   unique_lock wl(rwlock);
413
414   initialized = false;
415
416   cct->_conf->remove_observer(this);
417
418   map<int,OSDSession*>::iterator p;
419   while (!osd_sessions.empty()) {
420     p = osd_sessions.begin();
421     close_session(p->second);
422   }
423
424   while(!check_latest_map_lingers.empty()) {
425     map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
426     i->second->put();
427     check_latest_map_lingers.erase(i->first);
428   }
429
430   while(!check_latest_map_ops.empty()) {
431     map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
432     i->second->put();
433     check_latest_map_ops.erase(i->first);
434   }
435
436   while(!check_latest_map_commands.empty()) {
437     map<ceph_tid_t, CommandOp*>::iterator i
438       = check_latest_map_commands.begin();
439     i->second->put();
440     check_latest_map_commands.erase(i->first);
441   }
442
443   while(!poolstat_ops.empty()) {
444     map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
445     delete i->second;
446     poolstat_ops.erase(i->first);
447   }
448
449   while(!statfs_ops.empty()) {
450     map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
451     delete i->second;
452     statfs_ops.erase(i->first);
453   }
454
455   while(!pool_ops.empty()) {
456     map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
457     delete i->second;
458     pool_ops.erase(i->first);
459   }
460
461   ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
462   while(!homeless_session->linger_ops.empty()) {
463     std::map<uint64_t, LingerOp*>::iterator i
464       = homeless_session->linger_ops.begin();
465     ldout(cct, 10) << " linger_op " << i->first << dendl;
466     LingerOp *lop = i->second;
467     {
468       OSDSession::unique_lock swl(homeless_session->lock);
469       _session_linger_op_remove(homeless_session, lop);
470     }
471     linger_ops.erase(lop->linger_id);
472     linger_ops_set.erase(lop);
473     lop->put();
474   }
475
476   while(!homeless_session->ops.empty()) {
477     std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
478     ldout(cct, 10) << " op " << i->first << dendl;
479     Op *op = i->second;
480     {
481       OSDSession::unique_lock swl(homeless_session->lock);
482       _session_op_remove(homeless_session, op);
483     }
484     op->put();
485   }
486
487   while(!homeless_session->command_ops.empty()) {
488     std::map<ceph_tid_t, CommandOp*>::iterator i
489       = homeless_session->command_ops.begin();
490     ldout(cct, 10) << " command_op " << i->first << dendl;
491     CommandOp *cop = i->second;
492     {
493       OSDSession::unique_lock swl(homeless_session->lock);
494       _session_command_op_remove(homeless_session, cop);
495     }
496     cop->put();
497   }
498
499   if (tick_event) {
500     if (timer.cancel_event(tick_event)) {
501       ldout(cct, 10) <<  " successfully canceled tick" << dendl;
502     }
503     tick_event = 0;
504   }
505
506   if (logger) {
507     cct->get_perfcounters_collection()->remove(logger);
508     delete logger;
509     logger = NULL;
510   }
511
512   // Let go of Objecter write lock so timer thread can shutdown
513   wl.unlock();
514
515   // Outside of lock to avoid cycle WRT calls to RequestStateHook
516   // This is safe because we guarantee no concurrent calls to
517   // shutdown() with the ::initialized check at start.
518   if (m_request_state_hook) {
519     AdminSocket* admin_socket = cct->get_admin_socket();
520     admin_socket->unregister_command("objecter_requests");
521     delete m_request_state_hook;
522     m_request_state_hook = NULL;
523   }
524 }
525
526 void Objecter::_send_linger(LingerOp *info,
527                             shunique_lock& sul)
528 {
529   assert(sul.owns_lock() && sul.mutex() == &rwlock);
530
531   vector<OSDOp> opv;
532   Context *oncommit = NULL;
533   LingerOp::shared_lock watchl(info->watch_lock);
534   bufferlist *poutbl = NULL;
535   if (info->registered && info->is_watch) {
536     ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
537                    << dendl;
538     opv.push_back(OSDOp());
539     opv.back().op.op = CEPH_OSD_OP_WATCH;
540     opv.back().op.watch.cookie = info->get_cookie();
541     opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
542     opv.back().op.watch.gen = ++info->register_gen;
543     oncommit = new C_Linger_Reconnect(this, info);
544   } else {
545     ldout(cct, 15) << "send_linger " << info->linger_id << " register"
546                    << dendl;
547     opv = info->ops;
548     C_Linger_Commit *c = new C_Linger_Commit(this, info);
549     if (!info->is_watch) {
550       info->notify_id = 0;
551       poutbl = &c->outbl;
552     }
553     oncommit = c;
554   }
555   watchl.unlock();
556   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
557                  opv, info->target.flags | CEPH_OSD_FLAG_READ,
558                  oncommit, info->pobjver);
559   o->outbl = poutbl;
560   o->snapid = info->snap;
561   o->snapc = info->snapc;
562   o->mtime = info->mtime;
563
564   o->target = info->target;
565   o->tid = ++last_tid;
566
567   // do not resend this; we will send a new op to reregister
568   o->should_resend = false;
569
570   if (info->register_tid) {
571     // repeat send.  cancel old registeration op, if any.
572     OSDSession::unique_lock sl(info->session->lock);
573     if (info->session->ops.count(info->register_tid)) {
574       Op *o = info->session->ops[info->register_tid];
575       _op_cancel_map_check(o);
576       _cancel_linger_op(o);
577     }
578     sl.unlock();
579
580     _op_submit(o, sul, &info->register_tid);
581   } else {
582     // first send
583     _op_submit_with_budget(o, sul, &info->register_tid);
584   }
585
586   logger->inc(l_osdc_linger_send);
587 }
588
589 void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
590 {
591   LingerOp::unique_lock wl(info->watch_lock);
592   ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
593   if (info->on_reg_commit) {
594     info->on_reg_commit->complete(r);
595     info->on_reg_commit = NULL;
596   }
597
598   // only tell the user the first time we do this
599   info->registered = true;
600   info->pobjver = NULL;
601
602   if (!info->is_watch) {
603     // make note of the notify_id
604     bufferlist::iterator p = outbl.begin();
605     try {
606       ::decode(info->notify_id, p);
607       ldout(cct, 10) << "_linger_commit  notify_id=" << info->notify_id
608                      << dendl;
609     }
610     catch (buffer::error& e) {
611     }
612   }
613 }
614
615 struct C_DoWatchError : public Context {
616   Objecter *objecter;
617   Objecter::LingerOp *info;
618   int err;
619   C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
620     : objecter(o), info(i), err(r) {
621     info->get();
622     info->_queued_async();
623   }
624   void finish(int r) override {
625     Objecter::unique_lock wl(objecter->rwlock);
626     bool canceled = info->canceled;
627     wl.unlock();
628
629     if (!canceled) {
630       info->watch_context->handle_error(info->get_cookie(), err);
631     }
632
633     info->finished_async();
634     info->put();
635   }
636 };
637
638 int Objecter::_normalize_watch_error(int r)
639 {
640   // translate ENOENT -> ENOTCONN so that a delete->disconnection
641   // notification and a failure to reconnect becuase we raced with
642   // the delete appear the same to the user.
643   if (r == -ENOENT)
644     r = -ENOTCONN;
645   return r;
646 }
647
648 void Objecter::_linger_reconnect(LingerOp *info, int r)
649 {
650   ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
651                  << " (last_error " << info->last_error << ")" << dendl;
652   if (r < 0) {
653     LingerOp::unique_lock wl(info->watch_lock);
654     if (!info->last_error) {
655       r = _normalize_watch_error(r);
656       info->last_error = r;
657       if (info->watch_context) {
658         finisher->queue(new C_DoWatchError(this, info, r));
659       }
660     }
661     wl.unlock();
662   }
663 }
664
665 void Objecter::_send_linger_ping(LingerOp *info)
666 {
667   // rwlock is locked unique
668   // info->session->lock is locked
669
670   if (cct->_conf->objecter_inject_no_watch_ping) {
671     ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
672                    << dendl;
673     return;
674   }
675   if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
676     ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
677     return;
678   }
679
680   ceph::mono_time now = ceph::mono_clock::now();
681   ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
682                  << dendl;
683
684   vector<OSDOp> opv(1);
685   opv[0].op.op = CEPH_OSD_OP_WATCH;
686   opv[0].op.watch.cookie = info->get_cookie();
687   opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
688   opv[0].op.watch.gen = info->register_gen;
689   C_Linger_Ping *onack = new C_Linger_Ping(this, info);
690   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
691                  opv, info->target.flags | CEPH_OSD_FLAG_READ,
692                  onack, NULL, NULL);
693   o->target = info->target;
694   o->should_resend = false;
695   _send_op_account(o);
696   MOSDOp *m = _prepare_osd_op(o);
697   o->tid = ++last_tid;
698   _session_op_assign(info->session, o);
699   _send_op(o, m);
700   info->ping_tid = o->tid;
701
702   onack->sent = now;
703   logger->inc(l_osdc_linger_ping);
704 }
705
706 void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
707                             uint32_t register_gen)
708 {
709   LingerOp::unique_lock l(info->watch_lock);
710   ldout(cct, 10) << __func__ << " " << info->linger_id
711                  << " sent " << sent << " gen " << register_gen << " = " << r
712                  << " (last_error " << info->last_error
713                  << " register_gen " << info->register_gen << ")" << dendl;
714   if (info->register_gen == register_gen) {
715     if (r == 0) {
716       info->watch_valid_thru = sent;
717     } else if (r < 0 && !info->last_error) {
718       r = _normalize_watch_error(r);
719       info->last_error = r;
720       if (info->watch_context) {
721         finisher->queue(new C_DoWatchError(this, info, r));
722       }
723     }
724   } else {
725     ldout(cct, 20) << " ignoring old gen" << dendl;
726   }
727 }
728
729 int Objecter::linger_check(LingerOp *info)
730 {
731   LingerOp::shared_lock l(info->watch_lock);
732
733   mono_time stamp = info->watch_valid_thru;
734   if (!info->watch_pending_async.empty())
735     stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
736   auto age = mono_clock::now() - stamp;
737
738   ldout(cct, 10) << __func__ << " " << info->linger_id
739                  << " err " << info->last_error
740                  << " age " << age << dendl;
741   if (info->last_error)
742     return info->last_error;
743   // return a safe upper bound (we are truncating to ms)
744   return
745     1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
746 }
747
748 void Objecter::linger_cancel(LingerOp *info)
749 {
750   unique_lock wl(rwlock);
751   _linger_cancel(info);
752   info->put();
753 }
754
755 void Objecter::_linger_cancel(LingerOp *info)
756 {
757   // rwlock is locked unique
758   ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
759   if (!info->canceled) {
760     OSDSession *s = info->session;
761     OSDSession::unique_lock sl(s->lock);
762     _session_linger_op_remove(s, info);
763     sl.unlock();
764
765     linger_ops.erase(info->linger_id);
766     linger_ops_set.erase(info);
767     assert(linger_ops.size() == linger_ops_set.size());
768
769     info->canceled = true;
770     info->put();
771
772     logger->dec(l_osdc_linger_active);
773   }
774 }
775
776
777
778 Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
779                                               const object_locator_t& oloc,
780                                               int flags)
781 {
782   LingerOp *info = new LingerOp;
783   info->target.base_oid = oid;
784   info->target.base_oloc = oloc;
785   if (info->target.base_oloc.key == oid)
786     info->target.base_oloc.key.clear();
787   info->target.flags = flags;
788   info->watch_valid_thru = mono_clock::now();
789
790   unique_lock l(rwlock);
791
792   // Acquire linger ID
793   info->linger_id = ++max_linger_id;
794   ldout(cct, 10) << __func__ << " info " << info
795                  << " linger_id " << info->linger_id
796                  << " cookie " << info->get_cookie()
797                  << dendl;
798   linger_ops[info->linger_id] = info;
799   linger_ops_set.insert(info);
800   assert(linger_ops.size() == linger_ops_set.size());
801
802   info->get(); // for the caller
803   return info;
804 }
805
806 ceph_tid_t Objecter::linger_watch(LingerOp *info,
807                                   ObjectOperation& op,
808                                   const SnapContext& snapc,
809                                   real_time mtime,
810                                   bufferlist& inbl,
811                                   Context *oncommit,
812                                   version_t *objver)
813 {
814   info->is_watch = true;
815   info->snapc = snapc;
816   info->mtime = mtime;
817   info->target.flags |= CEPH_OSD_FLAG_WRITE;
818   info->ops = op.ops;
819   info->inbl = inbl;
820   info->poutbl = NULL;
821   info->pobjver = objver;
822   info->on_reg_commit = oncommit;
823
824   shunique_lock sul(rwlock, ceph::acquire_unique);
825   _linger_submit(info, sul);
826   logger->inc(l_osdc_linger_active);
827
828   return info->linger_id;
829 }
830
831 ceph_tid_t Objecter::linger_notify(LingerOp *info,
832                                    ObjectOperation& op,
833                                    snapid_t snap, bufferlist& inbl,
834                                    bufferlist *poutbl,
835                                    Context *onfinish,
836                                    version_t *objver)
837 {
838   info->snap = snap;
839   info->target.flags |= CEPH_OSD_FLAG_READ;
840   info->ops = op.ops;
841   info->inbl = inbl;
842   info->poutbl = poutbl;
843   info->pobjver = objver;
844   info->on_reg_commit = onfinish;
845
846   shunique_lock sul(rwlock, ceph::acquire_unique);
847   _linger_submit(info, sul);
848   logger->inc(l_osdc_linger_active);
849
850   return info->linger_id;
851 }
852
853 void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
854 {
855   assert(sul.owns_lock() && sul.mutex() == &rwlock);
856   assert(info->linger_id);
857
858   // Populate Op::target
859   OSDSession *s = NULL;
860   _calc_target(&info->target, nullptr);
861
862   // Create LingerOp<->OSDSession relation
863   int r = _get_session(info->target.osd, &s, sul);
864   assert(r == 0);
865   OSDSession::unique_lock sl(s->lock);
866   _session_linger_op_assign(s, info);
867   sl.unlock();
868   put_session(s);
869
870   _send_linger(info, sul);
871 }
872
873 struct C_DoWatchNotify : public Context {
874   Objecter *objecter;
875   Objecter::LingerOp *info;
876   MWatchNotify *msg;
877   C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
878     : objecter(o), info(i), msg(m) {
879     info->get();
880     info->_queued_async();
881     msg->get();
882   }
883   void finish(int r) override {
884     objecter->_do_watch_notify(info, msg);
885   }
886 };
887
888 void Objecter::handle_watch_notify(MWatchNotify *m)
889 {
890   shared_lock l(rwlock);
891   if (!initialized) {
892     return;
893   }
894
895   LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
896   if (linger_ops_set.count(info) == 0) {
897     ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
898     return;
899   }
900   LingerOp::unique_lock wl(info->watch_lock);
901   if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
902     if (!info->last_error) {
903       info->last_error = -ENOTCONN;
904       if (info->watch_context) {
905         finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
906       }
907     }
908   } else if (!info->is_watch) {
909     // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
910     // since we know the only user (librados) is safe to call in
911     // fast-dispatch context
912     if (info->notify_id &&
913         info->notify_id != m->notify_id) {
914       ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
915                      << " != " << info->notify_id << ", ignoring" << dendl;
916     } else if (info->on_notify_finish) {
917       info->notify_result_bl->claim(m->get_data());
918       info->on_notify_finish->complete(m->return_code);
919
920       // if we race with reconnect we might get a second notify; only
921       // notify the caller once!
922       info->on_notify_finish = NULL;
923     }
924   } else {
925     finisher->queue(new C_DoWatchNotify(this, info, m));
926   }
927 }
928
929 void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
930 {
931   ldout(cct, 10) << __func__ << " " << *m << dendl;
932
933   shared_lock l(rwlock);
934   assert(initialized);
935
936   if (info->canceled) {
937     l.unlock();
938     goto out;
939   }
940
941   // notify completion?
942   assert(info->is_watch);
943   assert(info->watch_context);
944   assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
945
946   l.unlock();
947
948   switch (m->opcode) {
949   case CEPH_WATCH_EVENT_NOTIFY:
950     info->watch_context->handle_notify(m->notify_id, m->cookie,
951                                        m->notifier_gid, m->bl);
952     break;
953   }
954
955  out:
956   info->finished_async();
957   info->put();
958   m->put();
959 }
960
961 bool Objecter::ms_dispatch(Message *m)
962 {
963   ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
964   if (!initialized)
965     return false;
966
967   switch (m->get_type()) {
968     // these we exlusively handle
969   case CEPH_MSG_OSD_OPREPLY:
970     handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
971     return true;
972
973   case CEPH_MSG_OSD_BACKOFF:
974     handle_osd_backoff(static_cast<MOSDBackoff*>(m));
975     return true;
976
977   case CEPH_MSG_WATCH_NOTIFY:
978     handle_watch_notify(static_cast<MWatchNotify*>(m));
979     m->put();
980     return true;
981
982   case MSG_COMMAND_REPLY:
983     if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
984       handle_command_reply(static_cast<MCommandReply*>(m));
985       return true;
986     } else {
987       return false;
988     }
989
990   case MSG_GETPOOLSTATSREPLY:
991     handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
992     return true;
993
994   case CEPH_MSG_POOLOP_REPLY:
995     handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
996     return true;
997
998   case CEPH_MSG_STATFS_REPLY:
999     handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
1000     return true;
1001
1002     // these we give others a chance to inspect
1003
1004     // MDS, OSD
1005   case CEPH_MSG_OSD_MAP:
1006     handle_osd_map(static_cast<MOSDMap*>(m));
1007     return false;
1008   }
1009   return false;
1010 }
1011
1012 void Objecter::_scan_requests(OSDSession *s,
1013                               bool force_resend,
1014                               bool cluster_full,
1015                               map<int64_t, bool> *pool_full_map,
1016                               map<ceph_tid_t, Op*>& need_resend,
1017                               list<LingerOp*>& need_resend_linger,
1018                               map<ceph_tid_t, CommandOp*>& need_resend_command,
1019                               shunique_lock& sul)
1020 {
1021   assert(sul.owns_lock() && sul.mutex() == &rwlock);
1022
1023   list<LingerOp*> unregister_lingers;
1024
1025   OSDSession::unique_lock sl(s->lock);
1026
1027   // check for changed linger mappings (_before_ regular ops)
1028   map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
1029   while (lp != s->linger_ops.end()) {
1030     LingerOp *op = lp->second;
1031     assert(op->session == s);
1032     // check_linger_pool_dne() may touch linger_ops; prevent iterator
1033     // invalidation
1034     ++lp;
1035     ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
1036     bool unregister, force_resend_writes = cluster_full;
1037     int r = _recalc_linger_op_target(op, sul);
1038     if (pool_full_map)
1039       force_resend_writes = force_resend_writes ||
1040         (*pool_full_map)[op->target.base_oloc.pool];
1041     switch (r) {
1042     case RECALC_OP_TARGET_NO_ACTION:
1043       if (!force_resend && !force_resend_writes)
1044         break;
1045       // -- fall-thru --
1046     case RECALC_OP_TARGET_NEED_RESEND:
1047       need_resend_linger.push_back(op);
1048       _linger_cancel_map_check(op);
1049       break;
1050     case RECALC_OP_TARGET_POOL_DNE:
1051       _check_linger_pool_dne(op, &unregister);
1052       if (unregister) {
1053         ldout(cct, 10) << " need to unregister linger op "
1054                        << op->linger_id << dendl;
1055         op->get();
1056         unregister_lingers.push_back(op);
1057       }
1058       break;
1059     }
1060   }
1061
1062   // check for changed request mappings
1063   map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
1064   while (p != s->ops.end()) {
1065     Op *op = p->second;
1066     ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
1067     ldout(cct, 10) << " checking op " << op->tid << dendl;
1068     bool force_resend_writes = cluster_full;
1069     if (pool_full_map)
1070       force_resend_writes = force_resend_writes ||
1071         (*pool_full_map)[op->target.base_oloc.pool];
1072     int r = _calc_target(&op->target,
1073                          op->session ? op->session->con.get() : nullptr);
1074     switch (r) {
1075     case RECALC_OP_TARGET_NO_ACTION:
1076       if (!force_resend && !(force_resend_writes && op->respects_full()))
1077         break;
1078       // -- fall-thru --
1079     case RECALC_OP_TARGET_NEED_RESEND:
1080       if (op->session) {
1081         _session_op_remove(op->session, op);
1082       }
1083       need_resend[op->tid] = op;
1084       _op_cancel_map_check(op);
1085       break;
1086     case RECALC_OP_TARGET_POOL_DNE:
1087       _check_op_pool_dne(op, &sl);
1088       break;
1089     }
1090   }
1091
1092   // commands
1093   map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
1094   while (cp != s->command_ops.end()) {
1095     CommandOp *c = cp->second;
1096     ++cp;
1097     ldout(cct, 10) << " checking command " << c->tid << dendl;
1098     bool force_resend_writes = cluster_full;
1099     if (pool_full_map)
1100       force_resend_writes = force_resend_writes ||
1101         (*pool_full_map)[c->target_pg.pool()];
1102     int r = _calc_command_target(c, sul);
1103     switch (r) {
1104     case RECALC_OP_TARGET_NO_ACTION:
1105       // resend if skipped map; otherwise do nothing.
1106       if (!force_resend && !force_resend_writes)
1107         break;
1108       // -- fall-thru --
1109     case RECALC_OP_TARGET_NEED_RESEND:
1110       need_resend_command[c->tid] = c;
1111       if (c->session) {
1112         _session_command_op_remove(c->session, c);
1113       }
1114       _command_cancel_map_check(c);
1115       break;
1116     case RECALC_OP_TARGET_POOL_DNE:
1117     case RECALC_OP_TARGET_OSD_DNE:
1118     case RECALC_OP_TARGET_OSD_DOWN:
1119       _check_command_map_dne(c);
1120       break;
1121     }
1122   }
1123
1124   sl.unlock();
1125
1126   for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
1127        iter != unregister_lingers.end();
1128        ++iter) {
1129     _linger_cancel(*iter);
1130     (*iter)->put();
1131   }
1132 }
1133
1134 void Objecter::handle_osd_map(MOSDMap *m)
1135 {
1136   shunique_lock sul(rwlock, acquire_unique);
1137   if (!initialized)
1138     return;
1139
1140   assert(osdmap);
1141
1142   if (m->fsid != monc->get_fsid()) {
1143     ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
1144                   << " != " << monc->get_fsid() << dendl;
1145     return;
1146   }
1147
1148   bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1149   bool cluster_full = _osdmap_full_flag();
1150   bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
1151     _osdmap_has_pool_full();
1152   map<int64_t, bool> pool_full_map;
1153   for (map<int64_t, pg_pool_t>::const_iterator it
1154          = osdmap->get_pools().begin();
1155        it != osdmap->get_pools().end(); ++it)
1156     pool_full_map[it->first] = _osdmap_pool_full(it->second);
1157
1158
1159   list<LingerOp*> need_resend_linger;
1160   map<ceph_tid_t, Op*> need_resend;
1161   map<ceph_tid_t, CommandOp*> need_resend_command;
1162
1163   if (m->get_last() <= osdmap->get_epoch()) {
1164     ldout(cct, 3) << "handle_osd_map ignoring epochs ["
1165                   << m->get_first() << "," << m->get_last()
1166                   << "] <= " << osdmap->get_epoch() << dendl;
1167   } else {
1168     ldout(cct, 3) << "handle_osd_map got epochs ["
1169                   << m->get_first() << "," << m->get_last()
1170                   << "] > " << osdmap->get_epoch() << dendl;
1171
1172     if (osdmap->get_epoch()) {
1173       bool skipped_map = false;
1174       // we want incrementals
1175       for (epoch_t e = osdmap->get_epoch() + 1;
1176            e <= m->get_last();
1177            e++) {
1178
1179         if (osdmap->get_epoch() == e-1 &&
1180             m->incremental_maps.count(e)) {
1181           ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
1182                         << dendl;
1183           OSDMap::Incremental inc(m->incremental_maps[e]);
1184           osdmap->apply_incremental(inc);
1185
1186           emit_blacklist_events(inc);
1187
1188           logger->inc(l_osdc_map_inc);
1189         }
1190         else if (m->maps.count(e)) {
1191           ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
1192           OSDMap *new_osdmap = new OSDMap();
1193           new_osdmap->decode(m->maps[e]);
1194
1195           emit_blacklist_events(*osdmap, *new_osdmap);
1196
1197           osdmap = new_osdmap;
1198
1199           logger->inc(l_osdc_map_full);
1200         }
1201         else {
1202           if (e >= m->get_oldest()) {
1203             ldout(cct, 3) << "handle_osd_map requesting missing epoch "
1204                           << osdmap->get_epoch()+1 << dendl;
1205             _maybe_request_map();
1206             break;
1207           }
1208           ldout(cct, 3) << "handle_osd_map missing epoch "
1209                         << osdmap->get_epoch()+1
1210                         << ", jumping to " << m->get_oldest() << dendl;
1211           e = m->get_oldest() - 1;
1212           skipped_map = true;
1213           continue;
1214         }
1215         logger->set(l_osdc_map_epoch, osdmap->get_epoch());
1216
1217         cluster_full = cluster_full || _osdmap_full_flag();
1218         update_pool_full_map(pool_full_map);
1219
1220         // check all outstanding requests on every epoch
1221         _scan_requests(homeless_session, skipped_map, cluster_full,
1222                        &pool_full_map, need_resend,
1223                        need_resend_linger, need_resend_command, sul);
1224         for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1225              p != osd_sessions.end(); ) {
1226           OSDSession *s = p->second;
1227           _scan_requests(s, skipped_map, cluster_full,
1228                          &pool_full_map, need_resend,
1229                          need_resend_linger, need_resend_command, sul);
1230           ++p;
1231           // osd down or addr change?
1232           if (!osdmap->is_up(s->osd) ||
1233               (s->con &&
1234                s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
1235             close_session(s);
1236           }
1237         }
1238
1239         assert(e == osdmap->get_epoch());
1240       }
1241
1242     } else {
1243       // first map.  we want the full thing.
1244       if (m->maps.count(m->get_last())) {
1245         for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1246              p != osd_sessions.end(); ++p) {
1247           OSDSession *s = p->second;
1248           _scan_requests(s, false, false, NULL, need_resend,
1249                          need_resend_linger, need_resend_command, sul);
1250         }
1251         ldout(cct, 3) << "handle_osd_map decoding full epoch "
1252                       << m->get_last() << dendl;
1253         osdmap->decode(m->maps[m->get_last()]);
1254
1255         _scan_requests(homeless_session, false, false, NULL,
1256                        need_resend, need_resend_linger,
1257                        need_resend_command, sul);
1258       } else {
1259         ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
1260                       << dendl;
1261         monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
1262         monc->renew_subs();
1263       }
1264     }
1265   }
1266
1267   // make sure need_resend targets reflect latest map
1268   for (auto p = need_resend.begin(); p != need_resend.end(); ) {
1269     Op *op = p->second;
1270     if (op->target.epoch < osdmap->get_epoch()) {
1271       ldout(cct, 10) << __func__ << "  checking op " << p->first << dendl;
1272       int r = _calc_target(&op->target, nullptr);
1273       if (r == RECALC_OP_TARGET_POOL_DNE) {
1274         p = need_resend.erase(p);
1275         _check_op_pool_dne(op, nullptr);
1276       } else {
1277         ++p;
1278       }
1279     } else {
1280       ++p;
1281     }
1282   }
1283
1284   bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1285   bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
1286     || _osdmap_has_pool_full();
1287
1288   // was/is paused?
1289   if (was_pauserd || was_pausewr || pauserd || pausewr ||
1290       osdmap->get_epoch() < epoch_barrier) {
1291     _maybe_request_map();
1292   }
1293
1294   // resend requests
1295   for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
1296        p != need_resend.end(); ++p) {
1297     Op *op = p->second;
1298     OSDSession *s = op->session;
1299     bool mapped_session = false;
1300     if (!s) {
1301       int r = _map_session(&op->target, &s, sul);
1302       assert(r == 0);
1303       mapped_session = true;
1304     } else {
1305       get_session(s);
1306     }
1307     OSDSession::unique_lock sl(s->lock);
1308     if (mapped_session) {
1309       _session_op_assign(s, op);
1310     }
1311     if (op->should_resend) {
1312       if (!op->session->is_homeless() && !op->target.paused) {
1313         logger->inc(l_osdc_op_resend);
1314         _send_op(op);
1315       }
1316     } else {
1317       _op_cancel_map_check(op);
1318       _cancel_linger_op(op);
1319     }
1320     sl.unlock();
1321     put_session(s);
1322   }
1323   for (list<LingerOp*>::iterator p = need_resend_linger.begin();
1324        p != need_resend_linger.end(); ++p) {
1325     LingerOp *op = *p;
1326     if (!op->session) {
1327       _calc_target(&op->target, nullptr);
1328       OSDSession *s = NULL;
1329       const int r = _get_session(op->target.osd, &s, sul);
1330       assert(r == 0);
1331       assert(s != NULL);
1332       op->session = s;
1333       put_session(s);
1334     }
1335     if (!op->session->is_homeless()) {
1336       logger->inc(l_osdc_linger_resend);
1337       _send_linger(op, sul);
1338     }
1339   }
1340   for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
1341        p != need_resend_command.end(); ++p) {
1342     CommandOp *c = p->second;
1343     if (c->target.osd >= 0) {
1344       _assign_command_session(c, sul);
1345       if (c->session && !c->session->is_homeless()) {
1346         _send_command(c);
1347       }
1348     }
1349   }
1350
1351   _dump_active();
1352
1353   // finish any Contexts that were waiting on a map update
1354   map<epoch_t,list< pair< Context*, int > > >::iterator p =
1355     waiting_for_map.begin();
1356   while (p != waiting_for_map.end() &&
1357          p->first <= osdmap->get_epoch()) {
1358     //go through the list and call the onfinish methods
1359     for (list<pair<Context*, int> >::iterator i = p->second.begin();
1360          i != p->second.end(); ++i) {
1361       i->first->complete(i->second);
1362     }
1363     waiting_for_map.erase(p++);
1364   }
1365
1366   monc->sub_got("osdmap", osdmap->get_epoch());
1367
1368   if (!waiting_for_map.empty()) {
1369     _maybe_request_map();
1370   }
1371 }
1372
1373 void Objecter::enable_blacklist_events()
1374 {
1375   unique_lock wl(rwlock);
1376
1377   blacklist_events_enabled = true;
1378 }
1379
1380 void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1381 {
1382   unique_lock wl(rwlock);
1383
1384   if (events->empty()) {
1385     events->swap(blacklist_events);
1386   } else {
1387     for (const auto &i : blacklist_events) {
1388       events->insert(i);
1389     }
1390     blacklist_events.clear();
1391   }
1392 }
1393
1394 void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1395 {
1396   if (!blacklist_events_enabled) {
1397     return;
1398   }
1399
1400   for (const auto &i : inc.new_blacklist) {
1401     blacklist_events.insert(i.first);
1402   }
1403 }
1404
1405 void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1406                                      const OSDMap &new_osd_map)
1407 {
1408   if (!blacklist_events_enabled) {
1409     return;
1410   }
1411
1412   std::set<entity_addr_t> old_set;
1413   std::set<entity_addr_t> new_set;
1414
1415   old_osd_map.get_blacklist(&old_set);
1416   new_osd_map.get_blacklist(&new_set);
1417
1418   std::set<entity_addr_t> delta_set;
1419   std::set_difference(
1420       new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1421       std::inserter(delta_set, delta_set.begin()));
1422   blacklist_events.insert(delta_set.begin(), delta_set.end());
1423 }
1424
1425 // op pool check
1426
1427 void Objecter::C_Op_Map_Latest::finish(int r)
1428 {
1429   if (r == -EAGAIN || r == -ECANCELED)
1430     return;
1431
1432   lgeneric_subdout(objecter->cct, objecter, 10)
1433     << "op_map_latest r=" << r << " tid=" << tid
1434     << " latest " << latest << dendl;
1435
1436   Objecter::unique_lock wl(objecter->rwlock);
1437
1438   map<ceph_tid_t, Op*>::iterator iter =
1439     objecter->check_latest_map_ops.find(tid);
1440   if (iter == objecter->check_latest_map_ops.end()) {
1441     lgeneric_subdout(objecter->cct, objecter, 10)
1442       << "op_map_latest op "<< tid << " not found" << dendl;
1443     return;
1444   }
1445
1446   Op *op = iter->second;
1447   objecter->check_latest_map_ops.erase(iter);
1448
1449   lgeneric_subdout(objecter->cct, objecter, 20)
1450     << "op_map_latest op "<< op << dendl;
1451
1452   if (op->map_dne_bound == 0)
1453     op->map_dne_bound = latest;
1454
1455   OSDSession::unique_lock sl(op->session->lock, defer_lock);
1456   objecter->_check_op_pool_dne(op, &sl);
1457
1458   op->put();
1459 }
1460
1461 int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1462                                 snapid_t *snap) const
1463 {
1464   shared_lock rl(rwlock);
1465
1466   auto& pools = osdmap->get_pools();
1467   auto iter = pools.find(poolid);
1468   if (iter == pools.end()) {
1469     return -ENOENT;
1470   }
1471   const pg_pool_t& pg_pool = iter->second;
1472   for (auto p = pg_pool.snaps.begin();
1473        p != pg_pool.snaps.end();
1474        ++p) {
1475     if (p->second.name == snap_name) {
1476       *snap = p->first;
1477       return 0;
1478     }
1479   }
1480   return -ENOENT;
1481 }
1482
1483 int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1484                                  pool_snap_info_t *info) const
1485 {
1486   shared_lock rl(rwlock);
1487
1488   auto& pools = osdmap->get_pools();
1489   auto iter = pools.find(poolid);
1490   if (iter == pools.end()) {
1491     return -ENOENT;
1492   }
1493   const pg_pool_t& pg_pool = iter->second;
1494   auto p = pg_pool.snaps.find(snap);
1495   if (p == pg_pool.snaps.end())
1496     return -ENOENT;
1497   *info = p->second;
1498
1499   return 0;
1500 }
1501
1502 int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1503 {
1504   shared_lock rl(rwlock);
1505
1506   const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1507   if (!pi)
1508     return -ENOENT;
1509   for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1510        p != pi->snaps.end();
1511        ++p) {
1512     snaps->push_back(p->first);
1513   }
1514   return 0;
1515 }
1516
1517 // sl may be unlocked.
1518 void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1519 {
1520   // rwlock is locked unique
1521
1522   if (op->target.pool_ever_existed) {
1523     // the pool previously existed and now it does not, which means it
1524     // was deleted.
1525     op->map_dne_bound = osdmap->get_epoch();
1526     ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1527                    << " pool previously exists but now does not"
1528                    << dendl;
1529   } else {
1530     ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1531                    << " current " << osdmap->get_epoch()
1532                    << " map_dne_bound " << op->map_dne_bound
1533                    << dendl;
1534   }
1535   if (op->map_dne_bound > 0) {
1536     if (osdmap->get_epoch() >= op->map_dne_bound) {
1537       // we had a new enough map
1538       ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1539                      << " concluding pool " << op->target.base_pgid.pool()
1540                      << " dne" << dendl;
1541       if (op->onfinish) {
1542         op->onfinish->complete(-ENOENT);
1543       }
1544
1545       OSDSession *s = op->session;
1546       if (s) {
1547         assert(s != NULL);
1548         assert(sl->mutex() == &s->lock);
1549         bool session_locked = sl->owns_lock();
1550         if (!session_locked) {
1551           sl->lock();
1552         }
1553         _finish_op(op, 0);
1554         if (!session_locked) {
1555           sl->unlock();
1556         }
1557       } else {
1558         _finish_op(op, 0);      // no session
1559       }
1560     }
1561   } else {
1562     _send_op_map_check(op);
1563   }
1564 }
1565
1566 void Objecter::_send_op_map_check(Op *op)
1567 {
1568   // rwlock is locked unique
1569   // ask the monitor
1570   if (check_latest_map_ops.count(op->tid) == 0) {
1571     op->get();
1572     check_latest_map_ops[op->tid] = op;
1573     C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1574     monc->get_version("osdmap", &c->latest, NULL, c);
1575   }
1576 }
1577
1578 void Objecter::_op_cancel_map_check(Op *op)
1579 {
1580   // rwlock is locked unique
1581   map<ceph_tid_t, Op*>::iterator iter =
1582     check_latest_map_ops.find(op->tid);
1583   if (iter != check_latest_map_ops.end()) {
1584     Op *op = iter->second;
1585     op->put();
1586     check_latest_map_ops.erase(iter);
1587   }
1588 }
1589
1590 // linger pool check
1591
1592 void Objecter::C_Linger_Map_Latest::finish(int r)
1593 {
1594   if (r == -EAGAIN || r == -ECANCELED) {
1595     // ignore callback; we will retry in resend_mon_ops()
1596     return;
1597   }
1598
1599   unique_lock wl(objecter->rwlock);
1600
1601   map<uint64_t, LingerOp*>::iterator iter =
1602     objecter->check_latest_map_lingers.find(linger_id);
1603   if (iter == objecter->check_latest_map_lingers.end()) {
1604     return;
1605   }
1606
1607   LingerOp *op = iter->second;
1608   objecter->check_latest_map_lingers.erase(iter);
1609
1610   if (op->map_dne_bound == 0)
1611     op->map_dne_bound = latest;
1612
1613   bool unregister;
1614   objecter->_check_linger_pool_dne(op, &unregister);
1615
1616   if (unregister) {
1617     objecter->_linger_cancel(op);
1618   }
1619
1620   op->put();
1621 }
1622
1623 void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1624 {
1625   // rwlock is locked unique
1626
1627   *need_unregister = false;
1628
1629   if (op->register_gen > 0) {
1630     ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1631                    << " pool previously existed but now does not"
1632                    << dendl;
1633     op->map_dne_bound = osdmap->get_epoch();
1634   } else {
1635     ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1636                    << " current " << osdmap->get_epoch()
1637                    << " map_dne_bound " << op->map_dne_bound
1638                    << dendl;
1639   }
1640   if (op->map_dne_bound > 0) {
1641     if (osdmap->get_epoch() >= op->map_dne_bound) {
1642       if (op->on_reg_commit) {
1643         op->on_reg_commit->complete(-ENOENT);
1644       }
1645       *need_unregister = true;
1646     }
1647   } else {
1648     _send_linger_map_check(op);
1649   }
1650 }
1651
1652 void Objecter::_send_linger_map_check(LingerOp *op)
1653 {
1654   // ask the monitor
1655   if (check_latest_map_lingers.count(op->linger_id) == 0) {
1656     op->get();
1657     check_latest_map_lingers[op->linger_id] = op;
1658     C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1659     monc->get_version("osdmap", &c->latest, NULL, c);
1660   }
1661 }
1662
1663 void Objecter::_linger_cancel_map_check(LingerOp *op)
1664 {
1665   // rwlock is locked unique
1666
1667   map<uint64_t, LingerOp*>::iterator iter =
1668     check_latest_map_lingers.find(op->linger_id);
1669   if (iter != check_latest_map_lingers.end()) {
1670     LingerOp *op = iter->second;
1671     op->put();
1672     check_latest_map_lingers.erase(iter);
1673   }
1674 }
1675
1676 // command pool check
1677
1678 void Objecter::C_Command_Map_Latest::finish(int r)
1679 {
1680   if (r == -EAGAIN || r == -ECANCELED) {
1681     // ignore callback; we will retry in resend_mon_ops()
1682     return;
1683   }
1684
1685   unique_lock wl(objecter->rwlock);
1686
1687   map<uint64_t, CommandOp*>::iterator iter =
1688     objecter->check_latest_map_commands.find(tid);
1689   if (iter == objecter->check_latest_map_commands.end()) {
1690     return;
1691   }
1692
1693   CommandOp *c = iter->second;
1694   objecter->check_latest_map_commands.erase(iter);
1695
1696   if (c->map_dne_bound == 0)
1697     c->map_dne_bound = latest;
1698
1699   objecter->_check_command_map_dne(c);
1700
1701   c->put();
1702 }
1703
1704 void Objecter::_check_command_map_dne(CommandOp *c)
1705 {
1706   // rwlock is locked unique
1707
1708   ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1709                  << " current " << osdmap->get_epoch()
1710                  << " map_dne_bound " << c->map_dne_bound
1711                  << dendl;
1712   if (c->map_dne_bound > 0) {
1713     if (osdmap->get_epoch() >= c->map_dne_bound) {
1714       _finish_command(c, c->map_check_error, c->map_check_error_str);
1715     }
1716   } else {
1717     _send_command_map_check(c);
1718   }
1719 }
1720
1721 void Objecter::_send_command_map_check(CommandOp *c)
1722 {
1723   // rwlock is locked unique
1724
1725   // ask the monitor
1726   if (check_latest_map_commands.count(c->tid) == 0) {
1727     c->get();
1728     check_latest_map_commands[c->tid] = c;
1729     C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1730     monc->get_version("osdmap", &f->latest, NULL, f);
1731   }
1732 }
1733
1734 void Objecter::_command_cancel_map_check(CommandOp *c)
1735 {
1736   // rwlock is locked uniqe
1737
1738   map<uint64_t, CommandOp*>::iterator iter =
1739     check_latest_map_commands.find(c->tid);
1740   if (iter != check_latest_map_commands.end()) {
1741     CommandOp *c = iter->second;
1742     c->put();
1743     check_latest_map_commands.erase(iter);
1744   }
1745 }
1746
1747
1748 /**
1749  * Look up OSDSession by OSD id.
1750  *
1751  * @returns 0 on success, or -EAGAIN if the lock context requires
1752  * promotion to write.
1753  */
1754 int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1755 {
1756   assert(sul && sul.mutex() == &rwlock);
1757
1758   if (osd < 0) {
1759     *session = homeless_session;
1760     ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1761                    << dendl;
1762     return 0;
1763   }
1764
1765   map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1766   if (p != osd_sessions.end()) {
1767     OSDSession *s = p->second;
1768     s->get();
1769     *session = s;
1770     ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1771                    << s->get_nref() << dendl;
1772     return 0;
1773   }
1774   if (!sul.owns_lock()) {
1775     return -EAGAIN;
1776   }
1777   OSDSession *s = new OSDSession(cct, osd);
1778   osd_sessions[osd] = s;
1779   s->con = messenger->get_connection(osdmap->get_inst(osd));
1780   s->con->set_priv(s->get());
1781   logger->inc(l_osdc_osd_session_open);
1782   logger->set(l_osdc_osd_sessions, osd_sessions.size());
1783   s->get();
1784   *session = s;
1785   ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1786                  << s->get_nref() << dendl;
1787   return 0;
1788 }
1789
1790 void Objecter::put_session(Objecter::OSDSession *s)
1791 {
1792   if (s && !s->is_homeless()) {
1793     ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1794                    << s->get_nref() << dendl;
1795     s->put();
1796   }
1797 }
1798
1799 void Objecter::get_session(Objecter::OSDSession *s)
1800 {
1801   assert(s != NULL);
1802
1803   if (!s->is_homeless()) {
1804     ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1805                    << s->get_nref() << dendl;
1806     s->get();
1807   }
1808 }
1809
1810 void Objecter::_reopen_session(OSDSession *s)
1811 {
1812   // s->lock is locked
1813
1814   entity_inst_t inst = osdmap->get_inst(s->osd);
1815   ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1816                  << inst << dendl;
1817   if (s->con) {
1818     s->con->set_priv(NULL);
1819     s->con->mark_down();
1820     logger->inc(l_osdc_osd_session_close);
1821   }
1822   s->con = messenger->get_connection(inst);
1823   s->con->set_priv(s->get());
1824   s->incarnation++;
1825   logger->inc(l_osdc_osd_session_open);
1826 }
1827
1828 void Objecter::close_session(OSDSession *s)
1829 {
1830   // rwlock is locked unique
1831
1832   ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1833   if (s->con) {
1834     s->con->set_priv(NULL);
1835     s->con->mark_down();
1836     logger->inc(l_osdc_osd_session_close);
1837   }
1838   OSDSession::unique_lock sl(s->lock);
1839
1840   std::list<LingerOp*> homeless_lingers;
1841   std::list<CommandOp*> homeless_commands;
1842   std::list<Op*> homeless_ops;
1843
1844   while (!s->linger_ops.empty()) {
1845     std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1846     ldout(cct, 10) << " linger_op " << i->first << dendl;
1847     homeless_lingers.push_back(i->second);
1848     _session_linger_op_remove(s, i->second);
1849   }
1850
1851   while (!s->ops.empty()) {
1852     std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1853     ldout(cct, 10) << " op " << i->first << dendl;
1854     homeless_ops.push_back(i->second);
1855     _session_op_remove(s, i->second);
1856   }
1857
1858   while (!s->command_ops.empty()) {
1859     std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1860     ldout(cct, 10) << " command_op " << i->first << dendl;
1861     homeless_commands.push_back(i->second);
1862     _session_command_op_remove(s, i->second);
1863   }
1864
1865   osd_sessions.erase(s->osd);
1866   sl.unlock();
1867   put_session(s);
1868
1869   // Assign any leftover ops to the homeless session
1870   {
1871     OSDSession::unique_lock hsl(homeless_session->lock);
1872     for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1873          i != homeless_lingers.end(); ++i) {
1874       _session_linger_op_assign(homeless_session, *i);
1875     }
1876     for (std::list<Op*>::iterator i = homeless_ops.begin();
1877          i != homeless_ops.end(); ++i) {
1878       _session_op_assign(homeless_session, *i);
1879     }
1880     for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1881          i != homeless_commands.end(); ++i) {
1882       _session_command_op_assign(homeless_session, *i);
1883     }
1884   }
1885
1886   logger->set(l_osdc_osd_sessions, osd_sessions.size());
1887 }
1888
1889 void Objecter::wait_for_osd_map()
1890 {
1891   unique_lock l(rwlock);
1892   if (osdmap->get_epoch()) {
1893     l.unlock();
1894     return;
1895   }
1896
1897   // Leave this since it goes with C_SafeCond
1898   Mutex lock("");
1899   Cond cond;
1900   bool done;
1901   lock.Lock();
1902   C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
1903   waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1904   l.unlock();
1905   while (!done)
1906     cond.Wait(lock);
1907   lock.Unlock();
1908 }
1909
1910 struct C_Objecter_GetVersion : public Context {
1911   Objecter *objecter;
1912   uint64_t oldest, newest;
1913   Context *fin;
1914   C_Objecter_GetVersion(Objecter *o, Context *c)
1915     : objecter(o), oldest(0), newest(0), fin(c) {}
1916   void finish(int r) override {
1917     if (r >= 0) {
1918       objecter->get_latest_version(oldest, newest, fin);
1919     } else if (r == -EAGAIN) { // try again as instructed
1920       objecter->wait_for_latest_osdmap(fin);
1921     } else {
1922       // it doesn't return any other error codes!
1923       ceph_abort();
1924     }
1925   }
1926 };
1927
1928 void Objecter::wait_for_latest_osdmap(Context *fin)
1929 {
1930   ldout(cct, 10) << __func__ << dendl;
1931   C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1932   monc->get_version("osdmap", &c->newest, &c->oldest, c);
1933 }
1934
1935 void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1936 {
1937   unique_lock wl(rwlock);
1938   _get_latest_version(oldest, newest, fin);
1939 }
1940
1941 void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1942                                    Context *fin)
1943 {
1944   // rwlock is locked unique
1945   if (osdmap->get_epoch() >= newest) {
1946   ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1947     if (fin)
1948       fin->complete(0);
1949     return;
1950   }
1951
1952   ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1953   _wait_for_new_map(fin, newest, 0);
1954 }
1955
1956 void Objecter::maybe_request_map()
1957 {
1958   shared_lock rl(rwlock);
1959   _maybe_request_map();
1960 }
1961
1962 void Objecter::_maybe_request_map()
1963 {
1964   // rwlock is locked
1965   int flag = 0;
1966   if (_osdmap_full_flag()
1967       || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1968       || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1969     ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1970       "osd map (FULL flag is set)" << dendl;
1971   } else {
1972     ldout(cct, 10)
1973       << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1974     flag = CEPH_SUBSCRIBE_ONETIME;
1975   }
1976   epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1977   if (monc->sub_want("osdmap", epoch, flag)) {
1978     monc->renew_subs();
1979   }
1980 }
1981
1982 void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
1983 {
1984   // rwlock is locked unique
1985   waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
1986   _maybe_request_map();
1987 }
1988
1989
1990 /**
1991  * Use this together with wait_for_map: this is a pre-check to avoid
1992  * allocating a Context for wait_for_map if we can see that we
1993  * definitely already have the epoch.
1994  *
1995  * This does *not* replace the need to handle the return value of
1996  * wait_for_map: just because we don't have it in this pre-check
1997  * doesn't mean we won't have it when calling back into wait_for_map,
1998  * since the objecter lock is dropped in between.
1999  */
2000 bool Objecter::have_map(const epoch_t epoch)
2001 {
2002   shared_lock rl(rwlock);
2003   if (osdmap->get_epoch() >= epoch) {
2004     return true;
2005   } else {
2006     return false;
2007   }
2008 }
2009
2010 bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2011 {
2012   unique_lock wl(rwlock);
2013   if (osdmap->get_epoch() >= epoch) {
2014     return true;
2015   }
2016   _wait_for_new_map(c, epoch, err);
2017   return false;
2018 }
2019
2020 void Objecter::kick_requests(OSDSession *session)
2021 {
2022   ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
2023
2024   map<uint64_t, LingerOp *> lresend;
2025   unique_lock wl(rwlock);
2026
2027   OSDSession::unique_lock sl(session->lock);
2028   _kick_requests(session, lresend);
2029   sl.unlock();
2030
2031   _linger_ops_resend(lresend, wl);
2032 }
2033
2034 void Objecter::_kick_requests(OSDSession *session,
2035                               map<uint64_t, LingerOp *>& lresend)
2036 {
2037   // rwlock is locked unique
2038
2039   // clear backoffs
2040   session->backoffs.clear();
2041   session->backoffs_by_id.clear();
2042
2043   // resend ops
2044   map<ceph_tid_t,Op*> resend;  // resend in tid order
2045   for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2046        p != session->ops.end();) {
2047     Op *op = p->second;
2048     ++p;
2049     logger->inc(l_osdc_op_resend);
2050     if (op->should_resend) {
2051       if (!op->target.paused)
2052         resend[op->tid] = op;
2053     } else {
2054       _op_cancel_map_check(op);
2055       _cancel_linger_op(op);
2056     }
2057   }
2058
2059   while (!resend.empty()) {
2060     _send_op(resend.begin()->second);
2061     resend.erase(resend.begin());
2062   }
2063
2064   // resend lingers
2065   for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2066        j != session->linger_ops.end(); ++j) {
2067     LingerOp *op = j->second;
2068     op->get();
2069     logger->inc(l_osdc_linger_resend);
2070     assert(lresend.count(j->first) == 0);
2071     lresend[j->first] = op;
2072   }
2073
2074   // resend commands
2075   map<uint64_t,CommandOp*> cresend;  // resend in order
2076   for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2077        k != session->command_ops.end(); ++k) {
2078     logger->inc(l_osdc_command_resend);
2079     cresend[k->first] = k->second;
2080   }
2081   while (!cresend.empty()) {
2082     _send_command(cresend.begin()->second);
2083     cresend.erase(cresend.begin());
2084   }
2085 }
2086
2087 void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2088                                   unique_lock& ul)
2089 {
2090   assert(ul.owns_lock());
2091   shunique_lock sul(std::move(ul));
2092   while (!lresend.empty()) {
2093     LingerOp *op = lresend.begin()->second;
2094     if (!op->canceled) {
2095       _send_linger(op, sul);
2096     }
2097     op->put();
2098     lresend.erase(lresend.begin());
2099   }
2100   ul = unique_lock(sul.release_to_unique());
2101 }
2102
2103 void Objecter::start_tick()
2104 {
2105   assert(tick_event == 0);
2106   tick_event =
2107     timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2108                     &Objecter::tick, this);
2109 }
2110
2111 void Objecter::tick()
2112 {
2113   shared_lock rl(rwlock);
2114
2115   ldout(cct, 10) << "tick" << dendl;
2116
2117   // we are only called by C_Tick
2118   tick_event = 0;
2119
2120   if (!initialized) {
2121     // we raced with shutdown
2122     ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2123     return;
2124   }
2125
2126   set<OSDSession*> toping;
2127
2128
2129   // look for laggy requests
2130   auto cutoff = ceph::mono_clock::now();
2131   cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout
2132
2133   unsigned laggy_ops = 0;
2134
2135   for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2136        siter != osd_sessions.end(); ++siter) {
2137     OSDSession *s = siter->second;
2138     OSDSession::lock_guard l(s->lock);
2139     bool found = false;
2140     for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2141         p != s->ops.end();
2142         ++p) {
2143       Op *op = p->second;
2144       assert(op->session);
2145       if (op->stamp < cutoff) {
2146         ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2147                       << " is laggy" << dendl;
2148         found = true;
2149         ++laggy_ops;
2150       }
2151     }
2152     for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2153         p != s->linger_ops.end();
2154         ++p) {
2155       LingerOp *op = p->second;
2156       LingerOp::unique_lock wl(op->watch_lock);
2157       assert(op->session);
2158       ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2159                      << " (osd." << op->session->osd << ")" << dendl;
2160       found = true;
2161       if (op->is_watch && op->registered && !op->last_error)
2162         _send_linger_ping(op);
2163     }
2164     for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2165         p != s->command_ops.end();
2166         ++p) {
2167       CommandOp *op = p->second;
2168       assert(op->session);
2169       ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2170                      << " (osd." << op->session->osd << ")" << dendl;
2171       found = true;
2172     }
2173     if (found)
2174       toping.insert(s);
2175   }
2176   if (num_homeless_ops || !toping.empty()) {
2177     _maybe_request_map();
2178   }
2179
2180   logger->set(l_osdc_op_laggy, laggy_ops);
2181   logger->set(l_osdc_osd_laggy, toping.size());
2182
2183   if (!toping.empty()) {
2184     // send a ping to these osds, to ensure we detect any session resets
2185     // (osd reply message policy is lossy)
2186     for (set<OSDSession*>::const_iterator i = toping.begin();
2187          i != toping.end();
2188          ++i) {
2189       (*i)->con->send_message(new MPing);
2190     }
2191   }
2192
2193   // Make sure we don't resechedule if we wake up after shutdown
2194   if (initialized) {
2195     tick_event = timer.reschedule_me(ceph::make_timespan(
2196                                        cct->_conf->objecter_tick_interval));
2197   }
2198 }
2199
2200 void Objecter::resend_mon_ops()
2201 {
2202   unique_lock wl(rwlock);
2203
2204   ldout(cct, 10) << "resend_mon_ops" << dendl;
2205
2206   for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2207        p != poolstat_ops.end();
2208        ++p) {
2209     _poolstat_submit(p->second);
2210     logger->inc(l_osdc_poolstat_resend);
2211   }
2212
2213   for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2214        p != statfs_ops.end();
2215        ++p) {
2216     _fs_stats_submit(p->second);
2217     logger->inc(l_osdc_statfs_resend);
2218   }
2219
2220   for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2221        p != pool_ops.end();
2222        ++p) {
2223     _pool_op_submit(p->second);
2224     logger->inc(l_osdc_poolop_resend);
2225   }
2226
2227   for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2228        p != check_latest_map_ops.end();
2229        ++p) {
2230     C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2231     monc->get_version("osdmap", &c->latest, NULL, c);
2232   }
2233
2234   for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2235        p != check_latest_map_lingers.end();
2236        ++p) {
2237     C_Linger_Map_Latest *c
2238       = new C_Linger_Map_Latest(this, p->second->linger_id);
2239     monc->get_version("osdmap", &c->latest, NULL, c);
2240   }
2241
2242   for (map<uint64_t, CommandOp*>::iterator p
2243          = check_latest_map_commands.begin();
2244        p != check_latest_map_commands.end();
2245        ++p) {
2246     C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2247     monc->get_version("osdmap", &c->latest, NULL, c);
2248   }
2249 }
2250
2251 // read | write ---------------------------
2252
2253 void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2254 {
2255   shunique_lock rl(rwlock, ceph::acquire_shared);
2256   ceph_tid_t tid = 0;
2257   if (!ptid)
2258     ptid = &tid;
2259   op->trace.event("op submit");
2260   _op_submit_with_budget(op, rl, ptid, ctx_budget);
2261 }
2262
2263 void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2264                                       ceph_tid_t *ptid,
2265                                       int *ctx_budget)
2266 {
2267   assert(initialized);
2268
2269   assert(op->ops.size() == op->out_bl.size());
2270   assert(op->ops.size() == op->out_rval.size());
2271   assert(op->ops.size() == op->out_handler.size());
2272
2273   // throttle.  before we look at any state, because
2274   // _take_op_budget() may drop our lock while it blocks.
2275   if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2276     int op_budget = _take_op_budget(op, sul);
2277     // take and pass out the budget for the first OP
2278     // in the context session
2279     if (ctx_budget && (*ctx_budget == -1)) {
2280       *ctx_budget = op_budget;
2281     }
2282   }
2283
2284   if (osd_timeout > timespan(0)) {
2285     if (op->tid == 0)
2286       op->tid = ++last_tid;
2287     auto tid = op->tid;
2288     op->ontimeout = timer.add_event(osd_timeout,
2289                                     [this, tid]() {
2290                                       op_cancel(tid, -ETIMEDOUT); });
2291   }
2292
2293   _op_submit(op, sul, ptid);
2294 }
2295
2296 void Objecter::_send_op_account(Op *op)
2297 {
2298   inflight_ops++;
2299
2300   // add to gather set(s)
2301   if (op->onfinish) {
2302     num_in_flight++;
2303   } else {
2304     ldout(cct, 20) << " note: not requesting reply" << dendl;
2305   }
2306
2307   logger->inc(l_osdc_op_active);
2308   logger->inc(l_osdc_op);
2309
2310   if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2311       (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2312     logger->inc(l_osdc_op_rmw);
2313   else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2314     logger->inc(l_osdc_op_w);
2315   else if (op->target.flags & CEPH_OSD_FLAG_READ)
2316     logger->inc(l_osdc_op_r);
2317
2318   if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2319     logger->inc(l_osdc_op_pg);
2320
2321   for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2322     int code = l_osdc_osdop_other;
2323     switch (p->op.op) {
2324     case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2325     case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2326     case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2327     case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2328     case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2329     case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2330     case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2331     case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2332     case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2333     case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2334     case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2335     case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2336     case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2337     case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2338     case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2339     case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2340     case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
2341     case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
2342     case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
2343     case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
2344
2345     // OMAP read operations
2346     case CEPH_OSD_OP_OMAPGETVALS:
2347     case CEPH_OSD_OP_OMAPGETKEYS:
2348     case CEPH_OSD_OP_OMAPGETHEADER:
2349     case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2350     case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2351
2352     // OMAP write operations
2353     case CEPH_OSD_OP_OMAPSETVALS:
2354     case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2355
2356     // OMAP del operations
2357     case CEPH_OSD_OP_OMAPCLEAR:
2358     case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2359
2360     case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2361     case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2362     case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2363     }
2364     if (code)
2365       logger->inc(code);
2366   }
2367 }
2368
2369 void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2370 {
2371   // rwlock is locked
2372
2373   ldout(cct, 10) << __func__ << " op " << op << dendl;
2374
2375   // pick target
2376   assert(op->session == NULL);
2377   OSDSession *s = NULL;
2378
2379   bool check_for_latest_map = _calc_target(&op->target, nullptr)
2380     == RECALC_OP_TARGET_POOL_DNE;
2381
2382   // Try to get a session, including a retry if we need to take write lock
2383   int r = _get_session(op->target.osd, &s, sul);
2384   if (r == -EAGAIN ||
2385       (check_for_latest_map && sul.owns_lock_shared())) {
2386     epoch_t orig_epoch = osdmap->get_epoch();
2387     sul.unlock();
2388     if (cct->_conf->objecter_debug_inject_relock_delay) {
2389       sleep(1);
2390     }
2391     sul.lock();
2392     if (orig_epoch != osdmap->get_epoch()) {
2393       // map changed; recalculate mapping
2394       ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2395                      << dendl;
2396       check_for_latest_map = _calc_target(&op->target, nullptr)
2397         == RECALC_OP_TARGET_POOL_DNE;
2398       if (s) {
2399         put_session(s);
2400         s = NULL;
2401         r = -EAGAIN;
2402       }
2403     }
2404   }
2405   if (r == -EAGAIN) {
2406     assert(s == NULL);
2407     r = _get_session(op->target.osd, &s, sul);
2408   }
2409   assert(r == 0);
2410   assert(s);  // may be homeless
2411
2412   _send_op_account(op);
2413
2414   // send?
2415
2416   assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
2417
2418   if (osdmap_full_try) {
2419     op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2420   }
2421
2422   bool need_send = false;
2423
2424   if (osdmap->get_epoch() < epoch_barrier) {
2425     ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
2426                    << dendl;
2427     op->target.paused = true;
2428     _maybe_request_map();
2429   } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
2430              osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2431     ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
2432                    << dendl;
2433     op->target.paused = true;
2434     _maybe_request_map();
2435   } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
2436              osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
2437     ldout(cct, 10) << " paused read " << op << " tid " << op->tid
2438                    << dendl;
2439     op->target.paused = true;
2440     _maybe_request_map();
2441   } else if (op->respects_full() &&
2442              (_osdmap_full_flag() ||
2443               _osdmap_pool_full(op->target.base_oloc.pool))) {
2444     ldout(cct, 0) << " FULL, paused modify " << op << " tid "
2445                   << op->tid << dendl;
2446     op->target.paused = true;
2447     _maybe_request_map();
2448   } else if (!s->is_homeless()) {
2449     need_send = true;
2450   } else {
2451     _maybe_request_map();
2452   }
2453
2454   MOSDOp *m = NULL;
2455   if (need_send) {
2456     m = _prepare_osd_op(op);
2457   }
2458
2459   OSDSession::unique_lock sl(s->lock);
2460   if (op->tid == 0)
2461     op->tid = ++last_tid;
2462
2463   ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2464                  << " '" << op->target.base_oloc << "' '"
2465                  << op->target.target_oloc << "' " << op->ops << " tid "
2466                  << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2467                  << dendl;
2468
2469   _session_op_assign(s, op);
2470
2471   if (need_send) {
2472     _send_op(op, m);
2473   }
2474
2475   // Last chance to touch Op here, after giving up session lock it can
2476   // be freed at any time by response handler.
2477   ceph_tid_t tid = op->tid;
2478   if (check_for_latest_map) {
2479     _send_op_map_check(op);
2480   }
2481   if (ptid)
2482     *ptid = tid;
2483   op = NULL;
2484
2485   sl.unlock();
2486   put_session(s);
2487
2488   ldout(cct, 5) << num_in_flight << " in flight" << dendl;
2489 }
2490
2491 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2492 {
2493   assert(initialized);
2494
2495   OSDSession::unique_lock sl(s->lock);
2496
2497   map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2498   if (p == s->ops.end()) {
2499     ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2500                    << s->osd << dendl;
2501     return -ENOENT;
2502   }
2503
2504   if (s->con) {
2505     ldout(cct, 20) << " revoking rx buffer for " << tid
2506                    << " on " << s->con << dendl;
2507     s->con->revoke_rx_buffer(tid);
2508   }
2509
2510   ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2511                  << dendl;
2512   Op *op = p->second;
2513   if (op->onfinish) {
2514     num_in_flight--;
2515     op->onfinish->complete(r);
2516     op->onfinish = NULL;
2517   }
2518   _op_cancel_map_check(op);
2519   _finish_op(op, r);
2520   sl.unlock();
2521
2522   return 0;
2523 }
2524
2525 int Objecter::op_cancel(ceph_tid_t tid, int r)
2526 {
2527   int ret = 0;
2528
2529   unique_lock wl(rwlock);
2530   ret = _op_cancel(tid, r);
2531
2532   return ret;
2533 }
2534
2535 int Objecter::_op_cancel(ceph_tid_t tid, int r)
2536 {
2537   int ret = 0;
2538
2539   ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2540                 << dendl;
2541
2542 start:
2543
2544   for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2545        siter != osd_sessions.end(); ++siter) {
2546     OSDSession *s = siter->second;
2547     OSDSession::shared_lock sl(s->lock);
2548     if (s->ops.find(tid) != s->ops.end()) {
2549       sl.unlock();
2550       ret = op_cancel(s, tid, r);
2551       if (ret == -ENOENT) {
2552         /* oh no! raced, maybe tid moved to another session, restarting */
2553         goto start;
2554       }
2555       return ret;
2556     }
2557   }
2558
2559   ldout(cct, 5) << __func__ << ": tid " << tid
2560                 << " not found in live sessions" << dendl;
2561
2562   // Handle case where the op is in homeless session
2563   OSDSession::shared_lock sl(homeless_session->lock);
2564   if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2565     sl.unlock();
2566     ret = op_cancel(homeless_session, tid, r);
2567     if (ret == -ENOENT) {
2568       /* oh no! raced, maybe tid moved to another session, restarting */
2569       goto start;
2570     } else {
2571       return ret;
2572     }
2573   } else {
2574     sl.unlock();
2575   }
2576
2577   ldout(cct, 5) << __func__ << ": tid " << tid
2578                 << " not found in homeless session" << dendl;
2579
2580   return ret;
2581 }
2582
2583
2584 epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2585 {
2586   unique_lock wl(rwlock);
2587
2588   std::vector<ceph_tid_t> to_cancel;
2589   bool found = false;
2590
2591   for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2592        siter != osd_sessions.end(); ++siter) {
2593     OSDSession *s = siter->second;
2594     OSDSession::shared_lock sl(s->lock);
2595     for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2596          op_i != s->ops.end(); ++op_i) {
2597       if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2598           && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2599         to_cancel.push_back(op_i->first);
2600       }
2601     }
2602     sl.unlock();
2603
2604     for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2605          titer != to_cancel.end();
2606          ++titer) {
2607       int cancel_result = op_cancel(s, *titer, r);
2608       // We hold rwlock across search and cancellation, so cancels
2609       // should always succeed
2610       assert(cancel_result == 0);
2611     }
2612     if (!found && to_cancel.size())
2613       found = true;
2614     to_cancel.clear();
2615   }
2616
2617   const epoch_t epoch = osdmap->get_epoch();
2618
2619   wl.unlock();
2620
2621   if (found) {
2622     return epoch;
2623   } else {
2624     return -1;
2625   }
2626 }
2627
2628 bool Objecter::is_pg_changed(
2629   int oldprimary,
2630   const vector<int>& oldacting,
2631   int newprimary,
2632   const vector<int>& newacting,
2633   bool any_change)
2634 {
2635   if (OSDMap::primary_changed(
2636         oldprimary,
2637         oldacting,
2638         newprimary,
2639         newacting))
2640     return true;
2641   if (any_change && oldacting != newacting)
2642     return true;
2643   return false;      // same primary (tho replicas may have changed)
2644 }
2645
2646 bool Objecter::target_should_be_paused(op_target_t *t)
2647 {
2648   const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2649   bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2650   bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2651     _osdmap_full_flag() || _osdmap_pool_full(*pi);
2652
2653   return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2654     (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2655     (osdmap->get_epoch() < epoch_barrier);
2656 }
2657
2658 /**
2659  * Locking public accessor for _osdmap_full_flag
2660  */
2661 bool Objecter::osdmap_full_flag() const
2662 {
2663   shared_lock rl(rwlock);
2664
2665   return _osdmap_full_flag();
2666 }
2667
2668 bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2669 {
2670   shared_lock rl(rwlock);
2671
2672   if (_osdmap_full_flag()) {
2673     return true;
2674   }
2675
2676   return _osdmap_pool_full(pool_id);
2677 }
2678
2679 bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2680 {
2681   const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2682   if (pool == NULL) {
2683     ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2684     return false;
2685   }
2686
2687   return _osdmap_pool_full(*pool);
2688 }
2689
2690 bool Objecter::_osdmap_has_pool_full() const
2691 {
2692   for (map<int64_t, pg_pool_t>::const_iterator it
2693          = osdmap->get_pools().begin();
2694        it != osdmap->get_pools().end(); ++it) {
2695     if (_osdmap_pool_full(it->second))
2696       return true;
2697   }
2698   return false;
2699 }
2700
2701 bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2702 {
2703   return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
2704 }
2705
2706 /**
2707  * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2708  */
2709 bool Objecter::_osdmap_full_flag() const
2710 {
2711   // Ignore the FULL flag if the caller has honor_osdmap_full
2712   return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
2713 }
2714
2715 void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2716 {
2717   for (map<int64_t, pg_pool_t>::const_iterator it
2718          = osdmap->get_pools().begin();
2719        it != osdmap->get_pools().end(); ++it) {
2720     if (pool_full_map.find(it->first) == pool_full_map.end()) {
2721       pool_full_map[it->first] = _osdmap_pool_full(it->second);
2722     } else {
2723       pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2724         pool_full_map[it->first];
2725     }
2726   }
2727 }
2728
2729 int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2730                                            const string& ns)
2731 {
2732   shared_lock rl(rwlock);
2733   const pg_pool_t *p = osdmap->get_pg_pool(pool);
2734   if (!p)
2735     return -ENOENT;
2736   return p->hash_key(key, ns);
2737 }
2738
2739 int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2740                                               const string& ns)
2741 {
2742   shared_lock rl(rwlock);
2743   const pg_pool_t *p = osdmap->get_pg_pool(pool);
2744   if (!p)
2745     return -ENOENT;
2746   return p->raw_hash_to_pg(p->hash_key(key, ns));
2747 }
2748
2749 int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2750 {
2751   // rwlock is locked
2752   bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2753   bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2754   t->epoch = osdmap->get_epoch();
2755   ldout(cct,20) << __func__ << " epoch " << t->epoch
2756                 << " base " << t->base_oid << " " << t->base_oloc
2757                 << " precalc_pgid " << (int)t->precalc_pgid
2758                 << " pgid " << t->base_pgid
2759                 << (is_read ? " is_read" : "")
2760                 << (is_write ? " is_write" : "")
2761                 << dendl;
2762
2763   const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2764   if (!pi) {
2765     t->osd = -1;
2766     return RECALC_OP_TARGET_POOL_DNE;
2767   }
2768   ldout(cct,30) << __func__ << "  base pi " << pi
2769                 << " pg_num " << pi->get_pg_num() << dendl;
2770
2771   bool force_resend = false;
2772   if (osdmap->get_epoch() == pi->last_force_op_resend) {
2773     if (t->last_force_resend < pi->last_force_op_resend) {
2774       t->last_force_resend = pi->last_force_op_resend;
2775       force_resend = true;
2776     } else if (t->last_force_resend == 0) {
2777       force_resend = true;
2778     }
2779   }
2780
2781   // apply tiering
2782   t->target_oid = t->base_oid;
2783   t->target_oloc = t->base_oloc;
2784   if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2785     if (is_read && pi->has_read_tier())
2786       t->target_oloc.pool = pi->read_tier;
2787     if (is_write && pi->has_write_tier())
2788       t->target_oloc.pool = pi->write_tier;
2789     pi = osdmap->get_pg_pool(t->target_oloc.pool);
2790     if (!pi) {
2791       t->osd = -1;
2792       return RECALC_OP_TARGET_POOL_DNE;
2793     }
2794   }
2795
2796   pg_t pgid;
2797   if (t->precalc_pgid) {
2798     assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2799     assert(t->base_oid.name.empty()); // make sure this is a pg op
2800     assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
2801     pgid = t->base_pgid;
2802   } else {
2803     int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2804                                            pgid);
2805     if (ret == -ENOENT) {
2806       t->osd = -1;
2807       return RECALC_OP_TARGET_POOL_DNE;
2808     }
2809   }
2810   ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2811                 << t->target_oloc << " -> pgid " << pgid << dendl;
2812   ldout(cct,30) << __func__ << "  target pi " << pi
2813                 << " pg_num " << pi->get_pg_num() << dendl;
2814   t->pool_ever_existed = true;
2815
2816   int size = pi->size;
2817   int min_size = pi->min_size;
2818   unsigned pg_num = pi->get_pg_num();
2819   int up_primary, acting_primary;
2820   vector<int> up, acting;
2821   osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
2822                                &acting, &acting_primary);
2823   bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
2824   bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
2825   unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2826   pg_t prev_pgid(prev_seed, pgid.pool());
2827   if (any_change && PastIntervals::is_new_interval(
2828         t->acting_primary,
2829         acting_primary,
2830         t->acting,
2831         acting,
2832         t->up_primary,
2833         up_primary,
2834         t->up,
2835         up,
2836         t->size,
2837         size,
2838         t->min_size,
2839         min_size,
2840         t->pg_num,
2841         pg_num,
2842         t->sort_bitwise,
2843         sort_bitwise,
2844         t->recovery_deletes,
2845         recovery_deletes,
2846         prev_pgid)) {
2847     force_resend = true;
2848   }
2849
2850   bool unpaused = false;
2851   if (t->paused && !target_should_be_paused(t)) {
2852     t->paused = false;
2853     unpaused = true;
2854   }
2855
2856   bool legacy_change =
2857     t->pgid != pgid ||
2858       is_pg_changed(
2859         t->acting_primary, t->acting, acting_primary, acting,
2860         t->used_replica || any_change);
2861   bool split = false;
2862   if (t->pg_num) {
2863     split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
2864   }
2865
2866   if (legacy_change || split || force_resend) {
2867     t->pgid = pgid;
2868     t->acting = acting;
2869     t->acting_primary = acting_primary;
2870     t->up_primary = up_primary;
2871     t->up = up;
2872     t->size = size;
2873     t->min_size = min_size;
2874     t->pg_num = pg_num;
2875     t->pg_num_mask = pi->get_pg_num_mask();
2876     osdmap->get_primary_shard(
2877       pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
2878       &t->actual_pgid);
2879     t->sort_bitwise = sort_bitwise;
2880     t->recovery_deletes = recovery_deletes;
2881     ldout(cct, 10) << __func__ << " "
2882                    << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2883                    << " acting " << acting
2884                    << " primary " << acting_primary << dendl;
2885     t->used_replica = false;
2886     if (acting_primary == -1) {
2887       t->osd = -1;
2888     } else {
2889       int osd;
2890       bool read = is_read && !is_write;
2891       if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
2892         int p = rand() % acting.size();
2893         if (p)
2894           t->used_replica = true;
2895         osd = acting[p];
2896         ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2897                        << dendl;
2898       } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
2899                  acting.size() > 1) {
2900         // look for a local replica.  prefer the primary if the
2901         // distance is the same.
2902         int best = -1;
2903         int best_locality = 0;
2904         for (unsigned i = 0; i < acting.size(); ++i) {
2905           int locality = osdmap->crush->get_common_ancestor_distance(
2906                  cct, acting[i], crush_location);
2907           ldout(cct, 20) << __func__ << " localize: rank " << i
2908                          << " osd." << acting[i]
2909                          << " locality " << locality << dendl;
2910           if (i == 0 ||
2911               (locality >= 0 && best_locality >= 0 &&
2912                locality < best_locality) ||
2913               (best_locality < 0 && locality >= 0)) {
2914             best = i;
2915             best_locality = locality;
2916             if (i)
2917               t->used_replica = true;
2918           }
2919         }
2920         assert(best >= 0);
2921         osd = acting[best];
2922       } else {
2923         osd = acting_primary;
2924       }
2925       t->osd = osd;
2926     }
2927   }
2928   if (legacy_change || unpaused || force_resend) {
2929     return RECALC_OP_TARGET_NEED_RESEND;
2930   }
2931   if (split && con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT)) {
2932     return RECALC_OP_TARGET_NEED_RESEND;
2933   }
2934   return RECALC_OP_TARGET_NO_ACTION;
2935 }
2936
2937 int Objecter::_map_session(op_target_t *target, OSDSession **s,
2938                            shunique_lock& sul)
2939 {
2940   _calc_target(target, nullptr);
2941   return _get_session(target->osd, s, sul);
2942 }
2943
2944 void Objecter::_session_op_assign(OSDSession *to, Op *op)
2945 {
2946   // to->lock is locked
2947   assert(op->session == NULL);
2948   assert(op->tid);
2949
2950   get_session(to);
2951   op->session = to;
2952   to->ops[op->tid] = op;
2953
2954   if (to->is_homeless()) {
2955     num_homeless_ops++;
2956   }
2957
2958   ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2959 }
2960
2961 void Objecter::_session_op_remove(OSDSession *from, Op *op)
2962 {
2963   assert(op->session == from);
2964   // from->lock is locked
2965
2966   if (from->is_homeless()) {
2967     num_homeless_ops--;
2968   }
2969
2970   from->ops.erase(op->tid);
2971   put_session(from);
2972   op->session = NULL;
2973
2974   ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
2975 }
2976
2977 void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
2978 {
2979   // to lock is locked unique
2980   assert(op->session == NULL);
2981
2982   if (to->is_homeless()) {
2983     num_homeless_ops++;
2984   }
2985
2986   get_session(to);
2987   op->session = to;
2988   to->linger_ops[op->linger_id] = op;
2989
2990   ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
2991                  << dendl;
2992 }
2993
2994 void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
2995 {
2996   assert(from == op->session);
2997   // from->lock is locked unique
2998
2999   if (from->is_homeless()) {
3000     num_homeless_ops--;
3001   }
3002
3003   from->linger_ops.erase(op->linger_id);
3004   put_session(from);
3005   op->session = NULL;
3006
3007   ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3008                  << dendl;
3009 }
3010
3011 void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3012 {
3013   assert(from == op->session);
3014   // from->lock is locked
3015
3016   if (from->is_homeless()) {
3017     num_homeless_ops--;
3018   }
3019
3020   from->command_ops.erase(op->tid);
3021   put_session(from);
3022   op->session = NULL;
3023
3024   ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3025 }
3026
3027 void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3028 {
3029   // to->lock is locked
3030   assert(op->session == NULL);
3031   assert(op->tid);
3032
3033   if (to->is_homeless()) {
3034     num_homeless_ops++;
3035   }
3036
3037   get_session(to);
3038   op->session = to;
3039   to->command_ops[op->tid] = op;
3040
3041   ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3042 }
3043
3044 int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3045                                        shunique_lock& sul)
3046 {
3047   // rwlock is locked unique
3048
3049   int r = _calc_target(&linger_op->target, nullptr, true);
3050   if (r == RECALC_OP_TARGET_NEED_RESEND) {
3051     ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3052                    << " pgid " << linger_op->target.pgid
3053                    << " acting " << linger_op->target.acting << dendl;
3054
3055     OSDSession *s = NULL;
3056     r = _get_session(linger_op->target.osd, &s, sul);
3057     assert(r == 0);
3058
3059     if (linger_op->session != s) {
3060       // NB locking two sessions (s and linger_op->session) at the
3061       // same time here is only safe because we are the only one that
3062       // takes two, and we are holding rwlock for write.  Disable
3063       // lockdep because it doesn't know that.
3064       OSDSession::unique_lock sl(s->lock);
3065       _session_linger_op_remove(linger_op->session, linger_op);
3066       _session_linger_op_assign(s, linger_op);
3067     }
3068
3069     put_session(s);
3070     return RECALC_OP_TARGET_NEED_RESEND;
3071   }
3072   return r;
3073 }
3074
3075 void Objecter::_cancel_linger_op(Op *op)
3076 {
3077   ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3078
3079   assert(!op->should_resend);
3080   if (op->onfinish) {
3081     delete op->onfinish;
3082     num_in_flight--;
3083   }
3084
3085   _finish_op(op, 0);
3086 }
3087
3088 void Objecter::_finish_op(Op *op, int r)
3089 {
3090   ldout(cct, 15) << "finish_op " << op->tid << dendl;
3091
3092   // op->session->lock is locked unique or op->session is null
3093
3094   if (!op->ctx_budgeted && op->budgeted)
3095     put_op_budget(op);
3096
3097   if (op->ontimeout && r != -ETIMEDOUT)
3098     timer.cancel_event(op->ontimeout);
3099
3100   if (op->session) {
3101     _session_op_remove(op->session, op);
3102   }
3103
3104   logger->dec(l_osdc_op_active);
3105
3106   assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
3107
3108   inflight_ops--;
3109
3110   op->put();
3111 }
3112
3113 void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
3114 {
3115   ldout(cct, 15) << "finish_op " << tid << dendl;
3116   shared_lock rl(rwlock);
3117
3118   OSDSession::unique_lock wl(session->lock);
3119
3120   map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
3121   if (iter == session->ops.end())
3122     return;
3123
3124   Op *op = iter->second;
3125
3126   _finish_op(op, 0);
3127 }
3128
3129 MOSDOp *Objecter::_prepare_osd_op(Op *op)
3130 {
3131   // rwlock is locked
3132
3133   int flags = op->target.flags;
3134   flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3135
3136   // Nothing checks this any longer, but needed for compatibility with
3137   // pre-luminous osds
3138   flags |= CEPH_OSD_FLAG_ONDISK;
3139
3140   if (!honor_osdmap_full)
3141     flags |= CEPH_OSD_FLAG_FULL_FORCE;
3142
3143   op->target.paused = false;
3144   op->stamp = ceph::mono_clock::now();
3145
3146   hobject_t hobj = op->target.get_hobj();
3147   MOSDOp *m = new MOSDOp(client_inc, op->tid,
3148                          hobj, op->target.actual_pgid,
3149                          osdmap->get_epoch(),
3150                          flags, op->features);
3151
3152   m->set_snapid(op->snapid);
3153   m->set_snap_seq(op->snapc.seq);
3154   m->set_snaps(op->snapc.snaps);
3155
3156   m->ops = op->ops;
3157   m->set_mtime(op->mtime);
3158   m->set_retry_attempt(op->attempts++);
3159
3160   if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3161     op->trace.init("op", &trace_endpoint);
3162   }
3163
3164   if (op->priority)
3165     m->set_priority(op->priority);
3166   else
3167     m->set_priority(cct->_conf->osd_client_op_priority);
3168
3169   if (op->reqid != osd_reqid_t()) {
3170     m->set_reqid(op->reqid);
3171   }
3172
3173   logger->inc(l_osdc_op_send);
3174   logger->inc(l_osdc_op_send_bytes, m->get_data().length());
3175
3176   return m;
3177 }
3178
3179 void Objecter::_send_op(Op *op, MOSDOp *m)
3180 {
3181   // rwlock is locked
3182   // op->session->lock is locked
3183
3184   // backoff?
3185   hobject_t hoid = op->target.get_hobj();
3186   auto p = op->session->backoffs.find(op->target.actual_pgid);
3187   if (p != op->session->backoffs.end()) {
3188     auto q = p->second.lower_bound(hoid);
3189     if (q != p->second.begin()) {
3190       --q;
3191       if (hoid >= q->second.end) {
3192         ++q;
3193       }
3194     }
3195     if (q != p->second.end()) {
3196       ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3197                      << "," << q->second.end << ")" << dendl;
3198       int r = cmp(hoid, q->second.begin);
3199       if (r == 0 || (r > 0 && hoid < q->second.end)) {
3200         ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3201                        << " id " << q->second.id << " on " << hoid
3202                        << ", queuing " << op << " tid " << op->tid << dendl;
3203         return;
3204       }
3205     }
3206   }
3207
3208   if (!m) {
3209     assert(op->tid > 0);
3210     m = _prepare_osd_op(op);
3211   }
3212
3213   if (op->target.actual_pgid != m->get_spg()) {
3214     ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3215                    << m->get_spg() << " to " << op->target.actual_pgid
3216                    << ", updating and reencoding" << dendl;
3217     m->set_spg(op->target.actual_pgid);
3218     m->clear_payload();  // reencode
3219   }
3220
3221   ldout(cct, 15) << "_send_op " << op->tid << " to "
3222                  << op->target.actual_pgid << " on osd." << op->session->osd
3223                  << dendl;
3224
3225   ConnectionRef con = op->session->con;
3226   assert(con);
3227
3228   // preallocated rx buffer?
3229   if (op->con) {
3230     ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
3231                    << op->con << dendl;
3232     op->con->revoke_rx_buffer(op->tid);
3233   }
3234   if (op->outbl &&
3235       op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
3236       op->outbl->length()) {
3237     ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
3238                    << dendl;
3239     op->con = con;
3240     op->con->post_rx_buffer(op->tid, *op->outbl);
3241   }
3242
3243   op->incarnation = op->session->incarnation;
3244
3245   m->set_tid(op->tid);
3246
3247   if (op->trace.valid()) {
3248     m->trace.init("op msg", nullptr, &op->trace);
3249   }
3250   op->session->con->send_message(m);
3251 }
3252
3253 int Objecter::calc_op_budget(Op *op)
3254 {
3255   int op_budget = 0;
3256   for (vector<OSDOp>::iterator i = op->ops.begin();
3257        i != op->ops.end();
3258        ++i) {
3259     if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3260       op_budget += i->indata.length();
3261     } else if (ceph_osd_op_mode_read(i->op.op)) {
3262       if (ceph_osd_op_type_data(i->op.op)) {
3263         if ((int64_t)i->op.extent.length > 0)
3264           op_budget += (int64_t)i->op.extent.length;
3265       } else if (ceph_osd_op_type_attr(i->op.op)) {
3266         op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3267       }
3268     }
3269   }
3270   return op_budget;
3271 }
3272
3273 void Objecter::_throttle_op(Op *op,
3274                             shunique_lock& sul,
3275                             int op_budget)
3276 {
3277   assert(sul && sul.mutex() == &rwlock);
3278   bool locked_for_write = sul.owns_lock();
3279
3280   if (!op_budget)
3281     op_budget = calc_op_budget(op);
3282   if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3283     sul.unlock();
3284     op_throttle_bytes.get(op_budget);
3285     if (locked_for_write)
3286       sul.lock();
3287     else
3288       sul.lock_shared();
3289   }
3290   if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3291     sul.unlock();
3292     op_throttle_ops.get(1);
3293     if (locked_for_write)
3294       sul.lock();
3295     else
3296       sul.lock_shared();
3297   }
3298 }
3299
3300 void Objecter::unregister_op(Op *op)
3301 {
3302   OSDSession::unique_lock sl(op->session->lock);
3303   op->session->ops.erase(op->tid);
3304   sl.unlock();
3305   put_session(op->session);
3306   op->session = NULL;
3307
3308   inflight_ops--;
3309 }
3310
3311 /* This function DOES put the passed message before returning */
3312 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3313 {
3314   ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3315
3316   // get pio
3317   ceph_tid_t tid = m->get_tid();
3318
3319   shunique_lock sul(rwlock, ceph::acquire_shared);
3320   if (!initialized) {
3321     m->put();
3322     return;
3323   }
3324
3325   ConnectionRef con = m->get_connection();
3326   OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3327   if (!s || s->con != con) {
3328     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3329     if (s) {
3330       s->put();
3331     }
3332     m->put();
3333     return;
3334   }
3335
3336   OSDSession::unique_lock sl(s->lock);
3337
3338   map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3339   if (iter == s->ops.end()) {
3340     ldout(cct, 7) << "handle_osd_op_reply " << tid
3341                   << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3342                                                     " onnvram" : " ack"))
3343                   << " ... stray" << dendl;
3344     sl.unlock();
3345     put_session(s);
3346     m->put();
3347     return;
3348   }
3349
3350   ldout(cct, 7) << "handle_osd_op_reply " << tid
3351                 << (m->is_ondisk() ? " ondisk" :
3352                     (m->is_onnvram() ? " onnvram" : " ack"))
3353                 << " uv " << m->get_user_version()
3354                 << " in " << m->get_pg()
3355                 << " attempt " << m->get_retry_attempt()
3356                 << dendl;
3357   Op *op = iter->second;
3358   op->trace.event("osd op reply");
3359
3360   if (retry_writes_after_first_reply && op->attempts == 1 &&
3361       (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3362     ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3363     if (op->onfinish) {
3364       num_in_flight--;
3365     }
3366     _session_op_remove(s, op);
3367     sl.unlock();
3368     put_session(s);
3369
3370     _op_submit(op, sul, NULL);
3371     m->put();
3372     return;
3373   }
3374
3375   if (m->get_retry_attempt() >= 0) {
3376     if (m->get_retry_attempt() != (op->attempts - 1)) {
3377       ldout(cct, 7) << " ignoring reply from attempt "
3378                     << m->get_retry_attempt()
3379                     << " from " << m->get_source_inst()
3380                     << "; last attempt " << (op->attempts - 1) << " sent to "
3381                     << op->session->con->get_peer_addr() << dendl;
3382       m->put();
3383       sl.unlock();
3384       put_session(s);
3385       return;
3386     }
3387   } else {
3388     // we don't know the request attempt because the server is old, so
3389     // just accept this one.  we may do ACK callbacks we shouldn't
3390     // have, but that is better than doing callbacks out of order.
3391   }
3392
3393   Context *onfinish = 0;
3394
3395   int rc = m->get_result();
3396
3397   if (m->is_redirect_reply()) {
3398     ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3399     if (op->onfinish)
3400       num_in_flight--;
3401     _session_op_remove(s, op);
3402     sl.unlock();
3403     put_session(s);
3404
3405     // FIXME: two redirects could race and reorder
3406
3407     op->tid = 0;
3408     m->get_redirect().combine_with_locator(op->target.target_oloc,
3409                                            op->target.target_oid.name);
3410     op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
3411     _op_submit(op, sul, NULL);
3412     m->put();
3413     return;
3414   }
3415
3416   if (rc == -EAGAIN) {
3417     ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
3418     if (op->onfinish)
3419       num_in_flight--;
3420     _session_op_remove(s, op);
3421     sl.unlock();
3422     put_session(s);
3423
3424     op->tid = 0;
3425     op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3426                           CEPH_OSD_FLAG_LOCALIZE_READS);
3427     op->target.pgid = pg_t();
3428     _op_submit(op, sul, NULL);
3429     m->put();
3430     return;
3431   }
3432
3433   sul.unlock();
3434
3435   if (op->objver)
3436     *op->objver = m->get_user_version();
3437   if (op->reply_epoch)
3438     *op->reply_epoch = m->get_map_epoch();
3439   if (op->data_offset)
3440     *op->data_offset = m->get_header().data_off;
3441
3442   // got data?
3443   if (op->outbl) {
3444     if (op->con)
3445       op->con->revoke_rx_buffer(op->tid);
3446     m->claim_data(*op->outbl);
3447     op->outbl = 0;
3448   }
3449
3450   // per-op result demuxing
3451   vector<OSDOp> out_ops;
3452   m->claim_ops(out_ops);
3453
3454   if (out_ops.size() != op->ops.size())
3455     ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3456                   << " != request ops " << op->ops
3457                   << " from " << m->get_source_inst() << dendl;
3458
3459   vector<bufferlist*>::iterator pb = op->out_bl.begin();
3460   vector<int*>::iterator pr = op->out_rval.begin();
3461   vector<Context*>::iterator ph = op->out_handler.begin();
3462   assert(op->out_bl.size() == op->out_rval.size());
3463   assert(op->out_bl.size() == op->out_handler.size());
3464   vector<OSDOp>::iterator p = out_ops.begin();
3465   for (unsigned i = 0;
3466        p != out_ops.end() && pb != op->out_bl.end();
3467        ++i, ++p, ++pb, ++pr, ++ph) {
3468     ldout(cct, 10) << " op " << i << " rval " << p->rval
3469                    << " len " << p->outdata.length() << dendl;
3470     if (*pb)
3471       **pb = p->outdata;
3472     // set rval before running handlers so that handlers
3473     // can change it if e.g. decoding fails
3474     if (*pr)
3475       **pr = ceph_to_hostos_errno(p->rval);
3476     if (*ph) {
3477       ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
3478       (*ph)->complete(ceph_to_hostos_errno(p->rval));
3479       *ph = NULL;
3480     }
3481   }
3482
3483   // NOTE: we assume that since we only request ONDISK ever we will
3484   // only ever get back one (type of) ack ever.
3485
3486   if (op->onfinish) {
3487     num_in_flight--;
3488     onfinish = op->onfinish;
3489     op->onfinish = NULL;
3490   }
3491   logger->inc(l_osdc_op_reply);
3492
3493   /* get it before we call _finish_op() */
3494   auto completion_lock = s->get_lock(op->target.base_oid);
3495
3496   ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3497   _finish_op(op, 0);
3498
3499   ldout(cct, 5) << num_in_flight << " in flight" << dendl;
3500
3501   // serialize completions
3502   if (completion_lock.mutex()) {
3503     completion_lock.lock();
3504   }
3505   sl.unlock();
3506
3507   // do callbacks
3508   if (onfinish) {
3509     onfinish->complete(rc);
3510   }
3511   if (completion_lock.mutex()) {
3512     completion_lock.unlock();
3513   }
3514
3515   m->put();
3516   put_session(s);
3517 }
3518
3519 void Objecter::handle_osd_backoff(MOSDBackoff *m)
3520 {
3521   ldout(cct, 10) << __func__ << " " << *m << dendl;
3522   shunique_lock sul(rwlock, ceph::acquire_shared);
3523   if (!initialized) {
3524     m->put();
3525     return;
3526   }
3527
3528   ConnectionRef con = m->get_connection();
3529   OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3530   if (!s || s->con != con) {
3531     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3532     if (s)
3533       s->put();
3534     m->put();
3535     return;
3536   }
3537
3538   get_session(s);
3539   s->put();  // from get_priv() above
3540
3541   OSDSession::unique_lock sl(s->lock);
3542
3543   switch (m->op) {
3544   case CEPH_OSD_BACKOFF_OP_BLOCK:
3545     {
3546       // register
3547       OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3548       s->backoffs_by_id.insert(make_pair(m->id, &b));
3549       b.pgid = m->pgid;
3550       b.id = m->id;
3551       b.begin = m->begin;
3552       b.end = m->end;
3553
3554       // ack with original backoff's epoch so that the osd can discard this if
3555       // there was a pg split.
3556       Message *r = new MOSDBackoff(m->pgid,
3557                                    m->map_epoch,
3558                                    CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3559                                    m->id, m->begin, m->end);
3560       // this priority must match the MOSDOps from _prepare_osd_op
3561       r->set_priority(cct->_conf->osd_client_op_priority);
3562       con->send_message(r);
3563     }
3564     break;
3565
3566   case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3567     {
3568       auto p = s->backoffs_by_id.find(m->id);
3569       if (p != s->backoffs_by_id.end()) {
3570         OSDBackoff *b = p->second;
3571         if (b->begin != m->begin &&
3572             b->end != m->end) {
3573           lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3574                      << " unblock on ["
3575                      << m->begin << "," << m->end << ") but backoff is ["
3576                      << b->begin << "," << b->end << ")" << dendl;
3577           // hrmpf, unblock it anyway.
3578         }
3579         ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3580                        << " id " << b->id
3581                        << " [" << b->begin << "," << b->end
3582                        << ")" << dendl;
3583         auto spgp = s->backoffs.find(b->pgid);
3584         assert(spgp != s->backoffs.end());
3585         spgp->second.erase(b->begin);
3586         if (spgp->second.empty()) {
3587           s->backoffs.erase(spgp);
3588         }
3589         s->backoffs_by_id.erase(p);
3590
3591         // check for any ops to resend
3592         for (auto& q : s->ops) {
3593           if (q.second->target.actual_pgid == m->pgid) {
3594             int r = q.second->target.contained_by(m->begin, m->end);
3595             ldout(cct, 20) << __func__ <<  " contained_by " << r << " on "
3596                            << q.second->target.get_hobj() << dendl;
3597             if (r) {
3598               _send_op(q.second);
3599             }
3600           }
3601         }
3602       } else {
3603         lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3604                    << " unblock on ["
3605                    << m->begin << "," << m->end << ") but backoff dne" << dendl;
3606       }
3607     }
3608     break;
3609
3610   default:
3611     ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3612   }
3613
3614   sul.unlock();
3615   sl.unlock();
3616
3617   m->put();
3618   put_session(s);
3619 }
3620
3621 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3622                                       uint32_t pos)
3623 {
3624   shared_lock rl(rwlock);
3625   list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3626                                 pos, list_context->pool_id, string());
3627   ldout(cct, 10) << __func__ << list_context
3628                  << " pos " << pos << " -> " << list_context->pos << dendl;
3629   pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3630   list_context->current_pg = actual.ps();
3631   list_context->at_end_of_pool = false;
3632   return pos;
3633 }
3634
3635 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3636                                       const hobject_t& cursor)
3637 {
3638   shared_lock rl(rwlock);
3639   ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3640   list_context->pos = cursor;
3641   list_context->at_end_of_pool = false;
3642   pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3643   list_context->current_pg = actual.ps();
3644   list_context->sort_bitwise = true;
3645   return list_context->current_pg;
3646 }
3647
3648 void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3649                                         hobject_t *cursor)
3650 {
3651   shared_lock rl(rwlock);
3652   if (list_context->list.empty()) {
3653     *cursor = list_context->pos;
3654   } else {
3655     const librados::ListObjectImpl& entry = list_context->list.front();
3656     const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3657     uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3658     *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3659   }
3660 }
3661
3662 void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3663 {
3664   ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3665                  << " pool_snap_seq " << list_context->pool_snap_seq
3666                  << " max_entries " << list_context->max_entries
3667                  << " list_context " << list_context
3668                  << " onfinish " << onfinish
3669                  << " current_pg " << list_context->current_pg
3670                  << " pos " << list_context->pos << dendl;
3671
3672   shared_lock rl(rwlock);
3673   const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3674   if (!pool) { // pool is gone
3675     rl.unlock();
3676     put_nlist_context_budget(list_context);
3677     onfinish->complete(-ENOENT);
3678     return;
3679   }
3680   int pg_num = pool->get_pg_num();
3681   bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3682
3683   if (list_context->pos.is_min()) {
3684     list_context->starting_pg_num = 0;
3685     list_context->sort_bitwise = sort_bitwise;
3686     list_context->starting_pg_num = pg_num;
3687   }
3688   if (list_context->sort_bitwise != sort_bitwise) {
3689     list_context->pos = hobject_t(
3690       object_t(), string(), CEPH_NOSNAP,
3691       list_context->current_pg, list_context->pool_id, string());
3692     list_context->sort_bitwise = sort_bitwise;
3693     ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3694                    << list_context->pos << dendl;
3695   }
3696   if (list_context->starting_pg_num != pg_num) {
3697     if (!sort_bitwise) {
3698       // start reading from the beginning; the pgs have changed
3699       ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3700       list_context->pos = collection_list_handle_t();
3701     }
3702     list_context->starting_pg_num = pg_num;
3703   }
3704
3705   if (list_context->pos.is_max()) {
3706     ldout(cct, 20) << __func__ << " end of pool, list "
3707                    << list_context->list << dendl;
3708     if (list_context->list.empty()) {
3709       list_context->at_end_of_pool = true;
3710     }
3711     // release the listing context's budget once all
3712     // OPs (in the session) are finished
3713     put_nlist_context_budget(list_context);
3714     onfinish->complete(0);
3715     return;
3716   }
3717
3718   ObjectOperation op;
3719   op.pg_nls(list_context->max_entries, list_context->filter,
3720             list_context->pos, osdmap->get_epoch());
3721   list_context->bl.clear();
3722   C_NList *onack = new C_NList(list_context, onfinish, this);
3723   object_locator_t oloc(list_context->pool_id, list_context->nspace);
3724
3725   // note current_pg in case we don't have (or lose) SORTBITWISE
3726   list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3727   rl.unlock();
3728
3729   pg_read(list_context->current_pg, oloc, op,
3730           &list_context->bl, 0, onack, &onack->epoch,
3731           &list_context->ctx_budget);
3732 }
3733
3734 void Objecter::_nlist_reply(NListContext *list_context, int r,
3735                             Context *final_finish, epoch_t reply_epoch)
3736 {
3737   ldout(cct, 10) << __func__ << " " << list_context << dendl;
3738
3739   bufferlist::iterator iter = list_context->bl.begin();
3740   pg_nls_response_t response;
3741   bufferlist extra_info;
3742   ::decode(response, iter);
3743   if (!iter.end()) {
3744     ::decode(extra_info, iter);
3745   }
3746
3747   // if the osd returns 1 (newer code), or handle MAX, it means we
3748   // hit the end of the pg.
3749   if ((response.handle.is_max() || r == 1) &&
3750       !list_context->sort_bitwise) {
3751     // legacy OSD and !sortbitwise, figure out the next PG on our own
3752     ++list_context->current_pg;
3753     if (list_context->current_pg == list_context->starting_pg_num) {
3754       // end of pool
3755       list_context->pos = hobject_t::get_max();
3756     } else {
3757       // next pg
3758       list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3759                                     list_context->current_pg,
3760                                     list_context->pool_id, string());
3761     }
3762   } else {
3763     list_context->pos = response.handle;
3764   }
3765
3766   int response_size = response.entries.size();
3767   ldout(cct, 20) << " response.entries.size " << response_size
3768                  << ", response.entries " << response.entries
3769                  << ", handle " << response.handle
3770                  << ", tentative new pos " << list_context->pos << dendl;
3771   list_context->extra_info.append(extra_info);
3772   if (response_size) {
3773     list_context->list.splice(list_context->list.end(), response.entries);
3774   }
3775
3776   if (list_context->list.size() >= list_context->max_entries) {
3777     ldout(cct, 20) << " hit max, returning results so far, "
3778                    << list_context->list << dendl;
3779     // release the listing context's budget once all
3780     // OPs (in the session) are finished
3781     put_nlist_context_budget(list_context);
3782     final_finish->complete(0);
3783     return;
3784   }
3785
3786   // continue!
3787   list_nobjects(list_context, final_finish);
3788 }
3789
3790 void Objecter::put_nlist_context_budget(NListContext *list_context)
3791 {
3792   if (list_context->ctx_budget >= 0) {
3793     ldout(cct, 10) << " release listing context's budget " <<
3794       list_context->ctx_budget << dendl;
3795     put_op_budget_bytes(list_context->ctx_budget);
3796     list_context->ctx_budget = -1;
3797   }
3798 }
3799
3800 // snapshots
3801
3802 int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3803                                Context *onfinish)
3804 {
3805   unique_lock wl(rwlock);
3806   ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3807                  << snap_name << dendl;
3808
3809   const pg_pool_t *p = osdmap->get_pg_pool(pool);
3810   if (!p)
3811     return -EINVAL;
3812   if (p->snap_exists(snap_name.c_str()))
3813     return -EEXIST;
3814
3815   PoolOp *op = new PoolOp;
3816   if (!op)
3817     return -ENOMEM;
3818   op->tid = ++last_tid;
3819   op->pool = pool;
3820   op->name = snap_name;
3821   op->onfinish = onfinish;
3822   op->pool_op = POOL_OP_CREATE_SNAP;
3823   pool_ops[op->tid] = op;
3824
3825   pool_op_submit(op);
3826
3827   return 0;
3828 }
3829
3830 struct C_SelfmanagedSnap : public Context {
3831   bufferlist bl;
3832   snapid_t *psnapid;
3833   Context *fin;
3834   C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3835   void finish(int r) override {
3836     if (r == 0) {
3837       bufferlist::iterator p = bl.begin();
3838       ::decode(*psnapid, p);
3839     }
3840     fin->complete(r);
3841   }
3842 };
3843
3844 int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3845                                         Context *onfinish)
3846 {
3847   unique_lock wl(rwlock);
3848   ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3849   PoolOp *op = new PoolOp;
3850   if (!op) return -ENOMEM;
3851   op->tid = ++last_tid;
3852   op->pool = pool;
3853   C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3854   op->onfinish = fin;
3855   op->blp = &fin->bl;
3856   op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3857   pool_ops[op->tid] = op;
3858
3859   pool_op_submit(op);
3860   return 0;
3861 }
3862
3863 int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3864                                Context *onfinish)
3865 {
3866   unique_lock wl(rwlock);
3867   ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3868                  << snap_name << dendl;
3869
3870   const pg_pool_t *p = osdmap->get_pg_pool(pool);
3871   if (!p)
3872     return -EINVAL;
3873   if (!p->snap_exists(snap_name.c_str()))
3874     return -ENOENT;
3875
3876   PoolOp *op = new PoolOp;
3877   if (!op)
3878     return -ENOMEM;
3879   op->tid = ++last_tid;
3880   op->pool = pool;
3881   op->name = snap_name;
3882   op->onfinish = onfinish;
3883   op->pool_op = POOL_OP_DELETE_SNAP;
3884   pool_ops[op->tid] = op;
3885
3886   pool_op_submit(op);
3887
3888   return 0;
3889 }
3890
3891 int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3892                                       Context *onfinish)
3893 {
3894   unique_lock wl(rwlock);
3895   ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3896                  << snap << dendl;
3897   PoolOp *op = new PoolOp;
3898   if (!op) return -ENOMEM;
3899   op->tid = ++last_tid;
3900   op->pool = pool;
3901   op->onfinish = onfinish;
3902   op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3903   op->snapid = snap;
3904   pool_ops[op->tid] = op;
3905
3906   pool_op_submit(op);
3907
3908   return 0;
3909 }
3910
3911 int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
3912                           int crush_rule)
3913 {
3914   unique_lock wl(rwlock);
3915   ldout(cct, 10) << "create_pool name=" << name << dendl;
3916
3917   if (osdmap->lookup_pg_pool_name(name) >= 0)
3918     return -EEXIST;
3919
3920   PoolOp *op = new PoolOp;
3921   if (!op)
3922     return -ENOMEM;
3923   op->tid = ++last_tid;
3924   op->pool = 0;
3925   op->name = name;
3926   op->onfinish = onfinish;
3927   op->pool_op = POOL_OP_CREATE;
3928   pool_ops[op->tid] = op;
3929   op->auid = auid;
3930   op->crush_rule = crush_rule;
3931
3932   pool_op_submit(op);
3933
3934   return 0;
3935 }
3936
3937 int Objecter::delete_pool(int64_t pool, Context *onfinish)
3938 {
3939   unique_lock wl(rwlock);
3940   ldout(cct, 10) << "delete_pool " << pool << dendl;
3941
3942   if (!osdmap->have_pg_pool(pool))
3943     return -ENOENT;
3944
3945   _do_delete_pool(pool, onfinish);
3946   return 0;
3947 }
3948
3949 int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3950 {
3951   unique_lock wl(rwlock);
3952   ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3953
3954   int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3955   if (pool < 0)
3956     return pool;
3957
3958   _do_delete_pool(pool, onfinish);
3959   return 0;
3960 }
3961
3962 void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
3963 {
3964   PoolOp *op = new PoolOp;
3965   op->tid = ++last_tid;
3966   op->pool = pool;
3967   op->name = "delete";
3968   op->onfinish = onfinish;
3969   op->pool_op = POOL_OP_DELETE;
3970   pool_ops[op->tid] = op;
3971   pool_op_submit(op);
3972 }
3973
3974 /**
3975  * change the auid owner of a pool by contacting the monitor.
3976  * This requires the current connection to have write permissions
3977  * on both the pool's current auid and the new (parameter) auid.
3978  * Uses the standard Context callback when done.
3979  */
3980 int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
3981 {
3982   unique_lock wl(rwlock);
3983   ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
3984   PoolOp *op = new PoolOp;
3985   if (!op) return -ENOMEM;
3986   op->tid = ++last_tid;
3987   op->pool = pool;
3988   op->name = "change_pool_auid";
3989   op->onfinish = onfinish;
3990   op->pool_op = POOL_OP_AUID_CHANGE;
3991   op->auid = auid;
3992   pool_ops[op->tid] = op;
3993
3994   logger->set(l_osdc_poolop_active, pool_ops.size());
3995
3996   pool_op_submit(op);
3997   return 0;
3998 }
3999
4000 void Objecter::pool_op_submit(PoolOp *op)
4001 {
4002   // rwlock is locked
4003   if (mon_timeout > timespan(0)) {
4004     op->ontimeout = timer.add_event(mon_timeout,
4005                                     [this, op]() {
4006                                       pool_op_cancel(op->tid, -ETIMEDOUT); });
4007   }
4008   _pool_op_submit(op);
4009 }
4010
4011 void Objecter::_pool_op_submit(PoolOp *op)
4012 {
4013   // rwlock is locked unique
4014
4015   ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4016   MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4017                            op->name, op->pool_op,
4018                            op->auid, last_seen_osdmap_version);
4019   if (op->snapid) m->snapid = op->snapid;
4020   if (op->crush_rule) m->crush_rule = op->crush_rule;
4021   monc->send_mon_message(m);
4022   op->last_submit = ceph::mono_clock::now();
4023
4024   logger->inc(l_osdc_poolop_send);
4025 }
4026
4027 /**
4028  * Handle a reply to a PoolOp message. Check that we sent the message
4029  * and give the caller responsibility for the returned bufferlist.
4030  * Then either call the finisher or stash the PoolOp, depending on if we
4031  * have a new enough map.
4032  * Lastly, clean up the message and PoolOp.
4033  */
4034 void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4035 {
4036   FUNCTRACE();
4037   shunique_lock sul(rwlock, acquire_shared);
4038   if (!initialized) {
4039     sul.unlock();
4040     m->put();
4041     return;
4042   }
4043
4044   ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4045   ceph_tid_t tid = m->get_tid();
4046   map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4047   if (iter != pool_ops.end()) {
4048     PoolOp *op = iter->second;
4049     ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4050                    << ceph_pool_op_name(op->pool_op) << dendl;
4051     if (op->blp)
4052       op->blp->claim(m->response_data);
4053     if (m->version > last_seen_osdmap_version)
4054       last_seen_osdmap_version = m->version;
4055     if (osdmap->get_epoch() < m->epoch) {
4056       sul.unlock();
4057       sul.lock();
4058       // recheck op existence since we have let go of rwlock
4059       // (for promotion) above.
4060       iter = pool_ops.find(tid);
4061       if (iter == pool_ops.end())
4062         goto done; // op is gone.
4063       if (osdmap->get_epoch() < m->epoch) {
4064         ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4065                        << " before calling back" << dendl;
4066         _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4067       } else {
4068         // map epoch changed, probably because a MOSDMap message
4069         // sneaked in. Do caller-specified callback now or else
4070         // we lose it forever.
4071         assert(op->onfinish);
4072         op->onfinish->complete(m->replyCode);
4073       }
4074     } else {
4075       assert(op->onfinish);
4076       op->onfinish->complete(m->replyCode);
4077     }
4078     op->onfinish = NULL;
4079     if (!sul.owns_lock()) {
4080       sul.unlock();
4081       sul.lock();
4082     }
4083     iter = pool_ops.find(tid);
4084     if (iter != pool_ops.end()) {
4085       _finish_pool_op(op, 0);
4086     }
4087   } else {
4088     ldout(cct, 10) << "unknown request " << tid << dendl;
4089   }
4090
4091 done:
4092   // Not strictly necessary, since we'll release it on return.
4093   sul.unlock();
4094
4095   ldout(cct, 10) << "done" << dendl;
4096   m->put();
4097 }
4098
4099 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4100 {
4101   assert(initialized);
4102
4103   unique_lock wl(rwlock);
4104
4105   map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4106   if (it == pool_ops.end()) {
4107     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4108     return -ENOENT;
4109   }
4110
4111   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4112
4113   PoolOp *op = it->second;
4114   if (op->onfinish)
4115     op->onfinish->complete(r);
4116
4117   _finish_pool_op(op, r);
4118   return 0;
4119 }
4120
4121 void Objecter::_finish_pool_op(PoolOp *op, int r)
4122 {
4123   // rwlock is locked unique
4124   pool_ops.erase(op->tid);
4125   logger->set(l_osdc_poolop_active, pool_ops.size());
4126
4127   if (op->ontimeout && r != -ETIMEDOUT) {
4128     timer.cancel_event(op->ontimeout);
4129   }
4130
4131   delete op;
4132 }
4133
4134 // pool stats
4135
4136 void Objecter::get_pool_stats(list<string>& pools,
4137                               map<string,pool_stat_t> *result,
4138                               Context *onfinish)
4139 {
4140   ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4141
4142   PoolStatOp *op = new PoolStatOp;
4143   op->tid = ++last_tid;
4144   op->pools = pools;
4145   op->pool_stats = result;
4146   op->onfinish = onfinish;
4147   if (mon_timeout > timespan(0)) {
4148     op->ontimeout = timer.add_event(mon_timeout,
4149                                     [this, op]() {
4150                                       pool_stat_op_cancel(op->tid,
4151                                                           -ETIMEDOUT); });
4152   } else {
4153     op->ontimeout = 0;
4154   }
4155
4156   unique_lock wl(rwlock);
4157
4158   poolstat_ops[op->tid] = op;
4159
4160   logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4161
4162   _poolstat_submit(op);
4163 }
4164
4165 void Objecter::_poolstat_submit(PoolStatOp *op)
4166 {
4167   ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4168   monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4169                                            op->pools,
4170                                            last_seen_pgmap_version));
4171   op->last_submit = ceph::mono_clock::now();
4172
4173   logger->inc(l_osdc_poolstat_send);
4174 }
4175
4176 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4177 {
4178   ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4179   ceph_tid_t tid = m->get_tid();
4180
4181   unique_lock wl(rwlock);
4182   if (!initialized) {
4183     m->put();
4184     return;
4185   }
4186
4187   map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4188   if (iter != poolstat_ops.end()) {
4189     PoolStatOp *op = poolstat_ops[tid];
4190     ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4191     *op->pool_stats = m->pool_stats;
4192     if (m->version > last_seen_pgmap_version) {
4193       last_seen_pgmap_version = m->version;
4194     }
4195     op->onfinish->complete(0);
4196     _finish_pool_stat_op(op, 0);
4197   } else {
4198     ldout(cct, 10) << "unknown request " << tid << dendl;
4199   }
4200   ldout(cct, 10) << "done" << dendl;
4201   m->put();
4202 }
4203
4204 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4205 {
4206   assert(initialized);
4207
4208   unique_lock wl(rwlock);
4209
4210   map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4211   if (it == poolstat_ops.end()) {
4212     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4213     return -ENOENT;
4214   }
4215
4216   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4217
4218   PoolStatOp *op = it->second;
4219   if (op->onfinish)
4220     op->onfinish->complete(r);
4221   _finish_pool_stat_op(op, r);
4222   return 0;
4223 }
4224
4225 void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4226 {
4227   // rwlock is locked unique
4228
4229   poolstat_ops.erase(op->tid);
4230   logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4231
4232   if (op->ontimeout && r != -ETIMEDOUT)
4233     timer.cancel_event(op->ontimeout);
4234
4235   delete op;
4236 }
4237
4238 void Objecter::get_fs_stats(ceph_statfs& result,
4239                             boost::optional<int64_t> data_pool,
4240                             Context *onfinish)
4241 {
4242   ldout(cct, 10) << "get_fs_stats" << dendl;
4243   unique_lock l(rwlock);
4244
4245   StatfsOp *op = new StatfsOp;
4246   op->tid = ++last_tid;
4247   op->stats = &result;
4248   op->data_pool = data_pool;
4249   op->onfinish = onfinish;
4250   if (mon_timeout > timespan(0)) {
4251     op->ontimeout = timer.add_event(mon_timeout,
4252                                     [this, op]() {
4253                                       statfs_op_cancel(op->tid,
4254                                                        -ETIMEDOUT); });
4255   } else {
4256     op->ontimeout = 0;
4257   }
4258   statfs_ops[op->tid] = op;
4259
4260   logger->set(l_osdc_statfs_active, statfs_ops.size());
4261
4262   _fs_stats_submit(op);
4263 }
4264
4265 void Objecter::_fs_stats_submit(StatfsOp *op)
4266 {
4267   // rwlock is locked unique
4268
4269   ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
4270   monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4271                                      op->data_pool,
4272                                      last_seen_pgmap_version));
4273   op->last_submit = ceph::mono_clock::now();
4274
4275   logger->inc(l_osdc_statfs_send);
4276 }
4277
4278 void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4279 {
4280   unique_lock wl(rwlock);
4281   if (!initialized) {
4282     m->put();
4283     return;
4284   }
4285
4286   ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4287   ceph_tid_t tid = m->get_tid();
4288
4289   if (statfs_ops.count(tid)) {
4290     StatfsOp *op = statfs_ops[tid];
4291     ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4292     *(op->stats) = m->h.st;
4293     if (m->h.version > last_seen_pgmap_version)
4294       last_seen_pgmap_version = m->h.version;
4295     op->onfinish->complete(0);
4296     _finish_statfs_op(op, 0);
4297   } else {
4298     ldout(cct, 10) << "unknown request " << tid << dendl;
4299   }
4300   m->put();
4301   ldout(cct, 10) << "done" << dendl;
4302 }
4303
4304 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4305 {
4306   assert(initialized);
4307
4308   unique_lock wl(rwlock);
4309
4310   map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4311   if (it == statfs_ops.end()) {
4312     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4313     return -ENOENT;
4314   }
4315
4316   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4317
4318   StatfsOp *op = it->second;
4319   if (op->onfinish)
4320     op->onfinish->complete(r);
4321   _finish_statfs_op(op, r);
4322   return 0;
4323 }
4324
4325 void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4326 {
4327   // rwlock is locked unique
4328
4329   statfs_ops.erase(op->tid);
4330   logger->set(l_osdc_statfs_active, statfs_ops.size());
4331
4332   if (op->ontimeout && r != -ETIMEDOUT)
4333     timer.cancel_event(op->ontimeout);
4334
4335   delete op;
4336 }
4337
4338 // scatter/gather
4339
4340 void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4341                                vector<bufferlist>& resultbl,
4342                                bufferlist *bl, Context *onfinish)
4343 {
4344   // all done
4345   ldout(cct, 15) << "_sg_read_finish" << dendl;
4346
4347   if (extents.size() > 1) {
4348     Striper::StripedReadResult r;
4349     vector<bufferlist>::iterator bit = resultbl.begin();
4350     for (vector<ObjectExtent>::iterator eit = extents.begin();
4351          eit != extents.end();
4352          ++eit, ++bit) {
4353       r.add_partial_result(cct, *bit, eit->buffer_extents);
4354     }
4355     bl->clear();
4356     r.assemble_result(cct, *bl, false);
4357   } else {
4358     ldout(cct, 15) << "  only one frag" << dendl;
4359     bl->claim(resultbl[0]);
4360   }
4361
4362   // done
4363   uint64_t bytes_read = bl->length();
4364   ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4365
4366   if (onfinish) {
4367     onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4368   }
4369 }
4370
4371
4372 void Objecter::ms_handle_connect(Connection *con)
4373 {
4374   ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4375   if (!initialized)
4376     return;
4377
4378   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4379     resend_mon_ops();
4380 }
4381
4382 bool Objecter::ms_handle_reset(Connection *con)
4383 {
4384   if (!initialized)
4385     return false;
4386   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
4387     OSDSession *session = static_cast<OSDSession*>(con->get_priv());
4388     if (session) {
4389       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4390                     << " osd." << session->osd << dendl;
4391       unique_lock wl(rwlock);
4392       if (!initialized) {
4393         wl.unlock();
4394         return false;
4395       }
4396       map<uint64_t, LingerOp *> lresend;
4397       OSDSession::unique_lock sl(session->lock);
4398       _reopen_session(session);
4399       _kick_requests(session, lresend);
4400       sl.unlock();
4401       _linger_ops_resend(lresend, wl);
4402       wl.unlock();
4403       maybe_request_map();
4404       session->put();
4405     }
4406     return true;
4407   }
4408   return false;
4409 }
4410
4411 void Objecter::ms_handle_remote_reset(Connection *con)
4412 {
4413   /*
4414    * treat these the same.
4415    */
4416   ms_handle_reset(con);
4417 }
4418
4419 bool Objecter::ms_handle_refused(Connection *con)
4420 {
4421   // just log for now
4422   if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4423     int osd = osdmap->identify_osd(con->get_peer_addr());
4424     if (osd >= 0) {
4425       ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4426     }
4427   }
4428   return false;
4429 }
4430
4431 bool Objecter::ms_get_authorizer(int dest_type,
4432                                  AuthAuthorizer **authorizer,
4433                                  bool force_new)
4434 {
4435   if (!initialized)
4436     return false;
4437   if (dest_type == CEPH_ENTITY_TYPE_MON)
4438     return true;
4439   *authorizer = monc->build_authorizer(dest_type);
4440   return *authorizer != NULL;
4441 }
4442
4443 void Objecter::op_target_t::dump(Formatter *f) const
4444 {
4445   f->dump_stream("pg") << pgid;
4446   f->dump_int("osd", osd);
4447   f->dump_stream("object_id") << base_oid;
4448   f->dump_stream("object_locator") << base_oloc;
4449   f->dump_stream("target_object_id") << target_oid;
4450   f->dump_stream("target_object_locator") << target_oloc;
4451   f->dump_int("paused", (int)paused);
4452   f->dump_int("used_replica", (int)used_replica);
4453   f->dump_int("precalc_pgid", (int)precalc_pgid);
4454 }
4455
4456 void Objecter::_dump_active(OSDSession *s)
4457 {
4458   for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4459        p != s->ops.end();
4460        ++p) {
4461     Op *op = p->second;
4462     ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4463                    << "\tosd." << (op->session ? op->session->osd : -1)
4464                    << "\t" << op->target.base_oid
4465                    << "\t" << op->ops << dendl;
4466   }
4467 }
4468
4469 void Objecter::_dump_active()
4470 {
4471   ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
4472                  << dendl;
4473   for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4474        siter != osd_sessions.end(); ++siter) {
4475     OSDSession *s = siter->second;
4476     OSDSession::shared_lock sl(s->lock);
4477     _dump_active(s);
4478     sl.unlock();
4479   }
4480   _dump_active(homeless_session);
4481 }
4482
4483 void Objecter::dump_active()
4484 {
4485   shared_lock rl(rwlock);
4486   _dump_active();
4487   rl.unlock();
4488 }
4489
4490 void Objecter::dump_requests(Formatter *fmt)
4491 {
4492   // Read-lock on Objecter held here
4493   fmt->open_object_section("requests");
4494   dump_ops(fmt);
4495   dump_linger_ops(fmt);
4496   dump_pool_ops(fmt);
4497   dump_pool_stat_ops(fmt);
4498   dump_statfs_ops(fmt);
4499   dump_command_ops(fmt);
4500   fmt->close_section(); // requests object
4501 }
4502
4503 void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4504 {
4505   for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4506        p != s->ops.end();
4507        ++p) {
4508     Op *op = p->second;
4509     fmt->open_object_section("op");
4510     fmt->dump_unsigned("tid", op->tid);
4511     op->target.dump(fmt);
4512     fmt->dump_stream("last_sent") << op->stamp;
4513     fmt->dump_int("attempts", op->attempts);
4514     fmt->dump_stream("snapid") << op->snapid;
4515     fmt->dump_stream("snap_context") << op->snapc;
4516     fmt->dump_stream("mtime") << op->mtime;
4517
4518     fmt->open_array_section("osd_ops");
4519     for (vector<OSDOp>::const_iterator it = op->ops.begin();
4520          it != op->ops.end();
4521          ++it) {
4522       fmt->dump_stream("osd_op") << *it;
4523     }
4524     fmt->close_section(); // osd_ops array
4525
4526     fmt->close_section(); // op object
4527   }
4528 }
4529
4530 void Objecter::dump_ops(Formatter *fmt)
4531 {
4532   // Read-lock on Objecter held
4533   fmt->open_array_section("ops");
4534   for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4535        siter != osd_sessions.end(); ++siter) {
4536     OSDSession *s = siter->second;
4537     OSDSession::shared_lock sl(s->lock);
4538     _dump_ops(s, fmt);
4539     sl.unlock();
4540   }
4541   _dump_ops(homeless_session, fmt);
4542   fmt->close_section(); // ops array
4543 }
4544
4545 void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4546 {
4547   for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4548        p != s->linger_ops.end();
4549        ++p) {
4550     LingerOp *op = p->second;
4551     fmt->open_object_section("linger_op");
4552     fmt->dump_unsigned("linger_id", op->linger_id);
4553     op->target.dump(fmt);
4554     fmt->dump_stream("snapid") << op->snap;
4555     fmt->dump_stream("registered") << op->registered;
4556     fmt->close_section(); // linger_op object
4557   }
4558 }
4559
4560 void Objecter::dump_linger_ops(Formatter *fmt)
4561 {
4562   // We have a read-lock on the objecter
4563   fmt->open_array_section("linger_ops");
4564   for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4565        siter != osd_sessions.end(); ++siter) {
4566     OSDSession *s = siter->second;
4567     OSDSession::shared_lock sl(s->lock);
4568     _dump_linger_ops(s, fmt);
4569     sl.unlock();
4570   }
4571   _dump_linger_ops(homeless_session, fmt);
4572   fmt->close_section(); // linger_ops array
4573 }
4574
4575 void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4576 {
4577   for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4578        p != s->command_ops.end();
4579        ++p) {
4580     CommandOp *op = p->second;
4581     fmt->open_object_section("command_op");
4582     fmt->dump_unsigned("command_id", op->tid);
4583     fmt->dump_int("osd", op->session ? op->session->osd : -1);
4584     fmt->open_array_section("command");
4585     for (vector<string>::const_iterator q = op->cmd.begin();
4586          q != op->cmd.end(); ++q)
4587       fmt->dump_string("word", *q);
4588     fmt->close_section();
4589     if (op->target_osd >= 0)
4590       fmt->dump_int("target_osd", op->target_osd);
4591     else
4592       fmt->dump_stream("target_pg") << op->target_pg;
4593     fmt->close_section(); // command_op object
4594   }
4595 }
4596
4597 void Objecter::dump_command_ops(Formatter *fmt)
4598 {
4599   // We have a read-lock on the Objecter here
4600   fmt->open_array_section("command_ops");
4601   for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4602        siter != osd_sessions.end(); ++siter) {
4603     OSDSession *s = siter->second;
4604     OSDSession::shared_lock sl(s->lock);
4605     _dump_command_ops(s, fmt);
4606     sl.unlock();
4607   }
4608   _dump_command_ops(homeless_session, fmt);
4609   fmt->close_section(); // command_ops array
4610 }
4611
4612 void Objecter::dump_pool_ops(Formatter *fmt) const
4613 {
4614   fmt->open_array_section("pool_ops");
4615   for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4616        p != pool_ops.end();
4617        ++p) {
4618     PoolOp *op = p->second;
4619     fmt->open_object_section("pool_op");
4620     fmt->dump_unsigned("tid", op->tid);
4621     fmt->dump_int("pool", op->pool);
4622     fmt->dump_string("name", op->name);
4623     fmt->dump_int("operation_type", op->pool_op);
4624     fmt->dump_unsigned("auid", op->auid);
4625     fmt->dump_unsigned("crush_rule", op->crush_rule);
4626     fmt->dump_stream("snapid") << op->snapid;
4627     fmt->dump_stream("last_sent") << op->last_submit;
4628     fmt->close_section(); // pool_op object
4629   }
4630   fmt->close_section(); // pool_ops array
4631 }
4632
4633 void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4634 {
4635   fmt->open_array_section("pool_stat_ops");
4636   for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4637        p != poolstat_ops.end();
4638        ++p) {
4639     PoolStatOp *op = p->second;
4640     fmt->open_object_section("pool_stat_op");
4641     fmt->dump_unsigned("tid", op->tid);
4642     fmt->dump_stream("last_sent") << op->last_submit;
4643
4644     fmt->open_array_section("pools");
4645     for (list<string>::const_iterator it = op->pools.begin();
4646          it != op->pools.end();
4647          ++it) {
4648       fmt->dump_string("pool", *it);
4649     }
4650     fmt->close_section(); // pools array
4651
4652     fmt->close_section(); // pool_stat_op object
4653   }
4654   fmt->close_section(); // pool_stat_ops array
4655 }
4656
4657 void Objecter::dump_statfs_ops(Formatter *fmt) const
4658 {
4659   fmt->open_array_section("statfs_ops");
4660   for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4661        p != statfs_ops.end();
4662        ++p) {
4663     StatfsOp *op = p->second;
4664     fmt->open_object_section("statfs_op");
4665     fmt->dump_unsigned("tid", op->tid);
4666     fmt->dump_stream("last_sent") << op->last_submit;
4667     fmt->close_section(); // statfs_op object
4668   }
4669   fmt->close_section(); // statfs_ops array
4670 }
4671
4672 Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4673   m_objecter(objecter)
4674 {
4675 }
4676
4677 bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
4678                                       std::string format, bufferlist& out)
4679 {
4680   Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4681   shared_lock rl(m_objecter->rwlock);
4682   m_objecter->dump_requests(f);
4683   f->flush(out);
4684   delete f;
4685   return true;
4686 }
4687
4688 void Objecter::blacklist_self(bool set)
4689 {
4690   ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4691
4692   vector<string> cmd;
4693   cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4694   if (set)
4695     cmd.push_back("\"blacklistop\":\"add\",");
4696   else
4697     cmd.push_back("\"blacklistop\":\"rm\",");
4698   stringstream ss;
4699   ss << messenger->get_myaddr();
4700   cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4701
4702   MMonCommand *m = new MMonCommand(monc->get_fsid());
4703   m->cmd = cmd;
4704
4705   monc->send_mon_message(m);
4706 }
4707
4708 // commands
4709
4710 void Objecter::handle_command_reply(MCommandReply *m)
4711 {
4712   unique_lock wl(rwlock);
4713   if (!initialized) {
4714     m->put();
4715     return;
4716   }
4717
4718   ConnectionRef con = m->get_connection();
4719   OSDSession *s = static_cast<OSDSession*>(con->get_priv());
4720   if (!s || s->con != con) {
4721     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4722     m->put();
4723     if (s)
4724       s->put();
4725     return;
4726   }
4727
4728   OSDSession::shared_lock sl(s->lock);
4729   map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4730   if (p == s->command_ops.end()) {
4731     ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4732                    << " not found" << dendl;
4733     m->put();
4734     sl.unlock();
4735     if (s)
4736       s->put();
4737     return;
4738   }
4739
4740   CommandOp *c = p->second;
4741   if (!c->session ||
4742       m->get_connection() != c->session->con) {
4743     ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4744                    << " got reply from wrong connection "
4745                    << m->get_connection() << " " << m->get_source_inst()
4746                    << dendl;
4747     m->put();
4748     sl.unlock();
4749     if (s)
4750       s->put();
4751     return;
4752   }
4753   if (c->poutbl) {
4754     c->poutbl->claim(m->get_data());
4755   }
4756
4757   sl.unlock();
4758
4759
4760   _finish_command(c, m->r, m->rs);
4761   m->put();
4762   if (s)
4763     s->put();
4764 }
4765
4766 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4767 {
4768   shunique_lock sul(rwlock, ceph::acquire_unique);
4769
4770   ceph_tid_t tid = ++last_tid;
4771   ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4772   c->tid = tid;
4773
4774   {
4775     OSDSession::unique_lock hs_wl(homeless_session->lock);
4776     _session_command_op_assign(homeless_session, c);
4777   }
4778
4779   _calc_command_target(c, sul);
4780   _assign_command_session(c, sul);
4781   if (osd_timeout > timespan(0)) {
4782     c->ontimeout = timer.add_event(osd_timeout,
4783                                    [this, c, tid]() {
4784                                      command_op_cancel(c->session, tid,
4785                                                        -ETIMEDOUT); });
4786   }
4787
4788   if (!c->session->is_homeless()) {
4789     _send_command(c);
4790   } else {
4791     _maybe_request_map();
4792   }
4793   if (c->map_check_error)
4794     _send_command_map_check(c);
4795   *ptid = tid;
4796
4797   logger->inc(l_osdc_command_active);
4798 }
4799
4800 int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4801 {
4802   assert(sul.owns_lock() && sul.mutex() == &rwlock);
4803
4804   c->map_check_error = 0;
4805
4806   // ignore overlays, just like we do with pg ops
4807   c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4808
4809   if (c->target_osd >= 0) {
4810     if (!osdmap->exists(c->target_osd)) {
4811       c->map_check_error = -ENOENT;
4812       c->map_check_error_str = "osd dne";
4813       c->target.osd = -1;
4814       return RECALC_OP_TARGET_OSD_DNE;
4815     }
4816     if (osdmap->is_down(c->target_osd)) {
4817       c->map_check_error = -ENXIO;
4818       c->map_check_error_str = "osd down";
4819       c->target.osd = -1;
4820       return RECALC_OP_TARGET_OSD_DOWN;
4821     }
4822     c->target.osd = c->target_osd;
4823   } else {
4824     int ret = _calc_target(&(c->target), nullptr, true);
4825     if (ret == RECALC_OP_TARGET_POOL_DNE) {
4826       c->map_check_error = -ENOENT;
4827       c->map_check_error_str = "pool dne";
4828       c->target.osd = -1;
4829       return ret;
4830     } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4831       c->map_check_error = -ENXIO;
4832       c->map_check_error_str = "osd down";
4833       c->target.osd = -1;
4834       return ret;
4835     }
4836   }
4837
4838   OSDSession *s;
4839   int r = _get_session(c->target.osd, &s, sul);
4840   assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4841
4842   if (c->session != s) {
4843     put_session(s);
4844     return RECALC_OP_TARGET_NEED_RESEND;
4845   }
4846
4847   put_session(s);
4848
4849   ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
4850                  << c->session << dendl;
4851
4852   return RECALC_OP_TARGET_NO_ACTION;
4853 }
4854
4855 void Objecter::_assign_command_session(CommandOp *c,
4856                                        shunique_lock& sul)
4857 {
4858   assert(sul.owns_lock() && sul.mutex() == &rwlock);
4859
4860   OSDSession *s;
4861   int r = _get_session(c->target.osd, &s, sul);
4862   assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4863
4864   if (c->session != s) {
4865     if (c->session) {
4866       OSDSession *cs = c->session;
4867       OSDSession::unique_lock csl(cs->lock);
4868       _session_command_op_remove(c->session, c);
4869       csl.unlock();
4870     }
4871     OSDSession::unique_lock sl(s->lock);
4872     _session_command_op_assign(s, c);
4873   }
4874
4875   put_session(s);
4876 }
4877
4878 void Objecter::_send_command(CommandOp *c)
4879 {
4880   ldout(cct, 10) << "_send_command " << c->tid << dendl;
4881   assert(c->session);
4882   assert(c->session->con);
4883   MCommand *m = new MCommand(monc->monmap.fsid);
4884   m->cmd = c->cmd;
4885   m->set_data(c->inbl);
4886   m->set_tid(c->tid);
4887   c->session->con->send_message(m);
4888   logger->inc(l_osdc_command_send);
4889 }
4890
4891 int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4892 {
4893   assert(initialized);
4894
4895   unique_lock wl(rwlock);
4896
4897   map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4898   if (it == s->command_ops.end()) {
4899     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4900     return -ENOENT;
4901   }
4902
4903   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4904
4905   CommandOp *op = it->second;
4906   _command_cancel_map_check(op);
4907   _finish_command(op, r, "");
4908   return 0;
4909 }
4910
4911 void Objecter::_finish_command(CommandOp *c, int r, string rs)
4912 {
4913   // rwlock is locked unique
4914
4915   ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4916                  << rs << dendl;
4917   if (c->prs)
4918     *c->prs = rs;
4919   if (c->onfinish)
4920     c->onfinish->complete(r);
4921
4922   if (c->ontimeout && r != -ETIMEDOUT)
4923     timer.cancel_event(c->ontimeout);
4924
4925   OSDSession *s = c->session;
4926   OSDSession::unique_lock sl(s->lock);
4927   _session_command_op_remove(c->session, c);
4928   sl.unlock();
4929
4930   c->put();
4931
4932   logger->dec(l_osdc_command_active);
4933 }
4934
4935 Objecter::OSDSession::~OSDSession()
4936 {
4937   // Caller is responsible for re-assigning or
4938   // destroying any ops that were assigned to us
4939   assert(ops.empty());
4940   assert(linger_ops.empty());
4941   assert(command_ops.empty());
4942 }
4943
4944 Objecter::~Objecter()
4945 {
4946   delete osdmap;
4947
4948   assert(homeless_session->get_nref() == 1);
4949   assert(num_homeless_ops == 0);
4950   homeless_session->put();
4951
4952   assert(osd_sessions.empty());
4953   assert(poolstat_ops.empty());
4954   assert(statfs_ops.empty());
4955   assert(pool_ops.empty());
4956   assert(waiting_for_map.empty());
4957   assert(linger_ops.empty());
4958   assert(check_latest_map_lingers.empty());
4959   assert(check_latest_map_ops.empty());
4960   assert(check_latest_map_commands.empty());
4961
4962   assert(!m_request_state_hook);
4963   assert(!logger);
4964 }
4965
4966 /**
4967  * Wait until this OSD map epoch is received before
4968  * sending any more operations to OSDs.  Use this
4969  * when it is known that the client can't trust
4970  * anything from before this epoch (e.g. due to
4971  * client blacklist at this epoch).
4972  */
4973 void Objecter::set_epoch_barrier(epoch_t epoch)
4974 {
4975   unique_lock wl(rwlock);
4976
4977   ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
4978                 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
4979                 << dendl;
4980   if (epoch > epoch_barrier) {
4981     epoch_barrier = epoch;
4982     _maybe_request_map();
4983   }
4984 }
4985
4986
4987
4988 hobject_t Objecter::enumerate_objects_begin()
4989 {
4990   return hobject_t();
4991 }
4992
4993 hobject_t Objecter::enumerate_objects_end()
4994 {
4995   return hobject_t::get_max();
4996 }
4997
4998 struct C_EnumerateReply : public Context {
4999   bufferlist bl;
5000
5001   Objecter *objecter;
5002   hobject_t *next;
5003   std::list<librados::ListObjectImpl> *result;
5004   const hobject_t end;
5005   const int64_t pool_id;
5006   Context *on_finish;
5007
5008   epoch_t epoch;
5009   int budget;
5010
5011   C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
5012       std::list<librados::ListObjectImpl> *result_,
5013       const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
5014     objecter(objecter_), next(next_), result(result_),
5015     end(end_), pool_id(pool_id_), on_finish(on_finish_),
5016     epoch(0), budget(0)
5017   {}
5018
5019   void finish(int r) override {
5020     objecter->_enumerate_reply(
5021       bl, r, end, pool_id, budget, epoch, result, next, on_finish);
5022   }
5023 };
5024
5025 void Objecter::enumerate_objects(
5026     int64_t pool_id,
5027     const std::string &ns,
5028     const hobject_t &start,
5029     const hobject_t &end,
5030     const uint32_t max,
5031     const bufferlist &filter_bl,
5032     std::list<librados::ListObjectImpl> *result, 
5033     hobject_t *next,
5034     Context *on_finish)
5035 {
5036   assert(result);
5037
5038   if (!end.is_max() && start > end) {
5039     lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
5040     on_finish->complete(-EINVAL);
5041     return;
5042   }
5043
5044   if (max < 1) {
5045     lderr(cct) << __func__ << ": result size may not be zero" << dendl;
5046     on_finish->complete(-EINVAL);
5047     return;
5048   }
5049
5050   if (start.is_max()) {
5051     on_finish->complete(0);
5052     return;
5053   }
5054
5055   shared_lock rl(rwlock);
5056   assert(osdmap->get_epoch());
5057   if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5058     rl.unlock();
5059     lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
5060     on_finish->complete(-EOPNOTSUPP);
5061     return;
5062   }
5063   const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
5064   if (!p) {
5065     lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5066                << osdmap->get_epoch() << dendl;
5067     rl.unlock();
5068     on_finish->complete(-ENOENT);
5069     return;
5070   } else {
5071     rl.unlock();
5072   }
5073
5074   ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
5075
5076   // Stash completion state
5077   C_EnumerateReply *on_ack = new C_EnumerateReply(
5078       this, next, result, end, pool_id, on_finish);
5079
5080   ObjectOperation op;
5081   op.pg_nls(max, filter_bl, start, 0);
5082
5083   // Issue.  See you later in _enumerate_reply
5084   object_locator_t oloc(pool_id, ns);
5085   pg_read(start.get_hash(), oloc, op,
5086           &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
5087 }
5088
5089 void Objecter::_enumerate_reply(
5090     bufferlist &bl,
5091     int r,
5092     const hobject_t &end,
5093     const int64_t pool_id,
5094     int budget,
5095     epoch_t reply_epoch,
5096     std::list<librados::ListObjectImpl> *result,
5097     hobject_t *next,
5098     Context *on_finish)
5099 {
5100   if (budget > 0) {
5101     put_op_budget_bytes(budget);
5102   }
5103
5104   if (r < 0) {
5105     ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5106     on_finish->complete(r);
5107     return;
5108   }
5109
5110   assert(next != NULL);
5111
5112   // Decode the results
5113   bufferlist::iterator iter = bl.begin();
5114   pg_nls_response_t response;
5115
5116   // XXX extra_info doesn't seem used anywhere?
5117   bufferlist extra_info;
5118   ::decode(response, iter);
5119   if (!iter.end()) {
5120     ::decode(extra_info, iter);
5121   }
5122
5123   ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5124                  << " handle " << response.handle
5125                  << " reply_epoch " << reply_epoch << dendl;
5126   ldout(cct, 20) << __func__ << ": response.entries.size "
5127                  << response.entries.size() << ", response.entries "
5128                  << response.entries << dendl;
5129   if (response.handle <= end) {
5130     *next = response.handle;
5131   } else {
5132     ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5133                    << dendl;
5134     *next = end;
5135
5136     // drop anything after 'end'
5137     shared_lock rl(rwlock);
5138     const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5139     if (!pool) {
5140       // pool is gone, drop any results which are now meaningless.
5141       rl.unlock();
5142       on_finish->complete(-ENOENT);
5143       return;
5144     }
5145     while (!response.entries.empty()) {
5146       uint32_t hash = response.entries.back().locator.empty() ?
5147         pool->hash_key(response.entries.back().oid,
5148                        response.entries.back().nspace) :
5149         pool->hash_key(response.entries.back().locator,
5150                        response.entries.back().nspace);
5151       hobject_t last(response.entries.back().oid,
5152                      response.entries.back().locator,
5153                      CEPH_NOSNAP,
5154                      hash,
5155                      pool_id,
5156                      response.entries.back().nspace);
5157       if (last < end)
5158         break;
5159       ldout(cct, 20) << __func__ << " dropping item " << last
5160                      << " >= end " << end << dendl;
5161       response.entries.pop_back();
5162     }
5163     rl.unlock();
5164   }
5165   if (!response.entries.empty()) {
5166     result->merge(response.entries);
5167   }
5168
5169   // release the listing context's budget once all
5170   // OPs (in the session) are finished
5171 #if 0
5172   put_nlist_context_budget(list_context);
5173 #endif
5174   on_finish->complete(r);
5175   return;
5176 }
5177
5178 namespace {
5179   using namespace librados;
5180
5181   template <typename T>
5182   void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
5183   {
5184     for (auto bl : bls) {
5185       auto p = bl.begin();
5186       T t;
5187       decode(t, p);
5188       items.push_back(t);
5189     }
5190   }
5191
5192   struct C_ObjectOperation_scrub_ls : public Context {
5193     bufferlist bl;
5194     uint32_t *interval;
5195     std::vector<inconsistent_obj_t> *objects = nullptr;
5196     std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5197     int *rval;
5198
5199     C_ObjectOperation_scrub_ls(uint32_t *interval,
5200                                std::vector<inconsistent_obj_t> *objects,
5201                                int *rval)
5202       : interval(interval), objects(objects), rval(rval) {}
5203     C_ObjectOperation_scrub_ls(uint32_t *interval,
5204                                std::vector<inconsistent_snapset_t> *snapsets,
5205                                int *rval)
5206       : interval(interval), snapsets(snapsets), rval(rval) {}
5207     void finish(int r) override {
5208       if (r < 0 && r != -EAGAIN) {
5209         if (rval)
5210           *rval = r;
5211         return;
5212       }
5213
5214       if (rval)
5215         *rval = 0;
5216
5217       try {
5218         decode();
5219       } catch (buffer::error&) {
5220         if (rval)
5221           *rval = -EIO;
5222       }
5223     }
5224   private:
5225     void decode() {
5226       scrub_ls_result_t result;
5227       auto p = bl.begin();
5228       result.decode(p);
5229       *interval = result.interval;
5230       if (objects) {
5231         do_decode(*objects, result.vals);
5232       } else {
5233         do_decode(*snapsets, result.vals);
5234       }
5235     }
5236   };
5237
5238   template <typename T>
5239   void do_scrub_ls(::ObjectOperation *op,
5240                    const scrub_ls_arg_t& arg,
5241                    std::vector<T> *items,
5242                    uint32_t *interval,
5243                    int *rval)
5244   {
5245     OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5246     op->flags |= CEPH_OSD_FLAG_PGOP;
5247     assert(interval);
5248     arg.encode(osd_op.indata);
5249     unsigned p = op->ops.size() - 1;
5250     auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5251     op->out_handler[p] = h;
5252     op->out_bl[p] = &h->bl;
5253     op->out_rval[p] = rval;
5254   }
5255 }
5256
5257 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5258                                  uint64_t max_to_get,
5259                                  std::vector<librados::inconsistent_obj_t> *objects,
5260                                  uint32_t *interval,
5261                                  int *rval)
5262 {
5263   scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5264   do_scrub_ls(this, arg, objects, interval, rval);
5265 }
5266
5267 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5268                                  uint64_t max_to_get,
5269                                  std::vector<librados::inconsistent_snapset_t> *snapsets,
5270                                  uint32_t *interval,
5271                                  int *rval)
5272 {
5273   scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5274   do_scrub_ls(this, arg, snapsets, interval, rval);
5275 }