+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "journal/JournalPlayer.h"
-#include "journal/Entry.h"
-#include "journal/JournalMetadata.h"
-#include "journal/ReplayHandler.h"
-#include "include/stringify.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "gtest/gtest.h"
-#include "test/journal/RadosTestFixture.h"
-#include <list>
-#include <boost/scope_exit.hpp>
-
-typedef std::list<journal::Entry> Entries;
-
-template <typename T>
-class TestJournalPlayer : public RadosTestFixture {
-public:
- typedef std::list<journal::JournalPlayer *> JournalPlayers;
-
- static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
-
- struct ReplayHandler : public journal::ReplayHandler {
- Mutex lock;
- Cond cond;
- bool entries_available;
- bool complete;
- int complete_result;
-
- ReplayHandler()
- : lock("lock"), entries_available(false), complete(false),
- complete_result(0) {}
-
- void get() override {}
- void put() override {}
-
- void handle_entries_available() override {
- Mutex::Locker locker(lock);
- entries_available = true;
- cond.Signal();
- }
-
- void handle_complete(int r) override {
- Mutex::Locker locker(lock);
- complete = true;
- complete_result = r;
- cond.Signal();
- }
- };
-
- void TearDown() override {
- for (JournalPlayers::iterator it = m_players.begin();
- it != m_players.end(); ++it) {
- delete *it;
- }
- RadosTestFixture::TearDown();
- }
-
- journal::JournalMetadataPtr create_metadata(const std::string &oid) {
- return RadosTestFixture::create_metadata(oid, "client", 0.1,
- max_fetch_bytes);
- }
-
- int client_commit(const std::string &oid,
- journal::JournalPlayer::ObjectSetPosition position) {
- return RadosTestFixture::client_commit(oid, "client", position);
- }
-
- journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
- std::string payload(128, '0');
- bufferlist payload_bl;
- payload_bl.append(payload);
- return journal::Entry(tag_tid, entry_tid, payload_bl);
- }
-
- journal::JournalPlayer *create_player(const std::string &oid,
- const journal::JournalMetadataPtr &metadata) {
- journal::JournalPlayer *player(new journal::JournalPlayer(
- m_ioctx, oid + ".", metadata, &m_replay_hander));
- m_players.push_back(player);
- return player;
- }
-
- bool wait_for_entries(journal::JournalPlayer *player, uint32_t count,
- Entries *entries) {
- entries->clear();
- while (entries->size() < count) {
- journal::Entry entry;
- uint64_t commit_tid;
- while (entries->size() < count &&
- player->try_pop_front(&entry, &commit_tid)) {
- entries->push_back(entry);
- }
- if (entries->size() == count) {
- break;
- }
-
- Mutex::Locker locker(m_replay_hander.lock);
- if (m_replay_hander.entries_available) {
- m_replay_hander.entries_available = false;
- } else if (m_replay_hander.cond.WaitInterval(
- m_replay_hander.lock, utime_t(10, 0)) != 0) {
- break;
- }
- }
- return entries->size() == count;
- }
-
- bool wait_for_complete(journal::JournalPlayer *player) {
- Mutex::Locker locker(m_replay_hander.lock);
- while (!m_replay_hander.complete) {
- journal::Entry entry;
- uint64_t commit_tid;
- player->try_pop_front(&entry, &commit_tid);
-
- if (m_replay_hander.cond.WaitInterval(
- m_replay_hander.lock, utime_t(10, 0)) != 0) {
- return false;
- }
- }
- m_replay_hander.complete = false;
- return true;
- }
-
- int write_entry(const std::string &oid, uint64_t object_num,
- uint64_t tag_tid, uint64_t entry_tid) {
- bufferlist bl;
- ::encode(create_entry(tag_tid, entry_tid), bl);
- return append(oid + "." + stringify(object_num), bl);
- }
-
- JournalPlayers m_players;
- ReplayHandler m_replay_hander;
-};
-
-template <uint64_t _max_fetch_bytes>
-class TestJournalPlayerParams {
-public:
- static const uint64_t max_fetch_bytes = _max_fetch_bytes;
-};
-
-typedef ::testing::Types<TestJournalPlayerParams<0>,
- TestJournalPlayerParams<16> > TestJournalPlayerTypes;
-TYPED_TEST_CASE(TestJournalPlayer, TestJournalPlayerTypes);
-
-TYPED_TEST(TestJournalPlayer, Prefetch) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions;
- positions = {
- cls::journal::ObjectPosition(0, 234, 122) };
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
-
- player->prefetch();
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
- ASSERT_TRUE(this->wait_for_complete(player));
-
- Entries expected_entries;
- expected_entries = {
- this->create_entry(234, 123),
- this->create_entry(234, 124),
- this->create_entry(234, 125)};
- ASSERT_EQ(expected_entries, entries);
-
- uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
- ASSERT_EQ(125U, last_tid);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions;
- positions = {
- cls::journal::ObjectPosition(0, 234, 125),
- cls::journal::ObjectPosition(1, 234, 124) };
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
-
- player->prefetch();
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
- ASSERT_TRUE(this->wait_for_complete(player));
-
- uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
- ASSERT_EQ(125U, last_tid);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
-
- player->prefetch();
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
- ASSERT_TRUE(this->wait_for_complete(player));
-
- Entries expected_entries;
- expected_entries = {
- this->create_entry(234, 122),
- this->create_entry(234, 123)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions;
- positions = {
- cls::journal::ObjectPosition(2, 234, 122),
- cls::journal::ObjectPosition(1, 234, 121),
- cls::journal::ObjectPosition(0, 234, 120)};
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid, 14, 3));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
- ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
- ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
-
- player->prefetch();
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
- ASSERT_TRUE(this->wait_for_complete(player));
-
- uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
- ASSERT_EQ(124U, last_tid);
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
- ASSERT_EQ(0U, last_tid);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
-
- player->prefetch();
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
-
- journal::Entry entry;
- uint64_t commit_tid;
- ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
- ASSERT_TRUE(this->wait_for_complete(player));
- ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid, 14, 4));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, metadata->set_active_set(1));
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
- ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
- ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
- ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
- ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
- ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
- ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
- ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
- ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
-
- player->prefetch();
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
-
- Entries expected_entries = {
- this->create_entry(2, 852),
- this->create_entry(2, 853),
- this->create_entry(2, 854),
- this->create_entry(3, 0),
- this->create_entry(3, 1),
- this->create_entry(3, 2),
- this->create_entry(3, 3)};
- ASSERT_EQ(expected_entries, entries);
-
- ASSERT_TRUE(this->wait_for_complete(player));
- ASSERT_EQ(0, this->m_replay_hander.complete_result);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, metadata->set_active_set(2));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
- ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
-
- player->prefetch();
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
-
- Entries expected_entries = {
- this->create_entry(0, 0),
- this->create_entry(0, 1),
- this->create_entry(1, 0)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
- ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
-
- player->prefetch();
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
-
- Entries expected_entries = {
- this->create_entry(0, 0),
- this->create_entry(0, 1),
- this->create_entry(0, 2),
- this->create_entry(1, 0)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions = {
- cls::journal::ObjectPosition(0, 1, 0) };
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
- ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
-
- player->prefetch();
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- Entries expected_entries = {
- this->create_entry(1, 1)};
- ASSERT_EQ(expected_entries, entries);
-
- ASSERT_TRUE(this->wait_for_complete(player));
- ASSERT_EQ(0, this->m_replay_hander.complete_result);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
- ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
-
- player->prefetch();
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- journal::Entry entry;
- uint64_t commit_tid;
- ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
- ASSERT_TRUE(this->wait_for_complete(player));
- ASSERT_EQ(0, this->m_replay_hander.complete_result);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions;
- positions = {
- cls::journal::ObjectPosition(0, 234, 122)};
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
-
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- Entries expected_entries;
- expected_entries = {this->create_entry(234, 123)};
- ASSERT_EQ(expected_entries, entries);
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- expected_entries = {this->create_entry(234, 124)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid, 14, 3));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
- ASSERT_EQ(0, metadata->set_active_set(2));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
- ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
- ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
- ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
- ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
-
- player->prefetch();
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
- ASSERT_TRUE(this->wait_for_complete(player));
-
- Entries expected_entries;
- expected_entries = {
- this->create_entry(234, 122),
- this->create_entry(234, 123),
- this->create_entry(234, 124),
- this->create_entry(234, 125),
- this->create_entry(234, 126)};
- ASSERT_EQ(expected_entries, entries);
-
- uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
- ASSERT_EQ(126U, last_tid);
-}
-
-TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions = {
- cls::journal::ObjectPosition(9, 300, 1),
- cls::journal::ObjectPosition(8, 300, 0),
- cls::journal::ObjectPosition(10, 200, 4334),
- cls::journal::ObjectPosition(11, 200, 4331) };
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid, 14, 4));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
- ASSERT_EQ(0, metadata->set_active_set(2));
- metadata->set_minimum_set(2);
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
- ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
- ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
- ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
- ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
- ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
- ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
- ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
-
- player->prefetch();
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
- ASSERT_TRUE(this->wait_for_complete(player));
-
- Entries expected_entries;
- expected_entries = {
- this->create_entry(301, 0),
- this->create_entry(301, 1),
- this->create_entry(301, 2),
- this->create_entry(301, 3)};
- ASSERT_EQ(expected_entries, entries);
-
- uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
- ASSERT_EQ(3U, last_tid);
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
- ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
-
- Entries expected_entries = {
- this->create_entry(0, 0),
- this->create_entry(0, 1),
- this->create_entry(0, 2)};
- ASSERT_EQ(expected_entries, entries);
-
- journal::Entry entry;
- uint64_t commit_tid;
- ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
-
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
- ASSERT_EQ(0, metadata->set_active_set(1));
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
-
- expected_entries = {
- this->create_entry(0, 3),
- this->create_entry(0, 4),
- this->create_entry(0, 5)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid, 14, 4));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
- ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
- ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
- ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
- ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
-
- Entries expected_entries = {
- this->create_entry(2, 852),
- this->create_entry(2, 853),
- this->create_entry(2, 854)};
- ASSERT_EQ(expected_entries, entries);
-
- journal::Entry entry;
- uint64_t commit_tid;
- ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
-
- ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
- ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
- ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
- ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
- ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
-
- expected_entries = {
- this->create_entry(3, 0),
- this->create_entry(3, 1),
- this->create_entry(3, 2),
- this->create_entry(3, 3)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, metadata->set_active_set(2));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
- ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
-
- Entries expected_entries = {
- this->create_entry(0, 0),
- this->create_entry(0, 1),
- this->create_entry(1, 0)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
- std::string oid = this->get_temp_oid();
-
- cls::journal::ObjectSetPosition commit_position;
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- C_SaferCond ctx1;
- cls::journal::Tag tag1;
- metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
- ASSERT_EQ(0, ctx1.wait());
-
- ASSERT_EQ(0, metadata->set_active_set(0));
- ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
- ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
- ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
-
- Entries expected_entries = {
- this->create_entry(tag1.tid, 0),
- this->create_entry(tag1.tid, 1),
- this->create_entry(tag1.tid, 2)};
- ASSERT_EQ(expected_entries, entries);
-
- journal::Entry entry;
- uint64_t commit_tid;
- ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
-
- C_SaferCond ctx2;
- cls::journal::Tag tag2;
- metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
- ASSERT_EQ(0, ctx2.wait());
-
- ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- expected_entries = {
- this->create_entry(tag2.tid, 0)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions = {
- cls::journal::ObjectPosition(0, 1, 0) };
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
- ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- Entries expected_entries = {
- this->create_entry(1, 1)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
- std::string oid = this->get_temp_oid();
-
- journal::JournalPlayer::ObjectPositions positions = {
- cls::journal::ObjectPosition(1, 0, 1),
- cls::journal::ObjectPosition(0, 0, 0)};
- cls::journal::ObjectSetPosition commit_position(positions);
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, commit_position));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
-
- ASSERT_EQ(0, metadata->set_active_set(1));
- ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
- ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
- ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
- ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
- player->prefetch_and_watch(0.25);
-
- Entries entries;
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- Entries expected_entries = {
- this->create_entry(1, 0)};
- ASSERT_EQ(expected_entries, entries);
-
- // should remove player for offset 3 after refetching
- ASSERT_EQ(0, metadata->set_active_set(3));
- ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
-
- ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
-
- expected_entries = {
- this->create_entry(1, 1)};
- ASSERT_EQ(expected_entries, entries);
-}
-
-TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
- std::string oid = this->get_temp_oid();
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, {}));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
- player->prefetch();
-}
-
-TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
- std::string oid = this->get_temp_oid();
-
- ASSERT_EQ(0, this->create(oid));
- ASSERT_EQ(0, this->client_register(oid));
- ASSERT_EQ(0, this->client_commit(oid, {}));
-
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
- ASSERT_EQ(0, this->init_metadata(metadata));
-
- journal::JournalPlayer *player = this->create_player(oid, metadata);
- BOOST_SCOPE_EXIT_ALL( (player) ) {
- C_SaferCond unwatch_ctx;
- player->shut_down(&unwatch_ctx);
- ASSERT_EQ(0, unwatch_ctx.wait());
- };
- player->prefetch_and_watch(0.25);
-}
-