Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librados / IoCtxImpl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <limits.h>
16
17 #include "IoCtxImpl.h"
18
19 #include "librados/AioCompletionImpl.h"
20 #include "librados/PoolAsyncCompletionImpl.h"
21 #include "librados/RadosClient.h"
22 #include "include/assert.h"
23 #include "common/valgrind.h"
24 #include "common/EventTrace.h"
25
26 #define dout_subsys ceph_subsys_rados
27 #undef dout_prefix
28 #define dout_prefix *_dout << "librados: "
29
30 namespace librados {
31 namespace {
32
33 struct C_notify_Finish : public Context {
34   CephContext *cct;
35   Context *ctx;
36   Objecter *objecter;
37   Objecter::LingerOp *linger_op;
38   bufferlist reply_bl;
39   bufferlist *preply_bl;
40   char **preply_buf;
41   size_t *preply_buf_len;
42
43   C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
44                   Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
45                   char **_preply_buf, size_t *_preply_buf_len)
46     : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
47       preply_bl(_preply_bl), preply_buf(_preply_buf),
48       preply_buf_len(_preply_buf_len)
49   {
50     linger_op->on_notify_finish = this;
51     linger_op->notify_result_bl = &reply_bl;
52   }
53
54   void finish(int r) override
55   {
56     ldout(cct, 10) << __func__ << " completed notify (linger op "
57                    << linger_op << "), r = " << r << dendl;
58
59     // pass result back to user
60     // NOTE: we do this regardless of what error code we return
61     if (preply_buf) {
62       if (reply_bl.length()) {
63         *preply_buf = (char*)malloc(reply_bl.length());
64         memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
65       } else {
66         *preply_buf = NULL;
67       }
68     }
69     if (preply_buf_len)
70       *preply_buf_len = reply_bl.length();
71     if (preply_bl)
72       preply_bl->claim(reply_bl);
73
74     ctx->complete(r);
75   }
76 };
77
78 struct C_aio_linger_cancel : public Context {
79   Objecter *objecter;
80   Objecter::LingerOp *linger_op;
81
82   C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
83     : objecter(_objecter), linger_op(_linger_op)
84   {
85   }
86
87   void finish(int r) override
88   {
89     objecter->linger_cancel(linger_op);
90   }
91 };
92
93 struct C_aio_linger_Complete : public Context {
94   AioCompletionImpl *c;
95   Objecter::LingerOp *linger_op;
96   bool cancel;
97
98   C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
99     : c(_c), linger_op(_linger_op), cancel(_cancel)
100   {
101     c->get();
102   }
103
104   void finish(int r) override {
105     if (cancel || r < 0)
106       c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
107                                                             linger_op));
108
109     c->lock.Lock();
110     c->rval = r;
111     c->complete = true;
112     c->cond.Signal();
113
114     if (c->callback_complete ||
115         c->callback_safe) {
116       c->io->client->finisher.queue(new C_AioComplete(c));
117     }
118     c->put_unlock();
119   }
120 };
121
122 struct C_aio_notify_Complete : public C_aio_linger_Complete {
123   Mutex lock;
124   bool acked = false;
125   bool finished = false;
126   int ret_val = 0;
127
128   C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
129     : C_aio_linger_Complete(_c, _linger_op, false),
130       lock("C_aio_notify_Complete::lock") {
131   }
132
133   void handle_ack(int r) {
134     // invoked by C_aio_notify_Ack
135     lock.Lock();
136     acked = true;
137     complete_unlock(r);
138   }
139
140   void complete(int r) override {
141     // invoked by C_notify_Finish (or C_aio_notify_Ack on failure)
142     lock.Lock();
143     finished = true;
144     complete_unlock(r);
145   }
146
147   void complete_unlock(int r) {
148     if (ret_val == 0 && r < 0) {
149       ret_val = r;
150     }
151
152     if (acked && finished) {
153       lock.Unlock();
154       cancel = true;
155       C_aio_linger_Complete::complete(ret_val);
156     } else {
157       lock.Unlock();
158     }
159   }
160 };
161
162 struct C_aio_notify_Ack : public Context {
163   CephContext *cct;
164   C_notify_Finish *onfinish;
165   C_aio_notify_Complete *oncomplete;
166
167   C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish,
168                    C_aio_notify_Complete *_oncomplete)
169     : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete)
170   {
171   }
172
173   void finish(int r) override
174   {
175     ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
176                    << "acked (" << r << ")" << dendl;
177     oncomplete->handle_ack(r);
178     if (r < 0) {
179       // on failure, we won't expect to see a notify_finish callback
180       onfinish->complete(r);
181     }
182   }
183 };
184
185 struct C_aio_selfmanaged_snap_op_Complete : public Context {
186   librados::RadosClient *client;
187   librados::AioCompletionImpl *c;
188
189   C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
190                                      librados::AioCompletionImpl *c)
191     : client(client), c(c) {
192     c->get();
193   }
194
195   void finish(int r) override {
196     c->lock.Lock();
197     c->rval = r;
198     c->complete = true;
199     c->cond.Signal();
200
201     if (c->callback_complete || c->callback_safe) {
202       client->finisher.queue(new librados::C_AioComplete(c));
203     }
204     c->put_unlock();
205   }
206 };
207
208 struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
209   snapid_t snapid;
210   uint64_t *dest_snapid;
211
212   C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
213                                          librados::AioCompletionImpl *c,
214                                          uint64_t *dest_snapid)
215     : C_aio_selfmanaged_snap_op_Complete(client, c),
216       dest_snapid(dest_snapid) {
217   }
218
219   void finish(int r) override {
220     if (r >= 0) {
221       *dest_snapid = snapid;
222     }
223     C_aio_selfmanaged_snap_op_Complete::finish(r);
224   }
225 };
226
227 } // anonymous namespace
228 } // namespace librados
229
230 librados::IoCtxImpl::IoCtxImpl() :
231   ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
232   notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
233   aio_write_seq(0), objecter(NULL)
234 {
235 }
236
237 librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
238                                int64_t poolid, snapid_t s)
239   : ref_cnt(0), client(c), poolid(poolid), snap_seq(s),
240     assert_ver(0), last_objver(0),
241     notify_timeout(c->cct->_conf->client_notify_timeout),
242     oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
243     aio_write_seq(0), objecter(objecter)
244 {
245 }
246
247 void librados::IoCtxImpl::set_snap_read(snapid_t s)
248 {
249   if (!s)
250     s = CEPH_NOSNAP;
251   ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
252   snap_seq = s;
253 }
254
255 int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
256 {
257   ::SnapContext n;
258   ldout(client->cct, 10) << "set snap write context: seq = " << seq
259                          << " and snaps = " << snaps << dendl;
260   n.seq = seq;
261   n.snaps = snaps;
262   if (!n.is_valid())
263     return -EINVAL;
264   snapc = n;
265   return 0;
266 }
267
268 int librados::IoCtxImpl::get_object_hash_position(
269     const std::string& oid, uint32_t *hash_position)
270 {
271   int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
272   if (r < 0)
273     return r;
274   *hash_position = (uint32_t)r;
275   return 0;
276 }
277
278 int librados::IoCtxImpl::get_object_pg_hash_position(
279     const std::string& oid, uint32_t *pg_hash_position)
280 {
281   int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
282   if (r < 0)
283     return r;
284   *pg_hash_position = (uint32_t)r;
285   return 0;
286 }
287
288 void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
289 {
290   get();
291   aio_write_list_lock.Lock();
292   assert(c->io == this);
293   c->aio_write_seq = ++aio_write_seq;
294   ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
295                          << " write_seq " << aio_write_seq << dendl;
296   aio_write_list.push_back(&c->aio_write_list_item);
297   aio_write_list_lock.Unlock();
298 }
299
300 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
301 {
302   ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
303   aio_write_list_lock.Lock();
304   assert(c->io == this);
305   c->aio_write_list_item.remove_myself();
306
307   map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
308   while (waiters != aio_write_waiters.end()) {
309     if (!aio_write_list.empty() &&
310         aio_write_list.front()->aio_write_seq <= waiters->first) {
311       ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
312                              << " <= waiter " << waiters->first
313                              << ", stopping" << dendl;
314       break;
315     }
316     ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
317     for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
318          it != waiters->second.end(); ++it) {
319       client->finisher.queue(new C_AioCompleteAndSafe(*it));
320       (*it)->put();
321     }
322     aio_write_waiters.erase(waiters++);
323   }
324
325   aio_write_cond.Signal();
326   aio_write_list_lock.Unlock();
327   put();
328 }
329
330 void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
331 {
332   ldout(client->cct, 20) << "flush_aio_writes_async " << this
333                          << " completion " << c << dendl;
334   Mutex::Locker l(aio_write_list_lock);
335   ceph_tid_t seq = aio_write_seq;
336   if (aio_write_list.empty()) {
337     ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
338                            << seq << ")" << dendl;
339     client->finisher.queue(new C_AioCompleteAndSafe(c));
340   } else {
341     ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
342                            << " writes in flight; waiting on tid " << seq << dendl;
343     c->get();
344     aio_write_waiters[seq].push_back(c);
345   }
346 }
347
348 void librados::IoCtxImpl::flush_aio_writes()
349 {
350   ldout(client->cct, 20) << "flush_aio_writes" << dendl;
351   aio_write_list_lock.Lock();
352   ceph_tid_t seq = aio_write_seq;
353   while (!aio_write_list.empty() &&
354          aio_write_list.front()->aio_write_seq <= seq)
355     aio_write_cond.Wait(aio_write_list_lock);
356   aio_write_list_lock.Unlock();
357 }
358
359 string librados::IoCtxImpl::get_cached_pool_name()
360 {
361   std::string pn;
362   client->pool_get_name(get_id(), &pn);
363   return pn;
364 }
365
366 // SNAPS
367
368 int librados::IoCtxImpl::snap_create(const char *snapName)
369 {
370   int reply;
371   string sName(snapName);
372
373   Mutex mylock ("IoCtxImpl::snap_create::mylock");
374   Cond cond;
375   bool done;
376   Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
377   reply = objecter->create_pool_snap(poolid, sName, onfinish);
378
379   if (reply < 0) {
380     delete onfinish;
381   } else {
382     mylock.Lock();
383     while (!done)
384       cond.Wait(mylock);
385     mylock.Unlock();
386   }
387   return reply;
388 }
389
390 int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
391 {
392   int reply;
393
394   Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
395   Cond cond;
396   bool done;
397   Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
398   snapid_t snapid;
399   reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
400
401   if (reply < 0) {
402     delete onfinish;
403   } else {
404     mylock.Lock();
405     while (!done)
406       cond.Wait(mylock);
407     mylock.Unlock();
408     if (reply == 0)
409       *psnapid = snapid;
410   }
411   return reply;
412 }
413
414 void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
415                                                       AioCompletionImpl *c)
416 {
417   C_aio_selfmanaged_snap_create_Complete *onfinish =
418     new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
419   int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
420                                               onfinish);
421   if (r < 0) {
422     onfinish->complete(r);
423   }
424 }
425
426 int librados::IoCtxImpl::snap_remove(const char *snapName)
427 {
428   int reply;
429   string sName(snapName);
430
431   Mutex mylock ("IoCtxImpl::snap_remove::mylock");
432   Cond cond;
433   bool done;
434   Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
435   reply = objecter->delete_pool_snap(poolid, sName, onfinish);
436
437   if (reply < 0) {
438     delete onfinish; 
439   } else {
440     mylock.Lock();
441     while(!done)
442       cond.Wait(mylock);
443     mylock.Unlock();
444   }
445   return reply;
446 }
447
448 int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
449                                                           ::SnapContext& snapc,
450                                                           uint64_t snapid)
451 {
452   int reply;
453
454   Mutex mylock("IoCtxImpl::snap_rollback::mylock");
455   Cond cond;
456   bool done;
457   Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
458
459   ::ObjectOperation op;
460   prepare_assert_ops(&op);
461   op.rollback(snapid);
462   objecter->mutate(oid, oloc,
463                    op, snapc, ceph::real_clock::now(), 0,
464                    onack, NULL);
465
466   mylock.Lock();
467   while (!done) cond.Wait(mylock);
468   mylock.Unlock();
469   return reply;
470 }
471
472 int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
473 {
474   snapid_t snap;
475
476   int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
477   if (r < 0) {
478     return r;
479   }
480
481   return selfmanaged_snap_rollback_object(oid, snapc, snap);
482 }
483
484 int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
485 {
486   int reply;
487
488   Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
489   Cond cond;
490   bool done;
491   objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
492                                     new C_SafeCond(&mylock, &cond, &done, &reply));
493
494   mylock.Lock();
495   while (!done) cond.Wait(mylock);
496   mylock.Unlock();
497   return (int)reply;
498 }
499
500 void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
501                                                       AioCompletionImpl *c)
502 {
503   Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
504   objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
505 }
506
507 int librados::IoCtxImpl::pool_change_auid(unsigned long long auid)
508 {
509   int reply;
510
511   Mutex mylock("IoCtxImpl::pool_change_auid::mylock");
512   Cond cond;
513   bool done;
514   objecter->change_pool_auid(poolid,
515                              new C_SafeCond(&mylock, &cond, &done, &reply),
516                              auid);
517
518   mylock.Lock();
519   while (!done) cond.Wait(mylock);
520   mylock.Unlock();
521   return reply;
522 }
523
524 int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid,
525                                                   PoolAsyncCompletionImpl *c)
526 {
527   objecter->change_pool_auid(poolid,
528                              new C_PoolAsync_Safe(c),
529                              auid);
530   return 0;
531 }
532
533 int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
534 {
535   return objecter->pool_snap_list(poolid, snaps);
536 }
537
538 int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
539 {
540   return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
541 }
542
543 int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
544 {
545   pool_snap_info_t info;
546   int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
547   if (ret < 0) {
548     return ret;
549   }
550   *s = info.name.c_str();
551   return 0;
552 }
553
554 int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
555 {
556   pool_snap_info_t info;
557   int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
558   if (ret < 0) {
559     return ret;
560   }
561   *t = info.stamp.sec();
562   return 0;
563 }
564
565
566 // IO
567
568 int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
569 {
570   Cond cond;
571   bool done;
572   int r = 0;
573   Mutex mylock("IoCtxImpl::nlist::mylock");
574
575   if (context->at_end())
576     return 0;
577
578   context->max_entries = max_entries;
579   context->nspace = oloc.nspace;
580
581   objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r));
582
583   mylock.Lock();
584   while(!done)
585     cond.Wait(mylock);
586   mylock.Unlock();
587
588   return r;
589 }
590
591 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
592                                         uint32_t pos)
593 {
594   context->list.clear();
595   return objecter->list_nobjects_seek(context, pos);
596 }
597
598 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
599                                     const rados_object_list_cursor& cursor)
600 {
601   context->list.clear();
602   return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
603 }
604
605 rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
606 {
607   hobject_t *c = new hobject_t;
608
609   objecter->list_nobjects_get_cursor(context, c);
610   return (rados_object_list_cursor)c;
611 }
612
613 int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
614 {
615   ::ObjectOperation op;
616   prepare_assert_ops(&op);
617   op.create(exclusive);
618   return operate(oid, &op, NULL);
619 }
620
621 /*
622  * add any version assert operations that are appropriate given the
623  * stat in the IoCtx, either the target version assert or any src
624  * object asserts.  these affect a single ioctx operation, so clear
625  * the ioctx state when we're doing.
626  *
627  * return a pointer to the ObjectOperation if we added any events;
628  * this is convenient for passing the extra_ops argument into Objecter
629  * methods.
630  */
631 ::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
632 {
633   ::ObjectOperation *pop = NULL;
634   if (assert_ver) {
635     op->assert_version(assert_ver);
636     assert_ver = 0;
637     pop = op;
638   }
639   return pop;
640 }
641
642 int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
643                                size_t len, uint64_t off)
644 {
645   if (len > UINT_MAX/2)
646     return -E2BIG;
647   ::ObjectOperation op;
648   prepare_assert_ops(&op);
649   bufferlist mybl;
650   mybl.substr_of(bl, 0, len);
651   op.write(off, mybl);
652   return operate(oid, &op, NULL);
653 }
654
655 int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
656 {
657   if (len > UINT_MAX/2)
658     return -E2BIG;
659   ::ObjectOperation op;
660   prepare_assert_ops(&op);
661   bufferlist mybl;
662   mybl.substr_of(bl, 0, len);
663   op.append(mybl);
664   return operate(oid, &op, NULL);
665 }
666
667 int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
668 {
669   if (bl.length() > UINT_MAX/2)
670     return -E2BIG;
671   ::ObjectOperation op;
672   prepare_assert_ops(&op);
673   op.write_full(bl);
674   return operate(oid, &op, NULL);
675 }
676
677 int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
678                                    size_t write_len, uint64_t off)
679 {
680   if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
681     return -E2BIG;
682   if ((bl.length() == 0) || (write_len % bl.length()))
683     return -EINVAL;
684   ::ObjectOperation op;
685   prepare_assert_ops(&op);
686   bufferlist mybl;
687   mybl.substr_of(bl, 0, bl.length());
688   op.writesame(off, write_len, mybl);
689   return operate(oid, &op, NULL);
690 }
691
692 int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
693                                  ceph::real_time *pmtime, int flags)
694 {
695   ceph::real_time ut = (pmtime ? *pmtime :
696     ceph::real_clock::now());
697
698   /* can't write to a snapshot */
699   if (snap_seq != CEPH_NOSNAP)
700     return -EROFS;
701
702   if (!o->size())
703     return 0;
704
705   Mutex mylock("IoCtxImpl::operate::mylock");
706   Cond cond;
707   bool done;
708   int r;
709   version_t ver;
710
711   Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
712
713   int op = o->ops[0].op.op;
714   ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
715                          << " nspace=" << oloc.nspace << dendl;
716   Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
717                                                           *o, snapc, ut, flags,
718                                                           oncommit, &ver);
719   objecter->op_submit(objecter_op);
720
721   mylock.Lock();
722   while (!done)
723     cond.Wait(mylock);
724   mylock.Unlock();
725   ldout(client->cct, 10) << "Objecter returned from "
726         << ceph_osd_op_name(op) << " r=" << r << dendl;
727
728   set_sync_op_version(ver);
729
730   return r;
731 }
732
733 int librados::IoCtxImpl::operate_read(const object_t& oid,
734                                       ::ObjectOperation *o,
735                                       bufferlist *pbl,
736                                       int flags)
737 {
738   if (!o->size())
739     return 0;
740
741   Mutex mylock("IoCtxImpl::operate_read::mylock");
742   Cond cond;
743   bool done;
744   int r;
745   version_t ver;
746
747   Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
748
749   int op = o->ops[0].op.op;
750   ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
751   Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
752                                               *o, snap_seq, pbl, flags,
753                                               onack, &ver);
754   objecter->op_submit(objecter_op);
755
756   mylock.Lock();
757   while (!done)
758     cond.Wait(mylock);
759   mylock.Unlock();
760   ldout(client->cct, 10) << "Objecter returned from "
761         << ceph_osd_op_name(op) << " r=" << r << dendl;
762
763   set_sync_op_version(ver);
764
765   return r;
766 }
767
768 int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
769                                           ::ObjectOperation *o,
770                                           AioCompletionImpl *c,
771                                           int flags,
772                                           bufferlist *pbl,
773                                           const blkin_trace_info *trace_info)
774 {
775   FUNCTRACE();
776   Context *oncomplete = new C_aio_Complete(c);
777
778 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
779   ((C_aio_Complete *) oncomplete)->oid = oid;
780 #endif
781   c->is_read = true;
782   c->io = this;
783
784   ZTracer::Trace trace;
785   if (trace_info) {
786     ZTracer::Trace parent_trace("", nullptr, trace_info);
787     trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
788   }
789
790   trace.event("init root span");
791   Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
792                  *o, snap_seq, pbl, flags,
793                  oncomplete, &c->objver, nullptr, 0, &trace);
794   objecter->op_submit(objecter_op, &c->tid);
795   trace.event("rados operate read submitted");
796
797   return 0;
798 }
799
800 int librados::IoCtxImpl::aio_operate(const object_t& oid,
801                                      ::ObjectOperation *o, AioCompletionImpl *c,
802                                      const SnapContext& snap_context, int flags,
803                                      const blkin_trace_info *trace_info)
804 {
805   FUNCTRACE();
806   OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
807   auto ut = ceph::real_clock::now();
808   /* can't write to a snapshot */
809   if (snap_seq != CEPH_NOSNAP)
810     return -EROFS;
811
812   Context *oncomplete = new C_aio_Complete(c);
813 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
814   ((C_aio_Complete *) oncomplete)->oid = oid;
815 #endif
816
817   c->io = this;
818   queue_aio_write(c);
819
820   ZTracer::Trace trace;
821   if (trace_info) {
822     ZTracer::Trace parent_trace("", nullptr, trace_info);
823     trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
824   }
825
826   trace.event("init root span");
827   Objecter::Op *op = objecter->prepare_mutate_op(
828     oid, oloc, *o, snap_context, ut, flags,
829     oncomplete, &c->objver, osd_reqid_t(), &trace);
830   objecter->op_submit(op, &c->tid);
831   trace.event("rados operate op submitted");
832
833   return 0;
834 }
835
836 int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
837                                   bufferlist *pbl, size_t len, uint64_t off,
838                                   uint64_t snapid, const blkin_trace_info *info)
839 {
840   FUNCTRACE();
841   if (len > (size_t) INT_MAX)
842     return -EDOM;
843
844   OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
845   Context *oncomplete = new C_aio_Complete(c);
846
847 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
848   ((C_aio_Complete *) oncomplete)->oid = oid;
849 #endif
850   c->is_read = true;
851   c->io = this;
852   c->blp = pbl;
853
854   ZTracer::Trace trace;
855   if (info)
856     trace.init("rados read", &objecter->trace_endpoint, info);
857
858   Objecter::Op *o = objecter->prepare_read_op(
859     oid, oloc,
860     off, len, snapid, pbl, 0,
861     oncomplete, &c->objver, nullptr, 0, &trace);
862   objecter->op_submit(o, &c->tid);
863   return 0;
864 }
865
866 int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
867                                   char *buf, size_t len, uint64_t off,
868                                   uint64_t snapid, const blkin_trace_info *info)
869 {
870   FUNCTRACE();
871   if (len > (size_t) INT_MAX)
872     return -EDOM;
873
874   OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
875   Context *oncomplete = new C_aio_Complete(c);
876
877 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
878   ((C_aio_Complete *) oncomplete)->oid = oid;
879 #endif
880   c->is_read = true;
881   c->io = this;
882   c->bl.clear();
883   c->bl.push_back(buffer::create_static(len, buf));
884   c->blp = &c->bl;
885   c->out_buf = buf;
886
887   ZTracer::Trace trace;
888   if (info)
889     trace.init("rados read", &objecter->trace_endpoint, info);
890
891   Objecter::Op *o = objecter->prepare_read_op(
892     oid, oloc,
893     off, len, snapid, &c->bl, 0,
894     oncomplete, &c->objver, nullptr, 0, &trace);
895   objecter->op_submit(o, &c->tid);
896   return 0;
897 }
898
899 class C_ObjectOperation : public Context {
900 public:
901   ::ObjectOperation m_ops;
902   explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
903   void finish(int r) override {
904     m_ctx->complete(r);
905   }
906 private:
907   Context *m_ctx;
908 };
909
910 int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
911                                          AioCompletionImpl *c,
912                                          std::map<uint64_t,uint64_t> *m,
913                                          bufferlist *data_bl, size_t len,
914                                          uint64_t off, uint64_t snapid)
915 {
916   FUNCTRACE();
917   if (len > (size_t) INT_MAX)
918     return -EDOM;
919
920   Context *nested = new C_aio_Complete(c);
921   C_ObjectOperation *onack = new C_ObjectOperation(nested);
922
923 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
924   ((C_aio_Complete *) nested)->oid = oid;
925 #endif
926   c->is_read = true;
927   c->io = this;
928
929   onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
930
931   Objecter::Op *o = objecter->prepare_read_op(
932     oid, oloc,
933     onack->m_ops, snapid, NULL, 0,
934     onack, &c->objver);
935   objecter->op_submit(o, &c->tid);
936   return 0;
937 }
938
939 int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
940                                     AioCompletionImpl *c,
941                                     uint64_t off,
942                                     bufferlist& cmp_bl)
943 {
944   if (cmp_bl.length() > UINT_MAX/2)
945     return -E2BIG;
946
947   Context *onack = new C_aio_Complete(c);
948
949   c->is_read = true;
950   c->io = this;
951
952   Objecter::Op *o = objecter->prepare_cmpext_op(
953     oid, oloc, off, cmp_bl, snap_seq, 0,
954     onack, &c->objver);
955   objecter->op_submit(o, &c->tid);
956
957   return 0;
958 }
959
960 /* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
961 int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
962                                     AioCompletionImpl *c,
963                                     const char *cmp_buf,
964                                     size_t cmp_len,
965                                     uint64_t off)
966 {
967   if (cmp_len > UINT_MAX/2)
968     return -E2BIG;
969
970   bufferlist cmp_bl;
971   cmp_bl.append(cmp_buf, cmp_len);
972
973   Context *nested = new C_aio_Complete(c);
974   C_ObjectOperation *onack = new C_ObjectOperation(nested);
975
976   c->is_read = true;
977   c->io = this;
978
979   onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
980
981   Objecter::Op *o = objecter->prepare_read_op(
982     oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
983   objecter->op_submit(o, &c->tid);
984   return 0;
985 }
986
987 int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
988                                    const bufferlist& bl, size_t len,
989                                    uint64_t off, const blkin_trace_info *info)
990 {
991   FUNCTRACE();
992   auto ut = ceph::real_clock::now();
993   ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
994   OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
995
996   if (len > UINT_MAX/2)
997     return -E2BIG;
998   /* can't write to a snapshot */
999   if (snap_seq != CEPH_NOSNAP)
1000     return -EROFS;
1001
1002   Context *oncomplete = new C_aio_Complete(c);
1003
1004 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1005   ((C_aio_Complete *) oncomplete)->oid = oid;
1006 #endif
1007   ZTracer::Trace trace;
1008   if (info)
1009     trace.init("rados write", &objecter->trace_endpoint, info);
1010
1011   c->io = this;
1012   queue_aio_write(c);
1013
1014   Objecter::Op *o = objecter->prepare_write_op(
1015     oid, oloc,
1016     off, len, snapc, bl, ut, 0,
1017     oncomplete, &c->objver, nullptr, 0, &trace);
1018   objecter->op_submit(o, &c->tid);
1019
1020   return 0;
1021 }
1022
1023 int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
1024                                     const bufferlist& bl, size_t len)
1025 {
1026   FUNCTRACE();
1027   auto ut = ceph::real_clock::now();
1028
1029   if (len > UINT_MAX/2)
1030     return -E2BIG;
1031   /* can't write to a snapshot */
1032   if (snap_seq != CEPH_NOSNAP)
1033     return -EROFS;
1034
1035   Context *oncomplete = new C_aio_Complete(c);
1036 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1037   ((C_aio_Complete *) oncomplete)->oid = oid;
1038 #endif
1039
1040   c->io = this;
1041   queue_aio_write(c);
1042
1043   Objecter::Op *o = objecter->prepare_append_op(
1044     oid, oloc,
1045     len, snapc, bl, ut, 0,
1046     oncomplete, &c->objver);
1047   objecter->op_submit(o, &c->tid);
1048
1049   return 0;
1050 }
1051
1052 int librados::IoCtxImpl::aio_write_full(const object_t &oid,
1053                                         AioCompletionImpl *c,
1054                                         const bufferlist& bl)
1055 {
1056   FUNCTRACE();
1057   auto ut = ceph::real_clock::now();
1058
1059   if (bl.length() > UINT_MAX/2)
1060     return -E2BIG;
1061   /* can't write to a snapshot */
1062   if (snap_seq != CEPH_NOSNAP)
1063     return -EROFS;
1064
1065   Context *oncomplete = new C_aio_Complete(c);
1066 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1067   ((C_aio_Complete *) oncomplete)->oid = oid;
1068 #endif
1069
1070   c->io = this;
1071   queue_aio_write(c);
1072
1073   Objecter::Op *o = objecter->prepare_write_full_op(
1074     oid, oloc,
1075     snapc, bl, ut, 0,
1076     oncomplete, &c->objver);
1077   objecter->op_submit(o, &c->tid);
1078
1079   return 0;
1080 }
1081
1082 int librados::IoCtxImpl::aio_writesame(const object_t &oid,
1083                                        AioCompletionImpl *c,
1084                                        const bufferlist& bl,
1085                                        size_t write_len,
1086                                        uint64_t off)
1087 {
1088   FUNCTRACE();
1089   auto ut = ceph::real_clock::now();
1090
1091   if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
1092     return -E2BIG;
1093   if ((bl.length() == 0) || (write_len % bl.length()))
1094     return -EINVAL;
1095   /* can't write to a snapshot */
1096   if (snap_seq != CEPH_NOSNAP)
1097     return -EROFS;
1098
1099   Context *oncomplete = new C_aio_Complete(c);
1100
1101 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1102   ((C_aio_Complete *) oncomplete)->oid = oid;
1103 #endif
1104   c->io = this;
1105   queue_aio_write(c);
1106
1107   Objecter::Op *o = objecter->prepare_writesame_op(
1108     oid, oloc,
1109     write_len, off,
1110     snapc, bl, ut, 0,
1111     oncomplete, &c->objver);
1112   objecter->op_submit(o, &c->tid);
1113
1114   return 0;
1115 }
1116
1117 int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
1118 {
1119   FUNCTRACE();
1120   auto ut = ceph::real_clock::now();
1121
1122   /* can't write to a snapshot */
1123   if (snap_seq != CEPH_NOSNAP)
1124     return -EROFS;
1125
1126   Context *oncomplete = new C_aio_Complete(c);
1127
1128 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1129   ((C_aio_Complete *) oncomplete)->oid = oid;
1130 #endif
1131   c->io = this;
1132   queue_aio_write(c);
1133
1134   Objecter::Op *o = objecter->prepare_remove_op(
1135     oid, oloc,
1136     snapc, ut, flags,
1137     oncomplete, &c->objver);
1138   objecter->op_submit(o, &c->tid);
1139
1140   return 0;
1141 }
1142
1143
1144 int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
1145                                   uint64_t *psize, time_t *pmtime)
1146 {
1147   C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
1148   c->is_read = true;
1149   c->io = this;
1150   Objecter::Op *o = objecter->prepare_stat_op(
1151     oid, oloc,
1152     snap_seq, psize, &onack->mtime, 0,
1153     onack, &c->objver);
1154   objecter->op_submit(o, &c->tid);
1155   return 0;
1156 }
1157
1158 int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
1159                                   uint64_t *psize, struct timespec *pts)
1160 {
1161   C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
1162   c->is_read = true;
1163   c->io = this;
1164   Objecter::Op *o = objecter->prepare_stat_op(
1165     oid, oloc,
1166     snap_seq, psize, &onack->mtime, 0,
1167     onack, &c->objver);
1168   objecter->op_submit(o, &c->tid);
1169   return 0;
1170 }
1171
1172 int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
1173                                       const char *name, bufferlist& bl)
1174 {
1175   ::ObjectOperation rd;
1176   prepare_assert_ops(&rd);
1177   rd.getxattr(name, &bl, NULL);
1178   int r = aio_operate_read(oid, &rd, c, 0, &bl);
1179   return r;
1180 }
1181
1182 int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
1183                                      const char *name)
1184 {
1185   ::ObjectOperation op;
1186   prepare_assert_ops(&op);
1187   op.rmxattr(name);
1188   return aio_operate(oid, &op, c, snapc, 0);
1189 }
1190
1191 int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
1192                                       const char *name, bufferlist& bl)
1193 {
1194   ::ObjectOperation op;
1195   prepare_assert_ops(&op);
1196   op.setxattr(name, bl);
1197   return aio_operate(oid, &op, c, snapc, 0);
1198 }
1199
1200 namespace {
1201 struct AioGetxattrsData {
1202   AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
1203                    librados::RadosClient *_client) :
1204     user_completion(c), user_attrset(attrset), client(_client) {}
1205   struct librados::C_AioCompleteAndSafe user_completion;
1206   map<string, bufferlist> result_attrset;
1207   map<std::string, bufferlist>* user_attrset;
1208   librados::RadosClient *client;
1209 };
1210 }
1211
1212 static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
1213   AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
1214   int rc = rados_aio_get_return_value(c);
1215   cdata->user_attrset->clear();
1216   if (rc >= 0) {
1217     for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
1218          p != cdata->result_attrset.end();
1219          ++p) {
1220       ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1221       (*cdata->user_attrset)[p->first] = p->second;
1222     }
1223   }
1224   cdata->user_completion.finish(rc);
1225   ((librados::AioCompletionImpl*)c)->put();
1226   delete cdata;
1227 }
1228
1229 int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
1230                                        map<std::string, bufferlist>& attrset)
1231 {
1232   AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
1233   ::ObjectOperation rd;
1234   prepare_assert_ops(&rd);
1235   rd.getxattrs(&cdata->result_attrset, NULL);
1236   librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
1237   comp->set_complete_callback(cdata, aio_getxattrs_complete);
1238   return aio_operate_read(oid, &rd, comp, 0, NULL);
1239 }
1240
1241 int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
1242 {
1243   return objecter->op_cancel(c->tid, -ECANCELED);
1244 }
1245
1246
1247 int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
1248                               std::list< std::pair<time_t, time_t> > *pls)
1249 {
1250   Context *oncomplete = new C_aio_Complete(c);
1251   c->is_read = true;
1252   c->io = this;
1253
1254   ::ObjectOperation rd;
1255   rd.hit_set_ls(pls, NULL);
1256   object_locator_t oloc(poolid);
1257   Objecter::Op *o = objecter->prepare_pg_read_op(
1258     hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
1259   objecter->op_submit(o, &c->tid);
1260   return 0;
1261 }
1262
1263 int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
1264                                      time_t stamp,
1265                                      bufferlist *pbl)
1266 {
1267   Context *oncomplete = new C_aio_Complete(c);
1268   c->is_read = true;
1269   c->io = this;
1270
1271   ::ObjectOperation rd;
1272   rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
1273   object_locator_t oloc(poolid);
1274   Objecter::Op *o = objecter->prepare_pg_read_op(
1275     hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
1276   objecter->op_submit(o, &c->tid);
1277   return 0;
1278 }
1279
1280 int librados::IoCtxImpl::remove(const object_t& oid)
1281 {
1282   ::ObjectOperation op;
1283   prepare_assert_ops(&op);
1284   op.remove();
1285   return operate(oid, &op, NULL);
1286 }
1287
1288 int librados::IoCtxImpl::remove(const object_t& oid, int flags)
1289 {
1290   ::ObjectOperation op;
1291   prepare_assert_ops(&op);
1292   op.remove();
1293   return operate(oid, &op, NULL, flags);
1294 }
1295
1296 int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
1297 {
1298   ::ObjectOperation op;
1299   prepare_assert_ops(&op);
1300   op.truncate(size);
1301   return operate(oid, &op, NULL);
1302 }
1303
1304 int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
1305                                                   const librados::object_id_t& start_after,
1306                                                   uint64_t max_to_get,
1307                                                   AioCompletionImpl *c,
1308                                                   std::vector<inconsistent_obj_t>* objects,
1309                                                   uint32_t* interval)
1310 {
1311   Context *oncomplete = new C_aio_Complete(c);
1312   c->is_read = true;
1313   c->io = this;
1314
1315   ::ObjectOperation op;
1316   op.scrub_ls(start_after, max_to_get, objects, interval, nullptr);
1317   object_locator_t oloc{poolid, pg.ps()};
1318   Objecter::Op *o = objecter->prepare_pg_read_op(
1319     oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
1320     nullptr, nullptr);
1321   objecter->op_submit(o, &c->tid);
1322   return 0;
1323 }
1324
1325 int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
1326                                                    const librados::object_id_t& start_after,
1327                                                    uint64_t max_to_get,
1328                                                    AioCompletionImpl *c,
1329                                                    std::vector<inconsistent_snapset_t>* snapsets,
1330                                                    uint32_t* interval)
1331 {
1332   Context *oncomplete = new C_aio_Complete(c);
1333   c->is_read = true;
1334   c->io = this;
1335
1336   ::ObjectOperation op;
1337   op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr);
1338   object_locator_t oloc{poolid, pg.ps()};
1339   Objecter::Op *o = objecter->prepare_pg_read_op(
1340     oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
1341     nullptr, nullptr);
1342   objecter->op_submit(o, &c->tid);
1343   return 0;
1344 }
1345
1346 int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
1347 {
1348   ::ObjectOperation wr;
1349   prepare_assert_ops(&wr);
1350   wr.tmap_update(cmdbl);
1351   return operate(oid, &wr, NULL);
1352 }
1353
1354 int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl)
1355 {
1356   ::ObjectOperation wr;
1357   prepare_assert_ops(&wr);
1358   wr.tmap_put(bl);
1359   return operate(oid, &wr, NULL);
1360 }
1361
1362 int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl)
1363 {
1364   ::ObjectOperation rd;
1365   prepare_assert_ops(&rd);
1366   rd.tmap_get(&bl, NULL);
1367   return operate_read(oid, &rd, NULL);
1368 }
1369
1370 int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok)
1371 {
1372   ::ObjectOperation wr;
1373   prepare_assert_ops(&wr);
1374   wr.tmap_to_omap(nullok);
1375   return operate(oid, &wr, NULL);
1376 }
1377
1378 int librados::IoCtxImpl::exec(const object_t& oid,
1379                               const char *cls, const char *method,
1380                               bufferlist& inbl, bufferlist& outbl)
1381 {
1382   ::ObjectOperation rd;
1383   prepare_assert_ops(&rd);
1384   rd.call(cls, method, inbl);
1385   return operate_read(oid, &rd, &outbl);
1386 }
1387
1388 int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1389                                   const char *cls, const char *method,
1390                                   bufferlist& inbl, bufferlist *outbl)
1391 {
1392   FUNCTRACE();
1393   Context *oncomplete = new C_aio_Complete(c);
1394
1395 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1396   ((C_aio_Complete *) oncomplete)->oid = oid;
1397 #endif
1398   c->is_read = true;
1399   c->io = this;
1400
1401   ::ObjectOperation rd;
1402   prepare_assert_ops(&rd);
1403   rd.call(cls, method, inbl);
1404   Objecter::Op *o = objecter->prepare_read_op(
1405     oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver);
1406   objecter->op_submit(o, &c->tid);
1407   return 0;
1408 }
1409
1410 int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1411                                   const char *cls, const char *method,
1412                                   bufferlist& inbl, char *buf, size_t out_len)
1413 {
1414   FUNCTRACE();
1415   Context *oncomplete = new C_aio_Complete(c);
1416
1417 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1418   ((C_aio_Complete *) oncomplete)->oid = oid;
1419 #endif
1420   c->is_read = true;
1421   c->io = this;
1422   c->bl.clear();
1423   c->bl.push_back(buffer::create_static(out_len, buf));
1424   c->blp = &c->bl;
1425   c->out_buf = buf;
1426
1427   ::ObjectOperation rd;
1428   prepare_assert_ops(&rd);
1429   rd.call(cls, method, inbl);
1430   Objecter::Op *o = objecter->prepare_read_op(
1431     oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver);
1432   objecter->op_submit(o, &c->tid);
1433   return 0;
1434 }
1435
1436 int librados::IoCtxImpl::read(const object_t& oid,
1437                               bufferlist& bl, size_t len, uint64_t off)
1438 {
1439   if (len > (size_t) INT_MAX)
1440     return -EDOM;
1441   OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
1442
1443   ::ObjectOperation rd;
1444   prepare_assert_ops(&rd);
1445   rd.read(off, len, &bl, NULL, NULL);
1446   int r = operate_read(oid, &rd, &bl);
1447   if (r < 0)
1448     return r;
1449
1450   if (bl.length() < len) {
1451     ldout(client->cct, 10) << "Returned length " << bl.length()
1452              << " less than original length "<< len << dendl;
1453   }
1454
1455   return bl.length();
1456 }
1457
1458 int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
1459                                 bufferlist& cmp_bl)
1460 {
1461   if (cmp_bl.length() > UINT_MAX/2)
1462     return -E2BIG;
1463
1464   ::ObjectOperation op;
1465   prepare_assert_ops(&op);
1466   op.cmpext(off, cmp_bl, NULL);
1467   return operate_read(oid, &op, NULL);
1468 }
1469
1470 int librados::IoCtxImpl::mapext(const object_t& oid,
1471                                 uint64_t off, size_t len,
1472                                 std::map<uint64_t,uint64_t>& m)
1473 {
1474   bufferlist bl;
1475
1476   Mutex mylock("IoCtxImpl::read::mylock");
1477   Cond cond;
1478   bool done;
1479   int r;
1480   Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
1481
1482   objecter->mapext(oid, oloc,
1483                    off, len, snap_seq, &bl, 0,
1484                    onack);
1485
1486   mylock.Lock();
1487   while (!done)
1488     cond.Wait(mylock);
1489   mylock.Unlock();
1490   ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
1491
1492   if (r < 0)
1493     return r;
1494
1495   bufferlist::iterator iter = bl.begin();
1496   ::decode(m, iter);
1497
1498   return m.size();
1499 }
1500
1501 int librados::IoCtxImpl::sparse_read(const object_t& oid,
1502                                      std::map<uint64_t,uint64_t>& m,
1503                                      bufferlist& data_bl, size_t len,
1504                                      uint64_t off)
1505 {
1506   if (len > (size_t) INT_MAX)
1507     return -EDOM;
1508
1509   ::ObjectOperation rd;
1510   prepare_assert_ops(&rd);
1511   rd.sparse_read(off, len, &m, &data_bl, NULL);
1512
1513   int r = operate_read(oid, &rd, NULL);
1514   if (r < 0)
1515     return r;
1516
1517   return m.size();
1518 }
1519
1520 int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
1521                                   const bufferlist &init_value, size_t len,
1522                                   uint64_t off, size_t chunk_size,
1523                                   bufferlist *pbl)
1524 {
1525   if (len > (size_t) INT_MAX) {
1526     return -EDOM;
1527   }
1528
1529   ::ObjectOperation rd;
1530   prepare_assert_ops(&rd);
1531   rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
1532
1533   int r = operate_read(oid, &rd, nullptr);
1534   if (r < 0) {
1535     return r;
1536   }
1537
1538   return 0;
1539 }
1540
1541 int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
1542 {
1543   uint64_t size;
1544   real_time mtime;
1545
1546   if (!psize)
1547     psize = &size;
1548
1549   ::ObjectOperation rd;
1550   prepare_assert_ops(&rd);
1551   rd.stat(psize, &mtime, NULL);
1552   int r = operate_read(oid, &rd, NULL);
1553
1554   if (r >= 0 && pmtime) {
1555     *pmtime = real_clock::to_time_t(mtime);
1556   }
1557
1558   return r;
1559 }
1560
1561 int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
1562 {
1563   uint64_t size;
1564   ceph::real_time mtime;
1565
1566   if (!psize)
1567     psize = &size;
1568
1569   ::ObjectOperation rd;
1570   prepare_assert_ops(&rd);
1571   rd.stat(psize, &mtime, NULL);
1572   int r = operate_read(oid, &rd, NULL);
1573   if (r < 0) {
1574     return r;
1575   }
1576
1577   if (pts) {
1578     *pts = ceph::real_clock::to_timespec(mtime);
1579   }
1580
1581   return 0;
1582 }
1583
1584 int librados::IoCtxImpl::getxattr(const object_t& oid,
1585                                     const char *name, bufferlist& bl)
1586 {
1587   ::ObjectOperation rd;
1588   prepare_assert_ops(&rd);
1589   rd.getxattr(name, &bl, NULL);
1590   int r = operate_read(oid, &rd, &bl);
1591   if (r < 0)
1592     return r;
1593
1594   return bl.length();
1595 }
1596
1597 int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
1598 {
1599   ::ObjectOperation op;
1600   prepare_assert_ops(&op);
1601   op.rmxattr(name);
1602   return operate(oid, &op, NULL);
1603 }
1604
1605 int librados::IoCtxImpl::setxattr(const object_t& oid,
1606                                     const char *name, bufferlist& bl)
1607 {
1608   ::ObjectOperation op;
1609   prepare_assert_ops(&op);
1610   op.setxattr(name, bl);
1611   return operate(oid, &op, NULL);
1612 }
1613
1614 int librados::IoCtxImpl::getxattrs(const object_t& oid,
1615                                      map<std::string, bufferlist>& attrset)
1616 {
1617   map<string, bufferlist> aset;
1618
1619   ::ObjectOperation rd;
1620   prepare_assert_ops(&rd);
1621   rd.getxattrs(&aset, NULL);
1622   int r = operate_read(oid, &rd, NULL);
1623
1624   attrset.clear();
1625   if (r >= 0) {
1626     for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
1627       ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1628       attrset[p->first.c_str()] = p->second;
1629     }
1630   }
1631
1632   return r;
1633 }
1634
1635 void librados::IoCtxImpl::set_sync_op_version(version_t ver)
1636 {
1637   ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
1638                              "IoCtxImpl last_objver");
1639   last_objver = ver;
1640 }
1641
1642 struct WatchInfo : public Objecter::WatchContext {
1643   librados::IoCtxImpl *ioctx;
1644   object_t oid;
1645   librados::WatchCtx *ctx;
1646   librados::WatchCtx2 *ctx2;
1647   bool internal = false;
1648
1649   WatchInfo(librados::IoCtxImpl *io, object_t o,
1650             librados::WatchCtx *c, librados::WatchCtx2 *c2,
1651             bool inter)
1652     : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) {
1653     ioctx->get();
1654   }
1655   ~WatchInfo() override {
1656     ioctx->put();
1657     if (internal) {
1658       delete ctx;
1659       delete ctx2;
1660     }
1661   }
1662
1663   void handle_notify(uint64_t notify_id,
1664                      uint64_t cookie,
1665                      uint64_t notifier_id,
1666                      bufferlist& bl) override {
1667     ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
1668                                   << " cookie " << cookie
1669                                   << " notifier_id " << notifier_id
1670                                   << " len " << bl.length()
1671                                   << dendl;
1672
1673     if (ctx2)
1674       ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
1675     if (ctx) {
1676       ctx->notify(0, 0, bl);
1677
1678       // send ACK back to OSD if using legacy protocol
1679       bufferlist empty;
1680       ioctx->notify_ack(oid, notify_id, cookie, empty);
1681     }
1682   }
1683   void handle_error(uint64_t cookie, int err) override {
1684     ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
1685                                   << " err " << err
1686                                   << dendl;
1687     if (ctx2)
1688       ctx2->handle_error(cookie, err);
1689   }
1690 };
1691
1692 int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1693                                librados::WatchCtx *ctx,
1694                                librados::WatchCtx2 *ctx2,
1695                                bool internal)
1696 {
1697   return watch(oid, handle, ctx, ctx2, 0, internal);
1698 }
1699
1700 int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1701                                librados::WatchCtx *ctx,
1702                                librados::WatchCtx2 *ctx2,
1703                                uint32_t timeout,
1704                                bool internal)
1705 {
1706   ::ObjectOperation wr;
1707   version_t objver;
1708   C_SaferCond onfinish;
1709
1710   Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1711   *handle = linger_op->get_cookie();
1712   linger_op->watch_context = new WatchInfo(this,
1713                                            oid, ctx, ctx2, internal);
1714
1715   prepare_assert_ops(&wr);
1716   wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1717   bufferlist bl;
1718   objecter->linger_watch(linger_op, wr,
1719                          snapc, ceph::real_clock::now(), bl,
1720                          &onfinish,
1721                          &objver);
1722
1723   int r = onfinish.wait();
1724
1725   set_sync_op_version(objver);
1726
1727   if (r < 0) {
1728     objecter->linger_cancel(linger_op);
1729     *handle = 0;
1730   }
1731
1732   return r;
1733 }
1734
1735 int librados::IoCtxImpl::aio_watch(const object_t& oid,
1736                                    AioCompletionImpl *c,
1737                                    uint64_t *handle,
1738                                    librados::WatchCtx *ctx,
1739                                    librados::WatchCtx2 *ctx2,
1740                                    bool internal) {
1741   return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
1742 }
1743
1744 int librados::IoCtxImpl::aio_watch(const object_t& oid,
1745                                    AioCompletionImpl *c,
1746                                    uint64_t *handle,
1747                                    librados::WatchCtx *ctx,
1748                                    librados::WatchCtx2 *ctx2,
1749                                    uint32_t timeout,
1750                                    bool internal)
1751 {
1752   Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1753   c->io = this;
1754   Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
1755
1756   ::ObjectOperation wr;
1757   *handle = linger_op->get_cookie();
1758   linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
1759
1760   prepare_assert_ops(&wr);
1761   wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1762   bufferlist bl;
1763   objecter->linger_watch(linger_op, wr,
1764                          snapc, ceph::real_clock::now(), bl,
1765                          oncomplete, &c->objver);
1766
1767   return 0;
1768 }
1769
1770
1771 int librados::IoCtxImpl::notify_ack(
1772   const object_t& oid,
1773   uint64_t notify_id,
1774   uint64_t cookie,
1775   bufferlist& bl)
1776 {
1777   ::ObjectOperation rd;
1778   prepare_assert_ops(&rd);
1779   rd.notify_ack(notify_id, cookie, bl);
1780   objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
1781   return 0;
1782 }
1783
1784 int librados::IoCtxImpl::watch_check(uint64_t cookie)
1785 {
1786   Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1787   return objecter->linger_check(linger_op);
1788 }
1789
1790 int librados::IoCtxImpl::unwatch(uint64_t cookie)
1791 {
1792   Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1793   C_SaferCond onfinish;
1794   version_t ver = 0;
1795
1796   ::ObjectOperation wr;
1797   prepare_assert_ops(&wr);
1798   wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1799   objecter->mutate(linger_op->target.base_oid, oloc, wr,
1800                    snapc, ceph::real_clock::now(), 0,
1801                    &onfinish, &ver);
1802   objecter->linger_cancel(linger_op);
1803
1804   int r = onfinish.wait();
1805   set_sync_op_version(ver);
1806   return r;
1807 }
1808
1809 int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
1810 {
1811   c->io = this;
1812   Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1813   Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
1814
1815   ::ObjectOperation wr;
1816   prepare_assert_ops(&wr);
1817   wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1818   objecter->mutate(linger_op->target.base_oid, oloc, wr,
1819                    snapc, ceph::real_clock::now(), 0,
1820                    oncomplete, &c->objver);
1821   return 0;
1822 }
1823
1824 int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
1825                                 uint64_t timeout_ms,
1826                                 bufferlist *preply_bl,
1827                                 char **preply_buf, size_t *preply_buf_len)
1828 {
1829   Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1830
1831   C_SaferCond notify_finish_cond;
1832   Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
1833                                                objecter, linger_op, preply_bl,
1834                                                preply_buf, preply_buf_len);
1835
1836   uint32_t timeout = notify_timeout;
1837   if (timeout_ms)
1838     timeout = timeout_ms / 1000;
1839
1840   // Construct RADOS op
1841   ::ObjectOperation rd;
1842   prepare_assert_ops(&rd);
1843   bufferlist inbl;
1844   rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1845
1846   // Issue RADOS op
1847   C_SaferCond onack;
1848   version_t objver;
1849   objecter->linger_notify(linger_op,
1850                           rd, snap_seq, inbl, NULL,
1851                           &onack, &objver);
1852
1853   ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
1854   int r = onack.wait();
1855   ldout(client->cct, 10) << __func__ << " linger op " << linger_op
1856                          << " acked (" << r << ")" << dendl;
1857
1858   if (r == 0) {
1859     ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
1860                            << linger_op << dendl;
1861     r = notify_finish_cond.wait();
1862
1863   } else {
1864     ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
1865                            << r << dendl;
1866     notify_finish->complete(r);
1867   }
1868
1869   objecter->linger_cancel(linger_op);
1870
1871   set_sync_op_version(objver);
1872   return r;
1873 }
1874
1875 int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
1876                                     bufferlist& bl, uint64_t timeout_ms,
1877                                     bufferlist *preply_bl, char **preply_buf,
1878                                     size_t *preply_buf_len)
1879 {
1880   Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1881
1882   c->io = this;
1883
1884   C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
1885   C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
1886                                                   objecter, linger_op,
1887                                                   preply_bl, preply_buf,
1888                                                   preply_buf_len);
1889   Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);
1890
1891   uint32_t timeout = notify_timeout;
1892   if (timeout_ms)
1893     timeout = timeout_ms / 1000;
1894
1895   // Construct RADOS op
1896   ::ObjectOperation rd;
1897   prepare_assert_ops(&rd);
1898   bufferlist inbl;
1899   rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1900
1901   // Issue RADOS op
1902   objecter->linger_notify(linger_op,
1903                           rd, snap_seq, inbl, NULL,
1904                           onack, &c->objver);
1905   return 0;
1906 }
1907
1908 int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
1909                                         uint64_t expected_object_size,
1910                                         uint64_t expected_write_size,
1911                                         uint32_t flags)
1912 {
1913   ::ObjectOperation wr;
1914   prepare_assert_ops(&wr);
1915   wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
1916   return operate(oid, &wr, NULL);
1917 }
1918
1919 version_t librados::IoCtxImpl::last_version()
1920 {
1921   return last_objver;
1922 }
1923
1924 void librados::IoCtxImpl::set_assert_version(uint64_t ver)
1925 {
1926   assert_ver = ver;
1927 }
1928
1929 void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
1930 {
1931   notify_timeout = timeout;
1932 }
1933
1934 int librados::IoCtxImpl::cache_pin(const object_t& oid)
1935 {
1936   ::ObjectOperation wr;
1937   prepare_assert_ops(&wr);
1938   wr.cache_pin();
1939   return operate(oid, &wr, NULL);
1940 }
1941
1942 int librados::IoCtxImpl::cache_unpin(const object_t& oid)
1943 {
1944   ::ObjectOperation wr;
1945   prepare_assert_ops(&wr);
1946   wr.cache_unpin();
1947   return operate(oid, &wr, NULL);
1948 }
1949
1950
1951 ///////////////////////////// C_aio_stat_Ack ////////////////////////////
1952
1953 librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
1954                                                     time_t *pm)
1955    : c(_c), pmtime(pm)
1956 {
1957   assert(!c->io);
1958   c->get();
1959 }
1960
1961 void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
1962 {
1963   c->lock.Lock();
1964   c->rval = r;
1965   c->complete = true;
1966   c->cond.Signal();
1967
1968   if (r >= 0 && pmtime) {
1969     *pmtime = real_clock::to_time_t(mtime);
1970   }
1971
1972   if (c->callback_complete) {
1973     c->io->client->finisher.queue(new C_AioComplete(c));
1974   }
1975
1976   c->put_unlock();
1977 }
1978
1979 ///////////////////////////// C_aio_stat2_Ack ////////////////////////////
1980
1981 librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
1982                                                      struct timespec *pt)
1983    : c(_c), pts(pt)
1984 {
1985   assert(!c->io);
1986   c->get();
1987 }
1988
1989 void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
1990 {
1991   c->lock.Lock();
1992   c->rval = r;
1993   c->complete = true;
1994   c->cond.Signal();
1995
1996   if (r >= 0 && pts) {
1997     *pts = real_clock::to_timespec(mtime);
1998   }
1999
2000   if (c->callback_complete) {
2001     c->io->client->finisher.queue(new C_AioComplete(c));
2002   }
2003
2004   c->put_unlock();
2005 }
2006
2007 //////////////////////////// C_aio_Complete ////////////////////////////////
2008
2009 librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
2010   : c(_c)
2011 {
2012   c->get();
2013 }
2014
2015 void librados::IoCtxImpl::C_aio_Complete::finish(int r)
2016 {
2017   c->lock.Lock();
2018   c->rval = r;
2019   c->complete = true;
2020   c->cond.Signal();
2021
2022   if (r == 0 && c->blp && c->blp->length() > 0) {
2023     if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
2024       c->blp->copy(0, c->blp->length(), c->out_buf);
2025     c->rval = c->blp->length();
2026   }
2027
2028   if (c->callback_complete ||
2029       c->callback_safe) {
2030     c->io->client->finisher.queue(new C_AioComplete(c));
2031   }
2032
2033   if (c->aio_write_seq) {
2034     c->io->complete_aio_write(c);
2035   }
2036
2037 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
2038   OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
2039 #endif
2040   c->put_unlock();
2041 }
2042
2043 void librados::IoCtxImpl::object_list_slice(
2044   const hobject_t start,
2045   const hobject_t finish,
2046   const size_t n,
2047   const size_t m,
2048   hobject_t *split_start,
2049   hobject_t *split_finish)
2050 {
2051   if (start.is_max()) {
2052     *split_start = hobject_t::get_max();
2053     *split_finish = hobject_t::get_max();
2054     return;
2055   }
2056
2057   uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
2058   uint64_t finish_hash =
2059     finish.is_max() ? 0x100000000 :
2060     hobject_t::_reverse_bits(finish.get_hash());
2061
2062   uint64_t diff = finish_hash - start_hash;
2063   uint64_t rev_start = start_hash + (diff * n / m);
2064   uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
2065   if (n == 0) {
2066     *split_start = start;
2067   } else {
2068     *split_start = hobject_t(
2069       object_t(), string(), CEPH_NOSNAP,
2070       hobject_t::_reverse_bits(rev_start), poolid, string());
2071   }
2072
2073   if (n == m - 1)
2074     *split_finish = finish;
2075   else if (rev_finish >= 0x100000000)
2076     *split_finish = hobject_t::get_max();
2077   else
2078     *split_finish = hobject_t(
2079       object_t(), string(), CEPH_NOSNAP,
2080       hobject_t::_reverse_bits(rev_finish), poolid, string());
2081 }
2082
2083 int librados::IoCtxImpl::application_enable(const std::string& app_name,
2084                                             bool force)
2085 {
2086   auto c = new PoolAsyncCompletionImpl();
2087   application_enable_async(app_name, force, c);
2088
2089   int r = c->wait();
2090   assert(r == 0);
2091
2092   r = c->get_return_value();
2093   c->release();
2094   if (r < 0) {
2095     return r;
2096   }
2097
2098   return client->wait_for_latest_osdmap();
2099 }
2100
2101 void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
2102                                                    bool force,
2103                                                    PoolAsyncCompletionImpl *c)
2104 {
2105   // pre-Luminous clusters will return -EINVAL and application won't be
2106   // preserved until Luminous is configured as minimim version.
2107   if (!client->get_required_monitor_features().contains_all(
2108         ceph::features::mon::FEATURE_LUMINOUS)) {
2109     client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP);
2110     return;
2111   }
2112
2113   std::stringstream cmd;
2114   cmd << "{"
2115       << "\"prefix\": \"osd pool application enable\","
2116       << "\"pool\": \"" << get_cached_pool_name() << "\","
2117       << "\"app\": \"" << app_name << "\"";
2118   if (force) {
2119     cmd << ",\"force\":\"--yes-i-really-mean-it\"";
2120   }
2121   cmd << "}";
2122
2123   std::vector<std::string> cmds;
2124   cmds.push_back(cmd.str());
2125   bufferlist inbl;
2126   client->mon_command_async(cmds, inbl, nullptr, nullptr,
2127                             new C_PoolAsync_Safe(c));
2128 }
2129
2130 int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
2131 {
2132   int r = 0;
2133   app_names->clear();
2134   objecter->with_osdmap([&](const OSDMap& o) {
2135       auto pg_pool = o.get_pg_pool(poolid);
2136       if (pg_pool == nullptr) {
2137         r = -ENOENT;
2138         return;
2139       }
2140
2141       for (auto &pair : pg_pool->application_metadata) {
2142         app_names->insert(pair.first);
2143       }
2144     });
2145   return r;
2146 }
2147
2148 int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
2149                                                   const std::string &key,
2150                                                   std::string* value)
2151 {
2152   int r = 0;
2153   objecter->with_osdmap([&](const OSDMap& o) {
2154       auto pg_pool = o.get_pg_pool(poolid);
2155       if (pg_pool == nullptr) {
2156         r = -ENOENT;
2157         return;
2158       }
2159
2160       auto app_it = pg_pool->application_metadata.find(app_name);
2161       if (app_it == pg_pool->application_metadata.end()) {
2162         r = -ENOENT;
2163         return;
2164       }
2165
2166       auto it = app_it->second.find(key);
2167       if (it == app_it->second.end()) {
2168         r = -ENOENT;
2169         return;
2170       }
2171
2172       *value = it->second;
2173     });
2174   return r;
2175 }
2176
2177 int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
2178                                                   const std::string &key,
2179                                                   const std::string& value)
2180 {
2181   std::stringstream cmd;
2182   cmd << "{"
2183       << "\"prefix\":\"osd pool application set\","
2184       << "\"pool\":\"" << get_cached_pool_name() << "\","
2185       << "\"app\":\"" << app_name << "\","
2186       << "\"key\":\"" << key << "\","
2187       << "\"value\":\"" << value << "\""
2188       << "}";
2189
2190   std::vector<std::string> cmds;
2191   cmds.push_back(cmd.str());
2192   bufferlist inbl;
2193   int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2194   if (r < 0) {
2195     return r;
2196   }
2197
2198   // ensure we have the latest osd map epoch before proceeding
2199   return client->wait_for_latest_osdmap();
2200 }
2201
2202 int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
2203                                                      const std::string &key)
2204 {
2205   std::stringstream cmd;
2206   cmd << "{"
2207       << "\"prefix\":\"osd pool application rm\","
2208       << "\"pool\":\"" << get_cached_pool_name() << "\","
2209       << "\"app\":\"" << app_name << "\","
2210       << "\"key\":\"" << key << "\""
2211       << "}";
2212
2213   std::vector<std::string> cmds;
2214   cmds.push_back(cmd.str());
2215   bufferlist inbl;
2216   int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2217   if (r < 0) {
2218     return r;
2219   }
2220
2221   // ensure we have the latest osd map epoch before proceeding
2222   return client->wait_for_latest_osdmap();
2223 }
2224
2225 int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
2226                                                    std::map<std::string, std::string> *values)
2227 {
2228   int r = 0;
2229   values->clear();
2230   objecter->with_osdmap([&](const OSDMap& o) {
2231       auto pg_pool = o.get_pg_pool(poolid);
2232       if (pg_pool == nullptr) {
2233         r = -ENOENT;
2234         return;
2235       }
2236
2237       auto it = pg_pool->application_metadata.find(app_name);
2238       if (it == pg_pool->application_metadata.end()) {
2239         r = -ENOENT;
2240         return;
2241       }
2242
2243       *values = it->second;
2244     });
2245   return r;
2246 }
2247