Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / FileJournal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14 #include "acconfig.h"
15
16 #include "common/debug.h"
17 #include "common/errno.h"
18 #include "common/safe_io.h"
19 #include "FileJournal.h"
20 #include "include/color.h"
21 #include "common/perf_counters.h"
22 #include "FileStore.h"
23
24 #include "include/compat.h"
25
26 #include <fcntl.h>
27 #include <limits.h>
28 #include <sstream>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <sys/mount.h>
34
35 #include "common/blkdev.h"
36 #if defined(__linux__)
37 #include "common/linux_version.h"
38 #endif
39
40 #if defined(__FreeBSD__)
41 #define O_DSYNC O_SYNC
42 #endif
43
44 #define dout_context cct
45 #define dout_subsys ceph_subsys_journal
46 #undef dout_prefix
47 #define dout_prefix *_dout << "journal "
48
49 const static int64_t ONE_MEG(1 << 20);
50 const static int CEPH_DIRECTIO_ALIGNMENT(4096);
51
52
53 int FileJournal::_open(bool forwrite, bool create)
54 {
55   int flags, ret;
56
57   if (forwrite) {
58     flags = O_RDWR;
59     if (directio)
60       flags |= O_DIRECT | O_DSYNC;
61   } else {
62     flags = O_RDONLY;
63   }
64   if (create)
65     flags |= O_CREAT;
66
67   if (fd >= 0) {
68     if (TEMP_FAILURE_RETRY(::close(fd))) {
69       int err = errno;
70       derr << "FileJournal::_open: error closing old fd: "
71            << cpp_strerror(err) << dendl;
72     }
73   }
74   fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
75   if (fd < 0) {
76     int err = errno;
77     dout(2) << "FileJournal::_open unable to open journal "
78             << fn << ": " << cpp_strerror(err) << dendl;
79     return -err;
80   }
81
82   struct stat st;
83   ret = ::fstat(fd, &st);
84   if (ret) {
85     ret = errno;
86     derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
87     ret = -ret;
88     goto out_fd;
89   }
90
91   if (S_ISBLK(st.st_mode)) {
92     ret = _open_block_device();
93   } else if (S_ISREG(st.st_mode)) {
94     if (aio && !force_aio) {
95       derr << "FileJournal::_open: disabling aio for non-block journal.  Use "
96            << "journal_force_aio to force use of aio anyway" << dendl;
97       aio = false;
98     }
99     ret = _open_file(st.st_size, st.st_blksize, create);
100   } else {
101     derr << "FileJournal::_open: wrong journal file type: " << st.st_mode
102          << dendl;
103     ret = -EINVAL;
104   }
105
106   if (ret)
107     goto out_fd;
108
109 #ifdef HAVE_LIBAIO
110   if (aio) {
111     aio_ctx = 0;
112     ret = io_setup(128, &aio_ctx);
113     if (ret < 0) {
114       switch (ret) {
115         // Contrary to naive expectations -EAGIAN means ...
116         case -EAGAIN:
117           derr << "FileJournal::_open: user's limit of aio events exceeded. "
118                << "Try increasing /proc/sys/fs/aio-max-nr" << dendl;
119           break;
120         default:
121           derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl;
122           break;
123       }
124       goto out_fd;
125     }
126   }
127 #endif
128
129   /* We really want max_size to be a multiple of block_size. */
130   max_size -= max_size % block_size;
131
132   dout(1) << "_open " << fn << " fd " << fd
133           << ": " << max_size
134           << " bytes, block size " << block_size
135           << " bytes, directio = " << directio
136           << ", aio = " << aio
137           << dendl;
138   return 0;
139
140  out_fd:
141   VOID_TEMP_FAILURE_RETRY(::close(fd));
142   fd = -1;
143   return ret;
144 }
145
146 int FileJournal::_open_block_device()
147 {
148   int64_t bdev_sz = 0;
149   int ret = get_block_device_size(fd, &bdev_sz);
150   if (ret) {
151     dout(0) << __func__ << ": failed to read block device size." << dendl;
152     return -EIO;
153   }
154
155   /* Check for bdev_sz too small */
156   if (bdev_sz < ONE_MEG) {
157     dout(0) << __func__ << ": your block device must be at least "
158       << ONE_MEG << " bytes to be used for a Ceph journal." << dendl;
159     return -EINVAL;
160   }
161
162   dout(10) << __func__ << ": ignoring osd journal size. "
163            << "We'll use the entire block device (size: " << bdev_sz << ")"
164            << dendl;
165   max_size = bdev_sz;
166
167   block_size = cct->_conf->journal_block_size;
168
169   if (cct->_conf->journal_discard) {
170     discard = block_device_support_discard(fn.c_str());
171     dout(10) << fn << " support discard: " << (int)discard << dendl;
172   }
173
174   return 0;
175 }
176
177 int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
178                             bool create)
179 {
180   int ret;
181   int64_t conf_journal_sz(cct->_conf->osd_journal_size);
182   conf_journal_sz <<= 20;
183
184   if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) {
185     derr << "I'm sorry, I don't know how large of a journal to create."
186          << "Please specify a block device to use as the journal OR "
187          << "set osd_journal_size in your ceph.conf" << dendl;
188     return -EINVAL;
189   }
190
191   if (create && (oldsize < conf_journal_sz)) {
192     uint64_t newsize(conf_journal_sz);
193     dout(10) <<  __func__ << " _open extending to " << newsize << " bytes" << dendl;
194     ret = ::ftruncate(fd, newsize);
195     if (ret < 0) {
196       int err = errno;
197       derr << "FileJournal::_open_file : unable to extend journal to "
198            << newsize << " bytes: " << cpp_strerror(err) << dendl;
199       return -err;
200     }
201 #ifdef HAVE_POSIX_FALLOCATE
202     ret = ::posix_fallocate(fd, 0, newsize);
203     if (ret) {
204       derr << "FileJournal::_open_file : unable to preallocation journal to "
205            << newsize << " bytes: " << cpp_strerror(ret) << dendl;
206       return -ret;
207     }
208     max_size = newsize;
209 #elif defined(__APPLE__)
210     fstore_t store;
211     store.fst_flags = F_ALLOCATECONTIG;
212     store.fst_posmode = F_PEOFPOSMODE;
213     store.fst_offset = 0;
214     store.fst_length = newsize;
215
216     ret = ::fcntl(fd, F_PREALLOCATE, &store);
217     if (ret == -1) {
218       ret = -errno;
219       derr << "FileJournal::_open_file : unable to preallocation journal to "
220            << newsize << " bytes: " << cpp_strerror(ret) << dendl;
221       return ret;
222     }
223     max_size = newsize;
224 #else
225 # error "Journal pre-allocation not supported on platform."
226 #endif
227   }
228   else {
229     max_size = oldsize;
230   }
231   block_size = cct->_conf->journal_block_size;
232
233   if (create && cct->_conf->journal_zero_on_create) {
234     derr << "FileJournal::_open_file : zeroing journal" << dendl;
235     uint64_t write_size = 1 << 20;
236     char *buf;
237     ret = ::posix_memalign((void **)&buf, block_size, write_size);
238     if (ret != 0) {
239       return -ret;
240     }
241     memset(static_cast<void*>(buf), 0, write_size);
242     uint64_t i = 0;
243     for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
244       ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
245       if (ret < 0) {
246         free(buf);
247         return -errno;
248       }
249     }
250     if (i < (uint64_t)max_size) {
251       ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
252       if (ret < 0) {
253         free(buf);
254         return -errno;
255       }
256     }
257     free(buf);
258   }
259
260
261   dout(10) << "_open journal is not a block device, NOT checking disk "
262            << "write cache on '" << fn << "'" << dendl;
263
264   return 0;
265 }
266
267 // This can not be used on an active journal
268 int FileJournal::check()
269 {
270   int ret;
271
272   assert(fd == -1);
273   ret = _open(false, false);
274   if (ret)
275     return ret;
276
277   ret = read_header(&header);
278   if (ret < 0)
279     goto done;
280
281   if (header.fsid != fsid) {
282     derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
283          << ", invalid (someone else's?) journal" << dendl;
284     ret = -EINVAL;
285     goto done;
286   }
287
288   dout(1) << "check: header looks ok" << dendl;
289   ret = 0;
290
291  done:
292   close();
293   return ret;
294 }
295
296
297 int FileJournal::create()
298 {
299   void *buf = 0;
300   int64_t needed_space;
301   int ret;
302   buffer::ptr bp;
303   dout(2) << "create " << fn << " fsid " << fsid << dendl;
304
305   ret = _open(true, true);
306   if (ret)
307     goto done;
308
309   // write empty header
310   header = header_t();
311   header.flags = header_t::FLAG_CRC;  // enable crcs on any new journal.
312   header.fsid = fsid;
313   header.max_size = max_size;
314   header.block_size = block_size;
315   if (cct->_conf->journal_block_align || directio)
316     header.alignment = block_size;
317   else
318     header.alignment = 16;  // at least stay word aligned on 64bit machines...
319
320   header.start = get_top();
321   header.start_seq = 0;
322
323   print_header(header);
324
325   // static zeroed buffer for alignment padding
326   delete [] zero_buf;
327   zero_buf = new char[header.alignment];
328   memset(zero_buf, 0, header.alignment);
329
330   bp = prepare_header();
331   if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
332     ret = -errno;
333     derr << "FileJournal::create : create write header error "
334          << cpp_strerror(ret) << dendl;
335     goto close_fd;
336   }
337
338   // zero first little bit, too.
339   ret = posix_memalign(&buf, block_size, block_size);
340   if (ret) {
341     ret = -ret;
342     derr << "FileJournal::create: failed to allocate " << block_size
343          << " bytes of memory: " << cpp_strerror(ret) << dendl;
344     goto close_fd;
345   }
346   memset(buf, 0, block_size);
347   if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
348     ret = -errno;
349     derr << "FileJournal::create: error zeroing first " << block_size
350          << " bytes " << cpp_strerror(ret) << dendl;
351     goto free_buf;
352   }
353
354   needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20;
355   needed_space += (2 * sizeof(entry_header_t)) + get_top();
356   if (header.max_size - header.start < needed_space) {
357     derr << "FileJournal::create: OSD journal is not large enough to hold "
358          << "osd_max_write_size bytes!" << dendl;
359     ret = -ENOSPC;
360     goto free_buf;
361   }
362
363   dout(2) << "create done" << dendl;
364   ret = 0;
365
366 free_buf:
367   free(buf);
368   buf = 0;
369 close_fd:
370   if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
371     ret = -errno;
372     derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
373          << dendl;
374   }
375 done:
376   fd = -1;
377   return ret;
378 }
379
380 // This can not be used on an active journal
381 int FileJournal::peek_fsid(uuid_d& fsid)
382 {
383   assert(fd == -1);
384   int r = _open(false, false);
385   if (r)
386     return r;
387   r = read_header(&header);
388   if (r < 0)
389     goto out;
390   fsid = header.fsid;
391 out:
392   close();
393   return r;
394 }
395
396 int FileJournal::open(uint64_t fs_op_seq)
397 {
398   dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
399
400   uint64_t next_seq = fs_op_seq + 1;
401   uint64_t seq = -1;
402
403   int err = _open(false);
404   if (err)
405     return err;
406
407   // assume writeable, unless...
408   read_pos = 0;
409   write_pos = get_top();
410
411   // read header?
412   err = read_header(&header);
413   if (err < 0)
414     goto out;
415
416   // static zeroed buffer for alignment padding
417   delete [] zero_buf;
418   zero_buf = new char[header.alignment];
419   memset(zero_buf, 0, header.alignment);
420
421   dout(10) << "open header.fsid = " << header.fsid
422     //<< " vs expected fsid = " << fsid
423            << dendl;
424   if (header.fsid != fsid) {
425     derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
426          << ", invalid (someone else's?) journal" << dendl;
427     err = -EINVAL;
428     goto out;
429   }
430   if (header.max_size > max_size) {
431     dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
432     err = -EINVAL;
433     goto out;
434   }
435   if (header.block_size != block_size) {
436     dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
437     err = -EINVAL;
438     goto out;
439   }
440   if (header.max_size % header.block_size) {
441     dout(2) << "open journal max size " << header.max_size
442             << " not a multiple of block size " << header.block_size << dendl;
443     err = -EINVAL;
444     goto out;
445   }
446   if (header.alignment != block_size && directio) {
447     dout(0) << "open journal alignment " << header.alignment << " does not match block size "
448             << block_size << " (required for direct_io journal mode)" << dendl;
449     err = -EINVAL;
450     goto out;
451   }
452   if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
453     dout(0) << "open journal alignment " << header.alignment
454             << " is not multiple of minimum directio alignment "
455             << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
456             << dendl;
457     err = -EINVAL;
458     goto out;
459   }
460
461   // looks like a valid header.
462   write_pos = 0;  // not writeable yet
463
464   journaled_seq = header.committed_up_to;
465
466   // find next entry
467   read_pos = header.start;
468   seq = header.start_seq;
469
470   while (1) {
471     bufferlist bl;
472     off64_t old_pos = read_pos;
473     if (!read_entry(bl, seq)) {
474       dout(10) << "open reached end of journal." << dendl;
475       break;
476     }
477     if (seq > next_seq) {
478       dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq
479                << ", ignoring journal contents"
480                << dendl;
481       read_pos = -1;
482       last_committed_seq = 0;
483       return 0;
484     }
485     if (seq == next_seq) {
486       dout(10) << "open reached seq " << seq << dendl;
487       read_pos = old_pos;
488       break;
489     }
490     seq++;  // next event should follow.
491   }
492
493   return 0;
494 out:
495   close();
496   return err;
497 }
498
499 void FileJournal::_close(int fd) const
500 {
501   VOID_TEMP_FAILURE_RETRY(::close(fd));
502 }
503
504 void FileJournal::close()
505 {
506   dout(1) << "close " << fn << dendl;
507
508   // stop writer thread
509   stop_writer();
510
511   // close
512   assert(writeq_empty());
513   assert(!must_write_header);
514   assert(fd >= 0);
515   _close(fd);
516   fd = -1;
517 }
518
519
520 int FileJournal::dump(ostream& out)
521 {
522   return _dump(out, false);
523 }
524
525 int FileJournal::simple_dump(ostream& out)
526 {
527   return _dump(out, true);
528 }
529
530 int FileJournal::_dump(ostream& out, bool simple)
531 {
532   JSONFormatter f(true);
533   int ret = _fdump(f, simple);
534   f.flush(out);
535   return ret;
536 }
537
538 int FileJournal::_fdump(Formatter &f, bool simple)
539 {
540   dout(10) << "_fdump" << dendl;
541
542   assert(fd == -1);
543   int err = _open(false, false);
544   if (err)
545     return err;
546
547   err = read_header(&header);
548   if (err < 0) {
549     close();
550     return err;
551   }
552
553   off64_t next_pos = header.start;
554
555   f.open_object_section("journal");
556
557   f.open_object_section("header");
558   f.dump_unsigned("flags", header.flags);
559   ostringstream os;
560   os << header.fsid;
561   f.dump_string("fsid", os.str());
562   f.dump_unsigned("block_size", header.block_size);
563   f.dump_unsigned("alignment", header.alignment);
564   f.dump_int("max_size", header.max_size);
565   f.dump_int("start", header.start);
566   f.dump_unsigned("committed_up_to", header.committed_up_to);
567   f.dump_unsigned("start_seq", header.start_seq);
568   f.close_section();
569
570   f.open_array_section("entries");
571   uint64_t seq = header.start_seq;
572   while (1) {
573     bufferlist bl;
574     off64_t pos = next_pos;
575
576     if (!pos) {
577       dout(2) << "_dump -- not readable" << dendl;
578       err = -EINVAL;
579       break;
580     }
581     stringstream ss;
582     read_entry_result result = do_read_entry(
583       pos,
584       &next_pos,
585       &bl,
586       &seq,
587       &ss);
588     if (result != SUCCESS) {
589       if (seq < header.committed_up_to) {
590         dout(2) << "Unable to read past sequence " << seq
591             << " but header indicates the journal has committed up through "
592             << header.committed_up_to << ", journal is corrupt" << dendl;
593         err = -EINVAL;
594       }
595       dout(25) << ss.str() << dendl;
596       dout(25) << "No further valid entries found, journal is most likely valid"
597           << dendl;
598       break;
599     }
600
601     f.open_object_section("entry");
602     f.dump_unsigned("offset", pos);
603     f.dump_unsigned("seq", seq);
604     if (simple) {
605       f.dump_unsigned("bl.length", bl.length());
606     } else {
607       f.open_array_section("transactions");
608       bufferlist::iterator p = bl.begin();
609       int trans_num = 0;
610       while (!p.end()) {
611         ObjectStore::Transaction t(p);
612         f.open_object_section("transaction");
613         f.dump_unsigned("trans_num", trans_num);
614         t.dump(&f);
615         f.close_section();
616         trans_num++;
617       }
618       f.close_section();
619     }
620     f.close_section();
621   }
622
623   f.close_section();
624   f.close_section();
625   dout(10) << "dump finish" << dendl;
626
627   close();
628   return err;
629 }
630
631
632 void FileJournal::start_writer()
633 {
634   write_stop = false;
635   aio_stop = false;
636   write_thread.create("journal_write");
637 #ifdef HAVE_LIBAIO
638   if (aio)
639     write_finish_thread.create("journal_wrt_fin");
640 #endif
641 }
642
643 void FileJournal::stop_writer()
644 {
645   // Do nothing if writer already stopped or never started
646   if (!write_stop)
647   {
648     {
649       Mutex::Locker l(write_lock);
650       Mutex::Locker p(writeq_lock);
651       write_stop = true;
652       writeq_cond.Signal();
653       // Doesn't hurt to signal commit_cond in case thread is waiting there
654       // and caller didn't use committed_thru() first.
655       commit_cond.Signal();
656     }
657     write_thread.join();
658
659     // write journal header now so that we have less to replay on remount
660     write_header_sync();
661   }
662
663 #ifdef HAVE_LIBAIO
664   // stop aio completeion thread *after* writer thread has stopped
665   // and has submitted all of its io
666   if (aio && !aio_stop) {
667     aio_lock.Lock();
668     aio_stop = true;
669     aio_cond.Signal();
670     write_finish_cond.Signal();
671     aio_lock.Unlock();
672     write_finish_thread.join();
673   }
674 #endif
675 }
676
677
678
679 void FileJournal::print_header(const header_t &header) const
680 {
681   dout(10) << "header: block_size " << header.block_size
682            << " alignment " << header.alignment
683            << " max_size " << header.max_size
684            << dendl;
685   dout(10) << "header: start " << header.start << dendl;
686   dout(10) << " write_pos " << write_pos << dendl;
687 }
688
689 int FileJournal::read_header(header_t *hdr) const
690 {
691   dout(10) << "read_header" << dendl;
692   bufferlist bl;
693
694   buffer::ptr bp = buffer::create_page_aligned(block_size);
695   char* bpdata = bp.c_str();
696   int r = ::pread(fd, bpdata, bp.length(), 0);
697
698   if (r < 0) {
699     int err = errno;
700     dout(0) << "read_header got " << cpp_strerror(err) << dendl;
701     return -err;
702   }
703
704   // don't use bp.zero() here, because it also invalidates
705   // crc cache (which is not yet populated anyway)
706   if (bp.length() != (size_t)r) {
707       // r will be always less or equal than bp.length
708       bpdata += r;
709       memset(bpdata, 0, bp.length() - r);
710   }
711
712   bl.push_back(std::move(bp));
713
714   try {
715     bufferlist::iterator p = bl.begin();
716     ::decode(*hdr, p);
717   }
718   catch (buffer::error& e) {
719     derr << "read_header error decoding journal header" << dendl;
720     return -EINVAL;
721   }
722
723
724   /*
725    * Unfortunately we weren't initializing the flags field for new
726    * journals!  Aie.  This is safe(ish) now that we have only one
727    * flag.  Probably around when we add the next flag we need to
728    * remove this or else this (eventually old) code will clobber newer
729    * code's flags.
730    */
731   if (hdr->flags > 3) {
732     derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
733     hdr->flags = 0;
734   }
735
736   print_header(*hdr);
737
738   return 0;
739 }
740
741 bufferptr FileJournal::prepare_header()
742 {
743   bufferlist bl;
744   {
745     Mutex::Locker l(finisher_lock);
746     header.committed_up_to = journaled_seq;
747   }
748   ::encode(header, bl);
749   bufferptr bp = buffer::create_page_aligned(get_top());
750   // don't use bp.zero() here, because it also invalidates
751   // crc cache (which is not yet populated anyway)
752   char* data = bp.c_str();
753   memcpy(data, bl.c_str(), bl.length());
754   data += bl.length();
755   memset(data, 0, bp.length()-bl.length());
756   return bp;
757 }
758
759 void FileJournal::write_header_sync()
760 {
761   Mutex::Locker locker(write_lock);
762   must_write_header = true;
763   bufferlist bl;
764   do_write(bl);
765   dout(20) << __func__ << " finish" << dendl;
766 }
767
768 int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
769 {
770   // already full?
771   if (full_state != FULL_NOTFULL)
772     return -ENOSPC;
773
774   // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
775   off64_t room;
776   if (pos >= header.start)
777     room = (header.max_size - pos) + (header.start - get_top()) - 1;
778   else
779     room = header.start - pos - 1;
780   dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
781            << " top " << get_top() << dendl;
782
783   if (do_sync_cond) {
784     if (room >= (header.max_size >> 1) &&
785         room - size < (header.max_size >> 1)) {
786       dout(10) << " passing half full mark, triggering commit" << dendl;
787       do_sync_cond->SloppySignal();  // initiate a real commit so we can trim
788     }
789   }
790
791   if (room >= size) {
792     dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
793     if (pos + size > header.max_size)
794       must_write_header = true;
795     return 0;
796   }
797
798   // full
799   dout(1) << "check_for_full at " << pos << " : JOURNAL FULL "
800           << pos << " >= " << room
801           << " (max_size " << header.max_size << " start " << header.start << ")"
802           << dendl;
803
804   off64_t max = header.max_size - get_top();
805   if (size > max)
806     dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl;
807
808   return -ENOSPC;
809 }
810
811 int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
812 {
813   // gather queued writes
814   off64_t queue_pos = write_pos;
815
816   int eleft = cct->_conf->journal_max_write_entries;
817   unsigned bmax = cct->_conf->journal_max_write_bytes;
818
819   if (full_state != FULL_NOTFULL)
820     return -ENOSPC;
821
822   while (!writeq_empty()) {
823     list<write_item> items;
824     batch_pop_write(items);
825     list<write_item>::iterator it = items.begin();
826     while (it != items.end()) {
827       uint64_t bytes = it->bl.length();
828       int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
829       if (r == 0) { // prepare ok, delete it
830         items.erase(it++);
831 #ifdef HAVE_LIBAIO
832         {
833           Mutex::Locker locker(aio_lock);
834           assert(aio_write_queue_ops > 0);
835           aio_write_queue_ops--;
836           assert(aio_write_queue_bytes >= bytes);
837           aio_write_queue_bytes -= bytes;
838         }
839 #else
840         (void)bytes;
841 #endif
842       }
843       if (r == -ENOSPC) {
844         // the journal maybe full, insert the left item to writeq
845         batch_unpop_write(items);
846         if (orig_ops)
847           goto out;         // commit what we have
848
849         if (logger)
850           logger->inc(l_filestore_journal_full);
851
852         if (wait_on_full) {
853           dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
854         } else {
855           dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
856
857           // throw out what we have so far
858           full_state = FULL_FULL;
859           while (!writeq_empty()) {
860             complete_write(1, peek_write().orig_len);
861             pop_write();
862           }
863           print_header(header);
864         }
865
866         return -ENOSPC;  // hrm, full on first op
867       }
868       if (eleft) {
869         if (--eleft == 0) {
870           dout(20) << "prepare_multi_write hit max events per write "
871                    << cct->_conf->journal_max_write_entries << dendl;
872           batch_unpop_write(items);
873           goto out;
874         }
875       }
876       if (bmax) {
877         if (bl.length() >= bmax) {
878           dout(20) << "prepare_multi_write hit max write size "
879                    << cct->_conf->journal_max_write_bytes << dendl;
880           batch_unpop_write(items);
881           goto out;
882         }
883       }
884     }
885   }
886
887 out:
888   dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
889   assert((write_pos + bl.length() == queue_pos) ||
890          (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
891   return 0;
892 }
893
894 /*
895 void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
896 {
897   writing_seq.push_back(seq);
898   if (!waiting_for_notfull.empty()) {
899     // make sure previously unjournaled stuff waiting for UNFULL triggers
900     // _before_ newly journaled stuff does
901     dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
902              << " until after UNFULL" << dendl;
903     C_Gather *g = new C_Gather(writeq.front().fin);
904     writing_fin.push_back(g->new_sub());
905     waiting_for_notfull.push_back(g->new_sub());
906   } else {
907     writing_fin.push_back(writeq.front().fin);
908     dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
909   }
910 }
911 */
912
913 void FileJournal::queue_completions_thru(uint64_t seq)
914 {
915   assert(finisher_lock.is_locked());
916   utime_t now = ceph_clock_now();
917   list<completion_item> items;
918   batch_pop_completions(items);
919   list<completion_item>::iterator it = items.begin();
920   while (it != items.end()) {
921     completion_item& next = *it;
922     if (next.seq > seq)
923       break;
924     utime_t lat = now;
925     lat -= next.start;
926     dout(10) << "queue_completions_thru seq " << seq
927              << " queueing seq " << next.seq
928              << " " << next.finish
929              << " lat " << lat << dendl;
930     if (logger) {
931       logger->tinc(l_filestore_journal_latency, lat);
932     }
933     if (next.finish)
934       finisher->queue(next.finish);
935     if (next.tracked_op) {
936       next.tracked_op->mark_event("journaled_completion_queued");
937       next.tracked_op->journal_trace.event("queued completion");
938       next.tracked_op->journal_trace.keyval("completed through", seq);
939     }
940     items.erase(it++);
941   }
942   batch_unpop_completions(items);
943   finisher_cond.Signal();
944 }
945
946
947 int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
948 {
949   uint64_t seq = next_write.seq;
950   bufferlist &ebl = next_write.bl;
951   off64_t size = ebl.length();
952
953   int r = check_for_full(seq, queue_pos, size);
954   if (r < 0)
955     return r;   // ENOSPC or EAGAIN
956
957   uint32_t orig_len = next_write.orig_len;
958   orig_bytes += orig_len;
959   orig_ops++;
960
961   // add to write buffer
962   dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
963            << " len " << orig_len << " -> " << size << dendl;
964
965   unsigned seq_offset = offsetof(entry_header_t, seq);
966   unsigned magic1_offset = offsetof(entry_header_t, magic1);
967   unsigned magic2_offset = offsetof(entry_header_t, magic2);
968
969   bufferptr headerptr = ebl.buffers().front();
970   uint64_t _seq = seq;
971   uint64_t _queue_pos = queue_pos;
972   uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
973   headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
974   headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
975   headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
976
977   bufferptr footerptr = ebl.buffers().back();
978   unsigned post_offset  = footerptr.length() - sizeof(entry_header_t);
979   footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
980   footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
981   footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
982
983   bl.claim_append(ebl);
984   if (next_write.tracked_op) {
985     next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
986     next_write.tracked_op->journal_trace.event("prepare_single_write");
987   }
988
989   journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
990   writing_seq = seq;
991
992   queue_pos += size;
993   if (queue_pos >= header.max_size)
994     queue_pos = queue_pos + get_top() - header.max_size;
995
996   return 0;
997 }
998
999 void FileJournal::check_align(off64_t pos, bufferlist& bl)
1000 {
1001   // make sure list segments are page aligned
1002   if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
1003     assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1004     assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1005     assert(0 == "bl was not aligned");
1006   }
1007 }
1008
1009 int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
1010 {
1011   int ret;
1012
1013   off64_t spos = ::lseek64(fd, pos, SEEK_SET);
1014   if (spos < 0) {
1015     ret = -errno;
1016     derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl;
1017     return ret;
1018   }
1019   ret = bl.write_fd(fd);
1020   if (ret) {
1021     derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl;
1022     return ret;
1023   }
1024   pos += bl.length();
1025   if (pos == header.max_size)
1026     pos = get_top();
1027   return 0;
1028 }
1029
1030 void FileJournal::do_write(bufferlist& bl)
1031 {
1032   // nothing to do?
1033   if (bl.length() == 0 && !must_write_header)
1034     return;
1035
1036   buffer::ptr hbp;
1037   if (cct->_conf->journal_write_header_frequency &&
1038       (((++journaled_since_start) %
1039         cct->_conf->journal_write_header_frequency) == 0)) {
1040     must_write_header = true;
1041   }
1042
1043   if (must_write_header) {
1044     must_write_header = false;
1045     hbp = prepare_header();
1046   }
1047
1048   dout(15) << "do_write writing " << write_pos << "~" << bl.length()
1049            << (hbp.length() ? " + header":"")
1050            << dendl;
1051
1052   utime_t from = ceph_clock_now();
1053
1054   // entry
1055   off64_t pos = write_pos;
1056
1057   // Adjust write_pos
1058   write_pos += bl.length();
1059   if (write_pos >= header.max_size)
1060     write_pos = write_pos - header.max_size + get_top();
1061
1062   write_lock.Unlock();
1063
1064   // split?
1065   off64_t split = 0;
1066   if (pos + bl.length() > header.max_size) {
1067     bufferlist first, second;
1068     split = header.max_size - pos;
1069     first.substr_of(bl, 0, split);
1070     second.substr_of(bl, split, bl.length() - split);
1071     assert(first.length() + second.length() == bl.length());
1072     dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
1073              << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
1074
1075     //Save pos to write first piece second
1076     off64_t first_pos = pos;
1077     off64_t orig_pos;
1078     pos = get_top();
1079     // header too?
1080     if (hbp.length()) {
1081       // be sneaky: include the header in the second fragment
1082       second.push_front(hbp);
1083       pos = 0;          // we included the header
1084     }
1085     // Write the second portion first possible with the header, so
1086     // do_read_entry() won't even get a valid entry_header_t if there
1087     // is a crash between the two writes.
1088     orig_pos = pos;
1089     if (write_bl(pos, second)) {
1090       derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1091            << ") failed" << dendl;
1092       check_align(pos, second);
1093       ceph_abort();
1094     }
1095     orig_pos = first_pos;
1096     if (write_bl(first_pos, first)) {
1097       derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1098            << ") failed" << dendl;
1099       check_align(first_pos, first);
1100       ceph_abort();
1101     }
1102     assert(first_pos == get_top());
1103   } else {
1104     // header too?
1105     if (hbp.length()) {
1106       if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
1107         int err = errno;
1108         derr << "FileJournal::do_write: pwrite(fd=" << fd
1109              << ", hbp.length=" << hbp.length() << ") failed :"
1110              << cpp_strerror(err) << dendl;
1111         ceph_abort();
1112       }
1113     }
1114
1115     if (write_bl(pos, bl)) {
1116       derr << "FileJournal::do_write: write_bl(pos=" << pos
1117            << ") failed" << dendl;
1118       check_align(pos, bl);
1119       ceph_abort();
1120     }
1121   }
1122
1123   if (!directio) {
1124     dout(20) << "do_write fsync" << dendl;
1125
1126     /*
1127      * We'd really love to have a fsync_range or fdatasync_range and do a:
1128      *
1129      *  if (split) {
1130      *    ::fsync_range(fd, header.max_size - split, split)l
1131      *    ::fsync_range(fd, get_top(), bl.length() - split);
1132      *  else
1133      *    ::fsync_range(fd, write_pos, bl.length())
1134      *
1135      * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1136      * too hard given all the underlying infrastructure already exist.
1137      *
1138      * NOTE: using sync_file_range here would not be safe as it does not
1139      * flush disk caches or commits any sort of metadata.
1140      */
1141     int ret = 0;
1142 #if defined(DARWIN) || defined(__FreeBSD__)
1143     ret = ::fsync(fd);
1144 #else
1145     ret = ::fdatasync(fd);
1146 #endif
1147     if (ret < 0) {
1148       derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
1149       ceph_abort();
1150     }
1151 #ifdef HAVE_POSIX_FADVISE
1152     if (cct->_conf->filestore_fadvise)
1153       posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
1154 #endif
1155   }
1156
1157   utime_t lat = ceph_clock_now() - from;
1158   dout(20) << "do_write latency " << lat << dendl;
1159
1160   write_lock.Lock();
1161
1162   assert(write_pos == pos);
1163   assert(write_pos % header.alignment == 0);
1164
1165   {
1166     Mutex::Locker locker(finisher_lock);
1167     journaled_seq = writing_seq;
1168
1169     // kick finisher?
1170     //  only if we haven't filled up recently!
1171     if (full_state != FULL_NOTFULL) {
1172       dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1173                << ", full_commit_seq|full_restart_seq" << dendl;
1174     } else {
1175       if (plug_journal_completions) {
1176         dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1177                  << " due to completion plug" << dendl;
1178       } else {
1179         dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
1180         queue_completions_thru(journaled_seq);
1181       }
1182     }
1183   }
1184 }
1185
1186 void FileJournal::flush()
1187 {
1188   dout(10) << "waiting for completions to empty" << dendl;
1189   {
1190     Mutex::Locker l(finisher_lock);
1191     while (!completions_empty())
1192       finisher_cond.Wait(finisher_lock);
1193   }
1194   dout(10) << "flush waiting for finisher" << dendl;
1195   finisher->wait_for_empty();
1196   dout(10) << "flush done" << dendl;
1197 }
1198
1199
1200 void FileJournal::write_thread_entry()
1201 {
1202   dout(10) << "write_thread_entry start" << dendl;
1203   while (1) {
1204     {
1205       Mutex::Locker locker(writeq_lock);
1206       if (writeq.empty() && !must_write_header) {
1207         if (write_stop)
1208           break;
1209         dout(20) << "write_thread_entry going to sleep" << dendl;
1210         writeq_cond.Wait(writeq_lock);
1211         dout(20) << "write_thread_entry woke up" << dendl;
1212         continue;
1213       }
1214     }
1215
1216 #ifdef HAVE_LIBAIO
1217     if (aio) {
1218       Mutex::Locker locker(aio_lock);
1219       // should we back off to limit aios in flight?  try to do this
1220       // adaptively so that we submit larger aios once we have lots of
1221       // them in flight.
1222       //
1223       // NOTE: our condition here is based on aio_num (protected by
1224       // aio_lock) and throttle_bytes (part of the write queue).  when
1225       // we sleep, we *only* wait for aio_num to change, and do not
1226       // wake when more data is queued.  this is not strictly correct,
1227       // but should be fine given that we will have plenty of aios in
1228       // flight if we hit this limit to ensure we keep the device
1229       // saturated.
1230       while (aio_num > 0) {
1231         int exp = MIN(aio_num * 2, 24);
1232         long unsigned min_new = 1ull << exp;
1233         uint64_t cur = aio_write_queue_bytes;
1234         dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
1235                  << " ... exp " << exp << " min_new " << min_new
1236                  << " ... pending " << cur << dendl;
1237         if (cur >= min_new)
1238           break;
1239         dout(20) << "write_thread_entry deferring until more aios complete: "
1240                  << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
1241                  << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
1242         aio_cond.Wait(aio_lock);
1243         dout(20) << "write_thread_entry woke up" << dendl;
1244       }
1245     }
1246 #endif
1247
1248     Mutex::Locker locker(write_lock);
1249     uint64_t orig_ops = 0;
1250     uint64_t orig_bytes = 0;
1251
1252     bufferlist bl;
1253     int r = prepare_multi_write(bl, orig_ops, orig_bytes);
1254     // Don't care about journal full if stoppping, so drop queue and
1255     // possibly let header get written and loop above to notice stop
1256     if (r == -ENOSPC) {
1257       if (write_stop) {
1258         dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
1259         while (!writeq_empty()) {
1260           complete_write(1, peek_write().orig_len);
1261           pop_write();
1262         }
1263         print_header(header);
1264         r = 0;
1265       } else {
1266         dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
1267         commit_cond.Wait(write_lock);
1268         dout(20) << "write_thread_entry woke up" << dendl;
1269         continue;
1270       }
1271     }
1272     assert(r == 0);
1273
1274     if (logger) {
1275       logger->inc(l_filestore_journal_wr);
1276       logger->inc(l_filestore_journal_wr_bytes, bl.length());
1277     }
1278
1279 #ifdef HAVE_LIBAIO
1280     if (aio)
1281       do_aio_write(bl);
1282     else
1283       do_write(bl);
1284 #else
1285     do_write(bl);
1286 #endif
1287     complete_write(orig_ops, orig_bytes);
1288   }
1289
1290   dout(10) << "write_thread_entry finish" << dendl;
1291 }
1292
1293 #ifdef HAVE_LIBAIO
1294 void FileJournal::do_aio_write(bufferlist& bl)
1295 {
1296
1297   if (cct->_conf->journal_write_header_frequency &&
1298       (((++journaled_since_start) %
1299         cct->_conf->journal_write_header_frequency) == 0)) {
1300     must_write_header = true;
1301   }
1302
1303   // nothing to do?
1304   if (bl.length() == 0 && !must_write_header)
1305     return;
1306
1307   buffer::ptr hbp;
1308   if (must_write_header) {
1309     must_write_header = false;
1310     hbp = prepare_header();
1311   }
1312
1313   // entry
1314   off64_t pos = write_pos;
1315
1316   dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
1317            << (hbp.length() ? " + header":"")
1318            << dendl;
1319
1320   // split?
1321   off64_t split = 0;
1322   if (pos + bl.length() > header.max_size) {
1323     bufferlist first, second;
1324     split = header.max_size - pos;
1325     first.substr_of(bl, 0, split);
1326     second.substr_of(bl, split, bl.length() - split);
1327     assert(first.length() + second.length() == bl.length());
1328     dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
1329
1330     if (write_aio_bl(pos, first, 0)) {
1331       derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1332            << ") failed" << dendl;
1333       ceph_abort();
1334     }
1335     assert(pos == header.max_size);
1336     if (hbp.length()) {
1337       // be sneaky: include the header in the second fragment
1338       second.push_front(hbp);
1339       pos = 0;          // we included the header
1340     } else
1341       pos = get_top();  // no header, start after that
1342     if (write_aio_bl(pos, second, writing_seq)) {
1343       derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1344            << ") failed" << dendl;
1345       ceph_abort();
1346     }
1347   } else {
1348     // header too?
1349     if (hbp.length()) {
1350       bufferlist hbl;
1351       hbl.push_back(hbp);
1352       loff_t pos = 0;
1353       if (write_aio_bl(pos, hbl, 0)) {
1354         derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
1355         ceph_abort();
1356       }
1357     }
1358
1359     if (write_aio_bl(pos, bl, writing_seq)) {
1360       derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1361            << ") failed" << dendl;
1362       ceph_abort();
1363     }
1364   }
1365
1366   write_pos = pos;
1367   if (write_pos == header.max_size)
1368     write_pos = get_top();
1369   assert(write_pos % header.alignment == 0);
1370 }
1371
1372 /**
1373  * write a buffer using aio
1374  *
1375  * @param seq seq to trigger when this aio completes.  if 0, do not update any state
1376  * on completion.
1377  */
1378 int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
1379 {
1380   dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
1381
1382   while (bl.length() > 0) {
1383     int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
1384     iovec *iov = new iovec[max];
1385     int n = 0;
1386     unsigned len = 0;
1387     for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
1388          n < max;
1389          ++p, ++n) {
1390       assert(p != bl.buffers().end());
1391       iov[n].iov_base = (void *)p->c_str();
1392       iov[n].iov_len = p->length();
1393       len += p->length();
1394     }
1395
1396     bufferlist tbl;
1397     bl.splice(0, len, &tbl);  // move bytes from bl -> tbl
1398
1399     // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1400     // modified in check_aio_completion
1401     aio_lock.Lock();
1402     aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
1403     aio_info& aio = aio_queue.back();
1404     aio.iov = iov;
1405
1406     io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
1407
1408     dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
1409              << " in " << n << dendl;
1410
1411     aio_num++;
1412     aio_bytes += aio.len;
1413
1414     // need to save current aio len to update write_pos later because current
1415     // aio could be ereased from aio_queue once it is done
1416     uint64_t cur_len = aio.len;
1417     // unlock aio_lock because following io_submit might take time to return
1418     aio_lock.Unlock();
1419
1420     iocb *piocb = &aio.iocb;
1421
1422     // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
1423     int attempts = 16;
1424     int delay = 125;
1425     do {
1426       int r = io_submit(aio_ctx, 1, &piocb);
1427       dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
1428       if (r < 0) {
1429         derr << "io_submit to " << aio.off << "~" << cur_len
1430              << " got " << cpp_strerror(r) << dendl;
1431         if (r == -EAGAIN && attempts-- > 0) {
1432           usleep(delay);
1433           delay *= 2;
1434           continue;
1435         }
1436         check_align(pos, tbl);
1437         assert(0 == "io_submit got unexpected error");
1438       } else {
1439         break;
1440       }
1441     } while (true);
1442     pos += cur_len;
1443   }
1444   aio_lock.Lock();
1445   write_finish_cond.Signal();
1446   aio_lock.Unlock();
1447   return 0;
1448 }
1449 #endif
1450
1451 void FileJournal::write_finish_thread_entry()
1452 {
1453 #ifdef HAVE_LIBAIO
1454   dout(10) << "write_finish_thread_entry enter" << dendl;
1455   while (true) {
1456     {
1457       Mutex::Locker locker(aio_lock);
1458       if (aio_queue.empty()) {
1459         if (aio_stop)
1460           break;
1461         dout(20) << "write_finish_thread_entry sleeping" << dendl;
1462         write_finish_cond.Wait(aio_lock);
1463         continue;
1464       }
1465     }
1466
1467     dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl;
1468     io_event event[16];
1469     int r = io_getevents(aio_ctx, 1, 16, event, NULL);
1470     if (r < 0) {
1471       if (r == -EINTR) {
1472         dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
1473         continue;
1474       }
1475       derr << "io_getevents got " << cpp_strerror(r) << dendl;
1476       assert(0 == "got unexpected error from io_getevents");
1477     }
1478
1479     {
1480       Mutex::Locker locker(aio_lock);
1481       for (int i=0; i<r; i++) {
1482         aio_info *ai = (aio_info *)event[i].obj;
1483         if (event[i].res != ai->len) {
1484           derr << "aio to " << ai->off << "~" << ai->len
1485                << " returned: " << (int)event[i].res << dendl;
1486           assert(0 == "unexpected aio error");
1487         }
1488         dout(10) << "write_finish_thread_entry aio " << ai->off
1489                  << "~" << ai->len << " done" << dendl;
1490         ai->done = true;
1491       }
1492       check_aio_completion();
1493     }
1494   }
1495   dout(10) << "write_finish_thread_entry exit" << dendl;
1496 #endif
1497 }
1498
1499 #ifdef HAVE_LIBAIO
1500 /**
1501  * check aio_wait for completed aio, and update state appropriately.
1502  */
1503 void FileJournal::check_aio_completion()
1504 {
1505   assert(aio_lock.is_locked());
1506   dout(20) << "check_aio_completion" << dendl;
1507
1508   bool completed_something = false, signal = false;
1509   uint64_t new_journaled_seq = 0;
1510
1511   list<aio_info>::iterator p = aio_queue.begin();
1512   while (p != aio_queue.end() && p->done) {
1513     dout(20) << "check_aio_completion completed seq " << p->seq << " "
1514              << p->off << "~" << p->len << dendl;
1515     if (p->seq) {
1516       new_journaled_seq = p->seq;
1517       completed_something = true;
1518     }
1519     aio_num--;
1520     aio_bytes -= p->len;
1521     aio_queue.erase(p++);
1522     signal = true;
1523   }
1524
1525   if (completed_something) {
1526     // kick finisher?
1527     //  only if we haven't filled up recently!
1528     Mutex::Locker locker(finisher_lock);
1529     journaled_seq = new_journaled_seq;
1530     if (full_state != FULL_NOTFULL) {
1531       dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
1532                << ", full_commit_seq|full_restart_seq" << dendl;
1533     } else {
1534       if (plug_journal_completions) {
1535         dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
1536                  << " due to completion plug" << dendl;
1537       } else {
1538         dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
1539         queue_completions_thru(journaled_seq);
1540       }
1541     }
1542   }
1543   if (signal) {
1544     // maybe write queue was waiting for aio count to drop?
1545     aio_cond.Signal();
1546   }
1547 }
1548 #endif
1549
1550 int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) {
1551   dout(10) << "prepare_entry " << tls << dendl;
1552   int data_len = cct->_conf->journal_align_min_size - 1;
1553   int data_align = -1; // -1 indicates that we don't care about the alignment
1554   bufferlist bl;
1555   for (vector<ObjectStore::Transaction>::iterator p = tls.begin();
1556       p != tls.end(); ++p) {
1557    if ((int)(*p).get_data_length() > data_len) {
1558      data_len = (*p).get_data_length();
1559      data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
1560     }
1561     ::encode(*p, bl);
1562   }
1563   if (tbl->length()) {
1564     bl.claim_append(*tbl);
1565   }
1566   // add it this entry
1567   entry_header_t h;
1568   unsigned head_size = sizeof(entry_header_t);
1569   off64_t base_size = 2*head_size + bl.length();
1570   memset(&h, 0, sizeof(h));
1571   if (data_align >= 0)
1572     h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
1573   off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
1574   unsigned post_pad = size - base_size - h.pre_pad;
1575   h.len = bl.length();
1576   h.post_pad = post_pad;
1577   h.crc32c = bl.crc32c(0);
1578   dout(10) << " len " << bl.length() << " -> " << size
1579        << " (head " << head_size << " pre_pad " << h.pre_pad
1580        << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
1581        << " (bl alignment " << data_align << ")"
1582        << dendl;
1583   bufferlist ebl;
1584   // header
1585   ebl.append((const char*)&h, sizeof(h));
1586   if (h.pre_pad) {
1587     ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
1588   }
1589   // payload
1590   ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
1591   if (h.post_pad) {
1592     ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
1593   }
1594   // footer
1595   ebl.append((const char*)&h, sizeof(h));
1596   if (directio)
1597     ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
1598   tbl->claim(ebl);
1599   return h.len;
1600 }
1601
1602 void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
1603                                Context *oncommit, TrackedOpRef osd_op)
1604 {
1605   // dump on queue
1606   dout(5) << "submit_entry seq " << seq
1607           << " len " << e.length()
1608           << " (" << oncommit << ")" << dendl;
1609   assert(e.length() > 0);
1610   assert(e.length() < header.max_size);
1611
1612   if (osd_op)
1613     osd_op->mark_event("commit_queued_for_journal_write");
1614   if (logger) {
1615     logger->inc(l_filestore_journal_queue_bytes, orig_len);
1616     logger->inc(l_filestore_journal_queue_ops, 1);
1617   }
1618
1619   throttle.register_throttle_seq(seq, e.length());
1620   if (logger) {
1621     logger->inc(l_filestore_journal_ops, 1);
1622     logger->inc(l_filestore_journal_bytes, e.length());
1623   }
1624
1625   if (osd_op) {
1626     osd_op->mark_event("commit_queued_for_journal_write");
1627     if (osd_op->store_trace) {
1628       osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
1629       osd_op->journal_trace.event("submit_entry");
1630       osd_op->journal_trace.keyval("seq", seq);
1631     }
1632   }
1633   {
1634     Mutex::Locker l1(writeq_lock);
1635 #ifdef HAVE_LIBAIO
1636     Mutex::Locker l2(aio_lock);
1637 #endif
1638     Mutex::Locker l3(completions_lock);
1639
1640 #ifdef HAVE_LIBAIO
1641     aio_write_queue_ops++;
1642     aio_write_queue_bytes += e.length();
1643     aio_cond.Signal();
1644 #endif
1645
1646     completions.push_back(
1647       completion_item(
1648         seq, oncommit, ceph_clock_now(), osd_op));
1649     if (writeq.empty())
1650       writeq_cond.Signal();
1651     writeq.push_back(write_item(seq, e, orig_len, osd_op));
1652     if (osd_op)
1653       osd_op->journal_trace.keyval("queue depth", writeq.size());
1654   }
1655 }
1656
1657 bool FileJournal::writeq_empty()
1658 {
1659   Mutex::Locker locker(writeq_lock);
1660   return writeq.empty();
1661 }
1662
1663 FileJournal::write_item &FileJournal::peek_write()
1664 {
1665   assert(write_lock.is_locked());
1666   Mutex::Locker locker(writeq_lock);
1667   return writeq.front();
1668 }
1669
1670 void FileJournal::pop_write()
1671 {
1672   assert(write_lock.is_locked());
1673   Mutex::Locker locker(writeq_lock);
1674   if (logger) {
1675     logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
1676     logger->dec(l_filestore_journal_queue_ops, 1);
1677   }
1678   writeq.pop_front();
1679 }
1680
1681 void FileJournal::batch_pop_write(list<write_item> &items)
1682 {
1683   assert(write_lock.is_locked());
1684   {
1685     Mutex::Locker locker(writeq_lock);
1686     writeq.swap(items);
1687   }
1688   for (auto &&i : items) {
1689     if (logger) {
1690       logger->dec(l_filestore_journal_queue_bytes, i.orig_len);
1691       logger->dec(l_filestore_journal_queue_ops, 1);
1692     }
1693   }
1694 }
1695
1696 void FileJournal::batch_unpop_write(list<write_item> &items)
1697 {
1698   assert(write_lock.is_locked());
1699   for (auto &&i : items) {
1700     if (logger) {
1701       logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
1702       logger->inc(l_filestore_journal_queue_ops, 1);
1703     }
1704   }
1705   Mutex::Locker locker(writeq_lock);
1706   writeq.splice(writeq.begin(), items);
1707 }
1708
1709 void FileJournal::commit_start(uint64_t seq)
1710 {
1711   dout(10) << "commit_start" << dendl;
1712
1713   // was full?
1714   switch (full_state) {
1715   case FULL_NOTFULL:
1716     break; // all good
1717
1718   case FULL_FULL:
1719     if (seq >= journaled_seq) {
1720       dout(1) << " FULL_FULL -> FULL_WAIT.  commit_start on seq "
1721               << seq << " > journaled_seq " << journaled_seq
1722               << ", moving to FULL_WAIT."
1723               << dendl;
1724       full_state = FULL_WAIT;
1725     } else {
1726       dout(1) << "FULL_FULL commit_start on seq "
1727               << seq << " < journaled_seq " << journaled_seq
1728               << ", remaining in FULL_FULL"
1729               << dendl;
1730     }
1731     break;
1732
1733   case FULL_WAIT:
1734     dout(1) << " FULL_WAIT -> FULL_NOTFULL.  journal now active, setting completion plug." << dendl;
1735     full_state = FULL_NOTFULL;
1736     plug_journal_completions = true;
1737     break;
1738   }
1739 }
1740
1741 /*
1742  *send discard command to joural block deivce
1743  */
1744 void FileJournal::do_discard(int64_t offset, int64_t end)
1745 {
1746   dout(10) << __func__ << "trim(" << offset << ", " << end << dendl;
1747
1748   offset = ROUND_UP_TO(offset, block_size);
1749   if (offset >= end)
1750     return;
1751   end = ROUND_UP_TO(end - block_size, block_size);
1752   assert(end >= offset);
1753   if (offset < end)
1754     if (block_device_discard(fd, offset, end - offset) < 0)
1755         dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
1756 }
1757
1758 void FileJournal::committed_thru(uint64_t seq)
1759 {
1760   Mutex::Locker locker(write_lock);
1761
1762   auto released = throttle.flush(seq);
1763   if (logger) {
1764     logger->dec(l_filestore_journal_ops, released.first);
1765     logger->dec(l_filestore_journal_bytes, released.second);
1766   }
1767
1768   if (seq < last_committed_seq) {
1769     dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
1770     assert(seq >= last_committed_seq);
1771     return;
1772   }
1773   if (seq == last_committed_seq) {
1774     dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
1775     return;
1776   }
1777
1778   dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
1779   last_committed_seq = seq;
1780
1781   // completions!
1782   {
1783     Mutex::Locker locker(finisher_lock);
1784     queue_completions_thru(seq);
1785     if (plug_journal_completions && seq >= header.start_seq) {
1786       dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
1787       plug_journal_completions = false;
1788       queue_completions_thru(journaled_seq);
1789     }
1790   }
1791
1792   // adjust start pointer
1793   while (!journalq.empty() && journalq.front().first <= seq) {
1794     journalq.pop_front();
1795   }
1796
1797   int64_t old_start = header.start;
1798   if (!journalq.empty()) {
1799     header.start = journalq.front().second;
1800     header.start_seq = journalq.front().first;
1801   } else {
1802     header.start = write_pos;
1803     header.start_seq = seq + 1;
1804   }
1805
1806   if (discard) {
1807     dout(10) << __func__  << " will trim (" << old_start << ", " << header.start << ")" << dendl;
1808     if (old_start < header.start)
1809       do_discard(old_start, header.start - 1);
1810     else {
1811       do_discard(old_start, header.max_size - 1);
1812       do_discard(get_top(), header.start - 1);
1813     }
1814   }
1815
1816   must_write_header = true;
1817   print_header(header);
1818
1819   // committed but unjournaled items
1820   while (!writeq_empty() && peek_write().seq <= seq) {
1821     dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1822              << " len " << peek_write().bl.length()
1823              << dendl;
1824     complete_write(1, peek_write().orig_len);
1825     pop_write();
1826   }
1827
1828   commit_cond.Signal();
1829
1830   dout(10) << "committed_thru done" << dendl;
1831 }
1832
1833
1834 void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
1835 {
1836   dout(5) << __func__ << " finished " << ops << " ops and "
1837           << bytes << " bytes" << dendl;
1838 }
1839
1840 int FileJournal::make_writeable()
1841 {
1842   dout(10) << __func__ << dendl;
1843   int r = set_throttle_params();
1844   if (r < 0)
1845     return r;
1846
1847   r = _open(true);
1848   if (r < 0)
1849     return r;
1850
1851   if (read_pos > 0)
1852     write_pos = read_pos;
1853   else
1854     write_pos = get_top();
1855   read_pos = 0;
1856
1857   must_write_header = true;
1858
1859   start_writer();
1860   return 0;
1861 }
1862
1863 int FileJournal::set_throttle_params()
1864 {
1865   stringstream ss;
1866   bool valid = throttle.set_params(
1867     cct->_conf->journal_throttle_low_threshhold,
1868     cct->_conf->journal_throttle_high_threshhold,
1869     cct->_conf->filestore_expected_throughput_bytes,
1870     cct->_conf->journal_throttle_high_multiple,
1871     cct->_conf->journal_throttle_max_multiple,
1872     header.max_size - get_top(),
1873     &ss);
1874
1875   if (!valid) {
1876     derr << "tried to set invalid params: "
1877          << ss.str()
1878          << dendl;
1879   }
1880   return valid ? 0 : -EINVAL;
1881 }
1882
1883 const char** FileJournal::get_tracked_conf_keys() const
1884 {
1885   static const char *KEYS[] = {
1886     "journal_throttle_low_threshhold",
1887     "journal_throttle_high_threshhold",
1888     "journal_throttle_high_multiple",
1889     "journal_throttle_max_multiple",
1890     "filestore_expected_throughput_bytes",
1891     NULL};
1892   return KEYS;
1893 }
1894
1895 void FileJournal::wrap_read_bl(
1896   off64_t pos,
1897   int64_t olen,
1898   bufferlist* bl,
1899   off64_t *out_pos
1900   ) const
1901 {
1902   while (olen > 0) {
1903     while (pos >= header.max_size)
1904       pos = pos + get_top() - header.max_size;
1905
1906     int64_t len;
1907     if (pos + olen > header.max_size)
1908       len = header.max_size - pos;        // partial
1909     else
1910       len = olen;                         // rest
1911
1912     int64_t actual = ::lseek64(fd, pos, SEEK_SET);
1913     assert(actual == pos);
1914
1915     bufferptr bp = buffer::create(len);
1916     int r = safe_read_exact(fd, bp.c_str(), len);
1917     if (r) {
1918       derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
1919            << r << dendl;
1920       ceph_abort();
1921     }
1922     bl->push_back(std::move(bp));
1923     pos += len;
1924     olen -= len;
1925   }
1926   if (pos >= header.max_size)
1927     pos = pos + get_top() - header.max_size;
1928   if (out_pos)
1929     *out_pos = pos;
1930 }
1931
1932 bool FileJournal::read_entry(
1933   bufferlist &bl,
1934   uint64_t &next_seq,
1935   bool *corrupt)
1936 {
1937   if (corrupt)
1938     *corrupt = false;
1939   uint64_t seq = next_seq;
1940
1941   if (!read_pos) {
1942     dout(2) << "read_entry -- not readable" << dendl;
1943     return false;
1944   }
1945
1946   off64_t pos = read_pos;
1947   off64_t next_pos = pos;
1948   stringstream ss;
1949   read_entry_result result = do_read_entry(
1950     pos,
1951     &next_pos,
1952     &bl,
1953     &seq,
1954     &ss);
1955   if (result == SUCCESS) {
1956     journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
1957     uint64_t amount_to_take =
1958       next_pos > pos ?
1959       next_pos - pos :
1960       (header.max_size - pos) + (next_pos - get_top());
1961     throttle.take(amount_to_take);
1962     throttle.register_throttle_seq(next_seq, amount_to_take);
1963     if (logger) {
1964       logger->inc(l_filestore_journal_ops, 1);
1965       logger->inc(l_filestore_journal_bytes, amount_to_take);
1966     }
1967     if (next_seq > seq) {
1968       return false;
1969     } else {
1970       read_pos = next_pos;
1971       next_seq = seq;
1972       if (seq > journaled_seq)
1973         journaled_seq = seq;
1974       return true;
1975     }
1976   } else {
1977     derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
1978   }
1979
1980   if (seq && seq < header.committed_up_to) {
1981     derr << "Unable to read past sequence " << seq
1982          << " but header indicates the journal has committed up through "
1983          << header.committed_up_to << ", journal is corrupt" << dendl;
1984     if (cct->_conf->journal_ignore_corruption) {
1985       if (corrupt)
1986         *corrupt = true;
1987       return false;
1988     } else {
1989       ceph_abort();
1990     }
1991   }
1992
1993   dout(2) << "No further valid entries found, journal is most likely valid"
1994           << dendl;
1995   return false;
1996 }
1997
1998 FileJournal::read_entry_result FileJournal::do_read_entry(
1999   off64_t init_pos,
2000   off64_t *next_pos,
2001   bufferlist *bl,
2002   uint64_t *seq,
2003   ostream *ss,
2004   entry_header_t *_h) const
2005 {
2006   off64_t cur_pos = init_pos;
2007   bufferlist _bl;
2008   if (!bl)
2009     bl = &_bl;
2010
2011   // header
2012   entry_header_t *h;
2013   bufferlist hbl;
2014   off64_t _next_pos;
2015   wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos);
2016   h = reinterpret_cast<entry_header_t *>(hbl.c_str());
2017
2018   if (!h->check_magic(cur_pos, header.get_fsid64())) {
2019     dout(25) << "read_entry " << init_pos
2020              << " : bad header magic, end of journal" << dendl;
2021     if (ss)
2022       *ss << "bad header magic";
2023     if (next_pos)
2024       *next_pos = init_pos + (4<<10); // check 4k ahead
2025     return MAYBE_CORRUPT;
2026   }
2027   cur_pos = _next_pos;
2028
2029   // pad + body + pad
2030   if (h->pre_pad)
2031     cur_pos += h->pre_pad;
2032
2033   bl->clear();
2034   wrap_read_bl(cur_pos, h->len, bl, &cur_pos);
2035
2036   if (h->post_pad)
2037     cur_pos += h->post_pad;
2038
2039   // footer
2040   entry_header_t *f;
2041   bufferlist fbl;
2042   wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos);
2043   f = reinterpret_cast<entry_header_t *>(fbl.c_str());
2044   if (memcmp(f, h, sizeof(*f))) {
2045     if (ss)
2046       *ss << "bad footer magic, partial entry";
2047     if (next_pos)
2048       *next_pos = cur_pos;
2049     return MAYBE_CORRUPT;
2050   }
2051
2052   if ((header.flags & header_t::FLAG_CRC) ||   // if explicitly enabled (new journal)
2053       h->crc32c != 0) {                        // newer entry in old journal
2054     uint32_t actual_crc = bl->crc32c(0);
2055     if (actual_crc != h->crc32c) {
2056       if (ss)
2057         *ss << "header crc (" << h->crc32c
2058             << ") doesn't match body crc (" << actual_crc << ")";
2059       if (next_pos)
2060         *next_pos = cur_pos;
2061       return MAYBE_CORRUPT;
2062     }
2063   }
2064
2065   // yay!
2066   dout(2) << "read_entry " << init_pos << " : seq " << h->seq
2067           << " " << h->len << " bytes"
2068           << dendl;
2069
2070   // ok!
2071   if (seq)
2072     *seq = h->seq;
2073
2074
2075   if (next_pos)
2076     *next_pos = cur_pos;
2077
2078   if (_h)
2079     *_h = *h;
2080
2081   assert(cur_pos % header.alignment == 0);
2082   return SUCCESS;
2083 }
2084
2085 void FileJournal::reserve_throttle_and_backoff(uint64_t count)
2086 {
2087   throttle.get(count);
2088 }
2089
2090 void FileJournal::get_header(
2091   uint64_t wanted_seq,
2092   off64_t *_pos,
2093   entry_header_t *h)
2094 {
2095   off64_t pos = header.start;
2096   off64_t next_pos = pos;
2097   bufferlist bl;
2098   uint64_t seq = 0;
2099   dout(2) << __func__ << dendl;
2100   while (1) {
2101     bl.clear();
2102     pos = next_pos;
2103     read_entry_result result = do_read_entry(
2104       pos,
2105       &next_pos,
2106       &bl,
2107       &seq,
2108       0,
2109       h);
2110     if (result == FAILURE || result == MAYBE_CORRUPT)
2111       ceph_abort();
2112     if (seq == wanted_seq) {
2113       if (_pos)
2114         *_pos = pos;
2115       return;
2116     }
2117   }
2118   ceph_abort(); // not reachable
2119 }
2120
2121 void FileJournal::corrupt(
2122   int wfd,
2123   off64_t corrupt_at)
2124 {
2125   dout(2) << __func__ << dendl;
2126   if (corrupt_at >= header.max_size)
2127     corrupt_at = corrupt_at + get_top() - header.max_size;
2128
2129   int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
2130   assert(actual == corrupt_at);
2131
2132   char buf[10];
2133   int r = safe_read_exact(fd, buf, 1);
2134   assert(r == 0);
2135
2136   actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
2137   assert(actual == corrupt_at);
2138
2139   buf[0]++;
2140   r = safe_write(wfd, buf, 1);
2141   assert(r == 0);
2142 }
2143
2144 void FileJournal::corrupt_payload(
2145   int wfd,
2146   uint64_t seq)
2147 {
2148   dout(2) << __func__ << dendl;
2149   off64_t pos = 0;
2150   entry_header_t h;
2151   get_header(seq, &pos, &h);
2152   off64_t corrupt_at =
2153     pos + sizeof(entry_header_t) + h.pre_pad;
2154   corrupt(wfd, corrupt_at);
2155 }
2156
2157
2158 void FileJournal::corrupt_footer_magic(
2159   int wfd,
2160   uint64_t seq)
2161 {
2162   dout(2) << __func__ << dendl;
2163   off64_t pos = 0;
2164   entry_header_t h;
2165   get_header(seq, &pos, &h);
2166   off64_t corrupt_at =
2167     pos + sizeof(entry_header_t) + h.pre_pad +
2168     h.len + h.post_pad +
2169     (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2170   corrupt(wfd, corrupt_at);
2171 }
2172
2173
2174 void FileJournal::corrupt_header_magic(
2175   int wfd,
2176   uint64_t seq)
2177 {
2178   dout(2) << __func__ << dendl;
2179   off64_t pos = 0;
2180   entry_header_t h;
2181   get_header(seq, &pos, &h);
2182   off64_t corrupt_at =
2183     pos +
2184     (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2185   corrupt(wfd, corrupt_at);
2186 }
2187
2188 off64_t FileJournal::get_journal_size_estimate()
2189 {
2190   off64_t size, start = header.start;
2191   if (write_pos < start) {
2192     size = (max_size - start) + write_pos;
2193   } else {
2194     size = write_pos - start;
2195   }
2196   dout(20) << __func__ << " journal size=" << size << dendl;
2197   return size;
2198 }