Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_file.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "include/rados/rgw_file.h"
6
7 #include <sys/types.h>
8 #include <sys/stat.h>
9
10 #include "rgw_lib.h"
11 #include "rgw_rados.h"
12 #include "rgw_resolve.h"
13 #include "rgw_op.h"
14 #include "rgw_rest.h"
15 #include "rgw_acl.h"
16 #include "rgw_acl_s3.h"
17 #include "rgw_frontend.h"
18 #include "rgw_request.h"
19 #include "rgw_process.h"
20 #include "rgw_rest_user.h"
21 #include "rgw_rest_s3.h"
22 #include "rgw_os_lib.h"
23 #include "rgw_auth_s3.h"
24 #include "rgw_user.h"
25 #include "rgw_bucket.h"
26
27 #include "rgw_file.h"
28 #include "rgw_lib_frontend.h"
29
30 #include <atomic>
31
32 #define dout_subsys ceph_subsys_rgw
33
34 using namespace rgw;
35
36 namespace rgw {
37
38   extern RGWLib rgwlib;
39
40   const string RGWFileHandle::root_name = "/";
41
42   std::atomic<uint32_t> RGWLibFS::fs_inst_counter;
43
44   uint32_t RGWLibFS::write_completion_interval_s = 10;
45
46   ceph::timer<ceph::mono_clock> RGWLibFS::write_timer{
47     ceph::construct_suspended};
48
49   inline int valid_fs_bucket_name(const string& name) {
50     int rc = valid_s3_bucket_name(name, false /* relaxed */);
51     if (rc != 0) {
52       if (name.size() > 255)
53         return -ENAMETOOLONG;
54       return -EINVAL;
55     }
56     return 0;
57   }
58
59   inline int valid_fs_object_name(const string& name) {
60     int rc = valid_s3_object_name(name);
61     if (rc != 0) {
62       if (name.size() > 1024)
63         return -ENAMETOOLONG;
64       return -EINVAL;
65     }
66     return 0;
67   }
68
69   LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent, const char *path,
70                                        RGWLibFS::BucketStats& bs,
71                                        uint32_t flags)
72   {
73     LookupFHResult fhr{nullptr, 0};
74     std::string bucket_name{path};
75     RGWStatBucketRequest req(cct, get_user(), bucket_name, bs);
76
77     int rc = rgwlib.get_fe()->execute_req(&req);
78     if ((rc == 0) &&
79         (req.get_ret() == 0) &&
80         (req.matched())) {
81       fhr = lookup_fh(parent, path,
82                       (flags & RGWFileHandle::FLAG_LOCKED)|
83                       RGWFileHandle::FLAG_CREATE|
84                       RGWFileHandle::FLAG_BUCKET);
85       if (get<0>(fhr)) {
86         RGWFileHandle* rgw_fh = get<0>(fhr);
87         if (! (flags & RGWFileHandle::FLAG_LOCKED)) {
88           rgw_fh->mtx.lock();
89         }
90         rgw_fh->set_times(req.get_ctime());
91         /* restore attributes */
92         auto ux_key = req.get_attr(RGW_ATTR_UNIX_KEY1);
93         auto ux_attrs = req.get_attr(RGW_ATTR_UNIX1);
94         if (ux_key && ux_attrs) {
95           DecodeAttrsResult dar = rgw_fh->decode_attrs(ux_key, ux_attrs);
96           if (get<0>(dar) || get<1>(dar)) {
97             update_fh(rgw_fh);
98           }
99         }
100         if (! (flags & RGWFileHandle::FLAG_LOCKED)) {
101           rgw_fh->mtx.unlock();
102         }
103       }
104     }
105     return fhr;
106   }
107
108   LookupFHResult RGWLibFS::stat_leaf(RGWFileHandle* parent,
109                                      const char *path,
110                                      enum rgw_fh_type type,
111                                      uint32_t flags)
112   {
113     /* find either-of <object_name>, <object_name/>, only one of
114      * which should exist;  atomicity? */
115     using std::get;
116
117     LookupFHResult fhr{nullptr, 0};
118
119     /* XXX the need for two round-trip operations to identify file or
120      * directory leaf objects is unecessary--the current proposed
121      * mechanism to avoid this is to store leaf object names with an
122      * object locator w/o trailing slash */
123
124     std::string obj_path = parent->format_child_name(path, false);
125
126     for (auto ix : { 0, 1, 2 }) {
127       switch (ix) {
128       case 0:
129       {
130         /* type hint */
131         if (type == RGW_FS_TYPE_DIRECTORY)
132           continue;
133
134         RGWStatObjRequest req(cct, get_user(),
135                               parent->bucket_name(), obj_path,
136                               RGWStatObjRequest::FLAG_NONE);
137         int rc = rgwlib.get_fe()->execute_req(&req);
138         if ((rc == 0) &&
139             (req.get_ret() == 0)) {
140           fhr = lookup_fh(parent, path, RGWFileHandle::FLAG_CREATE);
141           if (get<0>(fhr)) {
142             RGWFileHandle* rgw_fh = get<0>(fhr);
143             lock_guard guard(rgw_fh->mtx);
144             rgw_fh->set_size(req.get_size());
145             rgw_fh->set_times(req.get_mtime());
146             /* restore attributes */
147             auto ux_key = req.get_attr(RGW_ATTR_UNIX_KEY1);
148             auto ux_attrs = req.get_attr(RGW_ATTR_UNIX1);
149             if (ux_key && ux_attrs) {
150               DecodeAttrsResult dar = rgw_fh->decode_attrs(ux_key, ux_attrs);
151               if (get<0>(dar) || get<1>(dar)) {
152                 update_fh(rgw_fh);
153               }
154             }
155           }
156           goto done;
157         }
158       }
159       break;
160       case 1:
161       {
162         /* try dir form */
163         /* type hint */
164         if (type == RGW_FS_TYPE_FILE)
165           continue;
166
167         obj_path += "/";
168         RGWStatObjRequest req(cct, get_user(),
169                               parent->bucket_name(), obj_path,
170                               RGWStatObjRequest::FLAG_NONE);
171         int rc = rgwlib.get_fe()->execute_req(&req);
172         if ((rc == 0) &&
173             (req.get_ret() == 0)) {
174           fhr = lookup_fh(parent, path, RGWFileHandle::FLAG_DIRECTORY);
175           if (get<0>(fhr)) {
176             RGWFileHandle* rgw_fh = get<0>(fhr);
177             lock_guard guard(rgw_fh->mtx);
178             rgw_fh->set_size(req.get_size());
179             rgw_fh->set_times(req.get_mtime());
180             /* restore attributes */
181             auto ux_key = req.get_attr(RGW_ATTR_UNIX_KEY1);
182             auto ux_attrs = req.get_attr(RGW_ATTR_UNIX1);
183             if (ux_key && ux_attrs) {
184               DecodeAttrsResult dar = rgw_fh->decode_attrs(ux_key, ux_attrs);
185               if (get<0>(dar) || get<1>(dar)) {
186                 update_fh(rgw_fh);
187               }
188             }
189           }
190           goto done;
191         }
192       }
193       break;
194       case 2:
195       {
196         std::string object_name{path};
197         RGWStatLeafRequest req(cct, get_user(), parent, object_name);
198         int rc = rgwlib.get_fe()->execute_req(&req);
199         if ((rc == 0) &&
200             (req.get_ret() == 0)) {
201           if (req.matched) {
202             /* we need rgw object's key name equal to file name, if
203              * not return NULL */
204             if ((flags & RGWFileHandle::FLAG_EXACT_MATCH) &&
205                 !req.exact_matched) {
206               lsubdout(get_context(), rgw, 15)
207                 << __func__
208                 << ": stat leaf not exact match file name = "
209                 << path << dendl;
210               goto done;
211             }
212             fhr = lookup_fh(parent, path,
213                             RGWFileHandle::FLAG_CREATE|
214                             ((req.is_dir) ?
215                               RGWFileHandle::FLAG_DIRECTORY :
216                               RGWFileHandle::FLAG_NONE));
217             /* XXX we don't have an object--in general, there need not
218              * be one (just a path segment in some other object).  In
219              * actual leaf an object exists, but we'd need another round
220              * trip to get attrs */
221             if (get<0>(fhr)) {
222               /* for now use the parent object's mtime */
223               RGWFileHandle* rgw_fh = get<0>(fhr);
224               lock_guard guard(rgw_fh->mtx);
225               rgw_fh->set_mtime(parent->get_mtime());
226             }
227           }
228         }
229       }
230       break;
231       default:
232         /* not reached */
233         break;
234       }
235     }
236   done:
237     return fhr;
238   } /* RGWLibFS::stat_leaf */
239
240   int RGWLibFS::read(RGWFileHandle* rgw_fh, uint64_t offset, size_t length,
241                      size_t* bytes_read, void* buffer, uint32_t flags)
242   {
243     if (! rgw_fh->is_file())
244       return -EINVAL;
245
246     if (rgw_fh->deleted())
247       return -ESTALE;
248
249     RGWReadRequest req(get_context(), get_user(), rgw_fh, offset, length,
250                        buffer);
251
252     int rc = rgwlib.get_fe()->execute_req(&req);
253     if ((rc == 0) &&
254         (req.get_ret() == 0)) {
255       lock_guard(rgw_fh->mtx);
256       rgw_fh->set_atime(real_clock::to_timespec(real_clock::now()));
257       *bytes_read = req.nread;
258     }
259
260     return rc;
261   }
262
263   int RGWLibFS::unlink(RGWFileHandle* rgw_fh, const char* name, uint32_t flags)
264   {
265     int rc = 0;
266     BucketStats bs;
267     RGWFileHandle* parent = nullptr;
268     RGWFileHandle* bkt_fh = nullptr;
269
270     if (unlikely(flags & RGWFileHandle::FLAG_UNLINK_THIS)) {
271       /* LOCKED */
272       parent = rgw_fh->get_parent();
273     } else {
274       /* atomicity */
275       parent = rgw_fh;
276       LookupFHResult fhr = lookup_fh(parent, name, RGWFileHandle::FLAG_LOCK);
277       rgw_fh = get<0>(fhr);
278       /* LOCKED */
279     }
280
281     if (parent->is_root()) {
282       /* a bucket may have an object storing Unix attributes, check
283        * for and delete it */
284       LookupFHResult fhr;
285       fhr = stat_bucket(parent, name, bs, (rgw_fh) ?
286                         RGWFileHandle::FLAG_LOCKED :
287                         RGWFileHandle::FLAG_NONE);
288       bkt_fh = get<0>(fhr);
289       if (unlikely(! bkt_fh)) {
290         /* implies !rgw_fh, so also !LOCKED */
291         return -ENOENT;
292       }
293
294       if (bs.num_entries > 1) {
295         unref(bkt_fh); /* return stat_bucket ref */
296         if (likely(!! rgw_fh)) { /* return lock and ref from
297                                   * lookup_fh (or caller in the
298                                   * special case of
299                                   * RGWFileHandle::FLAG_UNLINK_THIS) */
300           rgw_fh->mtx.unlock();
301           unref(rgw_fh);
302         }
303         return -ENOTEMPTY;
304       } else {
305         /* delete object w/key "<bucket>/" (uxattrs), if any */
306         string oname{"/"};
307         RGWDeleteObjRequest req(cct, get_user(), bkt_fh->bucket_name(), oname);
308         rc = rgwlib.get_fe()->execute_req(&req);
309         /* don't care if ENOENT */
310         unref(bkt_fh);
311       }
312
313       string bname{name};
314       RGWDeleteBucketRequest req(cct, get_user(), bname);
315       rc = rgwlib.get_fe()->execute_req(&req);
316       if (! rc) {
317         rc = req.get_ret();
318       }
319     } else {
320       /*
321        * leaf object
322        */
323       if (! rgw_fh) {
324         /* XXX for now, peform a hard lookup to deduce the type of
325          * object to be deleted ("foo" vs. "foo/")--also, ensures
326          * atomicity at this endpoint */
327         struct rgw_file_handle *fh;
328         rc = rgw_lookup(get_fs(), parent->get_fh(), name, &fh,
329                         RGW_LOOKUP_FLAG_NONE);
330         if (!! rc)
331           return rc;
332
333         /* rgw_fh ref+ */
334         rgw_fh = get_rgwfh(fh);
335         rgw_fh->mtx.lock(); /* LOCKED */
336       }
337
338       std::string oname = rgw_fh->relative_object_name();
339       if (rgw_fh->is_dir()) {
340         /* for the duration of our cache timer, trust positive
341          * child cache */
342         if (rgw_fh->has_children()) {
343           rgw_fh->mtx.unlock();
344           unref(rgw_fh);
345           return(-ENOTEMPTY);
346         }
347         oname += "/";
348       }
349       RGWDeleteObjRequest req(cct, get_user(), parent->bucket_name(),
350                               oname);
351       rc = rgwlib.get_fe()->execute_req(&req);
352       if (! rc) {
353         rc = req.get_ret();
354       }
355     }
356
357     /* ENOENT when raced with other s3 gateway */
358     if (! rc || rc == -ENOENT) {
359       rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
360       fh_cache.remove(rgw_fh->fh.fh_hk.object, rgw_fh,
361                       RGWFileHandle::FHCache::FLAG_LOCK);
362     }
363
364     if (! rc) {
365       real_time t = real_clock::now();
366       parent->set_mtime(real_clock::to_timespec(t));
367       parent->set_ctime(real_clock::to_timespec(t));
368     }
369
370     rgw_fh->mtx.unlock();
371     unref(rgw_fh);
372
373     return rc;
374   } /* RGWLibFS::unlink */
375
376   int RGWLibFS::rename(RGWFileHandle* src_fh, RGWFileHandle* dst_fh,
377                        const char *_src_name, const char *_dst_name)
378
379   {
380     /* XXX initial implementation: try-copy, and delete if copy
381      * succeeds */
382     int rc = -EINVAL;
383
384     real_time t;
385
386     std::string src_name{_src_name};
387     std::string dst_name{_dst_name};
388
389     /* atomicity */
390     LookupFHResult fhr = lookup_fh(src_fh, _src_name, RGWFileHandle::FLAG_LOCK);
391     RGWFileHandle* rgw_fh = get<0>(fhr);
392
393     /* should not happen */
394     if (! rgw_fh) {
395       ldout(get_context(), 0) << __func__
396                        << " BUG no such src renaming path="
397                        << src_name
398                        << dendl;
399       goto out;
400     }
401
402     /* forbid renaming of directories (unreasonable at scale) */
403     if (rgw_fh->is_dir()) {
404       ldout(get_context(), 12) << __func__
405                         << " rejecting attempt to rename directory path="
406                         << rgw_fh->full_object_name()
407                         << dendl;
408       rc = -EPERM;
409       goto unlock;
410     }
411
412     /* forbid renaming open files (violates intent, for now) */
413     if (rgw_fh->is_open()) {
414       ldout(get_context(), 12) << __func__
415                         << " rejecting attempt to rename open file path="
416                         << rgw_fh->full_object_name()
417                         << dendl;
418       rc = -EPERM;
419       goto unlock;
420     }
421
422     t = real_clock::now();
423
424     for (int ix : {0, 1}) {
425       switch (ix) {
426       case 0:
427       {
428         RGWCopyObjRequest req(cct, get_user(), src_fh, dst_fh, src_name,
429                               dst_name);
430         int rc = rgwlib.get_fe()->execute_req(&req);
431         if ((rc != 0) ||
432             ((rc = req.get_ret()) != 0)) {
433           ldout(get_context(), 1)
434             << __func__
435             << " rename step 0 failed src="
436             << src_fh->full_object_name() << " " << src_name
437             << " dst=" << dst_fh->full_object_name()
438             << " " << dst_name
439             << "rc " << rc
440             << dendl;
441           goto unlock;
442         }
443         ldout(get_context(), 12)
444           << __func__
445           << " rename step 0 success src="
446           << src_fh->full_object_name() << " " << src_name
447           << " dst=" << dst_fh->full_object_name()
448           << " " << dst_name
449           << " rc " << rc
450           << dendl;
451         /* update dst change id */
452         dst_fh->set_times(t);
453       }
454       break;
455       case 1:
456       {
457         rc = this->unlink(rgw_fh /* LOCKED */, _src_name,
458                           RGWFileHandle::FLAG_UNLINK_THIS);
459         /* !LOCKED, -ref */
460         if (! rc) {
461           ldout(get_context(), 12)
462             << __func__
463             << " rename step 1 success src="
464             << src_fh->full_object_name() << " " << src_name
465             << " dst=" << dst_fh->full_object_name()
466             << " " << dst_name
467             << " rc " << rc
468             << dendl;
469           /* update src change id */
470           src_fh->set_times(t);
471         } else {
472           ldout(get_context(), 1)
473             << __func__
474             << " rename step 1 failed src="
475             << src_fh->full_object_name() << " " << src_name
476             << " dst=" << dst_fh->full_object_name()
477             << " " << dst_name
478             << " rc " << rc
479             << dendl;
480         }
481       }
482       goto out;
483       default:
484         abort();
485       } /* switch */
486     } /* ix */
487   unlock:
488     rgw_fh->mtx.unlock(); /* !LOCKED */
489     unref(rgw_fh); /* -ref */
490
491   out:
492     return rc;
493   } /* RGWLibFS::rename */
494
495   MkObjResult RGWLibFS::mkdir(RGWFileHandle* parent, const char *name,
496                               struct stat *st, uint32_t mask, uint32_t flags)
497   {
498     int rc, rc2;
499     rgw_file_handle *lfh;
500
501     rc = rgw_lookup(get_fs(), parent->get_fh(), name, &lfh,
502                     RGW_LOOKUP_FLAG_NONE);
503     if (! rc) {
504       /* conflict! */
505       rc = rgw_fh_rele(get_fs(), lfh, RGW_FH_RELE_FLAG_NONE);
506       return MkObjResult{nullptr, -EEXIST};
507     }
508
509     MkObjResult mkr{nullptr, -EINVAL};
510     LookupFHResult fhr;
511     RGWFileHandle* rgw_fh = nullptr;
512     buffer::list ux_key, ux_attrs;
513
514     fhr = lookup_fh(parent, name,
515                     RGWFileHandle::FLAG_CREATE|
516                     RGWFileHandle::FLAG_DIRECTORY|
517                     RGWFileHandle::FLAG_LOCK);
518     rgw_fh = get<0>(fhr);
519     if (rgw_fh) {
520       rgw_fh->create_stat(st, mask);
521       rgw_fh->set_times(real_clock::now());
522       /* save attrs */
523       rgw_fh->encode_attrs(ux_key, ux_attrs);
524       if (st)
525         rgw_fh->stat(st);
526       get<0>(mkr) = rgw_fh;
527     } else {
528       get<1>(mkr) = -EIO;
529       return mkr;
530     }
531
532     if (parent->is_root()) {
533       /* bucket */
534       string bname{name};
535       /* enforce S3 name restrictions */
536       rc = valid_fs_bucket_name(bname);
537       if (rc != 0) {
538         rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
539         fh_cache.remove(rgw_fh->fh.fh_hk.object, rgw_fh,
540                         RGWFileHandle::FHCache::FLAG_LOCK);
541         rgw_fh->mtx.unlock();
542         unref(rgw_fh);
543         get<0>(mkr) = nullptr;
544         get<1>(mkr) = rc;
545         return mkr;
546       }
547
548       RGWCreateBucketRequest req(get_context(), get_user(), bname);
549
550       /* save attrs */
551       req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
552       req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
553
554       rc = rgwlib.get_fe()->execute_req(&req);
555       rc2 = req.get_ret();
556     } else {
557       /* create an object representing the directory */
558       buffer::list bl;
559       string dir_name = parent->format_child_name(name, true);
560
561       /* need valid S3 name (characters, length <= 1024, etc) */
562       rc = valid_fs_object_name(dir_name);
563       if (rc != 0) {
564         rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
565         fh_cache.remove(rgw_fh->fh.fh_hk.object, rgw_fh,
566                         RGWFileHandle::FHCache::FLAG_LOCK);
567         rgw_fh->mtx.unlock();
568         unref(rgw_fh);
569         get<0>(mkr) = nullptr;
570         get<1>(mkr) = rc;
571         return mkr;
572       }
573
574       RGWPutObjRequest req(get_context(), get_user(), parent->bucket_name(),
575                           dir_name, bl);
576
577       /* save attrs */
578       req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
579       req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
580
581       rc = rgwlib.get_fe()->execute_req(&req);
582       rc2 = req.get_ret();
583     }
584
585     if (! ((rc == 0) &&
586            (rc2 == 0))) {
587       /* op failed */
588       rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
589       rgw_fh->mtx.unlock(); /* !LOCKED */
590       unref(rgw_fh);
591       get<0>(mkr) = nullptr;
592       /* fixup rc */
593       if (!rc)
594         rc = rc2;
595     } else {
596       real_time t = real_clock::now();
597       parent->set_mtime(real_clock::to_timespec(t));
598       parent->set_ctime(real_clock::to_timespec(t));
599       rgw_fh->mtx.unlock(); /* !LOCKED */
600     }
601
602     get<1>(mkr) = rc;
603
604     return mkr;
605   } /* RGWLibFS::mkdir */
606
607   MkObjResult RGWLibFS::create(RGWFileHandle* parent, const char *name,
608                               struct stat *st, uint32_t mask, uint32_t flags)
609   {
610     int rc, rc2;
611
612     using std::get;
613
614     rgw_file_handle *lfh;
615     rc = rgw_lookup(get_fs(), parent->get_fh(), name, &lfh,
616                     RGW_LOOKUP_FLAG_NONE);
617     if (! rc) {
618       /* conflict! */
619       rc = rgw_fh_rele(get_fs(), lfh, RGW_FH_RELE_FLAG_NONE);
620       return MkObjResult{nullptr, -EEXIST};
621     }
622
623     /* expand and check name */
624     std::string obj_name = parent->format_child_name(name, false);
625     rc = valid_fs_object_name(obj_name);
626     if (rc != 0) {
627       return MkObjResult{nullptr, rc};
628     }
629
630     /* create it */
631     buffer::list bl;
632     RGWPutObjRequest req(cct, get_user(), parent->bucket_name(), obj_name, bl);
633     MkObjResult mkr{nullptr, -EINVAL};
634
635     rc = rgwlib.get_fe()->execute_req(&req);
636     rc2 = req.get_ret();
637
638     if ((rc == 0) &&
639         (rc2 == 0)) {
640       /* XXX atomicity */
641       LookupFHResult fhr = lookup_fh(parent, name, RGWFileHandle::FLAG_CREATE |
642                                                    RGWFileHandle::FLAG_LOCK);
643       RGWFileHandle* rgw_fh = get<0>(fhr);
644       if (rgw_fh) {
645         if (get<1>(fhr) & RGWFileHandle::FLAG_CREATE) {
646           /* fill in stat data */
647           real_time t = real_clock::now();
648           rgw_fh->create_stat(st, mask);
649           rgw_fh->set_times(t);
650
651           parent->set_mtime(real_clock::to_timespec(t));
652           parent->set_ctime(real_clock::to_timespec(t));
653         }
654         if (st)
655           (void) rgw_fh->stat(st);
656         get<0>(mkr) = rgw_fh;
657         rgw_fh->mtx.unlock();
658       } else
659         rc = -EIO;
660     }
661
662     get<1>(mkr) = rc;
663
664     return mkr;
665   } /* RGWLibFS::create */
666
667   int RGWLibFS::getattr(RGWFileHandle* rgw_fh, struct stat* st)
668   {
669     switch(rgw_fh->fh.fh_type) {
670     case RGW_FS_TYPE_FILE:
671     {
672       if (rgw_fh->deleted())
673         return -ESTALE;
674     }
675     break;
676     default:
677       break;
678     };
679
680     return rgw_fh->stat(st);
681   } /* RGWLibFS::getattr */
682
683   int RGWLibFS::setattr(RGWFileHandle* rgw_fh, struct stat* st, uint32_t mask,
684                         uint32_t flags)
685   {
686     int rc, rc2;
687     buffer::list ux_key, ux_attrs;
688
689     lock_guard guard(rgw_fh->mtx);
690
691     switch(rgw_fh->fh.fh_type) {
692     case RGW_FS_TYPE_FILE:
693     {
694       if (rgw_fh->deleted())
695         return -ESTALE;
696     }
697     break;
698     default:
699       break;
700     };
701
702     string obj_name{rgw_fh->relative_object_name()};
703
704     if (rgw_fh->is_dir() &&
705         (likely(! rgw_fh->is_bucket()))) {
706       obj_name += "/";
707     }
708
709     RGWSetAttrsRequest req(cct, get_user(), rgw_fh->bucket_name(), obj_name);
710
711     rgw_fh->create_stat(st, mask);
712     rgw_fh->encode_attrs(ux_key, ux_attrs);
713
714     /* save attrs */
715     req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
716     req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
717
718     rc = rgwlib.get_fe()->execute_req(&req);
719     rc2 = req.get_ret();
720
721     if (rc == -ENOENT) {
722       /* special case:  materialize placeholder dir */
723       buffer::list bl;
724       RGWPutObjRequest req(get_context(), get_user(), rgw_fh->bucket_name(),
725                            obj_name, bl);
726
727       rgw_fh->encode_attrs(ux_key, ux_attrs); /* because std::moved */
728
729       /* save attrs */
730       req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
731       req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
732
733       rc = rgwlib.get_fe()->execute_req(&req);
734       rc2 = req.get_ret();
735     }
736
737     if ((rc != 0) || (rc2 != 0)) {
738       return -EIO;
739     }
740
741     rgw_fh->set_ctime(real_clock::to_timespec(real_clock::now()));
742
743     return 0;
744   } /* RGWLibFS::setattr */
745
746   /* called under rgw_fh->mtx held */
747   void RGWLibFS::update_fh(RGWFileHandle *rgw_fh)
748   {
749     int rc, rc2;
750     string obj_name{rgw_fh->relative_object_name()};
751     buffer::list ux_key, ux_attrs;
752
753     if (rgw_fh->is_dir() &&
754         (likely(! rgw_fh->is_bucket()))) {
755       obj_name += "/";
756     }
757
758     lsubdout(get_context(), rgw, 17)
759       << __func__
760       << " update old versioned fh : " << obj_name
761       << dendl;
762
763     RGWSetAttrsRequest req(cct, get_user(), rgw_fh->bucket_name(), obj_name);
764
765     rgw_fh->encode_attrs(ux_key, ux_attrs);
766
767     req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
768     req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
769
770     rc = rgwlib.get_fe()->execute_req(&req);
771     rc2 = req.get_ret();
772
773     if ((rc != 0) || (rc2 != 0)) {
774       lsubdout(get_context(), rgw, 17)
775         << __func__
776         << " update fh failed : " << obj_name
777         << dendl;
778     }
779   } /* RGWLibFS::update_fh */
780
781   void RGWLibFS::close()
782   {
783     state.flags |= FLAG_CLOSED;
784
785     class ObjUnref
786     {
787       RGWLibFS* fs;
788     public:
789       ObjUnref(RGWLibFS* _fs) : fs(_fs) {}
790       void operator()(RGWFileHandle* fh) const {
791         lsubdout(fs->get_context(), rgw, 5)
792           << __func__
793           << fh->name
794           << " before ObjUnref refs=" << fh->get_refcnt()
795           << dendl;
796         fs->unref(fh);
797       }
798     };
799
800     /* force cache drain, forces objects to evict */
801     fh_cache.drain(ObjUnref(this),
802                   RGWFileHandle::FHCache::FLAG_LOCK);
803     rgwlib.get_fe()->get_process()->unregister_fs(this);
804     rele();
805   } /* RGWLibFS::close */
806
807   inline std::ostream& operator<<(std::ostream &os, struct timespec const &ts) {
808       os << "<timespec: tv_sec=";
809       os << ts.tv_sec;
810       os << "; tv_nsec=";
811       os << ts.tv_nsec;
812       os << ">";
813     return os;
814   }
815
816   std::ostream& operator<<(std::ostream &os, RGWLibFS::event const &ev) {
817     os << "<event:";
818       switch (ev.t) {
819       case RGWLibFS::event::type::READDIR:
820         os << "type=READDIR;";
821         break;
822       default:
823         os << "type=UNKNOWN;";
824         break;
825       };
826     os << "fid=" << ev.fhk.fh_hk.bucket << ":" << ev.fhk.fh_hk.object
827        << ";ts=" << ev.ts << ">";
828     return os;
829   }
830
831   void RGWLibFS::gc()
832   {
833     using std::get;
834     using directory = RGWFileHandle::directory;
835
836     /* dirent invalidate timeout--basically, the upper-bound on
837      * inconsistency with the S3 namespace */
838     auto expire_s
839       = get_context()->_conf->rgw_nfs_namespace_expire_secs;
840
841     /* max events to gc in one cycle */
842     uint32_t max_ev = get_context()->_conf->rgw_nfs_max_gc;
843
844     struct timespec now, expire_ts;
845     event_vector ve;
846     bool stop = false;
847     std::deque<event> &events = state.events;
848
849     do {
850       (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
851       lsubdout(get_context(), rgw, 15)
852         << "GC: top of expire loop"
853         << " now=" << now
854         << " expire_s=" << expire_s
855         << dendl;
856       {
857         lock_guard guard(state.mtx); /* LOCKED */
858         /* just return if no events */
859         if (events.empty()) {
860           return;
861         }
862         uint32_t _max_ev =
863           (events.size() < 500) ? max_ev : (events.size() / 4);
864         for (uint32_t ix = 0; (ix < _max_ev) && (events.size() > 0); ++ix) {
865           event& ev = events.front();
866           expire_ts = ev.ts;
867           expire_ts.tv_sec += expire_s;
868           if (expire_ts > now) {
869             stop = true;
870             break;
871           }
872           ve.push_back(ev);
873           events.pop_front();
874         }
875       } /* anon */
876       /* !LOCKED */
877       for (auto& ev : ve) {
878         lsubdout(get_context(), rgw, 15)
879           << "try-expire ev: " << ev << dendl;
880         if (likely(ev.t == event::type::READDIR)) {
881           RGWFileHandle* rgw_fh = lookup_handle(ev.fhk.fh_hk);
882           lsubdout(get_context(), rgw, 15)
883             << "ev rgw_fh: " << rgw_fh << dendl;
884           if (rgw_fh) {
885             RGWFileHandle::directory* d;
886             if (unlikely(! rgw_fh->is_dir())) {
887               lsubdout(get_context(), rgw, 0)
888                 << __func__
889                 << " BUG non-directory found with READDIR event "
890                 << "(" << rgw_fh->bucket_name() << ","
891                 << rgw_fh->object_name() << ")"
892                 << dendl;
893               goto rele;
894             }
895             /* maybe clear state */
896             d = get<directory>(&rgw_fh->variant_type);
897             if (d) {
898               struct timespec ev_ts = ev.ts;
899               lock_guard guard(rgw_fh->mtx);
900               struct timespec d_last_readdir = d->last_readdir;
901               if (unlikely(ev_ts < d_last_readdir)) {
902                 /* readdir cycle in progress, don't invalidate */
903                 lsubdout(get_context(), rgw, 15)
904                   << "GC: delay expiration for "
905                   << rgw_fh->object_name()
906                   << " ev.ts=" << ev_ts
907                   << " last_readdir=" << d_last_readdir
908                   << dendl;
909                 continue;
910               } else {
911                 lsubdout(get_context(), rgw, 15)
912                   << "GC: expiring "
913                   << rgw_fh->object_name()
914                   << dendl;
915                 rgw_fh->clear_state();
916                 rgw_fh->invalidate();
917               }
918             }
919           rele:
920             unref(rgw_fh);
921           } /* rgw_fh */
922         } /* event::type::READDIR */
923       } /* ev */
924       ve.clear();
925     } while (! (stop || shutdown));
926   } /* RGWLibFS::gc */
927
928   std::ostream& operator<<(std::ostream &os,
929                            RGWFileHandle const &rgw_fh)
930   {
931     const auto& fhk = rgw_fh.get_key();
932     const auto& fh = const_cast<RGWFileHandle&>(rgw_fh).get_fh();
933     os << "<RGWFileHandle:";
934     os << "addr=" << &rgw_fh << ";";
935     switch (fh->fh_type) {
936     case RGW_FS_TYPE_DIRECTORY:
937         os << "type=DIRECTORY;";
938         break;
939     case RGW_FS_TYPE_FILE:
940         os << "type=FILE;";
941         break;
942     default:
943         os << "type=UNKNOWN;";
944         break;
945       };
946     os << "fid=" << fhk.fh_hk.bucket << ":" << fhk.fh_hk.object << ";";
947     os << "name=" << rgw_fh.object_name() << ";";
948     os << "refcnt=" << rgw_fh.get_refcnt() << ";";
949     os << ">";
950     return os;
951   }
952
953   RGWFileHandle::~RGWFileHandle() {
954     /* in the non-delete case, handle may still be in handle table */
955     if (fh_hook.is_linked()) {
956       fs->fh_cache.remove(fh.fh_hk.object, this, FHCache::FLAG_LOCK);
957     }
958     /* cond-unref parent */
959     if (parent && (! parent->is_mount())) {
960       /* safe because if parent->unref causes its deletion,
961        * there are a) by refcnt, no other objects/paths pointing
962        * to it and b) by the semantics of valid iteration of
963        * fh_lru (observed, e.g., by cohort_lru<T,...>::drain())
964        * no unsafe iterators reaching it either--n.b., this constraint
965        * is binding oncode which may in future attempt to e.g.,
966        * cause the eviction of objects in LRU order */
967       (void) get_fs()->unref(parent);
968     }
969   }
970
971   void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
972                                    ceph::buffer::list& ux_attrs1)
973   {
974     fh_key fhk(this->fh.fh_hk);
975     rgw::encode(fhk, ux_key1);
976     rgw::encode(*this, ux_attrs1);
977   } /* RGWFileHandle::encode_attrs */
978
979   DecodeAttrsResult RGWFileHandle::decode_attrs(const ceph::buffer::list* ux_key1,
980                                                 const ceph::buffer::list* ux_attrs1)
981   {
982     DecodeAttrsResult dar { false, false };
983     fh_key fhk;
984     auto bl_iter_key1  = const_cast<buffer::list*>(ux_key1)->begin();
985     rgw::decode(fhk, bl_iter_key1);
986     if (fhk.version >= 2) {
987       assert(this->fh.fh_hk == fhk.fh_hk);
988     } else {
989       get<0>(dar) = true;
990     }
991
992     auto bl_iter_unix1 = const_cast<buffer::list*>(ux_attrs1)->begin();
993     rgw::decode(*this, bl_iter_unix1);
994     if (this->state.version < 2) {
995       get<1>(dar) = true;
996     }
997
998     return dar;
999   } /* RGWFileHandle::decode_attrs */
1000
1001   bool RGWFileHandle::reclaim() {
1002     lsubdout(fs->get_context(), rgw, 17)
1003       << __func__ << " " << *this
1004       << dendl;
1005     /* remove if still in fh_cache */
1006     if (fh_hook.is_linked()) {
1007       fs->fh_cache.remove(fh.fh_hk.object, this, FHCache::FLAG_LOCK);
1008     }
1009     return true;
1010   } /* RGWFileHandle::reclaim */
1011
1012   bool RGWFileHandle::has_children() const
1013   {
1014     if (unlikely(! is_dir()))
1015       return false;
1016
1017     RGWRMdirCheck req(fs->get_context(), fs->get_user(), this);
1018     int rc = rgwlib.get_fe()->execute_req(&req);
1019     if (! rc) {
1020       return req.valid && req.has_children;
1021     }
1022
1023     return false;
1024   }
1025
1026   std::ostream& operator<<(std::ostream &os,
1027                            RGWFileHandle::readdir_offset const &offset)
1028   {
1029     using boost::get;
1030     if (unlikely(!! get<uint64_t*>(&offset))) {
1031       uint64_t* ioff = get<uint64_t*>(offset);
1032       os << *ioff;
1033     }
1034     else
1035       os << get<const char*>(offset);
1036     return os;
1037   }
1038
1039   int RGWFileHandle::readdir(rgw_readdir_cb rcb, void *cb_arg,
1040                              readdir_offset offset,
1041                              bool *eof, uint32_t flags)
1042   {
1043     using event = RGWLibFS::event;
1044     using boost::get;
1045     int rc = 0;
1046     struct timespec now;
1047     CephContext* cct = fs->get_context();
1048
1049     directory* d = get<directory>(&variant_type);
1050     if (d) {
1051       (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
1052       lock_guard guard(mtx);
1053       d->last_readdir = now;
1054     }
1055
1056     bool initial_off;
1057     if (likely(!! get<const char*>(&offset))) {
1058       initial_off = ! get<const char*>(offset);
1059     } else {
1060       initial_off = (*get<uint64_t*>(offset) == 0);
1061     }
1062
1063     if (is_root()) {
1064       RGWListBucketsRequest req(cct, fs->get_user(), this, rcb, cb_arg,
1065                                 offset);
1066       rc = rgwlib.get_fe()->execute_req(&req);
1067       if (! rc) {
1068         (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
1069         lock_guard guard(mtx);
1070         state.atime = now;
1071         if (initial_off)
1072           set_nlink(2);
1073         inc_nlink(req.d_count);
1074         *eof = req.eof();
1075         event ev(event::type::READDIR, get_key(), state.atime);
1076         lock_guard sguard(fs->state.mtx);
1077         fs->state.push_event(ev);
1078       }
1079     } else {
1080       RGWReaddirRequest req(cct, fs->get_user(), this, rcb, cb_arg, offset);
1081       rc = rgwlib.get_fe()->execute_req(&req);
1082       if (! rc) {
1083         (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
1084         lock_guard guard(mtx);
1085         state.atime = now;
1086         if (initial_off)
1087           set_nlink(2);
1088         inc_nlink(req.d_count);
1089         *eof = req.eof();
1090         event ev(event::type::READDIR, get_key(), state.atime);
1091         lock_guard sguard(fs->state.mtx);
1092         fs->state.push_event(ev);
1093       }
1094     }
1095
1096     lsubdout(fs->get_context(), rgw, 15)
1097       << __func__
1098       << " final link count=" << state.nlink
1099       << dendl;
1100
1101     return rc;
1102   } /* RGWFileHandle::readdir */
1103
1104   int RGWFileHandle::write(uint64_t off, size_t len, size_t *bytes_written,
1105                            void *buffer)
1106   {
1107     using std::get;
1108     using WriteCompletion = RGWLibFS::WriteCompletion;
1109
1110     lock_guard guard(mtx);
1111
1112     int rc = 0;
1113
1114     file* f = get<file>(&variant_type);
1115     if (! f)
1116       return -EISDIR;
1117
1118     if (deleted()) {
1119       lsubdout(fs->get_context(), rgw, 5)
1120         << __func__
1121         << " write attempted on deleted object "
1122         << this->object_name()
1123         << dendl;
1124       /* zap write transaction, if any */
1125       if (f->write_req) {
1126         delete f->write_req;
1127         f->write_req = nullptr;
1128       }
1129       return -ESTALE;
1130     }
1131
1132     if (! f->write_req) {
1133       /* guard--we do not support (e.g., COW-backed) partial writes */
1134       if (off != 0) {
1135         lsubdout(fs->get_context(), rgw, 5)
1136           << __func__
1137           << " " << object_name()
1138           << " non-0 initial write position " << off
1139           << dendl;
1140         return -EIO;
1141       }
1142
1143       /* start */
1144       std::string object_name = relative_object_name();
1145       f->write_req =
1146         new RGWWriteRequest(fs->get_context(), fs->get_user(), this,
1147                             bucket_name(), object_name);
1148       rc = rgwlib.get_fe()->start_req(f->write_req);
1149       if (rc < 0) {
1150         lsubdout(fs->get_context(), rgw, 5)
1151           << __func__
1152           << this->object_name()
1153           << " write start failed " << off
1154           << " (" << rc << ")"
1155           << dendl;
1156         /* zap failed write transaction */
1157         delete f->write_req;
1158         f->write_req = nullptr;
1159         return -EIO;
1160       } else {
1161         if (stateless_open())  {
1162           /* start write timer */
1163           f->write_req->timer_id =
1164             RGWLibFS::write_timer.add_event(
1165               std::chrono::seconds(RGWLibFS::write_completion_interval_s),
1166               WriteCompletion(*this));
1167         }
1168       }
1169     }
1170
1171     int overlap = 0;
1172     if ((static_cast<off_t>(off) < f->write_req->real_ofs) &&
1173         ((f->write_req->real_ofs - off) <= len)) {
1174       overlap = f->write_req->real_ofs - off;
1175       off = f->write_req->real_ofs;
1176       buffer = static_cast<char*>(buffer) + overlap;
1177       len -= overlap;
1178     }
1179
1180     buffer::list bl;
1181     /* XXXX */
1182 #if 0
1183     bl.push_back(
1184       buffer::create_static(len, static_cast<char*>(buffer)));
1185 #else
1186     bl.push_back(
1187       buffer::copy(static_cast<char*>(buffer), len));
1188 #endif
1189
1190     f->write_req->put_data(off, bl);
1191     rc = f->write_req->exec_continue();
1192
1193     if (rc == 0) {
1194       size_t min_size = off + len;
1195       if (min_size > get_size())
1196         set_size(min_size);
1197       if (stateless_open()) {
1198         /* bump write timer */
1199         RGWLibFS::write_timer.adjust_event(
1200           f->write_req->timer_id, std::chrono::seconds(10));
1201       }
1202     } else {
1203       /* continuation failed (e.g., non-contiguous write position) */
1204       lsubdout(fs->get_context(), rgw, 5)
1205         << __func__
1206         << object_name()
1207         << " failed write at position " << off
1208         << " (fails write transaction) "
1209         << dendl;
1210       /* zap failed write transaction */
1211       delete f->write_req;
1212       f->write_req = nullptr;
1213       rc = -EIO;
1214     }
1215
1216     *bytes_written = (rc == 0) ? (len + overlap) : 0;
1217     return rc;
1218   } /* RGWFileHandle::write */
1219
1220   int RGWFileHandle::write_finish(uint32_t flags)
1221   {
1222     unique_lock guard{mtx, std::defer_lock};
1223     int rc = 0;
1224
1225     if (! (flags & FLAG_LOCKED)) {
1226       guard.lock();
1227     }
1228
1229     file* f = get<file>(&variant_type);
1230     if (f && (f->write_req)) {
1231       lsubdout(fs->get_context(), rgw, 10)
1232         << __func__
1233         << " finishing write trans on " << object_name()
1234         << dendl;
1235       rc = rgwlib.get_fe()->finish_req(f->write_req);
1236       if (! rc) {
1237         rc = f->write_req->get_ret();
1238       }
1239       delete f->write_req;
1240       f->write_req = nullptr;
1241     }
1242
1243     return rc;
1244   } /* RGWFileHandle::write_finish */
1245
1246   int RGWFileHandle::close()
1247   {
1248     lock_guard guard(mtx);
1249
1250     int rc = write_finish(FLAG_LOCKED);
1251
1252     flags &= ~FLAG_OPEN;
1253     flags &= ~FLAG_STATELESS_OPEN;
1254
1255     return rc;
1256   } /* RGWFileHandle::close */
1257
1258   RGWFileHandle::file::~file()
1259   {
1260     delete write_req;
1261   }
1262
1263   void RGWFileHandle::clear_state()
1264   {
1265     directory* d = get<directory>(&variant_type);
1266     if (d) {
1267       state.nlink = 2;
1268       d->last_marker = rgw_obj_key{};
1269     }
1270   }
1271
1272   void RGWFileHandle::invalidate() {
1273     RGWLibFS *fs = get_fs();
1274     if (fs->invalidate_cb) {
1275       fs->invalidate_cb(fs->invalidate_arg, get_key().fh_hk);
1276     }
1277   }
1278
1279   int RGWWriteRequest::exec_start() {
1280     struct req_state* s = get_state();
1281
1282     auto compression_type =
1283       get_store()->get_zone_params().get_compression_type(
1284         s->bucket_info.placement_rule);
1285
1286     /* not obviously supportable */
1287     assert(! dlo_manifest);
1288     assert(! slo_info);
1289
1290     perfcounter->inc(l_rgw_put);
1291     op_ret = -EINVAL;
1292
1293     if (s->object.empty()) {
1294       ldout(s->cct, 0) << __func__ << " called on empty object" << dendl;
1295       goto done;
1296     }
1297
1298     op_ret = get_params();
1299     if (op_ret < 0)
1300       goto done;
1301
1302     op_ret = get_system_versioning_params(s, &olh_epoch, &version_id);
1303     if (op_ret < 0) {
1304       goto done;
1305     }
1306
1307     /* user-supplied MD5 check skipped (not supplied) */
1308     /* early quota check skipped--we don't have size yet */
1309     /* skipping user-supplied etag--we might have one in future, but
1310      * like data it and other attrs would arrive after open */
1311     processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
1312                                  &multipart);
1313     op_ret = processor->prepare(get_store(), NULL);
1314     if (op_ret < 0) {
1315       ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
1316                         << dendl;
1317       goto done;
1318     }
1319
1320     filter = processor;
1321     if (compression_type != "none") {
1322       plugin = Compressor::create(s->cct, compression_type);
1323     if (! plugin) {
1324       ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
1325                        << compression_type << dendl;
1326     } else {
1327       compressor.emplace(s->cct, plugin, filter);
1328       filter = &*compressor;
1329     }
1330   }
1331
1332   done:
1333     return op_ret;
1334   } /* exec_start */
1335
1336   int RGWWriteRequest::exec_continue()
1337   {
1338     struct req_state* s = get_state();
1339     op_ret = 0;
1340
1341     /* check guards (e.g., contig write) */
1342     if (eio)
1343       return -EIO;
1344
1345     size_t len = data.length();
1346     if (! len)
1347       return 0;
1348
1349     /* XXX we are currently synchronous--supplied data buffers cannot
1350      * be used after the caller returns  */
1351     bool need_to_wait = true;
1352     bufferlist orig_data;
1353
1354     if (need_to_wait) {
1355       orig_data = data;
1356     }
1357     hash.Update((const byte *)data.c_str(), data.length());
1358     op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
1359     if (op_ret < 0) {
1360       if (!need_to_wait || op_ret != -EEXIST) {
1361         ldout(s->cct, 20) << "processor->thottle_data() returned ret="
1362                           << op_ret << dendl;
1363         goto done;
1364       }
1365
1366       ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
1367
1368       /* restore original data */
1369       data.swap(orig_data);
1370
1371       /* restart processing with different oid suffix */
1372       dispose_processor(processor);
1373       processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
1374                                    &multipart);
1375       filter = processor;
1376
1377       string oid_rand;
1378       char buf[33];
1379       gen_rand_alphanumeric(get_store()->ctx(), buf, sizeof(buf) - 1);
1380       oid_rand.append(buf);
1381
1382       op_ret = processor->prepare(get_store(), &oid_rand);
1383       if (op_ret < 0) {
1384         ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
1385                          << op_ret << dendl;
1386         goto done;
1387       }
1388
1389       /* restore compression filter, if any */
1390       if (compressor) {
1391         compressor.emplace(s->cct, plugin, filter);
1392         filter = &*compressor;
1393       }
1394
1395       op_ret = put_data_and_throttle(filter, data, ofs, false);
1396       if (op_ret < 0) {
1397         goto done;
1398       }
1399     }
1400     bytes_written += len;
1401
1402   done:
1403     return op_ret;
1404   } /* exec_continue */
1405
1406   int RGWWriteRequest::exec_finish()
1407   {
1408     buffer::list bl, aclbl, ux_key, ux_attrs;
1409     map<string, string>::iterator iter;
1410     char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
1411     unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
1412     struct req_state* s = get_state();
1413
1414     size_t osize = rgw_fh->get_size();
1415     struct timespec octime = rgw_fh->get_ctime();
1416     struct timespec omtime = rgw_fh->get_mtime();
1417     real_time appx_t = real_clock::now();
1418
1419     s->obj_size = bytes_written;
1420     perfcounter->inc(l_rgw_put_b, s->obj_size);
1421
1422     op_ret = get_store()->check_quota(s->bucket_owner.get_id(), s->bucket,
1423                                       user_quota, bucket_quota, s->obj_size);
1424     if (op_ret < 0) {
1425       goto done;
1426     }
1427
1428     op_ret = get_store()->check_bucket_shards(s->bucket_info, s->bucket,
1429                                               bucket_quota);
1430     if (op_ret < 0) {
1431       goto done;
1432     }
1433
1434     hash.Final(m);
1435
1436     if (compressor && compressor->is_compressed()) {
1437       bufferlist tmp;
1438       RGWCompressionInfo cs_info;
1439       cs_info.compression_type = plugin->get_type_name();
1440       cs_info.orig_size = s->obj_size;
1441       cs_info.blocks = std::move(compressor->get_compression_blocks());
1442       ::encode(cs_info, tmp);
1443       attrs[RGW_ATTR_COMPRESSION] = tmp;
1444       ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
1445                         << " with type=" << cs_info.compression_type
1446                         << ", orig_size=" << cs_info.orig_size
1447                         << ", blocks=" << cs_info.blocks.size() << dendl;
1448     }
1449
1450     buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
1451     etag = calc_md5;
1452
1453     bl.append(etag.c_str(), etag.size() + 1);
1454     emplace_attr(RGW_ATTR_ETAG, std::move(bl));
1455
1456     policy.encode(aclbl);
1457     emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
1458
1459     /* unix attrs */
1460     rgw_fh->set_mtime(real_clock::to_timespec(appx_t));
1461     rgw_fh->set_ctime(real_clock::to_timespec(appx_t));
1462     rgw_fh->set_size(bytes_written);
1463     rgw_fh->encode_attrs(ux_key, ux_attrs);
1464
1465     emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
1466     emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
1467
1468     for (iter = s->generic_attrs.begin(); iter != s->generic_attrs.end();
1469          ++iter) {
1470       buffer::list& attrbl = attrs[iter->first];
1471       const string& val = iter->second;
1472       attrbl.append(val.c_str(), val.size() + 1);
1473     }
1474
1475     op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
1476     if (op_ret < 0) {
1477       goto done;
1478     }
1479     encode_delete_at_attr(delete_at, attrs);
1480
1481     /* Add a custom metadata to expose the information whether an object
1482      * is an SLO or not. Appending the attribute must be performed AFTER
1483      * processing any input from user in order to prohibit overwriting. */
1484     if (unlikely(!! slo_info)) {
1485       buffer::list slo_userindicator_bl;
1486       ::encode("True", slo_userindicator_bl);
1487       emplace_attr(RGW_ATTR_SLO_UINDICATOR, std::move(slo_userindicator_bl));
1488     }
1489
1490     op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
1491                                  (delete_at ? *delete_at : real_time()),
1492                                  if_match, if_nomatch);
1493     if (op_ret != 0) {
1494       /* revert attr updates */
1495       rgw_fh->set_mtime(omtime);
1496       rgw_fh->set_ctime(octime);
1497       rgw_fh->set_size(osize);
1498     }
1499
1500   done:
1501     dispose_processor(processor);
1502     perfcounter->tinc(l_rgw_put_lat,
1503                       (ceph_clock_now() - s->time));
1504     return op_ret;
1505   } /* exec_finish */
1506
1507 } /* namespace rgw */
1508
1509 /* librgw */
1510 extern "C" {
1511
1512 void rgwfile_version(int *major, int *minor, int *extra)
1513 {
1514   if (major)
1515     *major = LIBRGW_FILE_VER_MAJOR;
1516   if (minor)
1517     *minor = LIBRGW_FILE_VER_MINOR;
1518   if (extra)
1519     *extra = LIBRGW_FILE_VER_EXTRA;
1520 }
1521
1522 /*
1523  attach rgw namespace
1524 */
1525   int rgw_mount(librgw_t rgw, const char *uid, const char *acc_key,
1526                 const char *sec_key, struct rgw_fs **rgw_fs,
1527                 uint32_t flags)
1528 {
1529   int rc = 0;
1530
1531   /* stash access data for "mount" */
1532   RGWLibFS* new_fs = new RGWLibFS(static_cast<CephContext*>(rgw), uid, acc_key,
1533                                   sec_key, "/");
1534   assert(new_fs);
1535
1536   rc = new_fs->authorize(rgwlib.get_store());
1537   if (rc != 0) {
1538     delete new_fs;
1539     return -EINVAL;
1540   }
1541
1542   /* register fs for shared gc */
1543   rgwlib.get_fe()->get_process()->register_fs(new_fs);
1544
1545   struct rgw_fs *fs = new_fs->get_fs();
1546   fs->rgw = rgw;
1547
1548   /* XXX we no longer assume "/" is unique, but we aren't tracking the
1549    * roots atm */
1550
1551   *rgw_fs = fs;
1552
1553   return 0;
1554 }
1555
1556 int rgw_mount2(librgw_t rgw, const char *uid, const char *acc_key,
1557                const char *sec_key, const char *root, struct rgw_fs **rgw_fs,
1558                uint32_t flags)
1559 {
1560   int rc = 0;
1561
1562   /* stash access data for "mount" */
1563   RGWLibFS* new_fs = new RGWLibFS(static_cast<CephContext*>(rgw), uid, acc_key,
1564                                   sec_key, root);
1565   assert(new_fs);
1566
1567   rc = new_fs->authorize(rgwlib.get_store());
1568   if (rc != 0) {
1569     delete new_fs;
1570     return -EINVAL;
1571   }
1572
1573   /* register fs for shared gc */
1574   rgwlib.get_fe()->get_process()->register_fs(new_fs);
1575
1576   struct rgw_fs *fs = new_fs->get_fs();
1577   fs->rgw = rgw;
1578
1579   /* XXX we no longer assume "/" is unique, but we aren't tracking the
1580    * roots atm */
1581
1582   *rgw_fs = fs;
1583
1584   return 0;
1585 }
1586
1587 /*
1588  register invalidate callbacks
1589 */
1590 int rgw_register_invalidate(struct rgw_fs *rgw_fs, rgw_fh_callback_t cb,
1591                             void *arg, uint32_t flags)
1592
1593 {
1594   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1595   return fs->register_invalidate(cb, arg, flags);
1596 }
1597
1598 /*
1599  detach rgw namespace
1600 */
1601 int rgw_umount(struct rgw_fs *rgw_fs, uint32_t flags)
1602 {
1603   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1604   fs->close();
1605   return 0;
1606 }
1607
1608 /*
1609   get filesystem attributes
1610 */
1611 int rgw_statfs(struct rgw_fs *rgw_fs,
1612                struct rgw_file_handle *parent_fh,
1613                struct rgw_statvfs *vfs_st, uint32_t flags)
1614 {
1615   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1616
1617   /* XXX for now, just publish a huge capacity and
1618    * limited utiliztion */
1619   vfs_st->f_bsize = 1024*1024 /* 1M */;
1620   vfs_st->f_frsize = 1024;    /* minimal allocation unit (who cares) */
1621   vfs_st->f_blocks = UINT64_MAX;
1622   vfs_st->f_bfree = UINT64_MAX;
1623   vfs_st->f_bavail = UINT64_MAX;
1624   vfs_st->f_files = 1024; /* object count, do we have an est? */
1625   vfs_st->f_ffree = UINT64_MAX;
1626   vfs_st->f_fsid[0] = fs->get_fsid();
1627   vfs_st->f_fsid[1] = fs->get_fsid();
1628   vfs_st->f_flag = 0;
1629   vfs_st->f_namemax = 4096;
1630   return 0;
1631 }
1632
1633 /*
1634   generic create -- create an empty regular file
1635 */
1636 int rgw_create(struct rgw_fs *rgw_fs, struct rgw_file_handle *parent_fh,
1637                const char *name, struct stat *st, uint32_t mask,
1638                struct rgw_file_handle **fh, uint32_t posix_flags,
1639                uint32_t flags)
1640 {
1641   using std::get;
1642
1643   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1644   RGWFileHandle* parent = get_rgwfh(parent_fh);
1645
1646   if ((! parent) ||
1647       (parent->is_root()) ||
1648       (parent->is_file())) {
1649     /* bad parent */
1650     return -EINVAL;
1651   }
1652
1653   MkObjResult fhr = fs->create(parent, name, st, mask, flags);
1654   RGWFileHandle *nfh = get<0>(fhr); // nullptr if !success
1655
1656   if (nfh)
1657     *fh = nfh->get_fh();
1658
1659   return get<1>(fhr);
1660 } /* rgw_create */
1661
1662 /*
1663   create a new directory
1664 */
1665 int rgw_mkdir(struct rgw_fs *rgw_fs,
1666               struct rgw_file_handle *parent_fh,
1667               const char *name, struct stat *st, uint32_t mask,
1668               struct rgw_file_handle **fh, uint32_t flags)
1669 {
1670   using std::get;
1671
1672   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1673   RGWFileHandle* parent = get_rgwfh(parent_fh);
1674
1675   if (! parent) {
1676     /* bad parent */
1677     return -EINVAL;
1678   }
1679
1680   MkObjResult fhr = fs->mkdir(parent, name, st, mask, flags);
1681   RGWFileHandle *nfh = get<0>(fhr); // nullptr if !success
1682
1683   if (nfh)
1684     *fh = nfh->get_fh();
1685
1686   return get<1>(fhr);
1687 } /* rgw_mkdir */
1688
1689 /*
1690   rename object
1691 */
1692 int rgw_rename(struct rgw_fs *rgw_fs,
1693                struct rgw_file_handle *src, const char* src_name,
1694                struct rgw_file_handle *dst, const char* dst_name,
1695                uint32_t flags)
1696 {
1697   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1698
1699   RGWFileHandle* src_fh = get_rgwfh(src);
1700   RGWFileHandle* dst_fh = get_rgwfh(dst);
1701
1702   return fs->rename(src_fh, dst_fh, src_name, dst_name);
1703 }
1704
1705 /*
1706   remove file or directory
1707 */
1708 int rgw_unlink(struct rgw_fs *rgw_fs, struct rgw_file_handle *parent_fh,
1709                const char *name, uint32_t flags)
1710 {
1711   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1712   RGWFileHandle* parent = get_rgwfh(parent_fh);
1713
1714   return fs->unlink(parent, name);
1715 }
1716
1717 /*
1718   lookup object by name (POSIX style)
1719 */
1720 int rgw_lookup(struct rgw_fs *rgw_fs,
1721               struct rgw_file_handle *parent_fh, const char* path,
1722               struct rgw_file_handle **fh, uint32_t flags)
1723 {
1724   //CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
1725   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1726
1727   RGWFileHandle* parent = get_rgwfh(parent_fh);
1728   if ((! parent) ||
1729       (! parent->is_dir())) {
1730     /* bad parent */
1731     return -EINVAL;
1732   }
1733
1734   RGWFileHandle* rgw_fh;
1735   LookupFHResult fhr;
1736
1737   if (parent->is_root()) {
1738     /* special: parent lookup--note lack of ref()! */
1739     if (unlikely((strcmp(path, "..") == 0) ||
1740                  (strcmp(path, "/") == 0))) {
1741       rgw_fh = parent;
1742     } else {
1743       RGWLibFS::BucketStats bstat;
1744       fhr = fs->stat_bucket(parent, path, bstat, RGWFileHandle::FLAG_NONE);
1745       rgw_fh = get<0>(fhr);
1746       if (! rgw_fh)
1747         return -ENOENT;
1748     }
1749   } else {
1750     /* special: after readdir--note extra ref()! */
1751     if (unlikely((strcmp(path, "..") == 0))) {
1752       rgw_fh = parent;
1753       lsubdout(fs->get_context(), rgw, 17)
1754         << __func__ << "BANG"<< *rgw_fh
1755         << dendl;
1756       fs->ref(rgw_fh);
1757     } else {
1758       /* lookup in a readdir callback */
1759       enum rgw_fh_type fh_type = fh_type_of(flags);
1760
1761       uint32_t sl_flags = (flags & RGW_LOOKUP_FLAG_RCB)
1762         ? RGWFileHandle::FLAG_NONE
1763         : RGWFileHandle::FLAG_EXACT_MATCH;
1764
1765       fhr = fs->stat_leaf(parent, path, fh_type, sl_flags);
1766       if (! get<0>(fhr)) {
1767         if (! (flags & RGW_LOOKUP_FLAG_CREATE))
1768           return -ENOENT;
1769         else
1770           fhr = fs->lookup_fh(parent, path, RGWFileHandle::FLAG_CREATE);
1771       }
1772       rgw_fh = get<0>(fhr);
1773     }
1774   } /* !root */
1775
1776   struct rgw_file_handle *rfh = rgw_fh->get_fh();
1777   *fh = rfh;
1778
1779   return 0;
1780 } /* rgw_lookup */
1781
1782 /*
1783   lookup object by handle (NFS style)
1784 */
1785 int rgw_lookup_handle(struct rgw_fs *rgw_fs, struct rgw_fh_hk *fh_hk,
1786                       struct rgw_file_handle **fh, uint32_t flags)
1787 {
1788   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1789
1790   RGWFileHandle* rgw_fh = fs->lookup_handle(*fh_hk);
1791   if (! rgw_fh) {
1792     /* not found */
1793     return -ENOENT;
1794   }
1795
1796   struct rgw_file_handle *rfh = rgw_fh->get_fh();
1797   *fh = rfh;
1798
1799   return 0;
1800 }
1801
1802 /*
1803  * release file handle
1804  */
1805 int rgw_fh_rele(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
1806                 uint32_t flags)
1807 {
1808   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1809   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1810
1811   lsubdout(fs->get_context(), rgw, 17)
1812     << __func__ << " " << *rgw_fh
1813     << dendl;
1814
1815   fs->unref(rgw_fh);
1816   return 0;
1817 }
1818
1819 /*
1820    get unix attributes for object
1821 */
1822 int rgw_getattr(struct rgw_fs *rgw_fs,
1823                 struct rgw_file_handle *fh, struct stat *st, uint32_t flags)
1824 {
1825   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1826   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1827
1828   return fs->getattr(rgw_fh, st);
1829 }
1830
1831 /*
1832   set unix attributes for object
1833 */
1834 int rgw_setattr(struct rgw_fs *rgw_fs,
1835                 struct rgw_file_handle *fh, struct stat *st,
1836                 uint32_t mask, uint32_t flags)
1837 {
1838   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1839   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1840
1841   return fs->setattr(rgw_fh, st, mask, flags);
1842 }
1843
1844 /*
1845    truncate file
1846 */
1847 int rgw_truncate(struct rgw_fs *rgw_fs,
1848                  struct rgw_file_handle *fh, uint64_t size, uint32_t flags)
1849 {
1850   return 0;
1851 }
1852
1853 /*
1854    open file
1855 */
1856 int rgw_open(struct rgw_fs *rgw_fs,
1857              struct rgw_file_handle *fh, uint32_t posix_flags, uint32_t flags)
1858 {
1859   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1860
1861   /* XXX 
1862    * need to track specific opens--at least read opens and
1863    * a write open;  we need to know when a write open is returned,
1864    * that closes a write transaction
1865    *
1866    * for now, we will support single-open only, it's preferable to
1867    * anything we can otherwise do without access to the NFS state
1868    */
1869   if (! rgw_fh->is_file())
1870     return -EISDIR;
1871
1872   return rgw_fh->open(flags);
1873 }
1874
1875 /*
1876    close file
1877 */
1878 int rgw_close(struct rgw_fs *rgw_fs,
1879               struct rgw_file_handle *fh, uint32_t flags)
1880 {
1881   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1882   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1883   int rc = rgw_fh->close(/* XXX */);
1884
1885   if (flags & RGW_CLOSE_FLAG_RELE)
1886     fs->unref(rgw_fh);
1887
1888   return rc;
1889 }
1890
1891 int rgw_readdir(struct rgw_fs *rgw_fs,
1892                 struct rgw_file_handle *parent_fh, uint64_t *offset,
1893                 rgw_readdir_cb rcb, void *cb_arg, bool *eof,
1894                 uint32_t flags)
1895 {
1896   RGWFileHandle* parent = get_rgwfh(parent_fh);
1897   if (! parent) {
1898     /* bad parent */
1899     return -EINVAL;
1900   }
1901
1902   lsubdout(parent->get_fs()->get_context(), rgw, 15)
1903     << __func__
1904     << " offset=" << *offset
1905     << dendl;
1906
1907   if ((*offset == 0) &&
1908       (flags & RGW_READDIR_FLAG_DOTDOT)) {
1909     /* send '.' and '..' with their NFS-defined offsets */
1910     rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
1911     rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
1912   }
1913
1914   int rc = parent->readdir(rcb, cb_arg, offset, eof, flags);
1915   return rc;
1916 } /* rgw_readdir */
1917
1918 /* enumeration continuing from name */
1919 int rgw_readdir2(struct rgw_fs *rgw_fs,
1920                  struct rgw_file_handle *parent_fh, const char *name,
1921                  rgw_readdir_cb rcb, void *cb_arg, bool *eof,
1922                  uint32_t flags)
1923 {
1924   RGWFileHandle* parent = get_rgwfh(parent_fh);
1925   if (! parent) {
1926     /* bad parent */
1927     return -EINVAL;
1928   }
1929
1930   lsubdout(parent->get_fs()->get_context(), rgw, 15)
1931     << __func__
1932     << " offset=" << name
1933     << dendl;
1934
1935   if ((! name) &&
1936       (flags & RGW_READDIR_FLAG_DOTDOT)) {
1937     /* send '.' and '..' with their NFS-defined offsets */
1938     rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
1939     rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
1940   }
1941
1942   int rc = parent->readdir(rcb, cb_arg, name, eof, flags);
1943   return rc;
1944 } /* rgw_readdir2 */
1945
1946 /* project offset of dirent name */
1947 int rgw_dirent_offset(struct rgw_fs *rgw_fs,
1948                       struct rgw_file_handle *parent_fh,
1949                       const char *name, int64_t *offset,
1950                       uint32_t flags)
1951 {
1952   RGWFileHandle* parent = get_rgwfh(parent_fh);
1953   if ((! parent)) {
1954     /* bad parent */
1955     return -EINVAL;
1956   }
1957   std::string sname{name};
1958   int rc = parent->offset_of(sname, offset, flags);
1959   return rc;
1960 }
1961
1962 /*
1963    read data from file
1964 */
1965 int rgw_read(struct rgw_fs *rgw_fs,
1966              struct rgw_file_handle *fh, uint64_t offset,
1967              size_t length, size_t *bytes_read, void *buffer,
1968              uint32_t flags)
1969 {
1970   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
1971   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1972
1973   return fs->read(rgw_fh, offset, length, bytes_read, buffer, flags);
1974 }
1975
1976 /*
1977    write data to file
1978 */
1979 int rgw_write(struct rgw_fs *rgw_fs,
1980               struct rgw_file_handle *fh, uint64_t offset,
1981               size_t length, size_t *bytes_written, void *buffer,
1982               uint32_t flags)
1983 {
1984   RGWFileHandle* rgw_fh = get_rgwfh(fh);
1985   int rc;
1986
1987   *bytes_written = 0;
1988
1989   if (! rgw_fh->is_file())
1990     return -EISDIR;
1991
1992   if (! rgw_fh->is_open()) {
1993     if (flags & RGW_OPEN_FLAG_V3) {
1994       rc = rgw_fh->open(flags);
1995       if (!! rc)
1996         return rc;
1997     } else
1998       return -EPERM;
1999   }
2000
2001   rc = rgw_fh->write(offset, length, bytes_written, buffer);
2002
2003   return rc;
2004 }
2005
2006 /*
2007    read data from file (vector)
2008 */
2009 class RGWReadV
2010 {
2011   buffer::list bl;
2012   struct rgw_vio* vio;
2013
2014 public:
2015   RGWReadV(buffer::list& _bl, rgw_vio* _vio) : vio(_vio) {
2016     bl.claim(_bl);
2017   }
2018
2019   struct rgw_vio* get_vio() { return vio; }
2020
2021   const std::list<buffer::ptr>& buffers() { return bl.buffers(); }
2022
2023   unsigned /* XXX */ length() { return bl.length(); }
2024
2025 };
2026
2027 void rgw_readv_rele(struct rgw_uio *uio, uint32_t flags)
2028 {
2029   RGWReadV* rdv = static_cast<RGWReadV*>(uio->uio_p1);
2030   rdv->~RGWReadV();
2031   ::operator delete(rdv);
2032 }
2033
2034 int rgw_readv(struct rgw_fs *rgw_fs,
2035               struct rgw_file_handle *fh, rgw_uio *uio, uint32_t flags)
2036 {
2037 #if 0 /* XXX */
2038   CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
2039   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
2040   RGWFileHandle* rgw_fh = get_rgwfh(fh);
2041
2042   if (! rgw_fh->is_file())
2043     return -EINVAL;
2044
2045   int rc = 0;
2046
2047   buffer::list bl;
2048   RGWGetObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(),
2049                       rgw_fh->object_name(), uio->uio_offset, uio->uio_resid,
2050                       bl);
2051   req.do_hexdump = false;
2052
2053   rc = rgwlib.get_fe()->execute_req(&req);
2054
2055   if (! rc) {
2056     RGWReadV* rdv = static_cast<RGWReadV*>(
2057       ::operator new(sizeof(RGWReadV) +
2058                     (bl.buffers().size() * sizeof(struct rgw_vio))));
2059
2060     (void) new (rdv)
2061       RGWReadV(bl, reinterpret_cast<rgw_vio*>(rdv+sizeof(RGWReadV)));
2062
2063     uio->uio_p1 = rdv;
2064     uio->uio_cnt = rdv->buffers().size();
2065     uio->uio_resid = rdv->length();
2066     uio->uio_vio = rdv->get_vio();
2067     uio->uio_rele = rgw_readv_rele;
2068
2069     int ix = 0;
2070     auto& buffers = rdv->buffers();
2071     for (auto& bp : buffers) {
2072       rgw_vio *vio = &(uio->uio_vio[ix]);
2073       vio->vio_base = const_cast<char*>(bp.c_str());
2074       vio->vio_len = bp.length();
2075       vio->vio_u1 = nullptr;
2076       vio->vio_p1 = nullptr;
2077       ++ix;
2078     }
2079   }
2080
2081   return rc;
2082 #else
2083   return 0;
2084 #endif
2085 }
2086
2087 /*
2088    write data to file (vector)
2089 */
2090 int rgw_writev(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
2091               rgw_uio *uio, uint32_t flags)
2092 {
2093
2094   return -ENOTSUP;
2095
2096   CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
2097   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
2098   RGWFileHandle* rgw_fh = get_rgwfh(fh);
2099
2100   if (! rgw_fh->is_file())
2101     return -EINVAL;
2102
2103   buffer::list bl;
2104   for (unsigned int ix = 0; ix < uio->uio_cnt; ++ix) {
2105     rgw_vio *vio = &(uio->uio_vio[ix]);
2106     bl.push_back(
2107       buffer::create_static(vio->vio_len,
2108                             static_cast<char*>(vio->vio_base)));
2109   }
2110
2111   std::string oname = rgw_fh->relative_object_name();
2112   RGWPutObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(),
2113                        oname, bl);
2114
2115   int rc = rgwlib.get_fe()->execute_req(&req);
2116
2117   /* XXX update size (in request) */
2118
2119   return rc;
2120 }
2121
2122 /*
2123    sync written data
2124 */
2125 int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *handle,
2126               uint32_t flags)
2127 {
2128   return 0;
2129 }
2130
2131 int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
2132                uint64_t offset, uint64_t length, uint32_t flags)
2133 {
2134   RGWFileHandle* rgw_fh = get_rgwfh(fh);
2135
2136   return rgw_fh->commit(offset, length, RGWFileHandle::FLAG_NONE);
2137 }
2138
2139 } /* extern "C" */