Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_file.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef RGW_FILE_H
5 #define RGW_FILE_H
6
7 #include "include/rados/rgw_file.h"
8
9 /* internal header */
10 #include <string.h>
11 #include <sys/stat.h>
12 #include <stdint.h>
13
14 #include <atomic>
15 #include <chrono>
16 #include <thread>
17 #include <mutex>
18 #include <vector>
19 #include <deque>
20 #include <algorithm>
21 #include <functional>
22 #include <boost/intrusive_ptr.hpp>
23 #include <boost/range/adaptor/reversed.hpp>
24 #include <boost/container/flat_map.hpp>
25 #include <boost/variant.hpp>
26 #include <boost/utility/string_ref.hpp>
27 #include <boost/optional.hpp>
28 #include "xxhash.h"
29 #include "include/buffer.h"
30 #include "common/cohort_lru.h"
31 #include "common/ceph_timer.h"
32 #include "rgw_common.h"
33 #include "rgw_user.h"
34 #include "rgw_lib.h"
35 #include "rgw_ldap.h"
36 #include "rgw_token.h"
37 #include "rgw_compression.h"
38
39
40 /* XXX
41  * ASSERT_H somehow not defined after all the above (which bring
42  * in common/debug.h [e.g., dout])
43  */
44 #include "include/assert.h"
45
46
47 #define RGW_RWXMODE  (S_IRWXU | S_IRWXG | S_IRWXO)
48
49 #define RGW_RWMODE (RGW_RWXMODE &                       \
50                       ~(S_IXUSR | S_IXGRP | S_IXOTH))
51
52
53 namespace rgw {
54
55   template <typename T>
56   static inline void ignore(T &&) {}
57
58
59   namespace bi = boost::intrusive;
60
61   class RGWLibFS;
62   class RGWFileHandle;
63   class RGWWriteRequest;
64
65   static inline bool operator <(const struct timespec& lhs,
66                                 const struct timespec& rhs) {
67     if (lhs.tv_sec == rhs.tv_sec)
68       return lhs.tv_nsec < rhs.tv_nsec;
69     else
70       return lhs.tv_sec < rhs.tv_sec;
71   }
72
73   static inline bool operator ==(const struct timespec& lhs,
74                                  const struct timespec& rhs) {
75     return ((lhs.tv_sec == rhs.tv_sec) &&
76             (lhs.tv_nsec == rhs.tv_nsec));
77   }
78
79   /*
80    * XXX
81    * The current 64-bit, non-cryptographic hash used here is intended
82    * for prototyping only.
83    *
84    * However, the invariant being prototyped is that objects be
85    * identifiable by their hash components alone.  We believe this can
86    * be legitimately implemented using 128-hash values for bucket and
87    * object components, together with a cluster-resident cryptographic
88    * key.  Since an MD5 or SHA-1 key is 128 bits and the (fast),
89    * non-cryptographic CityHash128 hash algorithm takes a 128-bit seed,
90    * speculatively we could use that for the final hash computations.
91    */
92   struct fh_key
93   {
94     rgw_fh_hk fh_hk;
95     uint32_t version;
96
97     static constexpr uint64_t seed = 8675309;
98
99     fh_key() : version(0) {}
100
101     fh_key(const rgw_fh_hk& _hk)
102       : fh_hk(_hk), version(0) {
103       // nothing
104     }
105
106     fh_key(const uint64_t bk, const uint64_t ok)
107       : version(0) {
108       fh_hk.bucket = bk;
109       fh_hk.object = ok;
110     }
111
112     fh_key(const uint64_t bk, const char *_o)
113       : version(0) {
114       fh_hk.bucket = bk;
115       fh_hk.object = XXH64(_o, ::strlen(_o), seed);
116     }
117     
118     fh_key(const std::string& _b, const std::string& _o)
119       : version(0) {
120       fh_hk.bucket = XXH64(_b.c_str(), _o.length(), seed);
121       fh_hk.object = XXH64(_o.c_str(), _o.length(), seed);
122     }
123
124     void encode(buffer::list& bl) const {
125       ENCODE_START(2, 1, bl);
126       ::encode(fh_hk.bucket, bl);
127       ::encode(fh_hk.object, bl);
128       ::encode((uint32_t)2, bl);
129       ENCODE_FINISH(bl);
130     }
131
132     void decode(bufferlist::iterator& bl) {
133       DECODE_START(2, bl);
134       ::decode(fh_hk.bucket, bl);
135       ::decode(fh_hk.object, bl);
136       if (struct_v >= 2) {
137         ::decode(version, bl);
138       }
139       DECODE_FINISH(bl);
140     }
141   }; /* fh_key */
142
143   WRITE_CLASS_ENCODER(fh_key);
144
145   inline bool operator<(const fh_key& lhs, const fh_key& rhs)
146   {
147     return ((lhs.fh_hk.bucket < rhs.fh_hk.bucket) ||
148             ((lhs.fh_hk.bucket == rhs.fh_hk.bucket) &&
149               (lhs.fh_hk.object < rhs.fh_hk.object)));
150   }
151
152   inline bool operator>(const fh_key& lhs, const fh_key& rhs)
153   {
154     return (rhs < lhs);
155   }
156
157   inline bool operator==(const fh_key& lhs, const fh_key& rhs)
158   {
159     return ((lhs.fh_hk.bucket == rhs.fh_hk.bucket) &&
160             (lhs.fh_hk.object == rhs.fh_hk.object));
161   }
162
163   inline bool operator!=(const fh_key& lhs, const fh_key& rhs)
164   {
165     return !(lhs == rhs);
166   }
167
168   inline bool operator<=(const fh_key& lhs, const fh_key& rhs)
169   {
170     return (lhs < rhs) || (lhs == rhs);
171   }
172
173   using boost::variant;
174   using boost::container::flat_map;
175
176   typedef std::tuple<bool, bool> DecodeAttrsResult;
177
178   class RGWFileHandle : public cohort::lru::Object
179   {
180     struct rgw_file_handle fh;
181     std::mutex mtx;
182
183     RGWLibFS* fs;
184     RGWFileHandle* bucket;
185     RGWFileHandle* parent;
186     /* const */ std::string name; /* XXX file or bucket name */
187     /* const */ fh_key fhk;
188
189     using lock_guard = std::lock_guard<std::mutex>;
190     using unique_lock = std::unique_lock<std::mutex>;
191
192     /* TODO: keeping just the last marker is sufficient for
193      * nfs-ganesha 2.4.5; in the near future, nfs-ganesha will
194      * be able to hint the name of the next dirent required,
195      * from which we can directly synthesize a RADOS marker.
196      * using marker_cache_t = flat_map<uint64_t, rgw_obj_key>;
197      */
198
199     struct State {
200       uint64_t dev;
201       uint64_t size;
202       uint64_t nlink;
203       uint32_t owner_uid; /* XXX need Unix attr */
204       uint32_t owner_gid; /* XXX need Unix attr */
205       mode_t unix_mode;
206       struct timespec ctime;
207       struct timespec mtime;
208       struct timespec atime;
209       uint32_t version;
210       State() : dev(0), size(0), nlink(1), owner_uid(0), owner_gid(0),
211                 ctime{0,0}, mtime{0,0}, atime{0,0}, version(0) {}
212     } state;
213
214     struct file {
215       RGWWriteRequest* write_req;
216       file() : write_req(nullptr) {}
217       ~file();
218     };
219
220     struct directory {
221
222       static constexpr uint32_t FLAG_NONE =     0x0000;
223
224       uint32_t flags;
225       rgw_obj_key last_marker;
226       struct timespec last_readdir;
227
228       directory() : flags(FLAG_NONE), last_readdir{0,0} {}
229     };
230
231     void clear_state();
232
233     boost::variant<file, directory> variant_type;
234
235     uint16_t depth;
236     uint32_t flags;
237
238   public:
239     const static std::string root_name;
240
241     static constexpr uint16_t MAX_DEPTH = 256;
242
243     static constexpr uint32_t FLAG_NONE =    0x0000;
244     static constexpr uint32_t FLAG_OPEN =    0x0001;
245     static constexpr uint32_t FLAG_ROOT =    0x0002;
246     static constexpr uint32_t FLAG_CREATE =  0x0004;
247     static constexpr uint32_t FLAG_CREATING =  0x0008;
248     static constexpr uint32_t FLAG_DIRECTORY = 0x0010;
249     static constexpr uint32_t FLAG_BUCKET = 0x0020;
250     static constexpr uint32_t FLAG_LOCK =   0x0040;
251     static constexpr uint32_t FLAG_DELETED = 0x0080;
252     static constexpr uint32_t FLAG_UNLINK_THIS = 0x0100;
253     static constexpr uint32_t FLAG_LOCKED = 0x0200;
254     static constexpr uint32_t FLAG_STATELESS_OPEN = 0x0400;
255     static constexpr uint32_t FLAG_EXACT_MATCH = 0x0800;
256     static constexpr uint32_t FLAG_MOUNT = 0x1000;
257
258 #define CREATE_FLAGS(x) \
259     ((x) & ~(RGWFileHandle::FLAG_CREATE|RGWFileHandle::FLAG_LOCK))
260
261     friend class RGWLibFS;
262
263   private:
264     RGWFileHandle(RGWLibFS* _fs)
265       : fs(_fs), bucket(nullptr), parent(nullptr), variant_type{directory()},
266         depth(0), flags(FLAG_NONE)
267       {
268         /* root */
269         fh.fh_type = RGW_FS_TYPE_DIRECTORY;
270         variant_type = directory();
271         /* stat */
272         state.unix_mode = RGW_RWXMODE|S_IFDIR;
273         /* pointer to self */
274         fh.fh_private = this;
275       }
276
277     uint64_t init_fsid(std::string& uid) {
278       return XXH64(uid.c_str(), uid.length(), fh_key::seed);
279     }
280
281     void init_rootfs(std::string& fsid, const std::string& object_name,
282                      bool is_bucket) {
283       /* fh_key */
284       fh.fh_hk.bucket = XXH64(fsid.c_str(), fsid.length(), fh_key::seed);
285       fh.fh_hk.object = XXH64(object_name.c_str(), object_name.length(),
286                               fh_key::seed);
287       fhk = fh.fh_hk;
288       name = object_name;
289
290       state.dev = init_fsid(fsid);
291
292       if (is_bucket) {
293         flags |= RGWFileHandle::FLAG_BUCKET | RGWFileHandle::FLAG_MOUNT;
294         bucket = this;
295         depth = 1;
296       } else {
297         flags |= RGWFileHandle::FLAG_ROOT | RGWFileHandle::FLAG_MOUNT;
298       }
299     }
300
301   public:
302     RGWFileHandle(RGWLibFS* _fs, RGWFileHandle* _parent,
303                   const fh_key& _fhk, std::string& _name, uint32_t _flags)
304       : fs(_fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
305         fhk(_fhk), flags(_flags) {
306
307       if (parent->is_root()) {
308         fh.fh_type = RGW_FS_TYPE_DIRECTORY;
309         variant_type = directory();
310         flags |= FLAG_BUCKET;
311       } else {
312         bucket = parent->is_bucket() ? parent
313           : parent->bucket;
314         if (flags & FLAG_DIRECTORY) {
315           fh.fh_type = RGW_FS_TYPE_DIRECTORY;
316           variant_type = directory();
317         } else {
318           fh.fh_type = RGW_FS_TYPE_FILE;
319           variant_type = file();
320         }
321       }
322
323       depth = parent->depth + 1;
324
325       /* save constant fhk */
326       fh.fh_hk = fhk.fh_hk; /* XXX redundant in fh_hk */
327
328       /* inherits parent's fsid */
329       state.dev = parent->state.dev;
330
331       switch (fh.fh_type) {
332       case RGW_FS_TYPE_DIRECTORY:
333         state.unix_mode = RGW_RWXMODE|S_IFDIR;
334         break;
335       case RGW_FS_TYPE_FILE:
336         state.unix_mode = RGW_RWMODE|S_IFREG;
337       default:
338         break;
339       }
340
341       /* pointer to self */
342       fh.fh_private = this;
343     }
344
345     const fh_key& get_key() const {
346       return fhk;
347     }
348
349     directory* get_directory() {
350       return get<directory>(&variant_type);
351     }
352
353     size_t get_size() const { return state.size; }
354
355     const char* stype() {
356       return is_dir() ? "DIR" : "FILE";
357     }
358
359     uint16_t get_depth() const { return depth; }
360
361     struct rgw_file_handle* get_fh() { return &fh; }
362
363     RGWLibFS* get_fs() { return fs; }
364
365     RGWFileHandle* get_parent() { return parent; }
366
367     uint32_t get_owner_uid() const { return state.owner_uid; }
368     uint32_t get_owner_gid() const { return state.owner_gid; }
369
370     struct timespec get_ctime() const { return state.ctime; }
371     struct timespec get_mtime() const { return state.mtime; }
372
373     void create_stat(struct stat* st, uint32_t mask) {
374       if (mask & RGW_SETATTR_UID)
375         state.owner_uid = st->st_uid;
376
377       if (mask & RGW_SETATTR_GID)
378         state.owner_gid = st->st_gid;
379
380       if (mask & RGW_SETATTR_MODE)  {
381         switch (fh.fh_type) {
382         case RGW_FS_TYPE_DIRECTORY:
383           state.unix_mode = st->st_mode|S_IFDIR;
384           break;
385         case RGW_FS_TYPE_FILE:
386           state.unix_mode = st->st_mode|S_IFREG;
387       default:
388         break;
389         }
390       }
391
392       if (mask & RGW_SETATTR_ATIME)
393         state.atime = st->st_atim;
394       if (mask & RGW_SETATTR_MTIME)
395         state.mtime = st->st_mtim;
396       if (mask & RGW_SETATTR_CTIME)
397         state.ctime = st->st_ctim;
398     }
399
400     int stat(struct stat* st) {
401       /* partial Unix attrs */
402       memset(st, 0, sizeof(struct stat));
403       st->st_dev = state.dev;
404       st->st_ino = fh.fh_hk.object; // XXX
405
406       st->st_uid = state.owner_uid;
407       st->st_gid = state.owner_gid;
408
409       st->st_mode = state.unix_mode;
410
411 #ifdef HAVE_STAT_ST_MTIMESPEC_TV_NSEC
412       st->st_atimespec = state.atime;
413       st->st_mtimespec = state.mtime;
414       st->st_ctimespec = state.ctime;
415 #else
416       st->st_atim = state.atime;
417       st->st_mtim = state.mtime;
418       st->st_ctim = state.ctime;
419 #endif
420
421       switch (fh.fh_type) {
422       case RGW_FS_TYPE_DIRECTORY:
423         st->st_nlink = state.nlink;
424         break;
425       case RGW_FS_TYPE_FILE:
426         st->st_nlink = 1;
427         st->st_blksize = 4096;
428         st->st_size = state.size;
429         st->st_blocks = (state.size) / 512;
430       default:
431         break;
432       }
433
434       return 0;
435     }
436
437     const std::string& bucket_name() const {
438       if (is_root())
439         return root_name;
440       if (is_bucket())
441         return name;
442       return bucket->object_name();
443     }
444
445     const std::string& object_name() const { return name; }
446
447     std::string full_object_name(bool omit_bucket = false) const {
448       std::string path;
449       std::vector<const std::string*> segments;
450       int reserve = 0;
451       const RGWFileHandle* tfh = this;
452       while (tfh && !tfh->is_root() && !(tfh->is_bucket() && omit_bucket)) {
453         segments.push_back(&tfh->object_name());
454         reserve += (1 + tfh->object_name().length());
455         tfh = tfh->parent;
456       }
457       bool first = true;
458       path.reserve(reserve);
459       for (auto& s : boost::adaptors::reverse(segments)) {
460         if (! first)
461           path += "/";
462         else {
463           if (!omit_bucket && (path.front() != '/')) // pretty-print
464             path += "/";
465           first = false;
466         }
467         path += *s;
468       }
469       return path;
470     }
471
472     inline std::string relative_object_name() const {
473       return full_object_name(true /* omit_bucket */);
474     }
475
476     inline std::string format_child_name(const std::string& cbasename,
477                                          bool is_dir) const {
478       std::string child_name{relative_object_name()};
479       if ((child_name.size() > 0) &&
480           (child_name.back() != '/'))
481         child_name += "/";
482       child_name += cbasename;
483       if (is_dir)
484         child_name += "/";
485       return child_name;
486     }
487
488     inline std::string make_key_name(const char *name) const {
489       std::string key_name{full_object_name()};
490       if (key_name.length() > 0)
491         key_name += "/";
492       key_name += name;
493       return key_name;
494     }
495
496     fh_key make_fhk(const std::string& name) const {
497       if (depth <= 1)
498         return fh_key(fhk.fh_hk.object, name.c_str());
499       else {
500         std::string key_name = make_key_name(name.c_str());
501         return fh_key(fhk.fh_hk.bucket, key_name.c_str());
502       }
503     }
504
505     void add_marker(uint64_t off, const rgw_obj_key& marker,
506                     uint8_t obj_type) {
507       using std::get;
508       directory* d = get<directory>(&variant_type);
509       if (d) {
510         unique_lock guard(mtx);
511         d->last_marker = marker;
512       }
513     }
514
515     const rgw_obj_key* find_marker(uint64_t off) const {
516       using std::get;
517       if (off > 0) {
518         const directory* d = get<directory>(&variant_type);
519         if (d ) {
520           return &d->last_marker;
521         }
522       }
523       return nullptr;
524     }
525
526     int offset_of(const std::string& name, int64_t *offset, uint32_t flags) {
527       if (unlikely(! is_dir())) {
528         return -EINVAL;
529       }
530       *offset = XXH64(name.c_str(), name.length(), fh_key::seed);
531       return 0;
532     }
533
534     bool is_open() const { return flags & FLAG_OPEN; }
535     bool is_root() const { return flags & FLAG_ROOT; }
536     bool is_mount() const { return flags & FLAG_MOUNT; }
537     bool is_bucket() const { return flags & FLAG_BUCKET; }
538     bool is_object() const { return !is_bucket(); }
539     bool is_file() const { return (fh.fh_type == RGW_FS_TYPE_FILE); }
540     bool is_dir() const { return (fh.fh_type == RGW_FS_TYPE_DIRECTORY); }
541     bool creating() const { return flags & FLAG_CREATING; }
542     bool deleted() const { return flags & FLAG_DELETED; }
543     bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; }
544     bool has_children() const;
545
546     int open(uint32_t gsh_flags) {
547       lock_guard guard(mtx);
548       if (! is_open()) {
549         if (gsh_flags & RGW_OPEN_FLAG_V3) {
550           flags |= FLAG_STATELESS_OPEN;
551         }
552         flags |= FLAG_OPEN;
553         return 0;
554       }
555       return -EPERM;
556     }
557
558     typedef boost::variant<uint64_t*, const char*> readdir_offset;
559
560     int readdir(rgw_readdir_cb rcb, void *cb_arg, readdir_offset offset,
561                 bool *eof, uint32_t flags);
562
563     int write(uint64_t off, size_t len, size_t *nbytes, void *buffer);
564
565     int commit(uint64_t offset, uint64_t length, uint32_t flags) {
566       /* NFS3 and NFSv4 COMMIT implementation
567        * the current atomic update strategy doesn't actually permit
568        * clients to read-stable until either CLOSE (NFSv4+) or the
569        * expiration of the active write timer (NFS3).  In the
570        * interim, the client may send an arbitrary number of COMMIT
571        * operations which must return a success result */
572       return 0;
573     }
574
575     int write_finish(uint32_t flags = FLAG_NONE);
576     int close();
577
578     void open_for_create() {
579       lock_guard guard(mtx);
580       flags |= FLAG_CREATING;
581     }
582
583     void clear_creating() {
584       lock_guard guard(mtx);
585       flags &= ~FLAG_CREATING;
586     }
587
588     void inc_nlink(const uint64_t n) {
589       state.nlink += n;
590     }
591
592     void set_nlink(const uint64_t n) {
593       state.nlink = n;
594     }
595
596     void set_size(const size_t size) {
597       state.size = size;
598     }
599
600     void set_times(real_time t) {
601       state.ctime = real_clock::to_timespec(t);
602       state.mtime = state.ctime;
603       state.atime = state.ctime;
604     }
605
606     void set_ctime(const struct timespec &ts) {
607       state.ctime = ts;
608     }
609
610     void set_mtime(const struct timespec &ts) {
611       state.mtime = ts;
612     }
613
614     void set_atime(const struct timespec &ts) {
615       state.atime = ts;
616     }
617
618     void encode(buffer::list& bl) const {
619       ENCODE_START(2, 1, bl);
620       ::encode(uint32_t(fh.fh_type), bl);
621       ::encode(state.dev, bl);
622       ::encode(state.size, bl);
623       ::encode(state.nlink, bl);
624       ::encode(state.owner_uid, bl);
625       ::encode(state.owner_gid, bl);
626       ::encode(state.unix_mode, bl);
627       for (const auto& t : { state.ctime, state.mtime, state.atime }) {
628         ::encode(real_clock::from_timespec(t), bl);
629       }
630       ::encode((uint32_t)2, bl);
631       ENCODE_FINISH(bl);
632     }
633
634     void decode(bufferlist::iterator& bl) {
635       DECODE_START(2, bl);
636       uint32_t fh_type;
637       ::decode(fh_type, bl);
638       assert(fh.fh_type == fh_type);
639       ::decode(state.dev, bl);
640       ::decode(state.size, bl);
641       ::decode(state.nlink, bl);
642       ::decode(state.owner_uid, bl);
643       ::decode(state.owner_gid, bl);
644       ::decode(state.unix_mode, bl);
645       ceph::real_time enc_time;
646       for (auto t : { &(state.ctime), &(state.mtime), &(state.atime) }) {
647         ::decode(enc_time, bl);
648         *t = real_clock::to_timespec(enc_time);
649       }
650       if (struct_v >= 2) {
651         ::decode(state.version, bl);
652       }
653       DECODE_FINISH(bl);
654     }
655
656     void encode_attrs(ceph::buffer::list& ux_key1,
657                       ceph::buffer::list& ux_attrs1);
658
659     DecodeAttrsResult decode_attrs(const ceph::buffer::list* ux_key1,
660                                    const ceph::buffer::list* ux_attrs1);
661
662     void invalidate();
663
664     bool reclaim() override;
665
666     typedef cohort::lru::LRU<std::mutex> FhLRU;
667
668     struct FhLT
669     {
670       // for internal ordering
671       bool operator()(const RGWFileHandle& lhs, const RGWFileHandle& rhs) const
672         { return (lhs.get_key() < rhs.get_key()); }
673
674       // for external search by fh_key
675       bool operator()(const fh_key& k, const RGWFileHandle& fh) const
676         { return k < fh.get_key(); }
677
678       bool operator()(const RGWFileHandle& fh, const fh_key& k) const
679         { return fh.get_key() < k; }
680     };
681
682     struct FhEQ
683     {
684       bool operator()(const RGWFileHandle& lhs, const RGWFileHandle& rhs) const
685         { return (lhs.get_key() == rhs.get_key()); }
686
687       bool operator()(const fh_key& k, const RGWFileHandle& fh) const
688         { return k == fh.get_key(); }
689
690       bool operator()(const RGWFileHandle& fh, const fh_key& k) const
691         { return fh.get_key() == k; }
692     };
693
694     typedef bi::link_mode<bi::safe_link> link_mode; /* XXX normal */
695 #if defined(FHCACHE_AVL)
696     typedef bi::avl_set_member_hook<link_mode> tree_hook_type;
697 #else
698     /* RBT */
699     typedef bi::set_member_hook<link_mode> tree_hook_type;
700 #endif
701     tree_hook_type fh_hook;
702
703     typedef bi::member_hook<
704       RGWFileHandle, tree_hook_type, &RGWFileHandle::fh_hook> FhHook;
705
706 #if defined(FHCACHE_AVL)
707     typedef bi::avltree<RGWFileHandle, bi::compare<FhLT>, FhHook> FHTree;
708 #else
709     typedef bi::rbtree<RGWFileHandle, bi::compare<FhLT>, FhHook> FhTree;
710 #endif
711     typedef cohort::lru::TreeX<RGWFileHandle, FhTree, FhLT, FhEQ, fh_key,
712                                std::mutex> FHCache;
713
714     ~RGWFileHandle() override;
715
716     friend std::ostream& operator<<(std::ostream &os,
717                                     RGWFileHandle const &rgw_fh);
718
719     class Factory : public cohort::lru::ObjectFactory
720     {
721     public:
722       RGWLibFS* fs;
723       RGWFileHandle* parent;
724       const fh_key& fhk;
725       std::string& name;
726       uint32_t flags;
727
728       Factory() = delete;
729
730       Factory(RGWLibFS* _fs, RGWFileHandle* _parent,
731               const fh_key& _fhk, std::string& _name, uint32_t _flags)
732         : fs(_fs), parent(_parent), fhk(_fhk), name(_name),
733           flags(_flags) {}
734
735       void recycle (cohort::lru::Object* o) override {
736         /* re-use an existing object */
737         o->~Object(); // call lru::Object virtual dtor
738         // placement new!
739         new (o) RGWFileHandle(fs, parent, fhk, name, flags);
740       }
741
742       cohort::lru::Object* alloc() override {
743         return new RGWFileHandle(fs, parent, fhk, name, flags);
744       }
745     }; /* Factory */
746
747   }; /* RGWFileHandle */
748
749   WRITE_CLASS_ENCODER(RGWFileHandle);
750
751   static inline RGWFileHandle* get_rgwfh(struct rgw_file_handle* fh) {
752     return static_cast<RGWFileHandle*>(fh->fh_private);
753   }
754
755   static inline enum rgw_fh_type fh_type_of(uint32_t flags) {
756     enum rgw_fh_type fh_type;
757     switch(flags & RGW_LOOKUP_TYPE_FLAGS)
758     {
759     case RGW_LOOKUP_FLAG_DIR:
760       fh_type = RGW_FS_TYPE_DIRECTORY;
761       break;
762     case RGW_LOOKUP_FLAG_FILE:
763       fh_type = RGW_FS_TYPE_FILE;
764       break;
765     default:
766       fh_type = RGW_FS_TYPE_NIL;
767     };
768     return fh_type;
769   }
770
771   typedef std::tuple<RGWFileHandle*, uint32_t> LookupFHResult;
772   typedef std::tuple<RGWFileHandle*, int> MkObjResult;
773
774   class RGWLibFS
775   {
776     CephContext* cct;
777     struct rgw_fs fs{};
778     RGWFileHandle root_fh;
779     rgw_fh_callback_t invalidate_cb;
780     void *invalidate_arg;
781     bool shutdown;
782
783     mutable std::atomic<uint64_t> refcnt;
784
785     RGWFileHandle::FHCache fh_cache;
786     RGWFileHandle::FhLRU fh_lru;
787     
788     std::string uid; // should match user.user_id, iiuc
789
790     RGWUserInfo user;
791     RGWAccessKey key; // XXXX acc_key
792
793     static std::atomic<uint32_t> fs_inst_counter;
794
795     static uint32_t write_completion_interval_s;
796
797     using lock_guard = std::lock_guard<std::mutex>;
798     using unique_lock = std::unique_lock<std::mutex>;
799
800     struct event
801     {
802       enum class type : uint8_t { READDIR } ;
803       type t;
804       const fh_key fhk;
805       struct timespec ts;
806       event(type t, const fh_key& k, const struct timespec& ts)
807         : t(t), fhk(k), ts(ts) {}
808     };
809
810     friend std::ostream& operator<<(std::ostream &os,
811                                     RGWLibFS::event const &ev);
812
813     using event_vector = /* boost::small_vector<event, 16> */
814       std::vector<event>;
815
816     struct WriteCompletion
817     {
818       RGWFileHandle& rgw_fh;
819
820       WriteCompletion(RGWFileHandle& _fh) : rgw_fh(_fh) {
821         rgw_fh.get_fs()->ref(&rgw_fh);
822       }
823
824       void operator()() {
825         rgw_fh.close(); /* will finish in-progress write */
826         rgw_fh.get_fs()->unref(&rgw_fh);
827       }
828     };
829
830     static ceph::timer<ceph::mono_clock> write_timer;
831
832     struct State {
833       std::mutex mtx;
834       std::atomic<uint32_t> flags;
835       std::deque<event> events;
836
837       State() : flags(0) {}
838
839       void push_event(const event& ev) {
840         events.push_back(ev);
841       }
842     } state;
843
844     uint32_t new_inst() {
845       return ++fs_inst_counter;
846     }
847
848     friend class RGWFileHandle;
849     friend class RGWLibProcess;
850
851   public:
852
853     static constexpr uint32_t FLAG_NONE =      0x0000;
854     static constexpr uint32_t FLAG_CLOSED =    0x0001;
855
856     struct BucketStats {
857       size_t size;
858       size_t size_rounded;
859       real_time creation_time;
860       uint64_t num_entries;
861     };
862
863     RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
864             const char* _key, const char *root)
865       : cct(_cct), root_fh(this), invalidate_cb(nullptr),
866         invalidate_arg(nullptr), shutdown(false), refcnt(1),
867         fh_cache(cct->_conf->rgw_nfs_fhcache_partitions,
868                  cct->_conf->rgw_nfs_fhcache_size),
869         fh_lru(cct->_conf->rgw_nfs_lru_lanes,
870                cct->_conf->rgw_nfs_lru_lane_hiwat),
871         uid(_uid), key(_user_id, _key) {
872
873       if (!root || !strcmp(root, "/")) {
874         root_fh.init_rootfs(uid, RGWFileHandle::root_name, false);
875       } else {
876         root_fh.init_rootfs(uid, root, true);
877       }
878
879       /* pointer to self */
880       fs.fs_private = this;
881
882       /* expose public root fh */
883       fs.root_fh = root_fh.get_fh();
884
885       new_inst();
886     }
887
888     friend void intrusive_ptr_add_ref(const RGWLibFS* fs) {
889       fs->refcnt.fetch_add(1, std::memory_order_relaxed);
890     }
891
892     friend void intrusive_ptr_release(const RGWLibFS* fs) {
893       if (fs->refcnt.fetch_sub(1, std::memory_order_release) == 0) {
894         std::atomic_thread_fence(std::memory_order_acquire);
895         delete fs;
896       }
897     }
898
899     RGWLibFS* ref() {
900       intrusive_ptr_add_ref(this);
901       return this;
902     }
903
904     inline void rele() {
905       intrusive_ptr_release(this);
906     }
907
908     void stop() { shutdown = true; }
909
910     void release_evict(RGWFileHandle* fh) {
911       /* remove from cache, releases sentinel ref */
912       fh_cache.remove(fh->fh.fh_hk.object, fh,
913                       RGWFileHandle::FHCache::FLAG_LOCK);
914       /* release call-path ref */
915       (void) fh_lru.unref(fh, cohort::lru::FLAG_NONE);
916     }
917
918     int authorize(RGWRados* store) {
919       int ret = rgw_get_user_info_by_access_key(store, key.id, user);
920       if (ret == 0) {
921         RGWAccessKey* key0 = user.get_key0();
922         if (!key0 ||
923             (key0->key != key.key))
924           return -EINVAL;
925         if (user.suspended)
926           return -ERR_USER_SUSPENDED;
927       } else {
928         /* try external authenticators (ldap for now) */
929         rgw::LDAPHelper* ldh = rgwlib.get_ldh(); /* !nullptr */
930         RGWToken token;
931         /* boost filters and/or string_ref may throw on invalid input */
932         try {
933           token = rgw::from_base64(key.id);
934         } catch(...) {
935           token = std::string("");
936         }
937         if (token.valid() && (ldh->auth(token.id, token.key) == 0)) {
938           /* try to store user if it doesn't already exist */
939           if (rgw_get_user_info_by_uid(store, token.id, user) < 0) {
940             int ret = rgw_store_user_info(store, user, NULL, NULL, real_time(),
941                                           true);
942             if (ret < 0) {
943               lsubdout(get_context(), rgw, 10)
944                 << "NOTICE: failed to store new user's info: ret=" << ret
945                 << dendl;
946             }
947           }
948         } /* auth success */
949       }
950       return ret;
951     } /* authorize */
952
953     int register_invalidate(rgw_fh_callback_t cb, void *arg, uint32_t flags) {
954       invalidate_cb = cb;
955       invalidate_arg = arg;
956       return 0;
957     }
958
959     /* find RGWFileHandle by id  */
960     LookupFHResult lookup_fh(const fh_key& fhk,
961                              const uint32_t flags = RGWFileHandle::FLAG_NONE) {
962       using std::get;
963
964       // cast int32_t(RGWFileHandle::FLAG_NONE) due to strictness of Clang 
965       // the cast transfers a lvalue into a rvalue  in the ctor
966       // check the commit message for the full details
967       LookupFHResult fhr { nullptr, uint32_t(RGWFileHandle::FLAG_NONE) };
968
969       RGWFileHandle::FHCache::Latch lat;
970       bool fh_locked = flags & RGWFileHandle::FLAG_LOCKED;
971
972     retry:
973       RGWFileHandle* fh =
974         fh_cache.find_latch(fhk.fh_hk.object /* partition selector*/,
975                             fhk /* key */, lat /* serializer */,
976                             RGWFileHandle::FHCache::FLAG_LOCK);
977       /* LATCHED */
978       if (fh) {
979         if (likely(! fh_locked))
980             fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
981         /* need initial ref from LRU (fast path) */
982         if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
983           lat.lock->unlock();
984           if (likely(! fh_locked))
985             fh->mtx.unlock();
986           goto retry; /* !LATCHED */
987         }
988         /* LATCHED, LOCKED */
989         if (! (flags & RGWFileHandle::FLAG_LOCK))
990           fh->mtx.unlock(); /* ! LOCKED */
991       }
992       lat.lock->unlock(); /* !LATCHED */
993       get<0>(fhr) = fh;
994       if (fh) {
995             lsubdout(get_context(), rgw, 17)
996               << __func__ << " 1 " << *fh
997               << dendl;
998       }
999       return fhr;
1000     } /* lookup_fh(const fh_key&) */
1001
1002     /* find or create an RGWFileHandle */
1003     LookupFHResult lookup_fh(RGWFileHandle* parent, const char *name,
1004                              const uint32_t flags = RGWFileHandle::FLAG_NONE) {
1005       using std::get;
1006
1007       // cast int32_t(RGWFileHandle::FLAG_NONE) due to strictness of Clang 
1008       // the cast transfers a lvalue into a rvalue  in the ctor
1009       // check the commit message for the full details
1010       LookupFHResult fhr { nullptr, uint32_t(RGWFileHandle::FLAG_NONE) };
1011
1012       /* mount is stale? */
1013       if (state.flags & FLAG_CLOSED)
1014         return fhr;
1015
1016       RGWFileHandle::FHCache::Latch lat;
1017       bool fh_locked = flags & RGWFileHandle::FLAG_LOCKED;
1018
1019       std::string obj_name{name};
1020       std::string key_name{parent->make_key_name(name)};
1021
1022       lsubdout(get_context(), rgw, 10)
1023         << __func__ << " lookup called on "
1024         << parent->object_name() << " for " << key_name
1025         << " (" << obj_name << ")"
1026         << dendl;
1027
1028       fh_key fhk = parent->make_fhk(obj_name);
1029
1030     retry:
1031       RGWFileHandle* fh =
1032         fh_cache.find_latch(fhk.fh_hk.object /* partition selector*/,
1033                             fhk /* key */, lat /* serializer */,
1034                             RGWFileHandle::FHCache::FLAG_LOCK);
1035       /* LATCHED */
1036       if (fh) {
1037         if (likely(! fh_locked))
1038           fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
1039         if (fh->flags & RGWFileHandle::FLAG_DELETED) {
1040           /* for now, delay briefly and retry */
1041           lat.lock->unlock();
1042           if (likely(! fh_locked))
1043             fh->mtx.unlock();
1044           std::this_thread::sleep_for(std::chrono::milliseconds(20));
1045           goto retry; /* !LATCHED */
1046         }
1047         /* need initial ref from LRU (fast path) */
1048         if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
1049           lat.lock->unlock();
1050           if (likely(! fh_locked))
1051             fh->mtx.unlock();
1052           goto retry; /* !LATCHED */
1053         }
1054         /* LATCHED, LOCKED */
1055         if (! (flags & RGWFileHandle::FLAG_LOCK))
1056           if (likely(! fh_locked))
1057             fh->mtx.unlock(); /* ! LOCKED */
1058       } else {
1059         /* make or re-use handle */
1060         RGWFileHandle::Factory prototype(this, parent, fhk,
1061                                          obj_name, CREATE_FLAGS(flags));
1062         fh = static_cast<RGWFileHandle*>(
1063           fh_lru.insert(&prototype,
1064                         cohort::lru::Edge::MRU,
1065                         cohort::lru::FLAG_INITIAL));
1066         if (fh) {
1067           /* lock fh (LATCHED) */
1068           if (flags & RGWFileHandle::FLAG_LOCK)
1069             fh->mtx.lock();
1070           /* inserts, releasing latch */
1071           fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
1072           get<1>(fhr) |= RGWFileHandle::FLAG_CREATE;
1073           /* ref parent (non-initial ref cannot fail on valid object) */
1074           if (! parent->is_mount()) {
1075             (void) fh_lru.ref(parent, cohort::lru::FLAG_NONE);
1076           }
1077           goto out; /* !LATCHED */
1078         } else {
1079           lat.lock->unlock();
1080           goto retry; /* !LATCHED */
1081         }
1082       }
1083       lat.lock->unlock(); /* !LATCHED */
1084     out:
1085       get<0>(fhr) = fh;
1086       if (fh) {
1087             lsubdout(get_context(), rgw, 17)
1088               << __func__ << " 2 " << *fh
1089               << dendl;
1090       }
1091       return fhr;
1092     } /*  lookup_fh(RGWFileHandle*, const char *, const uint32_t) */
1093
1094     inline void unref(RGWFileHandle* fh) {
1095       if (likely(! fh->is_mount())) {
1096         (void) fh_lru.unref(fh, cohort::lru::FLAG_NONE);
1097       }
1098     }
1099
1100     inline RGWFileHandle* ref(RGWFileHandle* fh) {
1101       if (likely(! fh->is_mount())) {
1102         fh_lru.ref(fh, cohort::lru::FLAG_NONE);
1103       }
1104       return fh;
1105     }
1106
1107     int getattr(RGWFileHandle* rgw_fh, struct stat* st);
1108
1109     int setattr(RGWFileHandle* rgw_fh, struct stat* st, uint32_t mask,
1110                 uint32_t flags);
1111
1112     void update_fh(RGWFileHandle *rgw_fh);
1113
1114     LookupFHResult stat_bucket(RGWFileHandle* parent, const char *path,
1115                                RGWLibFS::BucketStats& bs,
1116                                uint32_t flags);
1117
1118     LookupFHResult stat_leaf(RGWFileHandle* parent, const char *path,
1119                              enum rgw_fh_type type = RGW_FS_TYPE_NIL,
1120                              uint32_t flags = RGWFileHandle::FLAG_NONE);
1121
1122     int read(RGWFileHandle* rgw_fh, uint64_t offset, size_t length,
1123              size_t* bytes_read, void* buffer, uint32_t flags);
1124
1125     int rename(RGWFileHandle* old_fh, RGWFileHandle* new_fh,
1126                const char *old_name, const char *new_name);
1127
1128     MkObjResult create(RGWFileHandle* parent, const char *name, struct stat *st,
1129                       uint32_t mask, uint32_t flags);
1130
1131     MkObjResult mkdir(RGWFileHandle* parent, const char *name, struct stat *st,
1132                       uint32_t mask, uint32_t flags);
1133
1134     int unlink(RGWFileHandle* rgw_fh, const char *name,
1135                uint32_t flags = FLAG_NONE);
1136
1137     /* find existing RGWFileHandle */
1138     RGWFileHandle* lookup_handle(struct rgw_fh_hk fh_hk) {
1139
1140       if (state.flags & FLAG_CLOSED)
1141         return nullptr;
1142
1143       RGWFileHandle::FHCache::Latch lat;
1144       fh_key fhk(fh_hk);
1145
1146     retry:
1147       RGWFileHandle* fh =
1148         fh_cache.find_latch(fhk.fh_hk.object /* partition selector*/,
1149                             fhk /* key */, lat /* serializer */,
1150                             RGWFileHandle::FHCache::FLAG_LOCK);
1151       /* LATCHED */
1152       if (! fh) {
1153         lsubdout(get_context(), rgw, 0)
1154           << __func__ << " handle lookup failed <"
1155           << fhk.fh_hk.bucket << "," << fhk.fh_hk.object << ">"
1156           << "(need persistent handles)"
1157           << dendl;
1158         goto out;
1159       }
1160       fh->mtx.lock();
1161       if (fh->flags & RGWFileHandle::FLAG_DELETED) {
1162         /* for now, delay briefly and retry */
1163         lat.lock->unlock();
1164         fh->mtx.unlock(); /* !LOCKED */
1165         std::this_thread::sleep_for(std::chrono::milliseconds(20));
1166         goto retry; /* !LATCHED */
1167       }
1168       if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
1169         lat.lock->unlock();
1170         fh->mtx.unlock();
1171         goto retry; /* !LATCHED */
1172       }
1173       /* LATCHED */
1174       fh->mtx.unlock(); /* !LOCKED */
1175     out:
1176       lat.lock->unlock(); /* !LATCHED */
1177
1178       /* special case:  lookup root_fh */
1179       if (! fh) {
1180         if (unlikely(fh_hk == root_fh.fh.fh_hk)) {
1181           fh = &root_fh;
1182         }
1183       }
1184
1185       return fh;
1186     }
1187
1188     CephContext* get_context() {
1189       return cct;
1190     }
1191
1192     struct rgw_fs* get_fs() { return &fs; }
1193
1194     uint64_t get_fsid() { return root_fh.state.dev; }
1195
1196     RGWUserInfo* get_user() { return &user; }
1197
1198     void close();
1199     void gc();
1200   }; /* RGWLibFS */
1201
1202 static inline std::string make_uri(const std::string& bucket_name,
1203                                    const std::string& object_name) {
1204   std::string uri("/");
1205   uri.reserve(bucket_name.length() + object_name.length() + 2);
1206   uri += bucket_name;
1207   uri += "/";
1208   uri += object_name;
1209   return uri;
1210 }
1211
1212 /*
1213   read directory content (buckets)
1214 */
1215
1216 class RGWListBucketsRequest : public RGWLibRequest,
1217                               public RGWListBuckets /* RGWOp */
1218 {
1219 public:
1220   RGWFileHandle* rgw_fh;
1221   RGWFileHandle::readdir_offset offset;
1222   void* cb_arg;
1223   rgw_readdir_cb rcb;
1224   uint64_t* ioff;
1225   size_t ix;
1226   uint32_t d_count;
1227
1228   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
1229                         RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
1230                         void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
1231     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
1232       cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
1233
1234     using boost::get;
1235
1236     if (unlikely(!! get<uint64_t*>(&offset))) {
1237       ioff = get<uint64_t*>(offset);
1238       const auto& mk = rgw_fh->find_marker(*ioff);
1239       if (mk) {
1240         marker = mk->name;
1241       }
1242     } else {
1243       const char* mk = get<const char*>(offset);
1244       if (mk) {
1245         marker = mk;
1246       }
1247     }
1248     op = this;
1249   }
1250
1251   bool only_bucket() override { return false; }
1252
1253   int op_init() override {
1254     // assign store, s, and dialect_handler
1255     RGWObjectCtx* rados_ctx
1256       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1257     // framework promises to call op_init after parent init
1258     assert(rados_ctx);
1259     RGWOp::init(rados_ctx->store, get_state(), this);
1260     op = this; // assign self as op: REQUIRED
1261     return 0;
1262   }
1263
1264   int header_init() override {
1265     struct req_state* s = get_state();
1266     s->info.method = "GET";
1267     s->op = OP_GET;
1268
1269     /* XXX derp derp derp */
1270     s->relative_uri = "/";
1271     s->info.request_uri = "/"; // XXX
1272     s->info.effective_uri = "/";
1273     s->info.request_params = "";
1274     s->info.domain = ""; /* XXX ? */
1275
1276     // woo
1277     s->user = user;
1278
1279     return 0;
1280   }
1281
1282   int get_params() override {
1283     limit = -1; /* no limit */
1284     return 0;
1285   }
1286
1287   void send_response_begin(bool has_buckets) override {
1288     sent_data = true;
1289   }
1290
1291   void send_response_data(RGWUserBuckets& buckets) override {
1292     if (!sent_data)
1293       return;
1294     map<string, RGWBucketEnt>& m = buckets.get_buckets();
1295     for (const auto& iter : m) {
1296       boost::string_ref marker{iter.first};
1297       const RGWBucketEnt& ent = iter.second;
1298       if (! this->operator()(ent.bucket.name, marker)) {
1299         /* caller cannot accept more */
1300         lsubdout(cct, rgw, 5) << "ListBuckets rcb failed"
1301                               << " dirent=" << ent.bucket.name
1302                               << " call count=" << ix
1303                               << dendl;
1304         return;
1305       }
1306       ++ix;
1307     }
1308   } /* send_response_data */
1309
1310   void send_response_end() override {
1311     // do nothing
1312   }
1313
1314   int operator()(const boost::string_ref& name,
1315                  const boost::string_ref& marker) {
1316     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
1317     if (!! ioff) {
1318       *ioff = off;
1319     }
1320     /* update traversal cache */
1321     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
1322                        RGW_FS_TYPE_DIRECTORY);
1323     ++d_count;
1324     return rcb(name.data(), cb_arg, off, RGW_LOOKUP_FLAG_DIR);
1325   }
1326
1327   bool eof() {
1328     lsubdout(cct, rgw, 15) << "READDIR offset: " << offset
1329                            << " is_truncated: " << is_truncated
1330                            << dendl;
1331     return !is_truncated;
1332   }
1333
1334 }; /* RGWListBucketsRequest */
1335
1336 /*
1337   read directory content (bucket objects)
1338 */
1339
1340 class RGWReaddirRequest : public RGWLibRequest,
1341                           public RGWListBucket /* RGWOp */
1342 {
1343 public:
1344   RGWFileHandle* rgw_fh;
1345   RGWFileHandle::readdir_offset offset;
1346   void* cb_arg;
1347   rgw_readdir_cb rcb;
1348   uint64_t* ioff;
1349   size_t ix;
1350   uint32_t d_count;
1351
1352   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
1353                     RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
1354                     void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
1355     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
1356       cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
1357
1358     using boost::get;
1359
1360     if (unlikely(!! get<uint64_t*>(&offset))) {
1361       ioff = get<uint64_t*>(offset);
1362       const auto& mk = rgw_fh->find_marker(*ioff);
1363       if (mk) {
1364         marker = *mk;
1365       }
1366     } else {
1367       const char* mk = get<const char*>(offset);
1368       if (mk) {
1369         std::string tmark{rgw_fh->relative_object_name()};
1370         tmark += "/";
1371         tmark += mk;    
1372         marker = rgw_obj_key{std::move(tmark), "", ""};
1373       }
1374     }
1375
1376     default_max = 1000; // XXX was being omitted
1377     op = this;
1378   }
1379
1380   bool only_bucket() override { return true; }
1381
1382   int op_init() override {
1383     // assign store, s, and dialect_handler
1384     RGWObjectCtx* rados_ctx
1385       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1386     // framework promises to call op_init after parent init
1387     assert(rados_ctx);
1388     RGWOp::init(rados_ctx->store, get_state(), this);
1389     op = this; // assign self as op: REQUIRED
1390     return 0;
1391   }
1392
1393   int header_init() override {
1394     struct req_state* s = get_state();
1395     s->info.method = "GET";
1396     s->op = OP_GET;
1397
1398     /* XXX derp derp derp */
1399     std::string uri = "/" + rgw_fh->bucket_name() + "/";
1400     s->relative_uri = uri;
1401     s->info.request_uri = uri; // XXX
1402     s->info.effective_uri = uri;
1403     s->info.request_params = "";
1404     s->info.domain = ""; /* XXX ? */
1405
1406     // woo
1407     s->user = user;
1408
1409     prefix = rgw_fh->relative_object_name();
1410     if (prefix.length() > 0)
1411       prefix += "/";
1412     delimiter = '/';
1413
1414     return 0;
1415   }
1416
1417   int operator()(const boost::string_ref name, const rgw_obj_key& marker,
1418                 uint8_t type) {
1419
1420     assert(name.length() > 0); // XXX
1421
1422     /* hash offset of name in parent (short name) for NFS readdir cookie */
1423     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
1424     if (unlikely(!! ioff)) {
1425       *ioff = off;
1426     }
1427     /* update traversal cache */
1428     rgw_fh->add_marker(off, marker, type);
1429     ++d_count;
1430     return rcb(name.data(), cb_arg, off,
1431                (type == RGW_FS_TYPE_DIRECTORY) ?
1432                RGW_LOOKUP_FLAG_DIR :
1433                RGW_LOOKUP_FLAG_FILE);
1434   }
1435
1436   int get_params() override {
1437     max = default_max;
1438     return 0;
1439   }
1440
1441   void send_response() override {
1442     struct req_state* s = get_state();
1443     for (const auto& iter : objs) {
1444
1445       boost::string_ref sref {iter.key.name};
1446
1447       lsubdout(cct, rgw, 15) << "readdir objects prefix: " << prefix
1448                              << " obj: " << sref << dendl;
1449
1450       size_t last_del = sref.find_last_of('/');
1451       if (last_del != string::npos)
1452         sref.remove_prefix(last_del+1);
1453
1454       /* leaf directory? */
1455       if (sref.empty())
1456         continue;
1457
1458       lsubdout(cct, rgw, 15) << "RGWReaddirRequest "
1459                              << __func__ << " "
1460                              << "list uri=" << s->relative_uri << " "
1461                              << " prefix=" << prefix << " "
1462                              << " obj path=" << iter.key.name
1463                              << " (" << sref << ")" << ""
1464                              << dendl;
1465
1466       if(! this->operator()(sref, next_marker, RGW_FS_TYPE_FILE)) {
1467         /* caller cannot accept more */
1468         lsubdout(cct, rgw, 5) << "readdir rcb failed"
1469                               << " dirent=" << sref.data()
1470                               << " call count=" << ix
1471                               << dendl;
1472         return;
1473       }
1474       ++ix;
1475     }
1476     for (auto& iter : common_prefixes) {
1477
1478       lsubdout(cct, rgw, 15) << "readdir common prefixes prefix: " << prefix
1479                              << " iter first: " << iter.first
1480                              << " iter second: " << iter.second
1481                              << dendl;
1482
1483       /* XXX aieee--I have seen this case! */
1484       if (iter.first == "/")
1485         continue;
1486
1487       /* it's safest to modify the element in place--a suffix-modifying
1488        * string_ref operation is problematic since ULP rgw_file callers
1489        * will ultimately need a c-string */
1490       if (iter.first.back() == '/')
1491         const_cast<std::string&>(iter.first).pop_back();
1492
1493       boost::string_ref sref{iter.first};
1494
1495       size_t last_del = sref.find_last_of('/');
1496       if (last_del != string::npos)
1497         sref.remove_prefix(last_del+1);
1498
1499       lsubdout(cct, rgw, 15) << "RGWReaddirRequest "
1500                              << __func__ << " "
1501                              << "list uri=" << s->relative_uri << " "
1502                              << " prefix=" << prefix << " "
1503                              << " cpref=" << sref
1504                              << dendl;
1505
1506       this->operator()(sref, next_marker, RGW_FS_TYPE_DIRECTORY);
1507       ++ix;
1508     }
1509   }
1510
1511   virtual void send_versioned_response() {
1512     send_response();
1513   }
1514
1515   bool eof() {
1516     lsubdout(cct, rgw, 15) << "READDIR offset: " << offset
1517                            << " next marker: " << next_marker
1518                            << " is_truncated: " << is_truncated
1519                            << dendl;
1520     return !is_truncated;
1521   }
1522
1523 }; /* RGWReaddirRequest */
1524
1525 /*
1526   dir has-children predicate (bucket objects)
1527 */
1528
1529 class RGWRMdirCheck : public RGWLibRequest,
1530                       public RGWListBucket /* RGWOp */
1531 {
1532 public:
1533   const RGWFileHandle* rgw_fh;
1534   bool valid;
1535   bool has_children;
1536
1537   RGWRMdirCheck (CephContext* _cct, RGWUserInfo *_user,
1538                  const RGWFileHandle* _rgw_fh)
1539     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), valid(false),
1540       has_children(false) {
1541     default_max = 2;
1542     op = this;
1543   }
1544
1545   bool only_bucket() override { return true; }
1546
1547   int op_init() override {
1548     // assign store, s, and dialect_handler
1549     RGWObjectCtx* rados_ctx
1550       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1551     // framework promises to call op_init after parent init
1552     assert(rados_ctx);
1553     RGWOp::init(rados_ctx->store, get_state(), this);
1554     op = this; // assign self as op: REQUIRED
1555     return 0;
1556   }
1557
1558   int header_init() override {
1559     struct req_state* s = get_state();
1560     s->info.method = "GET";
1561     s->op = OP_GET;
1562
1563     std::string uri = "/" + rgw_fh->bucket_name() + "/";
1564     s->relative_uri = uri;
1565     s->info.request_uri = uri;
1566     s->info.effective_uri = uri;
1567     s->info.request_params = "";
1568     s->info.domain = ""; /* XXX ? */
1569
1570     s->user = user;
1571
1572     prefix = rgw_fh->relative_object_name();
1573     if (prefix.length() > 0)
1574       prefix += "/";
1575     delimiter = '/';
1576
1577     return 0;
1578   }
1579
1580   int get_params() override {
1581     max = default_max;
1582     return 0;
1583   }
1584
1585   void send_response() override {
1586     valid = true;
1587     if ((objs.size() > 1) ||
1588         (! objs.empty() &&
1589          (objs.front().key.name != prefix))) {
1590       has_children = true;
1591       return;
1592     }
1593     for (auto& iter : common_prefixes) {
1594       /* readdir never produces a name for this case */
1595       if (iter.first == "/")
1596         continue;
1597       has_children = true;
1598       break;
1599     }
1600   }
1601
1602   virtual void send_versioned_response() {
1603     send_response();
1604   }
1605
1606 }; /* RGWRMdirCheck */
1607
1608 /*
1609   create bucket
1610 */
1611
1612 class RGWCreateBucketRequest : public RGWLibRequest,
1613                                public RGWCreateBucket /* RGWOp */
1614 {
1615 public:
1616   const std::string& bucket_name;
1617
1618   RGWCreateBucketRequest(CephContext* _cct, RGWUserInfo *_user,
1619                         std::string& _bname)
1620     : RGWLibRequest(_cct, _user), bucket_name(_bname) {
1621     op = this;
1622   }
1623
1624   bool only_bucket() override { return false; }
1625
1626   int read_permissions(RGWOp* op_obj) override {
1627     /* we ARE a 'create bucket' request (cf. rgw_rest.cc, ll. 1305-6) */
1628     return 0;
1629   }
1630
1631   int op_init() override {
1632     // assign store, s, and dialect_handler
1633     RGWObjectCtx* rados_ctx
1634       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1635     // framework promises to call op_init after parent init
1636     assert(rados_ctx);
1637     RGWOp::init(rados_ctx->store, get_state(), this);
1638     op = this; // assign self as op: REQUIRED
1639     return 0;
1640   }
1641
1642   int header_init() override {
1643
1644     struct req_state* s = get_state();
1645     s->info.method = "PUT";
1646     s->op = OP_PUT;
1647
1648     string uri = "/" + bucket_name;
1649     /* XXX derp derp derp */
1650     s->relative_uri = uri;
1651     s->info.request_uri = uri; // XXX
1652     s->info.effective_uri = uri;
1653     s->info.request_params = "";
1654     s->info.domain = ""; /* XXX ? */
1655
1656     // woo
1657     s->user = user;
1658
1659     return 0;
1660   }
1661
1662   int get_params() override {
1663     struct req_state* s = get_state();
1664     RGWAccessControlPolicy_S3 s3policy(s->cct);
1665     /* we don't have (any) headers, so just create canned ACLs */
1666     int ret = s3policy.create_canned(s->owner, s->bucket_owner, s->canned_acl);
1667     policy = s3policy;
1668     return ret;
1669   }
1670
1671   void send_response() override {
1672     /* TODO: something (maybe) */
1673   }
1674 }; /* RGWCreateBucketRequest */
1675
1676 /*
1677   delete bucket
1678 */
1679
1680 class RGWDeleteBucketRequest : public RGWLibRequest,
1681                                public RGWDeleteBucket /* RGWOp */
1682 {
1683 public:
1684   const std::string& bucket_name;
1685
1686   RGWDeleteBucketRequest(CephContext* _cct, RGWUserInfo *_user,
1687                         std::string& _bname)
1688     : RGWLibRequest(_cct, _user), bucket_name(_bname) {
1689     op = this;
1690   }
1691
1692   bool only_bucket() override { return true; }
1693
1694   int op_init() override {
1695     // assign store, s, and dialect_handler
1696     RGWObjectCtx* rados_ctx
1697       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1698     // framework promises to call op_init after parent init
1699     assert(rados_ctx);
1700     RGWOp::init(rados_ctx->store, get_state(), this);
1701     op = this; // assign self as op: REQUIRED
1702     return 0;
1703   }
1704
1705   int header_init() override {
1706
1707     struct req_state* s = get_state();
1708     s->info.method = "DELETE";
1709     s->op = OP_DELETE;
1710
1711     string uri = "/" + bucket_name;
1712     /* XXX derp derp derp */
1713     s->relative_uri = uri;
1714     s->info.request_uri = uri; // XXX
1715     s->info.effective_uri = uri;
1716     s->info.request_params = "";
1717     s->info.domain = ""; /* XXX ? */
1718
1719     // woo
1720     s->user = user;
1721
1722     return 0;
1723   }
1724
1725   void send_response() override {}
1726
1727 }; /* RGWDeleteBucketRequest */
1728
1729 /*
1730   put object
1731 */
1732 class RGWPutObjRequest : public RGWLibRequest,
1733                          public RGWPutObj /* RGWOp */
1734 {
1735 public:
1736   const std::string& bucket_name;
1737   const std::string& obj_name;
1738   buffer::list& bl; /* XXX */
1739   size_t bytes_written;
1740
1741   RGWPutObjRequest(CephContext* _cct, RGWUserInfo *_user,
1742                   const std::string& _bname, const std::string& _oname,
1743                   buffer::list& _bl)
1744     : RGWLibRequest(_cct, _user), bucket_name(_bname), obj_name(_oname),
1745       bl(_bl), bytes_written(0) {
1746     op = this;
1747   }
1748
1749   bool only_bucket() override { return true; }
1750
1751   int op_init() override {
1752     // assign store, s, and dialect_handler
1753     RGWObjectCtx* rados_ctx
1754       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1755     // framework promises to call op_init after parent init
1756     assert(rados_ctx);
1757     RGWOp::init(rados_ctx->store, get_state(), this);
1758     op = this; // assign self as op: REQUIRED
1759
1760     int rc = valid_s3_object_name(obj_name);
1761     if (rc != 0)
1762       return rc;
1763
1764     return 0;
1765   }
1766
1767   int header_init() override {
1768
1769     struct req_state* s = get_state();
1770     s->info.method = "PUT";
1771     s->op = OP_PUT;
1772
1773     /* XXX derp derp derp */
1774     std::string uri = make_uri(bucket_name, obj_name);
1775     s->relative_uri = uri;
1776     s->info.request_uri = uri; // XXX
1777     s->info.effective_uri = uri;
1778     s->info.request_params = "";
1779     s->info.domain = ""; /* XXX ? */
1780
1781     /* XXX required in RGWOp::execute() */
1782     s->content_length = bl.length();
1783
1784     // woo
1785     s->user = user;
1786
1787     return 0;
1788   }
1789
1790   int get_params() override {
1791     struct req_state* s = get_state();
1792     RGWAccessControlPolicy_S3 s3policy(s->cct);
1793     /* we don't have (any) headers, so just create canned ACLs */
1794     int ret = s3policy.create_canned(s->owner, s->bucket_owner, s->canned_acl);
1795     policy = s3policy;
1796     return ret;
1797   }
1798
1799   int get_data(buffer::list& _bl) override {
1800     /* XXX for now, use sharing semantics */
1801     _bl.claim(bl);
1802     uint32_t len = _bl.length();
1803     bytes_written += len;
1804     return len;
1805   }
1806
1807   void send_response() override {}
1808
1809   int verify_params() override {
1810     if (bl.length() > cct->_conf->rgw_max_put_size)
1811       return -ERR_TOO_LARGE;
1812     return 0;
1813   }
1814
1815 }; /* RGWPutObjRequest */
1816
1817 /*
1818   get object
1819 */
1820
1821 class RGWReadRequest : public RGWLibRequest,
1822                        public RGWGetObj /* RGWOp */
1823 {
1824 public:
1825   RGWFileHandle* rgw_fh;
1826   void *ulp_buffer;
1827   size_t nread;
1828   size_t read_resid; /* initialize to len, <= sizeof(ulp_buffer) */
1829   bool do_hexdump = false;
1830
1831   RGWReadRequest(CephContext* _cct, RGWUserInfo *_user,
1832                  RGWFileHandle* _rgw_fh, uint64_t off, uint64_t len,
1833                  void *_ulp_buffer)
1834     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), ulp_buffer(_ulp_buffer),
1835       nread(0), read_resid(len) {
1836     op = this;
1837
1838     /* fixup RGWGetObj (already know range parameters) */
1839     RGWGetObj::range_parsed = true;
1840     RGWGetObj::get_data = true; // XXX
1841     RGWGetObj::partial_content = true;
1842     RGWGetObj::ofs = off;
1843     RGWGetObj::end = off + len;
1844   }
1845
1846   bool only_bucket() override { return false; }
1847
1848   int op_init() override {
1849     // assign store, s, and dialect_handler
1850     RGWObjectCtx* rados_ctx
1851       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1852     // framework promises to call op_init after parent init
1853     assert(rados_ctx);
1854     RGWOp::init(rados_ctx->store, get_state(), this);
1855     op = this; // assign self as op: REQUIRED
1856     return 0;
1857   }
1858
1859   int header_init() override {
1860
1861     struct req_state* s = get_state();
1862     s->info.method = "GET";
1863     s->op = OP_GET;
1864
1865     /* XXX derp derp derp */
1866     s->relative_uri = make_uri(rgw_fh->bucket_name(),
1867                                rgw_fh->relative_object_name());
1868     s->info.request_uri = s->relative_uri; // XXX
1869     s->info.effective_uri = s->relative_uri;
1870     s->info.request_params = "";
1871     s->info.domain = ""; /* XXX ? */
1872
1873     // woo
1874     s->user = user;
1875
1876     return 0;
1877   }
1878
1879   int get_params() override {
1880     return 0;
1881   }
1882
1883   int send_response_data(ceph::buffer::list& bl, off_t bl_off,
1884                          off_t bl_len) override {
1885     size_t bytes;
1886     for (auto& bp : bl.buffers()) {
1887       /* if for some reason bl_off indicates the start-of-data is not at
1888        * the current buffer::ptr, skip it and account */
1889       if (bl_off > bp.length()) {
1890         bl_off -= bp.length();
1891         continue;
1892       }
1893       /* read no more than read_resid */
1894       bytes = std::min(read_resid, size_t(bp.length()-bl_off));
1895       memcpy(static_cast<char*>(ulp_buffer)+nread, bp.c_str()+bl_off, bytes);
1896       read_resid -= bytes; /* reduce read_resid by bytes read */
1897       nread += bytes;
1898       bl_off = 0;
1899       /* stop if we have no residual ulp_buffer */
1900       if (! read_resid)
1901         break;
1902     }
1903     return 0;
1904   }
1905
1906   int send_response_data_error() override {
1907     /* S3 implementation just sends nothing--there is no side effect
1908      * to simulate here */
1909     return 0;
1910   }
1911
1912 }; /* RGWReadRequest */
1913
1914 /*
1915   delete object
1916 */
1917
1918 class RGWDeleteObjRequest : public RGWLibRequest,
1919                             public RGWDeleteObj /* RGWOp */
1920 {
1921 public:
1922   const std::string& bucket_name;
1923   const std::string& obj_name;
1924
1925   RGWDeleteObjRequest(CephContext* _cct, RGWUserInfo *_user,
1926                       const std::string& _bname, const std::string& _oname)
1927     : RGWLibRequest(_cct, _user), bucket_name(_bname), obj_name(_oname) {
1928     op = this;
1929   }
1930
1931   bool only_bucket() override { return true; }
1932
1933   int op_init() override {
1934     // assign store, s, and dialect_handler
1935     RGWObjectCtx* rados_ctx
1936       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
1937     // framework promises to call op_init after parent init
1938     assert(rados_ctx);
1939     RGWOp::init(rados_ctx->store, get_state(), this);
1940     op = this; // assign self as op: REQUIRED
1941     return 0;
1942   }
1943
1944   int header_init() override {
1945
1946     struct req_state* s = get_state();
1947     s->info.method = "DELETE";
1948     s->op = OP_DELETE;
1949
1950     /* XXX derp derp derp */
1951     std::string uri = make_uri(bucket_name, obj_name);
1952     s->relative_uri = uri;
1953     s->info.request_uri = uri; // XXX
1954     s->info.effective_uri = uri;
1955     s->info.request_params = "";
1956     s->info.domain = ""; /* XXX ? */
1957
1958     // woo
1959     s->user = user;
1960
1961     return 0;
1962   }
1963
1964   void send_response() override {}
1965
1966 }; /* RGWDeleteObjRequest */
1967
1968 class RGWStatObjRequest : public RGWLibRequest,
1969                           public RGWGetObj /* RGWOp */
1970 {
1971 public:
1972   const std::string& bucket_name;
1973   const std::string& obj_name;
1974   uint64_t _size;
1975   uint32_t flags;
1976
1977   static constexpr uint32_t FLAG_NONE = 0x000;
1978
1979   RGWStatObjRequest(CephContext* _cct, RGWUserInfo *_user,
1980                     const std::string& _bname, const std::string& _oname,
1981                     uint32_t _flags)
1982     : RGWLibRequest(_cct, _user), bucket_name(_bname), obj_name(_oname),
1983       _size(0), flags(_flags) {
1984     op = this;
1985
1986     /* fixup RGWGetObj (already know range parameters) */
1987     RGWGetObj::range_parsed = true;
1988     RGWGetObj::get_data = false; // XXX
1989     RGWGetObj::partial_content = true;
1990     RGWGetObj::ofs = 0;
1991     RGWGetObj::end = UINT64_MAX;
1992   }
1993
1994   const string name() override { return "stat_obj"; }
1995   RGWOpType get_type() override { return RGW_OP_STAT_OBJ; }
1996
1997   real_time get_mtime() const {
1998     return lastmod;
1999   }
2000
2001   /* attributes */
2002   uint64_t get_size() { return _size; }
2003   real_time ctime() { return mod_time; } // XXX
2004   real_time mtime() { return mod_time; }
2005   std::map<string, bufferlist>& get_attrs() { return attrs; }
2006
2007   buffer::list* get_attr(const std::string& k) {
2008     auto iter = attrs.find(k);
2009     return (iter != attrs.end()) ? &(iter->second) : nullptr;
2010   }
2011
2012   bool only_bucket() override { return false; }
2013
2014   int op_init() override {
2015     // assign store, s, and dialect_handler
2016     RGWObjectCtx* rados_ctx
2017       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
2018     // framework promises to call op_init after parent init
2019     assert(rados_ctx);
2020     RGWOp::init(rados_ctx->store, get_state(), this);
2021     op = this; // assign self as op: REQUIRED
2022     return 0;
2023   }
2024
2025   int header_init() override {
2026
2027     struct req_state* s = get_state();
2028     s->info.method = "GET";
2029     s->op = OP_GET;
2030
2031     /* XXX derp derp derp */
2032     s->relative_uri = make_uri(bucket_name, obj_name);
2033     s->info.request_uri = s->relative_uri; // XXX
2034     s->info.effective_uri = s->relative_uri;
2035     s->info.request_params = "";
2036     s->info.domain = ""; /* XXX ? */
2037
2038     // woo
2039     s->user = user;
2040
2041     return 0;
2042   }
2043
2044   int get_params() override {
2045     return 0;
2046   }
2047
2048   int send_response_data(ceph::buffer::list& _bl, off_t s_off,
2049                          off_t e_off) override {
2050     /* NOP */
2051     /* XXX save attrs? */
2052     return 0;
2053   }
2054
2055   int send_response_data_error() override {
2056     /* NOP */
2057     return 0;
2058   }
2059
2060   void execute() override {
2061     RGWGetObj::execute();
2062     _size = get_state()->obj_size;
2063   }
2064
2065 }; /* RGWStatObjRequest */
2066
2067 class RGWStatBucketRequest : public RGWLibRequest,
2068                              public RGWStatBucket /* RGWOp */
2069 {
2070 public:
2071   std::string uri;
2072   std::map<std::string, buffer::list> attrs;
2073   RGWLibFS::BucketStats& bs;
2074
2075   RGWStatBucketRequest(CephContext* _cct, RGWUserInfo *_user,
2076                        const std::string& _path,
2077                        RGWLibFS::BucketStats& _stats)
2078     : RGWLibRequest(_cct, _user), bs(_stats) {
2079     uri = "/" + _path;
2080     op = this;
2081   }
2082
2083   buffer::list* get_attr(const std::string& k) {
2084     auto iter = attrs.find(k);
2085     return (iter != attrs.end()) ? &(iter->second) : nullptr;
2086   }
2087
2088   real_time get_ctime() const {
2089     return bucket.creation_time;
2090   }
2091
2092   bool only_bucket() override { return false; }
2093
2094   int op_init() override {
2095     // assign store, s, and dialect_handler
2096     RGWObjectCtx* rados_ctx
2097       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
2098     // framework promises to call op_init after parent init
2099     assert(rados_ctx);
2100     RGWOp::init(rados_ctx->store, get_state(), this);
2101     op = this; // assign self as op: REQUIRED
2102     return 0;
2103   }
2104
2105   int header_init() override {
2106
2107     struct req_state* s = get_state();
2108     s->info.method = "GET";
2109     s->op = OP_GET;
2110
2111     /* XXX derp derp derp */
2112     s->relative_uri = uri;
2113     s->info.request_uri = uri; // XXX
2114     s->info.effective_uri = uri;
2115     s->info.request_params = "";
2116     s->info.domain = ""; /* XXX ? */
2117
2118     // woo
2119     s->user = user;
2120
2121     return 0;
2122   }
2123
2124   virtual int get_params() {
2125     return 0;
2126   }
2127
2128   void send_response() override {
2129     bucket.creation_time = get_state()->bucket_info.creation_time;
2130     bs.size = bucket.size;
2131     bs.size_rounded = bucket.size_rounded;
2132     bs.creation_time = bucket.creation_time;
2133     bs.num_entries = bucket.count;
2134     std::swap(attrs, get_state()->bucket_attrs);
2135   }
2136
2137   bool matched() {
2138     return (bucket.bucket.name.length() > 0);
2139   }
2140
2141 }; /* RGWStatBucketRequest */
2142
2143 class RGWStatLeafRequest : public RGWLibRequest,
2144                            public RGWListBucket /* RGWOp */
2145 {
2146 public:
2147   RGWFileHandle* rgw_fh;
2148   std::string path;
2149   bool matched;
2150   bool is_dir;
2151   bool exact_matched;
2152
2153   RGWStatLeafRequest(CephContext* _cct, RGWUserInfo *_user,
2154                      RGWFileHandle* _rgw_fh, const std::string& _path)
2155     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), path(_path),
2156       matched(false), is_dir(false), exact_matched(false) {
2157     default_max = 1000; // logical max {"foo", "foo/"}
2158     op = this;
2159   }
2160
2161   bool only_bucket() override { return true; }
2162
2163   int op_init() override {
2164     // assign store, s, and dialect_handler
2165     RGWObjectCtx* rados_ctx
2166       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
2167     // framework promises to call op_init after parent init
2168     assert(rados_ctx);
2169     RGWOp::init(rados_ctx->store, get_state(), this);
2170     op = this; // assign self as op: REQUIRED
2171     return 0;
2172   }
2173
2174   int header_init() override {
2175
2176     struct req_state* s = get_state();
2177     s->info.method = "GET";
2178     s->op = OP_GET;
2179
2180     /* XXX derp derp derp */
2181     std::string uri = "/" + rgw_fh->bucket_name() + "/";
2182     s->relative_uri = uri;
2183     s->info.request_uri = uri; // XXX
2184     s->info.effective_uri = uri;
2185     s->info.request_params = "";
2186     s->info.domain = ""; /* XXX ? */
2187
2188     // woo
2189     s->user = user;
2190
2191     prefix = rgw_fh->relative_object_name();
2192     if (prefix.length() > 0)
2193       prefix += "/";
2194     prefix += path;
2195     delimiter = '/';
2196
2197     return 0;
2198   }
2199
2200   int get_params() override {
2201     max = default_max;
2202     return 0;
2203   }
2204
2205   void send_response() override {
2206     struct req_state* s = get_state();
2207     // try objects
2208     for (const auto& iter : objs) {
2209       auto& name = iter.key.name;
2210       lsubdout(cct, rgw, 15) << "RGWStatLeafRequest "
2211                              << __func__ << " "
2212                              << "list uri=" << s->relative_uri << " "
2213                              << " prefix=" << prefix << " "
2214                              << " obj path=" << name << ""
2215                              << " target = " << path << ""
2216                              << dendl;
2217       /* XXX is there a missing match-dir case (trailing '/')? */
2218       matched = true;
2219       if (name == path)
2220         exact_matched = true;
2221       return;
2222     }
2223     // try prefixes
2224     for (auto& iter : common_prefixes) {
2225       auto& name = iter.first;
2226       lsubdout(cct, rgw, 15) << "RGWStatLeafRequest "
2227                              << __func__ << " "
2228                              << "list uri=" << s->relative_uri << " "
2229                              << " prefix=" << prefix << " "
2230                              << " pref path=" << name << " (not chomped)"
2231                              << " target = " << path << ""
2232                              << dendl;
2233       matched = true;
2234       is_dir = true;
2235       break;
2236     }
2237   }
2238
2239   virtual void send_versioned_response() {
2240     send_response();
2241   }
2242 }; /* RGWStatLeafRequest */
2243
2244 /*
2245   put object
2246 */
2247
2248 class RGWWriteRequest : public RGWLibContinuedReq,
2249                         public RGWPutObj /* RGWOp */
2250 {
2251 public:
2252   const std::string& bucket_name;
2253   const std::string& obj_name;
2254   RGWFileHandle* rgw_fh;
2255   RGWPutObjProcessor* processor;
2256   RGWPutObjDataProcessor* filter;
2257   boost::optional<RGWPutObj_Compress> compressor;
2258   CompressorRef plugin;
2259   buffer::list data;
2260   uint64_t timer_id;
2261   MD5 hash;
2262   off_t real_ofs;
2263   size_t bytes_written;
2264   bool multipart;
2265   bool eio;
2266
2267   RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
2268                   const std::string& _bname, const std::string& _oname)
2269     : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
2270       rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
2271       bytes_written(0), multipart(false), eio(false) {
2272
2273     int ret = header_init();
2274     if (ret == 0) {
2275       ret = init_from_header(get_state());
2276     }
2277     op = this;
2278   }
2279
2280   bool only_bucket() override { return true; }
2281
2282   int op_init() override {
2283     // assign store, s, and dialect_handler
2284     RGWObjectCtx* rados_ctx
2285       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
2286     // framework promises to call op_init after parent init
2287     assert(rados_ctx);
2288     RGWOp::init(rados_ctx->store, get_state(), this);
2289     op = this; // assign self as op: REQUIRED
2290     return 0;
2291   }
2292
2293   int header_init() override {
2294
2295     struct req_state* s = get_state();
2296     s->info.method = "PUT";
2297     s->op = OP_PUT;
2298
2299     /* XXX derp derp derp */
2300     std::string uri = make_uri(bucket_name, obj_name);
2301     s->relative_uri = uri;
2302     s->info.request_uri = uri; // XXX
2303     s->info.effective_uri = uri;
2304     s->info.request_params = "";
2305     s->info.domain = ""; /* XXX ? */
2306
2307     // woo
2308     s->user = user;
2309
2310     return 0;
2311   }
2312
2313   RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx,
2314                                        bool *is_multipart) override {
2315     struct req_state* s = get_state();
2316     uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
2317     RGWPutObjProcessor_Atomic *processor =
2318       new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket,
2319                                     s->object.name, part_size, s->req_id,
2320                                     s->bucket_info.versioning_enabled());
2321     processor->set_olh_epoch(olh_epoch);
2322     processor->set_version_id(version_id);
2323     return processor;
2324   }
2325
2326   int get_params() override {
2327     struct req_state* s = get_state();
2328     RGWAccessControlPolicy_S3 s3policy(s->cct);
2329     /* we don't have (any) headers, so just create canned ACLs */
2330     int ret = s3policy.create_canned(s->owner, s->bucket_owner, s->canned_acl);
2331     policy = s3policy;
2332     return ret;
2333   }
2334
2335   int get_data(buffer::list& _bl) override {
2336     /* XXX for now, use sharing semantics */
2337     uint32_t len = data.length();
2338     _bl.claim(data);
2339     bytes_written += len;
2340     return len;
2341   }
2342
2343   void put_data(off_t off, buffer::list& _bl) {
2344     if (off != real_ofs) {
2345       eio = true;
2346     }
2347     data.claim(_bl);
2348     real_ofs += data.length();
2349     ofs = off; /* consumed in exec_continue() */
2350   }
2351
2352   int exec_start() override;
2353   int exec_continue() override;
2354   int exec_finish() override;
2355
2356   void send_response() override {}
2357
2358   int verify_params() override {
2359     return 0;
2360   }
2361 }; /* RGWWriteRequest */
2362
2363 /*
2364   copy object
2365 */
2366 class RGWCopyObjRequest : public RGWLibRequest,
2367                           public RGWCopyObj /* RGWOp */
2368 {
2369 public:
2370   RGWFileHandle* src_parent;
2371   RGWFileHandle* dst_parent;
2372   const std::string& src_name;
2373   const std::string& dst_name;
2374
2375   RGWCopyObjRequest(CephContext* _cct, RGWUserInfo *_user,
2376                     RGWFileHandle* _src_parent, RGWFileHandle* _dst_parent,
2377                     const std::string& _src_name, const std::string& _dst_name)
2378     : RGWLibRequest(_cct, _user), src_parent(_src_parent),
2379       dst_parent(_dst_parent), src_name(_src_name), dst_name(_dst_name) {
2380     /* all requests have this */
2381     op = this;
2382
2383     /* allow this request to replace selected attrs */
2384     attrs_mod = RGWRados::ATTRSMOD_MERGE;
2385   }
2386
2387   bool only_bucket() override { return true; }
2388
2389   int op_init() override {
2390     // assign store, s, and dialect_handler
2391     RGWObjectCtx* rados_ctx
2392       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
2393     // framework promises to call op_init after parent init
2394     assert(rados_ctx);
2395     RGWOp::init(rados_ctx->store, get_state(), this);
2396     op = this; // assign self as op: REQUIRED
2397
2398     return 0;
2399   }
2400
2401   int header_init() override {
2402
2403     struct req_state* s = get_state();
2404     s->info.method = "PUT"; // XXX check
2405     s->op = OP_PUT;
2406
2407     src_bucket_name = src_parent->bucket_name();
2408     // need s->src_bucket_name?
2409     src_object.name = src_parent->format_child_name(src_name, false);
2410     // need s->src_object?
2411
2412     dest_bucket_name = dst_parent->bucket_name();
2413     // need s->bucket.name?
2414     dest_object = dst_parent->format_child_name(dst_name, false);
2415     // need s->object_name?
2416
2417     int rc = valid_s3_object_name(dest_object);
2418     if (rc != 0)
2419       return rc;
2420
2421     /* XXX and fixup key attr (could optimize w/string ref and
2422      * dest_object) */
2423     buffer::list ux_key;
2424     fh_key fhk = dst_parent->make_fhk(dst_name);
2425     rgw::encode(fhk, ux_key);
2426     emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
2427
2428 #if 0 /* XXX needed? */
2429     s->relative_uri = uri;
2430     s->info.request_uri = uri; // XXX
2431     s->info.effective_uri = uri;
2432     s->info.request_params = "";
2433     s->info.domain = ""; /* XXX ? */
2434 #endif
2435
2436     // woo
2437     s->user = user;
2438
2439     return 0;
2440   }
2441
2442   int get_params() override {
2443     struct req_state* s = get_state();
2444     RGWAccessControlPolicy_S3 s3policy(s->cct);
2445     /* we don't have (any) headers, so just create canned ACLs */
2446     int ret = s3policy.create_canned(s->owner, s->bucket_owner, s->canned_acl);
2447     dest_policy = s3policy;
2448     return ret;
2449   }
2450
2451   void send_response() override {}
2452   void send_partial_response(off_t ofs) override {}
2453
2454 }; /* RGWCopyObjRequest */
2455
2456 class RGWSetAttrsRequest : public RGWLibRequest,
2457                            public RGWSetAttrs /* RGWOp */
2458 {
2459 public:
2460   const std::string& bucket_name;
2461   const std::string& obj_name;
2462
2463   RGWSetAttrsRequest(CephContext* _cct, RGWUserInfo *_user,
2464                      const std::string& _bname, const std::string& _oname)
2465     : RGWLibRequest(_cct, _user), bucket_name(_bname), obj_name(_oname) {
2466     op = this;
2467   }
2468
2469   bool only_bucket() override { return false; }
2470
2471   int op_init() override {
2472     // assign store, s, and dialect_handler
2473     RGWObjectCtx* rados_ctx
2474       = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
2475     // framework promises to call op_init after parent init
2476     assert(rados_ctx);
2477     RGWOp::init(rados_ctx->store, get_state(), this);
2478     op = this; // assign self as op: REQUIRED
2479     return 0;
2480   }
2481
2482   int header_init() override {
2483
2484     struct req_state* s = get_state();
2485     s->info.method = "PUT";
2486     s->op = OP_PUT;
2487
2488     /* XXX derp derp derp */
2489     std::string uri = make_uri(bucket_name, obj_name);
2490     s->relative_uri = uri;
2491     s->info.request_uri = uri; // XXX
2492     s->info.effective_uri = uri;
2493     s->info.request_params = "";
2494     s->info.domain = ""; /* XXX ? */
2495
2496     // woo
2497     s->user = user;
2498
2499     return 0;
2500   }
2501
2502   int get_params() override {
2503     return 0;
2504   }
2505
2506   void send_response() override {}
2507
2508 }; /* RGWSetAttrsRequest */
2509
2510 } /* namespace rgw */
2511
2512 #endif /* RGW_FILE_H */