Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librgw_file_nfsns.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Red Hat, Inc.
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <stdint.h>
16 #include <tuple>
17 #include <iostream>
18 #include <fstream>
19 #include <stack>
20
21 #include "include/rados/librgw.h"
22 #include "include/rados/rgw_file.h"
23 #include "rgw/rgw_file.h"
24 #include "rgw/rgw_lib_frontend.h" // direct requests
25
26 #include "gtest/gtest.h"
27 #include "common/backport14.h"
28 #include "common/ceph_argparse.h"
29 #include "common/debug.h"
30 #include "global/global_init.h"
31 #include "include/assert.h"
32
33 #define dout_subsys ceph_subsys_rgw
34
35 namespace {
36
37   using namespace rgw;
38   using std::get;
39   using std::string;
40
41   librgw_t rgw_h = nullptr;
42   string userid("testuser");
43   string access_key("");
44   string secret_key("");
45   struct rgw_fs *fs = nullptr;
46   CephContext* cct = nullptr;
47
48   uint32_t owner_uid = 867;
49   uint32_t owner_gid = 5309;
50
51   uint32_t magic_uid = 1701;
52   uint32_t magic_gid = 9876;
53
54   uint32_t create_mask = RGW_SETATTR_UID | RGW_SETATTR_GID | RGW_SETATTR_MODE;
55
56   string bucket_name("nfsroot");
57   string dirs1_bucket_name("bdirs1");
58   string readf_name("toyland");
59   string readf_out_name("rgwlib_readf.out");
60   std::string writef_name{"bigbird"};
61
62   int n_dirs1_dirs = 3;
63   int n_dirs1_objs = 2;
64
65   class obj_rec
66   {
67   public:
68     string name;
69     struct rgw_file_handle* fh;
70     struct rgw_file_handle* parent_fh;
71     RGWFileHandle* rgw_fh; // alias into fh
72
73     struct state {
74       bool readdir;
75       state() : readdir(false) {}
76     } state;
77
78     obj_rec(string _name, struct rgw_file_handle* _fh,
79             struct rgw_file_handle* _parent_fh, RGWFileHandle* _rgw_fh)
80       : name(std::move(_name)), fh(_fh), parent_fh(_parent_fh),
81         rgw_fh(_rgw_fh) {}
82
83     void clear() {
84       fh = nullptr;
85       rgw_fh = nullptr;
86     }
87
88     void sync() {
89       if (fh)
90         rgw_fh = get_rgwfh(fh);
91     }
92
93     friend ostream& operator<<(ostream& os, const obj_rec& rec);
94   };
95
96   ostream& operator<<(ostream& os, const obj_rec& rec)
97   {
98     RGWFileHandle* rgw_fh = rec.rgw_fh;
99     if (rgw_fh) {
100       const char* type = rgw_fh->is_dir() ? "DIR " : "FILE ";
101       os << rec.rgw_fh->full_object_name()
102          << " (" << rec.rgw_fh->object_name() << "): "
103          << type;
104     }
105     return os;
106   }
107   
108   std::stack<obj_rec> obj_stack;
109   std::deque<obj_rec> cleanup_queue;
110
111   typedef std::vector<obj_rec> obj_vec;
112   typedef std::tuple<obj_rec, obj_vec> dirs1_rec;
113   typedef std::vector<dirs1_rec> dirs1_vec;
114
115   dirs1_vec dirs_vec;
116
117   struct obj_rec_st
118   {
119     const obj_rec& obj;
120     const struct stat& st;
121
122     obj_rec_st(const obj_rec& _obj, const struct stat& _st)
123       : obj(_obj), st(_st) {}
124   };
125
126   ostream& operator<<(ostream& os, const obj_rec_st& rec)
127   {
128     RGWFileHandle* rgw_fh = rec.obj.rgw_fh;
129     if (rgw_fh) {
130       const char* type = rgw_fh->is_dir() ? "DIR " : "FILE ";
131       os << rgw_fh->full_object_name()
132          << " (" << rgw_fh->object_name() << "): "
133          << type;
134       const struct stat& st = rec.st;
135       switch(uint8_t(rgw_fh->is_dir())) {
136       case 1:
137         os << " mode: " << st.st_mode;
138         os << " nlinks: " << st.st_nlink;
139         break;
140       case 0:
141       default:
142         os << " mode: " << st.st_mode;
143         os << " size: " << st.st_size;
144         // xxx
145         break;
146       }
147     }
148     return os;
149   }
150
151   bool do_hier1 = false;
152   bool do_dirs1 = false;
153   bool do_readf = false;
154   bool do_writef = false;
155   bool do_marker1 = false;
156   bool do_create = false;
157   bool do_delete = false;
158   bool do_rename = false;
159   bool do_setattr = false;
160   bool verbose = false;
161
162   string marker_dir("nfs_marker");
163   struct rgw_file_handle *bucket_fh = nullptr;
164   struct rgw_file_handle *marker_fh;
165   static constexpr int marker_nobjs = 2*1024;
166   std::deque<obj_rec> marker_objs;
167
168   using dirent_t = std::tuple<std::string, uint64_t>;
169   struct dirent_vec
170   {
171     std::vector<dirent_t> obj_names;
172     uint32_t count;
173     dirent_vec() : count(0) {}
174   };
175
176   obj_rec dirs1_b{dirs1_bucket_name, nullptr, nullptr, nullptr};
177
178   dirs1_vec renames_vec;
179
180   struct {
181     int argc;
182     char **argv;
183   } saved_args;
184 }
185
186 TEST(LibRGW, INIT) {
187   int ret = librgw_create(&rgw_h, saved_args.argc, saved_args.argv);
188   ASSERT_EQ(ret, 0);
189   ASSERT_NE(rgw_h, nullptr);
190 }
191
192 TEST(LibRGW, MOUNT) {
193   int ret = rgw_mount2(rgw_h, userid.c_str(), access_key.c_str(),
194                        secret_key.c_str(), "/", &fs, RGW_MOUNT_FLAG_NONE);
195   ASSERT_EQ(ret, 0);
196   ASSERT_NE(fs, nullptr);
197
198   cct = static_cast<RGWLibFS*>(fs->fs_private)->get_context();
199 }
200
201 TEST(LibRGW, SETUP_HIER1)
202 {
203   if (do_hier1) {
204     (void) rgw_lookup(fs, fs->root_fh, bucket_name.c_str(), &bucket_fh,
205                       RGW_LOOKUP_FLAG_NONE);
206     if (! bucket_fh) {
207       if (do_create) {
208         struct stat st;
209
210         st.st_uid = owner_uid;
211         st.st_gid = owner_gid;
212         st.st_mode = 755;
213
214         int rc = rgw_mkdir(fs, fs->root_fh, bucket_name.c_str(), &st,
215                           create_mask, &bucket_fh, RGW_MKDIR_FLAG_NONE);
216         ASSERT_EQ(rc, 0);
217       }
218     }
219
220     ASSERT_NE(bucket_fh, nullptr);
221
222     if (do_create) {
223       /* create objects directly */
224       std::vector<std::string> obj_names =
225         {"foo/bar/baz/quux",
226          "foo/f1",
227          "foo/f2",
228          "foo/bar/f1",
229          "foo/bar/d1/",
230          "foo/bar/baz/hungry",
231          "foo/bar/baz/hungry/",
232          "foo/bar/baz/momma",
233          "foo/bar/baz/bear/",
234          "foo/bar/baz/sasquatch",
235          "foo/bar/baz/sasquatch/",
236          "foo/bar/baz/frobozz"};
237
238       buffer::list bl; // empty object
239       RGWLibFS *fs_private = static_cast<RGWLibFS*>(fs->fs_private);
240
241       for (const auto& obj_name : obj_names) {
242         if (verbose) {
243           std::cout << "creating: " << bucket_name << ":" << obj_name
244                     << std::endl;
245         }
246         RGWPutObjRequest req(cct, fs_private->get_user(), bucket_name, obj_name,
247                             bl);
248         int rc = rgwlib.get_fe()->execute_req(&req);
249         int rc2 = req.get_ret();
250         ASSERT_EQ(rc, 0);
251         ASSERT_EQ(rc2, 0);
252       }
253     }
254   }
255 }
256
257 TEST(LibRGW, SETUP_DIRS1) {
258   if (do_dirs1) {
259     int rc;
260     struct stat st;
261
262     st.st_uid = owner_uid;
263     st.st_gid = owner_gid;
264     st.st_mode = 755;
265
266     dirs1_b.parent_fh = fs->root_fh;
267
268     (void) rgw_lookup(fs, dirs1_b.parent_fh, dirs1_bucket_name.c_str(),
269                       &dirs1_b.fh, RGW_LOOKUP_FLAG_NONE);
270
271     if (! dirs1_b.fh) {
272       if (do_create) {
273         rc = rgw_mkdir(fs, dirs1_b.parent_fh, dirs1_b.name.c_str(), &st,
274                       create_mask, &dirs1_b.fh, RGW_MKDIR_FLAG_NONE);
275         ASSERT_EQ(rc, 0);
276       } else {
277         /* no top-level dir and can't create it--skip remaining tests */
278         return;
279       }
280     }
281     dirs1_b.sync();
282
283     /* make top-level dirs */
284     int d_ix;
285     obj_vec ovec;
286     for (d_ix = 0; d_ix < n_dirs1_dirs; ++d_ix) {
287       std::string dname{"dir_"};
288       dname += to_string(d_ix);
289       obj_rec dir{dname, nullptr, dirs1_b.fh, nullptr};
290       ovec.clear();
291
292       (void) rgw_lookup(fs, dir.parent_fh, dir.name.c_str(), &dir.fh,
293                         RGW_LOOKUP_FLAG_NONE);
294       if (! dir.fh) {
295         if (do_create) {
296           rc = rgw_mkdir(fs, dir.parent_fh, dir.name.c_str(), &st, create_mask,
297                          &dir.fh, RGW_MKDIR_FLAG_NONE);
298           ASSERT_EQ(rc, 0);
299         }
300       }
301
302       ASSERT_NE(dir.fh, nullptr);
303       dir.sync();
304       ASSERT_NE(dir.rgw_fh, nullptr);
305       ASSERT_TRUE(dir.rgw_fh->is_dir());
306
307       int f_ix;
308       for (f_ix = 0; f_ix < n_dirs1_objs; ++f_ix) {
309         /* child dir */
310         std::string sdname{"sdir_"};
311         sdname += to_string(f_ix);
312         obj_rec sdir{sdname, nullptr, dir.fh, nullptr};
313
314         (void) rgw_lookup(fs, sdir.parent_fh, sdir.name.c_str(), &sdir.fh,
315                           RGW_LOOKUP_FLAG_NONE);
316
317         if (! sdir.fh) {
318           if (do_create) {
319             rc = rgw_mkdir(fs, sdir.parent_fh, sdir.name.c_str(), &st,
320                           create_mask, &sdir.fh, RGW_MKDIR_FLAG_NONE);
321             ASSERT_EQ(rc, 0);
322           }
323         }
324
325         ASSERT_NE(sdir.fh, nullptr); // suppress !lookup && !create case
326
327         sdir.sync();
328         ASSERT_TRUE(sdir.rgw_fh->is_dir());
329         ovec.push_back(sdir);
330
331         /* child file */
332         std::string sfname{"sfile_"};
333
334         sfname += to_string(f_ix);
335         obj_rec sf{sfname, nullptr, dir.fh, nullptr};
336
337         (void) rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
338                           RGW_LOOKUP_FLAG_NONE);
339
340         if (! sf.fh) {
341           if (do_create) {
342             /* make a new file object (the hard way) */
343             rc = rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
344                             RGW_LOOKUP_FLAG_CREATE);
345             ASSERT_EQ(rc, 0);
346             sf.sync();
347             ASSERT_TRUE(sf.rgw_fh->is_file());
348
349             /* because we made it the hard way, fixup attributes */
350             struct stat st;
351             st.st_uid = owner_uid;
352             st.st_gid = owner_gid;
353             st.st_mode = 644;
354             sf.rgw_fh->create_stat(&st, create_mask);
355
356             /* open handle */
357             rc = rgw_open(fs, sf.fh, 0 /* posix flags */, 0 /* flags */);
358             ASSERT_EQ(rc, 0);
359             ASSERT_TRUE(sf.rgw_fh->is_open());
360             /* stage seq write */
361             size_t nbytes;
362             string data = "data for " + sf.name;
363             rc = rgw_write(fs, sf.fh, 0, data.length(), &nbytes,
364                            (void*) data.c_str(), RGW_WRITE_FLAG_NONE);
365             ASSERT_EQ(rc, 0);
366             ASSERT_EQ(nbytes, data.length());
367             /* commit write transaction */
368             rc = rgw_close(fs, sf.fh, 0 /* flags */);
369             ASSERT_EQ(rc, 0);
370           }
371         } else {
372           sf.sync();
373           ASSERT_TRUE(sf.rgw_fh->is_file());
374         }
375         
376         if (sf.fh)
377           ovec.push_back(sf);
378       }
379       dirs_vec.push_back(dirs1_rec{dir, ovec});
380     }
381   } /* dirs1 top-level !exist */
382 }
383
384 TEST(LibRGW, SETATTR) {
385   if (do_dirs1) {
386     if (do_setattr) {
387
388       int rc;
389       struct stat st;
390
391       st.st_uid = owner_uid;
392       st.st_gid = owner_gid;
393       st.st_mode = 755;
394
395       std::string dname{"dir_0"};
396       obj_rec dir{dname, nullptr, dirs1_b.fh, nullptr};
397
398       /* dir_0 MUST exist and MUST be resident */
399       (void) rgw_lookup(fs, dir.parent_fh, dir.name.c_str(), &dir.fh,
400                         RGW_LOOKUP_FLAG_NONE);
401
402       ASSERT_NE(dir.fh, nullptr);
403       dir.sync();
404       ASSERT_NE(dir.rgw_fh, nullptr);
405       ASSERT_TRUE(dir.rgw_fh->is_dir());
406
407       /* child file */
408       std::string sfname{"setattr_file_0"};
409       obj_rec sf{sfname, nullptr, dir.fh, nullptr};
410
411       (void) rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
412                         RGW_LOOKUP_FLAG_NONE);
413
414       if (! sf.fh) {
415         /* make a new file object (the hard way) */
416         rc = rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
417                         RGW_LOOKUP_FLAG_CREATE);
418         ASSERT_EQ(rc, 0);
419         sf.sync();
420         ASSERT_TRUE(sf.rgw_fh->is_file());
421
422         /* because we made it the hard way, fixup attributes */
423         st.st_uid = owner_uid;
424         st.st_gid = owner_gid;
425         st.st_mode = 644;
426         sf.rgw_fh->create_stat(&st, create_mask);
427
428         /* open handle */
429         rc = rgw_open(fs, sf.fh, 0 /* posix flags */, 0 /* flags */);
430         ASSERT_EQ(rc, 0);
431         ASSERT_TRUE(sf.rgw_fh->is_open());
432         /* stage seq write */
433         size_t nbytes;
434         string data = "data for " + sf.name;
435         rc = rgw_write(fs, sf.fh, 0, data.length(), &nbytes,
436                       (void*) data.c_str(), RGW_WRITE_FLAG_NONE);
437         ASSERT_EQ(rc, 0);
438         ASSERT_EQ(nbytes, data.length());
439         /* commit write transaction */
440         rc = rgw_close(fs, sf.fh, 0 /* flags */);
441         ASSERT_EQ(rc, 0);
442       } else {
443         sf.sync();
444         ASSERT_TRUE(sf.rgw_fh->is_file());
445       }
446
447       /* sf MUST now be materialized--now change it's attributes */
448       st.st_uid = magic_uid;
449       st.st_gid = magic_gid;
450
451       rc = rgw_setattr(fs, sf.fh, &st, create_mask, RGW_SETATTR_FLAG_NONE);
452       ASSERT_EQ(rc, 0);
453
454       /* force evict--subsequent lookups must reload */
455       static_cast<RGWLibFS*>(fs->fs_private)->release_evict(sf.rgw_fh);
456
457       sf.clear();
458
459       /* revalidate -- expect magic uid and gid */
460       (void) rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
461                         RGW_LOOKUP_FLAG_NONE);
462       sf.sync();
463       ASSERT_NE(sf.fh, nullptr);
464
465       memset(&st, 0, sizeof(struct stat)); /* nothing up my sleeve... */
466
467       rc =  rgw_getattr(fs, sf.fh, &st, RGW_GETATTR_FLAG_NONE);
468       ASSERT_EQ(rc, 0);
469
470       ASSERT_EQ(st.st_uid, magic_uid);
471       ASSERT_EQ(st.st_gid, magic_gid);
472
473       /* release 1 ref on sf */
474       rgw_fh_rele(fs, sf.fh, RGW_FH_RELE_FLAG_NONE);
475
476       /* release 1 ref on dir */
477       rgw_fh_rele(fs, dir.fh, RGW_FH_RELE_FLAG_NONE);
478     } /* dirs1 */
479   }
480 }
481
482 TEST(LibRGW, RGW_CREATE_DIRS1) {
483   /* verify rgw_create (create [empty] file objects the easy way) */
484   if (do_dirs1) {
485     if (do_create) {
486       int rc;
487       struct stat st;
488
489       st.st_uid = owner_uid;
490       st.st_gid = owner_gid;
491       st.st_mode = 644;
492
493       for (auto& dirs_rec : dirs_vec) {
494         /* create 1 more file in each sdir */
495         obj_rec& dir = get<0>(dirs_rec);
496         std::string sfname{"sfile_" + to_string(n_dirs1_objs)};
497         obj_rec sf{sfname, nullptr, dir.fh, nullptr};
498         (void) rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
499                           RGW_LOOKUP_FLAG_NONE);
500         if (! sf.fh) {
501           rc = rgw_create(fs, sf.parent_fh, sf.name.c_str(), &st, create_mask,
502                           &sf.fh, 0 /* posix flags */, RGW_CREATE_FLAG_NONE);
503           ASSERT_EQ(rc, 0);
504         }
505         sf.sync();
506       }
507       n_dirs1_objs++;
508     }
509   }
510 }
511
512 TEST(LibRGW, RGW_SETUP_RENAME1) {
513   /* verify rgw_create (create [empty] file objects the easy way) */
514   if (do_rename) {
515     int rc;
516     struct stat st;
517     obj_vec ovec;
518
519     st.st_uid = owner_uid;
520     st.st_gid = owner_gid;
521     st.st_mode = 755;
522
523     for (int b_ix : {0, 1}) {
524       std::string bname{"brename_" + to_string(b_ix)};
525       obj_rec brec{bname, nullptr, nullptr, nullptr};
526       (void) rgw_lookup(fs, fs->root_fh, brec.name.c_str(), &brec.fh,
527                         RGW_LOOKUP_FLAG_NONE);
528       if (! brec.fh) {
529         if (do_create) {
530           struct stat st;
531           int rc = rgw_mkdir(fs, fs->root_fh, brec.name.c_str(), &st,
532                             create_mask, &brec.fh, RGW_MKDIR_FLAG_NONE);
533           ASSERT_EQ(rc, 0);
534         }
535       }
536       ASSERT_NE(brec.fh, nullptr);
537       brec.sync();
538
539       st.st_mode = 644; /* file mask */
540
541       for (int f_ix : {0, 1}) {
542         std::string rfname{"rfile_"};
543         rfname += to_string(f_ix);
544         obj_rec rf{rfname, nullptr, brec.fh, nullptr};
545         (void) rgw_lookup(fs, rf.parent_fh, rf.name.c_str(), &rf.fh,
546                           RGW_LOOKUP_FLAG_NONE);
547         if (! rf.fh) {
548           rc = rgw_create(fs, rf.parent_fh, rf.name.c_str(), &st, create_mask,
549                           &rf.fh, 0 /* posix flags */, RGW_CREATE_FLAG_NONE);
550           ASSERT_EQ(rc, 0);
551         }
552         rf.sync();
553         ovec.push_back(rf);
554       }
555       renames_vec.push_back(dirs1_rec{brec, ovec});
556       ovec.clear();
557     } /* b_ix */
558   }
559 }
560
561 TEST(LibRGW, RGW_INTRABUCKET_RENAME1) {
562   /* rgw_rename a file within a bucket */
563   if (do_rename) {
564     int rc;
565     obj_rec& bdir0 = get<0>(renames_vec[0]);
566     obj_rec& src_obj = get<1>(renames_vec[0])[0];
567     std::string rfname{"rfile_r0"};
568     if (verbose) {
569       std::cout << "rename file " << src_obj.name << " to "
570                 << rfname << " (bucket " << bdir0.name << ")"
571                 << std::endl;
572     }
573     rc = rgw_rename(fs, bdir0.fh, src_obj.name.c_str(), bdir0.fh,
574                     rfname.c_str(), RGW_RENAME_FLAG_NONE);
575     ASSERT_EQ(rc, 0);
576   }
577 }
578
579 TEST(LibRGW, RGW_CROSSBUCKET_RENAME1) {
580   /* rgw_rename a file within a bucket */
581   if (do_rename) {
582     int rc;
583     obj_rec& bdir0 = get<0>(renames_vec[0]);
584     obj_rec& bdir1 = get<0>(renames_vec[1]);
585     obj_rec& src_obj = get<1>(renames_vec[0])[1];
586     std::string rfname{"rfile_rhilldog"};
587     if (verbose) {
588       std::cout << "rename file " << src_obj.name
589                 << " (bucket " << bdir0.name << ") to "
590                 << rfname << " (bucket " << bdir1.name << ")"
591                 << std::endl;
592     }
593     rc = rgw_rename(fs, bdir0.fh, src_obj.name.c_str(), bdir1.fh,
594                     rfname.c_str(), RGW_RENAME_FLAG_NONE);
595     ASSERT_EQ(rc, 0);
596   }
597 }
598
599 TEST(LibRGW, BAD_DELETES_DIRS1) {
600   if (do_dirs1) {
601     int rc;
602
603     if (dirs_vec.size() == 0) {
604       /* skip */
605       return;
606     }
607
608     if (do_delete) {
609       /* try to unlink a non-empty directory (bucket) */
610       rc = rgw_unlink(fs, dirs1_b.parent_fh, dirs1_b.name.c_str(),
611                       RGW_UNLINK_FLAG_NONE);
612       ASSERT_NE(rc, 0);
613     }
614     /* try to unlink a non-empty directory (non-bucket) */
615     obj_rec& sdir_0 = get<1>(dirs_vec[0])[0];
616     ASSERT_EQ(sdir_0.name, "sdir_0");
617     ASSERT_TRUE(sdir_0.rgw_fh->is_dir());
618     /* XXX we can't enforce this currently */
619 #if 0
620     ASSERT_EQ(sdir_0.name, "sdir_0");
621     ASSERT_TRUE(sdir_0.rgw_fh->is_dir());
622     rc = rgw_unlink(fs, sdir_0.parent_fh, sdir_0.name.c_str(),
623                     RGW_UNLINK_FLAG_NONE);
624     ASSERT_NE(rc, 0);
625 #endif
626   }
627 }
628
629 TEST(LibRGW, GETATTR_DIRS1)
630 {
631   if (do_dirs1) {
632     int rc;
633     struct stat st;
634     for (auto& dirs_rec : dirs_vec) {
635       obj_rec& dir = get<0>(dirs_rec);
636       if (verbose) {
637         std::cout << "scanning objects in "
638                   << dir.rgw_fh->full_object_name()
639                   << std::endl;
640       }
641       for (auto& sobj : get<1>(dirs_rec)) {
642         rc = rgw_getattr(fs, sobj.fh, &st, RGW_GETATTR_FLAG_NONE);
643         ASSERT_EQ(rc, 0);
644         /* validate, pretty-print */
645         if (sobj.rgw_fh->object_name().find("sfile") != std::string::npos) {
646           ASSERT_TRUE(sobj.rgw_fh->is_file());
647           ASSERT_TRUE(S_ISREG(st.st_mode));
648         }
649         if (sobj.rgw_fh->object_name().find("sdir") != std::string::npos) {
650           ASSERT_TRUE(sobj.rgw_fh->is_dir());
651           ASSERT_TRUE(S_ISDIR(st.st_mode));
652         }
653         /* validate Unix owners */
654         ASSERT_EQ(st.st_uid, owner_uid);
655         ASSERT_EQ(st.st_gid, owner_gid);
656         if (verbose) {
657           obj_rec_st rec_st{sobj, st};
658           std::cout << "\t"
659                     << rec_st
660                     << std::endl;
661         }
662       }
663     }
664   }
665 }
666
667 TEST(LibRGW, READ_DIRS1)
668 {
669   if (do_dirs1) {
670     int rc;
671     char buf[256];
672     size_t nread;
673     for (auto& dirs_rec : dirs_vec) {
674       obj_rec& dir = get<0>(dirs_rec);
675       if (verbose) {
676         std::cout << "read back objects in "
677                   << dir.rgw_fh->full_object_name()
678                   << std::endl;
679       }
680       for (auto& sobj : get<1>(dirs_rec)) {
681         /* only the first 2 file objects have data */
682         if ((sobj.rgw_fh->object_name().find("sfile_0")
683              != std::string::npos) ||
684             (sobj.rgw_fh->object_name().find("sfile_1")
685              != std::string::npos)) {
686           ASSERT_TRUE(sobj.rgw_fh->is_file());
687           ASSERT_EQ(sobj.rgw_fh->get_size(), 16UL);
688           // do it
689           memset(buf, 0, 256);
690           if (verbose) {
691             std::cout << "reading 0,256 " << sobj.rgw_fh->relative_object_name()
692                       << std::endl;
693           }
694           rc = rgw_read(fs, sobj.fh, 0, 256, &nread, buf, RGW_READ_FLAG_NONE);
695           ASSERT_EQ(rc, 0);
696           if (verbose) {
697             std::cout << "\tread back from " << sobj.name
698                       << " : \"" << buf << "\""
699                       << std::endl;
700           }
701         }
702       }
703     }
704   }
705 }
706
707 TEST(LibRGW, READF_SETUP1)
708 {
709   struct stat st;
710   if (do_dirs1) {
711     if (do_create) {
712       if ((! stat(readf_out_name.c_str(), &st)) &&
713           (S_ISREG(st.st_mode)) &&
714           (st.st_size == 6291456))
715         return;
716       ofstream of;
717       of.open(readf_out_name, ios::out|ios::app|ios::binary);
718       for (int ix1 = 0; ix1 < 6; ++ix1) {
719         for (int ix2 = 0; ix2 < 1024*1024; ++ix2) {
720           of << ix1;
721         }
722       }
723     }
724   }
725 }
726
727 TEST(LibRGW, READF_DIRS1) {
728   if (do_dirs1) {
729     if (do_readf) {
730       obj_rec fobj{readf_name, nullptr, dirs1_b.fh, nullptr};
731
732       int rc = rgw_lookup(fs, dirs1_b.fh, fobj.name.c_str(), &fobj.fh,
733                           RGW_LOOKUP_FLAG_NONE);
734       ASSERT_EQ(rc, 0);
735       ASSERT_NE(fobj.fh, nullptr);
736       fobj.sync();
737
738       ofstream of;
739       of.open(readf_out_name, ios::out|ios::app|ios::binary);
740       int bufsz = 1024 * 1024 * sizeof(char);
741       auto buffer = ceph::make_unique<char[]>(bufsz);
742
743       uint64_t offset = 0;
744       uint64_t length = bufsz;
745       for (int ix = 0; ix < 6; ++ix) {
746         size_t nread = 0;
747         memset(buffer.get(), 0, length); // XXX
748         rc = rgw_read(fs, fobj.fh, offset, length, &nread, buffer.get(),
749                       RGW_READ_FLAG_NONE);
750         ASSERT_EQ(rc, 0);
751         ASSERT_EQ(nread, length);
752         of.write(buffer.get(), length);
753         offset += nread;
754       }
755       of.close();
756       rgw_fh_rele(fs, fobj.fh, 0 /* flags */);
757     }
758   }
759 }
760
761 TEST(LibRGW, WRITEF_DIRS1) {
762   if (do_dirs1) {
763     if (do_writef) {
764       int rc;
765       ifstream ifs;
766       ifs.open(readf_out_name, ios::out|ios::app|ios::binary);
767       ASSERT_TRUE(ifs.is_open());
768
769       obj_rec fobj{writef_name, nullptr, dirs1_b.fh, nullptr};
770
771       (void) rgw_lookup(fs, fobj.parent_fh, fobj.name.c_str(), &fobj.fh,
772                         RGW_LOOKUP_FLAG_NONE);
773       if (! fobj.fh) {
774         if (do_create) {
775           /* make a new file object (the hard way) */
776           rc = rgw_lookup(fs, fobj.parent_fh, fobj.name.c_str(), &fobj.fh,
777                           RGW_LOOKUP_FLAG_CREATE);
778           ASSERT_EQ(rc, 0);
779         }
780       }
781       ASSERT_NE(fobj.fh, nullptr);
782       fobj.sync();
783
784       /* begin write transaction */
785       rc = rgw_open(fs, fobj.fh, 0 /* posix flags */, 0 /* flags */);
786       ASSERT_EQ(rc, 0);
787       ASSERT_TRUE(fobj.rgw_fh->is_open());
788
789       int bufsz = 1024 * 1024 * sizeof(char);
790       char *buffer = (char*) malloc(bufsz);
791
792       uint64_t offset = 0;
793       uint64_t length = bufsz;
794       for (int ix = 0; ix < 6; ++ix) {
795         ASSERT_TRUE(ifs.good());
796         ifs.read(buffer, bufsz);
797         size_t nwritten = 0;
798         string str;
799         str.assign(buffer, 4);
800         if (verbose) {
801           std::cout << "read and writing " << length << " bytes"
802                     << " from " << readf_out_name
803                     << " at offset " << offset
804                     << " (" << str << "... [first 4 chars])"
805                     << std::endl;
806         }
807         char* leakbuf = (char*) malloc(bufsz);
808         memcpy(leakbuf, buffer, length);
809         rc = rgw_write(fs, fobj.fh, offset, length, &nwritten, leakbuf,
810                       RGW_WRITE_FLAG_NONE);
811         ASSERT_EQ(rc, 0);
812         ASSERT_EQ(nwritten, length);
813         offset += length;
814       }
815
816       /* commit write transaction */
817       rc = rgw_close(fs, fobj.fh, RGW_CLOSE_FLAG_NONE);
818       ASSERT_EQ(rc, 0);
819
820       ifs.close();
821       free(buffer);
822       rgw_fh_rele(fs, fobj.fh, 0 /* flags */);
823     }
824   }
825 }
826
827 TEST(LibRGW, RELEASE_DIRS1) {
828   if (do_dirs1) {
829     /* force release of handles for children of dirs1--force subsequent
830      * checks to reload them from the cluster.
831      *
832      * while doing this, verify handle cleanup and correct LRU state
833      * (not reachable)
834      */
835     int rc;
836     for (auto& dirs_rec : dirs_vec) {
837       for (auto& obj : get<1>(dirs_rec)) {
838         if (verbose) {
839           std::cout << "release " << obj.name
840                     << " type: " << obj.rgw_fh->stype()
841                     << " refs: " << obj.rgw_fh->get_refcnt()
842                     << std::endl;
843         }
844         ASSERT_EQ(obj.rgw_fh->get_refcnt(), 2UL);
845         rc = rgw_fh_rele(fs, obj.fh, 0 /* flags */);
846         ASSERT_EQ(rc, 0);
847         ASSERT_EQ(obj.rgw_fh->get_refcnt(), 1UL);
848         /* try-discard handle */
849         /* clear obj_rec vec */
850       }
851     }
852   }
853 }
854
855 extern "C" {
856   static bool r1_cb(const char* name, void *arg, uint64_t offset,
857                     uint32_t flags) {
858     struct rgw_file_handle* parent_fh
859       = static_cast<struct rgw_file_handle*>(arg);
860     RGWFileHandle* rgw_fh = get_rgwfh(parent_fh);
861     lsubdout(cct, rgw, 10) << __func__
862                            << " bucket=" << rgw_fh->bucket_name()
863                            << " dir=" << rgw_fh->full_object_name()
864                            << " called back name=" << name
865                            << " flags=" << flags
866                            << dendl;
867     string name_str{name};
868     if (! ((name_str == ".") ||
869            (name_str == ".."))) {
870       obj_stack.push(
871         obj_rec{std::move(name_str), nullptr, parent_fh, nullptr});
872     }
873     return true; /* XXX */
874   }
875 }
876
877 TEST(LibRGW, HIER1) {
878   if (do_hier1) {
879     int rc;
880     obj_stack.push(
881       obj_rec{bucket_name, nullptr, nullptr, nullptr});
882     while (! obj_stack.empty()) {
883       auto& elt = obj_stack.top();
884       if (! elt.fh) {
885         struct rgw_file_handle* parent_fh = elt.parent_fh
886           ? elt.parent_fh : fs->root_fh;
887         RGWFileHandle* pfh = get_rgwfh(parent_fh);
888         rgw::ignore(pfh);
889         lsubdout(cct, rgw, 10)
890           << "rgw_lookup:"
891           << " parent object_name()=" << pfh->object_name()
892           << " parent full_object_name()=" << pfh->full_object_name()
893           << " elt.name=" << elt.name
894           << dendl;
895         rc = rgw_lookup(fs, parent_fh, elt.name.c_str(), &elt.fh,
896                         RGW_LOOKUP_FLAG_NONE);
897         ASSERT_EQ(rc, 0);
898         // XXXX
899         RGWFileHandle* efh = get_rgwfh(elt.fh);
900         rgw::ignore(efh);
901         lsubdout(cct, rgw, 10)
902           << "rgw_lookup result:"
903           << " elt object_name()=" << efh->object_name()
904           << " elt full_object_name()=" << efh->full_object_name()
905           << " elt.name=" << elt.name
906           << dendl;
907
908         ASSERT_NE(elt.fh, nullptr);
909         elt.rgw_fh = get_rgwfh(elt.fh);
910         elt.parent_fh = elt.rgw_fh->get_parent()->get_fh();
911         ASSERT_EQ(elt.parent_fh, parent_fh);
912         continue;
913       } else {
914         // we have a handle in some state in top position
915         switch(elt.fh->fh_type) {
916         case RGW_FS_TYPE_DIRECTORY:
917           if (! elt.state.readdir) {
918             // descending
919             uint64_t offset = 0;
920             bool eof; // XXX
921             lsubdout(cct, rgw, 10)
922               << "readdir in"
923               << " bucket: " << elt.rgw_fh->bucket_name()
924               << " object_name: " << elt.rgw_fh->object_name()
925               << " full_name: " << elt.rgw_fh->full_object_name()
926               << dendl;
927             rc = rgw_readdir(fs, elt.fh, &offset, r1_cb, elt.fh, &eof,
928                             RGW_READDIR_FLAG_DOTDOT);
929             elt.state.readdir = true;
930             ASSERT_EQ(rc, 0);
931             // ASSERT_TRUE(eof); // XXXX working incorrectly w/single readdir
932           } else {
933             // ascending
934             std::cout << elt << std::endl;
935             cleanup_queue.push_back(elt);
936             obj_stack.pop();
937           }
938           break;
939         case RGW_FS_TYPE_FILE:
940           // ascending
941           std::cout << elt << std::endl;
942           cleanup_queue.push_back(elt);
943           obj_stack.pop();
944           break;
945         default:
946           abort();
947         };
948       }
949     }
950   }
951 }
952
953 TEST(LibRGW, MARKER1_SETUP_BUCKET) {
954   /* "large" directory enumeration test.  this one deals only with
955    * file objects */
956   if (do_marker1) {
957     struct stat st;
958     int ret;
959
960     st.st_uid = owner_uid;
961     st.st_gid = owner_gid;
962     st.st_mode = 755;
963
964     if (do_create) {
965       ret = rgw_mkdir(fs, bucket_fh, marker_dir.c_str(), &st, create_mask,
966                       &marker_fh, RGW_MKDIR_FLAG_NONE);
967     } else {
968       ret = rgw_lookup(fs, bucket_fh, marker_dir.c_str(), &marker_fh,
969                        RGW_LOOKUP_FLAG_NONE);
970     }
971     ASSERT_EQ(ret, 0);
972   }
973 }
974
975 TEST(LibRGW, MARKER1_SETUP_OBJECTS)
976 {
977   /* "large" directory enumeration test.  this one deals only with
978    * file objects */
979   if (do_marker1 && do_create) {
980     int ret;
981
982     for (int ix = 0; ix < marker_nobjs; ++ix) {
983       std::string object_name("f_");
984       object_name += to_string(ix);
985       obj_rec obj{object_name, nullptr, marker_fh, nullptr};
986       // lookup object--all operations are by handle
987       ret = rgw_lookup(fs, marker_fh, obj.name.c_str(), &obj.fh,
988                        RGW_LOOKUP_FLAG_CREATE);
989       ASSERT_EQ(ret, 0);
990       obj.rgw_fh = get_rgwfh(obj.fh);
991       // open object--open transaction
992       ret = rgw_open(fs, obj.fh, 0 /* posix flags */, RGW_OPEN_FLAG_NONE);
993       ASSERT_EQ(ret, 0);
994       ASSERT_TRUE(obj.rgw_fh->is_open());
995       // unstable write data
996       size_t nbytes;
997       string data("data for ");
998       data += object_name;
999       int ret = rgw_write(fs, obj.fh, 0, data.length(), &nbytes,
1000                           (void*) data.c_str(), RGW_WRITE_FLAG_NONE);
1001       ASSERT_EQ(ret, 0);
1002       ASSERT_EQ(nbytes, data.length());
1003       // commit transaction (write on close)
1004       ret = rgw_close(fs, obj.fh, 0 /* flags */);
1005       ASSERT_EQ(ret, 0);
1006       // save for cleanup
1007       marker_objs.push_back(obj);
1008     }
1009   }
1010 }
1011
1012 extern "C" {
1013   static bool r2_cb(const char* name, void *arg, uint64_t offset,
1014                     uint32_t flags) {
1015     dirent_vec& dvec =
1016       *(static_cast<dirent_vec*>(arg));
1017     lsubdout(cct, rgw, 10) << __func__
1018                            << " bucket=" << bucket_name
1019                            << " dir=" << marker_dir
1020                            << " iv count=" << dvec.count
1021                            << " called back name=" << name
1022                            << " flags=" << flags
1023                            << dendl;
1024     string name_str{name};
1025     if (! ((name_str == ".") ||
1026            (name_str == ".."))) {
1027       dvec.obj_names.push_back(dirent_t{std::move(name_str), offset});
1028     }
1029     return true; /* XXX */
1030   }
1031 }
1032
1033 TEST(LibRGW, MARKER1_READDIR)
1034 {
1035   if (do_marker1) {
1036     using std::get;
1037
1038     dirent_vec dvec;
1039     uint64_t offset = 0;
1040     bool eof = false;
1041
1042     /* because RGWReaddirRequest::default_max is 1000 (XXX make
1043      * configurable?) and marker_nobjs is 5*1024, the number
1044      * of required rgw_readdir operations N should be
1045      * marker_nobjs/1000 < N < marker_nobjs/1000+1, i.e., 6 when
1046      * marker_nobjs==5*1024 */
1047     uint32_t max_iterations = marker_nobjs/1000+1;
1048
1049     do {
1050       ASSERT_TRUE(dvec.count <= max_iterations);
1051       int ret = rgw_readdir(fs, marker_fh, &offset, r2_cb, &dvec, &eof,
1052                             RGW_READDIR_FLAG_DOTDOT);
1053       ASSERT_EQ(ret, 0);
1054       ASSERT_EQ(offset, get<1>(dvec.obj_names.back())); // cookie check
1055       ++dvec.count;
1056     } while(!eof);
1057     std::cout << "Read " << dvec.obj_names.size() << " objects in "
1058               << marker_dir.c_str() << std::endl;
1059   }
1060 }
1061
1062 TEST(LibRGW, MARKER1_OBJ_CLEANUP)
1063 {
1064   int rc;
1065   for (auto& obj : marker_objs) {
1066     if (obj.fh) {
1067       if (do_delete) {
1068         if (verbose) {
1069           std::cout << "unlinking: " << bucket_name << ":" << obj.name
1070                     << std::endl;
1071         }
1072         rc = rgw_unlink(fs, marker_fh, obj.name.c_str(), RGW_UNLINK_FLAG_NONE);
1073       }
1074       rc = rgw_fh_rele(fs, obj.fh, 0 /* flags */);
1075       ASSERT_EQ(rc, 0);
1076     }
1077   }
1078   marker_objs.clear();
1079 }
1080
1081 TEST(LibRGW, CLEANUP) {
1082   int rc;
1083
1084   if (do_marker1) {
1085     cleanup_queue.push_back(
1086       obj_rec{bucket_name, bucket_fh, fs->root_fh, get_rgwfh(fs->root_fh)});
1087   }
1088
1089   for (auto& elt : cleanup_queue) {
1090     if (elt.fh) {
1091       rc = rgw_fh_rele(fs, elt.fh, 0 /* flags */);
1092       ASSERT_EQ(rc, 0);
1093     }
1094   }
1095   cleanup_queue.clear();
1096 }
1097
1098 TEST(LibRGW, UMOUNT) {
1099   if (! fs)
1100     return;
1101
1102   int ret = rgw_umount(fs, RGW_UMOUNT_FLAG_NONE);
1103   ASSERT_EQ(ret, 0);
1104 }
1105
1106 TEST(LibRGW, SHUTDOWN) {
1107   librgw_shutdown(rgw_h);
1108 }
1109
1110 int main(int argc, char *argv[])
1111 {
1112   char *v{nullptr};
1113   string val;
1114   vector<const char*> args;
1115
1116   argv_to_vec(argc, const_cast<const char**>(argv), args);
1117   env_to_vec(args);
1118
1119   v = getenv("AWS_ACCESS_KEY_ID");
1120   if (v) {
1121     access_key = v;
1122   }
1123
1124   v = getenv("AWS_SECRET_ACCESS_KEY");
1125   if (v) {
1126     secret_key = v;
1127   }
1128
1129   for (auto arg_iter = args.begin(); arg_iter != args.end();) {
1130     if (ceph_argparse_witharg(args, arg_iter, &val, "--access",
1131                               (char*) nullptr)) {
1132       access_key = val;
1133     } else if (ceph_argparse_witharg(args, arg_iter, &val, "--secret",
1134                                      (char*) nullptr)) {
1135       secret_key = val;
1136     } else if (ceph_argparse_witharg(args, arg_iter, &val, "--userid",
1137                                      (char*) nullptr)) {
1138       userid = val;
1139     } else if (ceph_argparse_witharg(args, arg_iter, &val, "--bn",
1140                                      (char*) nullptr)) {
1141       bucket_name = val;
1142     } else if (ceph_argparse_witharg(args, arg_iter, &val, "--uid",
1143                                      (char*) nullptr)) {
1144       owner_uid = std::stoi(val);
1145     } else if (ceph_argparse_witharg(args, arg_iter, &val, "--gid",
1146                                      (char*) nullptr)) {
1147       owner_gid = std::stoi(val);
1148     } else if (ceph_argparse_flag(args, arg_iter, "--hier1",
1149                                             (char*) nullptr)) {
1150       do_hier1 = true;
1151     } else if (ceph_argparse_flag(args, arg_iter, "--dirs1",
1152                                             (char*) nullptr)) {
1153       do_dirs1 = true;
1154     } else if (ceph_argparse_flag(args, arg_iter, "--marker1",
1155                                             (char*) nullptr)) {
1156       do_marker1 = true;
1157     } else if (ceph_argparse_flag(args, arg_iter, "--setattr",
1158                                             (char*) nullptr)) {
1159       do_setattr = true;
1160     } else if (ceph_argparse_flag(args, arg_iter, "--create",
1161                                             (char*) nullptr)) {
1162       do_create = true;
1163     } else if (ceph_argparse_flag(args, arg_iter, "--delete",
1164                                             (char*) nullptr)) {
1165       do_delete = true;
1166     } else if (ceph_argparse_flag(args, arg_iter, "--rename",
1167                                             (char*) nullptr)) {
1168       do_rename = true;
1169     } else if (ceph_argparse_flag(args, arg_iter, "--readf",
1170                                             (char*) nullptr)) {
1171       do_readf = true;
1172     } else if (ceph_argparse_flag(args, arg_iter, "--writef",
1173                                             (char*) nullptr)) {
1174       do_writef = true;
1175     } else if (ceph_argparse_flag(args, arg_iter, "--verbose",
1176                                             (char*) nullptr)) {
1177       verbose = true;
1178     } else {
1179       ++arg_iter;
1180     }
1181   }
1182
1183   /* dont accidentally run as anonymous */
1184   if ((access_key == "") ||
1185       (secret_key == "")) {
1186     std::cout << argv[0] << " no AWS credentials, exiting" << std::endl;
1187     return EPERM;
1188   }
1189
1190   saved_args.argc = argc;
1191   saved_args.argv = argv;
1192
1193   ::testing::InitGoogleTest(&argc, argv);
1194   return RUN_ALL_TESTS();
1195 }