1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include <gtest/gtest.h>
6 #include "common/ceph_argparse.h"
7 #include "common/common_init.h"
8 #include "global/global_init.h"
9 #include "common/config.h"
10 #include "common/Finisher.h"
11 #include "os/filestore/FileJournal.h"
12 #include "include/Context.h"
13 #include "common/Mutex.h"
14 #include "common/safe_io.h"
15 #include "os/filestore/JournalingObjectStore.h"
// FileJournal I/O-mode flags for one subtest, plus the label each test passes
// to SCOPED_TRACE.
22 bool directio, aio, faio;
23 const char *description;
// Row layout: directio, aio, faio, description.
25 { false, false, false, "DIRECTIO OFF AIO OFF" },
26 { true, false, false, "DIRECTIO ON AIO OFF" },
27 { true, true, true, "DIRECTIO ON AIO ON"}
// Mutex handed to every C_SafeCond completion that the tests construct with
// &wait_lock.
32 Mutex wait_lock("lock");
// Starts with done == false and creates a C_SafeCond wired to this object's
// own lock, cond and done members; the tests pass that context (c) to
// submit_entry().
52 : lock("C_Sync::lock"), done(false) {
53 c = new C_SafeCond(&lock, &cond, &done);
57 //cout << "wait" << std::endl;
60 //cout << "waited" << std::endl;
// Formatted with %u into the osd_journal_size option by main(); the write
// tests also derive their payload sizes and iteration counts from it.
65 unsigned size_mb = 200;
66 // Prefix of gtest command-line flags; main() treats any argument that does
// not start with it as the journal path.
// NOTE(review): the identifier is spelled PRFIX (sic); renaming it requires
// updating its uses in main() as well.
67 const char GTEST_PRFIX[] = "--gtest_";
// Test driver entry point.
//
// Initializes the global Ceph context as a client utility, sets
// osd_journal_size from size_mb, creates the shared Finisher, chooses the
// journal path (a non-gtest command-line argument, or a randomly named file
// under /var/tmp when none was given), then hands control to GoogleTest.
69 int main(int argc, char **argv) {
70 vector<const char*> args;
71 argv_to_vec(argc, (const char **)argv, args);
73 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
74 CODE_ENVIRONMENT_UTILITY, 0);
75 common_init_finish(g_ceph_context);
// NOTE(review): sprintf is unbounded; if mb is a fixed-size array,
// snprintf(mb, sizeof(mb), ...) would be the safer call -- confirm against
// mb's declaration.
78 sprintf(mb, "%u", size_mb);
79 g_ceph_context->_conf->set_val("osd_journal_size", mb);
80 g_ceph_context->_conf->apply_changes(NULL);
82 finisher = new Finisher(g_ceph_context);
86 for ( unsigned int i = 0; i < args.size(); ++i) {
// strncmp() returns non-zero when the argument does NOT start with the gtest
// prefix; sizeof - 1 excludes the terminating NUL from the comparison.
87 if (strncmp(args[i], GTEST_PRFIX, sizeof(GTEST_PRFIX) - 1)) {
88 //Non gtest argument, set to path.
// Copy at most sizeof(path) - 1 bytes and terminate explicitly, because
// strncpy() does not NUL-terminate when it truncates.
89 size_t copy_len = std::min(sizeof(path) - 1, strlen(args[i]));
90 strncpy(path, args[i], copy_len);
91 path[copy_len] = '\0';
// No path supplied on the command line: fall back to a pseudo-random file name
// seeded from the pid and the current time.
96 if ( path[0] == '\0') {
97 srand(getpid() + time(0));
98 snprintf(path, sizeof(path), "/var/tmp/ceph_test_filejournal.tmp.%d", rand());
100 cout << "path " << path << std::endl;
102 ::testing::InitGoogleTest(&argc, argv);
106 int r = RUN_ALL_TESTS();
// Checks that FileJournal::create() returns 0 for each of the three I/O-mode
// subtests, with journal_ignore_corruption=false and
// journal_write_header_frequency=0.
115 TEST(TestFileJournal, Create) {
116 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
117 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
118 g_ceph_context->_conf->apply_changes(NULL);
120 for (unsigned i = 0 ; i < 3; ++i) {
// Label any failure below with the I/O mode being exercised.
121 SCOPED_TRACE(subtests[i].description);
122 fsid.generate_random();
123 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
124 subtests[i].directio, subtests[i].aio, subtests[i].faio);
125 ASSERT_EQ(0, fj.create());
// For each I/O mode: creates the journal, makes it writeable and submits one
// entry with sequence number 1, using a C_SafeCond on wait_lock as the
// completion.
129 TEST(TestFileJournal, WriteSmall) {
130 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
131 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
132 g_ceph_context->_conf->apply_changes(NULL);
134 for (unsigned i = 0 ; i < 3; ++i) {
135 SCOPED_TRACE(subtests[i].description);
136 fsid.generate_random();
137 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
138 subtests[i].directio, subtests[i].aio, subtests[i].faio);
139 ASSERT_EQ(0, fj.create());
140 ASSERT_EQ(0, fj.make_writeable());
142 vector<ObjectStore::Transaction> tls;
// Submission sequence used throughout this file: prepare_entry(), then
// reserve_throttle_and_backoff() for the prepared length, then submit_entry().
145 int orig_len = fj.prepare_entry(tls, &bl);
146 fj.reserve_throttle_and_backoff(bl.length());
147 fj.submit_entry(1, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
// For each I/O mode: submits a single large entry (sequence 1) whose payload is
// grown until it reaches at least size_mb*1000/2 bytes.
154 TEST(TestFileJournal, WriteBig) {
155 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
156 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
157 g_ceph_context->_conf->apply_changes(NULL);
159 for (unsigned i = 0 ; i < 3; ++i) {
160 SCOPED_TRACE(subtests[i].description);
161 fsid.generate_random();
162 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
163 subtests[i].directio, subtests[i].aio, subtests[i].faio);
164 ASSERT_EQ(0, fj.create());
165 ASSERT_EQ(0, fj.make_writeable());
// Grow the payload one foo-sized chunk at a time; every byte is set to 1.
168 while (bl.length() < size_mb*1000/2) {
170 memset(foo, 1, sizeof(foo));
171 bl.append(foo, sizeof(foo));
173 vector<ObjectStore::Transaction> tls;
174 int orig_len = fj.prepare_entry(tls, &bl);
175 fj.reserve_throttle_and_backoff(bl.length());
176 fj.submit_entry(1, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
// For each I/O mode: submits 100 entries with increasing sequence numbers.
// Each entry's completion is a sub-context of one C_GatherBuilder whose final
// context is a C_SafeCond on wait_lock.
182 TEST(TestFileJournal, WriteMany) {
183 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
184 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
185 g_ceph_context->_conf->apply_changes(NULL);
187 for (unsigned i = 0 ; i < 3; ++i) {
188 SCOPED_TRACE(subtests[i].description);
189 fsid.generate_random();
190 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
191 subtests[i].directio, subtests[i].aio, subtests[i].faio);
192 ASSERT_EQ(0, fj.create());
193 ASSERT_EQ(0, fj.make_writeable());
195 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
197 vector<ObjectStore::Transaction> tls;
// Note: this inner i shadows the subtest index of the enclosing loop.
201 for (int i=0; i<100; i++) {
203 int orig_len = fj.prepare_entry(tls, &bl);
204 fj.reserve_throttle_and_backoff(bl.length());
205 fj.submit_entry(seq++, bl, orig_len, gb.new_sub());
// For each I/O mode: writes a small entry (sequence 1) followed by an entry
// (sequence 2) built from IOV_MAX * 2 page-aligned 4096-byte buffers, then
// reads an entry back and checks that it is sequence 2 with contents equal to
// the saved copy of the payload.
215 TEST(TestFileJournal, WriteManyVecs) {
216 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
217 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
218 g_ceph_context->_conf->apply_changes(NULL);
220 for (unsigned i = 0 ; i < 3; ++i) {
221 SCOPED_TRACE(subtests[i].description);
222 fsid.generate_random();
223 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
224 subtests[i].directio, subtests[i].aio, subtests[i].faio);
225 ASSERT_EQ(0, fj.create());
226 ASSERT_EQ(0, fj.make_writeable());
228 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
231 first.append("small");
232 vector<ObjectStore::Transaction> tls;
233 int orig_len = fj.prepare_entry(tls, &first);
234 fj.reserve_throttle_and_backoff(first.length());
235 fj.submit_entry(1, first, orig_len, gb.new_sub());
// Twice IOV_MAX buffers -- presumably so the entry cannot be written with a
// single vectored write; TODO confirm. Each buffer is filled with (char)i so
// consecutive buffers differ.
238 for (int i=0; i<IOV_MAX * 2; i++) {
239 bufferptr bp = buffer::create_page_aligned(4096);
240 memset(bp.c_str(), (char)i, 4096);
// Copy the payload into origbl before prepare_entry() receives &bl; the
// read-back below is compared against this copy.
243 bufferlist origbl = bl;
244 orig_len = fj.prepare_entry(tls, &bl);
245 fj.reserve_throttle_and_backoff(bl.length());
246 fj.submit_entry(2, bl, orig_len, gb.new_sub());
256 ASSERT_EQ(true, fj.read_entry(inbl, seq));
257 ASSERT_EQ(seq, 2ull);
258 ASSERT_TRUE(inbl.contents_equal(origbl));
259 ASSERT_EQ(0, fj.make_writeable());
// For each I/O mode: submits entries with sequences 1, 2 and 3, then replays
// the journal and expects to read sequence 2 and sequence 3 (each with payload
// "small"), followed by a read that returns false.
265 TEST(TestFileJournal, ReplaySmall) {
266 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
267 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
268 g_ceph_context->_conf->apply_changes(NULL);
270 vector<ObjectStore::Transaction> tls;
272 for (unsigned i = 0 ; i < 3; ++i) {
273 SCOPED_TRACE(subtests[i].description);
274 fsid.generate_random();
275 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
276 subtests[i].directio, subtests[i].aio, subtests[i].faio);
277 ASSERT_EQ(0, fj.create());
278 ASSERT_EQ(0, fj.make_writeable());
280 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
284 int orig_len = fj.prepare_entry(tls, &bl);
285 fj.reserve_throttle_and_backoff(bl.length());
286 fj.submit_entry(1, bl, orig_len, gb.new_sub());
288 orig_len = fj.prepare_entry(tls, &bl);
289 fj.reserve_throttle_and_backoff(bl.length());
290 fj.submit_entry(2, bl, orig_len, gb.new_sub());
292 orig_len = fj.prepare_entry(tls, &bl);
293 fj.reserve_throttle_and_backoff(bl.length());
294 fj.submit_entry(3, bl, orig_len, gb.new_sub());
// Replay phase: the first entry read back is expected to be sequence 2.
305 ASSERT_EQ(true, fj.read_entry(inbl, seq));
306 ASSERT_EQ(seq, 2ull);
307 inbl.copy(0, inbl.length(), v);
308 ASSERT_EQ("small", v);
312 ASSERT_EQ(true, fj.read_entry(inbl, seq));
313 ASSERT_EQ(seq, 3ull);
314 inbl.copy(0, inbl.length(), v);
315 ASSERT_EQ("small", v);
// Nothing is left to replay after sequence 3.
319 ASSERT_TRUE(!fj.read_entry(inbl, seq));
321 ASSERT_EQ(0, fj.make_writeable());
// For each I/O mode, with journal_ignore_corruption=true: submits four entries,
// rewrites the journal file on disk so that occurrences of the needle string
// are altered, then replays. Sequence 2 must still read back as the needle;
// the following read must fail and report corruption.
326 TEST(TestFileJournal, ReplayCorrupt) {
327 g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
328 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
329 g_ceph_context->_conf->apply_changes(NULL);
331 vector<ObjectStore::Transaction> tls;
332 for (unsigned i = 0 ; i < 3; ++i) {
333 SCOPED_TRACE(subtests[i].description);
334 fsid.generate_random();
335 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
336 subtests[i].directio, subtests[i].aio, subtests[i].faio);
337 ASSERT_EQ(0, fj.create());
338 ASSERT_EQ(0, fj.make_writeable());
340 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
// Both strings are 13 bytes long, so overwriting the needle with newneedle
// changes the payload without changing its length.
342 const char *needle = "i am a needle";
343 const char *newneedle = "in a haystack";
346 int orig_len = fj.prepare_entry(tls, &bl);
347 fj.reserve_throttle_and_backoff(bl.length());
348 fj.submit_entry(1, bl, orig_len, gb.new_sub());
350 orig_len = fj.prepare_entry(tls, &bl);
351 fj.reserve_throttle_and_backoff(bl.length());
352 fj.submit_entry(2, bl, orig_len, gb.new_sub());
354 orig_len = fj.prepare_entry(tls, &bl);
355 fj.reserve_throttle_and_backoff(bl.length());
356 fj.submit_entry(3, bl, orig_len, gb.new_sub());
358 orig_len = fj.prepare_entry(tls, &bl);
359 fj.reserve_throttle_and_backoff(bl.length());
360 fj.submit_entry(4, bl, orig_len, gb.new_sub());
366 cout << "corrupting journal" << std::endl;
// Read the start of the journal file into buf, edit it in memory, and write it
// back below.
368 int fd = open(path, O_RDONLY);
370 int r = safe_read_exact(fd, buf, sizeof(buf));
// Scan every offset at which the needle could start; each match is either
// overwritten with newneedle or logged as left in place.
373 for (unsigned o=0; o < sizeof(buf) - strlen(needle); o++) {
374 if (memcmp(buf+o, needle, strlen(needle)) == 0) {
376 cout << "replacing at offset " << o << std::endl;
377 memcpy(buf+o, newneedle, strlen(newneedle));
379 cout << "leaving at offset " << o << std::endl;
386 fd = open(path, O_WRONLY);
388 r = safe_write(fd, buf, sizeof(buf));
// Replay: sequence 2 is expected to be intact.
397 ASSERT_EQ(true, fj.read_entry(inbl, seq));
398 ASSERT_EQ(seq, 2ull);
399 inbl.copy(0, inbl.length(), v);
400 ASSERT_EQ(needle, v);
// The next entry must be rejected and flagged as corrupt.
404 ASSERT_FALSE(fj.read_entry(inbl, seq, &corrupt));
405 ASSERT_TRUE(corrupt);
407 ASSERT_EQ(0, fj.make_writeable());
// For each I/O mode: submits size_mb*2 entries, each carrying one copy of foo,
// calling committed_thru() along the way, and finally checks that the
// journal's queue is empty.
412 TEST(TestFileJournal, WriteTrim) {
413 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
414 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
415 g_ceph_context->_conf->apply_changes(NULL);
417 for (unsigned i = 0 ; i < 3; ++i) {
418 SCOPED_TRACE(subtests[i].description);
419 fsid.generate_random();
420 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
421 subtests[i].directio, subtests[i].aio, subtests[i].faio);
422 ASSERT_EQ(0, fj.create());
423 ASSERT_EQ(0, fj.make_writeable());
429 memset(foo, 1, sizeof(foo));
431 uint64_t seq = 1, committed = 0;
432 vector<ObjectStore::Transaction> tls;
// Note: this inner i shadows the subtest index of the enclosing loop.
434 for (unsigned i=0; i<size_mb*2; i++) {
436 bl.push_back(buffer::copy(foo, sizeof(foo)));
// One C_Sync per entry; its context (c) is the entry's completion.
438 ls.push_back(new C_Sync);
439 int orig_len = fj.prepare_entry(tls, &bl);
440 fj.reserve_throttle_and_backoff(bl.length());
441 fj.submit_entry(seq++, bl, orig_len, ls.back()->c);
// Presumably drains completions until at most size_mb/2 remain queued --
// TODO confirm.
443 while (ls.size() > size_mb/2) {
447 fj.committed_thru(committed);
454 fj.committed_thru(++committed);
457 ASSERT_TRUE(fj.journalq_empty());
// Variant of WriteTrim in which every entry is assembled from 128 buffers of
// sizeof(foo)/128 bytes each instead of a single foo-sized buffer.
463 TEST(TestFileJournal, WriteTrimSmall) {
464 g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
465 g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
466 g_ceph_context->_conf->apply_changes(NULL);
467 vector<ObjectStore::Transaction> tls;
469 for (unsigned i = 0 ; i < 3; ++i) {
470 SCOPED_TRACE(subtests[i].description);
471 fsid.generate_random();
472 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
473 subtests[i].directio, subtests[i].aio, subtests[i].faio);
474 ASSERT_EQ(0, fj.create());
475 ASSERT_EQ(0, fj.make_writeable());
481 memset(foo, 1, sizeof(foo));
483 uint64_t seq = 1, committed = 0;
// Note: this inner i shadows the subtest index of the enclosing loop.
485 for (unsigned i=0; i<size_mb*2; i++) {
// 128 small buffers per entry, each copied from the first sizeof(foo)/128
// bytes of foo.
487 for (int k=0; k<128; k++)
488 bl.push_back(buffer::copy(foo, sizeof(foo) / 128));
490 ls.push_back(new C_Sync);
491 int orig_len = fj.prepare_entry(tls, &bl);
492 fj.reserve_throttle_and_backoff(bl.length());
493 fj.submit_entry(seq++, bl, orig_len, ls.back()->c);
// Presumably drains completions until at most size_mb/2 remain queued --
// TODO confirm.
495 while (ls.size() > size_mb/2) {
499 fj.committed_thru(committed);
506 fj.committed_thru(committed);
// For each I/O mode, with journal_ignore_corruption=true and
// journal_write_header_frequency=1: submits entries 1..4 through a gather and
// entry 5 with its own completion, calls corrupt_footer_magic(fd, 2) on the
// journal file, then replays. The first read must not report corruption; the
// second must fail and report it.
513 TEST(TestFileJournal, ReplayDetectCorruptFooterMagic) {
514 g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
515 g_ceph_context->_conf->set_val("journal_write_header_frequency", "1");
516 g_ceph_context->_conf->apply_changes(NULL);
518 vector<ObjectStore::Transaction> tls;
519 for (unsigned i = 0 ; i < 3; ++i) {
520 SCOPED_TRACE(subtests[i].description);
521 fsid.generate_random();
522 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
523 subtests[i].directio, subtests[i].aio, subtests[i].faio);
524 ASSERT_EQ(0, fj.create());
525 ASSERT_EQ(0, fj.make_writeable());
527 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
529 const char *needle = "i am a needle";
// Inner i (1..4) is the entry's sequence number and shadows the subtest index.
530 for (unsigned i = 1; i <= 4; ++i) {
533 int orig_len = fj.prepare_entry(tls, &bl);
534 fj.reserve_throttle_and_backoff(bl.length());
535 fj.submit_entry(i, bl, orig_len, gb.new_sub());
// Fifth entry, submitted with its own completion rather than via the gather.
542 int orig_len = fj.prepare_entry(tls, &bl);
543 fj.reserve_throttle_and_backoff(bl.length());
544 fj.submit_entry(5, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
548 int fd = open(path, O_WRONLY);
550 cout << "corrupting journal" << std::endl;
// NOTE(review): the second argument (2) presumably selects which entry to
// damage -- confirm against FileJournal::corrupt_footer_magic.
552 fj.corrupt_footer_magic(fd, 2);
556 bool corrupt = false;
557 bool result = fj.read_entry(bl, seq, &corrupt);
560 ASSERT_FALSE(corrupt);
562 result = fj.read_entry(bl, seq, &corrupt);
563 ASSERT_FALSE(result);
564 ASSERT_TRUE(corrupt);
566 ASSERT_EQ(0, fj.make_writeable());
// For each I/O mode, with journal_ignore_corruption=true and
// journal_write_header_frequency=1: submits entries 1..4 through a gather and
// entry 5 with its own completion, calls corrupt_payload(fd, 2) on the journal
// file, then replays. The first read must not report corruption; the second
// must fail and report it.
572 TEST(TestFileJournal, ReplayDetectCorruptPayload) {
573 g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
574 g_ceph_context->_conf->set_val("journal_write_header_frequency", "1");
575 g_ceph_context->_conf->apply_changes(NULL);
577 vector<ObjectStore::Transaction> tls;
578 for (unsigned i = 0 ; i < 3; ++i) {
579 SCOPED_TRACE(subtests[i].description);
580 fsid.generate_random();
581 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
582 subtests[i].directio, subtests[i].aio, subtests[i].faio);
583 ASSERT_EQ(0, fj.create());
584 ASSERT_EQ(0, fj.make_writeable());
586 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
588 const char *needle = "i am a needle";
// Inner i (1..4) is the entry's sequence number and shadows the subtest index.
589 for (unsigned i = 1; i <= 4; ++i) {
592 int orig_len = fj.prepare_entry(tls, &bl);
593 fj.reserve_throttle_and_backoff(bl.length());
594 fj.submit_entry(i, bl, orig_len, gb.new_sub());
// Fifth entry, submitted with its own completion rather than via the gather.
601 int orig_len = fj.prepare_entry(tls, &bl);
602 fj.reserve_throttle_and_backoff(bl.length());
603 fj.submit_entry(5, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
607 int fd = open(path, O_WRONLY);
609 cout << "corrupting journal" << std::endl;
// NOTE(review): the second argument (2) presumably selects which entry to
// damage -- confirm against FileJournal::corrupt_payload.
611 fj.corrupt_payload(fd, 2);
615 bool corrupt = false;
616 bool result = fj.read_entry(bl, seq, &corrupt);
619 ASSERT_FALSE(corrupt);
621 result = fj.read_entry(bl, seq, &corrupt);
622 ASSERT_FALSE(result);
623 ASSERT_TRUE(corrupt);
625 ASSERT_EQ(0, fj.make_writeable());
// For each I/O mode, with journal_ignore_corruption=true and
// journal_write_header_frequency=1: submits entries 1..4 through a gather and
// entry 5 with its own completion, calls corrupt_header_magic(fd, 2) on the
// journal file, then replays. The first read must not report corruption; the
// second must fail and report it.
631 TEST(TestFileJournal, ReplayDetectCorruptHeader) {
632 g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
633 g_ceph_context->_conf->set_val("journal_write_header_frequency", "1");
634 g_ceph_context->_conf->apply_changes(NULL);
636 vector<ObjectStore::Transaction> tls;
637 for (unsigned i = 0 ; i < 3; ++i) {
638 SCOPED_TRACE(subtests[i].description);
639 fsid.generate_random();
640 FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
641 subtests[i].directio, subtests[i].aio, subtests[i].faio);
642 ASSERT_EQ(0, fj.create());
643 ASSERT_EQ(0, fj.make_writeable());
645 C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
647 const char *needle = "i am a needle";
// Inner i (1..4) is the entry's sequence number and shadows the subtest index.
648 for (unsigned i = 1; i <= 4; ++i) {
651 int orig_len = fj.prepare_entry(tls, &bl);
652 fj.reserve_throttle_and_backoff(bl.length());
653 fj.submit_entry(i, bl, orig_len, gb.new_sub());
// Fifth entry, submitted with its own completion rather than via the gather.
660 int orig_len = fj.prepare_entry(tls, &bl);
661 fj.reserve_throttle_and_backoff(bl.length());
662 fj.submit_entry(5, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
666 int fd = open(path, O_WRONLY);
668 cout << "corrupting journal" << std::endl;
// NOTE(review): the second argument (2) presumably selects which entry to
// damage -- confirm against FileJournal::corrupt_header_magic.
670 fj.corrupt_header_magic(fd, 2);
674 bool corrupt = false;
675 bool result = fj.read_entry(bl, seq, &corrupt);
678 ASSERT_FALSE(corrupt);
680 result = fj.read_entry(bl, seq, &corrupt);
681 ASSERT_FALSE(result);
682 ASSERT_TRUE(corrupt);
684 ASSERT_EQ(0, fj.make_writeable());