Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librbd / test_internal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "cls/rbd/cls_rbd_types.h"
5 #include "test/librbd/test_fixture.h"
6 #include "test/librbd/test_support.h"
7 #include "include/rbd/librbd.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/ImageWatcher.h"
11 #include "librbd/internal.h"
12 #include "librbd/ObjectMap.h"
13 #include "librbd/Operations.h"
14 #include "librbd/api/DiffIterate.h"
15 #include "librbd/io/AioCompletion.h"
16 #include "librbd/io/ImageRequest.h"
17 #include "librbd/io/ImageRequestWQ.h"
18 #include "osdc/Striper.h"
19 #include <boost/scope_exit.hpp>
20 #include <boost/algorithm/string/predicate.hpp>
21 #include <boost/assign/list_of.hpp>
22 #include <utility>
23 #include <vector>
24
25 void register_test_internal() {
26 }
27
28 class TestInternal : public TestFixture {
29 public:
30
31   TestInternal() {}
32
33   typedef std::vector<std::pair<std::string, bool> > Snaps;
34
35   void TearDown() override {
36     unlock_image();
37     for (Snaps::iterator iter = m_snaps.begin(); iter != m_snaps.end(); ++iter) {
38       librbd::ImageCtx *ictx;
39       EXPECT_EQ(0, open_image(m_image_name, &ictx));
40       if (iter->second) {
41         EXPECT_EQ(0,
42                   ictx->operations->snap_unprotect(cls::rbd::UserSnapshotNamespace(),
43                                                    iter->first.c_str()));
44       }
45       EXPECT_EQ(0,
46                 ictx->operations->snap_remove(cls::rbd::UserSnapshotNamespace(),
47                                               iter->first.c_str()));
48     }
49
50     TestFixture::TearDown();
51   }
52
53   int create_snapshot(const char *snap_name, bool snap_protect) {
54     librbd::ImageCtx *ictx;
55     int r = open_image(m_image_name, &ictx);
56     if (r < 0) {
57       return r;
58     }
59
60     r = snap_create(*ictx, snap_name);
61     if (r < 0) {
62       return r;
63     }
64
65     m_snaps.push_back(std::make_pair(snap_name, snap_protect));
66     if (snap_protect) {
67       r = ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(), snap_name);
68       if (r < 0) {
69         return r;
70       }
71     }
72     close_image(ictx);
73     return 0;
74   }
75
76   Snaps m_snaps;
77 };
78
79 class DummyContext : public Context {
80 public:
81   void finish(int r) override {
82   }
83 };
84
85 void generate_random_iomap(librbd::Image &image, int num_objects, int object_size,
86                            int max_count, map<uint64_t, uint64_t> &iomap)
87 {
88   uint64_t stripe_unit, stripe_count;
89
90   stripe_unit = image.get_stripe_unit();
91   stripe_count = image.get_stripe_count();
92
93   while (max_count-- > 0) {
94     // generate random image offset based on base random object
95     // number and object offset and then map that back to an
96     // object number based on stripe unit and count.
97     uint64_t ono = rand() % num_objects;
98     uint64_t offset = rand() % (object_size - TEST_IO_SIZE);
99     uint64_t imageoff = (ono * object_size) + offset;
100
101     file_layout_t layout;
102     layout.object_size = object_size;
103     layout.stripe_unit = stripe_unit;
104     layout.stripe_count = stripe_count;
105
106     vector<ObjectExtent> ex;
107     Striper::file_to_extents(g_ceph_context, 1, &layout, imageoff, TEST_IO_SIZE, 0, ex);
108
109     // lets not worry if IO spans multiple extents (>1 object). in such
110     // as case we would perform the write multiple times to the same
111     // offset, but we record all objects that would be generated with
112     // this IO. TODO: fix this if such a need is required by your
113     // test.
114     vector<ObjectExtent>::iterator it;
115     map<uint64_t, uint64_t> curr_iomap;
116     for (it = ex.begin(); it != ex.end(); ++it) {
117       if (iomap.find((*it).objectno) != iomap.end()) {
118         break;
119       }
120
121       curr_iomap.insert(make_pair((*it).objectno, imageoff));
122     }
123
124     if (it == ex.end()) {
125       iomap.insert(curr_iomap.begin(), curr_iomap.end());
126     }
127   }
128 }
129
130 TEST_F(TestInternal, OpenByID) {
131    REQUIRE_FORMAT_V2();
132
133    librbd::ImageCtx *ictx;
134    ASSERT_EQ(0, open_image(m_image_name, &ictx));
135    std::string id = ictx->id;
136    close_image(ictx);
137
138    ictx = new librbd::ImageCtx("", id, nullptr, m_ioctx, true);
139    ASSERT_EQ(0, ictx->state->open(false));
140    ASSERT_EQ(ictx->name, m_image_name);
141    close_image(ictx);
142 }
143
144 TEST_F(TestInternal, IsExclusiveLockOwner) {
145   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
146
147   librbd::ImageCtx *ictx;
148   ASSERT_EQ(0, open_image(m_image_name, &ictx));
149
150   bool is_owner;
151   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
152   ASSERT_FALSE(is_owner);
153
154   C_SaferCond ctx;
155   {
156     RWLock::WLocker l(ictx->owner_lock);
157     ictx->exclusive_lock->try_acquire_lock(&ctx);
158   }
159   ASSERT_EQ(0, ctx.wait());
160   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
161   ASSERT_TRUE(is_owner);
162 }
163
164 TEST_F(TestInternal, ResizeLocksImage) {
165   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
166
167   librbd::ImageCtx *ictx;
168   ASSERT_EQ(0, open_image(m_image_name, &ictx));
169
170   librbd::NoOpProgressContext no_op;
171   ASSERT_EQ(0, ictx->operations->resize(m_image_size >> 1, true, no_op));
172
173   bool is_owner;
174   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
175   ASSERT_TRUE(is_owner);
176 }
177
178 TEST_F(TestInternal, ResizeFailsToLockImage) {
179   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
180
181   librbd::ImageCtx *ictx;
182   ASSERT_EQ(0, open_image(m_image_name, &ictx));
183   ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE, "manually locked"));
184
185   librbd::NoOpProgressContext no_op;
186   ASSERT_EQ(-EROFS, ictx->operations->resize(m_image_size >> 1, true, no_op));
187 }
188
189 TEST_F(TestInternal, SnapCreateLocksImage) {
190   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
191
192   librbd::ImageCtx *ictx;
193   ASSERT_EQ(0, open_image(m_image_name, &ictx));
194
195   ASSERT_EQ(0, snap_create(*ictx, "snap1"));
196   BOOST_SCOPE_EXIT( (ictx) ) {
197     ASSERT_EQ(0,
198               ictx->operations->snap_remove(cls::rbd::UserSnapshotNamespace(),
199                                             "snap1"));
200   } BOOST_SCOPE_EXIT_END;
201
202   bool is_owner;
203   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
204   ASSERT_TRUE(is_owner);
205 }
206
207 TEST_F(TestInternal, SnapCreateFailsToLockImage) {
208   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
209
210   librbd::ImageCtx *ictx;
211   ASSERT_EQ(0, open_image(m_image_name, &ictx));
212   ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE, "manually locked"));
213
214   ASSERT_EQ(-EROFS, snap_create(*ictx, "snap1"));
215 }
216
217 TEST_F(TestInternal, SnapRollbackLocksImage) {
218   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
219
220   ASSERT_EQ(0, create_snapshot("snap1", false));
221
222   librbd::ImageCtx *ictx;
223   ASSERT_EQ(0, open_image(m_image_name, &ictx));
224
225   librbd::NoOpProgressContext no_op;
226   ASSERT_EQ(0, ictx->operations->snap_rollback(cls::rbd::UserSnapshotNamespace(),
227                                                "snap1",
228                                                no_op));
229
230   bool is_owner;
231   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
232   ASSERT_TRUE(is_owner);
233 }
234
235 TEST_F(TestInternal, SnapRollbackFailsToLockImage) {
236   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
237
238
239   ASSERT_EQ(0, create_snapshot("snap1", false));
240
241   librbd::ImageCtx *ictx;
242   ASSERT_EQ(0, open_image(m_image_name, &ictx));
243   ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE, "manually locked"));
244
245   librbd::NoOpProgressContext no_op;
246   ASSERT_EQ(-EROFS,
247             ictx->operations->snap_rollback(cls::rbd::UserSnapshotNamespace(),
248                                             "snap1",
249                                             no_op));
250 }
251
252 TEST_F(TestInternal, SnapSetReleasesLock) {
253   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
254
255   ASSERT_EQ(0, create_snapshot("snap1", false));
256
257   librbd::ImageCtx *ictx;
258   ASSERT_EQ(0, open_image(m_image_name, &ictx));
259   ASSERT_EQ(0, librbd::snap_set(ictx, cls::rbd::UserSnapshotNamespace(), "snap1"));
260
261   bool is_owner;
262   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
263   ASSERT_FALSE(is_owner);
264 }
265
266 TEST_F(TestInternal, FlattenLocksImage) {
267   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK | RBD_FEATURE_LAYERING);
268
269   ASSERT_EQ(0, create_snapshot("snap1", true));
270
271   librbd::ImageCtx *ictx;
272   ASSERT_EQ(0, open_image(m_image_name, &ictx));
273
274   uint64_t features;
275   ASSERT_EQ(0, librbd::get_features(ictx, &features));
276
277   std::string clone_name = get_temp_image_name();
278   int order = ictx->order;
279   ASSERT_EQ(0, librbd::clone(m_ioctx, m_image_name.c_str(), "snap1", m_ioctx,
280                              clone_name.c_str(), features, &order, 0, 0));
281
282   librbd::ImageCtx *ictx2;
283   ASSERT_EQ(0, open_image(clone_name, &ictx2));
284
285   librbd::NoOpProgressContext no_op;
286   ASSERT_EQ(0, ictx2->operations->flatten(no_op));
287
288   bool is_owner;
289   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx2, &is_owner));
290   ASSERT_TRUE(is_owner);
291 }
292
293 TEST_F(TestInternal, FlattenFailsToLockImage) {
294   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK | RBD_FEATURE_LAYERING);
295
296   ASSERT_EQ(0, create_snapshot("snap1", true));
297
298   librbd::ImageCtx *ictx;
299   ASSERT_EQ(0, open_image(m_image_name, &ictx));
300
301   uint64_t features;
302   ASSERT_EQ(0, librbd::get_features(ictx, &features));
303
304   std::string clone_name = get_temp_image_name();
305   int order = ictx->order;
306   ASSERT_EQ(0, librbd::clone(m_ioctx, m_image_name.c_str(), "snap1", m_ioctx,
307                              clone_name.c_str(), features, &order, 0, 0));
308
309   TestInternal *parent = this;
310   librbd::ImageCtx *ictx2 = NULL;
311   BOOST_SCOPE_EXIT( (&m_ioctx) (clone_name) (parent) (&ictx2) ) {
312     if (ictx2 != NULL) {
313       parent->close_image(ictx2);
314       parent->unlock_image();
315     }
316     librbd::NoOpProgressContext no_op;
317     ASSERT_EQ(0, librbd::remove(m_ioctx, clone_name, "", no_op));
318   } BOOST_SCOPE_EXIT_END;
319
320   ASSERT_EQ(0, open_image(clone_name, &ictx2));
321   ASSERT_EQ(0, lock_image(*ictx2, LOCK_EXCLUSIVE, "manually locked"));
322
323   librbd::NoOpProgressContext no_op;
324   ASSERT_EQ(-EROFS, ictx2->operations->flatten(no_op));
325 }
326
327 TEST_F(TestInternal, AioWriteRequestsLock) {
328   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
329
330   librbd::ImageCtx *ictx;
331   ASSERT_EQ(0, open_image(m_image_name, &ictx));
332   ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE, "manually locked"));
333
334   std::string buffer(256, '1');
335   Context *ctx = new DummyContext();
336   auto c = librbd::io::AioCompletion::create(ctx);
337   c->get();
338
339   bufferlist bl;
340   bl.append(buffer);
341   ictx->io_work_queue->aio_write(c, 0, buffer.size(), std::move(bl), 0);
342
343   bool is_owner;
344   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
345   ASSERT_FALSE(is_owner);
346   ASSERT_FALSE(c->is_complete());
347
348   unlock_image();
349   ASSERT_EQ(0, c->wait_for_complete());
350   c->put();
351 }
352
353 TEST_F(TestInternal, AioDiscardRequestsLock) {
354   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
355
356   librbd::ImageCtx *ictx;
357   ASSERT_EQ(0, open_image(m_image_name, &ictx));
358   ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE, "manually locked"));
359
360   Context *ctx = new DummyContext();
361   auto c = librbd::io::AioCompletion::create(ctx);
362   c->get();
363   ictx->io_work_queue->aio_discard(c, 0, 256, false);
364
365   bool is_owner;
366   ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
367   ASSERT_FALSE(is_owner);
368   ASSERT_FALSE(c->is_complete());
369
370   unlock_image();
371   ASSERT_EQ(0, c->wait_for_complete());
372   c->put();
373 }
374
375 TEST_F(TestInternal, CancelAsyncResize) {
376   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
377
378   librbd::ImageCtx *ictx;
379   ASSERT_EQ(0, open_image(m_image_name, &ictx));
380
381   C_SaferCond ctx;
382   {
383     RWLock::WLocker l(ictx->owner_lock);
384     ictx->exclusive_lock->try_acquire_lock(&ctx);
385   }
386
387   ASSERT_EQ(0, ctx.wait());
388   {
389     RWLock::RLocker owner_locker(ictx->owner_lock);
390     ASSERT_TRUE(ictx->exclusive_lock->is_lock_owner());
391   }
392
393   uint64_t size;
394   ASSERT_EQ(0, librbd::get_size(ictx, &size));
395
396   uint32_t attempts = 0;
397   while (attempts++ < 20 && size > 0) {
398     C_SaferCond ctx;
399     librbd::NoOpProgressContext prog_ctx;
400
401     size -= MIN(size, 1<<18);
402     {
403       RWLock::RLocker l(ictx->owner_lock);
404       ictx->operations->execute_resize(size, true, prog_ctx, &ctx, 0);
405     }
406
407     // try to interrupt the in-progress resize
408     ictx->cancel_async_requests();
409
410     int r = ctx.wait();
411     if (r == -ERESTART) {
412       std::cout << "detected canceled async request" << std::endl;
413       break;
414     }
415     ASSERT_EQ(0, r);
416   }
417 }
418
419 TEST_F(TestInternal, MultipleResize) {
420   librbd::ImageCtx *ictx;
421   ASSERT_EQ(0, open_image(m_image_name, &ictx));
422
423   if (ictx->exclusive_lock != nullptr) {
424     C_SaferCond ctx;
425     {
426       RWLock::WLocker l(ictx->owner_lock);
427       ictx->exclusive_lock->try_acquire_lock(&ctx);
428     }
429
430     RWLock::RLocker owner_locker(ictx->owner_lock);
431     ASSERT_EQ(0, ctx.wait());
432     ASSERT_TRUE(ictx->exclusive_lock->is_lock_owner());
433   }
434
435   uint64_t size;
436   ASSERT_EQ(0, librbd::get_size(ictx, &size));
437   uint64_t original_size = size;
438
439   std::vector<C_SaferCond*> contexts;
440
441   uint32_t attempts = 0;
442   librbd::NoOpProgressContext prog_ctx;
443   while (size > 0) {
444     uint64_t new_size = original_size;
445     if (attempts++ % 2 == 0) {
446       size -= MIN(size, 1<<18);
447       new_size = size;
448     }
449
450     RWLock::RLocker l(ictx->owner_lock);
451     contexts.push_back(new C_SaferCond());
452     ictx->operations->execute_resize(new_size, true, prog_ctx, contexts.back(), 0);
453   }
454
455   for (uint32_t i = 0; i < contexts.size(); ++i) {
456     ASSERT_EQ(0, contexts[i]->wait());
457     delete contexts[i];
458   }
459
460   ASSERT_EQ(0, librbd::get_size(ictx, &size));
461   ASSERT_EQ(0U, size);
462 }
463
464 TEST_F(TestInternal, Metadata) {
465   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
466
467   map<string, bool> test_confs = boost::assign::map_list_of(
468     "aaaaaaa", false)(
469     "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", false)(
470     "cccccccccccccc", false);
471   map<string, bool>::iterator it = test_confs.begin();
472   int r;
473   librbd::ImageCtx *ictx;
474   ASSERT_EQ(0, open_image(m_image_name, &ictx));
475
476   r = ictx->operations->metadata_set(it->first, "value1");
477   ASSERT_EQ(0, r);
478   ++it;
479   r = ictx->operations->metadata_set(it->first, "value2");
480   ASSERT_EQ(0, r);
481   ++it;
482   r = ictx->operations->metadata_set(it->first, "value3");
483   ASSERT_EQ(0, r);
484   r = ictx->operations->metadata_set("abcd", "value4");
485   ASSERT_EQ(0, r);
486   r = ictx->operations->metadata_set("xyz", "value5");
487   ASSERT_EQ(0, r);
488   map<string, bufferlist> pairs;
489   r = librbd::metadata_list(ictx, "", 0, &pairs);
490   ASSERT_EQ(0, r);
491   ASSERT_EQ(5u, pairs.size());
492   r = ictx->operations->metadata_remove("abcd");
493   ASSERT_EQ(0, r);
494   r = ictx->operations->metadata_remove("xyz");
495   ASSERT_EQ(0, r);
496   pairs.clear();
497   r = librbd::metadata_list(ictx, "", 0, &pairs);
498   ASSERT_EQ(0, r);
499   ASSERT_EQ(3u, pairs.size());
500   string val;
501   r = librbd::metadata_get(ictx, it->first, &val);
502   ASSERT_EQ(0, r);
503   ASSERT_STREQ(val.c_str(), "value3");
504 }
505
506 TEST_F(TestInternal, MetadataFilter) {
507   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
508
509   map<string, bool> test_confs = boost::assign::map_list_of(
510     "aaaaaaa", false)(
511     "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", false)(
512     "cccccccccccccc", false);
513   map<string, bool>::iterator it = test_confs.begin();
514   const string prefix = "test_config_";
515   bool is_continue;
516   librbd::ImageCtx *ictx;
517   ASSERT_EQ(0, open_image(m_image_name, &ictx));
518
519   librbd::Image image1;
520   map<string, bufferlist> pairs, res;
521   pairs["abc"].append("value");
522   pairs["abcabc"].append("value");
523   pairs[prefix+it->first].append("value1");
524   ++it;
525   pairs[prefix+it->first].append("value2");
526   ++it;
527   pairs[prefix+it->first].append("value3");
528   pairs[prefix+"asdfsdaf"].append("value6");
529   pairs[prefix+"zxvzxcv123"].append("value5");
530
531   is_continue = ictx->_filter_metadata_confs(prefix, test_confs, pairs, &res);
532   ASSERT_TRUE(is_continue);
533   ASSERT_TRUE(res.size() == 3U);
534   it = test_confs.begin();
535   ASSERT_TRUE(res.count(it->first));
536   ASSERT_TRUE(it->second);
537   ++it;
538   ASSERT_TRUE(res.count(it->first));
539   ASSERT_TRUE(it->second);
540   ++it;
541   ASSERT_TRUE(res.count(it->first));
542   ASSERT_TRUE(it->second);
543   res.clear();
544
545   pairs["zzzzzzzz"].append("value7");
546   is_continue = ictx->_filter_metadata_confs(prefix, test_confs, pairs, &res);
547   ASSERT_FALSE(is_continue);
548   ASSERT_TRUE(res.size() == 3U);
549 }
550
551 TEST_F(TestInternal, SnapshotCopyup)
552 {
553   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
554
555   librbd::ImageCtx *ictx;
556   ASSERT_EQ(0, open_image(m_image_name, &ictx));
557
558   bufferlist bl;
559   bl.append(std::string(256, '1'));
560   ASSERT_EQ(256, ictx->io_work_queue->write(0, bl.length(), bufferlist{bl}, 0));
561
562   ASSERT_EQ(0, snap_create(*ictx, "snap1"));
563   ASSERT_EQ(0,
564             ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(),
565                                            "snap1"));
566
567   uint64_t features;
568   ASSERT_EQ(0, librbd::get_features(ictx, &features));
569
570   std::string clone_name = get_temp_image_name();
571   int order = ictx->order;
572   ASSERT_EQ(0, librbd::clone(m_ioctx, m_image_name.c_str(), "snap1", m_ioctx,
573                              clone_name.c_str(), features, &order, 0, 0));
574
575   librbd::ImageCtx *ictx2;
576   ASSERT_EQ(0, open_image(clone_name, &ictx2));
577
578   ASSERT_EQ(0, snap_create(*ictx2, "snap1"));
579   ASSERT_EQ(0, snap_create(*ictx2, "snap2"));
580
581   ASSERT_EQ(256, ictx2->io_work_queue->write(256, bl.length(), bufferlist{bl},
582                                              0));
583
584   librados::IoCtx snap_ctx;
585   snap_ctx.dup(ictx2->data_ctx);
586   snap_ctx.snap_set_read(CEPH_SNAPDIR);
587
588   librados::snap_set_t snap_set;
589   ASSERT_EQ(0, snap_ctx.list_snaps(ictx2->get_object_name(0), &snap_set));
590
591   std::vector< std::pair<uint64_t,uint64_t> > expected_overlap =
592     boost::assign::list_of(
593       std::make_pair(0, 256))(
594       std::make_pair(512, 2096640));
595   ASSERT_EQ(2U, snap_set.clones.size());
596   ASSERT_NE(CEPH_NOSNAP, snap_set.clones[0].cloneid);
597   ASSERT_EQ(2U, snap_set.clones[0].snaps.size());
598   ASSERT_EQ(expected_overlap, snap_set.clones[0].overlap);
599   ASSERT_EQ(CEPH_NOSNAP, snap_set.clones[1].cloneid);
600
601   bufferptr read_ptr(256);
602   bufferlist read_bl;
603   read_bl.push_back(read_ptr);
604
605   std::list<std::string> snaps = {"snap1", "snap2", ""};
606   librbd::io::ReadResult read_result{&read_bl};
607   for (std::list<std::string>::iterator it = snaps.begin();
608        it != snaps.end(); ++it) {
609     const char *snap_name = it->empty() ? NULL : it->c_str();
610     ASSERT_EQ(0, librbd::snap_set(ictx2,
611                                   cls::rbd::UserSnapshotNamespace(),
612                                   snap_name));
613
614     ASSERT_EQ(256,
615               ictx2->io_work_queue->read(0, 256,
616                                          librbd::io::ReadResult{read_result},
617                                          0));
618     ASSERT_TRUE(bl.contents_equal(read_bl));
619
620     ASSERT_EQ(256,
621               ictx2->io_work_queue->read(256, 256,
622                                          librbd::io::ReadResult{read_result},
623                                          0));
624     if (snap_name == NULL) {
625       ASSERT_TRUE(bl.contents_equal(read_bl));
626     } else {
627       ASSERT_TRUE(read_bl.is_zero());
628     }
629
630     // verify the object map was properly updated
631     if ((ictx2->features & RBD_FEATURE_OBJECT_MAP) != 0) {
632       uint8_t state = OBJECT_EXISTS;
633       if ((ictx2->features & RBD_FEATURE_FAST_DIFF) != 0 &&
634           it != snaps.begin() && snap_name != NULL) {
635         state = OBJECT_EXISTS_CLEAN;
636       }
637
638       librbd::ObjectMap<> object_map(*ictx2, ictx2->snap_id);
639       C_SaferCond ctx;
640       object_map.open(&ctx);
641       ASSERT_EQ(0, ctx.wait());
642
643       RWLock::WLocker object_map_locker(ictx2->object_map_lock);
644       ASSERT_EQ(state, object_map[0]);
645     }
646   }
647 }
648
649 TEST_F(TestInternal, ResizeCopyup)
650 {
651   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
652
653   m_image_name = get_temp_image_name();
654   m_image_size = 1 << 14;
655
656   uint64_t features = 0;
657   get_features(&features);
658   int order = 12;
659   ASSERT_EQ(0, m_rbd.create2(m_ioctx, m_image_name.c_str(), m_image_size,
660                              features, &order));
661
662   librbd::ImageCtx *ictx;
663   ASSERT_EQ(0, open_image(m_image_name, &ictx));
664
665   bufferlist bl;
666   bl.append(std::string(4096, '1'));
667   for (size_t i = 0; i < m_image_size; i += bl.length()) {
668     ASSERT_EQ((ssize_t)bl.length(),
669               ictx->io_work_queue->write(i, bl.length(),
670                                          bufferlist{bl}, 0));
671   }
672
673   ASSERT_EQ(0, snap_create(*ictx, "snap1"));
674   ASSERT_EQ(0,
675             ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(),
676                                            "snap1"));
677
678   std::string clone_name = get_temp_image_name();
679   ASSERT_EQ(0, librbd::clone(m_ioctx, m_image_name.c_str(), "snap1", m_ioctx,
680                              clone_name.c_str(), features, &order, 0, 0));
681
682   librbd::ImageCtx *ictx2;
683   ASSERT_EQ(0, open_image(clone_name, &ictx2));
684   ASSERT_EQ(0, snap_create(*ictx2, "snap1"));
685
686   bufferptr read_ptr(bl.length());
687   bufferlist read_bl;
688   read_bl.push_back(read_ptr);
689
690   // verify full / partial object removal properly copyup
691   librbd::NoOpProgressContext no_op;
692   ASSERT_EQ(0, ictx2->operations->resize(m_image_size - (1 << order) - 32,
693                                          true, no_op));
694   ASSERT_EQ(0, ictx2->operations->resize(m_image_size - (2 << order) - 32,
695                                          true, no_op));
696   ASSERT_EQ(0, librbd::snap_set(ictx2,
697                                 cls::rbd::UserSnapshotNamespace(),
698                                 "snap1"));
699
700   {
701     // hide the parent from the snapshot
702     RWLock::WLocker snap_locker(ictx2->snap_lock);
703     ictx2->snap_info.begin()->second.parent = librbd::ParentInfo();
704   }
705
706   librbd::io::ReadResult read_result{&read_bl};
707   for (size_t i = 2 << order; i < m_image_size; i += bl.length()) {
708     ASSERT_EQ((ssize_t)bl.length(),
709               ictx2->io_work_queue->read(i, bl.length(),
710                                          librbd::io::ReadResult{read_result},
711                                          0));
712     ASSERT_TRUE(bl.contents_equal(read_bl));
713   }
714 }
715
716 TEST_F(TestInternal, DiscardCopyup)
717 {
718   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
719
720   CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
721   REQUIRE(!cct->_conf->get_val<bool>("rbd_skip_partial_discard"));
722
723   m_image_name = get_temp_image_name();
724   m_image_size = 1 << 14;
725
726   uint64_t features = 0;
727   get_features(&features);
728   int order = 12;
729   ASSERT_EQ(0, m_rbd.create2(m_ioctx, m_image_name.c_str(), m_image_size,
730                              features, &order));
731
732   librbd::ImageCtx *ictx;
733   ASSERT_EQ(0, open_image(m_image_name, &ictx));
734
735   bufferlist bl;
736   bl.append(std::string(4096, '1'));
737   for (size_t i = 0; i < m_image_size; i += bl.length()) {
738     ASSERT_EQ((ssize_t)bl.length(),
739               ictx->io_work_queue->write(i, bl.length(),
740                                          bufferlist{bl}, 0));
741   }
742
743   ASSERT_EQ(0, snap_create(*ictx, "snap1"));
744   ASSERT_EQ(0,
745             ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(),
746                                            "snap1"));
747
748   std::string clone_name = get_temp_image_name();
749   ASSERT_EQ(0, librbd::clone(m_ioctx, m_image_name.c_str(), "snap1", m_ioctx,
750                              clone_name.c_str(), features, &order, 0, 0));
751
752   librbd::ImageCtx *ictx2;
753   ASSERT_EQ(0, open_image(clone_name, &ictx2));
754
755   ASSERT_EQ(0, snap_create(*ictx2, "snap1"));
756
757   bufferptr read_ptr(bl.length());
758   bufferlist read_bl;
759   read_bl.push_back(read_ptr);
760
761   ASSERT_EQ(static_cast<int>(m_image_size - 64),
762             ictx2->io_work_queue->discard(32, m_image_size - 64, false));
763   ASSERT_EQ(0, librbd::snap_set(ictx2,
764                                 cls::rbd::UserSnapshotNamespace(),
765                                 "snap1"));
766
767   {
768     // hide the parent from the snapshot
769     RWLock::WLocker snap_locker(ictx2->snap_lock);
770     ictx2->snap_info.begin()->second.parent = librbd::ParentInfo();
771   }
772
773   librbd::io::ReadResult read_result{&read_bl};
774   for (size_t i = 0; i < m_image_size; i += bl.length()) {
775     ASSERT_EQ((ssize_t)bl.length(),
776               ictx2->io_work_queue->read(i, bl.length(),
777                                          librbd::io::ReadResult{read_result},
778                                          0));
779     ASSERT_TRUE(bl.contents_equal(read_bl));
780   }
781 }
782
783 TEST_F(TestInternal, ShrinkFlushesCache) {
784   librbd::ImageCtx *ictx;
785   ASSERT_EQ(0, open_image(m_image_name, &ictx));
786
787   std::string buffer(4096, '1');
788
789   // ensure write-path is initialized
790   bufferlist write_bl;
791   write_bl.append(buffer);
792   ictx->io_work_queue->write(0, buffer.size(), bufferlist{write_bl}, 0);
793
794   C_SaferCond cond_ctx;
795   auto c = librbd::io::AioCompletion::create(&cond_ctx);
796   c->get();
797   ictx->io_work_queue->aio_write(c, 0, buffer.size(), bufferlist{write_bl}, 0);
798
799   librbd::NoOpProgressContext no_op;
800   ASSERT_EQ(0, ictx->operations->resize(m_image_size >> 1, true, no_op));
801
802   ASSERT_TRUE(c->is_complete());
803   ASSERT_EQ(0, c->wait_for_complete());
804   ASSERT_EQ(0, cond_ctx.wait());
805   c->put();
806 }
807
808 TEST_F(TestInternal, ImageOptions) {
809   rbd_image_options_t opts1 = NULL, opts2 = NULL;
810   uint64_t uint64_val1 = 10, uint64_val2 = 0;
811   std::string string_val1;
812
813   librbd::image_options_create(&opts1);
814   ASSERT_NE((rbd_image_options_t)NULL, opts1);
815   ASSERT_TRUE(librbd::image_options_is_empty(opts1));
816
817   ASSERT_EQ(-EINVAL, librbd::image_options_get(opts1, RBD_IMAGE_OPTION_FEATURES,
818           &string_val1));
819   ASSERT_EQ(-ENOENT, librbd::image_options_get(opts1, RBD_IMAGE_OPTION_FEATURES,
820           &uint64_val1));
821
822   ASSERT_EQ(-EINVAL, librbd::image_options_set(opts1, RBD_IMAGE_OPTION_FEATURES,
823           string_val1));
824
825   ASSERT_EQ(0, librbd::image_options_set(opts1, RBD_IMAGE_OPTION_FEATURES,
826           uint64_val1));
827   ASSERT_FALSE(librbd::image_options_is_empty(opts1));
828   ASSERT_EQ(0, librbd::image_options_get(opts1, RBD_IMAGE_OPTION_FEATURES,
829           &uint64_val2));
830   ASSERT_EQ(uint64_val1, uint64_val2);
831
832   librbd::image_options_create_ref(&opts2, opts1);
833   ASSERT_NE((rbd_image_options_t)NULL, opts2);
834   ASSERT_FALSE(librbd::image_options_is_empty(opts2));
835
836   uint64_val2 = 0;
837   ASSERT_NE(uint64_val1, uint64_val2);
838   ASSERT_EQ(0, librbd::image_options_get(opts2, RBD_IMAGE_OPTION_FEATURES,
839           &uint64_val2));
840   ASSERT_EQ(uint64_val1, uint64_val2);
841
842   uint64_val2++;
843   ASSERT_NE(uint64_val1, uint64_val2);
844   ASSERT_EQ(-ENOENT, librbd::image_options_get(opts1, RBD_IMAGE_OPTION_ORDER,
845           &uint64_val1));
846   ASSERT_EQ(-ENOENT, librbd::image_options_get(opts2, RBD_IMAGE_OPTION_ORDER,
847           &uint64_val2));
848   ASSERT_EQ(0, librbd::image_options_set(opts2, RBD_IMAGE_OPTION_ORDER,
849           uint64_val2));
850   ASSERT_EQ(0, librbd::image_options_get(opts1, RBD_IMAGE_OPTION_ORDER,
851           &uint64_val1));
852   ASSERT_EQ(0, librbd::image_options_get(opts2, RBD_IMAGE_OPTION_ORDER,
853           &uint64_val2));
854   ASSERT_EQ(uint64_val1, uint64_val2);
855
856   librbd::image_options_destroy(opts1);
857
858   uint64_val2++;
859   ASSERT_NE(uint64_val1, uint64_val2);
860   ASSERT_EQ(0, librbd::image_options_get(opts2, RBD_IMAGE_OPTION_ORDER,
861           &uint64_val2));
862   ASSERT_EQ(uint64_val1, uint64_val2);
863
864   ASSERT_EQ(0, librbd::image_options_unset(opts2, RBD_IMAGE_OPTION_ORDER));
865   ASSERT_EQ(-ENOENT, librbd::image_options_unset(opts2, RBD_IMAGE_OPTION_ORDER));
866
867   librbd::image_options_clear(opts2);
868   ASSERT_EQ(-ENOENT, librbd::image_options_get(opts2, RBD_IMAGE_OPTION_FEATURES,
869           &uint64_val2));
870   ASSERT_TRUE(librbd::image_options_is_empty(opts2));
871
872   librbd::image_options_destroy(opts2);
873 }
874
875 TEST_F(TestInternal, WriteFullCopyup) {
876   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
877
878   librbd::ImageCtx *ictx;
879   ASSERT_EQ(0, open_image(m_image_name, &ictx));
880
881   librbd::NoOpProgressContext no_op;
882   ASSERT_EQ(0, ictx->operations->resize(1 << ictx->order, true, no_op));
883
884   bufferlist bl;
885   bl.append(std::string(1 << ictx->order, '1'));
886   ASSERT_EQ((ssize_t)bl.length(),
887             ictx->io_work_queue->write(0, bl.length(), bufferlist{bl}, 0));
888   ASSERT_EQ(0, librbd::flush(ictx));
889
890   ASSERT_EQ(0, create_snapshot("snap1", true));
891
892   std::string clone_name = get_temp_image_name();
893   int order = ictx->order;
894   ASSERT_EQ(0, librbd::clone(m_ioctx, m_image_name.c_str(), "snap1", m_ioctx,
895                              clone_name.c_str(), ictx->features, &order, 0, 0));
896
897   TestInternal *parent = this;
898   librbd::ImageCtx *ictx2 = NULL;
899   BOOST_SCOPE_EXIT( (&m_ioctx) (clone_name) (parent) (&ictx2) ) {
900     if (ictx2 != NULL) {
901       ictx2->operations->snap_remove(cls::rbd::UserSnapshotNamespace(),
902                                      "snap1");
903       parent->close_image(ictx2);
904     }
905
906     librbd::NoOpProgressContext remove_no_op;
907     ASSERT_EQ(0, librbd::remove(m_ioctx, clone_name, "", remove_no_op));
908   } BOOST_SCOPE_EXIT_END;
909
910   ASSERT_EQ(0, open_image(clone_name, &ictx2));
911   ASSERT_EQ(0, ictx2->operations->snap_create(cls::rbd::UserSnapshotNamespace(),
912                                               "snap1"));
913
914   bufferlist write_full_bl;
915   write_full_bl.append(std::string(1 << ictx2->order, '2'));
916   ASSERT_EQ((ssize_t)write_full_bl.length(),
917             ictx2->io_work_queue->write(0, write_full_bl.length(),
918                                         bufferlist{write_full_bl}, 0));
919
920   ASSERT_EQ(0, ictx2->operations->flatten(no_op));
921
922   bufferptr read_ptr(bl.length());
923   bufferlist read_bl;
924   read_bl.push_back(read_ptr);
925
926   librbd::io::ReadResult read_result{&read_bl};
927   ASSERT_EQ((ssize_t)read_bl.length(),
928             ictx2->io_work_queue->read(0, read_bl.length(),
929                                        librbd::io::ReadResult{read_result}, 0));
930   ASSERT_TRUE(write_full_bl.contents_equal(read_bl));
931
932   ASSERT_EQ(0, librbd::snap_set(ictx2,
933                                 cls::rbd::UserSnapshotNamespace(),
934                                 "snap1"));
935   ASSERT_EQ((ssize_t)read_bl.length(),
936             ictx2->io_work_queue->read(0, read_bl.length(),
937                                        librbd::io::ReadResult{read_result}, 0));
938   ASSERT_TRUE(bl.contents_equal(read_bl));
939 }
940
941 TEST_F(TestInternal, RemoveById) {
942   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
943
944   librbd::ImageCtx *ictx;
945   ASSERT_EQ(0, open_image(m_image_name, &ictx));
946
947   std::string image_id = ictx->id;
948   close_image(ictx);
949
950   librbd::NoOpProgressContext remove_no_op;
951   ASSERT_EQ(0, librbd::remove(m_ioctx, "", image_id, remove_no_op));
952 }
953
954 static int iterate_cb(uint64_t off, size_t len, int exists, void *arg)
955 {
956   interval_set<uint64_t> *diff = static_cast<interval_set<uint64_t> *>(arg);
957   diff->insert(off, len);
958   return 0;
959 }
960
961 TEST_F(TestInternal, DiffIterateCloneOverwrite) {
962   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
963
964   librbd::RBD rbd;
965   librbd::Image image;
966   uint64_t size = 20 << 20;
967   int order = 0;
968
969   ASSERT_EQ(0, rbd.open(m_ioctx, image, m_image_name.c_str(), NULL));
970
971   bufferlist bl;
972   bl.append(std::string(4096, '1'));
973   ASSERT_EQ(4096, image.write(0, 4096, bl));
974
975   interval_set<uint64_t> one;
976   ASSERT_EQ(0, image.diff_iterate2(NULL, 0, size, false, false, iterate_cb,
977                                    (void *)&one));
978   ASSERT_EQ(0, image.snap_create("one"));
979   ASSERT_EQ(0, image.snap_protect("one"));
980
981   std::string clone_name = this->get_temp_image_name();
982   ASSERT_EQ(0, rbd.clone(m_ioctx, m_image_name.c_str(), "one", m_ioctx,
983                          clone_name.c_str(), RBD_FEATURE_LAYERING, &order));
984
985   librbd::ImageCtx *ictx;
986   ASSERT_EQ(0, open_image(clone_name, &ictx));
987   ASSERT_EQ(0, snap_create(*ictx, "one"));
988   ASSERT_EQ(0,
989             ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(),
990                                            "one"));
991
992   // Simulate a client that doesn't support deep flatten (old librbd / krbd)
993   // which will copy up the full object from the parent
994   std::string oid = ictx->object_prefix + ".0000000000000000";
995   librados::IoCtx io_ctx;
996   io_ctx.dup(m_ioctx);
997   io_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
998   ASSERT_EQ(0, io_ctx.write(oid, bl, 4096, 4096));
999
1000   interval_set<uint64_t> diff;
1001   ASSERT_EQ(0, librbd::snap_set(ictx, cls::rbd::UserSnapshotNamespace(), "one"));
1002   ASSERT_EQ(0, librbd::api::DiffIterate<>::diff_iterate(
1003     ictx, cls::rbd::UserSnapshotNamespace(), nullptr, 0, size, true, false,
1004     iterate_cb, (void *)&diff));
1005   ASSERT_EQ(one, diff);
1006 }
1007
1008 TEST_F(TestInternal, TestCoR)
1009 {
1010   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
1011
1012   std::string config_value;
1013   ASSERT_EQ(0, _rados.conf_get("rbd_clone_copy_on_read", config_value));
1014   if (config_value == "false") {
1015     std::cout << "SKIPPING due to disabled rbd_copy_on_read" << std::endl;
1016     return;
1017   }
1018
1019   m_image_name = get_temp_image_name();
1020   m_image_size = 4 << 20;
1021
1022   int order = 12; // smallest object size is 4K
1023   uint64_t features;
1024   ASSERT_TRUE(get_features(&features));
1025
1026   ASSERT_EQ(0, create_image_full_pp(m_rbd, m_ioctx, m_image_name, m_image_size,
1027                                     features, false, &order));
1028
1029   librbd::Image image;
1030   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, m_image_name.c_str(), NULL));
1031
1032   librbd::image_info_t info;
1033   ASSERT_EQ(0, image.stat(info, sizeof(info)));
1034
1035   const int object_num = info.size / info.obj_size;
1036   printf("made parent image \"%s\": %ldK (%d * %" PRIu64 "K)\n", m_image_name.c_str(),
1037          (unsigned long)m_image_size, object_num, info.obj_size/1024);
1038
1039   // write something into parent
1040   char test_data[TEST_IO_SIZE + 1];
1041   for (int i = 0; i < TEST_IO_SIZE; ++i) {
1042     test_data[i] = (char) (rand() % (126 - 33) + 33);
1043   }
1044   test_data[TEST_IO_SIZE] = '\0';
1045
1046   // generate a random map which covers every objects with random
1047   // offset
1048   map<uint64_t, uint64_t> write_tracker;
1049   generate_random_iomap(image, object_num, info.obj_size, 100, write_tracker);
1050
1051   printf("generated random write map:\n");
1052   for (map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1053        itr != write_tracker.end(); ++itr)
1054     printf("\t [%-8ld, %-8ld]\n",
1055            (unsigned long)itr->first, (unsigned long)itr->second);
1056
1057   bufferlist bl;
1058   bl.append(test_data, TEST_IO_SIZE);
1059
1060   printf("write data based on random map\n");
1061   for (map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1062        itr != write_tracker.end(); ++itr) {
1063     printf("\twrite object-%-4ld\t\n", (unsigned long)itr->first);
1064     ASSERT_EQ(TEST_IO_SIZE, image.write(itr->second, TEST_IO_SIZE, bl));
1065   }
1066
1067   ASSERT_EQ(0, image.flush());
1068
1069   bufferlist readbl;
1070   printf("verify written data by reading\n");
1071   {
1072     map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1073     printf("\tread object-%-4ld\n", (unsigned long)itr->first);
1074     ASSERT_EQ(TEST_IO_SIZE, image.read(itr->second, TEST_IO_SIZE, readbl));
1075     ASSERT_TRUE(readbl.contents_equal(bl));
1076   }
1077
1078   int64_t data_pool_id = image.get_data_pool_id();
1079   rados_ioctx_t d_ioctx;
1080   ASSERT_EQ(0, rados_wait_for_latest_osdmap(_cluster));
1081   ASSERT_EQ(0, rados_ioctx_create2(_cluster, data_pool_id, &d_ioctx));
1082
1083   std::string block_name_prefix = image.get_block_name_prefix() + ".";
1084
1085   const char *entry;
1086   rados_list_ctx_t list_ctx;
1087   set<string> obj_checker;
1088   ASSERT_EQ(0, rados_nobjects_list_open(d_ioctx, &list_ctx));
1089   while (rados_nobjects_list_next(list_ctx, &entry, NULL, NULL) != -ENOENT) {
1090     if (boost::starts_with(entry, block_name_prefix)) {
1091       const char *block_name_suffix = entry + block_name_prefix.length();
1092       obj_checker.insert(block_name_suffix);
1093     }
1094   }
1095   rados_nobjects_list_close(list_ctx);
1096
1097   std::string snapname = "snap";
1098   std::string clonename = get_temp_image_name();
1099   ASSERT_EQ(0, image.snap_create(snapname.c_str()));
1100   ASSERT_EQ(0, image.close());
1101   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, m_image_name.c_str(), snapname.c_str()));
1102   ASSERT_EQ(0, image.snap_protect(snapname.c_str()));
1103   printf("made snapshot \"%s@parent_snap\" and protect it\n", m_image_name.c_str());
1104
1105   ASSERT_EQ(0, clone_image_pp(m_rbd, image, m_ioctx, m_image_name.c_str(), snapname.c_str(),
1106                               m_ioctx, clonename.c_str(), features));
1107   ASSERT_EQ(0, image.close());
1108   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, clonename.c_str(), NULL));
1109   printf("made and opened clone \"%s\"\n", clonename.c_str());
1110
1111   printf("read from \"child\"\n");
1112   {
1113     map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1114     printf("\tread object-%-4ld\n", (unsigned long)itr->first);
1115     ASSERT_EQ(TEST_IO_SIZE, image.read(itr->second, TEST_IO_SIZE, readbl));
1116     ASSERT_TRUE(readbl.contents_equal(bl));
1117   }
1118
1119   for (map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1120        itr != write_tracker.end(); ++itr) {
1121     printf("\tread object-%-4ld\n", (unsigned long)itr->first);
1122     ASSERT_EQ(TEST_IO_SIZE, image.read(itr->second, TEST_IO_SIZE, readbl));
1123     ASSERT_TRUE(readbl.contents_equal(bl));
1124   }
1125
1126   printf("read again reversely\n");
1127   for (map<uint64_t, uint64_t>::iterator itr = --write_tracker.end();
1128        itr != write_tracker.begin(); --itr) {
1129     printf("\tread object-%-4ld\n", (unsigned long)itr->first);
1130     ASSERT_EQ(TEST_IO_SIZE, image.read(itr->second, TEST_IO_SIZE, readbl));
1131     ASSERT_TRUE(readbl.contents_equal(bl));
1132   }
1133
1134   // close child to flush all copy-on-read
1135   ASSERT_EQ(0, image.close());
1136
1137   printf("check whether child image has the same set of objects as parent\n");
1138   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, clonename.c_str(), NULL));
1139   block_name_prefix = image.get_block_name_prefix() + ".";
1140
1141   ASSERT_EQ(0, rados_nobjects_list_open(d_ioctx, &list_ctx));
1142   while (rados_nobjects_list_next(list_ctx, &entry, NULL, NULL) != -ENOENT) {
1143     if (boost::starts_with(entry, block_name_prefix)) {
1144       const char *block_name_suffix = entry + block_name_prefix.length();
1145       set<string>::iterator it = obj_checker.find(block_name_suffix);
1146       ASSERT_TRUE(it != obj_checker.end());
1147       obj_checker.erase(it);
1148     }
1149   }
1150   rados_nobjects_list_close(list_ctx);
1151   ASSERT_TRUE(obj_checker.empty());
1152   ASSERT_EQ(0, image.close());
1153
1154   rados_ioctx_destroy(d_ioctx);
1155 }
1156
1157 TEST_F(TestInternal, FlattenNoEmptyObjects)
1158 {
1159   REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
1160
1161   m_image_name = get_temp_image_name();
1162   m_image_size = 4 << 20;
1163
1164   int order = 12; // smallest object size is 4K
1165   uint64_t features;
1166   ASSERT_TRUE(get_features(&features));
1167
1168   ASSERT_EQ(0, create_image_full_pp(m_rbd, m_ioctx, m_image_name, m_image_size,
1169                                     features, false, &order));
1170
1171   librbd::Image image;
1172   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, m_image_name.c_str(), NULL));
1173
1174   librbd::image_info_t info;
1175   ASSERT_EQ(0, image.stat(info, sizeof(info)));
1176
1177   const int object_num = info.size / info.obj_size;
1178   printf("made parent image \"%s\": %" PRIu64 "K (%d * %" PRIu64 "K)\n",
1179          m_image_name.c_str(), m_image_size, object_num, info.obj_size/1024);
1180
1181   // write something into parent
1182   char test_data[TEST_IO_SIZE + 1];
1183   for (int i = 0; i < TEST_IO_SIZE; ++i) {
1184     test_data[i] = (char) (rand() % (126 - 33) + 33);
1185   }
1186   test_data[TEST_IO_SIZE] = '\0';
1187
1188   // generate a random map which covers every objects with random
1189   // offset
1190   map<uint64_t, uint64_t> write_tracker;
1191   generate_random_iomap(image, object_num, info.obj_size, 100, write_tracker);
1192
1193   printf("generated random write map:\n");
1194   for (map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1195        itr != write_tracker.end(); ++itr)
1196     printf("\t [%-8ld, %-8ld]\n",
1197            (unsigned long)itr->first, (unsigned long)itr->second);
1198
1199   bufferlist bl;
1200   bl.append(test_data, TEST_IO_SIZE);
1201
1202   printf("write data based on random map\n");
1203   for (map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1204        itr != write_tracker.end(); ++itr) {
1205     printf("\twrite object-%-4ld\t\n", (unsigned long)itr->first);
1206     ASSERT_EQ(TEST_IO_SIZE, image.write(itr->second, TEST_IO_SIZE, bl));
1207   }
1208
1209   ASSERT_EQ(0, image.flush());
1210
1211   bufferlist readbl;
1212   printf("verify written data by reading\n");
1213   {
1214     map<uint64_t, uint64_t>::iterator itr = write_tracker.begin();
1215     printf("\tread object-%-4ld\n", (unsigned long)itr->first);
1216     ASSERT_EQ(TEST_IO_SIZE, image.read(itr->second, TEST_IO_SIZE, readbl));
1217     ASSERT_TRUE(readbl.contents_equal(bl));
1218   }
1219
1220   int64_t data_pool_id = image.get_data_pool_id();
1221   rados_ioctx_t d_ioctx;
1222   ASSERT_EQ(0, rados_wait_for_latest_osdmap(_cluster));
1223   ASSERT_EQ(0, rados_ioctx_create2(_cluster, data_pool_id, &d_ioctx));
1224
1225   std::string block_name_prefix = image.get_block_name_prefix() + ".";
1226
1227   const char *entry;
1228   rados_list_ctx_t list_ctx;
1229   set<string> obj_checker;
1230   ASSERT_EQ(0, rados_nobjects_list_open(d_ioctx, &list_ctx));
1231   while (rados_nobjects_list_next(list_ctx, &entry, NULL, NULL) != -ENOENT) {
1232     if (boost::starts_with(entry, block_name_prefix)) {
1233       const char *block_name_suffix = entry + block_name_prefix.length();
1234       obj_checker.insert(block_name_suffix);
1235     }
1236   }
1237   rados_nobjects_list_close(list_ctx);
1238
1239   std::string snapname = "snap";
1240   std::string clonename = get_temp_image_name();
1241   ASSERT_EQ(0, image.snap_create(snapname.c_str()));
1242   ASSERT_EQ(0, image.close());
1243   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, m_image_name.c_str(), snapname.c_str()));
1244   ASSERT_EQ(0, image.snap_protect(snapname.c_str()));
1245   printf("made snapshot \"%s@parent_snap\" and protect it\n", m_image_name.c_str());
1246
1247   ASSERT_EQ(0, clone_image_pp(m_rbd, image, m_ioctx, m_image_name.c_str(), snapname.c_str(),
1248                               m_ioctx, clonename.c_str(), features));
1249   ASSERT_EQ(0, image.close());
1250
1251   ASSERT_EQ(0, m_rbd.open(m_ioctx, image, clonename.c_str(), NULL));
1252   printf("made and opened clone \"%s\"\n", clonename.c_str());
1253
1254   printf("flattening clone: \"%s\"\n", clonename.c_str());
1255   ASSERT_EQ(0, image.flatten());
1256
1257   printf("check whether child image has the same set of objects as parent\n");
1258   block_name_prefix = image.get_block_name_prefix() + ".";
1259
1260   ASSERT_EQ(0, rados_nobjects_list_open(d_ioctx, &list_ctx));
1261   while (rados_nobjects_list_next(list_ctx, &entry, NULL, NULL) != -ENOENT) {
1262     if (boost::starts_with(entry, block_name_prefix)) {
1263       const char *block_name_suffix = entry + block_name_prefix.length();
1264       set<string>::iterator it = obj_checker.find(block_name_suffix);
1265       ASSERT_TRUE(it != obj_checker.end());
1266       obj_checker.erase(it);
1267     }
1268   }
1269   rados_nobjects_list_close(list_ctx);
1270   ASSERT_TRUE(obj_checker.empty());
1271   ASSERT_EQ(0, image.close());
1272
1273   rados_ioctx_destroy(d_ioctx);
1274 }