Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / journal / test_JournalPlayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalPlayer.h"
5 #include "journal/Entry.h"
6 #include "journal/JournalMetadata.h"
7 #include "journal/ReplayHandler.h"
8 #include "include/stringify.h"
9 #include "common/Cond.h"
10 #include "common/Mutex.h"
11 #include "gtest/gtest.h"
12 #include "test/journal/RadosTestFixture.h"
13 #include <list>
14 #include <boost/scope_exit.hpp>
15
16 typedef std::list<journal::Entry> Entries;
17
18 template <typename T>
19 class TestJournalPlayer : public RadosTestFixture {
20 public:
21   typedef std::list<journal::JournalPlayer *> JournalPlayers;
22
23   static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
24
25   struct ReplayHandler : public journal::ReplayHandler {
26     Mutex lock;
27     Cond cond;
28     bool entries_available;
29     bool complete;
30     int complete_result;
31
32     ReplayHandler()
33       : lock("lock"), entries_available(false), complete(false),
34         complete_result(0) {}
35
36     void get() override {}
37     void put() override {}
38
39     void handle_entries_available() override {
40       Mutex::Locker locker(lock);
41       entries_available = true;
42       cond.Signal();
43     }
44
45     void handle_complete(int r) override {
46       Mutex::Locker locker(lock);
47       complete = true;
48       complete_result = r;
49       cond.Signal();
50     }
51   };
52
53   void TearDown() override {
54     for (JournalPlayers::iterator it = m_players.begin();
55          it != m_players.end(); ++it) {
56       delete *it;
57     }
58     RadosTestFixture::TearDown();
59   }
60
61   journal::JournalMetadataPtr create_metadata(const std::string &oid) {
62     return RadosTestFixture::create_metadata(oid, "client", 0.1,
63                                              max_fetch_bytes);
64   }
65
66   int client_commit(const std::string &oid,
67                     journal::JournalPlayer::ObjectSetPosition position) {
68     return RadosTestFixture::client_commit(oid, "client", position);
69   }
70
71   journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
72     std::string payload(128, '0');
73     bufferlist payload_bl;
74     payload_bl.append(payload);
75     return journal::Entry(tag_tid, entry_tid, payload_bl);
76   }
77
78   journal::JournalPlayer *create_player(const std::string &oid,
79                                         const journal::JournalMetadataPtr &metadata) {
80     journal::JournalPlayer *player(new journal::JournalPlayer(
81       m_ioctx, oid + ".", metadata, &m_replay_hander));
82     m_players.push_back(player);
83     return player;
84   }
85
86   bool wait_for_entries(journal::JournalPlayer *player, uint32_t count,
87                         Entries *entries) {
88     entries->clear();
89     while (entries->size() < count) {
90       journal::Entry entry;
91       uint64_t commit_tid;
92       while (entries->size() < count &&
93              player->try_pop_front(&entry, &commit_tid)) {
94         entries->push_back(entry);
95       }
96       if (entries->size() == count) {
97         break;
98       }
99
100       Mutex::Locker locker(m_replay_hander.lock);
101       if (m_replay_hander.entries_available) {
102         m_replay_hander.entries_available = false;
103       } else if (m_replay_hander.cond.WaitInterval(
104           m_replay_hander.lock, utime_t(10, 0)) != 0) {
105         break;
106       }
107     }
108     return entries->size() == count;
109   }
110
111   bool wait_for_complete(journal::JournalPlayer *player) {
112     Mutex::Locker locker(m_replay_hander.lock);
113     while (!m_replay_hander.complete) {
114       journal::Entry entry;
115       uint64_t commit_tid;
116       player->try_pop_front(&entry, &commit_tid);
117
118       if (m_replay_hander.cond.WaitInterval(
119             m_replay_hander.lock, utime_t(10, 0)) != 0) {
120         return false;
121       }
122     }
123     m_replay_hander.complete = false;
124     return true;
125   }
126
127   int write_entry(const std::string &oid, uint64_t object_num,
128                   uint64_t tag_tid, uint64_t entry_tid) {
129     bufferlist bl;
130     ::encode(create_entry(tag_tid, entry_tid), bl);
131     return append(oid + "." + stringify(object_num), bl);
132   }
133
134   JournalPlayers m_players;
135   ReplayHandler m_replay_hander;
136 };
137
138 template <uint64_t _max_fetch_bytes>
139 class TestJournalPlayerParams {
140 public:
141   static const uint64_t max_fetch_bytes = _max_fetch_bytes;
142 };
143
144 typedef ::testing::Types<TestJournalPlayerParams<0>,
145                          TestJournalPlayerParams<16> > TestJournalPlayerTypes;
146 TYPED_TEST_CASE(TestJournalPlayer, TestJournalPlayerTypes);
147
148 TYPED_TEST(TestJournalPlayer, Prefetch) {
149   std::string oid = this->get_temp_oid();
150
151   journal::JournalPlayer::ObjectPositions positions;
152   positions = {
153     cls::journal::ObjectPosition(0, 234, 122) };
154   cls::journal::ObjectSetPosition commit_position(positions);
155
156   ASSERT_EQ(0, this->create(oid));
157   ASSERT_EQ(0, this->client_register(oid));
158   ASSERT_EQ(0, this->client_commit(oid, commit_position));
159
160   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
161   ASSERT_EQ(0, this->init_metadata(metadata));
162
163   journal::JournalPlayer *player = this->create_player(oid, metadata);
164   BOOST_SCOPE_EXIT_ALL( (player) ) {
165     C_SaferCond unwatch_ctx;
166     player->shut_down(&unwatch_ctx);
167     ASSERT_EQ(0, unwatch_ctx.wait());
168   };
169
170   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
171   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
172   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
173   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
174
175   player->prefetch();
176
177   Entries entries;
178   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
179   ASSERT_TRUE(this->wait_for_complete(player));
180
181   Entries expected_entries;
182   expected_entries = {
183     this->create_entry(234, 123),
184     this->create_entry(234, 124),
185     this->create_entry(234, 125)};
186   ASSERT_EQ(expected_entries, entries);
187
188   uint64_t last_tid;
189   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
190   ASSERT_EQ(125U, last_tid);
191 }
192
193 TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
194   std::string oid = this->get_temp_oid();
195
196   journal::JournalPlayer::ObjectPositions positions;
197   positions = {
198     cls::journal::ObjectPosition(0, 234, 125),
199     cls::journal::ObjectPosition(1, 234, 124) };
200   cls::journal::ObjectSetPosition commit_position(positions);
201
202   ASSERT_EQ(0, this->create(oid));
203   ASSERT_EQ(0, this->client_register(oid));
204   ASSERT_EQ(0, this->client_commit(oid, commit_position));
205
206   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
207   ASSERT_EQ(0, this->init_metadata(metadata));
208
209   journal::JournalPlayer *player = this->create_player(oid, metadata);
210   BOOST_SCOPE_EXIT_ALL( (player) ) {
211     C_SaferCond unwatch_ctx;
212     player->shut_down(&unwatch_ctx);
213     ASSERT_EQ(0, unwatch_ctx.wait());
214   };
215
216   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
217   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
218   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
219   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
220
221   player->prefetch();
222
223   Entries entries;
224   ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
225   ASSERT_TRUE(this->wait_for_complete(player));
226
227   uint64_t last_tid;
228   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
229   ASSERT_EQ(125U, last_tid);
230 }
231
232 TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
233   std::string oid = this->get_temp_oid();
234
235   cls::journal::ObjectSetPosition commit_position;
236
237   ASSERT_EQ(0, this->create(oid));
238   ASSERT_EQ(0, this->client_register(oid));
239   ASSERT_EQ(0, this->client_commit(oid, commit_position));
240
241   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
242   ASSERT_EQ(0, this->init_metadata(metadata));
243
244   journal::JournalPlayer *player = this->create_player(oid, metadata);
245   BOOST_SCOPE_EXIT_ALL( (player) ) {
246     C_SaferCond unwatch_ctx;
247     player->shut_down(&unwatch_ctx);
248     ASSERT_EQ(0, unwatch_ctx.wait());
249   };
250
251   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
252   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
253
254   player->prefetch();
255
256   Entries entries;
257   ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
258   ASSERT_TRUE(this->wait_for_complete(player));
259
260   Entries expected_entries;
261   expected_entries = {
262     this->create_entry(234, 122),
263     this->create_entry(234, 123)};
264   ASSERT_EQ(expected_entries, entries);
265 }
266
267 TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
268   std::string oid = this->get_temp_oid();
269
270   journal::JournalPlayer::ObjectPositions positions;
271   positions = {
272     cls::journal::ObjectPosition(2, 234, 122),
273     cls::journal::ObjectPosition(1, 234, 121),
274     cls::journal::ObjectPosition(0, 234, 120)};
275   cls::journal::ObjectSetPosition commit_position(positions);
276
277   ASSERT_EQ(0, this->create(oid, 14, 3));
278   ASSERT_EQ(0, this->client_register(oid));
279   ASSERT_EQ(0, this->client_commit(oid, commit_position));
280
281   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
282   ASSERT_EQ(0, this->init_metadata(metadata));
283
284   journal::JournalPlayer *player = this->create_player(oid, metadata);
285   BOOST_SCOPE_EXIT_ALL( (player) ) {
286     C_SaferCond unwatch_ctx;
287     player->shut_down(&unwatch_ctx);
288     ASSERT_EQ(0, unwatch_ctx.wait());
289   };
290
291   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
292   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
293   ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
294   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
295   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
296   ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
297
298   player->prefetch();
299
300   Entries entries;
301   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
302   ASSERT_TRUE(this->wait_for_complete(player));
303
304   uint64_t last_tid;
305   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
306   ASSERT_EQ(124U, last_tid);
307   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
308   ASSERT_EQ(0U, last_tid);
309 }
310
311 TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
312   std::string oid = this->get_temp_oid();
313
314   cls::journal::ObjectSetPosition commit_position;
315
316   ASSERT_EQ(0, this->create(oid));
317   ASSERT_EQ(0, this->client_register(oid));
318   ASSERT_EQ(0, this->client_commit(oid, commit_position));
319
320   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
321   ASSERT_EQ(0, this->init_metadata(metadata));
322
323   journal::JournalPlayer *player = this->create_player(oid, metadata);
324   BOOST_SCOPE_EXIT_ALL( (player) ) {
325     C_SaferCond unwatch_ctx;
326     player->shut_down(&unwatch_ctx);
327     ASSERT_EQ(0, unwatch_ctx.wait());
328   };
329
330   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
331   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
332   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
333
334   player->prefetch();
335   Entries entries;
336   ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
337
338   journal::Entry entry;
339   uint64_t commit_tid;
340   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
341   ASSERT_TRUE(this->wait_for_complete(player));
342   ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
343 }
344
345 TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
346   std::string oid = this->get_temp_oid();
347
348   cls::journal::ObjectSetPosition commit_position;
349
350   ASSERT_EQ(0, this->create(oid, 14, 4));
351   ASSERT_EQ(0, this->client_register(oid));
352   ASSERT_EQ(0, this->client_commit(oid, commit_position));
353
354   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
355   ASSERT_EQ(0, this->init_metadata(metadata));
356
357   journal::JournalPlayer *player = this->create_player(oid, metadata);
358   BOOST_SCOPE_EXIT_ALL( (player) ) {
359     C_SaferCond unwatch_ctx;
360     player->shut_down(&unwatch_ctx);
361     ASSERT_EQ(0, unwatch_ctx.wait());
362   };
363
364   ASSERT_EQ(0, metadata->set_active_set(1));
365   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
366   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
367   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
368   ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
369   ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
370   ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
371   ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
372   ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
373   ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
374   ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
375   ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
376
377   player->prefetch();
378   Entries entries;
379   ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
380
381   Entries expected_entries = {
382     this->create_entry(2, 852),
383     this->create_entry(2, 853),
384     this->create_entry(2, 854),
385     this->create_entry(3, 0),
386     this->create_entry(3, 1),
387     this->create_entry(3, 2),
388     this->create_entry(3, 3)};
389   ASSERT_EQ(expected_entries, entries);
390
391   ASSERT_TRUE(this->wait_for_complete(player));
392   ASSERT_EQ(0, this->m_replay_hander.complete_result);
393 }
394
395 TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
396   std::string oid = this->get_temp_oid();
397
398   cls::journal::ObjectSetPosition commit_position;
399
400   ASSERT_EQ(0, this->create(oid));
401   ASSERT_EQ(0, this->client_register(oid));
402   ASSERT_EQ(0, this->client_commit(oid, commit_position));
403
404   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
405   ASSERT_EQ(0, this->init_metadata(metadata));
406
407   journal::JournalPlayer *player = this->create_player(oid, metadata);
408   BOOST_SCOPE_EXIT_ALL( (player) ) {
409     C_SaferCond unwatch_ctx;
410     player->shut_down(&unwatch_ctx);
411     ASSERT_EQ(0, unwatch_ctx.wait());
412   };
413
414   ASSERT_EQ(0, metadata->set_active_set(2));
415   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
416   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
417   ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
418   ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
419
420   player->prefetch();
421   Entries entries;
422   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
423
424   Entries expected_entries = {
425     this->create_entry(0, 0),
426     this->create_entry(0, 1),
427     this->create_entry(1, 0)};
428   ASSERT_EQ(expected_entries, entries);
429 }
430
431 TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
432   std::string oid = this->get_temp_oid();
433
434   cls::journal::ObjectSetPosition commit_position;
435
436   ASSERT_EQ(0, this->create(oid));
437   ASSERT_EQ(0, this->client_register(oid));
438   ASSERT_EQ(0, this->client_commit(oid, commit_position));
439
440   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
441   ASSERT_EQ(0, this->init_metadata(metadata));
442
443   journal::JournalPlayer *player = this->create_player(oid, metadata);
444   BOOST_SCOPE_EXIT_ALL( (player) ) {
445     C_SaferCond unwatch_ctx;
446     player->shut_down(&unwatch_ctx);
447     ASSERT_EQ(0, unwatch_ctx.wait());
448   };
449
450   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
451   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
452   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
453   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
454   ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
455
456   player->prefetch();
457   Entries entries;
458   ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
459
460   Entries expected_entries = {
461     this->create_entry(0, 0),
462     this->create_entry(0, 1),
463     this->create_entry(0, 2),
464     this->create_entry(1, 0)};
465   ASSERT_EQ(expected_entries, entries);
466 }
467
468 TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
469   std::string oid = this->get_temp_oid();
470
471   journal::JournalPlayer::ObjectPositions positions = {
472     cls::journal::ObjectPosition(0, 1, 0) };
473   cls::journal::ObjectSetPosition commit_position(positions);
474
475   ASSERT_EQ(0, this->create(oid));
476   ASSERT_EQ(0, this->client_register(oid));
477   ASSERT_EQ(0, this->client_commit(oid, commit_position));
478
479   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
480   ASSERT_EQ(0, this->init_metadata(metadata));
481
482   journal::JournalPlayer *player = this->create_player(oid, metadata);
483   BOOST_SCOPE_EXIT_ALL( (player) ) {
484     C_SaferCond unwatch_ctx;
485     player->shut_down(&unwatch_ctx);
486     ASSERT_EQ(0, unwatch_ctx.wait());
487   };
488
489   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
490   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
491   ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
492   ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
493
494   player->prefetch();
495   Entries entries;
496   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
497
498   Entries expected_entries = {
499     this->create_entry(1, 1)};
500   ASSERT_EQ(expected_entries, entries);
501
502   ASSERT_TRUE(this->wait_for_complete(player));
503   ASSERT_EQ(0, this->m_replay_hander.complete_result);
504 }
505
506 TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
507   std::string oid = this->get_temp_oid();
508
509   cls::journal::ObjectSetPosition commit_position;
510
511   ASSERT_EQ(0, this->create(oid));
512   ASSERT_EQ(0, this->client_register(oid));
513   ASSERT_EQ(0, this->client_commit(oid, commit_position));
514
515   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
516   ASSERT_EQ(0, this->init_metadata(metadata));
517
518   journal::JournalPlayer *player = this->create_player(oid, metadata);
519   BOOST_SCOPE_EXIT_ALL( (player) ) {
520     C_SaferCond unwatch_ctx;
521     player->shut_down(&unwatch_ctx);
522     ASSERT_EQ(0, unwatch_ctx.wait());
523   };
524
525   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
526   ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
527   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
528
529   player->prefetch();
530   Entries entries;
531   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
532
533   journal::Entry entry;
534   uint64_t commit_tid;
535   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
536   ASSERT_TRUE(this->wait_for_complete(player));
537   ASSERT_EQ(0, this->m_replay_hander.complete_result);
538 }
539
540 TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
541   std::string oid = this->get_temp_oid();
542
543   journal::JournalPlayer::ObjectPositions positions;
544   positions = {
545     cls::journal::ObjectPosition(0, 234, 122)};
546   cls::journal::ObjectSetPosition commit_position(positions);
547
548   ASSERT_EQ(0, this->create(oid));
549   ASSERT_EQ(0, this->client_register(oid));
550   ASSERT_EQ(0, this->client_commit(oid, commit_position));
551
552   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
553   ASSERT_EQ(0, this->init_metadata(metadata));
554
555   journal::JournalPlayer *player = this->create_player(oid, metadata);
556   BOOST_SCOPE_EXIT_ALL( (player) ) {
557     C_SaferCond unwatch_ctx;
558     player->shut_down(&unwatch_ctx);
559     ASSERT_EQ(0, unwatch_ctx.wait());
560   };
561
562   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
563
564   player->prefetch_and_watch(0.25);
565
566   Entries entries;
567   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
568   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
569
570   Entries expected_entries;
571   expected_entries = {this->create_entry(234, 123)};
572   ASSERT_EQ(expected_entries, entries);
573
574   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
575   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
576
577   expected_entries = {this->create_entry(234, 124)};
578   ASSERT_EQ(expected_entries, entries);
579 }
580
581 TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
582   std::string oid = this->get_temp_oid();
583
584   cls::journal::ObjectSetPosition commit_position;
585
586   ASSERT_EQ(0, this->create(oid, 14, 3));
587   ASSERT_EQ(0, this->client_register(oid));
588   ASSERT_EQ(0, this->client_commit(oid, commit_position));
589
590   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
591   ASSERT_EQ(0, this->init_metadata(metadata));
592   ASSERT_EQ(0, metadata->set_active_set(2));
593
594   journal::JournalPlayer *player = this->create_player(oid, metadata);
595   BOOST_SCOPE_EXIT_ALL( (player) ) {
596     C_SaferCond unwatch_ctx;
597     player->shut_down(&unwatch_ctx);
598     ASSERT_EQ(0, unwatch_ctx.wait());
599   };
600
601   ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
602   ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
603   ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
604   ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
605   ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
606
607   player->prefetch();
608
609   Entries entries;
610   ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
611   ASSERT_TRUE(this->wait_for_complete(player));
612
613   Entries expected_entries;
614   expected_entries = {
615     this->create_entry(234, 122),
616     this->create_entry(234, 123),
617     this->create_entry(234, 124),
618     this->create_entry(234, 125),
619     this->create_entry(234, 126)};
620   ASSERT_EQ(expected_entries, entries);
621
622   uint64_t last_tid;
623   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
624   ASSERT_EQ(126U, last_tid);
625 }
626
627 TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
628   std::string oid = this->get_temp_oid();
629
630   journal::JournalPlayer::ObjectPositions positions = {
631     cls::journal::ObjectPosition(9, 300, 1),
632     cls::journal::ObjectPosition(8, 300, 0),
633     cls::journal::ObjectPosition(10, 200, 4334),
634     cls::journal::ObjectPosition(11, 200, 4331) };
635   cls::journal::ObjectSetPosition commit_position(positions);
636
637   ASSERT_EQ(0, this->create(oid, 14, 4));
638   ASSERT_EQ(0, this->client_register(oid));
639   ASSERT_EQ(0, this->client_commit(oid, commit_position));
640
641   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
642   ASSERT_EQ(0, this->init_metadata(metadata));
643   ASSERT_EQ(0, metadata->set_active_set(2));
644   metadata->set_minimum_set(2);
645
646   journal::JournalPlayer *player = this->create_player(oid, metadata);
647   BOOST_SCOPE_EXIT_ALL( (player) ) {
648     C_SaferCond unwatch_ctx;
649     player->shut_down(&unwatch_ctx);
650     ASSERT_EQ(0, unwatch_ctx.wait());
651   };
652
653   ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
654   ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
655   ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
656   ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
657   ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
658   ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
659   ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
660   ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
661
662   player->prefetch();
663
664   Entries entries;
665   ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
666   ASSERT_TRUE(this->wait_for_complete(player));
667
668   Entries expected_entries;
669   expected_entries = {
670     this->create_entry(301, 0),
671     this->create_entry(301, 1),
672     this->create_entry(301, 2),
673     this->create_entry(301, 3)};
674   ASSERT_EQ(expected_entries, entries);
675
676   uint64_t last_tid;
677   ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
678   ASSERT_EQ(3U, last_tid);
679 }
680
681 TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
682   std::string oid = this->get_temp_oid();
683
684   cls::journal::ObjectSetPosition commit_position;
685
686   ASSERT_EQ(0, this->create(oid));
687   ASSERT_EQ(0, this->client_register(oid));
688   ASSERT_EQ(0, this->client_commit(oid, commit_position));
689
690   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
691   ASSERT_EQ(0, this->init_metadata(metadata));
692
693   journal::JournalPlayer *player = this->create_player(oid, metadata);
694   BOOST_SCOPE_EXIT_ALL( (player) ) {
695     C_SaferCond unwatch_ctx;
696     player->shut_down(&unwatch_ctx);
697     ASSERT_EQ(0, unwatch_ctx.wait());
698   };
699
700   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
701   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
702   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
703   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
704   ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
705   player->prefetch_and_watch(0.25);
706
707   Entries entries;
708   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
709
710   Entries expected_entries = {
711     this->create_entry(0, 0),
712     this->create_entry(0, 1),
713     this->create_entry(0, 2)};
714   ASSERT_EQ(expected_entries, entries);
715
716   journal::Entry entry;
717   uint64_t commit_tid;
718   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
719
720   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
721   ASSERT_EQ(0, metadata->set_active_set(1));
722   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
723
724   expected_entries = {
725     this->create_entry(0, 3),
726     this->create_entry(0, 4),
727     this->create_entry(0, 5)};
728   ASSERT_EQ(expected_entries, entries);
729 }
730
731 TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
732   std::string oid = this->get_temp_oid();
733
734   cls::journal::ObjectSetPosition commit_position;
735
736   ASSERT_EQ(0, this->create(oid, 14, 4));
737   ASSERT_EQ(0, this->client_register(oid));
738   ASSERT_EQ(0, this->client_commit(oid, commit_position));
739
740   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
741   ASSERT_EQ(0, this->init_metadata(metadata));
742
743   journal::JournalPlayer *player = this->create_player(oid, metadata);
744   BOOST_SCOPE_EXIT_ALL( (player) ) {
745     C_SaferCond unwatch_ctx;
746     player->shut_down(&unwatch_ctx);
747     ASSERT_EQ(0, unwatch_ctx.wait());
748   };
749
750   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
751   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
752   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
753   ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
754   ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
755   ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
756   ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
757   player->prefetch_and_watch(0.25);
758
759   Entries entries;
760   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
761
762   Entries expected_entries = {
763     this->create_entry(2, 852),
764     this->create_entry(2, 853),
765     this->create_entry(2, 854)};
766   ASSERT_EQ(expected_entries, entries);
767
768   journal::Entry entry;
769   uint64_t commit_tid;
770   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
771
772   ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
773   ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
774   ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
775   ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
776   ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
777
778   expected_entries = {
779     this->create_entry(3, 0),
780     this->create_entry(3, 1),
781     this->create_entry(3, 2),
782     this->create_entry(3, 3)};
783   ASSERT_EQ(expected_entries, entries);
784 }
785
786 TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
787   std::string oid = this->get_temp_oid();
788
789   cls::journal::ObjectSetPosition commit_position;
790
791   ASSERT_EQ(0, this->create(oid));
792   ASSERT_EQ(0, this->client_register(oid));
793   ASSERT_EQ(0, this->client_commit(oid, commit_position));
794
795   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
796   ASSERT_EQ(0, this->init_metadata(metadata));
797
798   journal::JournalPlayer *player = this->create_player(oid, metadata);
799   BOOST_SCOPE_EXIT_ALL( (player) ) {
800     C_SaferCond unwatch_ctx;
801     player->shut_down(&unwatch_ctx);
802     ASSERT_EQ(0, unwatch_ctx.wait());
803   };
804
805   ASSERT_EQ(0, metadata->set_active_set(2));
806   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
807   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
808   ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
809   ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
810   player->prefetch_and_watch(0.25);
811
812   Entries entries;
813   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
814
815   Entries expected_entries = {
816     this->create_entry(0, 0),
817     this->create_entry(0, 1),
818     this->create_entry(1, 0)};
819   ASSERT_EQ(expected_entries, entries);
820 }
821
822 TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
823   std::string oid = this->get_temp_oid();
824
825   cls::journal::ObjectSetPosition commit_position;
826
827   ASSERT_EQ(0, this->create(oid));
828   ASSERT_EQ(0, this->client_register(oid));
829   ASSERT_EQ(0, this->client_commit(oid, commit_position));
830
831   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
832   ASSERT_EQ(0, this->init_metadata(metadata));
833
834   journal::JournalPlayer *player = this->create_player(oid, metadata);
835   BOOST_SCOPE_EXIT_ALL( (player) ) {
836     C_SaferCond unwatch_ctx;
837     player->shut_down(&unwatch_ctx);
838     ASSERT_EQ(0, unwatch_ctx.wait());
839   };
840
841   C_SaferCond ctx1;
842   cls::journal::Tag tag1;
843   metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
844   ASSERT_EQ(0, ctx1.wait());
845
846   ASSERT_EQ(0, metadata->set_active_set(0));
847   ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
848   ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
849   ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
850   ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
851   player->prefetch_and_watch(0.25);
852
853   Entries entries;
854   ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
855
856   Entries expected_entries = {
857     this->create_entry(tag1.tid, 0),
858     this->create_entry(tag1.tid, 1),
859     this->create_entry(tag1.tid, 2)};
860   ASSERT_EQ(expected_entries, entries);
861
862   journal::Entry entry;
863   uint64_t commit_tid;
864   ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
865
866   C_SaferCond ctx2;
867   cls::journal::Tag tag2;
868   metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
869   ASSERT_EQ(0, ctx2.wait());
870
871   ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
872   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
873
874   expected_entries = {
875     this->create_entry(tag2.tid, 0)};
876   ASSERT_EQ(expected_entries, entries);
877 }
878
879 TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
880   std::string oid = this->get_temp_oid();
881
882   journal::JournalPlayer::ObjectPositions positions = {
883     cls::journal::ObjectPosition(0, 1, 0) };
884   cls::journal::ObjectSetPosition commit_position(positions);
885
886   ASSERT_EQ(0, this->create(oid));
887   ASSERT_EQ(0, this->client_register(oid));
888   ASSERT_EQ(0, this->client_commit(oid, commit_position));
889
890   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
891   ASSERT_EQ(0, this->init_metadata(metadata));
892
893   journal::JournalPlayer *player = this->create_player(oid, metadata);
894   BOOST_SCOPE_EXIT_ALL( (player) ) {
895     C_SaferCond unwatch_ctx;
896     player->shut_down(&unwatch_ctx);
897     ASSERT_EQ(0, unwatch_ctx.wait());
898   };
899
900   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
901   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
902   ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
903   ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
904   player->prefetch_and_watch(0.25);
905
906   Entries entries;
907   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
908
909   Entries expected_entries = {
910     this->create_entry(1, 1)};
911   ASSERT_EQ(expected_entries, entries);
912 }
913
914 TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
915   std::string oid = this->get_temp_oid();
916
917   journal::JournalPlayer::ObjectPositions positions = {
918     cls::journal::ObjectPosition(1, 0, 1),
919     cls::journal::ObjectPosition(0, 0, 0)};
920   cls::journal::ObjectSetPosition commit_position(positions);
921
922   ASSERT_EQ(0, this->create(oid));
923   ASSERT_EQ(0, this->client_register(oid));
924   ASSERT_EQ(0, this->client_commit(oid, commit_position));
925
926   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
927   ASSERT_EQ(0, this->init_metadata(metadata));
928
929   journal::JournalPlayer *player = this->create_player(oid, metadata);
930   BOOST_SCOPE_EXIT_ALL( (player) ) {
931     C_SaferCond unwatch_ctx;
932     player->shut_down(&unwatch_ctx);
933     ASSERT_EQ(0, unwatch_ctx.wait());
934   };
935
936   ASSERT_EQ(0, metadata->set_active_set(1));
937   ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
938   ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
939   ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
940   ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
941   player->prefetch_and_watch(0.25);
942
943   Entries entries;
944   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
945
946   Entries expected_entries = {
947     this->create_entry(1, 0)};
948   ASSERT_EQ(expected_entries, entries);
949
950   // should remove player for offset 3 after refetching
951   ASSERT_EQ(0, metadata->set_active_set(3));
952   ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
953
954   ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
955
956   expected_entries = {
957     this->create_entry(1, 1)};
958   ASSERT_EQ(expected_entries, entries);
959 }
960
961 TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
962   std::string oid = this->get_temp_oid();
963
964   ASSERT_EQ(0, this->create(oid));
965   ASSERT_EQ(0, this->client_register(oid));
966   ASSERT_EQ(0, this->client_commit(oid, {}));
967
968   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
969   ASSERT_EQ(0, this->init_metadata(metadata));
970
971   journal::JournalPlayer *player = this->create_player(oid, metadata);
972   BOOST_SCOPE_EXIT_ALL( (player) ) {
973     C_SaferCond unwatch_ctx;
974     player->shut_down(&unwatch_ctx);
975     ASSERT_EQ(0, unwatch_ctx.wait());
976   };
977   player->prefetch();
978 }
979
980 TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
981   std::string oid = this->get_temp_oid();
982
983   ASSERT_EQ(0, this->create(oid));
984   ASSERT_EQ(0, this->client_register(oid));
985   ASSERT_EQ(0, this->client_commit(oid, {}));
986
987   journal::JournalMetadataPtr metadata = this->create_metadata(oid);
988   ASSERT_EQ(0, this->init_metadata(metadata));
989
990   journal::JournalPlayer *player = this->create_player(oid, metadata);
991   BOOST_SCOPE_EXIT_ALL( (player) ) {
992     C_SaferCond unwatch_ctx;
993     player->shut_down(&unwatch_ctx);
994     ASSERT_EQ(0, unwatch_ctx.wait());
995   };
996   player->prefetch_and_watch(0.25);
997 }
998