Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librados / tier.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "gtest/gtest.h"
4
5 #include "mds/mdstypes.h"
6 #include "include/buffer.h"
7 #include "include/rbd_types.h"
8 #include "include/rados/librados.h"
9 #include "include/rados/librados.hpp"
10 #include "include/stringify.h"
11 #include "include/types.h"
12 #include "global/global_context.h"
13 #include "common/Cond.h"
14 #include "test/librados/test.h"
15 #include "test/librados/TestCase.h"
16 #include "json_spirit/json_spirit.h"
17
18 #include "osd/HitSet.h"
19
20 #include <errno.h>
21 #include <map>
22 #include <sstream>
23 #include <string>
24
25 using namespace librados;
26 using std::map;
27 using std::ostringstream;
28 using std::string;
29
30 typedef RadosTestPP LibRadosTierPP;
31 typedef RadosTestECPP LibRadosTierECPP;
32
33 void flush_evict_all(librados::Rados& cluster, librados::IoCtx& cache_ioctx)
34 {
35   bufferlist inbl;
36   cache_ioctx.set_namespace(all_nspaces);
37   for (NObjectIterator it = cache_ioctx.nobjects_begin();
38        it != cache_ioctx.nobjects_end(); ++it) {
39     cache_ioctx.locator_set_key(it->get_locator());
40     cache_ioctx.set_namespace(it->get_nspace());
41     {
42       ObjectReadOperation op;
43       op.cache_flush();
44       librados::AioCompletion *completion = cluster.aio_create_completion();
45       cache_ioctx.aio_operate(
46         it->get_oid(), completion, &op,
47         librados::OPERATION_IGNORE_OVERLAY, NULL);
48       completion->wait_for_safe();
49       completion->get_return_value();
50       completion->release();
51     }
52     {
53       ObjectReadOperation op;
54       op.cache_evict();
55       librados::AioCompletion *completion = cluster.aio_create_completion();
56       cache_ioctx.aio_operate(
57         it->get_oid(), completion, &op,
58         librados::OPERATION_IGNORE_OVERLAY, NULL);
59       completion->wait_for_safe();
60       completion->get_return_value();
61       completion->release();
62     }
63   }
64 }
65
66 class LibRadosTwoPoolsPP : public RadosTestPP
67 {
68 public:
69   LibRadosTwoPoolsPP() {};
70   ~LibRadosTwoPoolsPP() override {};
71 protected:
72   static void SetUpTestCase() {
73     pool_name = get_temp_pool_name();
74     ASSERT_EQ("", create_one_pool_pp(pool_name, s_cluster));
75   }
76   static void TearDownTestCase() {
77     ASSERT_EQ(0, destroy_one_pool_pp(pool_name, s_cluster));
78   }
79   static std::string cache_pool_name;
80
81   void SetUp() override {
82     cache_pool_name = get_temp_pool_name();
83     ASSERT_EQ(0, s_cluster.pool_create(cache_pool_name.c_str()));
84     RadosTestPP::SetUp();
85
86     ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
87     cache_ioctx.application_enable("rados", true);
88     cache_ioctx.set_namespace(nspace);
89   }
90   void TearDown() override {
91     // flush + evict cache
92     flush_evict_all(cluster, cache_ioctx);
93
94     bufferlist inbl;
95     // tear down tiers
96     ASSERT_EQ(0, cluster.mon_command(
97       "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
98       "\"}",
99       inbl, NULL, NULL));
100     ASSERT_EQ(0, cluster.mon_command(
101       "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
102       "\", \"tierpool\": \"" + cache_pool_name + "\"}",
103     inbl, NULL, NULL));
104
105     // wait for maps to settle before next test
106     cluster.wait_for_latest_osdmap();
107
108     RadosTestPP::TearDown();
109
110     cleanup_default_namespace(cache_ioctx);
111     cleanup_namespace(cache_ioctx, nspace);
112
113     cache_ioctx.close();
114     ASSERT_EQ(0, s_cluster.pool_delete(cache_pool_name.c_str()));
115   }
116   librados::IoCtx cache_ioctx;
117 };
118
119 class Completions
120 {
121 public:
122   Completions() = default;
123   librados::AioCompletion* getCompletion() {
124     librados::AioCompletion* comp = librados::Rados::aio_create_completion();
125     m_completions.push_back(comp);
126     return comp;
127   }
128
129   ~Completions() {
130     for (auto& comp : m_completions) {
131       comp->release();
132     }
133   }
134
135 private:
136   vector<librados::AioCompletion *> m_completions;
137 };
138
139 Completions completions;
140
141 std::string LibRadosTwoPoolsPP::cache_pool_name;
142
143 TEST_F(LibRadosTierPP, Dirty) {
144   {
145     ObjectWriteOperation op;
146     op.undirty();
147     ASSERT_EQ(0, ioctx.operate("foo", &op)); // still get 0 if it dne
148   }
149   {
150     ObjectWriteOperation op;
151     op.create(true);
152     ASSERT_EQ(0, ioctx.operate("foo", &op));
153   }
154   {
155     bool dirty = false;
156     int r = -1;
157     ObjectReadOperation op;
158     op.is_dirty(&dirty, &r);
159     ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
160     ASSERT_TRUE(dirty);
161     ASSERT_EQ(0, r);
162   }
163   {
164     ObjectWriteOperation op;
165     op.undirty();
166     ASSERT_EQ(0, ioctx.operate("foo", &op));
167   }
168   {
169     ObjectWriteOperation op;
170     op.undirty();
171     ASSERT_EQ(0, ioctx.operate("foo", &op));  // still 0 if already clean
172   }
173   {
174     bool dirty = false;
175     int r = -1;
176     ObjectReadOperation op;
177     op.is_dirty(&dirty, &r);
178     ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
179     ASSERT_FALSE(dirty);
180     ASSERT_EQ(0, r);
181   }
182   {
183     ObjectWriteOperation op;
184     op.truncate(0);  // still a write even tho it is a no-op
185     ASSERT_EQ(0, ioctx.operate("foo", &op));
186   }
187   {
188     bool dirty = false;
189     int r = -1;
190     ObjectReadOperation op;
191     op.is_dirty(&dirty, &r);
192     ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
193     ASSERT_TRUE(dirty);
194     ASSERT_EQ(0, r);
195   }
196 }
197
198 TEST_F(LibRadosTwoPoolsPP, Overlay) {
199   // create objects
200   {
201     bufferlist bl;
202     bl.append("base");
203     ObjectWriteOperation op;
204     op.write_full(bl);
205     ASSERT_EQ(0, ioctx.operate("foo", &op));
206   }
207   {
208     bufferlist bl;
209     bl.append("cache");
210     ObjectWriteOperation op;
211     op.write_full(bl);
212     ASSERT_EQ(0, cache_ioctx.operate("foo", &op));
213   }
214
215   // configure cache
216   bufferlist inbl;
217   ASSERT_EQ(0, cluster.mon_command(
218     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
219     "\", \"tierpool\": \"" + cache_pool_name +
220     "\", \"force_nonempty\": \"--force-nonempty\" }",
221     inbl, NULL, NULL));
222   ASSERT_EQ(0, cluster.mon_command(
223     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
224     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
225     inbl, NULL, NULL));
226
227   // wait for maps to settle
228   cluster.wait_for_latest_osdmap();
229
230   // by default, the overlay sends us to cache pool
231   {
232     bufferlist bl;
233     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
234     ASSERT_EQ('c', bl[0]);
235   }
236   {
237     bufferlist bl;
238     ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
239     ASSERT_EQ('c', bl[0]);
240   }
241
242   // unless we say otherwise
243   {
244     bufferlist bl;
245     ObjectReadOperation op;
246     op.read(0, 1, &bl, NULL);
247     librados::AioCompletion *completion = cluster.aio_create_completion();
248     ASSERT_EQ(0, ioctx.aio_operate(
249         "foo", completion, &op,
250         librados::OPERATION_IGNORE_OVERLAY, NULL));
251     completion->wait_for_safe();
252     ASSERT_EQ(0, completion->get_return_value());
253     completion->release();
254     ASSERT_EQ('b', bl[0]);
255   }
256 }
257
258 TEST_F(LibRadosTwoPoolsPP, Promote) {
259   // create object
260   {
261     bufferlist bl;
262     bl.append("hi there");
263     ObjectWriteOperation op;
264     op.write_full(bl);
265     ASSERT_EQ(0, ioctx.operate("foo", &op));
266   }
267
268   // configure cache
269   bufferlist inbl;
270   ASSERT_EQ(0, cluster.mon_command(
271     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
272     "\", \"tierpool\": \"" + cache_pool_name +
273     "\", \"force_nonempty\": \"--force-nonempty\" }",
274     inbl, NULL, NULL));
275   ASSERT_EQ(0, cluster.mon_command(
276     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
277     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
278     inbl, NULL, NULL));
279   ASSERT_EQ(0, cluster.mon_command(
280     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
281     "\", \"mode\": \"writeback\"}",
282     inbl, NULL, NULL));
283
284   // wait for maps to settle
285   cluster.wait_for_latest_osdmap();
286
287   // read, trigger a promote
288   {
289     bufferlist bl;
290     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
291   }
292
293   // read, trigger a whiteout
294   {
295     bufferlist bl;
296     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
297     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
298   }
299
300   // verify the object is present in the cache tier
301   {
302     NObjectIterator it = cache_ioctx.nobjects_begin();
303     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
304     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
305     ++it;
306     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
307     ++it;
308     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
309   }
310 }
311
312 TEST_F(LibRadosTwoPoolsPP, PromoteSnap) {
313   // create object
314   {
315     bufferlist bl;
316     bl.append("hi there");
317     ObjectWriteOperation op;
318     op.write_full(bl);
319     ASSERT_EQ(0, ioctx.operate("foo", &op));
320   }
321   {
322     bufferlist bl;
323     bl.append("hi there");
324     ObjectWriteOperation op;
325     op.write_full(bl);
326     ASSERT_EQ(0, ioctx.operate("bar", &op));
327   }
328   {
329     bufferlist bl;
330     bl.append("hi there");
331     ObjectWriteOperation op;
332     op.write_full(bl);
333     ASSERT_EQ(0, ioctx.operate("baz", &op));
334   }
335   {
336     bufferlist bl;
337     bl.append("hi there");
338     ObjectWriteOperation op;
339     op.write_full(bl);
340     ASSERT_EQ(0, ioctx.operate("bam", &op));
341   }
342
343   // create a snapshot, clone
344   vector<uint64_t> my_snaps(1);
345   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
346   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
347                                                          my_snaps));
348   {
349     bufferlist bl;
350     bl.append("ciao!");
351     ObjectWriteOperation op;
352     op.write_full(bl);
353     ASSERT_EQ(0, ioctx.operate("foo", &op));
354   }
355   {
356     bufferlist bl;
357     bl.append("ciao!");
358     ObjectWriteOperation op;
359     op.write_full(bl);
360     ASSERT_EQ(0, ioctx.operate("bar", &op));
361   }
362   {
363     ObjectWriteOperation op;
364     op.remove();
365     ASSERT_EQ(0, ioctx.operate("baz", &op));
366   }
367   {
368     bufferlist bl;
369     bl.append("ciao!");
370     ObjectWriteOperation op;
371     op.write_full(bl);
372     ASSERT_EQ(0, ioctx.operate("bam", &op));
373   }
374
375   // configure cache
376   bufferlist inbl;
377   ASSERT_EQ(0, cluster.mon_command(
378     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
379     "\", \"tierpool\": \"" + cache_pool_name +
380     "\", \"force_nonempty\": \"--force-nonempty\" }",
381     inbl, NULL, NULL));
382   ASSERT_EQ(0, cluster.mon_command(
383     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
384     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
385     inbl, NULL, NULL));
386   ASSERT_EQ(0, cluster.mon_command(
387     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
388     "\", \"mode\": \"writeback\"}",
389     inbl, NULL, NULL));
390
391   // wait for maps to settle
392   cluster.wait_for_latest_osdmap();
393
394   // read, trigger a promote on the head
395   {
396     bufferlist bl;
397     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
398     ASSERT_EQ('c', bl[0]);
399   }
400   {
401     bufferlist bl;
402     ASSERT_EQ(1, ioctx.read("bam", bl, 1, 0));
403     ASSERT_EQ('c', bl[0]);
404   }
405
406   ioctx.snap_set_read(my_snaps[0]);
407
408   // read foo snap
409   {
410     bufferlist bl;
411     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
412     ASSERT_EQ('h', bl[0]);
413   }
414
415   // read bar snap
416   {
417     bufferlist bl;
418     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
419     ASSERT_EQ('h', bl[0]);
420   }
421
422   // read baz snap
423   {
424     bufferlist bl;
425     ASSERT_EQ(1, ioctx.read("baz", bl, 1, 0));
426     ASSERT_EQ('h', bl[0]);
427   }
428
429   ioctx.snap_set_read(librados::SNAP_HEAD);
430
431   // read foo
432   {
433     bufferlist bl;
434     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
435     ASSERT_EQ('c', bl[0]);
436   }
437
438   // read bar
439   {
440     bufferlist bl;
441     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
442     ASSERT_EQ('c', bl[0]);
443   }
444
445   // read baz
446   {
447     bufferlist bl;
448     ASSERT_EQ(-ENOENT, ioctx.read("baz", bl, 1, 0));
449   }
450
451   // cleanup
452   ioctx.selfmanaged_snap_remove(my_snaps[0]);
453 }
454
455 TEST_F(LibRadosTwoPoolsPP, PromoteSnapScrub) {
456   int num = 100;
457
458   // create objects
459   for (int i=0; i<num; ++i) {
460     bufferlist bl;
461     bl.append("hi there");
462     ObjectWriteOperation op;
463     op.write_full(bl);
464     ASSERT_EQ(0, ioctx.operate(string("foo") + stringify(i), &op));
465   }
466
467   vector<uint64_t> my_snaps;
468   for (int snap=0; snap<4; ++snap) {
469     // create a snapshot, clone
470     vector<uint64_t> ns(1);
471     ns.insert(ns.end(), my_snaps.begin(), my_snaps.end());
472     my_snaps.swap(ns);
473     ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
474     cout << "my_snaps " << my_snaps << std::endl;
475     ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
476                                                       my_snaps));
477     for (int i=0; i<num; ++i) {
478       bufferlist bl;
479       bl.append(string("ciao! snap") + stringify(snap));
480       ObjectWriteOperation op;
481       op.write_full(bl);
482       ASSERT_EQ(0, ioctx.operate(string("foo") + stringify(i), &op));
483     }
484   }
485
486   // configure cache
487   bufferlist inbl;
488   ASSERT_EQ(0, cluster.mon_command(
489     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
490     "\", \"tierpool\": \"" + cache_pool_name +
491     "\", \"force_nonempty\": \"--force-nonempty\" }",
492     inbl, NULL, NULL));
493   ASSERT_EQ(0, cluster.mon_command(
494     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
495     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
496     inbl, NULL, NULL));
497   ASSERT_EQ(0, cluster.mon_command(
498     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
499     "\", \"mode\": \"writeback\"}",
500     inbl, NULL, NULL));
501
502   // wait for maps to settle
503   cluster.wait_for_latest_osdmap();
504
505   // read, trigger a promote on _some_ heads to make sure we handle cases
506   // where snaps are present and where they are not.
507   cout << "promoting some heads" << std::endl;
508   for (int i=0; i<num; ++i) {
509     if (i % 5 == 0 || i > num - 3) {
510       bufferlist bl;
511       ASSERT_EQ(1, ioctx.read(string("foo") + stringify(i), bl, 1, 0));
512       ASSERT_EQ('c', bl[0]);
513     }
514   }
515
516   for (unsigned snap = 0; snap < my_snaps.size(); ++snap) {
517     cout << "promoting from clones for snap " << my_snaps[snap] << std::endl;
518     ioctx.snap_set_read(my_snaps[snap]);
519
520     // read some snaps, semi-randomly
521     for (int i=0; i<50; ++i) {
522       bufferlist bl;
523       string o = string("foo") + stringify((snap * i * 137) % 80);
524       //cout << o << std::endl;
525       ASSERT_EQ(1, ioctx.read(o, bl, 1, 0));
526     }
527   }
528
529   // ok, stop and scrub this pool (to make sure scrub can handle
530   // missing clones in the cache tier).
531   {
532     IoCtx cache_ioctx;
533     ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
534     for (int i=0; i<10; ++i) {
535       do {
536         ostringstream ss;
537         ss << "{\"prefix\": \"pg scrub\", \"pgid\": \""
538            << cache_ioctx.get_id() << "." << i
539            << "\"}";
540         int r = cluster.mon_command(ss.str(), inbl, NULL, NULL);
541         if (r == -ENOENT ||  // in case mgr osdmap is stale
542             r == -EAGAIN) {
543           sleep(5);
544           continue;
545         }
546       } while (false);
547     }
548
549     // give it a few seconds to go.  this is sloppy but is usually enough time
550     cout << "waiting for scrubs..." << std::endl;
551     sleep(30);
552     cout << "done waiting" << std::endl;
553   }
554
555   ioctx.snap_set_read(librados::SNAP_HEAD);
556
557   //cleanup
558   for (unsigned snap = 0; snap < my_snaps.size(); ++snap) {
559     ioctx.selfmanaged_snap_remove(my_snaps[snap]);
560   }
561 }
562
563 TEST_F(LibRadosTwoPoolsPP, PromoteSnapTrimRace) {
564   // create object
565   {
566     bufferlist bl;
567     bl.append("hi there");
568     ObjectWriteOperation op;
569     op.write_full(bl);
570     ASSERT_EQ(0, ioctx.operate("foo", &op));
571   }
572
573   // create a snapshot, clone
574   vector<uint64_t> my_snaps(1);
575   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
576   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
577                                                          my_snaps));
578   {
579     bufferlist bl;
580     bl.append("ciao!");
581     ObjectWriteOperation op;
582     op.write_full(bl);
583     ASSERT_EQ(0, ioctx.operate("foo", &op));
584   }
585
586   // configure cache
587   bufferlist inbl;
588   ASSERT_EQ(0, cluster.mon_command(
589     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
590     "\", \"tierpool\": \"" + cache_pool_name +
591     "\", \"force_nonempty\": \"--force-nonempty\" }",
592     inbl, NULL, NULL));
593   ASSERT_EQ(0, cluster.mon_command(
594     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
595     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
596     inbl, NULL, NULL));
597   ASSERT_EQ(0, cluster.mon_command(
598     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
599     "\", \"mode\": \"writeback\"}",
600     inbl, NULL, NULL));
601
602   // wait for maps to settle
603   cluster.wait_for_latest_osdmap();
604
605   // delete the snap
606   ASSERT_EQ(0, ioctx.selfmanaged_snap_remove(my_snaps[0]));
607
608   ioctx.snap_set_read(my_snaps[0]);
609
610   // read foo snap
611   {
612     bufferlist bl;
613     ASSERT_EQ(-ENOENT, ioctx.read("foo", bl, 1, 0));
614   }
615
616   // cleanup
617   ioctx.selfmanaged_snap_remove(my_snaps[0]);
618 }
619
620 TEST_F(LibRadosTwoPoolsPP, Whiteout) {
621   // create object
622   {
623     bufferlist bl;
624     bl.append("hi there");
625     ObjectWriteOperation op;
626     op.write_full(bl);
627     ASSERT_EQ(0, ioctx.operate("foo", &op));
628   }
629
630   // configure cache
631   bufferlist inbl;
632   ASSERT_EQ(0, cluster.mon_command(
633     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
634     "\", \"tierpool\": \"" + cache_pool_name +
635     "\", \"force_nonempty\": \"--force-nonempty\" }",
636     inbl, NULL, NULL));
637   ASSERT_EQ(0, cluster.mon_command(
638     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
639     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
640     inbl, NULL, NULL));
641   ASSERT_EQ(0, cluster.mon_command(
642     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
643     "\", \"mode\": \"writeback\"}",
644     inbl, NULL, NULL));
645
646   // wait for maps to settle
647   cluster.wait_for_latest_osdmap();
648
649   // create some whiteouts, verify they behave
650   {
651     ObjectWriteOperation op;
652     op.assert_exists();
653     op.remove();
654     ASSERT_EQ(0, ioctx.operate("foo", &op));
655   }
656
657   {
658     ObjectWriteOperation op;
659     op.assert_exists();
660     op.remove();
661     ASSERT_EQ(-ENOENT, ioctx.operate("bar", &op));
662   }
663   {
664     ObjectWriteOperation op;
665     op.assert_exists();
666     op.remove();
667     ASSERT_EQ(-ENOENT, ioctx.operate("bar", &op));
668   }
669
670   // verify the whiteouts are there in the cache tier
671   {
672     NObjectIterator it = cache_ioctx.nobjects_begin();
673     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
674     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
675     ++it;
676     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
677     ++it;
678     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
679   }
680
681   // delete a whiteout and verify it goes away
682   ASSERT_EQ(-ENOENT, ioctx.remove("foo"));
683   {
684     ObjectWriteOperation op;
685     op.remove();
686     librados::AioCompletion *completion = cluster.aio_create_completion();
687     ASSERT_EQ(0, ioctx.aio_operate("bar", completion, &op,
688                                    librados::OPERATION_IGNORE_CACHE));
689     completion->wait_for_safe();
690     ASSERT_EQ(0, completion->get_return_value());
691     completion->release();
692
693     NObjectIterator it = cache_ioctx.nobjects_begin();
694     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
695     ASSERT_TRUE(it->get_oid() == string("foo"));
696     ++it;
697     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
698   }
699
700   // recreate an object and verify we can read it
701   {
702     bufferlist bl;
703     bl.append("hi there");
704     ObjectWriteOperation op;
705     op.write_full(bl);
706     ASSERT_EQ(0, ioctx.operate("foo", &op));
707   }
708   {
709     bufferlist bl;
710     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
711     ASSERT_EQ('h', bl[0]);
712   }
713 }
714
715 TEST_F(LibRadosTwoPoolsPP, WhiteoutDeleteCreate) {
716   // configure cache
717   bufferlist inbl;
718   ASSERT_EQ(0, cluster.mon_command(
719     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
720     "\", \"tierpool\": \"" + cache_pool_name +
721     "\", \"force_nonempty\": \"--force-nonempty\" }",
722     inbl, NULL, NULL));
723   ASSERT_EQ(0, cluster.mon_command(
724     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
725     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
726     inbl, NULL, NULL));
727   ASSERT_EQ(0, cluster.mon_command(
728     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
729     "\", \"mode\": \"writeback\"}",
730     inbl, NULL, NULL));
731
732   // wait for maps to settle
733   cluster.wait_for_latest_osdmap();
734
735   // create an object
736   {
737     bufferlist bl;
738     bl.append("foo");
739     ASSERT_EQ(0, ioctx.write_full("foo", bl));
740   }
741
742   // do delete + create operation
743   {
744     ObjectWriteOperation op;
745     op.remove();
746     bufferlist bl;
747     bl.append("bar");
748     op.write_full(bl);
749     ASSERT_EQ(0, ioctx.operate("foo", &op));
750   }
751
752   // verify it still "exists" (w/ new content)
753   {
754     bufferlist bl;
755     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
756     ASSERT_EQ('b', bl[0]);
757   }
758 }
759
760 TEST_F(LibRadosTwoPoolsPP, Evict) {
761   // create object
762   {
763     bufferlist bl;
764     bl.append("hi there");
765     ObjectWriteOperation op;
766     op.write_full(bl);
767     ASSERT_EQ(0, ioctx.operate("foo", &op));
768   }
769
770   // configure cache
771   bufferlist inbl;
772   ASSERT_EQ(0, cluster.mon_command(
773     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
774     "\", \"tierpool\": \"" + cache_pool_name +
775     "\", \"force_nonempty\": \"--force-nonempty\" }",
776     inbl, NULL, NULL));
777   ASSERT_EQ(0, cluster.mon_command(
778     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
779     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
780     inbl, NULL, NULL));
781   ASSERT_EQ(0, cluster.mon_command(
782     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
783     "\", \"mode\": \"writeback\"}",
784     inbl, NULL, NULL));
785
786   // wait for maps to settle
787   cluster.wait_for_latest_osdmap();
788
789   // read, trigger a promote
790   {
791     bufferlist bl;
792     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
793   }
794
795   // read, trigger a whiteout, and a dirty object
796   {
797     bufferlist bl;
798     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
799     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
800     ASSERT_EQ(0, ioctx.write("bar", bl, bl.length(), 0));
801   }
802
803   // verify the object is present in the cache tier
804   {
805     NObjectIterator it = cache_ioctx.nobjects_begin();
806     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
807     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
808     ++it;
809     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
810     ++it;
811     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
812   }
813
814   // pin
815   {
816     ObjectWriteOperation op;
817     op.cache_pin();
818     librados::AioCompletion *completion = cluster.aio_create_completion();
819     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
820     completion->wait_for_safe();
821     ASSERT_EQ(0, completion->get_return_value());
822     completion->release();
823   }
824
825   // evict the pinned object with -EPERM
826   {
827     ObjectReadOperation op;
828     op.cache_evict();
829     librados::AioCompletion *completion = cluster.aio_create_completion();
830     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
831                                          librados::OPERATION_IGNORE_CACHE,
832                                          NULL));
833     completion->wait_for_safe();
834     ASSERT_EQ(-EPERM, completion->get_return_value());
835     completion->release();
836   }
837
838   // unpin
839   {
840     ObjectWriteOperation op;
841     op.cache_unpin();
842     librados::AioCompletion *completion = cluster.aio_create_completion();
843     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
844     completion->wait_for_safe();
845     ASSERT_EQ(0, completion->get_return_value());
846     completion->release();
847   }
848
849   // flush
850   {
851     ObjectReadOperation op;
852     op.cache_flush();
853     librados::AioCompletion *completion = cluster.aio_create_completion();
854     ASSERT_EQ(0, cache_ioctx.aio_operate(
855       "foo", completion, &op,
856       librados::OPERATION_IGNORE_OVERLAY, NULL));
857     completion->wait_for_safe();
858     ASSERT_EQ(0, completion->get_return_value());
859     completion->release();
860   }
861
862   // verify clean
863   {
864     bool dirty = false;
865     int r = -1;
866     ObjectReadOperation op;
867     op.is_dirty(&dirty, &r);
868     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
869     ASSERT_FALSE(dirty);
870     ASSERT_EQ(0, r);
871   }
872
873   // evict
874   {
875     ObjectReadOperation op;
876     op.cache_evict();
877     librados::AioCompletion *completion = cluster.aio_create_completion();
878     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
879                                          librados::OPERATION_IGNORE_CACHE,
880                                          NULL));
881     completion->wait_for_safe();
882     ASSERT_EQ(0, completion->get_return_value());
883     completion->release();
884   }
885   {
886     ObjectReadOperation op;
887     op.cache_evict();
888     librados::AioCompletion *completion = cluster.aio_create_completion();
889     ASSERT_EQ(0, cache_ioctx.aio_operate(
890       "foo", completion, &op,
891       librados::OPERATION_IGNORE_CACHE, NULL));
892     completion->wait_for_safe();
893     ASSERT_EQ(0, completion->get_return_value());
894     completion->release();
895   }
896   {
897     ObjectReadOperation op;
898     op.cache_evict();
899     librados::AioCompletion *completion = cluster.aio_create_completion();
900     ASSERT_EQ(0, cache_ioctx.aio_operate(
901       "bar", completion, &op,
902       librados::OPERATION_IGNORE_CACHE, NULL));
903     completion->wait_for_safe();
904     ASSERT_EQ(-EBUSY, completion->get_return_value());
905     completion->release();
906   }
907 }
908
909 TEST_F(LibRadosTwoPoolsPP, EvictSnap) {
910   // create object
911   {
912     bufferlist bl;
913     bl.append("hi there");
914     ObjectWriteOperation op;
915     op.write_full(bl);
916     ASSERT_EQ(0, ioctx.operate("foo", &op));
917   }
918   {
919     bufferlist bl;
920     bl.append("hi there");
921     ObjectWriteOperation op;
922     op.write_full(bl);
923     ASSERT_EQ(0, ioctx.operate("bar", &op));
924   }
925   {
926     bufferlist bl;
927     bl.append("hi there");
928     ObjectWriteOperation op;
929     op.write_full(bl);
930     ASSERT_EQ(0, ioctx.operate("baz", &op));
931   }
932   {
933     bufferlist bl;
934     bl.append("hi there");
935     ObjectWriteOperation op;
936     op.write_full(bl);
937     ASSERT_EQ(0, ioctx.operate("bam", &op));
938   }
939
940   // create a snapshot, clone
941   vector<uint64_t> my_snaps(1);
942   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
943   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
944                                                          my_snaps));
945   {
946     bufferlist bl;
947     bl.append("ciao!");
948     ObjectWriteOperation op;
949     op.write_full(bl);
950     ASSERT_EQ(0, ioctx.operate("foo", &op));
951   }
952   {
953     bufferlist bl;
954     bl.append("ciao!");
955     ObjectWriteOperation op;
956     op.write_full(bl);
957     ASSERT_EQ(0, ioctx.operate("bar", &op));
958   }
959   {
960     ObjectWriteOperation op;
961     op.remove();
962     ASSERT_EQ(0, ioctx.operate("baz", &op));
963   }
964   {
965     bufferlist bl;
966     bl.append("ciao!");
967     ObjectWriteOperation op;
968     op.write_full(bl);
969     ASSERT_EQ(0, ioctx.operate("bam", &op));
970   }
971
972   // configure cache
973   bufferlist inbl;
974   ASSERT_EQ(0, cluster.mon_command(
975     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
976     "\", \"tierpool\": \"" + cache_pool_name +
977     "\", \"force_nonempty\": \"--force-nonempty\" }",
978     inbl, NULL, NULL));
979   ASSERT_EQ(0, cluster.mon_command(
980     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
981     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
982     inbl, NULL, NULL));
983   ASSERT_EQ(0, cluster.mon_command(
984     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
985     "\", \"mode\": \"writeback\"}",
986     inbl, NULL, NULL));
987
988   // wait for maps to settle
989   cluster.wait_for_latest_osdmap();
990
991   // read, trigger a promote on the head
992   {
993     bufferlist bl;
994     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
995     ASSERT_EQ('c', bl[0]);
996   }
997   {
998     bufferlist bl;
999     ASSERT_EQ(1, ioctx.read("bam", bl, 1, 0));
1000     ASSERT_EQ('c', bl[0]);
1001   }
1002
1003   // evict bam
1004   {
1005     ObjectReadOperation op;
1006     op.cache_evict();
1007     librados::AioCompletion *completion = cluster.aio_create_completion();
1008     ASSERT_EQ(0, cache_ioctx.aio_operate(
1009       "bam", completion, &op,
1010       librados::OPERATION_IGNORE_CACHE, NULL));
1011     completion->wait_for_safe();
1012     ASSERT_EQ(0, completion->get_return_value());
1013     completion->release();
1014   }
1015   {
1016     bufferlist bl;
1017     ObjectReadOperation op;
1018     op.read(1, 0, &bl, NULL);
1019     librados::AioCompletion *completion = cluster.aio_create_completion();
1020     ASSERT_EQ(0, cache_ioctx.aio_operate(
1021       "bam", completion, &op,
1022       librados::OPERATION_IGNORE_CACHE, NULL));
1023     completion->wait_for_safe();
1024     ASSERT_EQ(-ENOENT, completion->get_return_value());
1025     completion->release();
1026   }
1027
1028   // read foo snap
1029   ioctx.snap_set_read(my_snaps[0]);
1030   {
1031     bufferlist bl;
1032     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1033     ASSERT_EQ('h', bl[0]);
1034   }
1035
1036   // evict foo snap
1037   {
1038     ObjectReadOperation op;
1039     op.cache_evict();
1040     librados::AioCompletion *completion = cluster.aio_create_completion();
1041     ASSERT_EQ(0, ioctx.aio_operate(
1042       "foo", completion, &op,
1043       librados::OPERATION_IGNORE_CACHE, NULL));
1044     completion->wait_for_safe();
1045     ASSERT_EQ(0, completion->get_return_value());
1046     completion->release();
1047   }
1048   // snap is gone...
1049   {
1050     bufferlist bl;
1051     ObjectReadOperation op;
1052     op.read(1, 0, &bl, NULL);
1053     librados::AioCompletion *completion = cluster.aio_create_completion();
1054     ASSERT_EQ(0, ioctx.aio_operate(
1055       "foo", completion, &op,
1056       librados::OPERATION_IGNORE_CACHE, NULL));
1057     completion->wait_for_safe();
1058     ASSERT_EQ(-ENOENT, completion->get_return_value());
1059     completion->release();
1060   }
1061   // head is still there...
1062   ioctx.snap_set_read(librados::SNAP_HEAD);
1063   {
1064     bufferlist bl;
1065     ObjectReadOperation op;
1066     op.read(1, 0, &bl, NULL);
1067     librados::AioCompletion *completion = cluster.aio_create_completion();
1068     ASSERT_EQ(0, ioctx.aio_operate(
1069       "foo", completion, &op,
1070       librados::OPERATION_IGNORE_CACHE, NULL));
1071     completion->wait_for_safe();
1072     ASSERT_EQ(0, completion->get_return_value());
1073     completion->release();
1074   }
1075
1076   // promote head + snap of bar
1077   ioctx.snap_set_read(librados::SNAP_HEAD);
1078   {
1079     bufferlist bl;
1080     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
1081     ASSERT_EQ('c', bl[0]);
1082   }
1083   ioctx.snap_set_read(my_snaps[0]);
1084   {
1085     bufferlist bl;
1086     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
1087     ASSERT_EQ('h', bl[0]);
1088   }
1089
1090   // evict bar head (fail)
1091   ioctx.snap_set_read(librados::SNAP_HEAD);
1092   {
1093     ObjectReadOperation op;
1094     op.cache_evict();
1095     librados::AioCompletion *completion = cluster.aio_create_completion();
1096     ASSERT_EQ(0, ioctx.aio_operate(
1097       "bar", completion, &op,
1098       librados::OPERATION_IGNORE_CACHE, NULL));
1099     completion->wait_for_safe();
1100     ASSERT_EQ(-EBUSY, completion->get_return_value());
1101     completion->release();
1102   }
1103
1104   // evict bar snap
1105   ioctx.snap_set_read(my_snaps[0]);
1106   {
1107     ObjectReadOperation op;
1108     op.cache_evict();
1109     librados::AioCompletion *completion = cluster.aio_create_completion();
1110     ASSERT_EQ(0, ioctx.aio_operate(
1111       "bar", completion, &op,
1112       librados::OPERATION_IGNORE_CACHE, NULL));
1113     completion->wait_for_safe();
1114     ASSERT_EQ(0, completion->get_return_value());
1115     completion->release();
1116   }
1117   // ...and then head
1118   ioctx.snap_set_read(librados::SNAP_HEAD);
1119   {
1120     bufferlist bl;
1121     ObjectReadOperation op;
1122     op.read(1, 0, &bl, NULL);
1123     librados::AioCompletion *completion = cluster.aio_create_completion();
1124     ASSERT_EQ(0, ioctx.aio_operate(
1125       "bar", completion, &op,
1126       librados::OPERATION_IGNORE_CACHE, NULL));
1127     completion->wait_for_safe();
1128     ASSERT_EQ(0, completion->get_return_value());
1129     completion->release();
1130   }
1131   {
1132     ObjectReadOperation op;
1133     op.cache_evict();
1134     librados::AioCompletion *completion = cluster.aio_create_completion();
1135     ASSERT_EQ(0, ioctx.aio_operate(
1136       "bar", completion, &op,
1137       librados::OPERATION_IGNORE_CACHE, NULL));
1138     completion->wait_for_safe();
1139     ASSERT_EQ(0, completion->get_return_value());
1140     completion->release();
1141   }
1142
1143   // cleanup
1144   ioctx.selfmanaged_snap_remove(my_snaps[0]);
1145 }
1146
1147 // this test case reproduces http://tracker.ceph.com/issues/8629
1148 TEST_F(LibRadosTwoPoolsPP, EvictSnap2) {
1149   // create object
1150   {
1151     bufferlist bl;
1152     bl.append("hi there");
1153     ObjectWriteOperation op;
1154     op.write_full(bl);
1155     ASSERT_EQ(0, ioctx.operate("foo", &op));
1156   }
1157   // create a snapshot, clone
1158   vector<uint64_t> my_snaps(1);
1159   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
1160   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
1161                                                          my_snaps));
1162   {
1163     bufferlist bl;
1164     bl.append("ciao!");
1165     ObjectWriteOperation op;
1166     op.write_full(bl);
1167     ASSERT_EQ(0, ioctx.operate("foo", &op));
1168   }
1169   // configure cache
1170   bufferlist inbl;
1171   ASSERT_EQ(0, cluster.mon_command(
1172     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
1173     "\", \"tierpool\": \"" + cache_pool_name +
1174     "\", \"force_nonempty\": \"--force-nonempty\" }",
1175     inbl, NULL, NULL));
1176   ASSERT_EQ(0, cluster.mon_command(
1177     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1178     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1179     inbl, NULL, NULL));
1180   ASSERT_EQ(0, cluster.mon_command(
1181     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
1182     "\", \"mode\": \"writeback\"}",
1183     inbl, NULL, NULL));
1184
1185   // wait for maps to settle
1186   cluster.wait_for_latest_osdmap();
1187
1188   // read, trigger a promote on the head
1189   {
1190     bufferlist bl;
1191     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1192     ASSERT_EQ('c', bl[0]);
1193   }
1194
1195   // evict
1196   {
1197     ObjectReadOperation op;
1198     op.cache_evict();
1199     librados::AioCompletion *completion = cluster.aio_create_completion();
1200     ASSERT_EQ(0, cache_ioctx.aio_operate(
1201       "foo", completion, &op,
1202       librados::OPERATION_IGNORE_CACHE, NULL));
1203     completion->wait_for_safe();
1204     ASSERT_EQ(0, completion->get_return_value());
1205     completion->release();
1206   }
1207
1208   // verify the snapdir is not present in the cache pool
1209   {
1210     ObjectReadOperation op;
1211     librados::snap_set_t snapset;
1212     op.list_snaps(&snapset, NULL);
1213     ioctx.snap_set_read(librados::SNAP_DIR);
1214     librados::AioCompletion *completion = cluster.aio_create_completion();
1215     ASSERT_EQ(0, ioctx.aio_operate("foo", completion, &op,
1216                                    librados::OPERATION_IGNORE_CACHE, NULL));
1217     completion->wait_for_safe();
1218     ASSERT_EQ(-ENOENT, completion->get_return_value());
1219     completion->release();
1220   }
1221 }
1222
1223 TEST_F(LibRadosTwoPoolsPP, TryFlush) {
1224   // configure cache
1225   bufferlist inbl;
1226   ASSERT_EQ(0, cluster.mon_command(
1227     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
1228     "\", \"tierpool\": \"" + cache_pool_name +
1229     "\", \"force_nonempty\": \"--force-nonempty\" }",
1230     inbl, NULL, NULL));
1231   ASSERT_EQ(0, cluster.mon_command(
1232     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1233     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1234     inbl, NULL, NULL));
1235   ASSERT_EQ(0, cluster.mon_command(
1236     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
1237     "\", \"mode\": \"writeback\"}",
1238     inbl, NULL, NULL));
1239
1240   // wait for maps to settle
1241   cluster.wait_for_latest_osdmap();
1242
1243   // create object
1244   {
1245     bufferlist bl;
1246     bl.append("hi there");
1247     ObjectWriteOperation op;
1248     op.write_full(bl);
1249     ASSERT_EQ(0, ioctx.operate("foo", &op));
1250   }
1251
1252   // verify the object is present in the cache tier
1253   {
1254     NObjectIterator it = cache_ioctx.nobjects_begin();
1255     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
1256     ASSERT_TRUE(it->get_oid() == string("foo"));
1257     ++it;
1258     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
1259   }
1260
1261   // verify the object is NOT present in the base tier
1262   {
1263     NObjectIterator it = ioctx.nobjects_begin();
1264     ASSERT_TRUE(it == ioctx.nobjects_end());
1265   }
1266
1267   // verify dirty
1268   {
1269     bool dirty = false;
1270     int r = -1;
1271     ObjectReadOperation op;
1272     op.is_dirty(&dirty, &r);
1273     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
1274     ASSERT_TRUE(dirty);
1275     ASSERT_EQ(0, r);
1276   }
1277
1278   // pin
1279   {
1280     ObjectWriteOperation op;
1281     op.cache_pin();
1282     librados::AioCompletion *completion = cluster.aio_create_completion();
1283     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
1284     completion->wait_for_safe();
1285     ASSERT_EQ(0, completion->get_return_value());
1286     completion->release();
1287   }
1288
1289   // flush the pinned object with -EPERM
1290   {
1291     ObjectReadOperation op;
1292     op.cache_try_flush();
1293     librados::AioCompletion *completion = cluster.aio_create_completion();
1294     ASSERT_EQ(0, cache_ioctx.aio_operate(
1295       "foo", completion, &op,
1296       librados::OPERATION_IGNORE_OVERLAY |
1297       librados::OPERATION_SKIPRWLOCKS, NULL));
1298     completion->wait_for_safe();
1299     ASSERT_EQ(-EPERM, completion->get_return_value());
1300     completion->release();
1301   }
1302
1303   // unpin
1304   {
1305     ObjectWriteOperation op;
1306     op.cache_unpin();
1307     librados::AioCompletion *completion = cluster.aio_create_completion();
1308     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
1309     completion->wait_for_safe();
1310     ASSERT_EQ(0, completion->get_return_value());
1311     completion->release();
1312   }
1313
1314   // flush
1315   {
1316     ObjectReadOperation op;
1317     op.cache_try_flush();
1318     librados::AioCompletion *completion = cluster.aio_create_completion();
1319     ASSERT_EQ(0, cache_ioctx.aio_operate(
1320       "foo", completion, &op,
1321       librados::OPERATION_IGNORE_OVERLAY |
1322       librados::OPERATION_SKIPRWLOCKS, NULL));
1323     completion->wait_for_safe();
1324     ASSERT_EQ(0, completion->get_return_value());
1325     completion->release();
1326   }
1327
1328   // verify clean
1329   {
1330     bool dirty = false;
1331     int r = -1;
1332     ObjectReadOperation op;
1333     op.is_dirty(&dirty, &r);
1334     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
1335     ASSERT_FALSE(dirty);
1336     ASSERT_EQ(0, r);
1337   }
1338
1339   // verify in base tier
1340   {
1341     NObjectIterator it = ioctx.nobjects_begin();
1342     ASSERT_TRUE(it != ioctx.nobjects_end());
1343     ASSERT_TRUE(it->get_oid() == string("foo"));
1344     ++it;
1345     ASSERT_TRUE(it == ioctx.nobjects_end());
1346   }
1347
1348   // evict it
1349   {
1350     ObjectReadOperation op;
1351     op.cache_evict();
1352     librados::AioCompletion *completion = cluster.aio_create_completion();
1353     ASSERT_EQ(0, cache_ioctx.aio_operate(
1354          "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
1355     completion->wait_for_safe();
1356     ASSERT_EQ(0, completion->get_return_value());
1357     completion->release();
1358   }
1359
1360   // verify no longer in cache tier
1361   {
1362     NObjectIterator it = cache_ioctx.nobjects_begin();
1363     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
1364   }
1365 }
1366
1367 TEST_F(LibRadosTwoPoolsPP, Flush) {
1368   // configure cache
1369   bufferlist inbl;
1370   ASSERT_EQ(0, cluster.mon_command(
1371     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
1372     "\", \"tierpool\": \"" + cache_pool_name +
1373     "\", \"force_nonempty\": \"--force-nonempty\" }",
1374     inbl, NULL, NULL));
1375   ASSERT_EQ(0, cluster.mon_command(
1376     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1377     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1378     inbl, NULL, NULL));
1379   ASSERT_EQ(0, cluster.mon_command(
1380     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
1381     "\", \"mode\": \"writeback\"}",
1382     inbl, NULL, NULL));
1383
1384   // wait for maps to settle
1385   cluster.wait_for_latest_osdmap();
1386
1387   uint64_t user_version = 0;
1388
1389   // create object
1390   {
1391     bufferlist bl;
1392     bl.append("hi there");
1393     ObjectWriteOperation op;
1394     op.write_full(bl);
1395     ASSERT_EQ(0, ioctx.operate("foo", &op));
1396   }
1397
1398   // verify the object is present in the cache tier
1399   {
1400     NObjectIterator it = cache_ioctx.nobjects_begin();
1401     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
1402     ASSERT_TRUE(it->get_oid() == string("foo"));
1403     ++it;
1404     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
1405   }
1406
1407   // verify the object is NOT present in the base tier
1408   {
1409     NObjectIterator it = ioctx.nobjects_begin();
1410     ASSERT_TRUE(it == ioctx.nobjects_end());
1411   }
1412
1413   // verify dirty
1414   {
1415     bool dirty = false;
1416     int r = -1;
1417     ObjectReadOperation op;
1418     op.is_dirty(&dirty, &r);
1419     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
1420     ASSERT_TRUE(dirty);
1421     ASSERT_EQ(0, r);
1422     user_version = cache_ioctx.get_last_version();
1423   }
1424
1425   // pin
1426   {
1427     ObjectWriteOperation op;
1428     op.cache_pin();
1429     librados::AioCompletion *completion = cluster.aio_create_completion();
1430     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
1431     completion->wait_for_safe();
1432     ASSERT_EQ(0, completion->get_return_value());
1433     completion->release();
1434   }
1435
1436   // flush the pinned object with -EPERM
1437   {
1438     ObjectReadOperation op;
1439     op.cache_try_flush();
1440     librados::AioCompletion *completion = cluster.aio_create_completion();
1441     ASSERT_EQ(0, cache_ioctx.aio_operate(
1442       "foo", completion, &op,
1443       librados::OPERATION_IGNORE_OVERLAY |
1444       librados::OPERATION_SKIPRWLOCKS, NULL));
1445     completion->wait_for_safe();
1446     ASSERT_EQ(-EPERM, completion->get_return_value());
1447     completion->release();
1448   }
1449
1450   // unpin
1451   {
1452     ObjectWriteOperation op;
1453     op.cache_unpin();
1454     librados::AioCompletion *completion = cluster.aio_create_completion();
1455     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
1456     completion->wait_for_safe();
1457     ASSERT_EQ(0, completion->get_return_value());
1458     completion->release();
1459   }
1460
1461   // flush
1462   {
1463     ObjectReadOperation op;
1464     op.cache_flush();
1465     librados::AioCompletion *completion = cluster.aio_create_completion();
1466     ASSERT_EQ(0, cache_ioctx.aio_operate(
1467       "foo", completion, &op,
1468       librados::OPERATION_IGNORE_OVERLAY, NULL));
1469     completion->wait_for_safe();
1470     ASSERT_EQ(0, completion->get_return_value());
1471     completion->release();
1472   }
1473
1474   // verify clean
1475   {
1476     bool dirty = false;
1477     int r = -1;
1478     ObjectReadOperation op;
1479     op.is_dirty(&dirty, &r);
1480     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
1481     ASSERT_FALSE(dirty);
1482     ASSERT_EQ(0, r);
1483   }
1484
1485   // verify in base tier
1486   {
1487     NObjectIterator it = ioctx.nobjects_begin();
1488     ASSERT_TRUE(it != ioctx.nobjects_end());
1489     ASSERT_TRUE(it->get_oid() == string("foo"));
1490     ++it;
1491     ASSERT_TRUE(it == ioctx.nobjects_end());
1492   }
1493
1494   // evict it
1495   {
1496     ObjectReadOperation op;
1497     op.cache_evict();
1498     librados::AioCompletion *completion = cluster.aio_create_completion();
1499     ASSERT_EQ(0, cache_ioctx.aio_operate(
1500          "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
1501     completion->wait_for_safe();
1502     ASSERT_EQ(0, completion->get_return_value());
1503     completion->release();
1504   }
1505
1506   // verify no longer in cache tier
1507   {
1508     NObjectIterator it = cache_ioctx.nobjects_begin();
1509     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
1510   }
1511
1512   // read it again and verify the version is consistent
1513   {
1514     bufferlist bl;
1515     ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
1516     ASSERT_EQ(user_version, cache_ioctx.get_last_version());
1517   }
1518
1519   // erase it
1520   {
1521     ObjectWriteOperation op;
1522     op.remove();
1523     ASSERT_EQ(0, ioctx.operate("foo", &op));
1524   }
1525
1526   // flush whiteout
1527   {
1528     ObjectReadOperation op;
1529     op.cache_flush();
1530     librados::AioCompletion *completion = cluster.aio_create_completion();
1531     ASSERT_EQ(0, cache_ioctx.aio_operate(
1532       "foo", completion, &op,
1533       librados::OPERATION_IGNORE_OVERLAY, NULL));
1534     completion->wait_for_safe();
1535     ASSERT_EQ(0, completion->get_return_value());
1536     completion->release();
1537   }
1538
1539   // evict
1540   {
1541     ObjectReadOperation op;
1542     op.cache_evict();
1543     librados::AioCompletion *completion = cluster.aio_create_completion();
1544     ASSERT_EQ(0, cache_ioctx.aio_operate(
1545          "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
1546     completion->wait_for_safe();
1547     ASSERT_EQ(0, completion->get_return_value());
1548     completion->release();
1549   }
1550
1551   // verify no longer in cache tier
1552   {
1553     NObjectIterator it = cache_ioctx.nobjects_begin();
1554     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
1555   }
1556   // or base tier
1557   {
1558     NObjectIterator it = ioctx.nobjects_begin();
1559     ASSERT_TRUE(it == ioctx.nobjects_end());
1560   }
1561 }
1562
1563 TEST_F(LibRadosTwoPoolsPP, FlushSnap) {
1564   // configure cache
1565   bufferlist inbl;
1566   ASSERT_EQ(0, cluster.mon_command(
1567     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
1568     "\", \"tierpool\": \"" + cache_pool_name +
1569     "\", \"force_nonempty\": \"--force-nonempty\" }",
1570     inbl, NULL, NULL));
1571   ASSERT_EQ(0, cluster.mon_command(
1572     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1573     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1574     inbl, NULL, NULL));
1575   ASSERT_EQ(0, cluster.mon_command(
1576     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
1577     "\", \"mode\": \"writeback\"}",
1578     inbl, NULL, NULL));
1579
1580   // wait for maps to settle
1581   cluster.wait_for_latest_osdmap();
1582
1583   // create object
1584   {
1585     bufferlist bl;
1586     bl.append("a");
1587     ObjectWriteOperation op;
1588     op.write_full(bl);
1589     ASSERT_EQ(0, ioctx.operate("foo", &op));
1590   }
1591
1592   // create a snapshot, clone
1593   vector<uint64_t> my_snaps(1);
1594   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
1595   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
1596                                                          my_snaps));
1597   {
1598     bufferlist bl;
1599     bl.append("b");
1600     ObjectWriteOperation op;
1601     op.write_full(bl);
1602     ASSERT_EQ(0, ioctx.operate("foo", &op));
1603   }
1604
1605   // and another
1606   my_snaps.resize(2);
1607   my_snaps[1] = my_snaps[0];
1608   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
1609   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
1610                                                          my_snaps));
1611   {
1612     bufferlist bl;
1613     bl.append("c");
1614     ObjectWriteOperation op;
1615     op.write_full(bl);
1616     ASSERT_EQ(0, ioctx.operate("foo", &op));
1617   }
1618
1619   // verify the object is present in the cache tier
1620   {
1621     NObjectIterator it = cache_ioctx.nobjects_begin();
1622     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
1623     ASSERT_TRUE(it->get_oid() == string("foo"));
1624     ++it;
1625     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
1626   }
1627
1628   // verify the object is NOT present in the base tier
1629   {
1630     NObjectIterator it = ioctx.nobjects_begin();
1631     ASSERT_TRUE(it == ioctx.nobjects_end());
1632   }
1633
1634   // flush on head (should fail)
1635   ioctx.snap_set_read(librados::SNAP_HEAD);
1636   {
1637     ObjectReadOperation op;
1638     op.cache_flush();
1639     librados::AioCompletion *completion = cluster.aio_create_completion();
1640     ASSERT_EQ(0, ioctx.aio_operate(
1641       "foo", completion, &op,
1642       librados::OPERATION_IGNORE_CACHE, NULL));
1643     completion->wait_for_safe();
1644     ASSERT_EQ(-EBUSY, completion->get_return_value());
1645     completion->release();
1646   }
1647   // flush on recent snap (should fail)
1648   ioctx.snap_set_read(my_snaps[0]);
1649   {
1650     ObjectReadOperation op;
1651     op.cache_flush();
1652     librados::AioCompletion *completion = cluster.aio_create_completion();
1653     ASSERT_EQ(0, ioctx.aio_operate(
1654       "foo", completion, &op,
1655       librados::OPERATION_IGNORE_CACHE, NULL));
1656     completion->wait_for_safe();
1657     ASSERT_EQ(-EBUSY, completion->get_return_value());
1658     completion->release();
1659   }
1660   // flush on oldest snap
1661   ioctx.snap_set_read(my_snaps[1]);
1662   {
1663     ObjectReadOperation op;
1664     op.cache_flush();
1665     librados::AioCompletion *completion = cluster.aio_create_completion();
1666     ASSERT_EQ(0, ioctx.aio_operate(
1667       "foo", completion, &op,
1668       librados::OPERATION_IGNORE_CACHE, NULL));
1669     completion->wait_for_safe();
1670     ASSERT_EQ(0, completion->get_return_value());
1671     completion->release();
1672   }
1673   // flush on next oldest snap
1674   ioctx.snap_set_read(my_snaps[0]);
1675   {
1676     ObjectReadOperation op;
1677     op.cache_flush();
1678     librados::AioCompletion *completion = cluster.aio_create_completion();
1679     ASSERT_EQ(0, ioctx.aio_operate(
1680       "foo", completion, &op,
1681       librados::OPERATION_IGNORE_CACHE, NULL));
1682     completion->wait_for_safe();
1683     ASSERT_EQ(0, completion->get_return_value());
1684     completion->release();
1685   }
1686   // flush on head
1687   ioctx.snap_set_read(librados::SNAP_HEAD);
1688   {
1689     ObjectReadOperation op;
1690     op.cache_flush();
1691     librados::AioCompletion *completion = cluster.aio_create_completion();
1692     ASSERT_EQ(0, ioctx.aio_operate(
1693       "foo", completion, &op,
1694       librados::OPERATION_IGNORE_CACHE, NULL));
1695     completion->wait_for_safe();
1696     ASSERT_EQ(0, completion->get_return_value());
1697     completion->release();
1698   }
1699
1700   // verify i can read the snaps from the cache pool
1701   ioctx.snap_set_read(librados::SNAP_HEAD);
1702   {
1703     bufferlist bl;
1704     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1705     ASSERT_EQ('c', bl[0]);
1706   }
1707   ioctx.snap_set_read(my_snaps[0]);
1708   {
1709     bufferlist bl;
1710     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1711     ASSERT_EQ('b', bl[0]);
1712   }
1713   ioctx.snap_set_read(my_snaps[1]);
1714   {
1715     bufferlist bl;
1716     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1717     ASSERT_EQ('a', bl[0]);
1718   }
1719
1720   // remove overlay
1721   ASSERT_EQ(0, cluster.mon_command(
1722     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
1723     "\"}",
1724     inbl, NULL, NULL));
1725
1726   // wait for maps to settle
1727   cluster.wait_for_latest_osdmap();
1728
1729   // verify i can read the snaps from the base pool
1730   ioctx.snap_set_read(librados::SNAP_HEAD);
1731   {
1732     bufferlist bl;
1733     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1734     ASSERT_EQ('c', bl[0]);
1735   }
1736   ioctx.snap_set_read(my_snaps[0]);
1737   {
1738     bufferlist bl;
1739     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1740     ASSERT_EQ('b', bl[0]);
1741   }
1742   ioctx.snap_set_read(my_snaps[1]);
1743   {
1744     bufferlist bl;
1745     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
1746     ASSERT_EQ('a', bl[0]);
1747   }
1748
1749   ASSERT_EQ(0, cluster.mon_command(
1750     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1751     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1752     inbl, NULL, NULL));
1753
1754   // cleanup
1755   ioctx.selfmanaged_snap_remove(my_snaps[0]);
1756 }
1757
1758 TEST_F(LibRadosTierPP, FlushWriteRaces) {
1759   Rados cluster;
1760   std::string pool_name = get_temp_pool_name();
1761   std::string cache_pool_name = pool_name + "-cache";
1762   ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
1763   ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
1764   IoCtx cache_ioctx;
1765   ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
1766   cache_ioctx.application_enable("rados", true);
1767   IoCtx ioctx;
1768   ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
1769
1770   // configure cache
1771   bufferlist inbl;
1772   ASSERT_EQ(0, cluster.mon_command(
1773     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
1774     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
1775     inbl, NULL, NULL));
1776   ASSERT_EQ(0, cluster.mon_command(
1777     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1778     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1779     inbl, NULL, NULL));
1780   ASSERT_EQ(0, cluster.mon_command(
1781     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
1782     "\", \"mode\": \"writeback\"}",
1783     inbl, NULL, NULL));
1784
1785   // wait for maps to settle
1786   cluster.wait_for_latest_osdmap();
1787
1788   // create/dirty object
1789   bufferlist bl;
1790   bl.append("hi there");
1791   {
1792     ObjectWriteOperation op;
1793     op.write_full(bl);
1794     ASSERT_EQ(0, ioctx.operate("foo", &op));
1795   }
1796
1797   // flush + write
1798   {
1799     ObjectReadOperation op;
1800     op.cache_flush();
1801     librados::AioCompletion *completion = cluster.aio_create_completion();
1802     ASSERT_EQ(0, cache_ioctx.aio_operate(
1803       "foo", completion, &op,
1804       librados::OPERATION_IGNORE_OVERLAY, NULL));
1805
1806     ObjectWriteOperation op2;
1807     op2.write_full(bl);
1808     librados::AioCompletion *completion2 = cluster.aio_create_completion();
1809     ASSERT_EQ(0, ioctx.aio_operate(
1810       "foo", completion2, &op2, 0));
1811
1812     completion->wait_for_safe();
1813     completion2->wait_for_safe();
1814     ASSERT_EQ(0, completion->get_return_value());
1815     ASSERT_EQ(0, completion2->get_return_value());
1816     completion->release();
1817     completion2->release();
1818   }
1819
1820   int tries = 1000;
1821   do {
1822     // create/dirty object
1823     {
1824       bufferlist bl;
1825       bl.append("hi there");
1826       ObjectWriteOperation op;
1827       op.write_full(bl);
1828       ASSERT_EQ(0, ioctx.operate("foo", &op));
1829     }
1830
1831     // try-flush + write
1832     {
1833       ObjectReadOperation op;
1834       op.cache_try_flush();
1835       librados::AioCompletion *completion = cluster.aio_create_completion();
1836       ASSERT_EQ(0, cache_ioctx.aio_operate(
1837         "foo", completion, &op,
1838         librados::OPERATION_IGNORE_OVERLAY |
1839         librados::OPERATION_SKIPRWLOCKS, NULL));
1840
1841       ObjectWriteOperation op2;
1842       op2.write_full(bl);
1843       librados::AioCompletion *completion2 = cluster.aio_create_completion();
1844       ASSERT_EQ(0, ioctx.aio_operate("foo", completion2, &op2, 0));
1845
1846       completion->wait_for_safe();
1847       completion2->wait_for_safe();
1848       int r = completion->get_return_value();
1849       ASSERT_TRUE(r == -EBUSY || r == 0);
1850       ASSERT_EQ(0, completion2->get_return_value());
1851       completion->release();
1852       completion2->release();
1853       if (r == -EBUSY)
1854         break;
1855       cout << "didn't get EBUSY, trying again" << std::endl;
1856     }
1857     ASSERT_TRUE(--tries);
1858   } while (true);
1859
1860   // tear down tiers
1861   ASSERT_EQ(0, cluster.mon_command(
1862     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
1863     "\"}",
1864     inbl, NULL, NULL));
1865   ASSERT_EQ(0, cluster.mon_command(
1866     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
1867     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
1868     inbl, NULL, NULL));
1869
1870   // wait for maps to settle before next test
1871   cluster.wait_for_latest_osdmap();
1872
1873   ASSERT_EQ(0, cluster.pool_delete(cache_pool_name.c_str()));
1874   ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
1875 }
1876
1877 TEST_F(LibRadosTwoPoolsPP, FlushTryFlushRaces) {
1878   // configure cache
1879   bufferlist inbl;
1880   ASSERT_EQ(0, cluster.mon_command(
1881     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
1882     "\", \"tierpool\": \"" + cache_pool_name +
1883     "\", \"force_nonempty\": \"--force-nonempty\" }",
1884     inbl, NULL, NULL));
1885   ASSERT_EQ(0, cluster.mon_command(
1886     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
1887     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
1888     inbl, NULL, NULL));
1889   ASSERT_EQ(0, cluster.mon_command(
1890     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
1891     "\", \"mode\": \"writeback\"}",
1892     inbl, NULL, NULL));
1893
1894   // wait for maps to settle
1895   cluster.wait_for_latest_osdmap();
1896
1897   // create/dirty object
1898   {
1899     bufferlist bl;
1900     bl.append("hi there");
1901     ObjectWriteOperation op;
1902     op.write_full(bl);
1903     ASSERT_EQ(0, ioctx.operate("foo", &op));
1904   }
1905
1906   // flush + flush
1907   {
1908     ObjectReadOperation op;
1909     op.cache_flush();
1910     librados::AioCompletion *completion = cluster.aio_create_completion();
1911     ASSERT_EQ(0, cache_ioctx.aio_operate(
1912       "foo", completion, &op,
1913       librados::OPERATION_IGNORE_OVERLAY, NULL));
1914
1915     ObjectReadOperation op2;
1916     op2.cache_flush();
1917     librados::AioCompletion *completion2 = cluster.aio_create_completion();
1918     ASSERT_EQ(0, cache_ioctx.aio_operate(
1919       "foo", completion2, &op2,
1920       librados::OPERATION_IGNORE_OVERLAY, NULL));
1921
1922     completion->wait_for_safe();
1923     completion2->wait_for_safe();
1924     ASSERT_EQ(0, completion->get_return_value());
1925     ASSERT_EQ(0, completion2->get_return_value());
1926     completion->release();
1927     completion2->release();
1928   }
1929
1930   // create/dirty object
1931   {
1932     bufferlist bl;
1933     bl.append("hi there");
1934     ObjectWriteOperation op;
1935     op.write_full(bl);
1936     ASSERT_EQ(0, ioctx.operate("foo", &op));
1937   }
1938
1939   // flush + try-flush
1940   {
1941     ObjectReadOperation op;
1942     op.cache_flush();
1943     librados::AioCompletion *completion = cluster.aio_create_completion();
1944     ASSERT_EQ(0, cache_ioctx.aio_operate(
1945       "foo", completion, &op,
1946       librados::OPERATION_IGNORE_OVERLAY, NULL));
1947
1948     ObjectReadOperation op2;
1949     op2.cache_try_flush();
1950     librados::AioCompletion *completion2 = cluster.aio_create_completion();
1951     ASSERT_EQ(0, cache_ioctx.aio_operate(
1952       "foo", completion2, &op2,
1953       librados::OPERATION_IGNORE_OVERLAY |
1954       librados::OPERATION_SKIPRWLOCKS, NULL));
1955
1956     completion->wait_for_safe();
1957     completion2->wait_for_safe();
1958     ASSERT_EQ(0, completion->get_return_value());
1959     ASSERT_EQ(0, completion2->get_return_value());
1960     completion->release();
1961     completion2->release();
1962   }
1963
1964   // create/dirty object
1965   int tries = 1000;
1966   do {
1967     {
1968       bufferlist bl;
1969       bl.append("hi there");
1970       ObjectWriteOperation op;
1971       op.write_full(bl);
1972       ASSERT_EQ(0, ioctx.operate("foo", &op));
1973     }
1974
1975     // try-flush + flush
1976     //  (flush will not piggyback on try-flush)
1977     {
1978       ObjectReadOperation op;
1979       op.cache_try_flush();
1980       librados::AioCompletion *completion = cluster.aio_create_completion();
1981       ASSERT_EQ(0, cache_ioctx.aio_operate(
1982         "foo", completion, &op,
1983         librados::OPERATION_IGNORE_OVERLAY |
1984         librados::OPERATION_SKIPRWLOCKS, NULL));
1985
1986       ObjectReadOperation op2;
1987       op2.cache_flush();
1988       librados::AioCompletion *completion2 = cluster.aio_create_completion();
1989       ASSERT_EQ(0, cache_ioctx.aio_operate(
1990         "foo", completion2, &op2,
1991         librados::OPERATION_IGNORE_OVERLAY, NULL));
1992
1993       completion->wait_for_safe();
1994       completion2->wait_for_safe();
1995       int r = completion->get_return_value();
1996       ASSERT_TRUE(r == -EBUSY || r == 0);
1997       ASSERT_EQ(0, completion2->get_return_value());
1998       completion->release();
1999       completion2->release();
2000       if (r == -EBUSY)
2001         break;
2002       cout << "didn't get EBUSY, trying again" << std::endl;
2003     }
2004     ASSERT_TRUE(--tries);
2005   } while (true);
2006
2007   // create/dirty object
2008   {
2009     bufferlist bl;
2010     bl.append("hi there");
2011     ObjectWriteOperation op;
2012     op.write_full(bl);
2013     ASSERT_EQ(0, ioctx.operate("foo", &op));
2014   }
2015
2016   // try-flush + try-flush
2017   {
2018     ObjectReadOperation op;
2019     op.cache_try_flush();
2020     librados::AioCompletion *completion = cluster.aio_create_completion();
2021     ASSERT_EQ(0, cache_ioctx.aio_operate(
2022       "foo", completion, &op,
2023       librados::OPERATION_IGNORE_OVERLAY |
2024       librados::OPERATION_SKIPRWLOCKS, NULL));
2025
2026     ObjectReadOperation op2;
2027     op2.cache_try_flush();
2028     librados::AioCompletion *completion2 = cluster.aio_create_completion();
2029     ASSERT_EQ(0, cache_ioctx.aio_operate(
2030       "foo", completion2, &op2,
2031       librados::OPERATION_IGNORE_OVERLAY |
2032       librados::OPERATION_SKIPRWLOCKS, NULL));
2033
2034     completion->wait_for_safe();
2035     completion2->wait_for_safe();
2036     ASSERT_EQ(0, completion->get_return_value());
2037     ASSERT_EQ(0, completion2->get_return_value());
2038     completion->release();
2039     completion2->release();
2040   }
2041 }
2042
2043
2044 IoCtx *read_ioctx = 0;
2045 Mutex test_lock("FlushReadRaces::lock");
2046 Cond cond;
2047 int max_reads = 100;
2048 int num_reads = 0; // in progress
2049
2050 void flush_read_race_cb(completion_t cb, void *arg);
2051
2052 void start_flush_read()
2053 {
2054   //cout << " starting read" << std::endl;
2055   ObjectReadOperation op;
2056   op.stat(NULL, NULL, NULL);
2057   librados::AioCompletion *completion = completions.getCompletion();
2058   completion->set_complete_callback(0, flush_read_race_cb);
2059   read_ioctx->aio_operate("foo", completion, &op, NULL);
2060 }
2061
2062 void flush_read_race_cb(completion_t cb, void *arg)
2063 {
2064   //cout << " finished read" << std::endl;
2065   test_lock.Lock();
2066   if (num_reads > max_reads) {
2067     num_reads--;
2068     cond.Signal();
2069   } else {
2070     start_flush_read();
2071   }
2072   test_lock.Unlock();
2073 }
2074
2075 TEST_F(LibRadosTwoPoolsPP, TryFlushReadRace) {
2076   // configure cache
2077   bufferlist inbl;
2078   ASSERT_EQ(0, cluster.mon_command(
2079     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2080     "\", \"tierpool\": \"" + cache_pool_name +
2081     "\", \"force_nonempty\": \"--force-nonempty\" }",
2082     inbl, NULL, NULL));
2083   ASSERT_EQ(0, cluster.mon_command(
2084     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
2085     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
2086     inbl, NULL, NULL));
2087   ASSERT_EQ(0, cluster.mon_command(
2088     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
2089     "\", \"mode\": \"writeback\"}",
2090     inbl, NULL, NULL));
2091
2092   // wait for maps to settle
2093   cluster.wait_for_latest_osdmap();
2094
2095   // create/dirty object
2096   {
2097     bufferlist bl;
2098     bl.append("hi there");
2099     bufferptr bp(4000000);  // make it big!
2100     bp.zero();
2101     bl.append(bp);
2102     ObjectWriteOperation op;
2103     op.write_full(bl);
2104     ASSERT_EQ(0, ioctx.operate("foo", &op));
2105   }
2106
2107   // start a continuous stream of reads
2108   read_ioctx = &ioctx;
2109   test_lock.Lock();
2110   for (int i = 0; i < max_reads; ++i) {
2111     start_flush_read();
2112     num_reads++;
2113   }
2114   test_lock.Unlock();
2115
2116   // try-flush
2117   ObjectReadOperation op;
2118   op.cache_try_flush();
2119   librados::AioCompletion *completion = cluster.aio_create_completion();
2120   ASSERT_EQ(0, cache_ioctx.aio_operate(
2121       "foo", completion, &op,
2122       librados::OPERATION_IGNORE_OVERLAY |
2123       librados::OPERATION_SKIPRWLOCKS, NULL));
2124
2125   completion->wait_for_safe();
2126   ASSERT_EQ(0, completion->get_return_value());
2127   completion->release();
2128
2129   // stop reads
2130   test_lock.Lock();
2131   max_reads = 0;
2132   while (num_reads > 0)
2133     cond.Wait(test_lock);
2134   test_lock.Unlock();
2135 }
2136
2137 TEST_F(LibRadosTierPP, HitSetNone) {
2138   {
2139     list< pair<time_t,time_t> > ls;
2140     AioCompletion *c = librados::Rados::aio_create_completion();
2141     ASSERT_EQ(0, ioctx.hit_set_list(123, c, &ls));
2142     c->wait_for_complete();
2143     ASSERT_EQ(0, c->get_return_value());
2144     ASSERT_TRUE(ls.empty());
2145     c->release();
2146   }
2147   {
2148     bufferlist bl;
2149     AioCompletion *c = librados::Rados::aio_create_completion();
2150     ASSERT_EQ(0, ioctx.hit_set_get(123, c, 12345, &bl));
2151     c->wait_for_complete();
2152     ASSERT_EQ(-ENOENT, c->get_return_value());
2153     c->release();
2154   }
2155 }
2156
2157 string set_pool_str(string pool, string var, string val)
2158 {
2159   return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool
2160     + string("\",\"var\": \"") + var + string("\",\"val\": \"")
2161     + val + string("\"}");
2162 }
2163
2164 string set_pool_str(string pool, string var, int val)
2165 {
2166   return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool
2167     + string("\",\"var\": \"") + var + string("\",\"val\": \"")
2168     + stringify(val) + string("\"}");
2169 }
2170
2171 TEST_F(LibRadosTwoPoolsPP, HitSetRead) {
2172   // make it a tier
2173   bufferlist inbl;
2174   ASSERT_EQ(0, cluster.mon_command(
2175     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2176     "\", \"tierpool\": \"" + cache_pool_name +
2177     "\", \"force_nonempty\": \"--force-nonempty\" }",
2178     inbl, NULL, NULL));
2179
2180   // enable hitset tracking for this pool
2181   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_count", 2),
2182                                                 inbl, NULL, NULL));
2183   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_period", 600),
2184                                                 inbl, NULL, NULL));
2185   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_type",
2186                                                 "explicit_object"),
2187                                    inbl, NULL, NULL));
2188
2189   // wait for maps to settle
2190   cluster.wait_for_latest_osdmap();
2191
2192   cache_ioctx.set_namespace("");
2193
2194   // keep reading until we see our object appear in the HitSet
2195   utime_t start = ceph_clock_now();
2196   utime_t hard_stop = start + utime_t(600, 0);
2197
2198   while (true) {
2199     utime_t now = ceph_clock_now();
2200     ASSERT_TRUE(now < hard_stop);
2201
2202     string name = "foo";
2203     uint32_t hash; 
2204     ASSERT_EQ(0, cache_ioctx.get_object_hash_position2(name, &hash));
2205     hobject_t oid(sobject_t(name, CEPH_NOSNAP), "", hash,
2206                   cluster.pool_lookup(cache_pool_name.c_str()), "");
2207
2208     bufferlist bl;
2209     ASSERT_EQ(-ENOENT, cache_ioctx.read("foo", bl, 1, 0));
2210
2211     bufferlist hbl;
2212     AioCompletion *c = librados::Rados::aio_create_completion();
2213     ASSERT_EQ(0, cache_ioctx.hit_set_get(hash, c, now.sec(), &hbl));
2214     c->wait_for_complete();
2215     c->release();
2216
2217     if (hbl.length()) {
2218       bufferlist::iterator p = hbl.begin();
2219       HitSet hs;
2220       ::decode(hs, p);
2221       if (hs.contains(oid)) {
2222         cout << "ok, hit_set contains " << oid << std::endl;
2223         break;
2224       }
2225       cout << "hmm, not in HitSet yet" << std::endl;
2226     } else {
2227       cout << "hmm, no HitSet yet" << std::endl;
2228     }
2229
2230     sleep(1);
2231   }
2232 }
2233
2234 static int _get_pg_num(Rados& cluster, string pool_name)
2235 {
2236   bufferlist inbl;
2237   string cmd = string("{\"prefix\": \"osd pool get\",\"pool\":\"")
2238     + pool_name
2239     + string("\",\"var\": \"pg_num\",\"format\": \"json\"}");
2240   bufferlist outbl;
2241   int r = cluster.mon_command(cmd, inbl, &outbl, NULL);
2242   assert(r >= 0);
2243   string outstr(outbl.c_str(), outbl.length());
2244   json_spirit::Value v;
2245   if (!json_spirit::read(outstr, v)) {
2246     cerr <<" unable to parse json " << outstr << std::endl;
2247     return -1;
2248   }
2249
2250   json_spirit::Object& o = v.get_obj();
2251   for (json_spirit::Object::size_type i=0; i<o.size(); i++) {
2252     json_spirit::Pair& p = o[i];
2253     if (p.name_ == "pg_num") {
2254       cout << "pg_num = " << p.value_.get_int() << std::endl;
2255       return p.value_.get_int();
2256     }
2257   }
2258   cerr << "didn't find pg_num in " << outstr << std::endl;
2259   return -1;
2260 }
2261
2262
2263 TEST_F(LibRadosTwoPoolsPP, HitSetWrite) {
2264   int num_pg = _get_pg_num(cluster, pool_name);
2265   assert(num_pg > 0);
2266
2267   // make it a tier
2268   bufferlist inbl;
2269   ASSERT_EQ(0, cluster.mon_command(
2270     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2271     "\", \"tierpool\": \"" + cache_pool_name +
2272     "\", \"force_nonempty\": \"--force-nonempty\" }",
2273     inbl, NULL, NULL));
2274
2275   // enable hitset tracking for this pool
2276   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_count", 8),
2277                                                 inbl, NULL, NULL));
2278   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_period", 600),
2279                                                 inbl, NULL, NULL));
2280   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_type",
2281                                                 "explicit_hash"),
2282                                    inbl, NULL, NULL));
2283
2284   // wait for maps to settle
2285   cluster.wait_for_latest_osdmap();
2286
2287   cache_ioctx.set_namespace("");
2288
2289   int num = 200;
2290
2291   // do a bunch of writes
2292   for (int i=0; i<num; ++i) {
2293     bufferlist bl;
2294     bl.append("a");
2295     ASSERT_EQ(0, cache_ioctx.write(stringify(i), bl, 1, 0));
2296   }
2297
2298   // get HitSets
2299   std::map<int,HitSet> hitsets;
2300   for (int i=0; i<num_pg; ++i) {
2301     list< pair<time_t,time_t> > ls;
2302     AioCompletion *c = librados::Rados::aio_create_completion();
2303     ASSERT_EQ(0, cache_ioctx.hit_set_list(i, c, &ls));
2304     c->wait_for_complete();
2305     c->release();
2306     std::cout << "pg " << i << " ls " << ls << std::endl;
2307     ASSERT_FALSE(ls.empty());
2308
2309     // get the latest
2310     c = librados::Rados::aio_create_completion();
2311     bufferlist bl;
2312     ASSERT_EQ(0, cache_ioctx.hit_set_get(i, c, ls.back().first, &bl));
2313     c->wait_for_complete();
2314     c->release();
2315
2316     try {
2317       bufferlist::iterator p = bl.begin();
2318       ::decode(hitsets[i], p);
2319     }
2320     catch (buffer::error& e) {
2321       std::cout << "failed to decode hit set; bl len is " << bl.length() << "\n";
2322       bl.hexdump(std::cout);
2323       std::cout << std::endl;
2324       throw e;
2325     }
2326
2327     // cope with racing splits by refreshing pg_num
2328     if (i == num_pg - 1)
2329       num_pg = _get_pg_num(cluster, cache_pool_name);
2330   }
2331
2332   for (int i=0; i<num; ++i) {
2333     string n = stringify(i);
2334     uint32_t hash;
2335     ASSERT_EQ(0, cache_ioctx.get_object_hash_position2(n, &hash));
2336     hobject_t oid(sobject_t(n, CEPH_NOSNAP), "", hash,
2337                   cluster.pool_lookup(cache_pool_name.c_str()), "");
2338     std::cout << "checking for " << oid << std::endl;
2339     bool found = false;
2340     for (int p=0; p<num_pg; ++p) {
2341       if (hitsets[p].contains(oid)) {
2342         found = true;
2343         break;
2344       }
2345     }
2346     ASSERT_TRUE(found);
2347   }
2348 }
2349
2350 TEST_F(LibRadosTwoPoolsPP, HitSetTrim) {
2351   unsigned count = 3;
2352   unsigned period = 3;
2353
2354   // make it a tier
2355   bufferlist inbl;
2356   ASSERT_EQ(0, cluster.mon_command(
2357     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2358     "\", \"tierpool\": \"" + cache_pool_name +
2359     "\", \"force_nonempty\": \"--force-nonempty\" }",
2360     inbl, NULL, NULL));
2361
2362   // enable hitset tracking for this pool
2363   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_count", count),
2364                                                 inbl, NULL, NULL));
2365   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_period", period),
2366                                                 inbl, NULL, NULL));
2367   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_type", "bloom"),
2368                                    inbl, NULL, NULL));
2369   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_fpp", ".01"),
2370                                    inbl, NULL, NULL));
2371
2372   // wait for maps to settle
2373   cluster.wait_for_latest_osdmap();
2374
2375   cache_ioctx.set_namespace("");
2376
2377   // do a bunch of writes and make sure the hitsets rotate
2378   utime_t start = ceph_clock_now();
2379   utime_t hard_stop = start + utime_t(count * period * 50, 0);
2380
2381   time_t first = 0;
2382   while (true) {
2383     string name = "foo";
2384     uint32_t hash; 
2385     ASSERT_EQ(0, cache_ioctx.get_object_hash_position2(name, &hash));
2386     hobject_t oid(sobject_t(name, CEPH_NOSNAP), "", hash, -1, "");
2387
2388     bufferlist bl;
2389     bl.append("f");
2390     ASSERT_EQ(0, cache_ioctx.write("foo", bl, 1, 0));
2391
2392     list<pair<time_t, time_t> > ls;
2393     AioCompletion *c = librados::Rados::aio_create_completion();
2394     ASSERT_EQ(0, cache_ioctx.hit_set_list(hash, c, &ls));
2395     c->wait_for_complete();
2396     c->release();
2397
2398     cout << " got ls " << ls << std::endl;
2399     if (!ls.empty()) {
2400       if (!first) {
2401         first = ls.front().first;
2402         cout << "first is " << first << std::endl;
2403       } else {
2404         if (ls.front().first != first) {
2405           cout << "first now " << ls.front().first << ", trimmed" << std::endl;
2406           break;
2407         }
2408       }
2409     }
2410
2411     utime_t now = ceph_clock_now();
2412     ASSERT_TRUE(now < hard_stop);
2413
2414     sleep(1);
2415   }
2416 }
2417
2418 TEST_F(LibRadosTwoPoolsPP, PromoteOn2ndRead) {
2419   // create object
2420   for (int i=0; i<20; ++i) {
2421     bufferlist bl;
2422     bl.append("hi there");
2423     ObjectWriteOperation op;
2424     op.write_full(bl);
2425     ASSERT_EQ(0, ioctx.operate("foo" + stringify(i), &op));
2426   }
2427
2428   // configure cache
2429   bufferlist inbl;
2430   ASSERT_EQ(0, cluster.mon_command(
2431     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2432     "\", \"tierpool\": \"" + cache_pool_name +
2433     "\", \"force_nonempty\": \"--force-nonempty\" }",
2434     inbl, NULL, NULL));
2435   ASSERT_EQ(0, cluster.mon_command(
2436     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
2437     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
2438     inbl, NULL, NULL));
2439   ASSERT_EQ(0, cluster.mon_command(
2440     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
2441     "\", \"mode\": \"writeback\"}",
2442     inbl, NULL, NULL));
2443
2444   // enable hitset tracking for this pool
2445   ASSERT_EQ(0, cluster.mon_command(
2446     set_pool_str(cache_pool_name, "hit_set_count", 2),
2447     inbl, NULL, NULL));
2448   ASSERT_EQ(0, cluster.mon_command(
2449     set_pool_str(cache_pool_name, "hit_set_period", 600),
2450     inbl, NULL, NULL));
2451   ASSERT_EQ(0, cluster.mon_command(
2452     set_pool_str(cache_pool_name, "hit_set_type", "bloom"),
2453     inbl, NULL, NULL));
2454   ASSERT_EQ(0, cluster.mon_command(
2455     set_pool_str(cache_pool_name, "min_read_recency_for_promote", 1),
2456     inbl, NULL, NULL));
2457   ASSERT_EQ(0, cluster.mon_command(
2458     set_pool_str(cache_pool_name, "hit_set_grade_decay_rate", 20),
2459     inbl, NULL, NULL));
2460   ASSERT_EQ(0, cluster.mon_command(
2461     set_pool_str(cache_pool_name, "hit_set_search_last_n", 1),
2462     inbl, NULL, NULL));
2463
2464   // wait for maps to settle
2465   cluster.wait_for_latest_osdmap();
2466
2467   int fake = 0;  // set this to non-zero to test spurious promotion,
2468                  // e.g. from thrashing
2469   int attempt = 0;
2470   string obj;
2471   while (true) {
2472     // 1st read, don't trigger a promote
2473     obj = "foo" + stringify(attempt);
2474     cout << obj << std::endl;
2475     {
2476       bufferlist bl;
2477       ASSERT_EQ(1, ioctx.read(obj.c_str(), bl, 1, 0));
2478       if (--fake >= 0) {
2479         sleep(1);
2480         ASSERT_EQ(1, ioctx.read(obj.c_str(), bl, 1, 0));
2481         sleep(1);
2482       }
2483     }
2484
2485     // verify the object is NOT present in the cache tier
2486     {
2487       bool found = false;
2488       NObjectIterator it = cache_ioctx.nobjects_begin();
2489       while (it != cache_ioctx.nobjects_end()) {
2490         cout << " see " << it->get_oid() << std::endl;
2491         if (it->get_oid() == string(obj.c_str())) {
2492           found = true;
2493           break;
2494         }
2495         ++it;
2496       }
2497       if (!found)
2498         break;
2499     }
2500
2501     ++attempt;
2502     ASSERT_LE(attempt, 20);
2503     cout << "hrm, object is present in cache on attempt " << attempt
2504          << ", retrying" << std::endl;
2505   }
2506
2507   // Read until the object is present in the cache tier
2508   cout << "verifying " << obj << " is eventually promoted" << std::endl;
2509   while (true) {
2510     bufferlist bl;
2511     ASSERT_EQ(1, ioctx.read(obj.c_str(), bl, 1, 0));
2512
2513     bool there = false;
2514     NObjectIterator it = cache_ioctx.nobjects_begin();
2515     while (it != cache_ioctx.nobjects_end()) {
2516       if (it->get_oid() == string(obj.c_str())) {
2517         there = true;
2518         break;
2519       }
2520       ++it;
2521     }
2522     if (there)
2523       break;
2524
2525     sleep(1);
2526   }
2527
2528   // tear down tiers
2529   ASSERT_EQ(0, cluster.mon_command(
2530     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
2531     "\"}",
2532     inbl, NULL, NULL));
2533   ASSERT_EQ(0, cluster.mon_command(
2534     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
2535     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
2536     inbl, NULL, NULL));
2537
2538   // wait for maps to settle before next test
2539   cluster.wait_for_latest_osdmap();
2540 }
2541
2542 TEST_F(LibRadosTwoPoolsPP, ProxyRead) {
2543   // create object
2544   {
2545     bufferlist bl;
2546     bl.append("hi there");
2547     ObjectWriteOperation op;
2548     op.write_full(bl);
2549     ASSERT_EQ(0, ioctx.operate("foo", &op));
2550   }
2551
2552   // configure cache
2553   bufferlist inbl;
2554   ASSERT_EQ(0, cluster.mon_command(
2555     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2556     "\", \"tierpool\": \"" + cache_pool_name +
2557     "\", \"force_nonempty\": \"--force-nonempty\" }",
2558     inbl, NULL, NULL));
2559   ASSERT_EQ(0, cluster.mon_command(
2560     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
2561     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
2562     inbl, NULL, NULL));
2563   ASSERT_EQ(0, cluster.mon_command(
2564     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
2565     "\", \"mode\": \"readproxy\"}",
2566     inbl, NULL, NULL));
2567
2568   // wait for maps to settle
2569   cluster.wait_for_latest_osdmap();
2570
2571   // read and verify the object
2572   {
2573     bufferlist bl;
2574     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
2575     ASSERT_EQ('h', bl[0]);
2576   }
2577
2578   // Verify 10 times the object is NOT present in the cache tier
2579   uint32_t i = 0;
2580   while (i++ < 10) {
2581     NObjectIterator it = cache_ioctx.nobjects_begin();
2582     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
2583     sleep(1);
2584   }
2585
2586   // tear down tiers
2587   ASSERT_EQ(0, cluster.mon_command(
2588     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
2589     "\"}",
2590     inbl, NULL, NULL));
2591   ASSERT_EQ(0, cluster.mon_command(
2592     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
2593     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
2594     inbl, NULL, NULL));
2595
2596   // wait for maps to settle before next test
2597   cluster.wait_for_latest_osdmap();
2598 }
2599
2600 TEST_F(LibRadosTwoPoolsPP, CachePin) {
2601   // create object
2602   {
2603     bufferlist bl;
2604     bl.append("hi there");
2605     ObjectWriteOperation op;
2606     op.write_full(bl);
2607     ASSERT_EQ(0, ioctx.operate("foo", &op));
2608   }
2609   {
2610     bufferlist bl;
2611     bl.append("hi there");
2612     ObjectWriteOperation op;
2613     op.write_full(bl);
2614     ASSERT_EQ(0, ioctx.operate("bar", &op));
2615   }
2616   {
2617     bufferlist bl;
2618     bl.append("hi there");
2619     ObjectWriteOperation op;
2620     op.write_full(bl);
2621     ASSERT_EQ(0, ioctx.operate("baz", &op));
2622   }
2623   {
2624     bufferlist bl;
2625     bl.append("hi there");
2626     ObjectWriteOperation op;
2627     op.write_full(bl);
2628     ASSERT_EQ(0, ioctx.operate("bam", &op));
2629   }
2630
2631   // configure cache
2632   bufferlist inbl;
2633   ASSERT_EQ(0, cluster.mon_command(
2634     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2635     "\", \"tierpool\": \"" + cache_pool_name +
2636     "\", \"force_nonempty\": \"--force-nonempty\" }",
2637     inbl, NULL, NULL));
2638   ASSERT_EQ(0, cluster.mon_command(
2639     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
2640     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
2641     inbl, NULL, NULL));
2642   ASSERT_EQ(0, cluster.mon_command(
2643     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
2644     "\", \"mode\": \"writeback\"}",
2645     inbl, NULL, NULL));
2646
2647   // wait for maps to settle
2648   cluster.wait_for_latest_osdmap();
2649
2650   // read, trigger promote
2651   {
2652     bufferlist bl;
2653     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
2654     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
2655     ASSERT_EQ(1, ioctx.read("baz", bl, 1, 0));
2656     ASSERT_EQ(1, ioctx.read("bam", bl, 1, 0));
2657   }
2658
2659   // verify the objects are present in the cache tier
2660   {
2661     NObjectIterator it = cache_ioctx.nobjects_begin();
2662     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
2663     for (uint32_t i = 0; i < 4; i++) {
2664       ASSERT_TRUE(it->get_oid() == string("foo") ||
2665                   it->get_oid() == string("bar") ||
2666                   it->get_oid() == string("baz") ||
2667                   it->get_oid() == string("bam"));
2668       ++it;
2669     }
2670     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
2671   }
2672
2673   // pin objects
2674   {
2675     ObjectWriteOperation op;
2676     op.cache_pin();
2677     librados::AioCompletion *completion = cluster.aio_create_completion();
2678     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
2679     completion->wait_for_safe();
2680     ASSERT_EQ(0, completion->get_return_value());
2681     completion->release();
2682   }
2683   {
2684     ObjectWriteOperation op;
2685     op.cache_pin();
2686     librados::AioCompletion *completion = cluster.aio_create_completion();
2687     ASSERT_EQ(0, cache_ioctx.aio_operate("baz", completion, &op));
2688     completion->wait_for_safe();
2689     ASSERT_EQ(0, completion->get_return_value());
2690     completion->release();
2691   }
2692
2693   // enable agent
2694   ASSERT_EQ(0, cluster.mon_command(
2695     set_pool_str(cache_pool_name, "hit_set_count", 2),
2696     inbl, NULL, NULL));
2697   ASSERT_EQ(0, cluster.mon_command(
2698     set_pool_str(cache_pool_name, "hit_set_period", 600),
2699     inbl, NULL, NULL));
2700   ASSERT_EQ(0, cluster.mon_command(
2701     set_pool_str(cache_pool_name, "hit_set_type", "bloom"),
2702     inbl, NULL, NULL));
2703   ASSERT_EQ(0, cluster.mon_command(
2704     set_pool_str(cache_pool_name, "min_read_recency_for_promote", 1),
2705     inbl, NULL, NULL));
2706   ASSERT_EQ(0, cluster.mon_command(
2707     set_pool_str(cache_pool_name, "target_max_objects", 1),
2708     inbl, NULL, NULL));
2709
2710   sleep(10);
2711
2712   // Verify the pinned object 'foo' is not flushed/evicted
2713   uint32_t count = 0;
2714   while (true) {
2715     bufferlist bl;
2716     ASSERT_EQ(1, ioctx.read("baz", bl, 1, 0));
2717
2718     count = 0;
2719     NObjectIterator it = cache_ioctx.nobjects_begin();
2720     while (it != cache_ioctx.nobjects_end()) {
2721       ASSERT_TRUE(it->get_oid() == string("foo") ||
2722                   it->get_oid() == string("bar") ||
2723                   it->get_oid() == string("baz") ||
2724                   it->get_oid() == string("bam"));
2725       ++count;
2726       ++it;
2727     }
2728     if (count == 2) {
2729       ASSERT_TRUE(it->get_oid() == string("foo") ||
2730                   it->get_oid() == string("baz"));
2731       break;
2732     }
2733
2734     sleep(1);
2735   }
2736
2737   // tear down tiers
2738   ASSERT_EQ(0, cluster.mon_command(
2739     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
2740     "\"}",
2741     inbl, NULL, NULL));
2742   ASSERT_EQ(0, cluster.mon_command(
2743     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
2744     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
2745     inbl, NULL, NULL));
2746
2747   // wait for maps to settle before next test
2748   cluster.wait_for_latest_osdmap();
2749 }
2750
2751 TEST_F(LibRadosTwoPoolsPP, SetRedirectRead) {
2752   // skip test if not yet luminous
2753   {
2754     bufferlist inbl, outbl;
2755     ASSERT_EQ(0, cluster.mon_command(
2756                 "{\"prefix\": \"osd dump\"}",
2757                 inbl, &outbl, NULL));
2758     string s(outbl.c_str(), outbl.length());
2759     if (s.find("luminous") == std::string::npos) {
2760       cout << "cluster is not yet luminous, skipping test" << std::endl;
2761       return;
2762     }
2763   }
2764
2765   // create object
2766   {
2767     bufferlist bl;
2768     bl.append("hi there");
2769     ObjectWriteOperation op;
2770     op.write_full(bl);
2771     ASSERT_EQ(0, ioctx.operate("foo", &op));
2772   }
2773   {
2774     bufferlist bl;
2775     bl.append("there");
2776     ObjectWriteOperation op;
2777     op.write_full(bl);
2778     ASSERT_EQ(0, cache_ioctx.operate("bar", &op));
2779   }
2780
2781   // configure tier
2782   bufferlist inbl;
2783   ASSERT_EQ(0, cluster.mon_command(
2784     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2785     "\", \"tierpool\": \"" + cache_pool_name +
2786     "\", \"force_nonempty\": \"--force-nonempty\" }",
2787     inbl, NULL, NULL));
2788
2789   // wait for maps to settle
2790   cluster.wait_for_latest_osdmap();
2791
2792   {
2793     ObjectWriteOperation op;
2794     op.set_redirect("bar", cache_ioctx, 0);
2795     librados::AioCompletion *completion = cluster.aio_create_completion();
2796     ASSERT_EQ(0, ioctx.aio_operate("foo", completion, &op));
2797     completion->wait_for_safe();
2798     ASSERT_EQ(0, completion->get_return_value());
2799     completion->release();
2800   }
2801   // read and verify the object
2802   {
2803     bufferlist bl;
2804     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
2805     ASSERT_EQ('t', bl[0]);
2806   }
2807
2808   ASSERT_EQ(0, cluster.mon_command(
2809     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
2810     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
2811     inbl, NULL, NULL));
2812
2813   // wait for maps to settle before next test
2814   cluster.wait_for_latest_osdmap();
2815 }
2816
2817 class LibRadosTwoPoolsECPP : public RadosTestECPP
2818 {
2819 public:
2820   LibRadosTwoPoolsECPP() {};
2821   ~LibRadosTwoPoolsECPP() override {};
2822 protected:
2823   static void SetUpTestCase() {
2824     pool_name = get_temp_pool_name();
2825     ASSERT_EQ("", create_one_ec_pool_pp(pool_name, s_cluster));
2826   }
2827   static void TearDownTestCase() {
2828     ASSERT_EQ(0, destroy_one_ec_pool_pp(pool_name, s_cluster));
2829   }
2830   static std::string cache_pool_name;
2831
2832   void SetUp() override {
2833     cache_pool_name = get_temp_pool_name();
2834     ASSERT_EQ(0, s_cluster.pool_create(cache_pool_name.c_str()));
2835     RadosTestECPP::SetUp();
2836
2837     ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
2838     cache_ioctx.application_enable("rados", true);
2839     cache_ioctx.set_namespace(nspace);
2840   }
2841   void TearDown() override {
2842     // flush + evict cache
2843     flush_evict_all(cluster, cache_ioctx);
2844
2845     bufferlist inbl;
2846     // tear down tiers
2847     ASSERT_EQ(0, cluster.mon_command(
2848       "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
2849       "\"}",
2850       inbl, NULL, NULL));
2851     ASSERT_EQ(0, cluster.mon_command(
2852       "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
2853       "\", \"tierpool\": \"" + cache_pool_name + "\"}",
2854     inbl, NULL, NULL));
2855
2856     // wait for maps to settle before next test
2857     cluster.wait_for_latest_osdmap();
2858
2859     RadosTestECPP::TearDown();
2860
2861     cleanup_default_namespace(cache_ioctx);
2862     cleanup_namespace(cache_ioctx, nspace);
2863
2864     cache_ioctx.close();
2865     ASSERT_EQ(0, s_cluster.pool_delete(cache_pool_name.c_str()));
2866   }
2867
2868   librados::IoCtx cache_ioctx;
2869 };
2870
2871 std::string LibRadosTwoPoolsECPP::cache_pool_name;
2872
2873 TEST_F(LibRadosTierECPP, Dirty) {
2874   {
2875     ObjectWriteOperation op;
2876     op.undirty();
2877     ASSERT_EQ(0, ioctx.operate("foo", &op)); // still get 0 if it dne
2878   }
2879   {
2880     ObjectWriteOperation op;
2881     op.create(true);
2882     ASSERT_EQ(0, ioctx.operate("foo", &op));
2883   }
2884   {
2885     bool dirty = false;
2886     int r = -1;
2887     ObjectReadOperation op;
2888     op.is_dirty(&dirty, &r);
2889     ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
2890     ASSERT_TRUE(dirty);
2891     ASSERT_EQ(0, r);
2892   }
2893   {
2894     ObjectWriteOperation op;
2895     op.undirty();
2896     ASSERT_EQ(0, ioctx.operate("foo", &op));
2897   }
2898   {
2899     ObjectWriteOperation op;
2900     op.undirty();
2901     ASSERT_EQ(0, ioctx.operate("foo", &op));  // still 0 if already clean
2902   }
2903   {
2904     bool dirty = false;
2905     int r = -1;
2906     ObjectReadOperation op;
2907     op.is_dirty(&dirty, &r);
2908     ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
2909     ASSERT_FALSE(dirty);
2910     ASSERT_EQ(0, r);
2911   }
2912   //{
2913   //  ObjectWriteOperation op;
2914   //  op.truncate(0);  // still a write even tho it is a no-op
2915   //  ASSERT_EQ(0, ioctx.operate("foo", &op));
2916   //}
2917   //{
2918   //  bool dirty = false;
2919   //  int r = -1;
2920   //  ObjectReadOperation op;
2921   //  op.is_dirty(&dirty, &r);
2922   //  ASSERT_EQ(0, ioctx.operate("foo", &op, NULL));
2923   //  ASSERT_TRUE(dirty);
2924   //  ASSERT_EQ(0, r);
2925   //}
2926 }
2927
2928 TEST_F(LibRadosTwoPoolsECPP, Overlay) {
2929   // create objects
2930   {
2931     bufferlist bl;
2932     bl.append("base");
2933     ObjectWriteOperation op;
2934     op.write_full(bl);
2935     ASSERT_EQ(0, ioctx.operate("foo", &op));
2936   }
2937   {
2938     bufferlist bl;
2939     bl.append("cache");
2940     ObjectWriteOperation op;
2941     op.write_full(bl);
2942     ASSERT_EQ(0, cache_ioctx.operate("foo", &op));
2943   }
2944
2945   // configure cache
2946   bufferlist inbl;
2947   ASSERT_EQ(0, cluster.mon_command(
2948     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
2949     "\", \"tierpool\": \"" + cache_pool_name +
2950     "\", \"force_nonempty\": \"--force-nonempty\" }",
2951     inbl, NULL, NULL));
2952   ASSERT_EQ(0, cluster.mon_command(
2953     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
2954     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
2955     inbl, NULL, NULL));
2956
2957   // wait for maps to settle
2958   cluster.wait_for_latest_osdmap();
2959
2960   // by default, the overlay sends us to cache pool
2961   {
2962     bufferlist bl;
2963     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
2964     ASSERT_EQ('c', bl[0]);
2965   }
2966   {
2967     bufferlist bl;
2968     ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
2969     ASSERT_EQ('c', bl[0]);
2970   }
2971
2972   // unless we say otherwise
2973   {
2974     bufferlist bl;
2975     ObjectReadOperation op;
2976     op.read(0, 1, &bl, NULL);
2977     librados::AioCompletion *completion = cluster.aio_create_completion();
2978     ASSERT_EQ(0, ioctx.aio_operate(
2979         "foo", completion, &op,
2980         librados::OPERATION_IGNORE_OVERLAY, NULL));
2981     completion->wait_for_safe();
2982     ASSERT_EQ(0, completion->get_return_value());
2983     completion->release();
2984     ASSERT_EQ('b', bl[0]);
2985   }
2986 }
2987
2988 TEST_F(LibRadosTwoPoolsECPP, Promote) {
2989   // create object
2990   {
2991     bufferlist bl;
2992     bl.append("hi there");
2993     ObjectWriteOperation op;
2994     op.write_full(bl);
2995     ASSERT_EQ(0, ioctx.operate("foo", &op));
2996   }
2997
2998   // configure cache
2999   bufferlist inbl;
3000   ASSERT_EQ(0, cluster.mon_command(
3001     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3002     "\", \"tierpool\": \"" + cache_pool_name +
3003     "\", \"force_nonempty\": \"--force-nonempty\" }",
3004     inbl, NULL, NULL));
3005   ASSERT_EQ(0, cluster.mon_command(
3006     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3007     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3008     inbl, NULL, NULL));
3009   ASSERT_EQ(0, cluster.mon_command(
3010     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3011     "\", \"mode\": \"writeback\"}",
3012     inbl, NULL, NULL));
3013
3014   // wait for maps to settle
3015   cluster.wait_for_latest_osdmap();
3016
3017   // read, trigger a promote
3018   {
3019     bufferlist bl;
3020     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3021   }
3022
3023   // read, trigger a whiteout
3024   {
3025     bufferlist bl;
3026     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
3027     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
3028   }
3029
3030   // verify the object is present in the cache tier
3031   {
3032     NObjectIterator it = cache_ioctx.nobjects_begin();
3033     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
3034     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
3035     ++it;
3036     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
3037     ++it;
3038     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3039   }
3040 }
3041
3042 TEST_F(LibRadosTwoPoolsECPP, PromoteSnap) {
3043   // create object
3044   {
3045     bufferlist bl;
3046     bl.append("hi there");
3047     ObjectWriteOperation op;
3048     op.write_full(bl);
3049     ASSERT_EQ(0, ioctx.operate("foo", &op));
3050   }
3051   {
3052     bufferlist bl;
3053     bl.append("hi there");
3054     ObjectWriteOperation op;
3055     op.write_full(bl);
3056     ASSERT_EQ(0, ioctx.operate("bar", &op));
3057   }
3058   {
3059     bufferlist bl;
3060     bl.append("hi there");
3061     ObjectWriteOperation op;
3062     op.write_full(bl);
3063     ASSERT_EQ(0, ioctx.operate("baz", &op));
3064   }
3065   {
3066     bufferlist bl;
3067     bl.append("hi there");
3068     ObjectWriteOperation op;
3069     op.write_full(bl);
3070     ASSERT_EQ(0, ioctx.operate("bam", &op));
3071   }
3072
3073   // create a snapshot, clone
3074   vector<uint64_t> my_snaps(1);
3075   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
3076   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
3077                                                          my_snaps));
3078   {
3079     bufferlist bl;
3080     bl.append("ciao!");
3081     ObjectWriteOperation op;
3082     op.write_full(bl);
3083     ASSERT_EQ(0, ioctx.operate("foo", &op));
3084   }
3085   {
3086     bufferlist bl;
3087     bl.append("ciao!");
3088     ObjectWriteOperation op;
3089     op.write_full(bl);
3090     ASSERT_EQ(0, ioctx.operate("bar", &op));
3091   }
3092   {
3093     ObjectWriteOperation op;
3094     op.remove();
3095     ASSERT_EQ(0, ioctx.operate("baz", &op));
3096   }
3097   {
3098     bufferlist bl;
3099     bl.append("ciao!");
3100     ObjectWriteOperation op;
3101     op.write_full(bl);
3102     ASSERT_EQ(0, ioctx.operate("bam", &op));
3103   }
3104
3105   // configure cache
3106   bufferlist inbl;
3107   ASSERT_EQ(0, cluster.mon_command(
3108     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3109     "\", \"tierpool\": \"" + cache_pool_name +
3110     "\", \"force_nonempty\": \"--force-nonempty\" }",
3111     inbl, NULL, NULL));
3112   ASSERT_EQ(0, cluster.mon_command(
3113     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3114     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3115     inbl, NULL, NULL));
3116   ASSERT_EQ(0, cluster.mon_command(
3117     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3118     "\", \"mode\": \"writeback\"}",
3119     inbl, NULL, NULL));
3120
3121   // wait for maps to settle
3122   cluster.wait_for_latest_osdmap();
3123
3124   // read, trigger a promote on the head
3125   {
3126     bufferlist bl;
3127     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3128     ASSERT_EQ('c', bl[0]);
3129   }
3130   {
3131     bufferlist bl;
3132     ASSERT_EQ(1, ioctx.read("bam", bl, 1, 0));
3133     ASSERT_EQ('c', bl[0]);
3134   }
3135
3136   ioctx.snap_set_read(my_snaps[0]);
3137
3138   // stop and scrub this pg (to make sure scrub can handle missing
3139   // clones in the cache tier)
3140   // This test requires cache tier and base tier to have the same pg_num/pgp_num
3141   {
3142     for (int tries = 0; tries < 5; ++tries) {
3143       IoCtx cache_ioctx;
3144       ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
3145       uint32_t hash;
3146       ASSERT_EQ(0, ioctx.get_object_pg_hash_position2("foo", &hash));
3147       ostringstream ss;
3148       ss << "{\"prefix\": \"pg scrub\", \"pgid\": \""
3149          << cache_ioctx.get_id() << "."
3150          << hash
3151          << "\"}";
3152       int r = cluster.mon_command(ss.str(), inbl, NULL, NULL);
3153       if (r == -EAGAIN ||
3154           r == -ENOENT) {  // in case mgr osdmap is a bit stale
3155         sleep(5);
3156         continue;
3157       }
3158       ASSERT_EQ(0, r);
3159       break;
3160     }
3161     // give it a few seconds to go.  this is sloppy but is usually enough time
3162     cout << "waiting for scrub..." << std::endl;
3163     sleep(15);
3164     cout << "done waiting" << std::endl;
3165   }
3166
3167   // read foo snap
3168   {
3169     bufferlist bl;
3170     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3171     ASSERT_EQ('h', bl[0]);
3172   }
3173
3174   // read bar snap
3175   {
3176     bufferlist bl;
3177     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
3178     ASSERT_EQ('h', bl[0]);
3179   }
3180
3181   // read baz snap
3182   {
3183     bufferlist bl;
3184     ASSERT_EQ(1, ioctx.read("baz", bl, 1, 0));
3185     ASSERT_EQ('h', bl[0]);
3186   }
3187
3188   ioctx.snap_set_read(librados::SNAP_HEAD);
3189
3190   // read foo
3191   {
3192     bufferlist bl;
3193     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3194     ASSERT_EQ('c', bl[0]);
3195   }
3196
3197   // read bar
3198   {
3199     bufferlist bl;
3200     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
3201     ASSERT_EQ('c', bl[0]);
3202   }
3203
3204   // read baz
3205   {
3206     bufferlist bl;
3207     ASSERT_EQ(-ENOENT, ioctx.read("baz", bl, 1, 0));
3208   }
3209
3210   // cleanup
3211   ioctx.selfmanaged_snap_remove(my_snaps[0]);
3212 }
3213
3214 TEST_F(LibRadosTwoPoolsECPP, PromoteSnapTrimRace) {
3215   // create object
3216   {
3217     bufferlist bl;
3218     bl.append("hi there");
3219     ObjectWriteOperation op;
3220     op.write_full(bl);
3221     ASSERT_EQ(0, ioctx.operate("foo", &op));
3222   }
3223
3224   // create a snapshot, clone
3225   vector<uint64_t> my_snaps(1);
3226   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
3227   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
3228                                                          my_snaps));
3229   {
3230     bufferlist bl;
3231     bl.append("ciao!");
3232     ObjectWriteOperation op;
3233     op.write_full(bl);
3234     ASSERT_EQ(0, ioctx.operate("foo", &op));
3235   }
3236
3237   // configure cache
3238   bufferlist inbl;
3239   ASSERT_EQ(0, cluster.mon_command(
3240     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3241     "\", \"tierpool\": \"" + cache_pool_name +
3242     "\", \"force_nonempty\": \"--force-nonempty\" }",
3243     inbl, NULL, NULL));
3244   ASSERT_EQ(0, cluster.mon_command(
3245     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3246     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3247     inbl, NULL, NULL));
3248   ASSERT_EQ(0, cluster.mon_command(
3249     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3250     "\", \"mode\": \"writeback\"}",
3251     inbl, NULL, NULL));
3252
3253   // wait for maps to settle
3254   cluster.wait_for_latest_osdmap();
3255
3256   // delete the snap
3257   ASSERT_EQ(0, ioctx.selfmanaged_snap_remove(my_snaps[0]));
3258
3259   ioctx.snap_set_read(my_snaps[0]);
3260
3261   // read foo snap
3262   {
3263     bufferlist bl;
3264     ASSERT_EQ(-ENOENT, ioctx.read("foo", bl, 1, 0));
3265   }
3266
3267   // cleanup
3268   ioctx.selfmanaged_snap_remove(my_snaps[0]);
3269 }
3270
3271 TEST_F(LibRadosTwoPoolsECPP, Whiteout) {
3272   // create object
3273   {
3274     bufferlist bl;
3275     bl.append("hi there");
3276     ObjectWriteOperation op;
3277     op.write_full(bl);
3278     ASSERT_EQ(0, ioctx.operate("foo", &op));
3279   }
3280
3281   // configure cache
3282   bufferlist inbl;
3283   ASSERT_EQ(0, cluster.mon_command(
3284     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3285     "\", \"tierpool\": \"" + cache_pool_name +
3286     "\", \"force_nonempty\": \"--force-nonempty\" }",
3287     inbl, NULL, NULL));
3288   ASSERT_EQ(0, cluster.mon_command(
3289     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3290     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3291     inbl, NULL, NULL));
3292   ASSERT_EQ(0, cluster.mon_command(
3293     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3294     "\", \"mode\": \"writeback\"}",
3295     inbl, NULL, NULL));
3296
3297   // wait for maps to settle
3298   cluster.wait_for_latest_osdmap();
3299
3300   // create some whiteouts, verify they behave
3301   {
3302     ObjectWriteOperation op;
3303     op.assert_exists();
3304     op.remove();
3305     ASSERT_EQ(0, ioctx.operate("foo", &op));
3306   }
3307
3308   {
3309     ObjectWriteOperation op;
3310     op.assert_exists();
3311     op.remove();
3312     ASSERT_EQ(-ENOENT, ioctx.operate("bar", &op));
3313   }
3314   {
3315     ObjectWriteOperation op;
3316     op.assert_exists();
3317     op.remove();
3318     ASSERT_EQ(-ENOENT, ioctx.operate("bar", &op));
3319   }
3320
3321   // verify the whiteouts are there in the cache tier
3322   {
3323     NObjectIterator it = cache_ioctx.nobjects_begin();
3324     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
3325     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
3326     ++it;
3327     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
3328     ++it;
3329     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3330   }
3331
3332   // delete a whiteout and verify it goes away
3333   ASSERT_EQ(-ENOENT, ioctx.remove("foo"));
3334   {
3335     ObjectWriteOperation op;
3336     op.remove();
3337     librados::AioCompletion *completion = cluster.aio_create_completion();
3338     ASSERT_EQ(0, ioctx.aio_operate("bar", completion, &op,
3339                                    librados::OPERATION_IGNORE_CACHE));
3340     completion->wait_for_safe();
3341     ASSERT_EQ(0, completion->get_return_value());
3342     completion->release();
3343
3344     NObjectIterator it = cache_ioctx.nobjects_begin();
3345     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
3346     ASSERT_TRUE(it->get_oid() == string("foo"));
3347     ++it;
3348     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3349   }
3350
3351   // recreate an object and verify we can read it
3352   {
3353     bufferlist bl;
3354     bl.append("hi there");
3355     ObjectWriteOperation op;
3356     op.write_full(bl);
3357     ASSERT_EQ(0, ioctx.operate("foo", &op));
3358   }
3359   {
3360     bufferlist bl;
3361     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3362     ASSERT_EQ('h', bl[0]);
3363   }
3364 }
3365
3366 TEST_F(LibRadosTwoPoolsECPP, Evict) {
3367   // create object
3368   {
3369     bufferlist bl;
3370     bl.append("hi there");
3371     ObjectWriteOperation op;
3372     op.write_full(bl);
3373     ASSERT_EQ(0, ioctx.operate("foo", &op));
3374   }
3375
3376   // configure cache
3377   bufferlist inbl;
3378   ASSERT_EQ(0, cluster.mon_command(
3379     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3380     "\", \"tierpool\": \"" + cache_pool_name +
3381     "\", \"force_nonempty\": \"--force-nonempty\" }",
3382     inbl, NULL, NULL));
3383   ASSERT_EQ(0, cluster.mon_command(
3384     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3385     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3386     inbl, NULL, NULL));
3387   ASSERT_EQ(0, cluster.mon_command(
3388     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3389     "\", \"mode\": \"writeback\"}",
3390     inbl, NULL, NULL));
3391
3392   // wait for maps to settle
3393   cluster.wait_for_latest_osdmap();
3394
3395   // read, trigger a promote
3396   {
3397     bufferlist bl;
3398     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3399   }
3400
3401   // read, trigger a whiteout, and a dirty object
3402   {
3403     bufferlist bl;
3404     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
3405     ASSERT_EQ(-ENOENT, ioctx.read("bar", bl, 1, 0));
3406     ASSERT_EQ(0, ioctx.write("bar", bl, bl.length(), 0));
3407   }
3408
3409   // verify the object is present in the cache tier
3410   {
3411     NObjectIterator it = cache_ioctx.nobjects_begin();
3412     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
3413     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
3414     ++it;
3415     ASSERT_TRUE(it->get_oid() == string("foo") || it->get_oid() == string("bar"));
3416     ++it;
3417     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3418   }
3419
3420   // pin
3421   {
3422     ObjectWriteOperation op;
3423     op.cache_pin();
3424     librados::AioCompletion *completion = cluster.aio_create_completion();
3425     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
3426     completion->wait_for_safe();
3427     ASSERT_EQ(0, completion->get_return_value());
3428     completion->release();
3429   }
3430
3431   // evict the pinned object with -EPERM
3432   {
3433     ObjectReadOperation op;
3434     op.cache_evict();
3435     librados::AioCompletion *completion = cluster.aio_create_completion();
3436     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
3437                                          librados::OPERATION_IGNORE_CACHE,
3438                                          NULL));
3439     completion->wait_for_safe();
3440     ASSERT_EQ(-EPERM, completion->get_return_value());
3441     completion->release();
3442   }
3443
3444   // unpin
3445   {
3446     ObjectWriteOperation op;
3447     op.cache_unpin();
3448     librados::AioCompletion *completion = cluster.aio_create_completion();
3449     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
3450     completion->wait_for_safe();
3451     ASSERT_EQ(0, completion->get_return_value());
3452     completion->release();
3453   }
3454
3455   // flush
3456   {
3457     ObjectReadOperation op;
3458     op.cache_flush();
3459     librados::AioCompletion *completion = cluster.aio_create_completion();
3460     ASSERT_EQ(0, cache_ioctx.aio_operate(
3461       "foo", completion, &op,
3462       librados::OPERATION_IGNORE_OVERLAY, NULL));
3463     completion->wait_for_safe();
3464     ASSERT_EQ(0, completion->get_return_value());
3465     completion->release();
3466   }
3467
3468   // verify clean
3469   {
3470     bool dirty = false;
3471     int r = -1;
3472     ObjectReadOperation op;
3473     op.is_dirty(&dirty, &r);
3474     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
3475     ASSERT_FALSE(dirty);
3476     ASSERT_EQ(0, r);
3477   }
3478
3479   // evict
3480   {
3481     ObjectReadOperation op;
3482     op.cache_evict();
3483     librados::AioCompletion *completion = cluster.aio_create_completion();
3484     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
3485                                          librados::OPERATION_IGNORE_CACHE,
3486                                          NULL));
3487     completion->wait_for_safe();
3488     ASSERT_EQ(0, completion->get_return_value());
3489     completion->release();
3490   }
3491   {
3492     ObjectReadOperation op;
3493     op.cache_evict();
3494     librados::AioCompletion *completion = cluster.aio_create_completion();
3495     ASSERT_EQ(0, cache_ioctx.aio_operate(
3496       "foo", completion, &op,
3497       librados::OPERATION_IGNORE_CACHE, NULL));
3498     completion->wait_for_safe();
3499     ASSERT_EQ(0, completion->get_return_value());
3500     completion->release();
3501   }
3502   {
3503     ObjectReadOperation op;
3504     op.cache_evict();
3505     librados::AioCompletion *completion = cluster.aio_create_completion();
3506     ASSERT_EQ(0, cache_ioctx.aio_operate(
3507       "bar", completion, &op,
3508       librados::OPERATION_IGNORE_CACHE, NULL));
3509     completion->wait_for_safe();
3510     ASSERT_EQ(-EBUSY, completion->get_return_value());
3511     completion->release();
3512   }
3513 }
3514
3515 TEST_F(LibRadosTwoPoolsECPP, EvictSnap) {
3516   // create object
3517   {
3518     bufferlist bl;
3519     bl.append("hi there");
3520     ObjectWriteOperation op;
3521     op.write_full(bl);
3522     ASSERT_EQ(0, ioctx.operate("foo", &op));
3523   }
3524   {
3525     bufferlist bl;
3526     bl.append("hi there");
3527     ObjectWriteOperation op;
3528     op.write_full(bl);
3529     ASSERT_EQ(0, ioctx.operate("bar", &op));
3530   }
3531   {
3532     bufferlist bl;
3533     bl.append("hi there");
3534     ObjectWriteOperation op;
3535     op.write_full(bl);
3536     ASSERT_EQ(0, ioctx.operate("baz", &op));
3537   }
3538   {
3539     bufferlist bl;
3540     bl.append("hi there");
3541     ObjectWriteOperation op;
3542     op.write_full(bl);
3543     ASSERT_EQ(0, ioctx.operate("bam", &op));
3544   }
3545
3546   // create a snapshot, clone
3547   vector<uint64_t> my_snaps(1);
3548   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
3549   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
3550                                                          my_snaps));
3551   {
3552     bufferlist bl;
3553     bl.append("ciao!");
3554     ObjectWriteOperation op;
3555     op.write_full(bl);
3556     ASSERT_EQ(0, ioctx.operate("foo", &op));
3557   }
3558   {
3559     bufferlist bl;
3560     bl.append("ciao!");
3561     ObjectWriteOperation op;
3562     op.write_full(bl);
3563     ASSERT_EQ(0, ioctx.operate("bar", &op));
3564   }
3565   {
3566     ObjectWriteOperation op;
3567     op.remove();
3568     ASSERT_EQ(0, ioctx.operate("baz", &op));
3569   }
3570   {
3571     bufferlist bl;
3572     bl.append("ciao!");
3573     ObjectWriteOperation op;
3574     op.write_full(bl);
3575     ASSERT_EQ(0, ioctx.operate("bam", &op));
3576   }
3577
3578   // configure cache
3579   bufferlist inbl;
3580   ASSERT_EQ(0, cluster.mon_command(
3581     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3582     "\", \"tierpool\": \"" + cache_pool_name +
3583     "\", \"force_nonempty\": \"--force-nonempty\" }",
3584     inbl, NULL, NULL));
3585   ASSERT_EQ(0, cluster.mon_command(
3586     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3587     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3588     inbl, NULL, NULL));
3589   ASSERT_EQ(0, cluster.mon_command(
3590     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3591     "\", \"mode\": \"writeback\"}",
3592     inbl, NULL, NULL));
3593
3594   // wait for maps to settle
3595   cluster.wait_for_latest_osdmap();
3596
3597   // read, trigger a promote on the head
3598   {
3599     bufferlist bl;
3600     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3601     ASSERT_EQ('c', bl[0]);
3602   }
3603   {
3604     bufferlist bl;
3605     ASSERT_EQ(1, ioctx.read("bam", bl, 1, 0));
3606     ASSERT_EQ('c', bl[0]);
3607   }
3608
3609   // evict bam
3610   {
3611     ObjectReadOperation op;
3612     op.cache_evict();
3613     librados::AioCompletion *completion = cluster.aio_create_completion();
3614     ASSERT_EQ(0, cache_ioctx.aio_operate(
3615       "bam", completion, &op,
3616       librados::OPERATION_IGNORE_CACHE, NULL));
3617     completion->wait_for_safe();
3618     ASSERT_EQ(0, completion->get_return_value());
3619     completion->release();
3620   }
3621   {
3622     bufferlist bl;
3623     ObjectReadOperation op;
3624     op.read(1, 0, &bl, NULL);
3625     librados::AioCompletion *completion = cluster.aio_create_completion();
3626     ASSERT_EQ(0, cache_ioctx.aio_operate(
3627       "bam", completion, &op,
3628       librados::OPERATION_IGNORE_CACHE, NULL));
3629     completion->wait_for_safe();
3630     ASSERT_EQ(-ENOENT, completion->get_return_value());
3631     completion->release();
3632   }
3633
3634   // read foo snap
3635   ioctx.snap_set_read(my_snaps[0]);
3636   {
3637     bufferlist bl;
3638     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
3639     ASSERT_EQ('h', bl[0]);
3640   }
3641
3642   // evict foo snap
3643   {
3644     ObjectReadOperation op;
3645     op.cache_evict();
3646     librados::AioCompletion *completion = cluster.aio_create_completion();
3647     ASSERT_EQ(0, ioctx.aio_operate(
3648       "foo", completion, &op,
3649       librados::OPERATION_IGNORE_CACHE, NULL));
3650     completion->wait_for_safe();
3651     ASSERT_EQ(0, completion->get_return_value());
3652     completion->release();
3653   }
3654   // snap is gone...
3655   {
3656     bufferlist bl;
3657     ObjectReadOperation op;
3658     op.read(1, 0, &bl, NULL);
3659     librados::AioCompletion *completion = cluster.aio_create_completion();
3660     ASSERT_EQ(0, ioctx.aio_operate(
3661       "foo", completion, &op,
3662       librados::OPERATION_IGNORE_CACHE, NULL));
3663     completion->wait_for_safe();
3664     ASSERT_EQ(-ENOENT, completion->get_return_value());
3665     completion->release();
3666   }
3667   // head is still there...
3668   ioctx.snap_set_read(librados::SNAP_HEAD);
3669   {
3670     bufferlist bl;
3671     ObjectReadOperation op;
3672     op.read(1, 0, &bl, NULL);
3673     librados::AioCompletion *completion = cluster.aio_create_completion();
3674     ASSERT_EQ(0, ioctx.aio_operate(
3675       "foo", completion, &op,
3676       librados::OPERATION_IGNORE_CACHE, NULL));
3677     completion->wait_for_safe();
3678     ASSERT_EQ(0, completion->get_return_value());
3679     completion->release();
3680   }
3681
3682   // promote head + snap of bar
3683   ioctx.snap_set_read(librados::SNAP_HEAD);
3684   {
3685     bufferlist bl;
3686     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
3687     ASSERT_EQ('c', bl[0]);
3688   }
3689   ioctx.snap_set_read(my_snaps[0]);
3690   {
3691     bufferlist bl;
3692     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
3693     ASSERT_EQ('h', bl[0]);
3694   }
3695
3696   // evict bar head (fail)
3697   ioctx.snap_set_read(librados::SNAP_HEAD);
3698   {
3699     ObjectReadOperation op;
3700     op.cache_evict();
3701     librados::AioCompletion *completion = cluster.aio_create_completion();
3702     ASSERT_EQ(0, ioctx.aio_operate(
3703       "bar", completion, &op,
3704       librados::OPERATION_IGNORE_CACHE, NULL));
3705     completion->wait_for_safe();
3706     ASSERT_EQ(-EBUSY, completion->get_return_value());
3707     completion->release();
3708   }
3709
3710   // evict bar snap
3711   ioctx.snap_set_read(my_snaps[0]);
3712   {
3713     ObjectReadOperation op;
3714     op.cache_evict();
3715     librados::AioCompletion *completion = cluster.aio_create_completion();
3716     ASSERT_EQ(0, ioctx.aio_operate(
3717       "bar", completion, &op,
3718       librados::OPERATION_IGNORE_CACHE, NULL));
3719     completion->wait_for_safe();
3720     ASSERT_EQ(0, completion->get_return_value());
3721     completion->release();
3722   }
3723   // ...and then head
3724   ioctx.snap_set_read(librados::SNAP_HEAD);
3725   {
3726     bufferlist bl;
3727     ObjectReadOperation op;
3728     op.read(1, 0, &bl, NULL);
3729     librados::AioCompletion *completion = cluster.aio_create_completion();
3730     ASSERT_EQ(0, ioctx.aio_operate(
3731       "bar", completion, &op,
3732       librados::OPERATION_IGNORE_CACHE, NULL));
3733     completion->wait_for_safe();
3734     ASSERT_EQ(0, completion->get_return_value());
3735     completion->release();
3736   }
3737   {
3738     ObjectReadOperation op;
3739     op.cache_evict();
3740     librados::AioCompletion *completion = cluster.aio_create_completion();
3741     ASSERT_EQ(0, ioctx.aio_operate(
3742       "bar", completion, &op,
3743       librados::OPERATION_IGNORE_CACHE, NULL));
3744     completion->wait_for_safe();
3745     ASSERT_EQ(0, completion->get_return_value());
3746     completion->release();
3747   }
3748
3749   // cleanup
3750   ioctx.selfmanaged_snap_remove(my_snaps[0]);
3751 }
3752
3753 TEST_F(LibRadosTwoPoolsECPP, TryFlush) {
3754   // configure cache
3755   bufferlist inbl;
3756   ASSERT_EQ(0, cluster.mon_command(
3757     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3758     "\", \"tierpool\": \"" + cache_pool_name +
3759     "\", \"force_nonempty\": \"--force-nonempty\" }",
3760     inbl, NULL, NULL));
3761   ASSERT_EQ(0, cluster.mon_command(
3762     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3763     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3764     inbl, NULL, NULL));
3765   ASSERT_EQ(0, cluster.mon_command(
3766     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3767     "\", \"mode\": \"writeback\"}",
3768     inbl, NULL, NULL));
3769
3770   // wait for maps to settle
3771   cluster.wait_for_latest_osdmap();
3772
3773   // create object
3774   {
3775     bufferlist bl;
3776     bl.append("hi there");
3777     ObjectWriteOperation op;
3778     op.write_full(bl);
3779     ASSERT_EQ(0, ioctx.operate("foo", &op));
3780   }
3781
3782   // verify the object is present in the cache tier
3783   {
3784     NObjectIterator it = cache_ioctx.nobjects_begin();
3785     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
3786     ASSERT_TRUE(it->get_oid() == string("foo"));
3787     ++it;
3788     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3789   }
3790
3791   // verify the object is NOT present in the base tier
3792   {
3793     NObjectIterator it = ioctx.nobjects_begin();
3794     ASSERT_TRUE(it == ioctx.nobjects_end());
3795   }
3796
3797   // verify dirty
3798   {
3799     bool dirty = false;
3800     int r = -1;
3801     ObjectReadOperation op;
3802     op.is_dirty(&dirty, &r);
3803     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
3804     ASSERT_TRUE(dirty);
3805     ASSERT_EQ(0, r);
3806   }
3807
3808   // pin
3809   {
3810     ObjectWriteOperation op;
3811     op.cache_pin();
3812     librados::AioCompletion *completion = cluster.aio_create_completion();
3813     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
3814     completion->wait_for_safe();
3815     ASSERT_EQ(0, completion->get_return_value());
3816     completion->release();
3817   }
3818
3819   // flush the pinned object with -EPERM
3820   {
3821     ObjectReadOperation op;
3822     op.cache_try_flush();
3823     librados::AioCompletion *completion = cluster.aio_create_completion();
3824     ASSERT_EQ(0, cache_ioctx.aio_operate(
3825       "foo", completion, &op,
3826       librados::OPERATION_IGNORE_OVERLAY |
3827       librados::OPERATION_SKIPRWLOCKS, NULL));
3828     completion->wait_for_safe();
3829     ASSERT_EQ(-EPERM, completion->get_return_value());
3830     completion->release();
3831   }
3832
3833   // unpin
3834   {
3835     ObjectWriteOperation op;
3836     op.cache_unpin();
3837     librados::AioCompletion *completion = cluster.aio_create_completion();
3838     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
3839     completion->wait_for_safe();
3840     ASSERT_EQ(0, completion->get_return_value());
3841     completion->release();
3842   }
3843
3844   // flush
3845   {
3846     ObjectReadOperation op;
3847     op.cache_try_flush();
3848     librados::AioCompletion *completion = cluster.aio_create_completion();
3849     ASSERT_EQ(0, cache_ioctx.aio_operate(
3850       "foo", completion, &op,
3851       librados::OPERATION_IGNORE_OVERLAY |
3852       librados::OPERATION_SKIPRWLOCKS, NULL));
3853     completion->wait_for_safe();
3854     ASSERT_EQ(0, completion->get_return_value());
3855     completion->release();
3856   }
3857
3858   // verify clean
3859   {
3860     bool dirty = false;
3861     int r = -1;
3862     ObjectReadOperation op;
3863     op.is_dirty(&dirty, &r);
3864     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
3865     ASSERT_FALSE(dirty);
3866     ASSERT_EQ(0, r);
3867   }
3868
3869   // verify in base tier
3870   {
3871     NObjectIterator it = ioctx.nobjects_begin();
3872     ASSERT_TRUE(it != ioctx.nobjects_end());
3873     ASSERT_TRUE(it->get_oid() == string("foo"));
3874     ++it;
3875     ASSERT_TRUE(it == ioctx.nobjects_end());
3876   }
3877
3878   // evict it
3879   {
3880     ObjectReadOperation op;
3881     op.cache_evict();
3882     librados::AioCompletion *completion = cluster.aio_create_completion();
3883     ASSERT_EQ(0, cache_ioctx.aio_operate(
3884          "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
3885     completion->wait_for_safe();
3886     ASSERT_EQ(0, completion->get_return_value());
3887     completion->release();
3888   }
3889
3890   // verify no longer in cache tier
3891   {
3892     NObjectIterator it = cache_ioctx.nobjects_begin();
3893     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3894   }
3895 }
3896
3897 TEST_F(LibRadosTwoPoolsECPP, FailedFlush) {
3898   // configure cache
3899   bufferlist inbl;
3900   ASSERT_EQ(0, cluster.mon_command(
3901     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
3902     "\", \"tierpool\": \"" + cache_pool_name +
3903     "\", \"force_nonempty\": \"--force-nonempty\" }",
3904     inbl, NULL, NULL));
3905   ASSERT_EQ(0, cluster.mon_command(
3906     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
3907     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
3908     inbl, NULL, NULL));
3909   ASSERT_EQ(0, cluster.mon_command(
3910     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
3911     "\", \"mode\": \"writeback\"}",
3912     inbl, NULL, NULL));
3913
3914   // wait for maps to settle
3915   cluster.wait_for_latest_osdmap();
3916
3917   // create object
3918   {
3919     bufferlist bl;
3920     bl.append("hi there");
3921     ObjectWriteOperation op;
3922     op.write_full(bl);
3923     ASSERT_EQ(0, ioctx.operate("foo", &op));
3924   }
3925
3926   // verify the object is present in the cache tier
3927   {
3928     NObjectIterator it = cache_ioctx.nobjects_begin();
3929     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
3930     ASSERT_TRUE(it->get_oid() == string("foo"));
3931     ++it;
3932     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
3933   }
3934
3935   // verify the object is NOT present in the base tier
3936   {
3937     NObjectIterator it = ioctx.nobjects_begin();
3938     ASSERT_TRUE(it == ioctx.nobjects_end());
3939   }
3940
3941   // set omap
3942   {
3943     ObjectWriteOperation op;
3944     std::map<std::string, bufferlist> omap;
3945     omap["somekey"] = bufferlist();
3946     op.omap_set(omap);
3947     librados::AioCompletion *completion = cluster.aio_create_completion();
3948     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
3949     completion->wait_for_safe();
3950     ASSERT_EQ(0, completion->get_return_value());
3951     completion->release();
3952   }
3953
3954   // flush
3955   {
3956     ObjectReadOperation op;
3957     op.cache_flush();
3958     librados::AioCompletion *completion = cluster.aio_create_completion();
3959     ASSERT_EQ(0, cache_ioctx.aio_operate(
3960       "foo", completion, &op,
3961       librados::OPERATION_IGNORE_OVERLAY, NULL));
3962     completion->wait_for_safe();
3963     ASSERT_NE(0, completion->get_return_value());
3964     completion->release();
3965   }
3966
3967   // get omap
3968   {
3969     ObjectReadOperation op;
3970     bufferlist bl;
3971     int prval = 0;
3972     std::set<std::string> keys;
3973     keys.insert("somekey");
3974     std::map<std::string, bufferlist> map;
3975
3976     op.omap_get_vals_by_keys(keys, &map, &prval);
3977     librados::AioCompletion *completion = cluster.aio_create_completion();
3978     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op, &bl));
3979     sleep(5);
3980     bool completed = completion->is_complete();
3981     if( !completed ) {
3982       cache_ioctx.aio_cancel(completion); 
3983       std::cerr << "Most probably test case will hang here, please reset manually" << std::endl;
3984       ASSERT_TRUE(completed); //in fact we are locked forever at test case shutdown unless fix for http://tracker.ceph.com/issues/14511 is applied. Seems there is no workaround for that
3985     }
3986     completion->release();
3987   }
3988   // verify still not in base tier
3989   {
3990     ASSERT_TRUE(ioctx.nobjects_begin() == ioctx.nobjects_end());
3991   }
3992   // erase it
3993   {
3994     ObjectWriteOperation op;
3995     op.remove();
3996     ASSERT_EQ(0, ioctx.operate("foo", &op));
3997   }
3998   // flush whiteout
3999   {
4000     ObjectReadOperation op;
4001     op.cache_flush();
4002     librados::AioCompletion *completion = cluster.aio_create_completion();
4003     ASSERT_EQ(0, cache_ioctx.aio_operate(
4004       "foo", completion, &op,
4005       librados::OPERATION_IGNORE_OVERLAY, NULL));
4006     completion->wait_for_safe();
4007     ASSERT_EQ(0, completion->get_return_value());
4008     completion->release();
4009   }
4010   // evict
4011   {
4012     ObjectReadOperation op;
4013     op.cache_evict();
4014     librados::AioCompletion *completion = cluster.aio_create_completion();
4015     ASSERT_EQ(0, cache_ioctx.aio_operate(
4016       "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
4017     completion->wait_for_safe();
4018     ASSERT_EQ(0, completion->get_return_value());
4019     completion->release();
4020   }
4021
4022   // verify no longer in cache tier
4023   {
4024     NObjectIterator it = cache_ioctx.nobjects_begin();
4025     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
4026   }
4027   // or base tier
4028   {
4029     NObjectIterator it = ioctx.nobjects_begin();
4030     ASSERT_TRUE(it == ioctx.nobjects_end());
4031   }
4032 }
4033
4034 TEST_F(LibRadosTwoPoolsECPP, Flush) {
4035   // configure cache
4036   bufferlist inbl;
4037   ASSERT_EQ(0, cluster.mon_command(
4038     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4039     "\", \"tierpool\": \"" + cache_pool_name +
4040     "\", \"force_nonempty\": \"--force-nonempty\" }",
4041     inbl, NULL, NULL));
4042   ASSERT_EQ(0, cluster.mon_command(
4043     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4044     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4045     inbl, NULL, NULL));
4046   ASSERT_EQ(0, cluster.mon_command(
4047     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
4048     "\", \"mode\": \"writeback\"}",
4049     inbl, NULL, NULL));
4050
4051   // wait for maps to settle
4052   cluster.wait_for_latest_osdmap();
4053
4054   uint64_t user_version = 0;
4055
4056   // create object
4057   {
4058     bufferlist bl;
4059     bl.append("hi there");
4060     ObjectWriteOperation op;
4061     op.write_full(bl);
4062     ASSERT_EQ(0, ioctx.operate("foo", &op));
4063   }
4064
4065   // verify the object is present in the cache tier
4066   {
4067     NObjectIterator it = cache_ioctx.nobjects_begin();
4068     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
4069     ASSERT_TRUE(it->get_oid() == string("foo"));
4070     ++it;
4071     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
4072   }
4073
4074   // verify the object is NOT present in the base tier
4075   {
4076     NObjectIterator it = ioctx.nobjects_begin();
4077     ASSERT_TRUE(it == ioctx.nobjects_end());
4078   }
4079
4080   // verify dirty
4081   {
4082     bool dirty = false;
4083     int r = -1;
4084     ObjectReadOperation op;
4085     op.is_dirty(&dirty, &r);
4086     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
4087     ASSERT_TRUE(dirty);
4088     ASSERT_EQ(0, r);
4089     user_version = cache_ioctx.get_last_version();
4090   }
4091
4092   // pin
4093   {
4094     ObjectWriteOperation op;
4095     op.cache_pin();
4096     librados::AioCompletion *completion = cluster.aio_create_completion();
4097     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
4098     completion->wait_for_safe();
4099     ASSERT_EQ(0, completion->get_return_value());
4100     completion->release();
4101   }
4102
4103   // flush the pinned object with -EPERM
4104   {
4105     ObjectReadOperation op;
4106     op.cache_try_flush();
4107     librados::AioCompletion *completion = cluster.aio_create_completion();
4108     ASSERT_EQ(0, cache_ioctx.aio_operate(
4109       "foo", completion, &op,
4110       librados::OPERATION_IGNORE_OVERLAY |
4111       librados::OPERATION_SKIPRWLOCKS, NULL));
4112     completion->wait_for_safe();
4113     ASSERT_EQ(-EPERM, completion->get_return_value());
4114     completion->release();
4115   }
4116
4117   // unpin
4118   {
4119     ObjectWriteOperation op;
4120     op.cache_unpin();
4121     librados::AioCompletion *completion = cluster.aio_create_completion();
4122     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
4123     completion->wait_for_safe();
4124     ASSERT_EQ(0, completion->get_return_value());
4125     completion->release();
4126   }
4127
4128   // flush
4129   {
4130     ObjectReadOperation op;
4131     op.cache_flush();
4132     librados::AioCompletion *completion = cluster.aio_create_completion();
4133     ASSERT_EQ(0, cache_ioctx.aio_operate(
4134       "foo", completion, &op,
4135       librados::OPERATION_IGNORE_OVERLAY, NULL));
4136     completion->wait_for_safe();
4137     ASSERT_EQ(0, completion->get_return_value());
4138     completion->release();
4139   }
4140
4141   // verify clean
4142   {
4143     bool dirty = false;
4144     int r = -1;
4145     ObjectReadOperation op;
4146     op.is_dirty(&dirty, &r);
4147     ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
4148     ASSERT_FALSE(dirty);
4149     ASSERT_EQ(0, r);
4150   }
4151
4152   // verify in base tier
4153   {
4154     NObjectIterator it = ioctx.nobjects_begin();
4155     ASSERT_TRUE(it != ioctx.nobjects_end());
4156     ASSERT_TRUE(it->get_oid() == string("foo"));
4157     ++it;
4158     ASSERT_TRUE(it == ioctx.nobjects_end());
4159   }
4160
4161   // evict it
4162   {
4163     ObjectReadOperation op;
4164     op.cache_evict();
4165     librados::AioCompletion *completion = cluster.aio_create_completion();
4166     ASSERT_EQ(0, cache_ioctx.aio_operate(
4167          "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
4168     completion->wait_for_safe();
4169     ASSERT_EQ(0, completion->get_return_value());
4170     completion->release();
4171   }
4172
4173   // verify no longer in cache tier
4174   {
4175     NObjectIterator it = cache_ioctx.nobjects_begin();
4176     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
4177   }
4178
4179   // read it again and verify the version is consistent
4180   {
4181     bufferlist bl;
4182     ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
4183     ASSERT_EQ(user_version, cache_ioctx.get_last_version());
4184   }
4185
4186   // erase it
4187   {
4188     ObjectWriteOperation op;
4189     op.remove();
4190     ASSERT_EQ(0, ioctx.operate("foo", &op));
4191   }
4192
4193   // flush whiteout
4194   {
4195     ObjectReadOperation op;
4196     op.cache_flush();
4197     librados::AioCompletion *completion = cluster.aio_create_completion();
4198     ASSERT_EQ(0, cache_ioctx.aio_operate(
4199       "foo", completion, &op,
4200       librados::OPERATION_IGNORE_OVERLAY, NULL));
4201     completion->wait_for_safe();
4202     ASSERT_EQ(0, completion->get_return_value());
4203     completion->release();
4204   }
4205
4206   // evict
4207   {
4208     ObjectReadOperation op;
4209     op.cache_evict();
4210     librados::AioCompletion *completion = cluster.aio_create_completion();
4211     ASSERT_EQ(0, cache_ioctx.aio_operate(
4212          "foo", completion, &op, librados::OPERATION_IGNORE_CACHE, NULL));
4213     completion->wait_for_safe();
4214     ASSERT_EQ(0, completion->get_return_value());
4215     completion->release();
4216   }
4217
4218   // verify no longer in cache tier
4219   {
4220     NObjectIterator it = cache_ioctx.nobjects_begin();
4221     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
4222   }
4223   // or base tier
4224   {
4225     NObjectIterator it = ioctx.nobjects_begin();
4226     ASSERT_TRUE(it == ioctx.nobjects_end());
4227   }
4228 }
4229
4230 TEST_F(LibRadosTwoPoolsECPP, FlushSnap) {
4231   // configure cache
4232   bufferlist inbl;
4233   ASSERT_EQ(0, cluster.mon_command(
4234     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4235     "\", \"tierpool\": \"" + cache_pool_name +
4236     "\", \"force_nonempty\": \"--force-nonempty\" }",
4237     inbl, NULL, NULL));
4238   ASSERT_EQ(0, cluster.mon_command(
4239     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4240     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4241     inbl, NULL, NULL));
4242   ASSERT_EQ(0, cluster.mon_command(
4243     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
4244     "\", \"mode\": \"writeback\"}",
4245     inbl, NULL, NULL));
4246
4247   // wait for maps to settle
4248   cluster.wait_for_latest_osdmap();
4249
4250   // create object
4251   {
4252     bufferlist bl;
4253     bl.append("a");
4254     ObjectWriteOperation op;
4255     op.write_full(bl);
4256     ASSERT_EQ(0, ioctx.operate("foo", &op));
4257   }
4258
4259   // create a snapshot, clone
4260   vector<uint64_t> my_snaps(1);
4261   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
4262   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
4263                                                          my_snaps));
4264   {
4265     bufferlist bl;
4266     bl.append("b");
4267     ObjectWriteOperation op;
4268     op.write_full(bl);
4269     ASSERT_EQ(0, ioctx.operate("foo", &op));
4270   }
4271
4272   // and another
4273   my_snaps.resize(2);
4274   my_snaps[1] = my_snaps[0];
4275   ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
4276   ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
4277                                                          my_snaps));
4278   {
4279     bufferlist bl;
4280     bl.append("c");
4281     ObjectWriteOperation op;
4282     op.write_full(bl);
4283     ASSERT_EQ(0, ioctx.operate("foo", &op));
4284   }
4285
4286   // verify the object is present in the cache tier
4287   {
4288     NObjectIterator it = cache_ioctx.nobjects_begin();
4289     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
4290     ASSERT_TRUE(it->get_oid() == string("foo"));
4291     ++it;
4292     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
4293   }
4294
4295   // verify the object is NOT present in the base tier
4296   {
4297     NObjectIterator it = ioctx.nobjects_begin();
4298     ASSERT_TRUE(it == ioctx.nobjects_end());
4299   }
4300
4301   // flush on head (should fail)
4302   ioctx.snap_set_read(librados::SNAP_HEAD);
4303   {
4304     ObjectReadOperation op;
4305     op.cache_flush();
4306     librados::AioCompletion *completion = cluster.aio_create_completion();
4307     ASSERT_EQ(0, ioctx.aio_operate(
4308       "foo", completion, &op,
4309       librados::OPERATION_IGNORE_CACHE, NULL));
4310     completion->wait_for_safe();
4311     ASSERT_EQ(-EBUSY, completion->get_return_value());
4312     completion->release();
4313   }
4314   // flush on recent snap (should fail)
4315   ioctx.snap_set_read(my_snaps[0]);
4316   {
4317     ObjectReadOperation op;
4318     op.cache_flush();
4319     librados::AioCompletion *completion = cluster.aio_create_completion();
4320     ASSERT_EQ(0, ioctx.aio_operate(
4321       "foo", completion, &op,
4322       librados::OPERATION_IGNORE_CACHE, NULL));
4323     completion->wait_for_safe();
4324     ASSERT_EQ(-EBUSY, completion->get_return_value());
4325     completion->release();
4326   }
4327   // flush on oldest snap
4328   ioctx.snap_set_read(my_snaps[1]);
4329   {
4330     ObjectReadOperation op;
4331     op.cache_flush();
4332     librados::AioCompletion *completion = cluster.aio_create_completion();
4333     ASSERT_EQ(0, ioctx.aio_operate(
4334       "foo", completion, &op,
4335       librados::OPERATION_IGNORE_CACHE, NULL));
4336     completion->wait_for_safe();
4337     ASSERT_EQ(0, completion->get_return_value());
4338     completion->release();
4339   }
4340   // flush on next oldest snap
4341   ioctx.snap_set_read(my_snaps[0]);
4342   {
4343     ObjectReadOperation op;
4344     op.cache_flush();
4345     librados::AioCompletion *completion = cluster.aio_create_completion();
4346     ASSERT_EQ(0, ioctx.aio_operate(
4347       "foo", completion, &op,
4348       librados::OPERATION_IGNORE_CACHE, NULL));
4349     completion->wait_for_safe();
4350     ASSERT_EQ(0, completion->get_return_value());
4351     completion->release();
4352   }
4353   // flush on head
4354   ioctx.snap_set_read(librados::SNAP_HEAD);
4355   {
4356     ObjectReadOperation op;
4357     op.cache_flush();
4358     librados::AioCompletion *completion = cluster.aio_create_completion();
4359     ASSERT_EQ(0, ioctx.aio_operate(
4360       "foo", completion, &op,
4361       librados::OPERATION_IGNORE_CACHE, NULL));
4362     completion->wait_for_safe();
4363     ASSERT_EQ(0, completion->get_return_value());
4364     completion->release();
4365   }
4366
4367   // verify i can read the snaps from the cache pool
4368   ioctx.snap_set_read(librados::SNAP_HEAD);
4369   {
4370     bufferlist bl;
4371     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
4372     ASSERT_EQ('c', bl[0]);
4373   }
4374   ioctx.snap_set_read(my_snaps[0]);
4375   {
4376     bufferlist bl;
4377     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
4378     ASSERT_EQ('b', bl[0]);
4379   }
4380   ioctx.snap_set_read(my_snaps[1]);
4381   {
4382     bufferlist bl;
4383     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
4384     ASSERT_EQ('a', bl[0]);
4385   }
4386
4387   // tear down tiers
4388   ASSERT_EQ(0, cluster.mon_command(
4389     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
4390     "\"}",
4391     inbl, NULL, NULL));
4392
4393   // wait for maps to settle
4394   cluster.wait_for_latest_osdmap();
4395
4396   // verify i can read the snaps from the base pool
4397   ioctx.snap_set_read(librados::SNAP_HEAD);
4398   {
4399     bufferlist bl;
4400     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
4401     ASSERT_EQ('c', bl[0]);
4402   }
4403   ioctx.snap_set_read(my_snaps[0]);
4404   {
4405     bufferlist bl;
4406     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
4407     ASSERT_EQ('b', bl[0]);
4408   }
4409   ioctx.snap_set_read(my_snaps[1]);
4410   {
4411     bufferlist bl;
4412     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
4413     ASSERT_EQ('a', bl[0]);
4414   }
4415
4416   ASSERT_EQ(0, cluster.mon_command(
4417     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4418     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4419     inbl, NULL, NULL));
4420   cluster.wait_for_latest_osdmap();
4421
4422   // cleanup
4423   ioctx.selfmanaged_snap_remove(my_snaps[0]);
4424 }
4425
4426 TEST_F(LibRadosTierECPP, FlushWriteRaces) {
4427   Rados cluster;
4428   std::string pool_name = get_temp_pool_name();
4429   std::string cache_pool_name = pool_name + "-cache";
4430   ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
4431   ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
4432   IoCtx cache_ioctx;
4433   ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
4434   cache_ioctx.application_enable("rados", true);
4435   IoCtx ioctx;
4436   ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
4437
4438   // configure cache
4439   bufferlist inbl;
4440   ASSERT_EQ(0, cluster.mon_command(
4441     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4442     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
4443     inbl, NULL, NULL));
4444   ASSERT_EQ(0, cluster.mon_command(
4445     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4446     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4447     inbl, NULL, NULL));
4448   ASSERT_EQ(0, cluster.mon_command(
4449     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
4450     "\", \"mode\": \"writeback\"}",
4451     inbl, NULL, NULL));
4452
4453   // wait for maps to settle
4454   cluster.wait_for_latest_osdmap();
4455
4456   // create/dirty object
4457   bufferlist bl;
4458   bl.append("hi there");
4459   {
4460     ObjectWriteOperation op;
4461     op.write_full(bl);
4462     ASSERT_EQ(0, ioctx.operate("foo", &op));
4463   }
4464
4465   // flush + write
4466   {
4467     ObjectReadOperation op;
4468     op.cache_flush();
4469     librados::AioCompletion *completion = cluster.aio_create_completion();
4470     ASSERT_EQ(0, cache_ioctx.aio_operate(
4471       "foo", completion, &op,
4472       librados::OPERATION_IGNORE_OVERLAY, NULL));
4473
4474     ObjectWriteOperation op2;
4475     op2.write_full(bl);
4476     librados::AioCompletion *completion2 = cluster.aio_create_completion();
4477     ASSERT_EQ(0, ioctx.aio_operate(
4478       "foo", completion2, &op2, 0));
4479
4480     completion->wait_for_safe();
4481     completion2->wait_for_safe();
4482     ASSERT_EQ(0, completion->get_return_value());
4483     ASSERT_EQ(0, completion2->get_return_value());
4484     completion->release();
4485     completion2->release();
4486   }
4487
4488   int tries = 1000;
4489   do {
4490     // create/dirty object
4491     {
4492       bufferlist bl;
4493       bl.append("hi there");
4494       ObjectWriteOperation op;
4495       op.write_full(bl);
4496       ASSERT_EQ(0, ioctx.operate("foo", &op));
4497     }
4498
4499     // try-flush + write
4500     {
4501       ObjectReadOperation op;
4502       op.cache_try_flush();
4503       librados::AioCompletion *completion = cluster.aio_create_completion();
4504       ASSERT_EQ(0, cache_ioctx.aio_operate(
4505         "foo", completion, &op,
4506         librados::OPERATION_IGNORE_OVERLAY |
4507         librados::OPERATION_SKIPRWLOCKS, NULL));
4508
4509       ObjectWriteOperation op2;
4510       op2.write_full(bl);
4511       librados::AioCompletion *completion2 = cluster.aio_create_completion();
4512       ASSERT_EQ(0, ioctx.aio_operate("foo", completion2, &op2, 0));
4513
4514       completion->wait_for_safe();
4515       completion2->wait_for_safe();
4516       int r = completion->get_return_value();
4517       ASSERT_TRUE(r == -EBUSY || r == 0);
4518       ASSERT_EQ(0, completion2->get_return_value());
4519       completion->release();
4520       completion2->release();
4521       if (r == -EBUSY)
4522         break;
4523       cout << "didn't get EBUSY, trying again" << std::endl;
4524     }
4525     ASSERT_TRUE(--tries);
4526   } while (true);
4527
4528   // tear down tiers
4529   ASSERT_EQ(0, cluster.mon_command(
4530     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
4531     "\"}",
4532     inbl, NULL, NULL));
4533   ASSERT_EQ(0, cluster.mon_command(
4534     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
4535     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
4536     inbl, NULL, NULL));
4537
4538   // wait for maps to settle before next test
4539   cluster.wait_for_latest_osdmap();
4540
4541   ASSERT_EQ(0, cluster.pool_delete(cache_pool_name.c_str()));
4542   ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
4543 }
4544
4545 TEST_F(LibRadosTwoPoolsECPP, FlushTryFlushRaces) {
4546   // configure cache
4547   bufferlist inbl;
4548   ASSERT_EQ(0, cluster.mon_command(
4549     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4550     "\", \"tierpool\": \"" + cache_pool_name +
4551     "\", \"force_nonempty\": \"--force-nonempty\" }",
4552     inbl, NULL, NULL));
4553   ASSERT_EQ(0, cluster.mon_command(
4554     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4555     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4556     inbl, NULL, NULL));
4557   ASSERT_EQ(0, cluster.mon_command(
4558     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
4559     "\", \"mode\": \"writeback\"}",
4560     inbl, NULL, NULL));
4561
4562   // wait for maps to settle
4563   cluster.wait_for_latest_osdmap();
4564
4565   // create/dirty object
4566   {
4567     bufferlist bl;
4568     bl.append("hi there");
4569     ObjectWriteOperation op;
4570     op.write_full(bl);
4571     ASSERT_EQ(0, ioctx.operate("foo", &op));
4572   }
4573
4574   // flush + flush
4575   {
4576     ObjectReadOperation op;
4577     op.cache_flush();
4578     librados::AioCompletion *completion = cluster.aio_create_completion();
4579     ASSERT_EQ(0, cache_ioctx.aio_operate(
4580       "foo", completion, &op,
4581       librados::OPERATION_IGNORE_OVERLAY, NULL));
4582
4583     ObjectReadOperation op2;
4584     op2.cache_flush();
4585     librados::AioCompletion *completion2 = cluster.aio_create_completion();
4586     ASSERT_EQ(0, cache_ioctx.aio_operate(
4587       "foo", completion2, &op2,
4588       librados::OPERATION_IGNORE_OVERLAY, NULL));
4589
4590     completion->wait_for_safe();
4591     completion2->wait_for_safe();
4592     ASSERT_EQ(0, completion->get_return_value());
4593     ASSERT_EQ(0, completion2->get_return_value());
4594     completion->release();
4595     completion2->release();
4596   }
4597
4598   // create/dirty object
4599   {
4600     bufferlist bl;
4601     bl.append("hi there");
4602     ObjectWriteOperation op;
4603     op.write_full(bl);
4604     ASSERT_EQ(0, ioctx.operate("foo", &op));
4605   }
4606
4607   // flush + try-flush
4608   {
4609     ObjectReadOperation op;
4610     op.cache_flush();
4611     librados::AioCompletion *completion = cluster.aio_create_completion();
4612     ASSERT_EQ(0, cache_ioctx.aio_operate(
4613       "foo", completion, &op,
4614       librados::OPERATION_IGNORE_OVERLAY, NULL));
4615
4616     ObjectReadOperation op2;
4617     op2.cache_try_flush();
4618     librados::AioCompletion *completion2 = cluster.aio_create_completion();
4619     ASSERT_EQ(0, cache_ioctx.aio_operate(
4620       "foo", completion2, &op2,
4621       librados::OPERATION_IGNORE_OVERLAY |
4622       librados::OPERATION_SKIPRWLOCKS, NULL));
4623
4624     completion->wait_for_safe();
4625     completion2->wait_for_safe();
4626     ASSERT_EQ(0, completion->get_return_value());
4627     ASSERT_EQ(0, completion2->get_return_value());
4628     completion->release();
4629     completion2->release();
4630   }
4631
4632   // create/dirty object
4633   int tries = 1000;
4634   do {
4635     {
4636       bufferlist bl;
4637       bl.append("hi there");
4638       ObjectWriteOperation op;
4639       op.write_full(bl);
4640       ASSERT_EQ(0, ioctx.operate("foo", &op));
4641     }
4642
4643     // try-flush + flush
4644     //  (flush will not piggyback on try-flush)
4645     {
4646       ObjectReadOperation op;
4647       op.cache_try_flush();
4648       librados::AioCompletion *completion = cluster.aio_create_completion();
4649       ASSERT_EQ(0, cache_ioctx.aio_operate(
4650         "foo", completion, &op,
4651         librados::OPERATION_IGNORE_OVERLAY |
4652         librados::OPERATION_SKIPRWLOCKS, NULL));
4653
4654       ObjectReadOperation op2;
4655       op2.cache_flush();
4656       librados::AioCompletion *completion2 = cluster.aio_create_completion();
4657       ASSERT_EQ(0, cache_ioctx.aio_operate(
4658         "foo", completion2, &op2,
4659         librados::OPERATION_IGNORE_OVERLAY, NULL));
4660
4661       completion->wait_for_safe();
4662       completion2->wait_for_safe();
4663       int r = completion->get_return_value();
4664       ASSERT_TRUE(r == -EBUSY || r == 0);
4665       ASSERT_EQ(0, completion2->get_return_value());
4666       completion->release();
4667       completion2->release();
4668       if (r == -EBUSY)
4669         break;
4670       cout << "didn't get EBUSY, trying again" << std::endl;
4671     }
4672     ASSERT_TRUE(--tries);
4673   } while (true);
4674
4675   // create/dirty object
4676   {
4677     bufferlist bl;
4678     bl.append("hi there");
4679     ObjectWriteOperation op;
4680     op.write_full(bl);
4681     ASSERT_EQ(0, ioctx.operate("foo", &op));
4682   }
4683
4684   // try-flush + try-flush
4685   {
4686     ObjectReadOperation op;
4687     op.cache_try_flush();
4688     librados::AioCompletion *completion = cluster.aio_create_completion();
4689     ASSERT_EQ(0, cache_ioctx.aio_operate(
4690       "foo", completion, &op,
4691       librados::OPERATION_IGNORE_OVERLAY |
4692       librados::OPERATION_SKIPRWLOCKS, NULL));
4693
4694     ObjectReadOperation op2;
4695     op2.cache_try_flush();
4696     librados::AioCompletion *completion2 = cluster.aio_create_completion();
4697     ASSERT_EQ(0, cache_ioctx.aio_operate(
4698       "foo", completion2, &op2,
4699       librados::OPERATION_IGNORE_OVERLAY |
4700       librados::OPERATION_SKIPRWLOCKS, NULL));
4701
4702     completion->wait_for_safe();
4703     completion2->wait_for_safe();
4704     ASSERT_EQ(0, completion->get_return_value());
4705     ASSERT_EQ(0, completion2->get_return_value());
4706     completion->release();
4707     completion2->release();
4708   }
4709 }
4710
4711 TEST_F(LibRadosTwoPoolsECPP, TryFlushReadRace) {
4712   // configure cache
4713   bufferlist inbl;
4714   ASSERT_EQ(0, cluster.mon_command(
4715     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4716     "\", \"tierpool\": \"" + cache_pool_name +
4717     "\", \"force_nonempty\": \"--force-nonempty\" }",
4718     inbl, NULL, NULL));
4719   ASSERT_EQ(0, cluster.mon_command(
4720     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4721     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4722     inbl, NULL, NULL));
4723   ASSERT_EQ(0, cluster.mon_command(
4724     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
4725     "\", \"mode\": \"writeback\"}",
4726     inbl, NULL, NULL));
4727
4728   // wait for maps to settle
4729   cluster.wait_for_latest_osdmap();
4730
4731   // create/dirty object
4732   {
4733     bufferlist bl;
4734     bl.append("hi there");
4735     bufferptr bp(4000000);  // make it big!
4736     bp.zero();
4737     bl.append(bp);
4738     ObjectWriteOperation op;
4739     op.write_full(bl);
4740     ASSERT_EQ(0, ioctx.operate("foo", &op));
4741   }
4742
4743   // start a continuous stream of reads
4744   read_ioctx = &ioctx;
4745   test_lock.Lock();
4746   for (int i = 0; i < max_reads; ++i) {
4747     start_flush_read();
4748     num_reads++;
4749   }
4750   test_lock.Unlock();
4751
4752   // try-flush
4753   ObjectReadOperation op;
4754   op.cache_try_flush();
4755   librados::AioCompletion *completion = cluster.aio_create_completion();
4756   ASSERT_EQ(0, cache_ioctx.aio_operate(
4757       "foo", completion, &op,
4758       librados::OPERATION_IGNORE_OVERLAY |
4759       librados::OPERATION_SKIPRWLOCKS, NULL));
4760
4761   completion->wait_for_safe();
4762   ASSERT_EQ(0, completion->get_return_value());
4763   completion->release();
4764
4765   // stop reads
4766   test_lock.Lock();
4767   max_reads = 0;
4768   while (num_reads > 0)
4769     cond.Wait(test_lock);
4770   test_lock.Unlock();
4771 }
4772
4773 TEST_F(LibRadosTierECPP, CallForcesPromote) {
4774   Rados cluster;
4775   std::string pool_name = get_temp_pool_name();
4776   std::string cache_pool_name = pool_name + "-cache";
4777   ASSERT_EQ("", create_one_ec_pool_pp(pool_name, cluster));
4778   ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
4779   IoCtx cache_ioctx;
4780   ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
4781   cache_ioctx.application_enable("rados", true);
4782   IoCtx ioctx;
4783   ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
4784
4785   // configure cache
4786   bufferlist inbl;
4787   ASSERT_EQ(0, cluster.mon_command(
4788     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4789     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
4790     inbl, NULL, NULL));
4791   ASSERT_EQ(0, cluster.mon_command(
4792     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
4793     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
4794     inbl, NULL, NULL));
4795   ASSERT_EQ(0, cluster.mon_command(
4796     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
4797     "\", \"mode\": \"writeback\"}",
4798     inbl, NULL, NULL));
4799
4800   // set things up such that the op would normally be proxied
4801   ASSERT_EQ(0, cluster.mon_command(
4802               set_pool_str(cache_pool_name, "hit_set_count", 2),
4803               inbl, NULL, NULL));
4804   ASSERT_EQ(0, cluster.mon_command(
4805               set_pool_str(cache_pool_name, "hit_set_period", 600),
4806               inbl, NULL, NULL));
4807   ASSERT_EQ(0, cluster.mon_command(
4808               set_pool_str(cache_pool_name, "hit_set_type",
4809                            "explicit_object"),
4810               inbl, NULL, NULL));
4811   ASSERT_EQ(0, cluster.mon_command(
4812               set_pool_str(cache_pool_name, "min_read_recency_for_promote",
4813                            "4"),
4814               inbl, NULL, NULL));
4815
4816   // wait for maps to settle
4817   cluster.wait_for_latest_osdmap();
4818
4819   // create/dirty object
4820   bufferlist bl;
4821   bl.append("hi there");
4822   {
4823     ObjectWriteOperation op;
4824     op.write_full(bl);
4825     ASSERT_EQ(0, ioctx.operate("foo", &op));
4826   }
4827
4828   // flush
4829   {
4830     ObjectReadOperation op;
4831     op.cache_flush();
4832     librados::AioCompletion *completion = cluster.aio_create_completion();
4833     ASSERT_EQ(0, cache_ioctx.aio_operate(
4834       "foo", completion, &op,
4835       librados::OPERATION_IGNORE_OVERLAY, NULL));
4836     completion->wait_for_safe();
4837     ASSERT_EQ(0, completion->get_return_value());
4838     completion->release();
4839   }
4840
4841   // evict
4842   {
4843     ObjectReadOperation op;
4844     op.cache_evict();
4845     librados::AioCompletion *completion = cluster.aio_create_completion();
4846     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
4847                                          librados::OPERATION_IGNORE_CACHE,
4848                                          NULL));
4849     completion->wait_for_safe();
4850     ASSERT_EQ(0, completion->get_return_value());
4851     completion->release();
4852   }
4853
4854   // call
4855   {
4856     ObjectReadOperation op;
4857     bufferlist bl;
4858     op.exec("rbd", "get_id", bl);
4859     bufferlist out;
4860     // should get EIO (not an rbd object), not -EOPNOTSUPP (we didn't promote)
4861     ASSERT_EQ(-5, ioctx.operate("foo", &op, &out));
4862   }
4863
4864   // make sure foo is back in the cache tier
4865   {
4866     NObjectIterator it = cache_ioctx.nobjects_begin();
4867     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
4868     ASSERT_TRUE(it->get_oid() == string("foo"));
4869     ++it;
4870     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
4871   }
4872
4873   // tear down tiers
4874   ASSERT_EQ(0, cluster.mon_command(
4875     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
4876     "\"}",
4877     inbl, NULL, NULL));
4878   ASSERT_EQ(0, cluster.mon_command(
4879     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
4880     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
4881     inbl, NULL, NULL));
4882
4883   // wait for maps to settle before next test
4884   cluster.wait_for_latest_osdmap();
4885
4886   ASSERT_EQ(0, cluster.pool_delete(cache_pool_name.c_str()));
4887   ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
4888 }
4889
4890 TEST_F(LibRadosTierECPP, HitSetNone) {
4891   {
4892     list< pair<time_t,time_t> > ls;
4893     AioCompletion *c = librados::Rados::aio_create_completion();
4894     ASSERT_EQ(0, ioctx.hit_set_list(123, c, &ls));
4895     c->wait_for_complete();
4896     ASSERT_EQ(0, c->get_return_value());
4897     ASSERT_TRUE(ls.empty());
4898     c->release();
4899   }
4900   {
4901     bufferlist bl;
4902     AioCompletion *c = librados::Rados::aio_create_completion();
4903     ASSERT_EQ(0, ioctx.hit_set_get(123, c, 12345, &bl));
4904     c->wait_for_complete();
4905     ASSERT_EQ(-ENOENT, c->get_return_value());
4906     c->release();
4907   }
4908 }
4909
4910 TEST_F(LibRadosTwoPoolsECPP, HitSetRead) {
4911   // make it a tier
4912   bufferlist inbl;
4913   ASSERT_EQ(0, cluster.mon_command(
4914     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
4915     "\", \"tierpool\": \"" + cache_pool_name +
4916     "\", \"force_nonempty\": \"--force-nonempty\" }",
4917     inbl, NULL, NULL));
4918
4919   // enable hitset tracking for this pool
4920   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_count", 2),
4921                                                 inbl, NULL, NULL));
4922   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_period", 600),
4923                                                 inbl, NULL, NULL));
4924   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_type",
4925                                                 "explicit_object"),
4926                                    inbl, NULL, NULL));
4927
4928   // wait for maps to settle
4929   cluster.wait_for_latest_osdmap();
4930
4931   cache_ioctx.set_namespace("");
4932
4933   // keep reading until we see our object appear in the HitSet
4934   utime_t start = ceph_clock_now();
4935   utime_t hard_stop = start + utime_t(600, 0);
4936
4937   while (true) {
4938     utime_t now = ceph_clock_now();
4939     ASSERT_TRUE(now < hard_stop);
4940
4941     string name = "foo";
4942     uint32_t hash;
4943     ASSERT_EQ(0, cache_ioctx.get_object_hash_position2(name, &hash));
4944     hobject_t oid(sobject_t(name, CEPH_NOSNAP), "", hash,
4945                   cluster.pool_lookup(cache_pool_name.c_str()), "");
4946
4947     bufferlist bl;
4948     ASSERT_EQ(-ENOENT, cache_ioctx.read("foo", bl, 1, 0));
4949
4950     bufferlist hbl;
4951     AioCompletion *c = librados::Rados::aio_create_completion();
4952     ASSERT_EQ(0, cache_ioctx.hit_set_get(hash, c, now.sec(), &hbl));
4953     c->wait_for_complete();
4954     c->release();
4955
4956     if (hbl.length()) {
4957       bufferlist::iterator p = hbl.begin();
4958       HitSet hs;
4959       ::decode(hs, p);
4960       if (hs.contains(oid)) {
4961         cout << "ok, hit_set contains " << oid << std::endl;
4962         break;
4963       }
4964       cout << "hmm, not in HitSet yet" << std::endl;
4965     } else {
4966       cout << "hmm, no HitSet yet" << std::endl;
4967     }
4968
4969     sleep(1);
4970   }
4971 }
4972
4973 // disable this test until hitset-get reliably works on EC pools
4974 #if 0
4975 TEST_F(LibRadosTierECPP, HitSetWrite) {
4976   int num_pg = _get_pg_num(cluster, pool_name);
4977   assert(num_pg > 0);
4978
4979   // enable hitset tracking for this pool
4980   bufferlist inbl;
4981   ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_count", 8),
4982                                                 inbl, NULL, NULL));
4983   ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_period", 600),
4984                                                 inbl, NULL, NULL));
4985   ASSERT_EQ(0, cluster.mon_command(set_pool_str(pool_name, "hit_set_type",
4986                                                 "explicit_hash"),
4987                                    inbl, NULL, NULL));
4988
4989   // wait for maps to settle
4990   cluster.wait_for_latest_osdmap();
4991
4992   ioctx.set_namespace("");
4993
4994   // do a bunch of writes
4995   for (int i=0; i<1000; ++i) {
4996     bufferlist bl;
4997     bl.append("a");
4998     ASSERT_EQ(0, ioctx.write(stringify(i), bl, 1, 0));
4999   }
5000
5001   // get HitSets
5002   std::map<int,HitSet> hitsets;
5003   for (int i=0; i<num_pg; ++i) {
5004     list< pair<time_t,time_t> > ls;
5005     AioCompletion *c = librados::Rados::aio_create_completion();
5006     ASSERT_EQ(0, ioctx.hit_set_list(i, c, &ls));
5007     c->wait_for_complete();
5008     c->release();
5009     std::cout << "pg " << i << " ls " << ls << std::endl;
5010     ASSERT_FALSE(ls.empty());
5011
5012     // get the latest
5013     c = librados::Rados::aio_create_completion();
5014     bufferlist bl;
5015     ASSERT_EQ(0, ioctx.hit_set_get(i, c, ls.back().first, &bl));
5016     c->wait_for_complete();
5017     c->release();
5018
5019     //std::cout << "bl len is " << bl.length() << "\n";
5020     //bl.hexdump(std::cout);
5021     //std::cout << std::endl;
5022
5023     bufferlist::iterator p = bl.begin();
5024     ::decode(hitsets[i], p);
5025
5026     // cope with racing splits by refreshing pg_num
5027     if (i == num_pg - 1)
5028       num_pg = _get_pg_num(cluster, pool_name);
5029   }
5030
5031   for (int i=0; i<1000; ++i) {
5032     string n = stringify(i);
5033     uint32_t hash = ioctx.get_object_hash_position(n);
5034     hobject_t oid(sobject_t(n, CEPH_NOSNAP), "", hash,
5035                   cluster.pool_lookup(pool_name.c_str()), "");
5036     std::cout << "checking for " << oid << std::endl;
5037     bool found = false;
5038     for (int p=0; p<num_pg; ++p) {
5039       if (hitsets[p].contains(oid)) {
5040         found = true;
5041         break;
5042       }
5043     }
5044     ASSERT_TRUE(found);
5045   }
5046 }
5047 #endif
5048
5049 TEST_F(LibRadosTwoPoolsECPP, HitSetTrim) {
5050   unsigned count = 3;
5051   unsigned period = 3;
5052
5053   // make it a tier
5054   bufferlist inbl;
5055   ASSERT_EQ(0, cluster.mon_command(
5056     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
5057     "\", \"tierpool\": \"" + cache_pool_name +
5058     "\", \"force_nonempty\": \"--force-nonempty\" }",
5059     inbl, NULL, NULL));
5060
5061   // enable hitset tracking for this pool
5062   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_count", count),
5063                                                 inbl, NULL, NULL));
5064   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_period", period),
5065                                                 inbl, NULL, NULL));
5066   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_type", "bloom"),
5067                                    inbl, NULL, NULL));
5068   ASSERT_EQ(0, cluster.mon_command(set_pool_str(cache_pool_name, "hit_set_fpp", ".01"),
5069                                    inbl, NULL, NULL));
5070
5071   // wait for maps to settle
5072   cluster.wait_for_latest_osdmap();
5073
5074   cache_ioctx.set_namespace("");
5075
5076   // do a bunch of writes and make sure the hitsets rotate
5077   utime_t start = ceph_clock_now();
5078   utime_t hard_stop = start + utime_t(count * period * 50, 0);
5079
5080   time_t first = 0;
5081   int bsize = alignment;
5082   char *buf = (char *)new char[bsize];
5083   memset(buf, 'f', bsize);
5084
5085   while (true) {
5086     string name = "foo";
5087     uint32_t hash;
5088     ASSERT_EQ(0, cache_ioctx.get_object_hash_position2(name, &hash));
5089     hobject_t oid(sobject_t(name, CEPH_NOSNAP), "", hash, -1, "");
5090
5091     bufferlist bl;
5092     bl.append(buf, bsize);
5093     ASSERT_EQ(0, cache_ioctx.append("foo", bl, bsize));
5094
5095     list<pair<time_t, time_t> > ls;
5096     AioCompletion *c = librados::Rados::aio_create_completion();
5097     ASSERT_EQ(0, cache_ioctx.hit_set_list(hash, c, &ls));
5098     c->wait_for_complete();
5099     c->release();
5100
5101     cout << " got ls " << ls << std::endl;
5102     if (!ls.empty()) {
5103       if (!first) {
5104         first = ls.front().first;
5105         cout << "first is " << first << std::endl;
5106       } else {
5107         if (ls.front().first != first) {
5108           cout << "first now " << ls.front().first << ", trimmed" << std::endl;
5109           break;
5110         }
5111       }
5112     }
5113
5114     utime_t now = ceph_clock_now();
5115     ASSERT_TRUE(now < hard_stop);
5116
5117     sleep(1);
5118   }
5119   delete[] buf;
5120 }
5121
5122 TEST_F(LibRadosTwoPoolsECPP, PromoteOn2ndRead) {
5123   // create object
5124   for (int i=0; i<20; ++i) {
5125     bufferlist bl;
5126     bl.append("hi there");
5127     ObjectWriteOperation op;
5128     op.write_full(bl);
5129     ASSERT_EQ(0, ioctx.operate("foo" + stringify(i), &op));
5130   }
5131
5132   // configure cache
5133   bufferlist inbl;
5134   ASSERT_EQ(0, cluster.mon_command(
5135     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
5136     "\", \"tierpool\": \"" + cache_pool_name +
5137     "\", \"force_nonempty\": \"--force-nonempty\" }",
5138     inbl, NULL, NULL));
5139   ASSERT_EQ(0, cluster.mon_command(
5140     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
5141     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
5142     inbl, NULL, NULL));
5143   ASSERT_EQ(0, cluster.mon_command(
5144     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
5145     "\", \"mode\": \"writeback\"}",
5146     inbl, NULL, NULL));
5147
5148   // enable hitset tracking for this pool
5149   ASSERT_EQ(0, cluster.mon_command(
5150     set_pool_str(cache_pool_name, "hit_set_count", 2),
5151     inbl, NULL, NULL));
5152   ASSERT_EQ(0, cluster.mon_command(
5153     set_pool_str(cache_pool_name, "hit_set_period", 600),
5154     inbl, NULL, NULL));
5155   ASSERT_EQ(0, cluster.mon_command(
5156     set_pool_str(cache_pool_name, "hit_set_type", "bloom"),
5157     inbl, NULL, NULL));
5158   ASSERT_EQ(0, cluster.mon_command(
5159     set_pool_str(cache_pool_name, "min_read_recency_for_promote", 1),
5160     inbl, NULL, NULL));
5161   ASSERT_EQ(0, cluster.mon_command(
5162     set_pool_str(cache_pool_name, "hit_set_grade_decay_rate", 20),
5163     inbl, NULL, NULL));
5164   ASSERT_EQ(0, cluster.mon_command(
5165     set_pool_str(cache_pool_name, "hit_set_search_last_n", 1),
5166     inbl, NULL, NULL));
5167
5168   // wait for maps to settle
5169   cluster.wait_for_latest_osdmap();
5170
5171   int fake = 0;  // set this to non-zero to test spurious promotion,
5172                  // e.g. from thrashing
5173   int attempt = 0;
5174   string obj;
5175   while (true) {
5176     // 1st read, don't trigger a promote
5177     obj = "foo" + stringify(attempt);
5178     cout << obj << std::endl;
5179     {
5180       bufferlist bl;
5181       ASSERT_EQ(1, ioctx.read(obj.c_str(), bl, 1, 0));
5182       if (--fake >= 0) {
5183         sleep(1);
5184         ASSERT_EQ(1, ioctx.read(obj.c_str(), bl, 1, 0));
5185         sleep(1);
5186       }
5187     }
5188
5189     // verify the object is NOT present in the cache tier
5190     {
5191       bool found = false;
5192       NObjectIterator it = cache_ioctx.nobjects_begin();
5193       while (it != cache_ioctx.nobjects_end()) {
5194         cout << " see " << it->get_oid() << std::endl;
5195         if (it->get_oid() == string(obj.c_str())) {
5196           found = true;
5197           break;
5198         }
5199         ++it;
5200       }
5201       if (!found)
5202         break;
5203     }
5204
5205     ++attempt;
5206     ASSERT_LE(attempt, 20);
5207     cout << "hrm, object is present in cache on attempt " << attempt
5208          << ", retrying" << std::endl;
5209   }
5210
5211   // Read until the object is present in the cache tier
5212   cout << "verifying " << obj << " is eventually promoted" << std::endl;
5213   while (true) {
5214     bufferlist bl;
5215     ASSERT_EQ(1, ioctx.read(obj.c_str(), bl, 1, 0));
5216
5217     bool there = false;
5218     NObjectIterator it = cache_ioctx.nobjects_begin();
5219     while (it != cache_ioctx.nobjects_end()) {
5220       if (it->get_oid() == string(obj.c_str())) {
5221         there = true;
5222         break;
5223       }
5224       ++it;
5225     }
5226     if (there)
5227       break;
5228
5229     sleep(1);
5230   }
5231
5232   // tear down tiers
5233   ASSERT_EQ(0, cluster.mon_command(
5234     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
5235     "\"}",
5236     inbl, NULL, NULL));
5237   ASSERT_EQ(0, cluster.mon_command(
5238     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
5239     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
5240     inbl, NULL, NULL));
5241
5242   // wait for maps to settle before next test
5243   cluster.wait_for_latest_osdmap();
5244 }
5245
5246 TEST_F(LibRadosTwoPoolsECPP, ProxyRead) {
5247   // create object
5248   {
5249     bufferlist bl;
5250     bl.append("hi there");
5251     ObjectWriteOperation op;
5252     op.write_full(bl);
5253     ASSERT_EQ(0, ioctx.operate("foo", &op));
5254   }
5255
5256   // configure cache
5257   bufferlist inbl;
5258   ASSERT_EQ(0, cluster.mon_command(
5259     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
5260     "\", \"tierpool\": \"" + cache_pool_name +
5261     "\", \"force_nonempty\": \"--force-nonempty\" }",
5262     inbl, NULL, NULL));
5263   ASSERT_EQ(0, cluster.mon_command(
5264     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
5265     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
5266     inbl, NULL, NULL));
5267   ASSERT_EQ(0, cluster.mon_command(
5268     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
5269     "\", \"mode\": \"readproxy\"}",
5270     inbl, NULL, NULL));
5271
5272   // wait for maps to settle
5273   cluster.wait_for_latest_osdmap();
5274
5275   // read and verify the object
5276   {
5277     bufferlist bl;
5278     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
5279     ASSERT_EQ('h', bl[0]);
5280   }
5281
5282   // Verify 10 times the object is NOT present in the cache tier
5283   uint32_t i = 0;
5284   while (i++ < 10) {
5285     NObjectIterator it = cache_ioctx.nobjects_begin();
5286     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
5287     sleep(1);
5288   }
5289
5290   // tear down tiers
5291   ASSERT_EQ(0, cluster.mon_command(
5292     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
5293     "\"}",
5294     inbl, NULL, NULL));
5295   ASSERT_EQ(0, cluster.mon_command(
5296     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
5297     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
5298     inbl, NULL, NULL));
5299
5300   // wait for maps to settle before next test
5301   cluster.wait_for_latest_osdmap();
5302 }
5303
5304 TEST_F(LibRadosTwoPoolsECPP, CachePin) {
5305   // create object
5306   {
5307     bufferlist bl;
5308     bl.append("hi there");
5309     ObjectWriteOperation op;
5310     op.write_full(bl);
5311     ASSERT_EQ(0, ioctx.operate("foo", &op));
5312   }
5313   {
5314     bufferlist bl;
5315     bl.append("hi there");
5316     ObjectWriteOperation op;
5317     op.write_full(bl);
5318     ASSERT_EQ(0, ioctx.operate("bar", &op));
5319   }
5320   {
5321     bufferlist bl;
5322     bl.append("hi there");
5323     ObjectWriteOperation op;
5324     op.write_full(bl);
5325     ASSERT_EQ(0, ioctx.operate("baz", &op));
5326   }
5327   {
5328     bufferlist bl;
5329     bl.append("hi there");
5330     ObjectWriteOperation op;
5331     op.write_full(bl);
5332     ASSERT_EQ(0, ioctx.operate("bam", &op));
5333   }
5334
5335   // configure cache
5336   bufferlist inbl;
5337   ASSERT_EQ(0, cluster.mon_command(
5338     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
5339     "\", \"tierpool\": \"" + cache_pool_name +
5340     "\", \"force_nonempty\": \"--force-nonempty\" }",
5341     inbl, NULL, NULL));
5342   ASSERT_EQ(0, cluster.mon_command(
5343     "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + pool_name +
5344     "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
5345     inbl, NULL, NULL));
5346   ASSERT_EQ(0, cluster.mon_command(
5347     "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
5348     "\", \"mode\": \"writeback\"}",
5349     inbl, NULL, NULL));
5350
5351   // wait for maps to settle
5352   cluster.wait_for_latest_osdmap();
5353
5354   // read, trigger promote
5355   {
5356     bufferlist bl;
5357     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
5358     ASSERT_EQ(1, ioctx.read("bar", bl, 1, 0));
5359     ASSERT_EQ(1, ioctx.read("baz", bl, 1, 0));
5360     ASSERT_EQ(1, ioctx.read("bam", bl, 1, 0));
5361   }
5362
5363   // verify the objects are present in the cache tier
5364   {
5365     NObjectIterator it = cache_ioctx.nobjects_begin();
5366     ASSERT_TRUE(it != cache_ioctx.nobjects_end());
5367     for (uint32_t i = 0; i < 4; i++) {
5368       ASSERT_TRUE(it->get_oid() == string("foo") ||
5369                   it->get_oid() == string("bar") ||
5370                   it->get_oid() == string("baz") ||
5371                   it->get_oid() == string("bam"));
5372       ++it;
5373     }
5374     ASSERT_TRUE(it == cache_ioctx.nobjects_end());
5375   }
5376
5377   // pin objects
5378   {
5379     ObjectWriteOperation op;
5380     op.cache_pin();
5381     librados::AioCompletion *completion = cluster.aio_create_completion();
5382     ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op));
5383     completion->wait_for_safe();
5384     ASSERT_EQ(0, completion->get_return_value());
5385     completion->release();
5386   }
5387   {
5388     ObjectWriteOperation op;
5389     op.cache_pin();
5390     librados::AioCompletion *completion = cluster.aio_create_completion();
5391     ASSERT_EQ(0, cache_ioctx.aio_operate("baz", completion, &op));
5392     completion->wait_for_safe();
5393     ASSERT_EQ(0, completion->get_return_value());
5394     completion->release();
5395   }
5396
5397   // enable agent
5398   ASSERT_EQ(0, cluster.mon_command(
5399     set_pool_str(cache_pool_name, "hit_set_count", 2),
5400     inbl, NULL, NULL));
5401   ASSERT_EQ(0, cluster.mon_command(
5402     set_pool_str(cache_pool_name, "hit_set_period", 600),
5403     inbl, NULL, NULL));
5404   ASSERT_EQ(0, cluster.mon_command(
5405     set_pool_str(cache_pool_name, "hit_set_type", "bloom"),
5406     inbl, NULL, NULL));
5407   ASSERT_EQ(0, cluster.mon_command(
5408     set_pool_str(cache_pool_name, "min_read_recency_for_promote", 1),
5409     inbl, NULL, NULL));
5410   ASSERT_EQ(0, cluster.mon_command(
5411     set_pool_str(cache_pool_name, "target_max_objects", 1),
5412     inbl, NULL, NULL));
5413
5414   sleep(10);
5415
5416   // Verify the pinned object 'foo' is not flushed/evicted
5417   uint32_t count = 0;
5418   while (true) {
5419     bufferlist bl;
5420     ASSERT_EQ(1, ioctx.read("baz", bl, 1, 0));
5421
5422     count = 0;
5423     NObjectIterator it = cache_ioctx.nobjects_begin();
5424     while (it != cache_ioctx.nobjects_end()) {
5425       ASSERT_TRUE(it->get_oid() == string("foo") ||
5426                   it->get_oid() == string("bar") ||
5427                   it->get_oid() == string("baz") ||
5428                   it->get_oid() == string("bam"));
5429       ++count;
5430       ++it;
5431     }
5432     if (count == 2) {
5433       ASSERT_TRUE(it->get_oid() == string("foo") ||
5434                   it->get_oid() == string("baz"));
5435       break;
5436     }
5437
5438     sleep(1);
5439   }
5440
5441   // tear down tiers
5442   ASSERT_EQ(0, cluster.mon_command(
5443     "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + pool_name +
5444     "\"}",
5445     inbl, NULL, NULL));
5446   ASSERT_EQ(0, cluster.mon_command(
5447     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
5448     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
5449     inbl, NULL, NULL));
5450
5451   // wait for maps to settle before next test
5452   cluster.wait_for_latest_osdmap();
5453 }
5454 TEST_F(LibRadosTwoPoolsECPP, SetRedirectRead) {
5455   // skip test if not yet luminous
5456   {
5457     bufferlist inbl, outbl;
5458     ASSERT_EQ(0, cluster.mon_command(
5459                 "{\"prefix\": \"osd dump\"}",
5460                 inbl, &outbl, NULL));
5461     string s(outbl.c_str(), outbl.length());
5462     if (s.find("luminous") == std::string::npos) {
5463       cout << "cluster is not yet luminous, skipping test" << std::endl;
5464       return;
5465     }
5466   }
5467
5468   // create object
5469   {
5470     bufferlist bl;
5471     bl.append("hi there");
5472     ObjectWriteOperation op;
5473     op.write_full(bl);
5474     ASSERT_EQ(0, ioctx.operate("foo", &op));
5475   }
5476   {
5477     bufferlist bl;
5478     bl.append("there");
5479     ObjectWriteOperation op;
5480     op.write_full(bl);
5481     ASSERT_EQ(0, cache_ioctx.operate("bar", &op));
5482   }
5483
5484   // configure tier
5485   bufferlist inbl;
5486   ASSERT_EQ(0, cluster.mon_command(
5487     "{\"prefix\": \"osd tier add\", \"pool\": \"" + pool_name +
5488     "\", \"tierpool\": \"" + cache_pool_name +
5489     "\", \"force_nonempty\": \"--force-nonempty\" }",
5490     inbl, NULL, NULL));
5491
5492   // wait for maps to settle
5493   cluster.wait_for_latest_osdmap();
5494
5495   {
5496     ObjectWriteOperation op;
5497     op.set_redirect("bar", cache_ioctx, 0);
5498     librados::AioCompletion *completion = cluster.aio_create_completion();
5499     ASSERT_EQ(0, ioctx.aio_operate("foo", completion, &op));
5500     completion->wait_for_safe();
5501     ASSERT_EQ(0, completion->get_return_value());
5502     completion->release();
5503   }
5504   // read and verify the object
5505   {
5506     bufferlist bl;
5507     ASSERT_EQ(1, ioctx.read("foo", bl, 1, 0));
5508     ASSERT_EQ('t', bl[0]);
5509   }
5510
5511   ASSERT_EQ(0, cluster.mon_command(
5512     "{\"prefix\": \"osd tier remove\", \"pool\": \"" + pool_name +
5513     "\", \"tierpool\": \"" + cache_pool_name + "\"}",
5514     inbl, NULL, NULL));
5515
5516   // wait for maps to settle before next test
5517   cluster.wait_for_latest_osdmap();
5518 }