1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/JournalPlayer.h"
5 #include "journal/Entry.h"
6 #include "journal/JournalMetadata.h"
7 #include "journal/ReplayHandler.h"
8 #include "include/stringify.h"
9 #include "common/Cond.h"
10 #include "common/Mutex.h"
11 #include "gtest/gtest.h"
12 #include "test/journal/RadosTestFixture.h"
14 #include <boost/scope_exit.hpp>
// List of journal entries; the tests below build their expected-entry lists
// with this type and compare them against the entries popped from the player.
16 typedef std::list<journal::Entry> Entries;
// Typed gtest fixture for journal::JournalPlayer tests, layered on
// RadosTestFixture.
// NOTE(review): parts of this class (its template header, access specifiers,
// some members and the closing braces) are not visible in this view.
19 class TestJournalPlayer : public RadosTestFixture {
// Container type used for the players tracked in m_players.
21 typedef std::list<journal::JournalPlayer *> JournalPlayers;
// Mirrors the max_fetch_bytes constant of the type parameter T.
23 static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
// journal::ReplayHandler implementation that records the callbacks it
// receives in member flags, updated while holding `lock`.
25 struct ReplayHandler : public journal::ReplayHandler {
// Set by handle_entries_available(); consumed (reset) in wait_for_entries().
28 bool entries_available;
// Constructor initializer list: both flags start out false.
33 : lock("lock"), entries_available(false), complete(false),
// get()/put() are overridden with empty bodies.
36 void get() override {}
37 void put() override {}
// Callback: flags that entries are available, under the handler lock.
39 void handle_entries_available() override {
40 Mutex::Locker locker(lock);
41 entries_available = true;
// Callback carrying result code r; takes the handler lock
// (the remainder of the body is not visible in this view).
45 void handle_complete(int r) override {
46 Mutex::Locker locker(lock);
// Iterates over every player in m_players (loop body not visible in this
// view) and then runs the base fixture's TearDown().
53 void TearDown() override {
54 for (JournalPlayers::iterator it = m_players.begin();
55 it != m_players.end(); ++it) {
58 RadosTestFixture::TearDown();
// Forwards to RadosTestFixture::create_metadata for `oid` with client id
// "client" and 0.1 as the third argument (further arguments not visible here).
61 journal::JournalMetadataPtr create_metadata(const std::string &oid) {
62 return RadosTestFixture::create_metadata(oid, "client", 0.1,
// Commits `position` on journal `oid` for client id "client" through the base
// fixture and returns its result code.
66 int client_commit(const std::string &oid,
67 journal::JournalPlayer::ObjectSetPosition position) {
68 return RadosTestFixture::client_commit(oid, "client", position);
// Builds a journal::Entry for (tag_tid, entry_tid) whose payload is always
// the same 128 '0' characters.
71 journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
72 std::string payload(128, '0');
73 bufferlist payload_bl;
74 payload_bl.append(payload);
75 return journal::Entry(tag_tid, entry_tid, payload_bl);
// Allocates a JournalPlayer on m_ioctx, passing oid + "." (the same prefix
// write_entry() uses for object names), the metadata and the fixture's
// replay handler; the new player is recorded in m_players.
78 journal::JournalPlayer *create_player(const std::string &oid,
79 const journal::JournalMetadataPtr &metadata) {
80 journal::JournalPlayer *player(new journal::JournalPlayer(
81 m_ioctx, oid + ".", metadata, &m_replay_hander));
82 m_players.push_back(player);
// Pops entries from `player` into *entries until `count` have been collected.
// Returns true only if *entries ends up holding exactly `count` entries.
86 bool wait_for_entries(journal::JournalPlayer *player, uint32_t count,
89 while (entries->size() < count) {
// Drain whatever the player can hand out right now, up to `count`.
92 while (entries->size() < count &&
93 player->try_pop_front(&entry, &commit_tid)) {
94 entries->push_back(entry);
96 if (entries->size() == count) {
// Not enough yet: either consume a pending entries_available notification
// or wait on the handler's condition with utime_t(10, 0) (presumably a
// 10 second timeout; the handling of a non-zero wait result is not visible
// in this view).
100 Mutex::Locker locker(m_replay_hander.lock);
101 if (m_replay_hander.entries_available) {
102 m_replay_hander.entries_available = false;
103 } else if (m_replay_hander.cond.WaitInterval(
104 m_replay_hander.lock, utime_t(10, 0)) != 0) {
108 return entries->size() == count;
// Holds the handler lock and loops until the handler's `complete` flag is
// set, attempting a try_pop_front() on each iteration and then waiting on the
// handler's condition; the flag is reset to false afterwards.
// NOTE(review): the return statements are not visible in this view.
111 bool wait_for_complete(journal::JournalPlayer *player) {
112 Mutex::Locker locker(m_replay_hander.lock);
113 while (!m_replay_hander.complete) {
114 journal::Entry entry;
116 player->try_pop_front(&entry, &commit_tid);
118 if (m_replay_hander.cond.WaitInterval(
119 m_replay_hander.lock, utime_t(10, 0)) != 0) {
123 m_replay_hander.complete = false;
// Encodes create_entry(tag_tid, entry_tid) into `bl` and appends it to the
// object named oid + "." + object_num; returns append()'s result code.
127 int write_entry(const std::string &oid, uint64_t object_num,
128 uint64_t tag_tid, uint64_t entry_tid) {
130 ::encode(create_entry(tag_tid, entry_tid), bl);
131 return append(oid + "." + stringify(object_num), bl);
// Players created via create_player().
134 JournalPlayers m_players;
// Replay handler instance handed to every player created by this fixture.
135 ReplayHandler m_replay_hander;
// Compile-time parameter holder for the typed tests: re-exports its template
// argument as the static constant max_fetch_bytes, which the fixture reads
// through T::max_fetch_bytes.
138 template <uint64_t _max_fetch_bytes>
139 class TestJournalPlayerParams {
141 static const uint64_t max_fetch_bytes = _max_fetch_bytes;
// Register the typed test case: every TYPED_TEST on TestJournalPlayer is
// instantiated once with max_fetch_bytes = 0 and once with max_fetch_bytes = 16.
144 typedef ::testing::Types<TestJournalPlayerParams<0>,
145 TestJournalPlayerParams<16> > TestJournalPlayerTypes;
146 TYPED_TEST_CASE(TestJournalPlayer, TestJournalPlayerTypes);
// Replay starting from a committed position: with ObjectPosition(0, 234, 122)
// committed and tag 234 entries 122..125 written, the test expects entries
// 123..125 to be returned and 125 to be the last allocated entry tid.
// NOTE(review): some statements of this test (e.g. the call that starts the
// replay and the declaration of `entries`) are not visible in this view.
148 TYPED_TEST(TestJournalPlayer, Prefetch) {
149 std::string oid = this->get_temp_oid();
151 journal::JournalPlayer::ObjectPositions positions;
// presumably (object number, tag tid, entry tid) -- TODO confirm
153 cls::journal::ObjectPosition(0, 234, 122) };
154 cls::journal::ObjectSetPosition commit_position(positions);
// Create the journal, register the client and record its commit position.
156 ASSERT_EQ(0, this->create(oid));
157 ASSERT_EQ(0, this->client_register(oid));
158 ASSERT_EQ(0, this->client_commit(oid, commit_position));
160 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
161 ASSERT_EQ(0, this->init_metadata(metadata));
163 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
164 BOOST_SCOPE_EXIT_ALL( (player) ) {
165 C_SaferCond unwatch_ctx;
166 player->shut_down(&unwatch_ctx);
167 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
170 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
171 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
172 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
173 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
// Three entries are expected, followed by the completion callback.
178 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
179 ASSERT_TRUE(this->wait_for_complete(player));
181 Entries expected_entries;
183 this->create_entry(234, 123),
184 this->create_entry(234, 124),
185 this->create_entry(234, 125)};
186 ASSERT_EQ(expected_entries, entries);
// The metadata must report 125 as the last allocated entry tid for tag 234.
189 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
190 ASSERT_EQ(125U, last_tid);
// Commit positions (0, 234, 125) and (1, 234, 124) are recorded before tag 234
// entries 122..125 are written; the test expects zero entries to be returned,
// completion to be signalled, and 125 as the last allocated entry tid.
// NOTE(review): some statements of this test are not visible in this view.
193 TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
194 std::string oid = this->get_temp_oid();
196 journal::JournalPlayer::ObjectPositions positions;
198 cls::journal::ObjectPosition(0, 234, 125),
199 cls::journal::ObjectPosition(1, 234, 124) };
200 cls::journal::ObjectSetPosition commit_position(positions);
// Create the journal, register the client and record its commit position.
202 ASSERT_EQ(0, this->create(oid));
203 ASSERT_EQ(0, this->client_register(oid));
204 ASSERT_EQ(0, this->client_commit(oid, commit_position));
206 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
207 ASSERT_EQ(0, this->init_metadata(metadata));
209 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
210 BOOST_SCOPE_EXIT_ALL( (player) ) {
211 C_SaferCond unwatch_ctx;
212 player->shut_down(&unwatch_ctx);
213 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
216 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
217 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
218 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
219 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
// No entries are expected; only the completion callback.
224 ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
225 ASSERT_TRUE(this->wait_for_complete(player));
// The metadata must still report 125 as the last allocated tid for tag 234.
228 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
229 ASSERT_EQ(125U, last_tid);
// Replay with a default-constructed (empty) commit position: both written
// entries (tag 234, entries 122 and 123) are expected to be returned.
// NOTE(review): some statements of this test are not visible in this view.
232 TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
233 std::string oid = this->get_temp_oid();
235 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
237 ASSERT_EQ(0, this->create(oid));
238 ASSERT_EQ(0, this->client_register(oid));
239 ASSERT_EQ(0, this->client_commit(oid, commit_position));
241 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
242 ASSERT_EQ(0, this->init_metadata(metadata));
244 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
245 BOOST_SCOPE_EXIT_ALL( (player) ) {
246 C_SaferCond unwatch_ctx;
247 player->shut_down(&unwatch_ctx);
248 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
251 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
252 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
// Both entries are expected, followed by the completion callback.
257 ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
258 ASSERT_TRUE(this->wait_for_complete(player));
260 Entries expected_entries;
262 this->create_entry(234, 122),
263 this->create_entry(234, 123)};
264 ASSERT_EQ(expected_entries, entries);
// Replay across two tags: positions for tag 234 entries 120..122 are committed,
// tag 234 entries 120..124 plus tag 236 entry 0 are written, and three entries
// are expected back. Afterwards the last allocated tids must be 124 (tag 234)
// and 0 (tag 236).
// NOTE(review): some statements of this test are not visible in this view.
267 TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
268 std::string oid = this->get_temp_oid();
270 journal::JournalPlayer::ObjectPositions positions;
272 cls::journal::ObjectPosition(2, 234, 122),
273 cls::journal::ObjectPosition(1, 234, 121),
274 cls::journal::ObjectPosition(0, 234, 120)};
275 cls::journal::ObjectSetPosition commit_position(positions);
// create() is given 14 and 3 here -- presumably the object order and the
// splay width (TODO confirm against RadosTestFixture::create).
277 ASSERT_EQ(0, this->create(oid, 14, 3));
278 ASSERT_EQ(0, this->client_register(oid));
279 ASSERT_EQ(0, this->client_commit(oid, commit_position));
281 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
282 ASSERT_EQ(0, this->init_metadata(metadata));
284 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
285 BOOST_SCOPE_EXIT_ALL( (player) ) {
286 C_SaferCond unwatch_ctx;
287 player->shut_down(&unwatch_ctx);
288 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
291 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
292 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
293 ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
294 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
295 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
296 ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
// Three entries are expected, followed by the completion callback.
301 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
302 ASSERT_TRUE(this->wait_for_complete(player));
// Last allocated entry tid is checked for each of the two tags.
305 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
306 ASSERT_EQ(124U, last_tid);
307 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
308 ASSERT_EQ(0U, last_tid);
// Replay of a sequence with a gap: tag 234 entries 120, 121 and 124 are
// written (122 and 123 never are). Two entries are expected, after which
// nothing more can be popped and completion is reported with -ENOMSG.
// NOTE(review): some statements of this test are not visible in this view.
311 TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
312 std::string oid = this->get_temp_oid();
314 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
316 ASSERT_EQ(0, this->create(oid));
317 ASSERT_EQ(0, this->client_register(oid));
318 ASSERT_EQ(0, this->client_commit(oid, commit_position));
320 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
321 ASSERT_EQ(0, this->init_metadata(metadata));
323 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
324 BOOST_SCOPE_EXIT_ALL( (player) ) {
325 C_SaferCond unwatch_ctx;
326 player->shut_down(&unwatch_ctx);
327 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
330 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
331 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
332 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
336 ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
// No further entry may be handed out, and the handler must see -ENOMSG.
338 journal::Entry entry;
340 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
341 ASSERT_TRUE(this->wait_for_complete(player));
342 ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
// Replay where tag 2 stops being contiguous after entry 854 and tag 3 starts
// from entry 0: the test expects tag 2 entries 852..854 followed by tag 3
// entries 0..3, and a completion result of 0.
// NOTE(review): some statements of this test are not visible in this view.
345 TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
346 std::string oid = this->get_temp_oid();
348 cls::journal::ObjectSetPosition commit_position;
// create() is given 14 and 4 here -- presumably the object order and the
// splay width (TODO confirm against RadosTestFixture::create).
350 ASSERT_EQ(0, this->create(oid, 14, 4));
351 ASSERT_EQ(0, this->client_register(oid));
352 ASSERT_EQ(0, this->client_commit(oid, commit_position));
354 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
355 ASSERT_EQ(0, this->init_metadata(metadata));
357 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
358 BOOST_SCOPE_EXIT_ALL( (player) ) {
359 C_SaferCond unwatch_ctx;
360 player->shut_down(&unwatch_ctx);
361 ASSERT_EQ(0, unwatch_ctx.wait());
364 ASSERT_EQ(0, metadata->set_active_set(1));
// Arguments are (oid, object number, tag tid, entry tid).
365 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
366 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
367 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
368 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
369 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
370 ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
371 ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
372 ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
373 ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
374 ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
375 ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
379 ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
// Tag 2 entries 856, 857, 860 and 861 are not part of the expected output.
381 Entries expected_entries = {
382 this->create_entry(2, 852),
383 this->create_entry(2, 853),
384 this->create_entry(2, 854),
385 this->create_entry(3, 0),
386 this->create_entry(3, 1),
387 this->create_entry(3, 2),
388 this->create_entry(3, 3)};
389 ASSERT_EQ(expected_entries, entries);
391 ASSERT_TRUE(this->wait_for_complete(player));
392 ASSERT_EQ(0, this->m_replay_hander.complete_result);
// Replay where tag 0 entry 2 is never written: tag 0 entries 0, 1 and 3 and
// tag 1 entry 0 are written, and the test expects 0/0, 0/1 and 1/0 back
// (tag 0 entry 3 is not part of the expected output).
// NOTE(review): some statements of this test are not visible in this view.
395 TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
396 std::string oid = this->get_temp_oid();
398 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
400 ASSERT_EQ(0, this->create(oid));
401 ASSERT_EQ(0, this->client_register(oid));
402 ASSERT_EQ(0, this->client_commit(oid, commit_position));
404 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
405 ASSERT_EQ(0, this->init_metadata(metadata));
407 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
408 BOOST_SCOPE_EXIT_ALL( (player) ) {
409 C_SaferCond unwatch_ctx;
410 player->shut_down(&unwatch_ctx);
411 ASSERT_EQ(0, unwatch_ctx.wait());
414 ASSERT_EQ(0, metadata->set_active_set(2));
// Arguments are (oid, object number, tag tid, entry tid).
415 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
416 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
417 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
418 ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
422 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
424 Entries expected_entries = {
425 this->create_entry(0, 0),
426 this->create_entry(0, 1),
427 this->create_entry(1, 0)};
428 ASSERT_EQ(expected_entries, entries);
// Replay where tag 0 has entries 0, 1, 2 and 4 (entry 3 is never written) and
// tag 1 starts at entry 0: the test expects 0/0, 0/1, 0/2 and then 1/0, i.e.
// tag 0 entry 4 is not part of the expected output.
// NOTE(review): some statements of this test are not visible in this view.
431 TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
432 std::string oid = this->get_temp_oid();
434 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
436 ASSERT_EQ(0, this->create(oid));
437 ASSERT_EQ(0, this->client_register(oid));
438 ASSERT_EQ(0, this->client_commit(oid, commit_position));
440 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
441 ASSERT_EQ(0, this->init_metadata(metadata));
443 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
444 BOOST_SCOPE_EXIT_ALL( (player) ) {
445 C_SaferCond unwatch_ctx;
446 player->shut_down(&unwatch_ctx);
447 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
450 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
451 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
452 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
453 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
454 ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
458 ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
460 Entries expected_entries = {
461 this->create_entry(0, 0),
462 this->create_entry(0, 1),
463 this->create_entry(0, 2),
464 this->create_entry(1, 0)};
465 ASSERT_EQ(expected_entries, entries);
// Replay with ObjectPosition(0, 1, 0) committed while the objects also hold
// tag 0 entries: only tag 1 entry 1 is expected back, and completion must be
// reported with result 0.
// NOTE(review): some statements of this test are not visible in this view.
468 TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
469 std::string oid = this->get_temp_oid();
471 journal::JournalPlayer::ObjectPositions positions = {
// presumably (object number, tag tid, entry tid) -- TODO confirm
472 cls::journal::ObjectPosition(0, 1, 0) };
473 cls::journal::ObjectSetPosition commit_position(positions);
// Create the journal, register the client and record its commit position.
475 ASSERT_EQ(0, this->create(oid));
476 ASSERT_EQ(0, this->client_register(oid));
477 ASSERT_EQ(0, this->client_commit(oid, commit_position));
479 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
480 ASSERT_EQ(0, this->init_metadata(metadata));
482 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
483 BOOST_SCOPE_EXIT_ALL( (player) ) {
484 C_SaferCond unwatch_ctx;
485 player->shut_down(&unwatch_ctx);
486 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
489 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
490 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
491 ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
492 ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
496 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
498 Entries expected_entries = {
499 this->create_entry(1, 1)};
500 ASSERT_EQ(expected_entries, entries);
502 ASSERT_TRUE(this->wait_for_complete(player));
503 ASSERT_EQ(0, this->m_replay_hander.complete_result);
// Replay where an entry with a different tag (235) sits between tag 234
// entries 120 and 124: one entry is expected, nothing further can be popped,
// and completion is reported with result 0.
// NOTE(review): some statements of this test are not visible in this view.
506 TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
507 std::string oid = this->get_temp_oid();
509 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
511 ASSERT_EQ(0, this->create(oid));
512 ASSERT_EQ(0, this->client_register(oid));
513 ASSERT_EQ(0, this->client_commit(oid, commit_position));
515 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
516 ASSERT_EQ(0, this->init_metadata(metadata));
518 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
519 BOOST_SCOPE_EXIT_ALL( (player) ) {
520 C_SaferCond unwatch_ctx;
521 player->shut_down(&unwatch_ctx);
522 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
525 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
526 ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
527 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
531 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
// No further entry may be handed out; completion result must be 0.
533 journal::Entry entry;
535 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
536 ASSERT_TRUE(this->wait_for_complete(player));
537 ASSERT_EQ(0, this->m_replay_hander.complete_result);
// Live replay via prefetch_and_watch(): with tag 234 entry 122 committed and
// written up front, entries 123 and 124 are appended one at a time after the
// player has started, and each must be delivered on its own.
// NOTE(review): some statements of this test are not visible in this view.
540 TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
541 std::string oid = this->get_temp_oid();
543 journal::JournalPlayer::ObjectPositions positions;
// presumably (object number, tag tid, entry tid) -- TODO confirm
545 cls::journal::ObjectPosition(0, 234, 122)};
546 cls::journal::ObjectSetPosition commit_position(positions);
// Create the journal, register the client and record its commit position.
548 ASSERT_EQ(0, this->create(oid));
549 ASSERT_EQ(0, this->client_register(oid));
550 ASSERT_EQ(0, this->client_commit(oid, commit_position));
552 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
553 ASSERT_EQ(0, this->init_metadata(metadata));
555 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
556 BOOST_SCOPE_EXIT_ALL( (player) ) {
557 C_SaferCond unwatch_ctx;
558 player->shut_down(&unwatch_ctx);
559 ASSERT_EQ(0, unwatch_ctx.wait());
562 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
// 0.25 is presumably the watch/poll interval in seconds -- TODO confirm.
564 player->prefetch_and_watch(0.25);
// First live append: entry 123 on object 1.
567 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
568 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
570 Entries expected_entries;
571 expected_entries = {this->create_entry(234, 123)};
572 ASSERT_EQ(expected_entries, entries);
// Second live append: entry 124 on object 0.
574 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
575 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
577 expected_entries = {this->create_entry(234, 124)};
578 ASSERT_EQ(expected_entries, entries);
// Replay where the written objects are not consecutive: tag 234 entries
// 122..126 are written to objects 0, 1, 5, 6 and 7 (nothing is written to
// objects 2..4). All five entries are expected, with 126 as the last
// allocated entry tid.
// NOTE(review): some statements of this test are not visible in this view.
581 TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
582 std::string oid = this->get_temp_oid();
584 cls::journal::ObjectSetPosition commit_position;
// create() is given 14 and 3 here -- presumably the object order and the
// splay width (TODO confirm against RadosTestFixture::create).
586 ASSERT_EQ(0, this->create(oid, 14, 3));
587 ASSERT_EQ(0, this->client_register(oid));
588 ASSERT_EQ(0, this->client_commit(oid, commit_position));
590 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
591 ASSERT_EQ(0, this->init_metadata(metadata));
592 ASSERT_EQ(0, metadata->set_active_set(2));
594 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
595 BOOST_SCOPE_EXIT_ALL( (player) ) {
596 C_SaferCond unwatch_ctx;
597 player->shut_down(&unwatch_ctx);
598 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
601 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
602 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
603 ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
604 ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
605 ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
// All five entries are expected, followed by the completion callback.
610 ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
611 ASSERT_TRUE(this->wait_for_complete(player));
613 Entries expected_entries;
615 this->create_entry(234, 122),
616 this->create_entry(234, 123),
617 this->create_entry(234, 124),
618 this->create_entry(234, 125),
619 this->create_entry(234, 126)};
620 ASSERT_EQ(expected_entries, entries);
// The metadata must report 126 as the last allocated entry tid for tag 234.
623 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
624 ASSERT_EQ(126U, last_tid);
// Replay where the committed positions span two different tags (300 on
// objects 8/9, 200 on objects 10/11). Each of objects 8..11 holds the entry
// named by its commit position followed by one tag 301 entry; the test
// expects exactly the four tag 301 entries 0..3 and a last allocated tid of 3.
// NOTE(review): some statements of this test are not visible in this view.
627 TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
628 std::string oid = this->get_temp_oid();
630 journal::JournalPlayer::ObjectPositions positions = {
// presumably (object number, tag tid, entry tid) -- TODO confirm
631 cls::journal::ObjectPosition(9, 300, 1),
632 cls::journal::ObjectPosition(8, 300, 0),
633 cls::journal::ObjectPosition(10, 200, 4334),
634 cls::journal::ObjectPosition(11, 200, 4331) };
635 cls::journal::ObjectSetPosition commit_position(positions);
// create() is given 14 and 4 here -- presumably the object order and the
// splay width (TODO confirm against RadosTestFixture::create).
637 ASSERT_EQ(0, this->create(oid, 14, 4));
638 ASSERT_EQ(0, this->client_register(oid));
639 ASSERT_EQ(0, this->client_commit(oid, commit_position));
641 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
642 ASSERT_EQ(0, this->init_metadata(metadata));
// Both the active set and the minimum set are moved to 2 before replay.
643 ASSERT_EQ(0, metadata->set_active_set(2));
644 metadata->set_minimum_set(2);
646 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
647 BOOST_SCOPE_EXIT_ALL( (player) ) {
648 C_SaferCond unwatch_ctx;
649 player->shut_down(&unwatch_ctx);
650 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
653 ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
654 ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
655 ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
656 ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
657 ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
658 ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
659 ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
660 ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
// Four entries are expected, followed by the completion callback.
665 ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
666 ASSERT_TRUE(this->wait_for_complete(player));
668 Entries expected_entries;
670 this->create_entry(301, 0),
671 this->create_entry(301, 1),
672 this->create_entry(301, 2),
673 this->create_entry(301, 3)};
674 ASSERT_EQ(expected_entries, entries);
// The metadata must report 3 as the last allocated entry tid for tag 301.
677 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
678 ASSERT_EQ(3U, last_tid);
// Live replay where one entry arrives late: tag 0 entries 0, 1, 2, 4 and 5
// are written before the player starts, so only 0..2 can be delivered. Once
// entry 3 is appended and the active set is advanced, 3..5 are expected.
// NOTE(review): some statements of this test are not visible in this view.
681 TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
682 std::string oid = this->get_temp_oid();
684 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
686 ASSERT_EQ(0, this->create(oid));
687 ASSERT_EQ(0, this->client_register(oid));
688 ASSERT_EQ(0, this->client_commit(oid, commit_position));
690 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
691 ASSERT_EQ(0, this->init_metadata(metadata));
693 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
694 BOOST_SCOPE_EXIT_ALL( (player) ) {
695 C_SaferCond unwatch_ctx;
696 player->shut_down(&unwatch_ctx);
697 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
700 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
701 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
702 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
703 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
704 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
705 player->prefetch_and_watch(0.25);
// Only the contiguous prefix 0..2 is expected at this point.
708 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
710 Entries expected_entries = {
711 this->create_entry(0, 0),
712 this->create_entry(0, 1),
713 this->create_entry(0, 2)};
714 ASSERT_EQ(expected_entries, entries);
// Nothing further may be popped until entry 3 shows up.
716 journal::Entry entry;
718 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
// Append the late entry 3 to object 1 and advance the active set to 1.
720 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
721 ASSERT_EQ(0, metadata->set_active_set(1));
722 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
725 this->create_entry(0, 3),
726 this->create_entry(0, 4),
727 this->create_entry(0, 5)};
728 ASSERT_EQ(expected_entries, entries);
// Live replay where tag 2 stops being contiguous after entry 854: 852..854
// are expected first and nothing more can be popped. After tag 3 entries
// 0..3 are appended (written in reverse order), those four are expected.
// NOTE(review): some statements of this test are not visible in this view.
731 TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
732 std::string oid = this->get_temp_oid();
734 cls::journal::ObjectSetPosition commit_position;
// create() is given 14 and 4 here -- presumably the object order and the
// splay width (TODO confirm against RadosTestFixture::create).
736 ASSERT_EQ(0, this->create(oid, 14, 4));
737 ASSERT_EQ(0, this->client_register(oid));
738 ASSERT_EQ(0, this->client_commit(oid, commit_position));
740 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
741 ASSERT_EQ(0, this->init_metadata(metadata));
743 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
744 BOOST_SCOPE_EXIT_ALL( (player) ) {
745 C_SaferCond unwatch_ctx;
746 player->shut_down(&unwatch_ctx);
747 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
750 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
751 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
752 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
753 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
754 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
755 ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
// Entry 856 is written to object 0 a second time.
756 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
757 player->prefetch_and_watch(0.25);
760 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
762 Entries expected_entries = {
763 this->create_entry(2, 852),
764 this->create_entry(2, 853),
765 this->create_entry(2, 854)};
766 ASSERT_EQ(expected_entries, entries);
// Nothing further may be popped while the sequence is incomplete.
768 journal::Entry entry;
770 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
// Append tag 3 entries 3, 2, 1, 0 to objects 3, 2, 1, 0 respectively.
772 ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
773 ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
774 ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
775 ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
776 ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
779 this->create_entry(3, 0),
780 this->create_entry(3, 1),
781 this->create_entry(3, 2),
782 this->create_entry(3, 3)};
783 ASSERT_EQ(expected_entries, entries);
// Live-replay variant of the "large missing sequence" scenario: tag 0 entries
// 0, 1 and 3 and tag 1 entry 0 are written (tag 0 entry 2 never is), the
// player is started with prefetch_and_watch(), and 0/0, 0/1 and 1/0 are
// expected back.
// NOTE(review): some statements of this test are not visible in this view.
786 TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
787 std::string oid = this->get_temp_oid();
789 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
791 ASSERT_EQ(0, this->create(oid));
792 ASSERT_EQ(0, this->client_register(oid));
793 ASSERT_EQ(0, this->client_commit(oid, commit_position));
795 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
796 ASSERT_EQ(0, this->init_metadata(metadata));
798 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
799 BOOST_SCOPE_EXIT_ALL( (player) ) {
800 C_SaferCond unwatch_ctx;
801 player->shut_down(&unwatch_ctx);
802 ASSERT_EQ(0, unwatch_ctx.wait());
805 ASSERT_EQ(0, metadata->set_active_set(2));
// Arguments are (oid, object number, tag tid, entry tid).
806 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
807 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
808 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
809 ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
810 player->prefetch_and_watch(0.25);
813 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
815 Entries expected_entries = {
816 this->create_entry(0, 0),
817 this->create_entry(0, 1),
818 this->create_entry(1, 0)};
819 ASSERT_EQ(expected_entries, entries);
// Live replay that is blocked until a new tag appears: tag1 entries 0, 1, 2
// and 4 are written (entry 3 never is), so only 0..2 are delivered and
// nothing more can be popped. After tag2 is allocated in the same tag class
// and its entry 0 is written, that entry is expected.
// NOTE(review): some statements of this test (e.g. the declarations of ctx1
// and ctx2) are not visible in this view.
822 TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
823 std::string oid = this->get_temp_oid();
825 cls::journal::ObjectSetPosition commit_position;
// Create the journal, register the client and record the empty position.
827 ASSERT_EQ(0, this->create(oid));
828 ASSERT_EQ(0, this->client_register(oid));
829 ASSERT_EQ(0, this->client_commit(oid, commit_position));
831 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
832 ASSERT_EQ(0, this->init_metadata(metadata));
834 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
835 BOOST_SCOPE_EXIT_ALL( (player) ) {
836 C_SaferCond unwatch_ctx;
837 player->shut_down(&unwatch_ctx);
838 ASSERT_EQ(0, unwatch_ctx.wait());
// Allocate the first tag with TAG_CLASS_NEW and wait for the allocation.
842 cls::journal::Tag tag1;
843 metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
844 ASSERT_EQ(0, ctx1.wait());
846 ASSERT_EQ(0, metadata->set_active_set(0));
// Arguments are (oid, object number, tag tid, entry tid).
847 ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
848 ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
849 ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
850 ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
851 player->prefetch_and_watch(0.25);
854 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
856 Entries expected_entries = {
857 this->create_entry(tag1.tid, 0),
858 this->create_entry(tag1.tid, 1),
859 this->create_entry(tag1.tid, 2)};
860 ASSERT_EQ(expected_entries, entries);
// Nothing further may be popped at this point.
862 journal::Entry entry;
864 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
// Allocate a second tag in tag1's class and write its first entry.
867 cls::journal::Tag tag2;
868 metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
869 ASSERT_EQ(0, ctx2.wait());
871 ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
872 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
875 this->create_entry(tag2.tid, 0)};
876 ASSERT_EQ(expected_entries, entries);
// Live-replay variant of the stale-entries scenario: ObjectPosition(0, 1, 0)
// is committed, the objects hold tag 0 entries as well as tag 1 entries 0 and
// 1, and after prefetch_and_watch() only tag 1 entry 1 is expected.
// NOTE(review): some statements of this test are not visible in this view.
879 TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
880 std::string oid = this->get_temp_oid();
882 journal::JournalPlayer::ObjectPositions positions = {
// presumably (object number, tag tid, entry tid) -- TODO confirm
883 cls::journal::ObjectPosition(0, 1, 0) };
884 cls::journal::ObjectSetPosition commit_position(positions);
// Create the journal, register the client and record its commit position.
886 ASSERT_EQ(0, this->create(oid));
887 ASSERT_EQ(0, this->client_register(oid));
888 ASSERT_EQ(0, this->client_commit(oid, commit_position));
890 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
891 ASSERT_EQ(0, this->init_metadata(metadata));
893 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
894 BOOST_SCOPE_EXIT_ALL( (player) ) {
895 C_SaferCond unwatch_ctx;
896 player->shut_down(&unwatch_ctx);
897 ASSERT_EQ(0, unwatch_ctx.wait());
// Arguments are (oid, object number, tag tid, entry tid).
900 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
901 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
902 ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
903 ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
904 player->prefetch_and_watch(0.25);
907 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
909 Entries expected_entries = {
910 this->create_entry(1, 1)};
911 ASSERT_EQ(expected_entries, entries);
// Live replay across an active-set change: with positions (1, 0, 1) and
// (0, 0, 0) committed, tag 1 entry 0 is expected first. The active set is
// then advanced to 3, tag 1 entry 1 is written to object 7, and that entry
// is expected next.
// NOTE(review): some statements of this test are not visible in this view.
914 TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
915 std::string oid = this->get_temp_oid();
917 journal::JournalPlayer::ObjectPositions positions = {
// presumably (object number, tag tid, entry tid) -- TODO confirm
918 cls::journal::ObjectPosition(1, 0, 1),
919 cls::journal::ObjectPosition(0, 0, 0)};
920 cls::journal::ObjectSetPosition commit_position(positions);
// Create the journal, register the client and record its commit position.
922 ASSERT_EQ(0, this->create(oid));
923 ASSERT_EQ(0, this->client_register(oid));
924 ASSERT_EQ(0, this->client_commit(oid, commit_position));
926 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
927 ASSERT_EQ(0, this->init_metadata(metadata));
929 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
930 BOOST_SCOPE_EXIT_ALL( (player) ) {
931 C_SaferCond unwatch_ctx;
932 player->shut_down(&unwatch_ctx);
933 ASSERT_EQ(0, unwatch_ctx.wait());
936 ASSERT_EQ(0, metadata->set_active_set(1));
// Arguments are (oid, object number, tag tid, entry tid).
937 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
938 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
939 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
940 ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
941 player->prefetch_and_watch(0.25);
944 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
946 Entries expected_entries = {
947 this->create_entry(1, 0)};
948 ASSERT_EQ(expected_entries, entries);
950 // should remove player for offset 3 after refetching
951 ASSERT_EQ(0, metadata->set_active_set(3));
952 ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
954 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
957 this->create_entry(1, 1)};
958 ASSERT_EQ(expected_entries, entries);
// Shutdown test: sets up a journal with an empty commit position, creates a
// player, and relies on the scope-exit block to shut it down and check the
// shutdown result is 0.
// NOTE(review): any statements after the scope-exit block are not visible in
// this view.
961 TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
962 std::string oid = this->get_temp_oid();
// Create the journal, register the client and commit an empty position.
964 ASSERT_EQ(0, this->create(oid));
965 ASSERT_EQ(0, this->client_register(oid));
966 ASSERT_EQ(0, this->client_commit(oid, {}));
968 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
969 ASSERT_EQ(0, this->init_metadata(metadata));
971 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
972 BOOST_SCOPE_EXIT_ALL( (player) ) {
973 C_SaferCond unwatch_ctx;
974 player->shut_down(&unwatch_ctx);
975 ASSERT_EQ(0, unwatch_ctx.wait());
// Shutdown test for live replay: sets up a journal with an empty commit
// position, starts the player with prefetch_and_watch(0.25) without writing
// any entries, and relies on the scope-exit block to shut it down and check
// the shutdown result is 0.
980 TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
981 std::string oid = this->get_temp_oid();
// Create the journal, register the client and commit an empty position.
983 ASSERT_EQ(0, this->create(oid));
984 ASSERT_EQ(0, this->client_register(oid));
985 ASSERT_EQ(0, this->client_commit(oid, {}));
987 journal::JournalMetadataPtr metadata = this->create_metadata(oid);
988 ASSERT_EQ(0, this->init_metadata(metadata));
990 journal::JournalPlayer *player = this->create_player(oid, metadata);
// Shut the player down when the test scope exits and check it returns 0.
991 BOOST_SCOPE_EXIT_ALL( (player) ) {
992 C_SaferCond unwatch_ctx;
993 player->shut_down(&unwatch_ctx);
994 ASSERT_EQ(0, unwatch_ctx.wait());
996 player->prefetch_and_watch(0.25);