Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / test_filejournal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include <gtest/gtest.h>
3 #include <stdlib.h>
4 #include <limits.h>
5
6 #include "common/ceph_argparse.h"
7 #include "common/common_init.h"
8 #include "global/global_init.h"
9 #include "common/config.h"
10 #include "common/Finisher.h"
11 #include "os/filestore/FileJournal.h"
12 #include "include/Context.h"
13 #include "common/Mutex.h"
14 #include "common/safe_io.h"
15 #include "os/filestore/JournalingObjectStore.h"
16
17 Finisher *finisher;
18 Cond sync_cond;
19 char path[200];
20 uuid_d fsid;
21 struct test_info {
22     bool directio, aio, faio;
23     const char *description;
24 } subtests[3] = {
25     { false, false, false, "DIRECTIO OFF  AIO OFF" },
26     { true, false, false, "DIRECTIO ON  AIO OFF" },
27     { true, true, true, "DIRECTIO ON  AIO ON"}
28 };
29
30 // ----
31 Cond cond;
32 Mutex wait_lock("lock");
33 bool done;
34
35 void wait()
36 {
37   wait_lock.Lock();
38   while (!done)
39     cond.Wait(wait_lock);
40   wait_lock.Unlock();
41 }
42
43 // ----
44 class C_Sync {
45 public:
46   Cond cond;
47   Mutex lock;
48   bool done;
49   C_SafeCond *c;
50
51   C_Sync()
52     : lock("C_Sync::lock"), done(false) {
53     c = new C_SafeCond(&lock, &cond, &done);
54   }
55   ~C_Sync() {
56     lock.Lock();
57     //cout << "wait" << std::endl;
58     while (!done)
59       cond.Wait(lock);
60     //cout << "waited" << std::endl;
61     lock.Unlock();
62   }
63 };
64
65 unsigned size_mb = 200;
66 //Gtest argument prefix
67 const char GTEST_PRFIX[] = "--gtest_";
68
69 int main(int argc, char **argv) {
70   vector<const char*> args;
71   argv_to_vec(argc, (const char **)argv, args);
72
73   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
74                          CODE_ENVIRONMENT_UTILITY, 0);
75   common_init_finish(g_ceph_context);
76
77   char mb[10];
78   sprintf(mb, "%u", size_mb);
79   g_ceph_context->_conf->set_val("osd_journal_size", mb);
80   g_ceph_context->_conf->apply_changes(NULL);
81
82   finisher = new Finisher(g_ceph_context);
83   
84   path[0] = '\0';
85   if (!args.empty()) {
86     for ( unsigned int i = 0; i < args.size(); ++i) {
87       if (strncmp(args[i], GTEST_PRFIX, sizeof(GTEST_PRFIX) - 1)) {
88         //Non gtest argument, set to path.
89         size_t copy_len = std::min(sizeof(path) - 1, strlen(args[i]));
90         strncpy(path, args[i], copy_len);
91         path[copy_len] = '\0';
92         break;
93       }
94     }
95   }
96   if ( path[0] == '\0') {
97     srand(getpid() + time(0));
98     snprintf(path, sizeof(path), "/var/tmp/ceph_test_filejournal.tmp.%d", rand());
99   }
100   cout << "path " << path << std::endl;
101
102   ::testing::InitGoogleTest(&argc, argv);
103
104   finisher->start();
105
106   int r = RUN_ALL_TESTS();
107   
108   finisher->stop();
109
110   unlink(path);
111   
112   return r;
113 }
114
115 TEST(TestFileJournal, Create) {
116   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
117   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
118   g_ceph_context->_conf->apply_changes(NULL);
119
120   for (unsigned i = 0 ; i < 3; ++i) {
121     SCOPED_TRACE(subtests[i].description);
122     fsid.generate_random();
123     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
124                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
125     ASSERT_EQ(0, fj.create());
126   }
127 }
128
129 TEST(TestFileJournal, WriteSmall) {
130   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
131   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
132   g_ceph_context->_conf->apply_changes(NULL);
133
134   for (unsigned i = 0 ; i < 3; ++i) {
135     SCOPED_TRACE(subtests[i].description);
136     fsid.generate_random();
137     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
138                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
139     ASSERT_EQ(0, fj.create());
140     ASSERT_EQ(0, fj.make_writeable());
141
142     vector<ObjectStore::Transaction> tls;
143     bufferlist bl;
144     bl.append("small");
145     int orig_len = fj.prepare_entry(tls, &bl);
146     fj.reserve_throttle_and_backoff(bl.length());
147     fj.submit_entry(1, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
148     wait();
149
150     fj.close();
151   }
152 }
153
154 TEST(TestFileJournal, WriteBig) {
155   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
156   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
157   g_ceph_context->_conf->apply_changes(NULL);
158
159   for (unsigned i = 0 ; i < 3; ++i) {
160     SCOPED_TRACE(subtests[i].description);
161     fsid.generate_random();
162     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
163                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
164     ASSERT_EQ(0, fj.create());
165     ASSERT_EQ(0, fj.make_writeable());
166
167     bufferlist bl;
168     while (bl.length() < size_mb*1000/2) {
169       char foo[1024*1024];
170       memset(foo, 1, sizeof(foo));
171       bl.append(foo, sizeof(foo));
172     }
173     vector<ObjectStore::Transaction> tls;
174     int orig_len = fj.prepare_entry(tls, &bl);
175     fj.reserve_throttle_and_backoff(bl.length());
176     fj.submit_entry(1, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
177     wait();
178     fj.close();
179   }
180 }
181
182 TEST(TestFileJournal, WriteMany) {
183   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
184   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
185   g_ceph_context->_conf->apply_changes(NULL);
186
187   for (unsigned i = 0 ; i < 3; ++i) {
188     SCOPED_TRACE(subtests[i].description);
189     fsid.generate_random();
190     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
191                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
192     ASSERT_EQ(0, fj.create());
193     ASSERT_EQ(0, fj.make_writeable());
194
195     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
196
197     vector<ObjectStore::Transaction> tls;
198     bufferlist bl;
199     bl.append("small");
200     uint64_t seq = 1;
201     for (int i=0; i<100; i++) {
202       bl.append("small");
203       int orig_len = fj.prepare_entry(tls, &bl);
204       fj.reserve_throttle_and_backoff(bl.length());
205       fj.submit_entry(seq++, bl, orig_len, gb.new_sub());
206     }
207     gb.activate();
208
209     wait();
210
211     fj.close();
212   }
213 }
214
215 TEST(TestFileJournal, WriteManyVecs) {
216   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
217   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
218   g_ceph_context->_conf->apply_changes(NULL);
219
220   for (unsigned i = 0 ; i < 3; ++i) {
221     SCOPED_TRACE(subtests[i].description);
222     fsid.generate_random();
223     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
224                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
225     ASSERT_EQ(0, fj.create());
226     ASSERT_EQ(0, fj.make_writeable());
227
228     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
229
230     bufferlist first;
231     first.append("small");
232     vector<ObjectStore::Transaction> tls;
233     int orig_len = fj.prepare_entry(tls, &first);
234     fj.reserve_throttle_and_backoff(first.length());
235     fj.submit_entry(1, first, orig_len, gb.new_sub());
236
237     bufferlist bl;
238     for (int i=0; i<IOV_MAX * 2; i++) {
239       bufferptr bp = buffer::create_page_aligned(4096);
240       memset(bp.c_str(), (char)i, 4096);
241       bl.append(bp);
242     }
243     bufferlist origbl = bl;
244     orig_len = fj.prepare_entry(tls, &bl);
245     fj.reserve_throttle_and_backoff(bl.length());
246     fj.submit_entry(2, bl, orig_len, gb.new_sub());
247     gb.activate();
248     wait();
249
250     fj.close();
251
252     fj.open(1);
253     bufferlist inbl;
254     string v;
255     uint64_t seq = 0;
256     ASSERT_EQ(true, fj.read_entry(inbl, seq));
257     ASSERT_EQ(seq, 2ull);
258     ASSERT_TRUE(inbl.contents_equal(origbl));
259     ASSERT_EQ(0, fj.make_writeable());
260     fj.close();
261
262   }
263 }
264
265 TEST(TestFileJournal, ReplaySmall) {
266   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
267   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
268   g_ceph_context->_conf->apply_changes(NULL);
269
270   vector<ObjectStore::Transaction> tls;
271
272   for (unsigned i = 0 ; i < 3; ++i) {
273     SCOPED_TRACE(subtests[i].description);
274     fsid.generate_random();
275     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
276                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
277     ASSERT_EQ(0, fj.create());
278     ASSERT_EQ(0, fj.make_writeable());
279
280     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
281
282     bufferlist bl;
283     bl.append("small");
284     int orig_len = fj.prepare_entry(tls, &bl);
285     fj.reserve_throttle_and_backoff(bl.length());
286     fj.submit_entry(1, bl, orig_len, gb.new_sub());
287     bl.append("small");
288     orig_len = fj.prepare_entry(tls, &bl);
289     fj.reserve_throttle_and_backoff(bl.length());
290     fj.submit_entry(2, bl, orig_len, gb.new_sub());
291     bl.append("small");
292     orig_len = fj.prepare_entry(tls, &bl);
293     fj.reserve_throttle_and_backoff(bl.length());
294     fj.submit_entry(3, bl, orig_len, gb.new_sub());
295     gb.activate();
296     wait();
297
298     fj.close();
299
300     fj.open(1);
301
302     bufferlist inbl;
303     string v;
304     uint64_t seq = 0;
305     ASSERT_EQ(true, fj.read_entry(inbl, seq));
306     ASSERT_EQ(seq, 2ull);
307     inbl.copy(0, inbl.length(), v);
308     ASSERT_EQ("small", v);
309     inbl.clear();
310     v.clear();
311
312     ASSERT_EQ(true, fj.read_entry(inbl, seq));
313     ASSERT_EQ(seq, 3ull);
314     inbl.copy(0, inbl.length(), v);
315     ASSERT_EQ("small", v);
316     inbl.clear();
317     v.clear();
318
319     ASSERT_TRUE(!fj.read_entry(inbl, seq));
320
321     ASSERT_EQ(0, fj.make_writeable());
322     fj.close();
323   }
324 }
325
326 TEST(TestFileJournal, ReplayCorrupt) {
327   g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
328   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
329   g_ceph_context->_conf->apply_changes(NULL);
330
331   vector<ObjectStore::Transaction> tls;
332   for (unsigned i = 0 ; i < 3; ++i) {
333     SCOPED_TRACE(subtests[i].description);
334     fsid.generate_random();
335     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
336                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
337     ASSERT_EQ(0, fj.create());
338     ASSERT_EQ(0, fj.make_writeable());
339
340     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
341
342     const char *needle =    "i am a needle";
343     const char *newneedle = "in a haystack";
344     bufferlist bl;
345     bl.append(needle);
346     int orig_len = fj.prepare_entry(tls, &bl);
347     fj.reserve_throttle_and_backoff(bl.length());
348     fj.submit_entry(1, bl, orig_len, gb.new_sub());
349     bl.append(needle);
350     orig_len = fj.prepare_entry(tls, &bl);
351     fj.reserve_throttle_and_backoff(bl.length());
352     fj.submit_entry(2, bl, orig_len, gb.new_sub());
353     bl.append(needle);
354     orig_len = fj.prepare_entry(tls, &bl);
355     fj.reserve_throttle_and_backoff(bl.length());
356     fj.submit_entry(3, bl, orig_len, gb.new_sub());
357     bl.append(needle);
358     orig_len = fj.prepare_entry(tls, &bl);
359     fj.reserve_throttle_and_backoff(bl.length());
360     fj.submit_entry(4, bl, orig_len, gb.new_sub());
361     gb.activate();
362     wait();
363
364     fj.close();
365
366     cout << "corrupting journal" << std::endl;
367     char buf[1024*128];
368     int fd = open(path, O_RDONLY);
369     ASSERT_GE(fd, 0);
370     int r = safe_read_exact(fd, buf, sizeof(buf));
371     ASSERT_EQ(0, r);
372     int n = 0;
373     for (unsigned o=0; o < sizeof(buf) - strlen(needle); o++) {
374       if (memcmp(buf+o, needle, strlen(needle)) == 0) {
375         if (n >= 2) {
376           cout << "replacing at offset " << o << std::endl;
377           memcpy(buf+o, newneedle, strlen(newneedle));
378         } else {
379           cout << "leaving at offset " << o << std::endl;
380         }
381         n++;
382       }
383     }
384     ASSERT_EQ(n, 4);
385     close(fd);
386     fd = open(path, O_WRONLY);
387     ASSERT_GE(fd, 0);
388     r = safe_write(fd, buf, sizeof(buf));
389     ASSERT_EQ(r, 0);
390     close(fd);
391
392     fj.open(1);
393
394     bufferlist inbl;
395     string v;
396     uint64_t seq = 0;
397     ASSERT_EQ(true, fj.read_entry(inbl, seq));
398     ASSERT_EQ(seq, 2ull);
399     inbl.copy(0, inbl.length(), v);
400     ASSERT_EQ(needle, v);
401     inbl.clear();
402     v.clear();
403     bool corrupt;
404     ASSERT_FALSE(fj.read_entry(inbl, seq, &corrupt));
405     ASSERT_TRUE(corrupt);
406
407     ASSERT_EQ(0, fj.make_writeable());
408     fj.close();
409   }
410 }
411
412 TEST(TestFileJournal, WriteTrim) {
413   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
414   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
415   g_ceph_context->_conf->apply_changes(NULL);
416
417   for (unsigned i = 0 ; i < 3; ++i) {
418     SCOPED_TRACE(subtests[i].description);
419     fsid.generate_random();
420     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
421                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
422     ASSERT_EQ(0, fj.create());
423     ASSERT_EQ(0, fj.make_writeable());
424
425     list<C_Sync*> ls;
426
427     bufferlist bl;
428     char foo[1024*1024];
429     memset(foo, 1, sizeof(foo));
430
431     uint64_t seq = 1, committed = 0;
432     vector<ObjectStore::Transaction> tls;
433
434     for (unsigned i=0; i<size_mb*2; i++) {
435       bl.clear();
436       bl.push_back(buffer::copy(foo, sizeof(foo)));
437       bl.zero();
438       ls.push_back(new C_Sync);
439       int orig_len = fj.prepare_entry(tls, &bl);
440       fj.reserve_throttle_and_backoff(bl.length());
441       fj.submit_entry(seq++, bl, orig_len, ls.back()->c);
442
443       while (ls.size() > size_mb/2) {
444         delete ls.front();
445         ls.pop_front();
446         committed++;
447         fj.committed_thru(committed);
448       }
449     }
450
451     while (ls.size()) {
452       delete ls.front();
453       ls.pop_front();
454       fj.committed_thru(++committed);
455     }
456
457     ASSERT_TRUE(fj.journalq_empty());
458
459     fj.close();
460   }
461 }
462
463 TEST(TestFileJournal, WriteTrimSmall) {
464   g_ceph_context->_conf->set_val("journal_ignore_corruption", "false");
465   g_ceph_context->_conf->set_val("journal_write_header_frequency", "0");
466   g_ceph_context->_conf->apply_changes(NULL);
467   vector<ObjectStore::Transaction> tls;
468
469   for (unsigned i = 0 ; i < 3; ++i) {
470     SCOPED_TRACE(subtests[i].description);
471     fsid.generate_random();
472     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
473                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
474     ASSERT_EQ(0, fj.create());
475     ASSERT_EQ(0, fj.make_writeable());
476
477     list<C_Sync*> ls;
478
479     bufferlist bl;
480     char foo[1024*1024];
481     memset(foo, 1, sizeof(foo));
482
483     uint64_t seq = 1, committed = 0;
484
485     for (unsigned i=0; i<size_mb*2; i++) {
486       bl.clear();
487       for (int k=0; k<128; k++)
488         bl.push_back(buffer::copy(foo, sizeof(foo) / 128));
489       bl.zero();
490       ls.push_back(new C_Sync);
491       int orig_len = fj.prepare_entry(tls, &bl);
492       fj.reserve_throttle_and_backoff(bl.length());
493       fj.submit_entry(seq++, bl, orig_len, ls.back()->c);
494
495       while (ls.size() > size_mb/2) {
496         delete ls.front();
497         ls.pop_front();
498         committed++;
499         fj.committed_thru(committed);
500       }
501     }
502
503     while (ls.size()) {
504       delete ls.front();
505       ls.pop_front();
506       fj.committed_thru(committed);
507     }
508
509     fj.close();
510   }
511 }
512
513 TEST(TestFileJournal, ReplayDetectCorruptFooterMagic) {
514   g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
515   g_ceph_context->_conf->set_val("journal_write_header_frequency", "1");
516   g_ceph_context->_conf->apply_changes(NULL);
517
518   vector<ObjectStore::Transaction> tls;
519   for (unsigned i = 0 ; i < 3; ++i) {
520     SCOPED_TRACE(subtests[i].description);
521     fsid.generate_random();
522     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
523                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
524     ASSERT_EQ(0, fj.create());
525     ASSERT_EQ(0, fj.make_writeable());
526
527     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
528
529     const char *needle =    "i am a needle";
530     for (unsigned i = 1; i <= 4; ++i) {
531       bufferlist bl;
532       bl.append(needle);
533       int orig_len = fj.prepare_entry(tls, &bl);
534       fj.reserve_throttle_and_backoff(bl.length());
535       fj.submit_entry(i, bl, orig_len, gb.new_sub());
536     }
537     gb.activate();
538     wait();
539
540     bufferlist bl;
541     bl.append("needle");
542     int orig_len = fj.prepare_entry(tls, &bl);
543     fj.reserve_throttle_and_backoff(bl.length());
544     fj.submit_entry(5, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
545     wait();
546
547     fj.close();
548     int fd = open(path, O_WRONLY);
549
550     cout << "corrupting journal" << std::endl;
551     fj.open(0);
552     fj.corrupt_footer_magic(fd, 2);
553
554     uint64_t seq = 0;
555     bl.clear();
556     bool corrupt = false;
557     bool result = fj.read_entry(bl, seq, &corrupt);
558     ASSERT_TRUE(result);
559     ASSERT_EQ(seq, 1UL);
560     ASSERT_FALSE(corrupt);
561
562     result = fj.read_entry(bl, seq, &corrupt);
563     ASSERT_FALSE(result);
564     ASSERT_TRUE(corrupt);
565
566     ASSERT_EQ(0, fj.make_writeable());
567     fj.close();
568     ::close(fd);
569   }
570 }
571
572 TEST(TestFileJournal, ReplayDetectCorruptPayload) {
573   g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
574   g_ceph_context->_conf->set_val("journal_write_header_frequency", "1");
575   g_ceph_context->_conf->apply_changes(NULL);
576
577   vector<ObjectStore::Transaction> tls;
578   for (unsigned i = 0 ; i < 3; ++i) {
579     SCOPED_TRACE(subtests[i].description);
580     fsid.generate_random();
581     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
582                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
583     ASSERT_EQ(0, fj.create());
584     ASSERT_EQ(0, fj.make_writeable());
585
586     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
587
588     const char *needle =    "i am a needle";
589     for (unsigned i = 1; i <= 4; ++i) {
590       bufferlist bl;
591       bl.append(needle);
592       int orig_len = fj.prepare_entry(tls, &bl);
593       fj.reserve_throttle_and_backoff(bl.length());
594       fj.submit_entry(i, bl, orig_len, gb.new_sub());
595     }
596     gb.activate();
597     wait();
598
599     bufferlist bl;
600     bl.append("needle");
601     int orig_len = fj.prepare_entry(tls, &bl);
602     fj.reserve_throttle_and_backoff(bl.length());
603     fj.submit_entry(5, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
604     wait();
605
606     fj.close();
607     int fd = open(path, O_WRONLY);
608
609     cout << "corrupting journal" << std::endl;
610     fj.open(0);
611     fj.corrupt_payload(fd, 2);
612
613     uint64_t seq = 0;
614     bl.clear();
615     bool corrupt = false;
616     bool result = fj.read_entry(bl, seq, &corrupt);
617     ASSERT_TRUE(result);
618     ASSERT_EQ(seq, 1UL);
619     ASSERT_FALSE(corrupt);
620
621     result = fj.read_entry(bl, seq, &corrupt);
622     ASSERT_FALSE(result);
623     ASSERT_TRUE(corrupt);
624
625     ASSERT_EQ(0, fj.make_writeable());
626     fj.close();
627     ::close(fd);
628   }
629 }
630
631 TEST(TestFileJournal, ReplayDetectCorruptHeader) {
632   g_ceph_context->_conf->set_val("journal_ignore_corruption", "true");
633   g_ceph_context->_conf->set_val("journal_write_header_frequency", "1");
634   g_ceph_context->_conf->apply_changes(NULL);
635
636   vector<ObjectStore::Transaction> tls;
637   for (unsigned i = 0 ; i < 3; ++i) {
638     SCOPED_TRACE(subtests[i].description);
639     fsid.generate_random();
640     FileJournal fj(g_ceph_context, fsid, finisher, &sync_cond, path,
641                   subtests[i].directio, subtests[i].aio, subtests[i].faio);
642     ASSERT_EQ(0, fj.create());
643     ASSERT_EQ(0, fj.make_writeable());
644
645     C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&wait_lock, &cond, &done));
646
647     const char *needle =    "i am a needle";
648     for (unsigned i = 1; i <= 4; ++i) {
649       bufferlist bl;
650       bl.append(needle);
651       int orig_len = fj.prepare_entry(tls, &bl);
652       fj.reserve_throttle_and_backoff(bl.length());
653       fj.submit_entry(i, bl, orig_len, gb.new_sub());
654     }
655     gb.activate();
656     wait();
657
658     bufferlist bl;
659     bl.append("needle");
660     int orig_len = fj.prepare_entry(tls, &bl);
661     fj.reserve_throttle_and_backoff(bl.length());
662     fj.submit_entry(5, bl, orig_len, new C_SafeCond(&wait_lock, &cond, &done));
663     wait();
664
665     fj.close();
666     int fd = open(path, O_WRONLY);
667
668     cout << "corrupting journal" << std::endl;
669     fj.open(0);
670     fj.corrupt_header_magic(fd, 2);
671
672     uint64_t seq = 0;
673     bl.clear();
674     bool corrupt = false;
675     bool result = fj.read_entry(bl, seq, &corrupt);
676     ASSERT_TRUE(result);
677     ASSERT_EQ(seq, 1UL);
678     ASSERT_FALSE(corrupt);
679
680     result = fj.read_entry(bl, seq, &corrupt);
681     ASSERT_FALSE(result);
682     ASSERT_TRUE(corrupt);
683
684     ASSERT_EQ(0, fj.make_writeable());
685     fj.close();
686     ::close(fd);
687   }
688 }