1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
21 #include "KernelDevice.h"
22 #include "include/types.h"
23 #include "include/compat.h"
24 #include "include/stringify.h"
25 #include "common/errno.h"
26 #include "common/debug.h"
27 #include "common/blkdev.h"
28 #include "common/align.h"
29 #include "common/blkdev.h"
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_bdev
34 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
// Construct a KernelDevice in its "not yet opened" state: size and block
// size are zero, there is no FS helper, and both aio and dio are disabled.
//
// @param cct     Ceph context; its configuration supplies the aio queue depth.
// @param cb      aio completion callback type argument (its storage is not
//                among the initializers shown here).
// @param cbpriv  opaque pointer kept in aio_callback_priv; it is the first
//                argument passed to aio_callback when an IOContext completes.
36 KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
40 size(0), block_size(0),
41 fs(NULL), aio(false), dio(false),
42 debug_lock("KernelDevice::debug_lock"),
// queue depth is taken from the bdev_aio_max_queue_depth option
43 aio_queue(cct->_conf->bdev_aio_max_queue_depth),
45 aio_callback_priv(cbpriv),
// Issue an fcntl(F_SETLK) lock request against fd_direct.
//
// The lock description `l` is zero-filled first, so every field other than
// l_whence (set to SEEK_SET below) keeps whatever meaning an all-zero value
// has for that field, unless it is assigned on a line not shown here.
//
// @return int status; r holds the raw fcntl() result at the point shown.
52 int KernelDevice::_lock()
55 memset(&l, 0, sizeof(l));
57 l.l_whence = SEEK_SET;
58 int r = ::fcntl(fd_direct, F_SETLK, &l);
// Open the backing device or file and prepare it for IO.
//
// The path is opened twice: once with O_DIRECT (fd_direct) and once without
// (fd_buffered). The function then disables readahead on the buffered
// descriptor, stats the target, picks the block size from configuration,
// determines the size, probes block-device properties, and creates the FS
// helper.
//
// @param p  path of the device or file to open.
// @return int status; failures are logged through derr with cpp_strerror(r).
64 int KernelDevice::open(const string& p)
68 dout(1) << __func__ << " path " << path << dendl;
// first descriptor: direct IO
70 fd_direct = ::open(path.c_str(), O_RDWR | O_DIRECT);
73 derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
// second descriptor: regular (page-cache) IO on the same path
76 fd_buffered = ::open(path.c_str(), O_RDWR);
77 if (fd_buffered < 0) {
79 derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
// aio enablement comes from the bdev_aio option; the assert below documents
// that running without aio is not a supported configuration.
83 aio = cct->_conf->bdev_aio;
85 assert(0 == "non-aio not supported");
88 // disable readahead as it will wreak havoc on our mix of
89 // directio/aio and buffered io.
90 r = posix_fadvise(fd_buffered, 0, 0, POSIX_FADV_RANDOM);
// NOTE(review): the message below says "open got" although the call that
// just ran is posix_fadvise; it looks copied from the open() error paths.
93 derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
99 derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
// stat through the direct descriptor; st feeds the st_blksize comparison and
// the S_ISBLK test below.
105 r = ::fstat(fd_direct, &st);
108 derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
112 // Operate as though the block size is 4 KB. The backing file
113 // blksize doesn't strictly matter except that some file systems may
114 // require a read/modify/write if we write something smaller than
// The configured bdev_block_size always wins; a mismatch with st_blksize is
// only reported at debug level 1.
116 block_size = cct->_conf->bdev_block_size;
117 if (block_size != (unsigned)st.st_blksize) {
118 dout(1) << __func__ << " backing device/file reports st_blksize "
119 << st.st_blksize << ", using bdev_block_size "
120 << block_size << " anyway" << dendl;
// block devices are sized through get_block_device_size()
123 if (S_ISBLK(st.st_mode)) {
125 r = get_block_device_size(fd_direct, &s);
// Fault injection controlled by bdev_inject_bad_size: the mask here is
// ~block_size, not the ~(block_size - 1) used for rounding further down, so
// the resulting size is deliberately not the normal rounded value.
133 if (cct->_conf->get_val<bool>("bdev_inject_bad_size")) {
134 derr << "injecting bad size; actual 0x" << std::hex << size
135 << " but using 0x" << (size & ~block_size) << std::dec << dendl;
136 size &= ~(block_size);
// Resolve the device name from the buffered descriptor. Both buffers are
// PATH_MAX bytes; the length passed is that of devname.
140 char partition[PATH_MAX], devname[PATH_MAX];
141 r = get_device_by_fd(fd_buffered, partition, devname, sizeof(devname));
143 derr << "unable to get device name for " << path << ": "
144 << cpp_strerror(r) << dendl;
147 dout(20) << __func__ << " devname " << devname << dendl;
// rotational later selects the "hdd"/"ssd" style reporting and is printed in
// the summary line below.
148 rotational = block_device_is_rotational(devname);
157 fs = FS::create_by_fd(fd_direct);
160 // round size down to an even block
161 size &= ~(block_size - 1);
// summary line: size (hex and SI), block size (SI) and rotational flag
165 << " (0x" << std::hex << size << std::dec << ", "
166 << pretty_si_t(size) << "B)"
167 << " block_size " << block_size
168 << " (" << pretty_si_t(block_size) << "B)"
169 << " " << (rotational ? "rotational" : "non-rotational")
// NOTE(review): presumably the error-exit path, releasing the buffered
// descriptor and then the direct one -- confirm against the jump labels.
174 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered));
177 VOID_TEMP_FAILURE_RETRY(::close(fd_direct));
// Close the device: release fd_direct and then fd_buffered.
//
// Each descriptor is asserted to be valid (>= 0) immediately before it is
// closed, so calling this on a device that was never opened trips an assert.
182 void KernelDevice::close()
184 dout(1) << __func__ << dendl;
191 assert(fd_direct >= 0);
192 VOID_TEMP_FAILURE_RETRY(::close(fd_direct));
195 assert(fd_buffered >= 0);
196 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered));
// File-local helper: look up a named string property of a block device.
//
// @param dev       device name handed to get_block_device_string_property().
// @param property  property name, e.g. "device/model" or "dev" as used by
//                  collect_metadata().
// @return the property as a string. val starts zero-filled, so callers can
//         test the result with length() to detect an empty value.
202 static string get_dev_property(const char *dev, const char *property)
204 char val[1024] = {0};
// the helper's return value is not captured on this line
205 get_block_device_string_property(dev, property, val, sizeof(val));
// Populate *pm with descriptive key/value pairs about this device.
//
// Every key is formed as prefix + name. Keys written here: rotational, size,
// block_size, driver, type, access_mode, and then either the block-device
// details (partition_path, dev_node, model, dev, serial) or, for a plain
// file, path.
//
// @param prefix  string prepended to every key.
// @param pm      output map; existing entries with the same keys are
//                overwritten by operator[] assignment.
// @return int status.
209 int KernelDevice::collect_metadata(string prefix, map<string,string> *pm) const
// rotational is normalized to 0/1 before being stringified
211 (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
212 (*pm)[prefix + "size"] = stringify(get_size());
213 (*pm)[prefix + "block_size"] = stringify(get_block_size());
214 (*pm)[prefix + "driver"] = "KernelDevice";
// "type" is one of "hdd" / "ssd" here (presumably chosen by the rotational
// flag -- confirm) and may be overridden with "nvme" further down.
216 (*pm)[prefix + "type"] = "hdd";
218 (*pm)[prefix + "type"] = "ssd";
222 int r = ::fstat(fd_buffered, &st);
225 if (S_ISBLK(st.st_mode)) {
226 (*pm)[prefix + "access_mode"] = "blk";
227 char partition_path[PATH_MAX];
228 char dev_node[PATH_MAX];
// rc from get_device_by_fd() selects between the three outcomes below:
// nothing known, only the partition path known, or both names known.
229 int rc = get_device_by_fd(fd_buffered, partition_path, dev_node, PATH_MAX);
233 (*pm)[prefix + "partition_path"] = "unknown";
234 (*pm)[prefix + "dev_node"] = "unknown";
237 (*pm)[prefix + "partition_path"] = string(partition_path);
238 (*pm)[prefix + "dev_node"] = "unknown";
242 (*pm)[prefix + "partition_path"] = string(partition_path);
243 (*pm)[prefix + "dev_node"] = string(dev_node);
244 (*pm)[prefix + "model"] = get_dev_property(dev_node, "device/model");
245 (*pm)[prefix + "dev"] = get_dev_property(dev_node, "dev");
247 // nvme exposes a serial number
// "serial" is only added when the property is non-empty
248 string serial = get_dev_property(dev_node, "device/serial");
249 if (serial.length()) {
250 (*pm)[prefix + "serial"] = serial;
253 // nvme has a device/device/* structure; infer from that. there
254 // is probably a better way?
255 string nvme_vendor = get_dev_property(dev_node, "device/device/vendor");
256 if (nvme_vendor.length()) {
257 (*pm)[prefix + "type"] = "nvme";
// not a block device: report file access and the backing path
262 (*pm)[prefix + "access_mode"] = "file";
263 (*pm)[prefix + "path"] = path;
// Make previously completed IO durable by calling fdatasync() on fd_direct.
//
// The call is skipped (logged as a no-op) when io_since_flush shows that no
// IO has completed since the last flush. The whole operation runs under
// flush_mutex.
//
// @return int status.
268 int KernelDevice::flush()
270 // protect flush with a mutex. note that we are not really protecting
271 // data here. instead, we're ensuring that if any flush() caller
272 // sees that io_since_flush is true, they block any racing callers
273 // until the flush is observed. that allows racing threads to be
274 // calling flush while still ensuring that *any* of them that got an
275 // aio completion notification will not return before that aio is
276 // stable on disk: whichever thread sees the flag first will block
277 // followers until the aio is stable.
278 std::lock_guard<std::mutex> l(flush_mutex);
// Atomically consume the flag: the exchange to false succeeds only if the
// flag currently equals `expect`; a failed exchange means nothing to flush.
281 if (!io_since_flush.compare_exchange_strong(expect, false)) {
282 dout(10) << __func__ << " no-op (no ios since last flush), flag is "
283 << (int)io_since_flush.load() << dendl;
287 dout(10) << __func__ << " start" << dendl;
// Crash injection, enabled by bdev_inject_crash.
288 if (cct->_conf->bdev_inject_crash) {
290 // sleep for a moment to give other threads a chance to submit or
291 // wait on io that races with a flush.
292 derr << __func__ << " injecting crash. first we sleep..." << dendl;
293 sleep(cct->_conf->bdev_inject_crash_flush_delay);
294 derr << __func__ << " and now we die" << dendl;
// Time the fdatasync so its duration can be logged below.
298 utime_t start = ceph_clock_now();
299 int r = ::fdatasync(fd_direct);
300 utime_t end = ceph_clock_now();
301 utime_t dur = end - start;
304 derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
307 dout(5) << __func__ << " in " << dur << dendl;;
// Bring up asynchronous IO: initialize aio_queue and start the completion
// thread under the name "bstore_aio".
//
// An initialization failure is reported as an io_setup(2) error; the EAGAIN
// case gets a dedicated hint about /proc/sys/fs/aio-max-nr.
//
// @return int status.
311 int KernelDevice::_aio_start()
314 dout(10) << __func__ << dendl;
315 int r = aio_queue.init();
318 derr << __func__ << " io_setup(2) failed with EAGAIN; "
319 << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
321 derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
325 aio_thread.create("bstore_aio");
// Tear down asynchronous IO; the visible step is shutting down aio_queue.
// NOTE(review): stopping/joining of the aio thread is expected around this
// call -- confirm ordering relative to aio_queue.shutdown().
330 void KernelDevice::_aio_stop()
333 dout(10) << __func__ << dendl;
337 aio_queue.shutdown();
// Body of the aio completion thread.
//
// Repeatedly polls aio_queue for completed aios. For each one it logs the
// finish, unlinks it from the debug queue, sets io_since_flush, and
// decrements the owning IOContext's num_running, invoking aio_callback when
// that count reaches zero. It also hosts two debug facilities: stalled-aio
// detection (bdev_debug_aio) and timed crash injection (bdev_inject_crash).
341 void KernelDevice::_aio_thread()
343 dout(10) << __func__ << " start" << dendl;
// counts poll iterations for the crash-injection timer below
344 int inject_crash_count = 0;
346 dout(40) << __func__ << " polling" << dendl;
// upper bound on completions reaped per poll (bdev_aio_reap_max)
347 int max = cct->_conf->bdev_aio_reap_max;
// r is the number of completed aios returned, or a negative error
349 int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
352 derr << __func__ << " got " << cpp_strerror(r) << dendl;
355 dout(30) << __func__ << " got " << r << " completed aios" << dendl;
// NOTE(review): aio is indexed here, so it must be a local array of
// completed aio pointers, not the bool member aio assigned in open().
356 for (int i = 0; i < r; ++i) {
357 IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
358 _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
// only aios that were linked by aio_submit() need unlinking
359 if (aio[i]->queue_item.is_linked()) {
360 std::lock_guard<std::mutex> l(debug_queue_lock);
361 debug_aio_unlink(*aio[i]);
364 // set flag indicating new ios have completed. we do this *before*
365 // any completion or notifications so that any user flush() that
366 // follows the observed io completion will include this io. Note
367 // that an earlier, racing flush() could observe and clear this
368 // flag, but that also ensures that the IO will be stable before the
369 // later flush() occurs.
370 io_since_flush.store(true);
// NOTE(review): this r shadows the completion count r tested by the loop
// condition above; inside the loop body r is the per-aio return value.
372 int r = aio[i]->get_return_value();
373 dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
// "- 1" because num_running has not been decremented yet at this point
375 << " with " << (ioc->num_running.load() - 1)
376 << " aios left" << dendl;
379 // NOTE: once num_running and we either call the callback or
380 // call aio_wake we cannot touch ioc or aio[] as the caller
// last outstanding aio of this IOContext: notify the owner
383 if (--ioc->num_running == 0) {
384 aio_callback(aio_callback_priv, ioc->priv);
// Stall detection: debug_stall_since marks when a stall was first noticed;
// once it is older than bdev_debug_aio_suicide_timeout the process asserts.
391 if (cct->_conf->bdev_debug_aio) {
392 utime_t now = ceph_clock_now();
393 std::lock_guard<std::mutex> l(debug_queue_lock);
395 if (debug_stall_since == utime_t()) {
396 debug_stall_since = now;
398 utime_t cutoff = now;
399 cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
400 if (debug_stall_since < cutoff) {
401 derr << __func__ << " stalled aio " << debug_oldest
402 << " since " << debug_stall_since << ", timeout is "
403 << cct->_conf->bdev_debug_aio_suicide_timeout
404 << "s, suicide" << dendl;
405 assert(0 == "stalled aio... buggy kernel or bad device?");
// Crash injection: iterations * poll interval (ms) / 1000 is compared with
// bdev_inject_crash + bdev_inject_crash_flush_delay, i.e. the elapsed time is
// estimated from the poll count rather than measured.
411 if (cct->_conf->bdev_inject_crash) {
412 ++inject_crash_count;
413 if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
414 cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
415 derr << __func__ << " bdev_inject_crash trigger from aio thread"
423 dout(10) << __func__ << " end" << dendl;
// Debug bookkeeping for the start of an IO covering offset~length.
//
// Always logs the range at level 20. When bdev_debug_inflight_ios is set it
// additionally, under debug_lock, reports any overlap with a range already
// recorded in debug_inflight and then records the new range.
426 void KernelDevice::_aio_log_start(
431 dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
432 << std::dec << dendl;
433 if (cct->_conf->bdev_debug_inflight_ios) {
434 Mutex::Locker l(debug_lock);
// two IOs touching the same range at once is reported as an error
435 if (debug_inflight.intersects(offset, length)) {
436 derr << __func__ << " inflight overlap of 0x"
438 << offset << "~" << length << std::dec
439 << " with " << debug_inflight << dendl;
442 debug_inflight.insert(offset, length);
// Append aio to the tail of debug_queue, the list used for stalled-aio
// detection. The queue-was-empty case is tested first.
// Callers shown in this file hold debug_queue_lock around the call.
446 void KernelDevice::debug_aio_link(aio_t& aio)
448 if (debug_queue.empty()) {
451 debug_queue.push_back(aio);
// Remove aio from debug_queue if it is currently linked.
//
// When the removed aio was the one tracked as debug_oldest, debug_oldest is
// moved to the new front of the queue, or cleared to nullptr if the queue is
// now empty. debug_stall_since is reset to a default-constructed utime_t,
// which is the value the aio thread treats as "no stall being timed".
// Callers shown in this file hold debug_queue_lock around the call.
454 void KernelDevice::debug_aio_unlink(aio_t& aio)
456 if (aio.queue_item.is_linked()) {
457 debug_queue.erase(debug_queue.iterator_to(aio));
458 if (debug_oldest == &aio) {
459 if (debug_queue.empty()) {
460 debug_oldest = nullptr;
462 debug_oldest = &debug_queue.front();
464 debug_stall_since = utime_t();
// Debug bookkeeping for the completion of an IO covering offset~length;
// the counterpart of _aio_log_start().
//
// Always logs the range at level 20. When bdev_debug_inflight_ios is set the
// range is removed from debug_inflight under debug_lock.
469 void KernelDevice::_aio_log_finish(
474 dout(20) << __func__ << " " << aio << " 0x"
475 << std::hex << offset << "~" << length << std::dec << dendl;
476 if (cct->_conf->bdev_debug_inflight_ios) {
477 Mutex::Locker l(debug_lock);
478 debug_inflight.erase(offset, length);
// Submit every pending aio of ioc to the kernel aio queue.
//
// The pending list is spliced onto the front of ioc->running_aios, the
// pending count is transferred to num_running, and the newly moved range is
// handed to aio_queue.submit_batch(). With bdev_debug_aio enabled each aio is
// also logged and linked into the debug queue.
//
// @param ioc  IO context whose pending_aios are to be submitted.
482 void KernelDevice::aio_submit(IOContext *ioc)
484 dout(20) << __func__ << " ioc " << ioc
485 << " pending " << ioc->num_pending.load()
486 << " running " << ioc->num_running.load()
// nothing queued: this test guards the rest of the function
489 if (ioc->num_pending.load() == 0) {
493 // move these aside, and get our end iterator position now, as the
494 // aios might complete as soon as they are submitted and queue more
// e marks the first previously-running aio; after the splice the range
// [running_aios.begin(), e) is exactly the batch being submitted now.
496 list<aio_t>::iterator e = ioc->running_aios.begin();
497 ioc->running_aios.splice(e, ioc->pending_aios);
// transfer the count from pending to running
499 int pending = ioc->num_pending.load();
500 ioc->num_running += pending;
501 ioc->num_pending -= pending;
502 assert(ioc->num_pending.load() == 0); // we should be only thread doing this
503 assert(ioc->pending_aios.size() == 0);
// debug mode: dump each iovec and link the aio for stall tracking
505 if (cct->_conf->bdev_debug_aio) {
506 list<aio_t>::iterator p = ioc->running_aios.begin();
508 for (auto& io : p->iov)
509 dout(30) << __func__ << " iov " << (void*)io.iov_base
510 << " len " << io.iov_len << dendl;
512 std::lock_guard<std::mutex> l(debug_queue_lock);
513 debug_aio_link(*p++);
// the IOContext pointer travels with each aio as its private data; the aio
// thread casts it back via aio[i]->priv
517 void *priv = static_cast<void*>(ioc);
519 r = aio_queue.submit_batch(ioc->running_aios.begin(), e,
520 ioc->num_running.load(), priv, &retries);
523 derr << __func__ << " retries " << retries << dendl;
525 derr << " aio submit got " << cpp_strerror(r) << dendl;
// Synchronously write bl at byte offset off using pwritev().
//
// @param off       byte offset on the device.
// @param bl        data to write; its length is the write length.
// @param buffered  true to write through fd_buffered, false for fd_direct.
// @return int status; errors are logged with cpp_strerror(r).
//
// On the success path io_since_flush is set so a later flush() is not
// skipped as a no-op.
530 int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered)
532 uint64_t len = bl.length();
// NOTE(review): this message prints " buffered" unconditionally, even when
// the buffered argument is false and the write goes to fd_direct.
533 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
534 << std::dec << " buffered" << dendl;
// crash injection: with probability 1/bdev_inject_crash the write is dropped
535 if (cct->_conf->bdev_inject_crash &&
536 rand() % cct->_conf->bdev_inject_crash == 0) {
537 derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
538 << off << "~" << len << std::dec << dendl;
543 bl.prepare_iov(&iov);
// NOTE(review): confirm that a short write (0 <= r < len) from pwritev is
// handled; only the error message for a failed call is visible here.
544 int r = ::pwritev(buffered ? fd_buffered : fd_direct,
545 &iov[0], iov.size(), off);
549 derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
553 // initiate IO (but do not wait)
554 r = ::sync_file_range(fd_buffered, off, len, SYNC_FILE_RANGE_WRITE);
557 derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
562 io_since_flush.store(true);
// Synchronous write entry point: validate the request, make the buffer
// suitable for the chosen IO mode, and delegate to _sync_write().
//
// Preconditions (asserted): off and the buffer length are multiples of
// block_size, and off + len does not exceed the device size.
//
// @return the result of _sync_write().
567 int KernelDevice::write(
572 uint64_t len = bl.length();
573 dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
574 << (buffered ? " (buffered)" : " (direct)")
576 assert(off % block_size == 0);
577 assert(len % block_size == 0);
580 assert(off + len <= size);
// Realign the buffer when the write is direct, or when it has at least
// IOV_MAX segments. The log line fires only if a rebuild actually happened.
582 if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
583 bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
584 dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
586 dout(40) << "data: ";
590 return _sync_write(off, bl, buffered);
// Queue an asynchronous write on ioc, or fall back to a synchronous write.
//
// Preconditions (asserted): off and the buffer length are multiples of
// block_size, and off + len does not exceed the device size.
//
// When aio and dio are enabled and the write is not buffered, a new aio_t on
// fd_direct is appended to ioc->pending_aios and prepared with pwritev();
// it is not submitted here (see aio_submit()). Otherwise the data is written
// immediately through _sync_write().
593 int KernelDevice::aio_write(
599 uint64_t len = bl.length();
600 dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
601 << (buffered ? " (buffered)" : " (direct)")
603 assert(off % block_size == 0);
604 assert(len % block_size == 0);
607 assert(off + len <= size);
// same realignment rule as write(): direct IO or >= IOV_MAX segments
609 if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
610 bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
611 dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
613 dout(40) << "data: ";
// paired with an _aio_log_finish() on completion or on the sync path below
617 _aio_log_start(ioc, off, len);
620 if (aio && dio && !buffered) {
621 ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
// note: this local reference shadows the bool member aio tested above
623 aio_t& aio = ioc->pending_aios.back();
624 if (cct->_conf->bdev_inject_crash &&
625 rand() % cct->_conf->bdev_inject_crash == 0) {
626 derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
627 << off << "~" << len << std::dec
629 // generate a real io so that aio_wait behaves properly, but make it
630 // a read instead of write, and toss the result.
// normal path: build the iovec from bl, then move bl's buffers into the aio
634 bl.prepare_iov(&aio.iov);
635 for (unsigned i=0; i<aio.iov.size(); ++i) {
636 dout(30) << "aio " << i << " " << aio.iov[i].iov_base
637 << " " << aio.iov[i].iov_len << dendl;
639 aio.bl.claim_append(bl);
640 aio.pwritev(off, len);
642 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
643 << std::dec << " aio " << &aio << dendl;
// synchronous fallback: the IO is finished by the time _sync_write returns,
// so the in-flight record is cleared right away
647 int r = _sync_write(off, bl, buffered);
648 _aio_log_finish(ioc, off, len);
// Synchronously read len bytes at offset off into *pbl.
//
// Preconditions (asserted): off and len are multiples of block_size, and
// off + len does not exceed the device size.
//
// The data is read with a single pread() into a freshly allocated
// page-aligned buffer, which is then moved onto the end of *pbl. A read that
// returns a byte count other than len trips an assert.
//
// @return 0 on success, or the negative value held in r on failure.
655 int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
659 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
660 << (buffered ? " (buffered)" : " (direct)")
662 assert(off % block_size == 0);
663 assert(len % block_size == 0);
666 assert(off + len <= size);
668 _aio_log_start(ioc, off, len);
670 bufferptr p = buffer::create_page_aligned(len);
671 int r = ::pread(buffered ? fd_buffered : fd_direct,
672 p.c_str(), len, off);
// short reads are treated as a fatal condition, not retried
677 assert((uint64_t)r == len);
678 pbl->push_back(std::move(p));
680 dout(40) << "data: ";
681 pbl->hexdump(*_dout);
685 _aio_log_finish(ioc, off, len);
// a positive byte count is collapsed to 0; only errors are propagated
686 return r < 0 ? r : 0;
// Queue an asynchronous read on ioc, or fall back to a synchronous read.
//
// In the aio case a new aio_t on fd_direct is appended to ioc->pending_aios;
// it is not submitted here (see aio_submit()). In the fallback case the data
// is read immediately via read() with buffered = false.
689 int KernelDevice::aio_read(
695 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
701 _aio_log_start(ioc, off, len);
702 ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
// local reference to the aio just queued
704 aio_t& aio = ioc->pending_aios.back();
706 for (unsigned i=0; i<aio.iov.size(); ++i) {
707 dout(30) << "aio " << i << " " << aio.iov[i].iov_base
708 << " " << aio.iov[i].iov_len << dendl;
711 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
712 << std::dec << " aio " << &aio << dendl;
// synchronous fallback, always as a direct (non-buffered) read
716 r = read(off, len, pbl, ioc, false);
// Direct-IO read of an arbitrary (unaligned) byte range into buf.
//
// The requested range is widened to block_size boundaries, read in one
// pread() on fd_direct into a temporary page-aligned buffer, and the
// requested len bytes are then copied out to buf. A read that returns a byte
// count other than the widened length trips an assert.
//
// @param off  byte offset of the first wanted byte.
// @param len  number of wanted bytes.
// @param buf  destination; must have room for len bytes.
// @return 0 on success, or the negative value held in r on failure.
722 int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
// widen [off, off+len) outwards to block boundaries
724 uint64_t aligned_off = align_down(off, block_size);
725 uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
726 bufferptr p = buffer::create_page_aligned(aligned_len);
729 r = ::pread(fd_direct, p.c_str(), aligned_len, aligned_off);
732 derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
733 << " error: " << cpp_strerror(r) << dendl;
736 assert((uint64_t)r == aligned_len);
// skip the leading padding introduced by aligning the offset down
737 memcpy(buf, p.c_str() + (off - aligned_off), len);
739 dout(40) << __func__ << " data: ";
746 return r < 0 ? r : 0;
// Read len bytes at offset off straight into the caller's buffer.
//
// Three paths:
//  - direct IO where off, len or the buf address is unaligned: delegated to
//    direct_read_unaligned(), which bounces through an aligned buffer;
//  - buffered IO: pread() on fd_buffered, tracking the bytes still wanted in
//    `left`;
//  - direct IO, fully aligned: a single pread() on fd_direct, asserted to
//    return exactly len bytes.
//
// Precondition (asserted): off + len does not exceed the device size.
//
// @return 0 on success, or the negative value held in r on failure.
749 int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
752 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
756 assert(off + len <= size);
759 //if it's direct io and unaligned, we have to use a internal buffer
// offset/length alignment is to block_size; the buffer address is checked
// against CEPH_PAGE_SIZE
760 if (!buffered && ((off % block_size != 0)
761 || (len % block_size != 0)
762 || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
763 return direct_read_unaligned(off, len, buf);
770 r = ::pread(fd_buffered, t, left, off);
773 derr << __func__ << " 0x" << std::hex << off << "~" << left
774 << std::dec << " error: " << cpp_strerror(r) << dendl;
782 //direct and aligned read
783 r = ::pread(fd_direct, buf, len, off);
// NOTE(review): this message streams `left`, but the read above used `len`.
// Confirm `left` names the intended byte count in this branch; if the
// buffered branch's variable is out of scope here, the name may resolve to
// something else entirely (e.g. the std::left stream manipulator).
786 derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
787 << off << "~" << left << std::dec << " error: " << cpp_strerror(r)
791 assert((uint64_t)r == len);
794 dout(40) << __func__ << " data: ";
801 return r < 0 ? r : 0;
// Ask the kernel to drop cached pages for the byte range off~len of the
// buffered descriptor, using posix_fadvise(POSIX_FADV_DONTNEED).
//
// Preconditions (asserted): off and len are multiples of block_size.
// Failures are logged with the range and cpp_strerror(r).
804 int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
806 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
808 assert(off % block_size == 0);
809 assert(len % block_size == 0);
810 int r = posix_fadvise(fd_buffered, off, len, POSIX_FADV_DONTNEED);
813 derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
814 << " error: " << cpp_strerror(r) << dendl;