1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/JournalMetadata.h"
5 #include "test/journal/RadosTestFixture.h"
6 #include "common/Cond.h"
7 #include "common/Mutex.h"
10 class TestJournalMetadata : public RadosTestFixture {
12 void TearDown() override {
13 for (MetadataList::iterator it = m_metadata_list.begin();
14 it != m_metadata_list.end(); ++it) {
15 (*it)->remove_listener(&m_listener);
17 m_metadata_list.clear();
19 RadosTestFixture::TearDown();
22 journal::JournalMetadataPtr create_metadata(const std::string &oid,
23 const std::string &client_id,
24 double commit_interval = 0.1,
25 uint64_t max_fetch_bytes = 0,
26 int max_concurrent_object_sets = 0) {
27 journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata(
28 oid, client_id, commit_interval, max_fetch_bytes,
29 max_concurrent_object_sets);
30 m_metadata_list.push_back(metadata);
31 metadata->add_listener(&m_listener);
35 typedef std::list<journal::JournalMetadataPtr> MetadataList;
36 MetadataList m_metadata_list;
39 TEST_F(TestJournalMetadata, JournalDNE) {
40 std::string oid = get_temp_oid();
42 journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
43 ASSERT_EQ(-ENOENT, init_metadata(metadata1));
46 TEST_F(TestJournalMetadata, ClientDNE) {
47 std::string oid = get_temp_oid();
49 ASSERT_EQ(0, create(oid, 14, 2));
50 ASSERT_EQ(0, client_register(oid, "client1", ""));
52 journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
53 ASSERT_EQ(0, init_metadata(metadata1));
55 journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client2");
56 ASSERT_EQ(-ENOENT, init_metadata(metadata2));
59 TEST_F(TestJournalMetadata, Committed) {
60 std::string oid = get_temp_oid();
62 ASSERT_EQ(0, create(oid, 14, 2));
63 ASSERT_EQ(0, client_register(oid, "client1", ""));
65 journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1", 600);
66 ASSERT_EQ(0, init_metadata(metadata1));
68 journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
69 ASSERT_EQ(0, init_metadata(metadata2));
70 ASSERT_TRUE(wait_for_update(metadata2));
72 journal::JournalMetadata::ObjectSetPosition expect_commit_position;
73 journal::JournalMetadata::ObjectSetPosition read_commit_position;
74 metadata1->get_commit_position(&read_commit_position);
75 ASSERT_EQ(expect_commit_position, read_commit_position);
77 uint64_t commit_tid1 = metadata1->allocate_commit_tid(0, 0, 0);
78 uint64_t commit_tid2 = metadata1->allocate_commit_tid(0, 1, 0);
79 uint64_t commit_tid3 = metadata1->allocate_commit_tid(1, 0, 1);
80 uint64_t commit_tid4 = metadata1->allocate_commit_tid(0, 0, 2);
82 // cannot commit until tid1 + 2 committed
83 metadata1->committed(commit_tid2, []() { return nullptr; });
84 metadata1->committed(commit_tid3, []() { return nullptr; });
87 metadata1->committed(commit_tid1, [&cond1]() { return &cond1; });
89 // given our 10 minute commit internal, this should override the
92 metadata1->committed(commit_tid4, [&cond2]() { return &cond2; });
94 ASSERT_EQ(-ESTALE, cond1.wait());
95 metadata1->flush_commit_position();
96 ASSERT_EQ(0, cond2.wait());
98 ASSERT_TRUE(wait_for_update(metadata2));
99 metadata2->get_commit_position(&read_commit_position);
100 expect_commit_position = {{{0, 0, 2}, {1, 0, 1}}};
101 ASSERT_EQ(expect_commit_position, read_commit_position);
104 TEST_F(TestJournalMetadata, UpdateActiveObject) {
105 std::string oid = get_temp_oid();
107 ASSERT_EQ(0, create(oid, 14, 2));
108 ASSERT_EQ(0, client_register(oid, "client1", ""));
110 journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
111 ASSERT_EQ(0, init_metadata(metadata1));
112 ASSERT_TRUE(wait_for_update(metadata1));
114 ASSERT_EQ(0U, metadata1->get_active_set());
116 ASSERT_EQ(0, metadata1->set_active_set(123));
117 ASSERT_TRUE(wait_for_update(metadata1));
119 ASSERT_EQ(123U, metadata1->get_active_set());
122 TEST_F(TestJournalMetadata, DisconnectLaggyClient) {
123 std::string oid = get_temp_oid();
125 ASSERT_EQ(0, create(oid));
126 ASSERT_EQ(0, client_register(oid, "client1", ""));
127 ASSERT_EQ(0, client_register(oid, "client2", "laggy"));
129 int max_concurrent_object_sets = 100;
130 journal::JournalMetadataPtr metadata =
131 create_metadata(oid, "client1", 0.1, 0, max_concurrent_object_sets);
132 ASSERT_EQ(0, init_metadata(metadata));
133 ASSERT_TRUE(wait_for_update(metadata));
135 ASSERT_EQ(0U, metadata->get_active_set());
137 journal::JournalMetadata::RegisteredClients clients;
139 #define ASSERT_CLIENT_STATES(s1, s2) \
140 ASSERT_EQ(2U, clients.size()); \
141 for (auto &c : clients) { \
142 if (c.id == "client1") { \
143 ASSERT_EQ(c.state, s1); \
144 } else if (c.id == "client2") { \
145 ASSERT_EQ(c.state, s2); \
147 ASSERT_TRUE(false); \
151 metadata->get_registered_clients(&clients);
152 ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
153 cls::journal::CLIENT_STATE_CONNECTED);
155 // client2 is connected when active set <= max_concurrent_object_sets
156 ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets));
157 ASSERT_TRUE(wait_for_update(metadata));
158 uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0);
160 metadata->committed(commit_tid, [&cond1]() { return &cond1; });
161 ASSERT_EQ(0, cond1.wait());
162 metadata->flush_commit_position();
163 ASSERT_TRUE(wait_for_update(metadata));
164 ASSERT_EQ(100U, metadata->get_active_set());
166 metadata->get_registered_clients(&clients);
167 ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
168 cls::journal::CLIENT_STATE_CONNECTED);
170 // client2 is disconnected when active set > max_concurrent_object_sets
171 ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1));
172 ASSERT_TRUE(wait_for_update(metadata));
173 commit_tid = metadata->allocate_commit_tid(0, 0, 1);
175 metadata->committed(commit_tid, [&cond2]() { return &cond2; });
176 ASSERT_EQ(0, cond2.wait());
177 metadata->flush_commit_position();
178 ASSERT_TRUE(wait_for_update(metadata));
179 ASSERT_EQ(101U, metadata->get_active_set());
181 metadata->get_registered_clients(&clients);
182 ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
183 cls::journal::CLIENT_STATE_DISCONNECTED);
186 TEST_F(TestJournalMetadata, AssertActiveTag) {
187 std::string oid = get_temp_oid();
189 ASSERT_EQ(0, create(oid));
190 ASSERT_EQ(0, client_register(oid, "client1", ""));
192 journal::JournalMetadataPtr metadata = create_metadata(oid, "client1");
193 ASSERT_EQ(0, init_metadata(metadata));
194 ASSERT_TRUE(wait_for_update(metadata));
197 cls::journal::Tag tag1;
198 metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
199 ASSERT_EQ(0, ctx1.wait());
202 metadata->assert_active_tag(tag1.tid, &ctx2);
203 ASSERT_EQ(0, ctx2.wait());
206 cls::journal::Tag tag2;
207 metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx3);
208 ASSERT_EQ(0, ctx3.wait());
211 metadata->assert_active_tag(tag1.tid, &ctx4);
212 ASSERT_EQ(-ESTALE, ctx4.wait());