Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / journal / test_ObjectRecorder.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectRecorder.h"
5 #include "common/Cond.h"
6 #include "common/Mutex.h"
7 #include "common/Timer.h"
8 #include "gtest/gtest.h"
9 #include "test/librados/test.h"
10 #include "test/journal/RadosTestFixture.h"
11 #include <limits>
12
13 using std::shared_ptr;
14
15 class TestObjectRecorder : public RadosTestFixture {
16 public:
17   TestObjectRecorder()
18     : m_flush_interval(std::numeric_limits<uint32_t>::max()),
19       m_flush_bytes(std::numeric_limits<uint64_t>::max()),
20       m_flush_age(600)
21   {
22   }
23
24   struct Handler : public journal::ObjectRecorder::Handler {
25     Mutex lock;
26     shared_ptr<Mutex> object_lock;
27     Cond cond;
28     bool is_closed = false;
29     uint32_t overflows = 0;
30
31     Handler() : lock("lock") {
32     }
33
34     void closed(journal::ObjectRecorder *object_recorder) override {
35       Mutex::Locker locker(lock);
36       is_closed = true;
37       cond.Signal();
38     }
39     void overflow(journal::ObjectRecorder *object_recorder) override {
40       Mutex::Locker locker(lock);
41       journal::AppendBuffers append_buffers;
42       object_lock->Lock();
43       object_recorder->claim_append_buffers(&append_buffers);
44       object_lock->Unlock();
45
46       ++overflows;
47       cond.Signal();
48     }
49   };
50
51   typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
52   typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
53
54   ObjectRecorders m_object_recorders;
55   ObjectRecorderLocksMap m_object_recorder_locks;
56
57   uint32_t m_flush_interval;
58   uint64_t m_flush_bytes;
59   double m_flush_age;
60   Handler m_handler;
61
62   void TearDown() override {
63     for (ObjectRecorders::iterator it = m_object_recorders.begin();
64          it != m_object_recorders.end(); ++it) {
65       C_SaferCond cond;
66       (*it)->flush(&cond);
67       cond.wait();
68     }
69     m_object_recorders.clear();
70
71     RadosTestFixture::TearDown();
72   }
73
74   inline void set_flush_interval(uint32_t i) {
75     m_flush_interval = i;
76   }
77   inline void set_flush_bytes(uint64_t i) {
78     m_flush_bytes = i;
79   }
80   inline void set_flush_age(double i) {
81     m_flush_age = i;
82   }
83
84   journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
85                                              const std::string &payload) {
86     journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
87                                                           456));
88     future->init(journal::FutureImplPtr());
89
90     bufferlist bl;
91     bl.append(payload);
92     return std::make_pair(future, bl);
93   }
94
95   journal::ObjectRecorderPtr create_object(const std::string &oid,
96                                            uint8_t order, shared_ptr<Mutex> lock) {
97     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
98       m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
99       order, m_flush_interval, m_flush_bytes, m_flush_age));
100     m_object_recorders.push_back(object);
101     m_object_recorder_locks.insert(std::make_pair(oid, lock));
102     m_handler.object_lock = lock;
103     return object;
104   }
105 };
106
107 TEST_F(TestObjectRecorder, Append) {
108   std::string oid = get_temp_oid();
109   ASSERT_EQ(0, create(oid));
110   ASSERT_EQ(0, client_register(oid));
111   journal::JournalMetadataPtr metadata = create_metadata(oid);
112   ASSERT_EQ(0, init_metadata(metadata));
113
114   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
115   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
116
117   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
118                                                               "payload");
119   journal::AppendBuffers append_buffers;
120   append_buffers = {append_buffer1};
121   lock->Lock();
122   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
123   ASSERT_EQ(1U, object->get_pending_appends());
124
125   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
126                                                               "payload");
127   append_buffers = {append_buffer2};
128   lock->Lock();
129   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
130   ASSERT_EQ(2U, object->get_pending_appends());
131
132   C_SaferCond cond;
133   append_buffer2.first->flush(&cond);
134   ASSERT_EQ(0, cond.wait());
135   ASSERT_EQ(0U, object->get_pending_appends());
136 }
137
138 TEST_F(TestObjectRecorder, AppendFlushByCount) {
139   std::string oid = get_temp_oid();
140   ASSERT_EQ(0, create(oid));
141   ASSERT_EQ(0, client_register(oid));
142   journal::JournalMetadataPtr metadata = create_metadata(oid);
143   ASSERT_EQ(0, init_metadata(metadata));
144
145   set_flush_interval(2);
146   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
147   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
148
149   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
150                                                               "payload");
151   journal::AppendBuffers append_buffers;
152   append_buffers = {append_buffer1};
153   lock->Lock();
154   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
155   ASSERT_EQ(1U, object->get_pending_appends());
156
157   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
158                                                               "payload");
159   append_buffers = {append_buffer2};
160   lock->Lock();
161   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
162   ASSERT_EQ(0U, object->get_pending_appends());
163
164   C_SaferCond cond;
165   append_buffer2.first->wait(&cond);
166   ASSERT_EQ(0, cond.wait());
167 }
168
169 TEST_F(TestObjectRecorder, AppendFlushByBytes) {
170   std::string oid = get_temp_oid();
171   ASSERT_EQ(0, create(oid));
172   ASSERT_EQ(0, client_register(oid));
173   journal::JournalMetadataPtr metadata = create_metadata(oid);
174   ASSERT_EQ(0, init_metadata(metadata));
175
176   set_flush_bytes(10);
177   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
178   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
179
180   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
181                                                               "payload");
182   journal::AppendBuffers append_buffers;
183   append_buffers = {append_buffer1};
184   lock->Lock();
185   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
186   ASSERT_EQ(1U, object->get_pending_appends());
187
188   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
189                                                               "payload");
190   append_buffers = {append_buffer2};
191   lock->Lock();
192   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
193   ASSERT_EQ(0U, object->get_pending_appends());
194
195   C_SaferCond cond;
196   append_buffer2.first->wait(&cond);
197   ASSERT_EQ(0, cond.wait());
198 }
199
200 TEST_F(TestObjectRecorder, AppendFlushByAge) {
201   std::string oid = get_temp_oid();
202   ASSERT_EQ(0, create(oid));
203   ASSERT_EQ(0, client_register(oid));
204   journal::JournalMetadataPtr metadata = create_metadata(oid);
205   ASSERT_EQ(0, init_metadata(metadata));
206
207   set_flush_age(0.1);
208   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
209   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
210
211   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
212                                                               "payload");
213   journal::AppendBuffers append_buffers;
214   append_buffers = {append_buffer1};
215   lock->Lock();
216   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
217
218   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
219                                                               "payload");
220   append_buffers = {append_buffer2};
221   lock->Lock();
222   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
223
224   C_SaferCond cond;
225   append_buffer2.first->wait(&cond);
226   ASSERT_EQ(0, cond.wait());
227   ASSERT_EQ(0U, object->get_pending_appends());
228 }
229
230 TEST_F(TestObjectRecorder, AppendFilledObject) {
231   std::string oid = get_temp_oid();
232   ASSERT_EQ(0, create(oid));
233   ASSERT_EQ(0, client_register(oid));
234   journal::JournalMetadataPtr metadata = create_metadata(oid);
235   ASSERT_EQ(0, init_metadata(metadata));
236
237   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
238   journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
239
240   std::string payload(2048, '1');
241   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
242                                                               payload);
243   journal::AppendBuffers append_buffers;
244   append_buffers = {append_buffer1};
245   lock->Lock();
246   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
247
248   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
249                                                               payload);
250   append_buffers = {append_buffer2};
251   lock->Lock();
252   ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
253
254   C_SaferCond cond;
255   append_buffer2.first->wait(&cond);
256   ASSERT_EQ(0, cond.wait());
257   ASSERT_EQ(0U, object->get_pending_appends());
258 }
259
260 TEST_F(TestObjectRecorder, Flush) {
261   std::string oid = get_temp_oid();
262   ASSERT_EQ(0, create(oid));
263   ASSERT_EQ(0, client_register(oid));
264   journal::JournalMetadataPtr metadata = create_metadata(oid);
265   ASSERT_EQ(0, init_metadata(metadata));
266
267   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
268   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
269
270   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
271                                                               "payload");
272   journal::AppendBuffers append_buffers;
273   append_buffers = {append_buffer1};
274   lock->Lock();
275   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
276   ASSERT_EQ(1U, object->get_pending_appends());
277
278   C_SaferCond cond1;
279   object->flush(&cond1);
280   ASSERT_EQ(0, cond1.wait());
281
282   C_SaferCond cond2;
283   append_buffer1.first->wait(&cond2);
284   ASSERT_EQ(0, cond2.wait());
285   ASSERT_EQ(0U, object->get_pending_appends());
286 }
287
288 TEST_F(TestObjectRecorder, FlushFuture) {
289   std::string oid = get_temp_oid();
290   ASSERT_EQ(0, create(oid));
291   ASSERT_EQ(0, client_register(oid));
292   journal::JournalMetadataPtr metadata = create_metadata(oid);
293   ASSERT_EQ(0, init_metadata(metadata));
294
295   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
296   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
297
298   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
299                                                              "payload");
300   journal::AppendBuffers append_buffers;
301   append_buffers = {append_buffer};
302   lock->Lock();
303   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
304   ASSERT_EQ(1U, object->get_pending_appends());
305
306   C_SaferCond cond;
307   append_buffer.first->wait(&cond);
308   lock->Lock();
309   object->flush(append_buffer.first);
310   ASSERT_TRUE(lock->is_locked());
311   lock->Unlock();
312   ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
313               append_buffer.first->is_complete());
314   ASSERT_EQ(0, cond.wait());
315 }
316
317 TEST_F(TestObjectRecorder, FlushDetachedFuture) {
318   std::string oid = get_temp_oid();
319   ASSERT_EQ(0, create(oid));
320   ASSERT_EQ(0, client_register(oid));
321   journal::JournalMetadataPtr metadata = create_metadata(oid);
322   ASSERT_EQ(0, init_metadata(metadata));
323
324   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
325   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
326
327   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
328                                                              "payload");
329
330   journal::AppendBuffers append_buffers;
331   append_buffers = {append_buffer};
332
333   lock->Lock();
334   object->flush(append_buffer.first);
335   ASSERT_TRUE(lock->is_locked());
336   lock->Unlock();
337   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
338   lock->Lock();
339   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
340
341   // should automatically flush once its attached to the object
342   C_SaferCond cond;
343   append_buffer.first->wait(&cond);
344   ASSERT_EQ(0, cond.wait());
345 }
346
347 TEST_F(TestObjectRecorder, Close) {
348   std::string oid = get_temp_oid();
349   ASSERT_EQ(0, create(oid));
350   ASSERT_EQ(0, client_register(oid));
351   journal::JournalMetadataPtr metadata = create_metadata(oid);
352   ASSERT_EQ(0, init_metadata(metadata));
353
354   set_flush_interval(2);
355   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
356   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
357
358   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
359                                                               "payload");
360   journal::AppendBuffers append_buffers;
361   append_buffers = {append_buffer1};
362   lock->Lock();
363   ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
364   ASSERT_EQ(1U, object->get_pending_appends());
365
366   lock->Lock();
367   ASSERT_FALSE(object->close());
368   ASSERT_TRUE(lock->is_locked());
369   lock->Unlock();
370
371   {
372     Mutex::Locker locker(m_handler.lock);
373     while (!m_handler.is_closed) {
374       if (m_handler.cond.WaitInterval(
375             m_handler.lock, utime_t(10, 0)) != 0) {
376         break;
377       }
378     }
379   }
380
381   ASSERT_TRUE(m_handler.is_closed);
382   ASSERT_EQ(0U, object->get_pending_appends());
383 }
384
385 TEST_F(TestObjectRecorder, Overflow) {
386   std::string oid = get_temp_oid();
387   ASSERT_EQ(0, create(oid));
388   ASSERT_EQ(0, client_register(oid));
389   journal::JournalMetadataPtr metadata = create_metadata(oid);
390   ASSERT_EQ(0, init_metadata(metadata));
391
392   shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
393   journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
394   shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
395   journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
396
397   std::string payload(2048, '1');
398   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
399                                                               payload);
400   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
401                                                               payload);
402   journal::AppendBuffers append_buffers;
403   append_buffers = {append_buffer1, append_buffer2};
404   lock1->Lock();
405   ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
406
407   C_SaferCond cond;
408   append_buffer2.first->wait(&cond);
409   ASSERT_EQ(0, cond.wait());
410   ASSERT_EQ(0U, object1->get_pending_appends());
411
412   journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
413                                                               payload);
414   append_buffers = {append_buffer3};
415
416   lock2->Lock();
417   ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
418   append_buffer3.first->flush(NULL);
419
420   bool overflowed = false;
421   {
422     Mutex::Locker locker(m_handler.lock);
423     while (m_handler.overflows == 0) {
424       if (m_handler.cond.WaitInterval(
425             m_handler.lock, utime_t(10, 0)) != 0) {
426         break;
427       }
428     }
429     if (m_handler.overflows != 0) {
430       overflowed = true;
431     }
432   }
433
434   ASSERT_TRUE(overflowed);
435 }