1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectRecorder.h"
5 #include "common/Cond.h"
6 #include "common/Mutex.h"
7 #include "common/Timer.h"
8 #include "gtest/gtest.h"
9 #include "test/librados/test.h"
10 #include "test/journal/RadosTestFixture.h"
13 using std::shared_ptr;
// Fixture for the journal::ObjectRecorder tests, layered on RadosTestFixture.
// It remembers every recorder (and its lock) handed out by create_object()
// and owns the Handler instance (m_handler) that the recorders report to.
15 class TestObjectRecorder : public RadosTestFixture {
// The count and byte flush thresholds start at the largest value of their
// types; individual tests lower them through the set_flush_*() helpers.
18 : m_flush_interval(std::numeric_limits<uint32_t>::max()),
19 m_flush_bytes(std::numeric_limits<uint64_t>::max()),
// Receives the ObjectRecorder callbacks. The Close and Overflow tests wait
// on `cond` under `lock` until `is_closed` / `overflows` change.
24 struct Handler : public journal::ObjectRecorder::Handler {
// Lock of the most recently created recorder; assigned by create_object().
26 shared_ptr<Mutex> object_lock;
28 bool is_closed = false;
29 uint32_t overflows = 0;
31 Handler() : lock("lock") {
// Close notification; runs with the handler lock held.
// NOTE(review): presumably records is_closed and signals cond -- confirm.
34 void closed(journal::ObjectRecorder *object_recorder) override {
35 Mutex::Locker locker(lock);
// Overflow notification; with the handler lock held it claims the recorder's
// append buffers into a local list and then releases object_lock.
// NOTE(review): this implies the recorder calls overflow() with object_lock
// held -- confirm against ObjectRecorder.
39 void overflow(journal::ObjectRecorder *object_recorder) override {
40 Mutex::Locker locker(lock);
41 journal::AppendBuffers append_buffers;
43 object_recorder->claim_append_buffers(&append_buffers);
44 object_lock->Unlock();
// Bookkeeping containers: recorders in creation order, and locks keyed by oid.
51 typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
52 typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
54 ObjectRecorders m_object_recorders;
55 ObjectRecorderLocksMap m_object_recorder_locks;
// Flush parameters forwarded to each ObjectRecorder built by create_object().
57 uint32_t m_flush_interval;
58 uint64_t m_flush_bytes;
// Visits every tracked recorder, empties the list, then delegates to the
// base fixture's TearDown().
62 void TearDown() override {
63 for (ObjectRecorders::iterator it = m_object_recorders.begin();
64 it != m_object_recorders.end(); ++it) {
69 m_object_recorders.clear();
71 RadosTestFixture::TearDown();
// Helpers for adjusting the flush parameters before create_object() is called.
// NOTE(review): presumably each stores into the matching m_flush_* member.
74 inline void set_flush_interval(uint32_t i) {
77 inline void set_flush_bytes(uint64_t i) {
80 inline void set_flush_age(double i) {
// Builds an AppendBuffer: a new FutureImpl for (tag_tid, entry_tid), initialized
// with an empty FutureImplPtr, paired with `bl`.
// NOTE(review): `bl` presumably carries `payload` -- confirm.
84 journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
85 const std::string &payload) {
86 journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
88 future->init(journal::FutureImplPtr());
92 return std::make_pair(future, bl);
// Constructs an ObjectRecorder for `oid` using the fixture's io context, work
// queue, timer, handler and current flush parameters; `order` and `lock` come
// from the caller. The recorder and lock are tracked, and the handler's
// object_lock is repointed at `lock` (so the latest recorder's lock wins).
95 journal::ObjectRecorderPtr create_object(const std::string &oid,
96 uint8_t order, shared_ptr<Mutex> lock) {
97 journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
98 m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
99 order, m_flush_interval, m_flush_bytes, m_flush_age));
100 m_object_recorders.push_back(object);
101 m_object_recorder_locks.insert(std::make_pair(oid, lock));
102 m_handler.object_lock = lock;
// Appends two entries one at a time and checks that they accumulate as
// pending appends (1, then 2) until the second future is flushed explicitly,
// after which nothing is pending.
107 TEST_F(TestObjectRecorder, Append) {
// Common setup: create the journal object, register a client, init metadata.
108 std::string oid = get_temp_oid();
109 ASSERT_EQ(0, create(oid));
110 ASSERT_EQ(0, client_register(oid));
111 journal::JournalMetadataPtr metadata = create_metadata(oid);
112 ASSERT_EQ(0, init_metadata(metadata));
// Order 24 recorder guarded by its own lock.
114 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
115 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
117 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
119 journal::AppendBuffers append_buffers;
120 append_buffers = {append_buffer1};
// NOTE(review): a false return appears to mean "object not full" (compare
// AppendFilledObject, which expects true) -- confirm.
122 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
123 ASSERT_EQ(1U, object->get_pending_appends());
125 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
127 append_buffers = {append_buffer2};
129 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
130 ASSERT_EQ(2U, object->get_pending_appends());
// Explicit flush of the second future; once it completes nothing is pending.
133 append_buffer2.first->flush(&cond);
134 ASSERT_EQ(0, cond.wait());
135 ASSERT_EQ(0U, object->get_pending_appends());
// With the flush interval set to 2, the first append stays pending and the
// second one leaves zero pending appends without any explicit flush call.
138 TEST_F(TestObjectRecorder, AppendFlushByCount) {
139 std::string oid = get_temp_oid();
140 ASSERT_EQ(0, create(oid));
141 ASSERT_EQ(0, client_register(oid));
142 journal::JournalMetadataPtr metadata = create_metadata(oid);
143 ASSERT_EQ(0, init_metadata(metadata));
// Lower the count threshold before the recorder is constructed.
145 set_flush_interval(2);
146 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
147 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
149 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
151 journal::AppendBuffers append_buffers;
152 append_buffers = {append_buffer1};
154 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
155 ASSERT_EQ(1U, object->get_pending_appends());
157 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
159 append_buffers = {append_buffer2};
// Second append reaches the interval of 2, so nothing remains pending.
161 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
162 ASSERT_EQ(0U, object->get_pending_appends());
// Only waits on the future (no flush request) and expects success.
165 append_buffer2.first->wait(&cond);
166 ASSERT_EQ(0, cond.wait());
// Same shape as AppendFlushByCount: one pending append after the first entry,
// zero after the second, then a plain wait on the second future.
// NOTE(review): going by the test name the trigger is the byte threshold
// (set_flush_bytes) -- confirm the threshold used in the setup.
169 TEST_F(TestObjectRecorder, AppendFlushByBytes) {
170 std::string oid = get_temp_oid();
171 ASSERT_EQ(0, create(oid));
172 ASSERT_EQ(0, client_register(oid));
173 journal::JournalMetadataPtr metadata = create_metadata(oid);
174 ASSERT_EQ(0, init_metadata(metadata));
177 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
178 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
180 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
182 journal::AppendBuffers append_buffers;
183 append_buffers = {append_buffer1};
185 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
186 ASSERT_EQ(1U, object->get_pending_appends());
188 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
190 append_buffers = {append_buffer2};
// After the second append nothing is left pending.
192 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
193 ASSERT_EQ(0U, object->get_pending_appends());
196 append_buffer2.first->wait(&cond);
197 ASSERT_EQ(0, cond.wait());
// Appends two entries, never requests a flush, and expects the wait on the
// second future to succeed with no appends left pending afterwards.
// NOTE(review): going by the test name the trigger is the age threshold
// (set_flush_age) -- confirm the threshold used in the setup.
200 TEST_F(TestObjectRecorder, AppendFlushByAge) {
201 std::string oid = get_temp_oid();
202 ASSERT_EQ(0, create(oid));
203 ASSERT_EQ(0, client_register(oid));
204 journal::JournalMetadataPtr metadata = create_metadata(oid);
205 ASSERT_EQ(0, init_metadata(metadata));
208 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
209 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
211 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
213 journal::AppendBuffers append_buffers;
214 append_buffers = {append_buffer1};
// Unlike the count/bytes tests, pending counts are not checked between appends.
216 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
218 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
220 append_buffers = {append_buffer2};
222 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
// The pending count is only verified once the second future has completed.
225 append_buffer2.first->wait(&cond);
226 ASSERT_EQ(0, cond.wait());
227 ASSERT_EQ(0U, object->get_pending_appends());
// Uses an order-12 recorder and a 2048-character payload: the first
// append_unlock() is expected to return false and the second to return true.
// NOTE(review): true presumably signals that the object has filled up
// (two 2048-byte payloads against a 2^12-byte object) -- confirm.
230 TEST_F(TestObjectRecorder, AppendFilledObject) {
231 std::string oid = get_temp_oid();
232 ASSERT_EQ(0, create(oid));
233 ASSERT_EQ(0, client_register(oid));
234 journal::JournalMetadataPtr metadata = create_metadata(oid);
235 ASSERT_EQ(0, init_metadata(metadata));
// Order 12 here, versus 24 in the other append tests.
237 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
238 journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
240 std::string payload(2048, '1');
241 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
243 journal::AppendBuffers append_buffers;
244 append_buffers = {append_buffer1};
246 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
248 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
250 append_buffers = {append_buffer2};
// The second append is the one expected to report true.
252 ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
// The second entry must still complete successfully and leave nothing pending.
255 append_buffer2.first->wait(&cond);
256 ASSERT_EQ(0, cond.wait());
257 ASSERT_EQ(0U, object->get_pending_appends());
// Appends a single entry, flushes the whole recorder via object->flush(ctx),
// and checks that both the flush and the entry's future complete with 0 and
// that no appends remain pending.
260 TEST_F(TestObjectRecorder, Flush) {
261 std::string oid = get_temp_oid();
262 ASSERT_EQ(0, create(oid));
263 ASSERT_EQ(0, client_register(oid));
264 journal::JournalMetadataPtr metadata = create_metadata(oid);
265 ASSERT_EQ(0, init_metadata(metadata));
267 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
268 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
270 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
272 journal::AppendBuffers append_buffers;
273 append_buffers = {append_buffer1};
275 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
276 ASSERT_EQ(1U, object->get_pending_appends());
// Recorder-level flush, completion reported through cond1.
279 object->flush(&cond1);
280 ASSERT_EQ(0, cond1.wait());
// The appended entry's own future must also report success.
283 append_buffer1.first->wait(&cond2);
284 ASSERT_EQ(0, cond2.wait());
285 ASSERT_EQ(0U, object->get_pending_appends());
// Appends one entry and flushes it by passing its future to object->flush().
// Afterwards the recorder lock must be held, the future must be either
// mid-flush or already complete, and the registered waiter must see 0.
288 TEST_F(TestObjectRecorder, FlushFuture) {
289 std::string oid = get_temp_oid();
290 ASSERT_EQ(0, create(oid));
291 ASSERT_EQ(0, client_register(oid));
292 journal::JournalMetadataPtr metadata = create_metadata(oid);
293 ASSERT_EQ(0, init_metadata(metadata));
295 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
296 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
298 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
300 journal::AppendBuffers append_buffers;
301 append_buffers = {append_buffer};
303 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
304 ASSERT_EQ(1U, object->get_pending_appends());
// Register the waiter before requesting the flush.
307 append_buffer.first->wait(&cond);
// Flush addressed by future rather than the whole recorder.
309 object->flush(append_buffer.first);
310 ASSERT_TRUE(lock->is_locked());
// Either state is accepted, so the check does not depend on flush timing.
312 ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
313 append_buffer.first->is_complete());
314 ASSERT_EQ(0, cond.wait());
// Requests a flush for a future that has not been appended to the recorder
// yet: the flush must not be marked in progress at that point, and the
// future must still complete successfully once the entry is appended.
317 TEST_F(TestObjectRecorder, FlushDetachedFuture) {
318 std::string oid = get_temp_oid();
319 ASSERT_EQ(0, create(oid));
320 ASSERT_EQ(0, client_register(oid));
321 journal::JournalMetadataPtr metadata = create_metadata(oid);
322 ASSERT_EQ(0, init_metadata(metadata));
324 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
325 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
327 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
330 journal::AppendBuffers append_buffers;
331 append_buffers = {append_buffer};
// Flush is requested before the buffer has been handed to the recorder.
334 object->flush(append_buffer.first);
335 ASSERT_TRUE(lock->is_locked());
337 ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
// Now attach the entry to the recorder.
339 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
341 // should automatically flush once it's attached to the object
343 append_buffer.first->wait(&cond);
344 ASSERT_EQ(0, cond.wait());
// With one append still pending (flush interval 2), close() is expected to
// return false; the test then waits for the handler to report the close and
// checks that no appends remain pending.
347 TEST_F(TestObjectRecorder, Close) {
348 std::string oid = get_temp_oid();
349 ASSERT_EQ(0, create(oid));
350 ASSERT_EQ(0, client_register(oid));
351 journal::JournalMetadataPtr metadata = create_metadata(oid);
352 ASSERT_EQ(0, init_metadata(metadata));
// An interval of 2 keeps the single append below the count threshold.
354 set_flush_interval(2);
355 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
356 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
358 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
360 journal::AppendBuffers append_buffers;
361 append_buffers = {append_buffer1};
363 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
364 ASSERT_EQ(1U, object->get_pending_appends());
// NOTE(review): false presumably means the close could not finish
// synchronously because of the pending append -- confirm.
367 ASSERT_FALSE(object->close());
368 ASSERT_TRUE(lock->is_locked());
// Wait on the handler's condition, under the handler lock, until is_closed
// is set; each wait is bounded by utime_t(10, 0).
// NOTE(review): a non-zero WaitInterval result presumably indicates a
// timeout and ends the wait -- confirm.
372 Mutex::Locker locker(m_handler.lock);
373 while (!m_handler.is_closed) {
374 if (m_handler.cond.WaitInterval(
375 m_handler.lock, utime_t(10, 0)) != 0) {
381 ASSERT_TRUE(m_handler.is_closed);
382 ASSERT_EQ(0U, object->get_pending_appends());
// Creates two order-12 recorders for the same oid. The first appends two
// 2048-character entries in one batch (append_unlock() expected to return
// true); a third entry appended through the second recorder and flushed must
// then lead to the handler's overflow counter becoming non-zero.
385 TEST_F(TestObjectRecorder, Overflow) {
386 std::string oid = get_temp_oid();
387 ASSERT_EQ(0, create(oid));
388 ASSERT_EQ(0, client_register(oid));
389 journal::JournalMetadataPtr metadata = create_metadata(oid);
390 ASSERT_EQ(0, init_metadata(metadata));
// Both recorders share the oid but have separate locks. create_object()
// leaves m_handler.object_lock pointing at lock2, the last one created.
392 shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
393 journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
394 shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
395 journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
397 std::string payload(2048, '1');
398 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
400 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
402 journal::AppendBuffers append_buffers;
// Both entries go to the first recorder as a single batch.
403 append_buffers = {append_buffer1, append_buffer2};
405 ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
// Wait for the batch to complete before touching the second recorder.
408 append_buffer2.first->wait(&cond);
409 ASSERT_EQ(0, cond.wait());
410 ASSERT_EQ(0U, object1->get_pending_appends());
// Third entry uses a different tag (456) and goes through the second recorder.
412 journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
414 append_buffers = {append_buffer3};
417 ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
// Flush with no completion context; the outcome is observed via the handler.
418 append_buffer3.first->flush(NULL);
// Wait on the handler's condition, under the handler lock, until an overflow
// has been counted; each wait is bounded by utime_t(10, 0).
// NOTE(review): a non-zero WaitInterval result presumably indicates a
// timeout and ends the wait -- confirm.
420 bool overflowed = false;
422 Mutex::Locker locker(m_handler.lock);
423 while (m_handler.overflows == 0) {
424 if (m_handler.cond.WaitInterval(
425 m_handler.lock, utime_t(10, 0)) != 0) {
429 if (m_handler.overflows != 0) {
434 ASSERT_TRUE(overflowed);