Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / libradosstriper / RadosStriperImpl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Sebastien Ponce <sebastien.ponce@cern.ch>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <boost/algorithm/string/replace.hpp>
16
17 #include "libradosstriper/RadosStriperImpl.h"
18
19 #include <errno.h>
20
21 #include <sstream>
22 #include <iomanip>
23 #include <algorithm>
24
25 #include "include/types.h"
26 #include "include/uuid.h"
27 #include "include/ceph_fs.h"
28 #include "common/dout.h"
29 #include "common/strtol.h"
30 #include "osdc/Striper.h"
31 #include "librados/AioCompletionImpl.h"
32 #include <cls/lock/cls_lock_client.h>
33
34 /*
35  * This file contents the actual implementation of the rados striped objects interface.
36  *
37  * Striped objects are stored in rados in a set of regular rados objects, after their
38  * content has been striped using the osdc/Striper interface.
39  *
40  * The external attributes of the striped object are mapped to the attributes of the
41  * first underlying object. This first object has a set of extra external attributes
42  * storing the layout of the striped object for future read back. These attributes are :
43  *  - striper.layout.object_size : the size of rados objects used.
44  *                                 Must be a multiple of striper.layout.stripe_unit
45  *  - striper.layout.stripe_unit : the size of a stripe unit
46  *  - striper.layout.stripe_count : the number of stripes used
47  *  - striper.size : total striped object size
48  *
49  * In general operations on striped objects are not atomic.
50  * However, a certain number of safety guards have been put to make the interface closer
51  * to atomicity :
52  *  - each data operation takes a shared lock on the first rados object for the
53  *    whole time of the operation
54  *  - the remove and trunc operations take an exclusive lock on the first rados object
55  *    for the whole time of the operation
56  * This makes sure that no removal/truncation of a striped object occurs while
57  * data operations are happening and vice versa. It thus makes sure that the layout
58  * of a striped object does not change during data operation, which is essential for
59  * data consistency.
60  *
61  * Still the writing to a striped object is not atomic. This means in particular that
62  * the size of an object may not be in sync with its content at all times.
63  * As the size is always garanteed to be updated first and in an atomic way, and as
64  * sparse striped objects are supported (see below), what will typically happen is
65  * that a reader that comes too soon after a write will read 0s instead of the actual
66  * data.
67  *
68  * Note that remove handles the pieces of the striped object in reverse order,
69  * so that the head object is removed last, making the completion of the deletion atomic.
70  *
71  * Striped objects can be sparse, typically in case data was written at the end of the
72  * striped object only. In such a case, some rados objects constituing the striped object
73  * may be missing. Other can be partial (only the beginning will have data)
74  * When dealing with such sparse striped files, missing objects are detected and
75  * considered as full of 0s. They are however not created until real data is written
76  * to them.
77  *
78  * There are a number of missing features/improvements that could be implemented.
79  * Here are some ideas :
80  *    - implementation of missing entry points (compared to rados)
81  *      In particular : clone_range, sparse_read, exec, aio_flush_async, tmaps, omaps, ...
82  *
83  */
84
85 #define dout_subsys ceph_subsys_rados
86 #undef dout_prefix
87 #define dout_prefix *_dout << "libradosstriper: "
88
89 /// size of xattr buffer
90 #define XATTR_BUFFER_SIZE 32
91
92 /// names of the different xattr entries
93 #define XATTR_LAYOUT_STRIPE_UNIT "striper.layout.stripe_unit"
94 #define XATTR_LAYOUT_STRIPE_COUNT "striper.layout.stripe_count"
95 #define XATTR_LAYOUT_OBJECT_SIZE "striper.layout.object_size"
96 #define XATTR_SIZE "striper.size"
97 #define LOCK_PREFIX "lock."
98
99 /// name of the lock used on objects to ensure layout stability during IO
100 #define RADOS_LOCK_NAME "striper.lock"
101
102 /// format of the extension of rados objects created for a given striped object
103 #define RADOS_OBJECT_EXTENSION_FORMAT ".%016llx"
104
105 /// default object layout
106 struct ceph_file_layout default_file_layout = {
107   init_le32(1<<22),     // fl_stripe_unit
108   init_le32(1),         // fl_stripe_count
109   init_le32(1<<22),     // fl_object_size
110   init_le32(0),         // fl_cas_hash
111   init_le32(0),         // fl_object_stripe_unit
112   init_le32(-1),        // fl_unused
113   init_le32(-1),        // fl_pg_pool
114 };
115
116 using libradosstriper::MultiAioCompletionImplPtr;
117
118 namespace {
119
120 ///////////////////////// CompletionData /////////////////////////////
121
122 /**
123  * struct handling the data needed to pass to the call back
124  * function in asynchronous operations
125  */
126 struct CompletionData : RefCountedObject {
127   /// constructor
128   CompletionData(libradosstriper::RadosStriperImpl * striper,
129                  const std::string& soid,
130                  const std::string& lockCookie,
131                  librados::AioCompletionImpl *userCompletion = 0,
132                  int n = 1);
133   /// destructor
134   ~CompletionData() override;
135   /// complete method
136   void complete(int r);
137   /// striper to be used to handle the write completion
138   libradosstriper::RadosStriperImpl *m_striper;
139   /// striped object concerned by the write operation
140   std::string m_soid;
141   /// shared lock to be released at completion
142   std::string m_lockCookie;
143   /// completion handler
144   librados::IoCtxImpl::C_aio_Complete *m_ack;
145 };
146
147 CompletionData::CompletionData
148 (libradosstriper::RadosStriperImpl* striper,
149  const std::string& soid,
150  const std::string& lockCookie,
151  librados::AioCompletionImpl *userCompletion,
152  int n) :
153   RefCountedObject(striper->cct(), n),
154   m_striper(striper), m_soid(soid), m_lockCookie(lockCookie), m_ack(0) {
155   m_striper->get();
156   if (userCompletion) {
157     m_ack = new librados::IoCtxImpl::C_aio_Complete(userCompletion);
158     userCompletion->io = striper->m_ioCtxImpl;
159   }
160 }
161
162 CompletionData::~CompletionData() {
163   if (m_ack) delete m_ack;
164   m_striper->put();
165 }
166
167 void CompletionData::complete(int r) {
168   if (m_ack) m_ack->finish(r);
169 }
170
171 /**
172  * struct handling the data needed to pass to the call back
173  * function in asynchronous read operations
174  */
175 struct ReadCompletionData : CompletionData {
176   /// bufferlist containing final result
177   bufferlist* m_bl;
178   /// extents that will be read
179   std::vector<ObjectExtent>* m_extents;
180   /// intermediate results
181   std::vector<bufferlist>* m_resultbl;
182   /// return code of read completion, to be remembered until unlocking happened
183   int m_readRc;
184   /// completion object for the unlocking of the striped object at the end of the read
185   librados::AioCompletion *m_unlockCompletion;
186   /// constructor
187   ReadCompletionData(libradosstriper::RadosStriperImpl * striper,
188                      const std::string& soid,
189                      const std::string& lockCookie,
190                      librados::AioCompletionImpl *userCompletion,
191                      bufferlist* bl,
192                      std::vector<ObjectExtent>* extents,
193                      std::vector<bufferlist>* resultbl,
194                      int n);
195   /// destructor
196   ~ReadCompletionData() override;
197   /// complete method for when reading is over
198   void complete_read(int r);
199   /// complete method for when object is unlocked
200   void complete_unlock(int r);
201 };
202
203 ReadCompletionData::ReadCompletionData
204 (libradosstriper::RadosStriperImpl* striper,
205  const std::string& soid,
206  const std::string& lockCookie,
207  librados::AioCompletionImpl *userCompletion,
208  bufferlist* bl,
209  std::vector<ObjectExtent>* extents,
210  std::vector<bufferlist>* resultbl,
211  int n) :
212   CompletionData(striper, soid, lockCookie, userCompletion, n),
213   m_bl(bl), m_extents(extents), m_resultbl(resultbl), m_readRc(0),
214   m_unlockCompletion(0) {}
215
216 ReadCompletionData::~ReadCompletionData() {
217   m_unlockCompletion->release();
218   delete m_extents;
219   delete m_resultbl;
220 }
221
222 void ReadCompletionData::complete_read(int r) {
223   // gather data into final buffer
224   Striper::StripedReadResult readResult;
225   vector<bufferlist>::iterator bit = m_resultbl->begin();
226   for (vector<ObjectExtent>::iterator eit = m_extents->begin();
227        eit != m_extents->end();
228        ++eit, ++bit) {
229     readResult.add_partial_result(m_striper->cct(), *bit, eit->buffer_extents);
230   }
231   m_bl->clear();
232   readResult.assemble_result(m_striper->cct(), *m_bl, true);
233   // Remember return code
234   m_readRc = r;
235 }
236
237 void ReadCompletionData::complete_unlock(int r) {
238   // call parent's completion method
239   // Note that we ignore the return code of the unlock as we cannot do much about it
240   CompletionData::complete(m_readRc?m_readRc:m_bl->length());
241 }
242
243 /**
244  * struct handling the data needed to pass to the call back
245  * function in asynchronous write operations
246  */
247 struct WriteCompletionData : CompletionData {
248   /// safe completion handler
249   librados::IoCtxImpl::C_aio_Complete *m_safe;
250   /// return code of write completion, to be remembered until unlocking happened
251   int m_writeRc;
252   /// completion object for the unlocking of the striped object at the end of the write
253   librados::AioCompletion *m_unlockCompletion;
254   /// constructor
255   WriteCompletionData(libradosstriper::RadosStriperImpl * striper,
256                       const std::string& soid,
257                       const std::string& lockCookie,
258                       librados::AioCompletionImpl *userCompletion,
259                       int n);
260   /// destructor
261   ~WriteCompletionData() override;
262   /// complete method for when writing is over
263   void complete_write(int r);
264   /// complete method for when object is unlocked
265   void complete_unlock(int r);
266   /// safe method
267   void safe(int r);
268 };
269
270 WriteCompletionData::WriteCompletionData
271 (libradosstriper::RadosStriperImpl* striper,
272  const std::string& soid,
273  const std::string& lockCookie,
274  librados::AioCompletionImpl *userCompletion,
275  int n) :
276   CompletionData(striper, soid, lockCookie, userCompletion, n), m_safe(0),
277   m_unlockCompletion(0), m_writeRc(0) {
278   if (userCompletion) {
279     m_safe = new librados::IoCtxImpl::C_aio_Complete(userCompletion);
280   }
281 }
282
283 WriteCompletionData::~WriteCompletionData() {
284   m_unlockCompletion->release();
285   if (m_safe) delete m_safe;
286 }
287
288 void WriteCompletionData::complete_unlock(int r) {
289   // call parent's completion method
290   // Note that we ignore the return code of the unlock as we cannot do much about it
291   CompletionData::complete(m_writeRc);
292 }
293
294 void WriteCompletionData::complete_write(int r) {
295   // Remember return code
296   m_writeRc = r;
297 }
298
299 void WriteCompletionData::safe(int r) {
300   if (m_safe) m_safe->finish(r);
301 }
302
303 struct RemoveCompletionData : CompletionData {
304   /// removal flags
305   int flags;
306   /**
307    * constructor
308    * note that the constructed object will take ownership of the lock
309    */
310   RemoveCompletionData(libradosstriper::RadosStriperImpl * striper,
311                        const std::string& soid,
312                        const std::string& lockCookie,
313                        librados::AioCompletionImpl *userCompletion,
314                        int flags = 0) :
315   CompletionData(striper, soid, lockCookie, userCompletion), flags(flags) {}
316 };
317
318 /**
319  * struct handling the data needed to pass to the call back
320  * function in asynchronous truncate operations
321  */
322 struct TruncateCompletionData : RefCountedObject {
323   /// constructor
324   TruncateCompletionData(libradosstriper::RadosStriperImpl* striper,
325                          const std::string& soid,
326                          uint64_t size) :
327     RefCountedObject(striper->cct()),
328     m_striper(striper), m_soid(soid), m_size(size) {
329     m_striper->get();
330   }
331   /// destructor
332   ~TruncateCompletionData() override {
333     m_striper->put();
334   }
335   /// striper to be used
336   libradosstriper::RadosStriperImpl *m_striper;
337   /// striped object concerned by the truncate operation
338   std::string m_soid;
339   /// the final size of the truncated object
340   uint64_t m_size;
341 };
342
343 /**
344  * struct handling the data needed to pass to the call back
345  * function in asynchronous read operations of a Rados File
346  */
347 struct RadosReadCompletionData : RefCountedObject {
348   /// constructor
349   RadosReadCompletionData(MultiAioCompletionImplPtr multiAioCompl,
350                           uint64_t expectedBytes,
351                           bufferlist *bl,
352                           CephContext *context,
353                           int n = 1) :
354     RefCountedObject(context, n),
355     m_multiAioCompl(multiAioCompl), m_expectedBytes(expectedBytes), m_bl(bl) {}
356   /// the multi asynch io completion object to be used
357   MultiAioCompletionImplPtr m_multiAioCompl;
358   /// the expected number of bytes
359   uint64_t m_expectedBytes;
360   /// the bufferlist object where data have been written
361   bufferlist *m_bl;
362 };
363
364 /**
365  * struct handling (most of) the data needed to pass to the call back
366  * function in asynchronous stat operations.
367  * Inherited by the actual type for adding time information in different
368  * versions (time_t or struct timespec)
369  */
370 struct BasicStatCompletionData : CompletionData {
371   /// constructor
372   BasicStatCompletionData(libradosstriper::RadosStriperImpl* striper,
373                           const std::string& soid,
374                           librados::AioCompletionImpl *userCompletion,
375                           libradosstriper::MultiAioCompletionImpl *multiCompletion,
376                           uint64_t *psize,
377                           int n = 1) :
378     CompletionData(striper, soid, "", userCompletion, n),
379     m_multiCompletion(multiCompletion), m_psize(psize),
380     m_statRC(0), m_getxattrRC(0) {};
381   // MultiAioCompletionImpl used to handle the double aysnc
382   // call in the back (stat + getxattr)
383   libradosstriper::MultiAioCompletionImpl *m_multiCompletion;
384   // where to store the size of first objct
385   // this will be ignored but we need a place to store it when
386   // async stat is called
387   uint64_t m_objectSize;
388   // where to store the file size
389   uint64_t *m_psize;
390   /// the bufferlist object used for the getxattr call
391   bufferlist m_bl;
392   /// return code of the stat
393   int m_statRC;
394   /// return code of the getxattr
395   int m_getxattrRC;
396 };
397
398 /**
399  * struct handling the data needed to pass to the call back
400  * function in asynchronous stat operations.
401  * Simple templated extension of BasicStatCompletionData.
402  * The template parameter is the type of the time information
403  * (used with time_t for stat and struct timespec for stat2)
404  */
405 template<class TimeType>
406 struct StatCompletionData : BasicStatCompletionData {
407   /// constructor
408   StatCompletionData(libradosstriper::RadosStriperImpl* striper,
409                      const std::string& soid,
410                      librados::AioCompletionImpl *userCompletion,
411                      libradosstriper::MultiAioCompletionImpl *multiCompletion,
412                      uint64_t *psize,
413                      TimeType *pmtime,
414                      int n = 1) :
415     BasicStatCompletionData(striper, soid, userCompletion, multiCompletion, psize, n),
416     m_pmtime(pmtime) {};
417   // where to store the file time
418   TimeType *m_pmtime;
419 };
420
421 /**
422  * struct handling the data needed to pass to the call back
423  * function in asynchronous remove operations of a Rados File
424  */
425 struct RadosRemoveCompletionData : RefCountedObject {
426   /// constructor
427   RadosRemoveCompletionData(MultiAioCompletionImplPtr multiAioCompl,
428                             CephContext *context) :
429     RefCountedObject(context, 2),
430     m_multiAioCompl(multiAioCompl) {};
431   /// the multi asynch io completion object to be used
432   MultiAioCompletionImplPtr m_multiAioCompl;
433 };
434
435
436 } // namespace {
437
438 ///////////////////////// constructor /////////////////////////////
439
440 libradosstriper::RadosStriperImpl::RadosStriperImpl(librados::IoCtx& ioctx, librados::IoCtxImpl *ioctx_impl) :
441   m_refCnt(0),lock("RadosStriper Refcont", false, false), m_radosCluster(ioctx), m_ioCtx(ioctx), m_ioCtxImpl(ioctx_impl),
442   m_layout(default_file_layout) {}
443
444 ///////////////////////// layout /////////////////////////////
445
446 int libradosstriper::RadosStriperImpl::setObjectLayoutStripeUnit
447 (unsigned int stripe_unit)
448 {
449   /* stripe unit must be non-zero, 64k increment */
450   if (!stripe_unit || (stripe_unit & (CEPH_MIN_STRIPE_UNIT-1)))
451     return -EINVAL;
452   m_layout.fl_stripe_unit = stripe_unit;
453   return 0;
454 }
455
456 int libradosstriper::RadosStriperImpl::setObjectLayoutStripeCount
457 (unsigned int stripe_count)
458 {
459   /* stripe count must be non-zero */
460   if (!stripe_count)
461     return -EINVAL;
462   m_layout.fl_stripe_count = stripe_count;
463   return 0;
464 }
465
466 int libradosstriper::RadosStriperImpl::setObjectLayoutObjectSize
467 (unsigned int object_size)
468 {
469   /* object size must be non-zero, 64k increment */
470   if (!object_size || (object_size & (CEPH_MIN_STRIPE_UNIT-1)))
471     return -EINVAL;
472   /* object size must be a multiple of stripe unit */
473   if (object_size < m_layout.fl_stripe_unit ||
474       object_size % m_layout.fl_stripe_unit)
475     return -EINVAL;
476   m_layout.fl_object_size = object_size;
477   return 0;
478 }
479
480 ///////////////////////// xattrs /////////////////////////////
481
482 int libradosstriper::RadosStriperImpl::getxattr(const object_t& soid,
483                                                 const char *name,
484                                                 bufferlist& bl)
485 {
486   std::string firstObjOid = getObjectId(soid, 0);
487   return m_ioCtx.getxattr(firstObjOid, name, bl);
488 }
489
490 int libradosstriper::RadosStriperImpl::setxattr(const object_t& soid,
491                                                 const char *name,
492                                                 bufferlist& bl)
493 {
494   std::string firstObjOid = getObjectId(soid, 0);
495   return m_ioCtx.setxattr(firstObjOid, name, bl);
496 }
497
498 int libradosstriper::RadosStriperImpl::getxattrs(const object_t& soid,
499                                                  map<string, bufferlist>& attrset)
500 {
501   std::string firstObjOid = getObjectId(soid, 0);
502   int rc = m_ioCtx.getxattrs(firstObjOid, attrset);
503   if (rc) return rc;
504   // cleanup internal attributes dedicated to striping and locking
505   attrset.erase(XATTR_LAYOUT_STRIPE_UNIT);
506   attrset.erase(XATTR_LAYOUT_STRIPE_COUNT);
507   attrset.erase(XATTR_LAYOUT_OBJECT_SIZE);
508   attrset.erase(XATTR_SIZE);
509   attrset.erase(std::string(LOCK_PREFIX) + RADOS_LOCK_NAME);
510   return rc;
511 }
512
513 int libradosstriper::RadosStriperImpl::rmxattr(const object_t& soid,
514                                                const char *name)
515 {
516   std::string firstObjOid = getObjectId(soid, 0);
517   return m_ioCtx.rmxattr(firstObjOid, name);
518 }
519
520 ///////////////////////// io /////////////////////////////
521
522 int libradosstriper::RadosStriperImpl::write(const std::string& soid,
523                                              const bufferlist& bl,
524                                              size_t len,
525                                              uint64_t off) 
526 {
527   // open the object. This will create it if needed, retrieve its layout
528   // and size and take a shared lock on it
529   ceph_file_layout layout;
530   std::string lockCookie;
531   int rc = createAndOpenStripedObject(soid, &layout, len+off, &lockCookie, true);
532   if (rc) return rc;
533   return write_in_open_object(soid, layout, lockCookie, bl, len, off);
534 }
535
536 int libradosstriper::RadosStriperImpl::append(const std::string& soid,
537                                               const bufferlist& bl,
538                                               size_t len) 
539 {
540   // open the object. This will create it if needed, retrieve its layout
541   // and size and take a shared lock on it
542   ceph_file_layout layout;
543   uint64_t size = len;
544   std::string lockCookie;
545   int rc = openStripedObjectForWrite(soid, &layout, &size, &lockCookie, false);
546   if (rc) return rc;
547   return write_in_open_object(soid, layout, lockCookie, bl, len, size);
548 }
549
550 int libradosstriper::RadosStriperImpl::write_full(const std::string& soid,
551                                                   const bufferlist& bl) 
552 {
553   int rc = trunc(soid, 0);
554   if (rc && rc != -ENOENT) return rc; // ENOENT is obviously ok
555   return write(soid, bl, bl.length(), 0);
556 }
557
558 int libradosstriper::RadosStriperImpl::read(const std::string& soid,
559                                             bufferlist* bl,
560                                             size_t len,
561                                             uint64_t off)
562 {
563   // create a completion object
564   librados::AioCompletionImpl c;
565   // call asynchronous method
566   int rc = aio_read(soid, &c, bl, len, off);
567   // and wait for completion
568   if (!rc) {
569     // wait for completion
570     c.wait_for_complete_and_cb();
571     // return result
572     rc = c.get_return_value();
573   }
574   return rc;
575 }
576
577 ///////////////////////// asynchronous io /////////////////////////////
578
579 int libradosstriper::RadosStriperImpl::aio_write(const std::string& soid,
580                                                  librados::AioCompletionImpl *c,
581                                                  const bufferlist& bl,
582                                                  size_t len,
583                                                  uint64_t off)
584 {
585   ceph_file_layout layout;
586   std::string lockCookie;
587   int rc = createAndOpenStripedObject(soid, &layout, len+off, &lockCookie, true);
588   if (rc) return rc;
589   return aio_write_in_open_object(soid, c, layout, lockCookie, bl, len, off);
590 }
591
592 int libradosstriper::RadosStriperImpl::aio_append(const std::string& soid,
593                                                   librados::AioCompletionImpl *c,
594                                                   const bufferlist& bl,
595                                                   size_t len)
596 {
597   ceph_file_layout layout;
598   uint64_t size = len;
599   std::string lockCookie;
600   int rc = openStripedObjectForWrite(soid, &layout, &size, &lockCookie, false);
601   if (rc) return rc;
602   // create a completion object
603   return aio_write_in_open_object(soid, c, layout, lockCookie, bl, len, size);
604 }
605
606 int libradosstriper::RadosStriperImpl::aio_write_full(const std::string& soid,
607                                                       librados::AioCompletionImpl *c,
608                                                       const bufferlist& bl)
609 {
610   int rc = trunc(soid, 0);
611   if (rc) return rc;
612   return aio_write(soid, c, bl, bl.length(), 0);
613 }
614
615 static void rados_read_aio_unlock_complete(rados_striper_multi_completion_t c, void *arg)
616 {
617   auto cdata = reinterpret_cast<ReadCompletionData*>(arg);
618   libradosstriper::MultiAioCompletionImpl *comp =
619     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
620   cdata->complete_unlock(comp->rval);
621   cdata->put();
622 }
623
624 static void striper_read_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
625 {
626   auto cdata = reinterpret_cast<ReadCompletionData*>(arg);
627   // launch the async unlocking of the object
628   cdata->m_striper->aio_unlockObject(cdata->m_soid, cdata->m_lockCookie, cdata->m_unlockCompletion);
629   // complete the read part in parallel
630   libradosstriper::MultiAioCompletionImpl *comp =
631     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
632   cdata->complete_read(comp->rval);
633 }
634
635 static void rados_req_read_safe(rados_completion_t c, void *arg)
636 {
637   auto data = reinterpret_cast<RadosReadCompletionData*>(arg);
638   int rc = rados_aio_get_return_value(c);
639   // ENOENT means that we are dealing with a sparse file. This is fine,
640   // data (0s) will be created on the fly by the rados_req_read_complete method
641   if (rc == -ENOENT) rc = 0;
642   auto multiAioComp = data->m_multiAioCompl;
643   multiAioComp->safe_request(rc);
644   data->put();
645 }
646
647 static void rados_req_read_complete(rados_completion_t c, void *arg)
648 {
649   auto data = reinterpret_cast<RadosReadCompletionData*>(arg);
650   int rc = rados_aio_get_return_value(c);
651   // We need to handle the case of sparse files here
652   if (rc == -ENOENT) {
653     // the object did not exist at all. This can happen for sparse files.
654     // we consider we've read 0 bytes and it will fall into next case
655     rc = 0;
656   }
657   if (rc >= 0 && (((uint64_t)rc) < data->m_expectedBytes)) {
658     // only partial data were present in the object (or the object did not
659     // even exist if we've gone through previous case).
660     // This is typical of sparse file and we need to complete with 0s.
661     unsigned int lenOfZeros = data->m_expectedBytes-rc;
662     unsigned int existingDataToZero = min(data->m_bl->length()-rc, lenOfZeros);
663     if (existingDataToZero > 0) {
664       data->m_bl->zero(rc, existingDataToZero);
665     }
666     if (lenOfZeros > existingDataToZero) {
667       ceph::bufferptr zeros(ceph::buffer::create(lenOfZeros-existingDataToZero));
668       zeros.zero();
669       data->m_bl->push_back(zeros);
670     }
671     rc = data->m_expectedBytes;
672   }
673   auto multiAioComp = data->m_multiAioCompl;
674   multiAioComp->complete_request(rc);
675   data->put();
676 }
677
678 int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid,
679                                                 librados::AioCompletionImpl *c,
680                                                 bufferlist* bl,
681                                                 size_t len,
682                                                 uint64_t off)
683 {
684   // open the object. This will retrieve its layout and size
685   // and take a shared lock on it
686   ceph_file_layout layout;
687   uint64_t size;
688   std::string lockCookie;
689   int rc = openStripedObjectForRead(soid, &layout, &size, &lockCookie);
690   if (rc) return rc;
691   // find out the actual number of bytes we can read
692   uint64_t read_len;
693   if (off >= size) {
694     // nothing to read ! We are done.
695     read_len = 0;
696   } else {
697     read_len = min(len, (size_t)(size-off));
698   }
699   // get list of extents to be read from
700   vector<ObjectExtent> *extents = new vector<ObjectExtent>();
701   if (read_len > 0) {
702     std::string format = soid;
703     boost::replace_all(format, "%", "%%");
704     format += RADOS_OBJECT_EXTENSION_FORMAT;
705     file_layout_t l;
706     l.from_legacy(layout);
707     Striper::file_to_extents(cct(), format.c_str(), &l, off, read_len,
708                              0, *extents);
709   }
710
711   // create a completion object and transfer ownership of extents and resultbl
712   vector<bufferlist> *resultbl = new vector<bufferlist>(extents->size());
713   ReadCompletionData *cdata = new ReadCompletionData(this, soid, lockCookie, c,
714                                                      bl, extents, resultbl, 1);
715   c->is_read = true;
716   c->io = m_ioCtxImpl;
717   // create a completion for the unlocking of the striped object at the end of the read
718   librados::AioCompletion *unlock_completion =
719     librados::Rados::aio_create_completion(cdata, rados_read_aio_unlock_complete, 0);
720   cdata->m_unlockCompletion = unlock_completion;
721   // create the multiCompletion object handling the reads
722   MultiAioCompletionImplPtr nc{new libradosstriper::MultiAioCompletionImpl,
723                                false};
724   nc->set_complete_callback(cdata, striper_read_aio_req_complete);
725   // go through the extents
726   int r = 0, i = 0;
727   for (vector<ObjectExtent>::iterator p = extents->begin(); p != extents->end(); ++p) {
728     // create a buffer list describing where to place data read from current extend
729     bufferlist *oid_bl = &((*resultbl)[i++]);
730     for (vector<pair<uint64_t,uint64_t> >::iterator q = p->buffer_extents.begin();
731         q != p->buffer_extents.end();
732         ++q) {
733       bufferlist buffer_bl;
734       buffer_bl.substr_of(*bl, q->first, q->second);
735       oid_bl->append(buffer_bl);
736     }
737     // read all extends of a given object in one go
738     nc->add_request();
739     // we need 2 references on data as both rados_req_read_safe and rados_req_read_complete
740     // will release one
741     RadosReadCompletionData *data = new RadosReadCompletionData(nc, p->length, oid_bl, cct(), 2);
742     librados::AioCompletion *rados_completion =
743       librados::Rados::aio_create_completion(data, rados_req_read_complete, rados_req_read_safe);
744     r = m_ioCtx.aio_read(p->oid.name, rados_completion, oid_bl, p->length, p->offset);
745     rados_completion->release();
746     if (r < 0)
747       break;
748   }
749   nc->finish_adding_requests();
750   return r;
751 }
752
753 int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid,
754                                                 librados::AioCompletionImpl *c,
755                                                 char* buf,
756                                                 size_t len,
757                                                 uint64_t off)
758 {
759   // create a buffer list and store it inside the completion object
760   c->bl.clear();
761   c->bl.push_back(buffer::create_static(len, buf));
762   // call the bufferlist version of this method
763   return aio_read(soid, c, &c->bl, len, off);
764 }
765
766 int libradosstriper::RadosStriperImpl::aio_flush() 
767 {
768   int ret;
769   // pass to the rados level
770   ret = m_ioCtx.aio_flush();
771   if (ret < 0)
772     return ret;
773   //wait all CompletionData are released
774   lock.Lock();
775   while (m_refCnt > 1)
776     cond.Wait(lock);
777   lock.Unlock();
778   return ret;
779 }
780
781 ///////////////////////// stat and deletion /////////////////////////////
782
783 int libradosstriper::RadosStriperImpl::stat(const std::string& soid, uint64_t *psize, time_t *pmtime)
784 {
785   // create a completion object
786   librados::AioCompletionImpl c;
787   // call asynchronous version of stat
788   int rc = aio_stat(soid, &c, psize, pmtime);
789   if (rc == 0) {
790     // wait for completion of the remove
791     c.wait_for_complete();
792     // get result
793     rc = c.get_return_value();
794   }
795   return rc;
796 }
797
798 static void striper_stat_aio_stat_complete(rados_completion_t c, void *arg) {
799   auto data = reinterpret_cast<BasicStatCompletionData*>(arg);
800   int rc = rados_aio_get_return_value(c);
801   if (rc == -ENOENT) {
802     // remember this has failed
803     data->m_statRC = rc;
804   }
805   data->m_multiCompletion->complete_request(rc);
806   data->put();
807 }
808
809 static void striper_stat_aio_getxattr_complete(rados_completion_t c, void *arg) {
810   auto data = reinterpret_cast<BasicStatCompletionData*>(arg);
811   int rc = rados_aio_get_return_value(c);
812   // We need to handle the case of sparse files here
813   if (rc < 0) {
814     // remember this has failed
815     data->m_getxattrRC = rc;
816   } else {
817     // this intermediate string allows to add a null terminator before calling strtol
818     std::string err;
819     std::string strsize(data->m_bl.c_str(), data->m_bl.length());
820     *data->m_psize = strict_strtoll(strsize.c_str(), 10, &err);
821     if (!err.empty()) {
822       lderr(data->m_striper->cct()) << XATTR_SIZE << " : " << err << dendl;
823       data->m_getxattrRC = -EINVAL;
824     }
825     rc = 0;
826   }
827   data->m_multiCompletion->complete_request(rc);
828   data->put();
829 }
830
831 static void striper_stat_aio_req_complete(rados_striper_multi_completion_t c,
832                                           void *arg) {
833   auto data = reinterpret_cast<BasicStatCompletionData*>(arg);
834   if (data->m_statRC) {
835     data->complete(data->m_statRC);
836   } else {
837     if (data->m_getxattrRC < 0) {
838       data->complete(data->m_getxattrRC);
839     } else {
840       data->complete(0);
841     }
842   }
843   data->put();
844 }
845
846 template<class TimeType>
847 int libradosstriper::RadosStriperImpl::aio_generic_stat
848 (const std::string& soid,
849  librados::AioCompletionImpl *c,
850  uint64_t *psize,
851  TimeType *pmtime,
852  typename libradosstriper::RadosStriperImpl::StatFunction<TimeType>::Type statFunction)
853 {
854   // use a MultiAioCompletion object for dealing with the fact
855   // that we'll do 2 asynchronous calls in parallel
856   MultiAioCompletionImplPtr multi_completion{
857     new libradosstriper::MultiAioCompletionImpl, false};
858   // Data object used for passing context to asynchronous calls
859   std::string firstObjOid = getObjectId(soid, 0);
860   StatCompletionData<TimeType> *cdata =
861     new StatCompletionData<TimeType>(this, firstObjOid, c,
862                                      multi_completion.get(), psize, pmtime, 4);
863   multi_completion->set_complete_callback(cdata, striper_stat_aio_req_complete);
864   // use a regular AioCompletion for the stat async call
865   librados::AioCompletion *stat_completion =
866     librados::Rados::aio_create_completion(cdata, striper_stat_aio_stat_complete, 0);
867   multi_completion->add_safe_request();
868   object_t obj(firstObjOid);
869   int rc = (m_ioCtxImpl->*statFunction)(obj, stat_completion->pc,
870                                         &cdata->m_objectSize, cdata->m_pmtime);
871   stat_completion->release();
872   if (rc < 0) {
873     // nothing is really started so cancel everything
874     delete cdata;
875     return rc;
876   }
877   // use a regular AioCompletion for the getxattr async call
878   librados::AioCompletion *getxattr_completion =
879     librados::Rados::aio_create_completion(cdata, striper_stat_aio_getxattr_complete, 0);
880   multi_completion->add_safe_request();
881   // in parallel, get the pmsize from the first object asynchronously
882   rc = m_ioCtxImpl->aio_getxattr(obj, getxattr_completion->pc,
883                                  XATTR_SIZE, cdata->m_bl);
884   getxattr_completion->release();
885   multi_completion->finish_adding_requests();
886   if (rc < 0) {
887     // the async stat is ongoing, so we need to go on
888     // we mark the getxattr as failed in the data object
889     cdata->m_getxattrRC = rc;
890     multi_completion->complete_request(rc);
891     return rc;
892   }
893   cdata->put();
894   return 0;
895 }
896
897 int libradosstriper::RadosStriperImpl::aio_stat(const std::string& soid,
898                                                 librados::AioCompletionImpl *c,
899                                                 uint64_t *psize,
900                                                 time_t *pmtime)
901 {
902   return aio_generic_stat<time_t>(soid, c, psize, pmtime, &librados::IoCtxImpl::aio_stat);
903 }
904
905 int libradosstriper::RadosStriperImpl::stat2(const std::string& soid, uint64_t *psize, struct timespec *pts)
906 {
907   // create a completion object
908   librados::AioCompletionImpl c;
909   // call asynchronous version of stat
910   int rc = aio_stat2(soid, &c, psize, pts);
911   if (rc == 0) {
912     // wait for completion of the remove
913     c.wait_for_complete_and_cb();
914     // get result
915     rc = c.get_return_value();
916   }
917   return rc;
918 }
919
920 int libradosstriper::RadosStriperImpl::aio_stat2(const std::string& soid,
921                                                 librados::AioCompletionImpl *c,
922                                                 uint64_t *psize,
923                                                 struct timespec *pts)
924 {
925   return aio_generic_stat<struct timespec>(soid, c, psize, pts, &librados::IoCtxImpl::aio_stat2);
926 }
927
928 static void rados_req_remove_complete(rados_completion_t c, void *arg)
929 {
930   auto cdata = reinterpret_cast<RadosRemoveCompletionData*>(arg);
931   int rc = rados_aio_get_return_value(c);
932   // in case the object did not exist, it means we had a sparse file, all is fine
933   if (rc == -ENOENT) {
934     rc = 0;
935   }
936   cdata->m_multiAioCompl->complete_request(rc);
937   cdata->put();
938 }
939
940 static void rados_req_remove_safe(rados_completion_t c, void *arg)
941 {
942   auto cdata = reinterpret_cast<RadosRemoveCompletionData*>(arg);
943   int rc = rados_aio_get_return_value(c);
944   // in case the object did not exist, it means we had a sparse file, all is fine
945   if (rc == -ENOENT) {
946     rc = 0;
947   }
948   cdata->m_multiAioCompl->safe_request(rc);
949   cdata->put();
950 }
951
952 static void striper_remove_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
953 {
954   auto cdata = reinterpret_cast<RemoveCompletionData*>(arg);
955   libradosstriper::MultiAioCompletionImpl *comp =
956     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
957   ldout(cdata->m_striper->cct(), 10)
958     << "RadosStriperImpl : striper_remove_aio_req_complete called for "
959     << cdata->m_soid << dendl;
960   int rc = comp->rval;
961   if (rc == 0) {
962     // All went fine, synchronously remove first object
963     rc = cdata->m_striper->m_ioCtx.remove(cdata->m_striper->getObjectId(cdata->m_soid, 0),
964                                           cdata->flags);
965   } else {
966     lderr(cdata->m_striper->cct())
967       << "RadosStriperImpl : deletion/truncation incomplete for " << cdata->m_soid
968       << ", as errors were encountered. The file is left present but it's content "
969       << " has been partially removed"
970       << dendl;
971   }
972   cdata->complete(rc);
973   cdata->put();
974 }
975
976 int libradosstriper::RadosStriperImpl::remove(const std::string& soid, int flags)
977 {
978   // create a completion object
979   librados::AioCompletionImpl c;
980   // call asynchronous version of remove
981   int rc = aio_remove(soid, &c, flags);
982   if (rc == 0) {
983     // wait for completion of the remove
984     c.wait_for_complete_and_cb();
985     // get result
986     rc = c.get_return_value();
987   }
988   return rc;
989 }
990
991 int libradosstriper::RadosStriperImpl::aio_remove(const std::string& soid,
992                                                   librados::AioCompletionImpl *c,
993                                                   int flags)
994 {
995   // the RemoveCompletionData object will lock the given soid for the duration
996   // of the removal
997   std::string lockCookie = getUUID();
998   int rc = m_ioCtx.lock_exclusive(getObjectId(soid, 0), RADOS_LOCK_NAME, lockCookie, "", 0, 0);
999   if (rc) return rc;
1000   // create CompletionData for the async remove call
1001   RemoveCompletionData *cdata = new RemoveCompletionData(this, soid, lockCookie, c, flags);
1002   MultiAioCompletionImplPtr multi_completion{
1003     new libradosstriper::MultiAioCompletionImpl, false};
1004   multi_completion->set_complete_callback(cdata, striper_remove_aio_req_complete);
1005   // call asynchronous internal version of remove
1006   ldout(cct(), 10)
1007     << "RadosStriperImpl : Aio_remove starting for "
1008     << soid << dendl;
1009   rc = internal_aio_remove(soid, multi_completion);
1010   return rc;
1011 }
1012
1013 int libradosstriper::RadosStriperImpl::internal_aio_remove(
1014  const std::string& soid,
1015  MultiAioCompletionImplPtr multi_completion,
1016  int flags)
1017 {
1018   std::string firstObjOid = getObjectId(soid, 0);
1019   try {
1020     // check size and get number of rados objects to delete
1021     uint64_t nb_objects = 0;
1022     bufferlist bl2;
1023     int rc = getxattr(soid, XATTR_SIZE, bl2);
1024     if (rc < 0) {
1025       // no object size (or not able to get it)
1026       // try to find the number of object "by hand"
1027       uint64_t psize;
1028       time_t pmtime;
1029       while (!m_ioCtx.stat(getObjectId(soid, nb_objects), &psize, &pmtime)) {
1030         nb_objects++;
1031       }
1032     } else {
1033       // count total number of rados objects in the striped object
1034       std::string err;
1035       // this intermediate string allows to add a null terminator before calling strtol
1036       std::string strsize(bl2.c_str(), bl2.length());
1037       uint64_t size = strict_strtoll(strsize.c_str(), 10, &err);
1038       if (!err.empty()) {
1039         lderr(cct()) << XATTR_SIZE << " : " << err << dendl;
1040         
1041         return -EINVAL;
1042       }
1043       uint64_t object_size = m_layout.fl_object_size;
1044       uint64_t su = m_layout.fl_stripe_unit;
1045       uint64_t stripe_count = m_layout.fl_stripe_count;
1046       uint64_t nb_complete_sets = size / (object_size*stripe_count);
1047       uint64_t remaining_data = size % (object_size*stripe_count);
1048       uint64_t remaining_stripe_units = (remaining_data + su -1) / su;
1049       uint64_t remaining_objects = std::min(remaining_stripe_units, stripe_count);
1050       nb_objects = nb_complete_sets * stripe_count + remaining_objects;
1051     }
1052     // delete rados objects in reverse order
1053     // Note that we do not drop the first object. This one will only be dropped
1054     // if all other removals have been successful, and this is done in the
1055     // callback of the multi_completion object
1056     int rcr = 0;
1057     for (int i = nb_objects-1; i >= 1; i--) {
1058       multi_completion->add_request();
1059       RadosRemoveCompletionData *data =
1060         new RadosRemoveCompletionData(multi_completion, cct());
1061       librados::AioCompletion *rados_completion =
1062         librados::Rados::aio_create_completion(data,
1063                                                rados_req_remove_complete,
1064                                                rados_req_remove_safe);
1065       if (flags == 0) {
1066         rcr = m_ioCtx.aio_remove(getObjectId(soid, i), rados_completion);
1067       } else {
1068         rcr = m_ioCtx.aio_remove(getObjectId(soid, i), rados_completion, flags);
1069       }
1070       rados_completion->release();
1071       if (rcr < 0 and -ENOENT != rcr) {
1072         lderr(cct()) << "RadosStriperImpl::remove : deletion incomplete for " << soid
1073                      << ", as " << getObjectId(soid, i) << " could not be deleted (rc=" << rc << ")"
1074                      << dendl;
1075         break;
1076       }
1077     }
1078     // we are over adding requests to the multi_completion object
1079     multi_completion->finish_adding_requests();
1080     // return
1081     return rcr;
1082   } catch (ErrorCode &e) {
1083     // errror caught when trying to take the exclusive lock
1084     return e.m_code;
1085   }
1086
1087 }
1088
1089 int libradosstriper::RadosStriperImpl::trunc(const std::string& soid, uint64_t size)
1090 {
1091   // lock the object in exclusive mode
1092   std::string firstObjOid = getObjectId(soid, 0);
1093   librados::ObjectWriteOperation op;
1094   op.assert_exists();
1095   std::string lockCookie = RadosStriperImpl::getUUID();
1096   utime_t dur = utime_t();
1097   rados::cls::lock::lock(&op, RADOS_LOCK_NAME, LOCK_EXCLUSIVE, lockCookie, "", "", dur, 0);
1098   int rc = m_ioCtx.operate(firstObjOid, &op);
1099   if (rc) return rc;
1100   // load layout and size
1101   ceph_file_layout layout;
1102   uint64_t original_size;
1103   rc = internal_get_layout_and_size(firstObjOid, &layout, &original_size);
1104   if (!rc) {
1105     if (size < original_size) {
1106       rc = truncate(soid, original_size, size, layout);
1107     } else if (size > original_size) {
1108       rc = grow(soid, original_size, size, layout);
1109     }
1110   }
1111   // unlock object, ignore return code as we cannot do much
1112   m_ioCtx.unlock(firstObjOid, RADOS_LOCK_NAME, lockCookie);
1113   // final return
1114   return rc;
1115 }
1116
1117
1118 ///////////////////////// private helpers /////////////////////////////
1119
1120 std::string libradosstriper::RadosStriperImpl::getObjectId(const object_t& soid,
1121                                                            long long unsigned objectno)
1122 {
1123   std::ostringstream s;
1124   s << soid << '.' << std::setfill ('0') << std::setw(16) << std::hex << objectno;
1125   return s.str();
1126 }
1127
1128 void libradosstriper::RadosStriperImpl::unlockObject(const std::string& soid,
1129                                                      const std::string& lockCookie)
1130 {
1131   // unlock the shared lock on the first rados object
1132   std::string firstObjOid = getObjectId(soid, 0);
1133   m_ioCtx.unlock(firstObjOid, RADOS_LOCK_NAME, lockCookie);
1134 }
1135
1136 void libradosstriper::RadosStriperImpl::aio_unlockObject(const std::string& soid,
1137                                                          const std::string& lockCookie,
1138                                                          librados::AioCompletion *c)
1139 {
1140   // unlock the shared lock on the first rados object
1141   std::string firstObjOid = getObjectId(soid, 0);
1142   m_ioCtx.aio_unlock(firstObjOid, RADOS_LOCK_NAME, lockCookie, c);
1143 }
1144
1145 static void rados_write_aio_unlock_complete(rados_striper_multi_completion_t c, void *arg)
1146 {
1147   auto cdata = reinterpret_cast<WriteCompletionData*>(arg);
1148   libradosstriper::MultiAioCompletionImpl *comp =
1149     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
1150   cdata->complete_unlock(comp->rval);
1151   cdata->put();
1152 }
1153
1154 static void striper_write_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
1155 {
1156   auto cdata = reinterpret_cast<WriteCompletionData*>(arg);
1157   // launch the async unlocking of the object
1158   cdata->m_striper->aio_unlockObject(cdata->m_soid, cdata->m_lockCookie, cdata->m_unlockCompletion);
1159   // complete the write part in parallel
1160   libradosstriper::MultiAioCompletionImpl *comp =
1161     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
1162   cdata->complete_write(comp->rval);
1163   cdata->put();
1164 }
1165
1166 static void striper_write_aio_req_safe(rados_striper_multi_completion_t c, void *arg)
1167 {
1168   auto cdata = reinterpret_cast<WriteCompletionData*>(arg);
1169   libradosstriper::MultiAioCompletionImpl *comp =
1170     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
1171   cdata->safe(comp->rval);
1172   cdata->put();
1173 }
1174
1175 int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& soid,
1176                                                             const ceph_file_layout& layout,
1177                                                             const std::string& lockCookie,
1178                                                             const bufferlist& bl,
1179                                                             size_t len,
1180                                                             uint64_t off) {
1181   // create a completion object to be passed to the callbacks of the multicompletion
1182   // we need 3 references as striper_write_aio_req_complete will release two and
1183   // striper_write_aio_req_safe will release one
1184   WriteCompletionData *cdata = new WriteCompletionData(this, soid, lockCookie, 0, 3);
1185   cdata->get(); // local ref
1186   // create a completion object for the unlocking of the striped object at the end of the write
1187   librados::AioCompletion *unlock_completion =
1188     librados::Rados::aio_create_completion(cdata, rados_write_aio_unlock_complete, 0);
1189   cdata->m_unlockCompletion = unlock_completion;
1190   // create the multicompletion that will handle the write completion
1191   MultiAioCompletionImplPtr c{new libradosstriper::MultiAioCompletionImpl,
1192                               false};
1193   c->set_complete_callback(cdata, striper_write_aio_req_complete);
1194   c->set_safe_callback(cdata, striper_write_aio_req_safe);
1195   // call the asynchronous API
1196   int rc = internal_aio_write(soid, c, bl, len, off, layout);
1197   if (!rc) {
1198     // wait for completion and safety of data
1199     c->wait_for_complete_and_cb();
1200     c->wait_for_safe_and_cb();
1201     // wait for the unlocking
1202     unlock_completion->wait_for_complete();
1203     // return result
1204     rc = c->get_return_value();
1205   }
1206   cdata->put();
1207   return rc;
1208 }
1209
1210 int libradosstriper::RadosStriperImpl::aio_write_in_open_object(const std::string& soid,
1211                                                                 librados::AioCompletionImpl *c,
1212                                                                 const ceph_file_layout& layout,
1213                                                                 const std::string& lockCookie,
1214                                                                 const bufferlist& bl,
1215                                                                 size_t len,
1216                                                                 uint64_t off) {
1217   // create a completion object to be passed to the callbacks of the multicompletion
1218   // we need 3 references as striper_write_aio_req_complete will release two and
1219   // striper_write_aio_req_safe will release one
1220   WriteCompletionData *cdata = new WriteCompletionData(this, soid, lockCookie, c, 3);
1221   cdata->get(); // local ref
1222   m_ioCtxImpl->get();
1223   c->io = m_ioCtxImpl;
1224   // create a completion object for the unlocking of the striped object at the end of the write
1225   librados::AioCompletion *unlock_completion =
1226     librados::Rados::aio_create_completion(cdata, rados_write_aio_unlock_complete, 0);
1227   cdata->m_unlockCompletion = unlock_completion;
1228   // create the multicompletion that will handle the write completion
1229   libradosstriper::MultiAioCompletionImplPtr nc{
1230     new libradosstriper::MultiAioCompletionImpl, false};
1231   nc->set_complete_callback(cdata, striper_write_aio_req_complete);
1232   nc->set_safe_callback(cdata, striper_write_aio_req_safe);
1233   // internal asynchronous API
1234   int rc = internal_aio_write(soid, nc, bl, len, off, layout);
1235   cdata->put();
1236   return rc;
1237 }
1238
1239 static void rados_req_write_safe(rados_completion_t c, void *arg)
1240 {
1241   libradosstriper::MultiAioCompletionImpl *comp =
1242     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(arg);
1243   comp->safe_request(rados_aio_get_return_value(c));
1244 }
1245
1246 static void rados_req_write_complete(rados_completion_t c, void *arg)
1247 {
1248   libradosstriper::MultiAioCompletionImpl *comp =
1249     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(arg);
1250   comp->complete_request(rados_aio_get_return_value(c));
1251 }
1252
1253 int
1254 libradosstriper::RadosStriperImpl::internal_aio_write(const std::string& soid,
1255                                                       libradosstriper::MultiAioCompletionImplPtr c,
1256                                                       const bufferlist& bl,
1257                                                       size_t len,
1258                                                       uint64_t off,
1259                                                       const ceph_file_layout& layout)
1260 {
1261   int r = 0;
1262   // Do not try anything if we are called with empty buffer,
1263   // file_to_extents would raise an exception
1264   if (len > 0) {
1265     // get list of extents to be written to
1266     vector<ObjectExtent> extents;
1267     std::string format = soid;
1268     boost::replace_all(format, "%", "%%");
1269     format += RADOS_OBJECT_EXTENSION_FORMAT;
1270     file_layout_t l;
1271     l.from_legacy(layout);
1272     Striper::file_to_extents(cct(), format.c_str(), &l, off, len, 0, extents);
1273     // go through the extents
1274     for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); ++p) {
1275       // assemble pieces of a given object into a single buffer list
1276       bufferlist oid_bl;
1277       for (vector<pair<uint64_t,uint64_t> >::iterator q = p->buffer_extents.begin();
1278            q != p->buffer_extents.end();
1279            ++q) {
1280         bufferlist buffer_bl;
1281         buffer_bl.substr_of(bl, q->first, q->second);
1282         oid_bl.append(buffer_bl);
1283       }
1284       // and write the object
1285       c->add_request();
1286       librados::AioCompletion *rados_completion =
1287         librados::Rados::aio_create_completion(c.get(),
1288                                                rados_req_write_complete,
1289                                                rados_req_write_safe);
1290       r = m_ioCtx.aio_write(p->oid.name, rados_completion, oid_bl,
1291                             p->length, p->offset);
1292       rados_completion->release();
1293       if (r < 0)
1294         break;
1295     }
1296   }
1297   c->finish_adding_requests();
1298   return r;
1299 }
1300
1301 int libradosstriper::RadosStriperImpl::extract_uint32_attr
1302 (std::map<std::string, bufferlist> &attrs,
1303  const std::string& key,
1304  ceph_le32 *value)
1305 {
1306   std::map<std::string, bufferlist>::iterator attrsIt = attrs.find(key);
1307   if (attrsIt != attrs.end()) {
1308     // this intermediate string allows to add a null terminator before calling strtol
1309     std::string strvalue(attrsIt->second.c_str(), attrsIt->second.length());
1310     std::string err;   
1311     *value = strict_strtol(strvalue.c_str(), 10, &err);
1312     if (!err.empty()) {
1313       lderr(cct()) << key << " : " << err << dendl;
1314       return -EINVAL;
1315     }
1316   } else {
1317     return -ENOENT;
1318   }
1319   return 0;
1320 }
1321
1322 int libradosstriper::RadosStriperImpl::extract_sizet_attr
1323 (std::map<std::string, bufferlist> &attrs,
1324  const std::string& key,
1325  size_t *value)
1326 {
1327   std::map<std::string, bufferlist>::iterator attrsIt = attrs.find(key);
1328   if (attrsIt != attrs.end()) {
1329     // this intermediate string allows to add a null terminator before calling strtol
1330     std::string strvalue(attrsIt->second.c_str(), attrsIt->second.length());
1331     std::string err;   
1332     *value = strict_strtoll(strvalue.c_str(), 10, &err);
1333     if (!err.empty()) {
1334       lderr(cct()) << key << " : " << err << dendl;
1335       return -EINVAL;
1336     }
1337   } else {
1338     return -ENOENT;
1339   }
1340   return 0;
1341 }
1342
1343 int libradosstriper::RadosStriperImpl::internal_get_layout_and_size(
1344   const std::string& oid,
1345   ceph_file_layout *layout,
1346   uint64_t *size)
1347 {
1348   // get external attributes of the first rados object
1349   std::map<std::string, bufferlist> attrs;
1350   int rc = m_ioCtx.getxattrs(oid, attrs);
1351   if (rc) return rc;
1352   // deal with stripe_unit
1353   rc = extract_uint32_attr(attrs, XATTR_LAYOUT_STRIPE_UNIT, &layout->fl_stripe_unit);
1354   if (rc) return rc;
1355   // deal with stripe_count
1356   rc = extract_uint32_attr(attrs, XATTR_LAYOUT_STRIPE_COUNT, &layout->fl_stripe_count);
1357   if (rc) return rc;
1358   // deal with object_size
1359   rc = extract_uint32_attr(attrs, XATTR_LAYOUT_OBJECT_SIZE, &layout->fl_object_size);
1360   if (rc) return rc;
1361   // deal with size
1362   size_t ssize;
1363   rc = extract_sizet_attr(attrs, XATTR_SIZE, &ssize);
1364   if (rc) {
1365     return rc;
1366   }
1367   *size = ssize;
1368   // make valgrind happy by setting unused fl_pg_pool
1369   layout->fl_pg_pool = 0;
1370   return 0;
1371 }
1372
1373 int libradosstriper::RadosStriperImpl::openStripedObjectForRead(
1374   const std::string& soid,
1375   ceph_file_layout *layout,
1376   uint64_t *size,
1377   std::string *lockCookie)
1378 {
1379   // take a lock the first rados object, if it exists and gets its size
1380   // check, lock and size reading must be atomic and are thus done within a single operation
1381   librados::ObjectWriteOperation op;
1382   op.assert_exists();
1383   *lockCookie = getUUID();
1384   utime_t dur = utime_t();
1385   rados::cls::lock::lock(&op, RADOS_LOCK_NAME, LOCK_SHARED, *lockCookie, "Tag", "", dur, 0);
1386   std::string firstObjOid = getObjectId(soid, 0);
1387   int rc = m_ioCtx.operate(firstObjOid, &op);
1388   if (rc) {
1389     // error case (including -ENOENT)
1390     return rc;
1391   }
1392   rc = internal_get_layout_and_size(firstObjOid, layout, size);
1393   if (rc) {
1394     unlockObject(soid, *lockCookie);
1395     lderr(cct()) << "RadosStriperImpl::openStripedObjectForRead : "
1396                  << "could not load layout and size for "
1397                  << soid << " : rc = " << rc << dendl;
1398   }
1399   return rc;
1400 }
1401
1402 int libradosstriper::RadosStriperImpl::openStripedObjectForWrite(const std::string& soid,
1403                                                                  ceph_file_layout *layout,
1404                                                                  uint64_t *size,
1405                                                                  std::string *lockCookie,
1406                                                                  bool isFileSizeAbsolute)
1407 {
1408   // take a lock the first rados object, if it exists
1409   // check and lock must be atomic and are thus done within a single operation
1410   librados::ObjectWriteOperation op;
1411   op.assert_exists();
1412   *lockCookie = getUUID();
1413   utime_t dur = utime_t();
1414   rados::cls::lock::lock(&op, RADOS_LOCK_NAME, LOCK_SHARED, *lockCookie, "Tag", "", dur, 0);
1415   std::string firstObjOid = getObjectId(soid, 0);
1416   int rc = m_ioCtx.operate(firstObjOid, &op);
1417   if (rc) {
1418     if (rc == -ENOENT) {
1419       // object does not exist, delegate to createEmptyStripedObject
1420       int rc = createAndOpenStripedObject(soid, layout, *size, lockCookie, isFileSizeAbsolute);
1421       // return original size
1422       *size = 0;
1423       return rc; 
1424     } else {
1425       return rc;
1426     }
1427   }
1428   // all fine
1429   uint64_t curSize;
1430   rc = internal_get_layout_and_size(firstObjOid, layout, &curSize);
1431   if (rc) {
1432     unlockObject(soid, *lockCookie);
1433     lderr(cct()) << "RadosStriperImpl::openStripedObjectForWrite : "
1434                    << "could not load layout and size for "
1435                    << soid << " : rc = " << rc << dendl;
1436     return rc;
1437   }
1438   // atomically update object size, only if smaller than current one
1439   if (!isFileSizeAbsolute)
1440     *size += curSize;
1441   librados::ObjectWriteOperation writeOp;
1442   writeOp.cmpxattr(XATTR_SIZE, LIBRADOS_CMPXATTR_OP_GT, *size);
1443   std::ostringstream oss;
1444   oss << *size;
1445   bufferlist bl;
1446   bl.append(oss.str());
1447   writeOp.setxattr(XATTR_SIZE, bl);
1448   rc = m_ioCtx.operate(firstObjOid, &writeOp);
1449   // return current size
1450   *size = curSize;
1451   // handle case where objectsize is already bigger than size
1452   if (-ECANCELED == rc) 
1453     rc = 0;
1454   if (rc) {
1455     unlockObject(soid, *lockCookie);
1456     lderr(cct()) << "RadosStriperImpl::openStripedObjectForWrite : "
1457                    << "could not set new size for "
1458                    << soid << " : rc = " << rc << dendl;
1459   }
1460   return rc;
1461 }
1462
1463 int libradosstriper::RadosStriperImpl::createAndOpenStripedObject(const std::string& soid,
1464                                                                   ceph_file_layout *layout,
1465                                                                   uint64_t size,
1466                                                                   std::string *lockCookie,
1467                                                                   bool isFileSizeAbsolute)
1468 {
1469   // build atomic write operation
1470   librados::ObjectWriteOperation writeOp;
1471   writeOp.create(true);
1472   // object_size
1473   std::ostringstream oss_object_size;
1474   oss_object_size << m_layout.fl_object_size;
1475   bufferlist bl_object_size;
1476   bl_object_size.append(oss_object_size.str());
1477   writeOp.setxattr(XATTR_LAYOUT_OBJECT_SIZE, bl_object_size);
1478   // stripe unit
1479   std::ostringstream oss_stripe_unit;
1480   oss_stripe_unit << m_layout.fl_stripe_unit;
1481   bufferlist bl_stripe_unit;
1482   bl_stripe_unit.append(oss_stripe_unit.str());
1483   writeOp.setxattr(XATTR_LAYOUT_STRIPE_UNIT, bl_stripe_unit);
1484   // stripe count
1485   std::ostringstream oss_stripe_count;
1486   oss_stripe_count << m_layout.fl_stripe_count;
1487   bufferlist bl_stripe_count;
1488   bl_stripe_count.append(oss_stripe_count.str());
1489   writeOp.setxattr(XATTR_LAYOUT_STRIPE_COUNT, bl_stripe_count);
1490   // size
1491   std::ostringstream oss_size;
1492   oss_size << (isFileSizeAbsolute?size:0);
1493   bufferlist bl_size;
1494   bl_size.append(oss_size.str());
1495   writeOp.setxattr(XATTR_SIZE, bl_size);
1496   // effectively change attributes
1497   std::string firstObjOid = getObjectId(soid, 0);
1498   int rc = m_ioCtx.operate(firstObjOid, &writeOp);
1499   // in case of error (but no EEXIST which would mean the object existed), return
1500   if (rc && -EEXIST != rc) return rc;
1501   // Otherwise open the object
1502   uint64_t fileSize = size;
1503   return openStripedObjectForWrite(soid, layout, &fileSize, lockCookie, isFileSizeAbsolute);
1504 }
1505
1506 static void striper_truncate_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
1507 {
1508   auto cdata = reinterpret_cast<TruncateCompletionData*>(arg);
1509   libradosstriper::MultiAioCompletionImpl *comp =
1510     reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
1511   if (0 == comp->rval) {
1512     // all went fine, change size in the external attributes
1513     std::ostringstream oss;
1514     oss << cdata->m_size;
1515     bufferlist bl;
1516     bl.append(oss.str());
1517     cdata->m_striper->setxattr(cdata->m_soid, XATTR_SIZE, bl);
1518   }
1519   cdata->put();
1520 }
1521
1522 int libradosstriper::RadosStriperImpl::truncate(const std::string& soid,
1523                                                 uint64_t original_size,
1524                                                 uint64_t size,
1525                                                 ceph_file_layout &layout) 
1526 {
1527   TruncateCompletionData *cdata = new TruncateCompletionData(this, soid, size);
1528   libradosstriper::MultiAioCompletionImplPtr multi_completion{
1529     new libradosstriper::MultiAioCompletionImpl, false};
1530   multi_completion->set_complete_callback(cdata, striper_truncate_aio_req_complete);
1531   // call asynchrous version of truncate
1532   int rc = aio_truncate(soid, multi_completion, original_size, size, layout);
1533   // wait for completion of the truncation
1534   multi_completion->finish_adding_requests();
1535   multi_completion->wait_for_complete_and_cb();
1536   // return result
1537   if (rc == 0) {
1538     rc = multi_completion->get_return_value();
1539   }
1540   return rc;
1541 }
1542
1543 int libradosstriper::RadosStriperImpl::aio_truncate
1544 (const std::string& soid,
1545  libradosstriper::MultiAioCompletionImplPtr multi_completion,
1546  uint64_t original_size,
1547  uint64_t size,
1548  ceph_file_layout &layout)
1549 {
1550   // handle the underlying rados objects. 3 cases here :
1551   //  -- the objects belonging to object sets entirely located
1552   //     before the truncation are unchanged
1553   //  -- the objects belonging to the object set where the
1554   //     truncation took place are truncated or removed
1555   //  -- the objects belonging to object sets entirely located
1556   //     after the truncation are removed
1557   // Note that we do it backward and that we change the size in
1558   // the external attributes only at the end. This make sure that
1559   // no rados object stays behind if we remove the striped object
1560   // after a truncation has failed
1561   uint64_t trunc_objectsetno = size / layout.fl_object_size / layout.fl_stripe_count;
1562   uint64_t last_objectsetno = original_size / layout.fl_object_size / layout.fl_stripe_count;
1563   bool exists = false;
1564   for (int64_t objectno = (last_objectsetno+1) * layout.fl_stripe_count-1;
1565        objectno >= (int64_t)((trunc_objectsetno + 1) * layout.fl_stripe_count);
1566        objectno--) {
1567     // if no object existed so far, check object existence
1568     if (!exists) {
1569       uint64_t nb_full_object_set = objectno / layout.fl_stripe_count;
1570       uint64_t object_index_in_set = objectno % layout.fl_stripe_count;
1571       uint64_t set_start_off = nb_full_object_set * layout.fl_object_size * layout.fl_stripe_count;
1572       uint64_t object_start_off = set_start_off + object_index_in_set * layout.fl_stripe_unit;
1573       exists = (original_size > object_start_off);
1574     }
1575     if (exists) {
1576       // remove asynchronously
1577       multi_completion->add_request();
1578       RadosRemoveCompletionData *data =
1579         new RadosRemoveCompletionData(multi_completion, cct());
1580       librados::AioCompletion *rados_completion =
1581         librados::Rados::aio_create_completion(data,
1582                                                rados_req_remove_complete,
1583                                                rados_req_remove_safe);
1584       int rc = m_ioCtx.aio_remove(getObjectId(soid, objectno), rados_completion);
1585       rados_completion->release();
1586       // in case the object did not exist, it means we had a sparse file, all is fine
1587       if (rc && rc != -ENOENT) return rc;
1588     }
1589   }
1590   for (int64_t objectno = ((trunc_objectsetno + 1) * layout.fl_stripe_count) -1;
1591        objectno >= (int64_t)(trunc_objectsetno * layout.fl_stripe_count);
1592        objectno--) {
1593     // if no object existed so far, check object existence
1594     if (!exists) {
1595       uint64_t object_start_off = ((objectno / layout.fl_stripe_count) * layout.fl_object_size) +
1596         ((objectno % layout.fl_stripe_count) * layout.fl_stripe_unit);
1597       exists = (original_size > object_start_off);
1598     }
1599     if (exists) {
1600       // truncate
1601       file_layout_t l;
1602       l.from_legacy(layout);
1603       uint64_t new_object_size = Striper::object_truncate_size(cct(), &l, objectno, size);
1604       int rc;
1605       if (new_object_size > 0 or 0 == objectno) {
1606         // trunc is synchronous as there is no async version
1607         // but note that only a single object will be truncated
1608         // reducing the overload to a fixed amount
1609         rc = m_ioCtx.trunc(getObjectId(soid, objectno), new_object_size);
1610       } else {
1611         // removes are asynchronous in order to speed up truncations of big files
1612         multi_completion->add_request();
1613         RadosRemoveCompletionData *data =
1614           new RadosRemoveCompletionData(multi_completion, cct());
1615         librados::AioCompletion *rados_completion =
1616           librados::Rados::aio_create_completion(data,
1617                                                  rados_req_remove_complete,
1618                                                  rados_req_remove_safe);
1619         rc = m_ioCtx.aio_remove(getObjectId(soid, objectno), rados_completion);
1620         rados_completion->release();
1621       }
1622       // in case the object did not exist, it means we had a sparse file, all is fine
1623       if (rc && rc != -ENOENT) return rc;
1624     }
1625   }
1626   return 0;
1627 }  
1628
1629 int libradosstriper::RadosStriperImpl::grow(const std::string& soid,
1630                                             uint64_t original_size,
1631                                             uint64_t size,
1632                                             ceph_file_layout &layout) 
1633 {
1634   // handle the underlying rados objects. As we support sparse objects,
1635   // we only have to change the size in the external attributes
1636   std::ostringstream oss;
1637   oss << size;
1638   bufferlist bl;
1639   bl.append(oss.str());
1640   int rc = m_ioCtx.setxattr(getObjectId(soid, 0), XATTR_SIZE, bl);
1641   return rc;
1642 }  
1643
1644 std::string libradosstriper::RadosStriperImpl::getUUID()
1645 {
1646   struct uuid_d uuid;
1647   uuid.generate_random();
1648   char suuid[37];
1649   uuid.print(suuid);
1650   return std::string(suuid);
1651 }