X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fjournal%2Ftest_JournalPlayer.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fjournal%2Ftest_JournalPlayer.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=7522198ec18ace9465f198c96279c6268457174d;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/journal/test_JournalPlayer.cc b/src/ceph/src/test/journal/test_JournalPlayer.cc deleted file mode 100644 index 7522198..0000000 --- a/src/ceph/src/test/journal/test_JournalPlayer.cc +++ /dev/null @@ -1,998 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "journal/JournalPlayer.h" -#include "journal/Entry.h" -#include "journal/JournalMetadata.h" -#include "journal/ReplayHandler.h" -#include "include/stringify.h" -#include "common/Cond.h" -#include "common/Mutex.h" -#include "gtest/gtest.h" -#include "test/journal/RadosTestFixture.h" -#include -#include - -typedef std::list Entries; - -template -class TestJournalPlayer : public RadosTestFixture { -public: - typedef std::list JournalPlayers; - - static const uint64_t max_fetch_bytes = T::max_fetch_bytes; - - struct ReplayHandler : public journal::ReplayHandler { - Mutex lock; - Cond cond; - bool entries_available; - bool complete; - int complete_result; - - ReplayHandler() - : lock("lock"), entries_available(false), complete(false), - complete_result(0) {} - - void get() override {} - void put() override {} - - void handle_entries_available() override { - Mutex::Locker locker(lock); - entries_available = true; - cond.Signal(); - } - - void handle_complete(int r) override { - Mutex::Locker locker(lock); - complete = true; - complete_result = r; - cond.Signal(); - } - }; - - void TearDown() override { - for (JournalPlayers::iterator it = m_players.begin(); - it != m_players.end(); ++it) { - delete *it; - } - RadosTestFixture::TearDown(); - } - - journal::JournalMetadataPtr create_metadata(const std::string &oid) { - return RadosTestFixture::create_metadata(oid, "client", 0.1, - max_fetch_bytes); - } - - int client_commit(const std::string &oid, - journal::JournalPlayer::ObjectSetPosition position) { - return RadosTestFixture::client_commit(oid, "client", position); - } - - journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) { - std::string payload(128, '0'); - bufferlist payload_bl; - payload_bl.append(payload); - return journal::Entry(tag_tid, entry_tid, payload_bl); - } - - journal::JournalPlayer *create_player(const std::string &oid, - const journal::JournalMetadataPtr &metadata) { - journal::JournalPlayer *player(new journal::JournalPlayer( - m_ioctx, oid + ".", metadata, &m_replay_hander)); - m_players.push_back(player); - return player; - } - - bool wait_for_entries(journal::JournalPlayer *player, uint32_t count, - Entries *entries) { - entries->clear(); - while (entries->size() < count) { - journal::Entry entry; - uint64_t commit_tid; - while (entries->size() < count && - player->try_pop_front(&entry, &commit_tid)) { - entries->push_back(entry); - } - if (entries->size() == count) { - break; - } - - Mutex::Locker locker(m_replay_hander.lock); - if (m_replay_hander.entries_available) { - m_replay_hander.entries_available = false; - } else if (m_replay_hander.cond.WaitInterval( - m_replay_hander.lock, utime_t(10, 0)) != 0) { - break; - } - } - return entries->size() == count; - } - - bool wait_for_complete(journal::JournalPlayer *player) { - Mutex::Locker locker(m_replay_hander.lock); - while (!m_replay_hander.complete) { - journal::Entry entry; - uint64_t commit_tid; - player->try_pop_front(&entry, &commit_tid); - - if (m_replay_hander.cond.WaitInterval( - m_replay_hander.lock, utime_t(10, 0)) != 0) { - return false; - } - } - m_replay_hander.complete = false; - return true; - } - - int write_entry(const std::string &oid, uint64_t object_num, - uint64_t tag_tid, uint64_t entry_tid) { - bufferlist bl; - ::encode(create_entry(tag_tid, entry_tid), bl); - return append(oid + "." + stringify(object_num), bl); - } - - JournalPlayers m_players; - ReplayHandler m_replay_hander; -}; - -template -class TestJournalPlayerParams { -public: - static const uint64_t max_fetch_bytes = _max_fetch_bytes; -}; - -typedef ::testing::Types, - TestJournalPlayerParams<16> > TestJournalPlayerTypes; -TYPED_TEST_CASE(TestJournalPlayer, TestJournalPlayerTypes); - -TYPED_TEST(TestJournalPlayer, Prefetch) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions; - positions = { - cls::journal::ObjectPosition(0, 234, 122) }; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125)); - - player->prefetch(); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - ASSERT_TRUE(this->wait_for_complete(player)); - - Entries expected_entries; - expected_entries = { - this->create_entry(234, 123), - this->create_entry(234, 124), - this->create_entry(234, 125)}; - ASSERT_EQ(expected_entries, entries); - - uint64_t last_tid; - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); - ASSERT_EQ(125U, last_tid); -} - -TYPED_TEST(TestJournalPlayer, PrefetchSkip) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions; - positions = { - cls::journal::ObjectPosition(0, 234, 125), - cls::journal::ObjectPosition(1, 234, 124) }; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125)); - - player->prefetch(); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 0, &entries)); - ASSERT_TRUE(this->wait_for_complete(player)); - - uint64_t last_tid; - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); - ASSERT_EQ(125U, last_tid); -} - -TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); - - player->prefetch(); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 2, &entries)); - ASSERT_TRUE(this->wait_for_complete(player)); - - Entries expected_entries; - expected_entries = { - this->create_entry(234, 122), - this->create_entry(234, 123)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions; - positions = { - cls::journal::ObjectPosition(2, 234, 122), - cls::journal::ObjectPosition(1, 234, 121), - cls::journal::ObjectPosition(0, 234, 120)}; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid, 14, 3)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121)); - ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122)); - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124)); - ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated - - player->prefetch(); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - ASSERT_TRUE(this->wait_for_complete(player)); - - uint64_t last_tid; - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); - ASSERT_EQ(124U, last_tid); - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid)); - ASSERT_EQ(0U, last_tid); -} - -TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121)); - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); - - player->prefetch(); - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 2, &entries)); - - journal::Entry entry; - uint64_t commit_tid; - ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); - ASSERT_TRUE(this->wait_for_complete(player)); - ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result); -} - -TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid, 14, 4)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, metadata->set_active_set(1)); - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852)); - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860)); - ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853)); - ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857)); - ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861)); - ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854)); - ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0)); - ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1)); - ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2)); - ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3)); - - player->prefetch(); - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 7, &entries)); - - Entries expected_entries = { - this->create_entry(2, 852), - this->create_entry(2, 853), - this->create_entry(2, 854), - this->create_entry(3, 0), - this->create_entry(3, 1), - this->create_entry(3, 2), - this->create_entry(3, 3)}; - ASSERT_EQ(expected_entries, entries); - - ASSERT_TRUE(this->wait_for_complete(player)); - ASSERT_EQ(0, this->m_replay_hander.complete_result); -} - -TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, metadata->set_active_set(2)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); - ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0)); - - player->prefetch(); - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - - Entries expected_entries = { - this->create_entry(0, 0), - this->create_entry(0, 1), - this->create_entry(1, 0)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4)); - ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); - - player->prefetch(); - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); - - Entries expected_entries = { - this->create_entry(0, 0), - this->create_entry(0, 1), - this->create_entry(0, 2), - this->create_entry(1, 0)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions = { - cls::journal::ObjectPosition(0, 1, 0) }; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); - ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1)); - - player->prefetch(); - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - Entries expected_entries = { - this->create_entry(1, 1)}; - ASSERT_EQ(expected_entries, entries); - - ASSERT_TRUE(this->wait_for_complete(player)); - ASSERT_EQ(0, this->m_replay_hander.complete_result); -} - -TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121)); - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); - - player->prefetch(); - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - journal::Entry entry; - uint64_t commit_tid; - ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); - ASSERT_TRUE(this->wait_for_complete(player)); - ASSERT_EQ(0, this->m_replay_hander.complete_result); -} - -TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions; - positions = { - cls::journal::ObjectPosition(0, 234, 122)}; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); - - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - Entries expected_entries; - expected_entries = {this->create_entry(234, 123)}; - ASSERT_EQ(expected_entries, entries); - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - expected_entries = {this->create_entry(234, 124)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid, 14, 3)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - ASSERT_EQ(0, metadata->set_active_set(2)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); - ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); - ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124)); - ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125)); - ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126)); - - player->prefetch(); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 5, &entries)); - ASSERT_TRUE(this->wait_for_complete(player)); - - Entries expected_entries; - expected_entries = { - this->create_entry(234, 122), - this->create_entry(234, 123), - this->create_entry(234, 124), - this->create_entry(234, 125), - this->create_entry(234, 126)}; - ASSERT_EQ(expected_entries, entries); - - uint64_t last_tid; - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); - ASSERT_EQ(126U, last_tid); -} - -TYPED_TEST(TestJournalPlayer, ImbalancedJournal) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions = { - cls::journal::ObjectPosition(9, 300, 1), - cls::journal::ObjectPosition(8, 300, 0), - cls::journal::ObjectPosition(10, 200, 4334), - cls::journal::ObjectPosition(11, 200, 4331) }; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid, 14, 4)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - ASSERT_EQ(0, metadata->set_active_set(2)); - metadata->set_minimum_set(2); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0)); - ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0)); - ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1)); - ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1)); - ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334)); - ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2)); - ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331)); - ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3)); - - player->prefetch(); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); - ASSERT_TRUE(this->wait_for_complete(player)); - - Entries expected_entries; - expected_entries = { - this->create_entry(301, 0), - this->create_entry(301, 1), - this->create_entry(301, 2), - this->create_entry(301, 3)}; - ASSERT_EQ(expected_entries, entries); - - uint64_t last_tid; - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid)); - ASSERT_EQ(3U, last_tid); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4)); - ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1 - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - - Entries expected_entries = { - this->create_entry(0, 0), - this->create_entry(0, 1), - this->create_entry(0, 2)}; - ASSERT_EQ(expected_entries, entries); - - journal::Entry entry; - uint64_t commit_tid; - ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); - - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); - ASSERT_EQ(0, metadata->set_active_set(1)); - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - - expected_entries = { - this->create_entry(0, 3), - this->create_entry(0, 4), - this->create_entry(0, 5)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid, 14, 4)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852)); - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860)); - ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853)); - ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857)); - ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854)); - ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - - Entries expected_entries = { - this->create_entry(2, 852), - this->create_entry(2, 853), - this->create_entry(2, 854)}; - ASSERT_EQ(expected_entries, entries); - - journal::Entry entry; - uint64_t commit_tid; - ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); - - ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3)); - ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2)); - ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1)); - ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0)); - ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); - - expected_entries = { - this->create_entry(3, 0), - this->create_entry(3, 1), - this->create_entry(3, 2), - this->create_entry(3, 3)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, metadata->set_active_set(2)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); - ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0)); - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - - Entries expected_entries = { - this->create_entry(0, 0), - this->create_entry(0, 1), - this->create_entry(1, 0)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) { - std::string oid = this->get_temp_oid(); - - cls::journal::ObjectSetPosition commit_position; - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - C_SaferCond ctx1; - cls::journal::Tag tag1; - metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1); - ASSERT_EQ(0, ctx1.wait()); - - ASSERT_EQ(0, metadata->set_active_set(0)); - ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1)); - ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2)); - ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4)); - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); - - Entries expected_entries = { - this->create_entry(tag1.tid, 0), - this->create_entry(tag1.tid, 1), - this->create_entry(tag1.tid, 2)}; - ASSERT_EQ(expected_entries, entries); - - journal::Entry entry; - uint64_t commit_tid; - ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); - - C_SaferCond ctx2; - cls::journal::Tag tag2; - metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2); - ASSERT_EQ(0, ctx2.wait()); - - ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0)); - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - expected_entries = { - this->create_entry(tag2.tid, 0)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions = { - cls::journal::ObjectPosition(0, 1, 0) }; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); - ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1)); - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - Entries expected_entries = { - this->create_entry(1, 1)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) { - std::string oid = this->get_temp_oid(); - - journal::JournalPlayer::ObjectPositions positions = { - cls::journal::ObjectPosition(1, 0, 1), - cls::journal::ObjectPosition(0, 0, 0)}; - cls::journal::ObjectSetPosition commit_position(positions); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, commit_position)); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - - ASSERT_EQ(0, metadata->set_active_set(1)); - ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); - ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); - ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); - ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0)); - player->prefetch_and_watch(0.25); - - Entries entries; - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - Entries expected_entries = { - this->create_entry(1, 0)}; - ASSERT_EQ(expected_entries, entries); - - // should remove player for offset 3 after refetching - ASSERT_EQ(0, metadata->set_active_set(3)); - ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1)); - - ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); - - expected_entries = { - this->create_entry(1, 1)}; - ASSERT_EQ(expected_entries, entries); -} - -TYPED_TEST(TestJournalPlayer, PrefechShutDown) { - std::string oid = this->get_temp_oid(); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, {})); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - player->prefetch(); -} - -TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) { - std::string oid = this->get_temp_oid(); - - ASSERT_EQ(0, this->create(oid)); - ASSERT_EQ(0, this->client_register(oid)); - ASSERT_EQ(0, this->client_commit(oid, {})); - - journal::JournalMetadataPtr metadata = this->create_metadata(oid); - ASSERT_EQ(0, this->init_metadata(metadata)); - - journal::JournalPlayer *player = this->create_player(oid, metadata); - BOOST_SCOPE_EXIT_ALL( (player) ) { - C_SaferCond unwatch_ctx; - player->shut_down(&unwatch_ctx); - ASSERT_EQ(0, unwatch_ctx.wait()); - }; - player->prefetch_and_watch(0.25); -} -