// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "boost/algorithm/string.hpp"
#include "BlueFS.h"

#include "common/debug.h"
#include "common/errno.h"
#include "common/perf_counters.h"
#include "BlockDevice.h"
#include "Allocator.h"
#include "include/assert.h"

#define dout_context cct
#define dout_subsys ceph_subsys_bluefs
#undef dout_prefix
#define dout_prefix *_dout << "bluefs "

MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::File, bluefs_file, bluefs);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::Dir, bluefs_dir, bluefs);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileWriter, bluefs_file_writer, bluefs);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReaderBuffer,
			      bluefs_file_reader_buffer, bluefs);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReader, bluefs_file_reader, bluefs);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileLock, bluefs_file_lock, bluefs);

BlueFS::BlueFS(CephContext* cct)
  : cct(cct),
    bdev(MAX_BDEV),
    ioc(MAX_BDEV),
    block_all(MAX_BDEV),
    block_total(MAX_BDEV, 0)
{
}

BlueFS::~BlueFS()
{
  for (auto p : ioc) {
    if (p)
      p->aio_wait();
  }
  for (auto p : bdev) {
    if (p) {
      p->close();
      delete p;
    }
  }
  for (auto p : ioc) {
    delete p;
  }
}

void BlueFS::_init_logger()
{
  PerfCountersBuilder b(cct, "bluefs",
			l_bluefs_first, l_bluefs_last);
  b.add_u64_counter(l_bluefs_gift_bytes, "gift_bytes",
		    "Bytes gifted from BlueStore");
  b.add_u64_counter(l_bluefs_reclaim_bytes, "reclaim_bytes",
		    "Bytes reclaimed by BlueStore");
  b.add_u64(l_bluefs_db_total_bytes, "db_total_bytes",
	    "Total bytes (main db device)",
	    "b", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_db_used_bytes, "db_used_bytes",
	    "Used bytes (main db device)",
	    "u", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_wal_total_bytes, "wal_total_bytes",
	    "Total bytes (wal device)",
	    "walb", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_wal_used_bytes, "wal_used_bytes",
	    "Used bytes (wal device)",
	    "walu", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_slow_total_bytes, "slow_total_bytes",
	    "Total bytes (slow device)",
	    "slob", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_slow_used_bytes, "slow_used_bytes",
	    "Used bytes (slow device)",
	    "slou", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_num_files, "num_files", "File count",
	    "f", PerfCountersBuilder::PRIO_USEFUL);
  b.add_u64(l_bluefs_log_bytes, "log_bytes", "Size of the metadata log",
	    "jlen", PerfCountersBuilder::PRIO_INTERESTING);
  b.add_u64_counter(l_bluefs_log_compactions, "log_compactions",
		    "Compactions of the metadata log");
  b.add_u64_counter(l_bluefs_logged_bytes, "logged_bytes",
		    "Bytes written to the metadata log",
		    "j", PerfCountersBuilder::PRIO_CRITICAL);
  b.add_u64_counter(l_bluefs_files_written_wal, "files_written_wal",
		    "Files written to WAL");
  b.add_u64_counter(l_bluefs_files_written_sst, "files_written_sst",
		    "Files written to SSTs");
  b.add_u64_counter(l_bluefs_bytes_written_wal, "bytes_written_wal",
		    "Bytes written to WAL",
		    "wal", PerfCountersBuilder::PRIO_CRITICAL);
  b.add_u64_counter(l_bluefs_bytes_written_sst, "bytes_written_sst",
		    "Bytes written to SSTs",
		    "sst", PerfCountersBuilder::PRIO_CRITICAL);
  logger = b.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);
}

void BlueFS::_shutdown_logger()
{
  cct->get_perfcounters_collection()->remove(logger);
  delete logger;
}

void BlueFS::_update_logger_stats()
{
  // we must be holding the lock
  logger->set(l_bluefs_num_files, file_map.size());
  logger->set(l_bluefs_log_bytes, log_writer->file->fnode.size);

  if (alloc[BDEV_WAL]) {
    logger->set(l_bluefs_wal_total_bytes,
		block_total[BDEV_WAL]);
    logger->set(l_bluefs_wal_used_bytes,
		block_total[BDEV_WAL] - alloc[BDEV_WAL]->get_free());
  }
  if (alloc[BDEV_DB]) {
    logger->set(l_bluefs_db_total_bytes, block_total[BDEV_DB]);
    logger->set(l_bluefs_db_used_bytes,
		block_total[BDEV_DB] - alloc[BDEV_DB]->get_free());
  }
  if (alloc[BDEV_SLOW]) {
    logger->set(l_bluefs_slow_total_bytes, block_total[BDEV_SLOW]);
    logger->set(l_bluefs_slow_used_bytes,
		block_total[BDEV_SLOW] - alloc[BDEV_SLOW]->get_free());
  }
}

int BlueFS::add_block_device(unsigned id, const string& path)
{
  dout(10) << __func__ << " bdev " << id << " path " << path << dendl;
  assert(id < bdev.size());
  assert(bdev[id] == NULL);
  BlockDevice *b = BlockDevice::create(cct, path, NULL, NULL);
  int r = b->open(path);
  if (r < 0) {
    delete b;
    return r;
  }
  dout(1) << __func__ << " bdev " << id << " path " << path
	  << " size " << pretty_si_t(b->get_size()) << "B" << dendl;
  bdev[id] = b;
  ioc[id] = new IOContext(cct, NULL);
  return 0;
}

bool BlueFS::bdev_support_label(unsigned id)
{
  assert(id < bdev.size());
  assert(bdev[id]);
  return bdev[id]->supported_bdev_label();
}

uint64_t BlueFS::get_block_device_size(unsigned id)
{
  if (id < bdev.size() && bdev[id])
    return bdev[id]->get_size();
  return 0;
}

void BlueFS::add_block_extent(unsigned id, uint64_t offset, uint64_t length)
{
  std::unique_lock<std::mutex> l(lock);
  dout(1) << __func__ << " bdev " << id
	  << " 0x" << std::hex << offset << "~" << length << std::dec
	  << dendl;
  assert(id < bdev.size());
  assert(bdev[id]);
  assert(bdev[id]->get_size() >= offset + length);
  block_all[id].insert(offset, length);
  block_total[id] += length;

  if (id < alloc.size() && alloc[id]) {
    log_t.op_alloc_add(id, offset, length);
    int r = _flush_and_sync_log(l);
    assert(r == 0);
    alloc[id]->init_add_free(offset, length);
  }

  if (logger)
    logger->inc(l_bluefs_gift_bytes, length);
  dout(10) << __func__ << " done" << dendl;
}

int BlueFS::reclaim_blocks(unsigned id, uint64_t want,
			   AllocExtentVector *extents)
{
  std::unique_lock<std::mutex> l(lock);
  dout(1) << __func__ << " bdev " << id
	  << " want 0x" << std::hex << want << std::dec << dendl;
  assert(id < alloc.size());
  assert(alloc[id]);
  int r = alloc[id]->reserve(want);
  assert(r == 0); // caller shouldn't ask for more than they can get
  int64_t got = alloc[id]->allocate(want, cct->_conf->bluefs_alloc_size, 0,
				    extents);
  if (got < (int64_t)want) {
    alloc[id]->unreserve(want - MAX(0, got));
  }
  if (got <= 0) {
    derr << __func__ << " failed to allocate space to return to bluestore"
	 << dendl;
    alloc[id]->dump();
    return got;
  }

  for (auto& p : *extents) {
    block_all[id].erase(p.offset, p.length);
    block_total[id] -= p.length;
    log_t.op_alloc_rm(id, p.offset, p.length);
  }

  flush_bdev();
  r = _flush_and_sync_log(l);
  assert(r == 0);

  if (logger)
    logger->inc(l_bluefs_reclaim_bytes, got);
  dout(1) << __func__ << " bdev " << id << " want 0x" << std::hex << want
	  << " got " << *extents << dendl;
  return 0;
}

uint64_t BlueFS::get_fs_usage()
{
  std::lock_guard<std::mutex> l(lock);
  uint64_t total_bytes = 0;
  for (auto& p : file_map) {
    total_bytes += p.second->fnode.get_allocated();
  }
  return total_bytes;
}

uint64_t BlueFS::get_total(unsigned id)
{
  std::lock_guard<std::mutex> l(lock);
  assert(id < block_all.size());
  return block_total[id];
}

uint64_t BlueFS::get_free(unsigned id)
{
  std::lock_guard<std::mutex> l(lock);
  assert(id < alloc.size());
  return alloc[id]->get_free();
}

void BlueFS::dump_perf_counters(Formatter *f)
{
  f->open_object_section("bluefs_perf_counters");
  logger->dump_formatted(f, 0);
  f->close_section();
}

void BlueFS::dump_block_extents(ostream& out)
{
  for (unsigned i = 0; i < MAX_BDEV; ++i) {
    if (!bdev[i]) {
      continue;
    }
    out << i << " : size 0x" << std::hex << bdev[i]->get_size()
	<< " : own 0x" << block_all[i] << std::dec << "\n";
  }
}

void BlueFS::get_usage(vector<pair<uint64_t,uint64_t>> *usage)
{
  std::lock_guard<std::mutex> l(lock);
  usage->resize(bdev.size());
  for (unsigned id = 0; id < bdev.size(); ++id) {
    if (!bdev[id]) {
      (*usage)[id] = make_pair(0, 0);
      continue;
    }
    (*usage)[id].first = alloc[id]->get_free();
    (*usage)[id].second = block_total[id];
    uint64_t used =
      (block_total[id] - (*usage)[id].first) * 100 / block_total[id];
    dout(10) << __func__ << " bdev " << id
	     << " free " << (*usage)[id].first
	     << " (" << pretty_si_t((*usage)[id].first) << "B)"
	     << " / " << (*usage)[id].second
	     << " (" << pretty_si_t((*usage)[id].second) << "B)"
	     << ", used " << used << "%"
	     << dendl;
  }
}

int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
{
  std::lock_guard<std::mutex> l(lock);
  dout(10) << __func__ << " bdev " << id << dendl;
  if (id >= block_all.size())
    return -EINVAL;
  *extents = block_all[id];
  return 0;
}

int BlueFS::mkfs(uuid_d osd_uuid)
{
  std::unique_lock<std::mutex> l(lock);
  dout(1) << __func__ << " osd_uuid " << osd_uuid << dendl;

  _init_alloc();
  _init_logger();

  super.version = 1;
  super.block_size = bdev[BDEV_DB]->get_block_size();
  super.osd_uuid = osd_uuid;
  super.uuid.generate_random();
  dout(1) << __func__ << " uuid " << super.uuid << dendl;

  // init log
  FileRef log_file = new File;
  log_file->fnode.ino = 1;
  log_file->fnode.prefer_bdev = BDEV_WAL;
  int r = _allocate(
    log_file->fnode.prefer_bdev,
    cct->_conf->bluefs_max_log_runway,
    &log_file->fnode.extents);
  log_file->fnode.recalc_allocated();
  assert(r == 0);
  log_writer = _create_writer(log_file);

  // initial txn
  log_t.op_init();
  for (unsigned bdev = 0; bdev < MAX_BDEV; ++bdev) {
    interval_set<uint64_t>& p = block_all[bdev];
    if (p.empty())
      continue;
    for (interval_set<uint64_t>::iterator q = p.begin(); q != p.end(); ++q) {
      dout(20) << __func__ << " op_alloc_add " << bdev << " 0x"
	       << std::hex << q.get_start() << "~" << q.get_len() << std::dec
	       << dendl;
      log_t.op_alloc_add(bdev, q.get_start(), q.get_len());
    }
  }
  _flush_and_sync_log(l);

  // write supers
  super.log_fnode = log_file->fnode;
  _write_super();
  flush_bdev();

  // clean up
  super = bluefs_super_t();
  _close_writer(log_writer);
  log_writer = NULL;
  block_all.clear();
  block_total.clear();
  _stop_alloc();
  _shutdown_logger();

  dout(10) << __func__ << " success" << dendl;
  return 0;
}

void BlueFS::_init_alloc()
{
  dout(20) << __func__ << dendl;
  alloc.resize(MAX_BDEV);
  pending_release.resize(MAX_BDEV);
  for (unsigned id = 0; id < bdev.size(); ++id) {
    if (!bdev[id]) {
      continue;
    }
    assert(bdev[id]->get_size());
    alloc[id] = Allocator::create(cct, cct->_conf->bluefs_allocator,
				  bdev[id]->get_size(),
				  cct->_conf->bluefs_alloc_size);
    interval_set<uint64_t>& p = block_all[id];
    for (interval_set<uint64_t>::iterator q = p.begin(); q != p.end(); ++q) {
      alloc[id]->init_add_free(q.get_start(), q.get_len());
    }
  }
}

void BlueFS::_stop_alloc()
{
  dout(20) << __func__ << dendl;
  for (auto p : alloc) {
    if (p != nullptr) {
      p->shutdown();
      delete p;
    }
  }
  alloc.clear();
}

int BlueFS::mount()
{
  dout(1) << __func__ << dendl;

  int r = _open_super();
  if (r < 0) {
    derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl;
    goto out;
  }

  block_all.clear();
  block_all.resize(MAX_BDEV);
  block_total.clear();
  block_total.resize(MAX_BDEV, 0);
  _init_alloc();

  r = _replay(false);
  if (r < 0) {
    derr << __func__ << " failed to replay log: " << cpp_strerror(r) << dendl;
    _stop_alloc();
    goto out;
  }

  // init freelist
  for (auto& p : file_map) {
    dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
    for (auto& q :
	 p.second->fnode.extents) {
      alloc[q.bdev]->init_rm_free(q.offset, q.length);
    }
  }

  // set up the log for future writes
  log_writer = _create_writer(_get_file(1));
  assert(log_writer->file->fnode.ino == 1);
  log_writer->pos = log_writer->file->fnode.size;
  dout(10) << __func__ << " log write pos set to 0x"
	   << std::hex << log_writer->pos << std::dec
	   << dendl;

  _init_logger();
  return 0;

 out:
  super = bluefs_super_t();
  return r;
}

void BlueFS::umount()
{
  dout(1) << __func__ << dendl;

  sync_metadata();

  _close_writer(log_writer);
  log_writer = NULL;

  _stop_alloc();
  file_map.clear();
  dir_map.clear();
  super = bluefs_super_t();
  log_t.clear();
  _shutdown_logger();
}

void BlueFS::collect_metadata(map<string,string> *pm)
{
  if (bdev[BDEV_DB])
    bdev[BDEV_DB]->collect_metadata("bluefs_db_", pm);
  if (bdev[BDEV_WAL])
    bdev[BDEV_WAL]->collect_metadata("bluefs_wal_", pm);
  if (bdev[BDEV_SLOW])
    bdev[BDEV_SLOW]->collect_metadata("bluefs_slow_", pm);
}

int BlueFS::fsck()
{
  std::lock_guard<std::mutex> l(lock);
  dout(1) << __func__ << dendl;
  // hrm, i think we check everything on mount...
  return 0;
}

int BlueFS::_write_super()
{
  // build superblock
  bufferlist bl;
  ::encode(super, bl);
  uint32_t crc = bl.crc32c(-1);
  ::encode(crc, bl);
  dout(10) << __func__ << " super block length(encoded): " << bl.length()
	   << dendl;
  dout(10) << __func__ << " superblock " << super.version << dendl;
  dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
  assert(bl.length() <= get_super_length());
  bl.append_zero(get_super_length() - bl.length());

  bdev[BDEV_DB]->write(get_super_offset(), bl, false);
  dout(20) << __func__ << " v " << super.version
	   << " crc 0x" << std::hex << crc
	   << " offset 0x" << get_super_offset() << std::dec
	   << dendl;
  return 0;
}

int BlueFS::_open_super()
{
  dout(10) << __func__ << dendl;

  bufferlist bl;
  uint32_t expected_crc, crc;
  int r;

  // always the second block
  r = bdev[BDEV_DB]->read(get_super_offset(), get_super_length(),
			  &bl, ioc[BDEV_DB], false);
  if (r < 0)
    return r;

  bufferlist::iterator p = bl.begin();
  ::decode(super, p);
  {
    bufferlist t;
    t.substr_of(bl, 0, p.get_off());
    crc = t.crc32c(-1);
  }
  ::decode(expected_crc, p);
  if (crc != expected_crc) {
    derr << __func__ << " bad crc on superblock, expected 0x"
	 << std::hex << expected_crc << " != actual 0x" << crc << std::dec
	 << dendl;
    return -EIO;
  }
  dout(10) << __func__ << " superblock " << super.version << dendl;
  dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
  return 0;
}

int BlueFS::_replay(bool noop)
{
  dout(10) << __func__ << (noop ?
" NO-OP" : "") << dendl; ino_last = 1; // by the log log_seq = 0; FileRef log_file; if (noop) { log_file = new File; } else { log_file = _get_file(1); } log_file->fnode = super.log_fnode; dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl; FileReader *log_reader = new FileReader( log_file, cct->_conf->bluefs_max_prefetch, false, // !random true); // ignore eof while (true) { assert((log_reader->buf.pos & ~super.block_mask()) == 0); uint64_t pos = log_reader->buf.pos; uint64_t read_pos = pos; bufferlist bl; { int r = _read(log_reader, &log_reader->buf, read_pos, super.block_size, &bl, NULL); assert(r == (int)super.block_size); read_pos += r; } uint64_t more = 0; uint64_t seq; uuid_d uuid; { bufferlist::iterator p = bl.begin(); __u8 a, b; uint32_t len; ::decode(a, p); ::decode(b, p); ::decode(len, p); ::decode(uuid, p); ::decode(seq, p); if (len + 6 > bl.length()) { more = ROUND_UP_TO(len + 6 - bl.length(), super.block_size); } } if (uuid != super.uuid) { dout(10) << __func__ << " 0x" << std::hex << pos << std::dec << ": stop: uuid " << uuid << " != super.uuid " << super.uuid << dendl; break; } if (seq != log_seq + 1) { dout(10) << __func__ << " 0x" << std::hex << pos << std::dec << ": stop: seq " << seq << " != expected " << log_seq + 1 << dendl; break; } if (more) { dout(20) << __func__ << " need 0x" << std::hex << more << std::dec << " more bytes" << dendl; bufferlist t; int r = _read(log_reader, &log_reader->buf, read_pos, more, &t, NULL); if (r < (int)more) { dout(10) << __func__ << " 0x" << std::hex << pos << ": stop: len is 0x" << bl.length() + more << std::dec << ", which is past eof" << dendl; break; } assert(r == (int)more); bl.claim_append(t); read_pos += r; } bluefs_transaction_t t; try { bufferlist::iterator p = bl.begin(); ::decode(t, p); } catch (buffer::error& e) { dout(10) << __func__ << " 0x" << std::hex << pos << std::dec << ": stop: failed to decode: " << e.what() << dendl; delete log_reader; return -EIO; } assert(seq == t.seq); dout(10) << __func__ << " 0x" << std::hex << pos << std::dec << ": " << t << dendl; bufferlist::iterator p = t.op_bl.begin(); while (!p.end()) { __u8 op; ::decode(op, p); switch (op) { case bluefs_transaction_t::OP_INIT: dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_init" << dendl; assert(t.seq == 1); break; case bluefs_transaction_t::OP_JUMP: { uint64_t next_seq; uint64_t offset; ::decode(next_seq, p); ::decode(offset, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_jump seq " << next_seq << " offset 0x" << std::hex << offset << std::dec << dendl; assert(next_seq >= log_seq); log_seq = next_seq - 1; // we will increment it below uint64_t skip = offset - read_pos; if (skip) { bufferlist junk; int r = _read(log_reader, &log_reader->buf, read_pos, skip, &junk, NULL); if (r != (int)skip) { dout(10) << __func__ << " 0x" << std::hex << read_pos << ": stop: failed to skip to " << offset << std::dec << dendl; assert(0 == "problem with op_jump"); } } } break; case bluefs_transaction_t::OP_JUMP_SEQ: { uint64_t next_seq; ::decode(next_seq, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_jump_seq " << next_seq << dendl; assert(next_seq >= log_seq); log_seq = next_seq - 1; // we will increment it below } break; case bluefs_transaction_t::OP_ALLOC_ADD: { __u8 id; uint64_t offset, length; ::decode(id, p); ::decode(offset, p); ::decode(length, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_alloc_add " << " " << (int)id << ":0x" << std::hex << 
offset << "~" << length << std::dec << dendl; if (!noop) { block_all[id].insert(offset, length); block_total[id] += length; alloc[id]->init_add_free(offset, length); } } break; case bluefs_transaction_t::OP_ALLOC_RM: { __u8 id; uint64_t offset, length; ::decode(id, p); ::decode(offset, p); ::decode(length, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_alloc_rm " << " " << (int)id << ":0x" << std::hex << offset << "~" << length << std::dec << dendl; if (!noop) { block_all[id].erase(offset, length); block_total[id] -= length; alloc[id]->init_rm_free(offset, length); } } break; case bluefs_transaction_t::OP_DIR_LINK: { string dirname, filename; uint64_t ino; ::decode(dirname, p); ::decode(filename, p); ::decode(ino, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_dir_link " << " " << dirname << "/" << filename << " to " << ino << dendl; if (!noop) { FileRef file = _get_file(ino); assert(file->fnode.ino); map::iterator q = dir_map.find(dirname); assert(q != dir_map.end()); map::iterator r = q->second->file_map.find(filename); assert(r == q->second->file_map.end()); q->second->file_map[filename] = file; ++file->refs; } } break; case bluefs_transaction_t::OP_DIR_UNLINK: { string dirname, filename; ::decode(dirname, p); ::decode(filename, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_dir_unlink " << " " << dirname << "/" << filename << dendl; if (!noop) { map::iterator q = dir_map.find(dirname); assert(q != dir_map.end()); map::iterator r = q->second->file_map.find(filename); assert(r != q->second->file_map.end()); assert(r->second->refs > 0); --r->second->refs; q->second->file_map.erase(r); } } break; case bluefs_transaction_t::OP_DIR_CREATE: { string dirname; ::decode(dirname, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_dir_create " << dirname << dendl; if (!noop) { map::iterator q = dir_map.find(dirname); assert(q == dir_map.end()); dir_map[dirname] = new Dir; } } break; case bluefs_transaction_t::OP_DIR_REMOVE: { string dirname; ::decode(dirname, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_dir_remove " << dirname << dendl; if (!noop) { map::iterator q = dir_map.find(dirname); assert(q != dir_map.end()); assert(q->second->file_map.empty()); dir_map.erase(q); } } break; case bluefs_transaction_t::OP_FILE_UPDATE: { bluefs_fnode_t fnode; ::decode(fnode, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_file_update " << " " << fnode << dendl; if (!noop) { FileRef f = _get_file(fnode.ino); f->fnode = fnode; if (fnode.ino > ino_last) { ino_last = fnode.ino; } } } break; case bluefs_transaction_t::OP_FILE_REMOVE: { uint64_t ino; ::decode(ino, p); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_file_remove " << ino << dendl; if (!noop) { auto p = file_map.find(ino); assert(p != file_map.end()); file_map.erase(p); } } break; default: derr << __func__ << " 0x" << std::hex << pos << std::dec << ": stop: unrecognized op " << (int)op << dendl; delete log_reader; return -EIO; } } assert(p.end()); // we successfully replayed the transaction; bump the seq and log size ++log_seq; log_file->fnode.size = log_reader->buf.pos; } dout(10) << __func__ << " log file size was 0x" << std::hex << log_file->fnode.size << std::dec << dendl; delete log_reader; if (!noop) { // verify file link counts are all >0 for (auto& p : file_map) { if (p.second->refs == 0 && p.second->fnode.ino > 1) { derr << __func__ << " file with link count 0: " 
<< p.second->fnode << dendl; return -EIO; } } } dout(10) << __func__ << " done" << dendl; return 0; } BlueFS::FileRef BlueFS::_get_file(uint64_t ino) { auto p = file_map.find(ino); if (p == file_map.end()) { FileRef f = new File; file_map[ino] = f; dout(30) << __func__ << " ino " << ino << " = " << f << " (new)" << dendl; return f; } else { dout(30) << __func__ << " ino " << ino << " = " << p->second << dendl; return p->second; } } void BlueFS::_drop_link(FileRef file) { dout(20) << __func__ << " had refs " << file->refs << " on " << file->fnode << dendl; assert(file->refs > 0); --file->refs; if (file->refs == 0) { dout(20) << __func__ << " destroying " << file->fnode << dendl; assert(file->num_reading.load() == 0); log_t.op_file_remove(file->fnode.ino); for (auto& r : file->fnode.extents) { pending_release[r.bdev].insert(r.offset, r.length); } file_map.erase(file->fnode.ino); file->deleted = true; file->fnode.recalc_allocated(); if (file->dirty_seq) { assert(file->dirty_seq > log_seq_stable); assert(dirty_files.count(file->dirty_seq)); auto it = dirty_files[file->dirty_seq].iterator_to(*file); dirty_files[file->dirty_seq].erase(it); file->dirty_seq = 0; } } } int BlueFS::_read_random( FileReader *h, ///< [in] read from here uint64_t off, ///< [in] offset size_t len, ///< [in] this many bytes char *out) ///< [out] optional: or copy it here { dout(10) << __func__ << " h " << h << " 0x" << std::hex << off << "~" << len << std::dec << " from " << h->file->fnode << dendl; ++h->file->num_reading; if (!h->ignore_eof && off + len > h->file->fnode.size) { if (off > h->file->fnode.size) len = 0; else len = h->file->fnode.size - off; dout(20) << __func__ << " reaching (or past) eof, len clipped to 0x" << std::hex << len << std::dec << dendl; } int ret = 0; while (len > 0) { uint64_t x_off = 0; auto p = h->file->fnode.seek(off, &x_off); uint64_t l = MIN(p->length - x_off, len); dout(20) << __func__ << " read buffered 0x" << std::hex << x_off << "~" << l << std::dec << " of " << *p << dendl; int r = bdev[p->bdev]->read_random(p->offset + x_off, l, out, cct->_conf->bluefs_buffered_io); assert(r == 0); off += l; len -= l; ret += l; out += l; } dout(20) << __func__ << " got " << ret << dendl; --h->file->num_reading; return ret; } int BlueFS::_read( FileReader *h, ///< [in] read from here FileReaderBuffer *buf, ///< [in] reader state uint64_t off, ///< [in] offset size_t len, ///< [in] this many bytes bufferlist *outbl, ///< [out] optional: reference the result here char *out) ///< [out] optional: or copy it here { dout(10) << __func__ << " h " << h << " 0x" << std::hex << off << "~" << len << std::dec << " from " << h->file->fnode << dendl; ++h->file->num_reading; if (!h->ignore_eof && off + len > h->file->fnode.size) { if (off > h->file->fnode.size) len = 0; else len = h->file->fnode.size - off; dout(20) << __func__ << " reaching (or past) eof, len clipped to 0x" << std::hex << len << std::dec << dendl; } if (outbl) outbl->clear(); int ret = 0; while (len > 0) { size_t left; if (off < buf->bl_off || off >= buf->get_buf_end()) { buf->bl.clear(); buf->bl_off = off & super.block_mask(); uint64_t x_off = 0; auto p = h->file->fnode.seek(buf->bl_off, &x_off); uint64_t want = ROUND_UP_TO(len + (off & ~super.block_mask()), super.block_size); want = MAX(want, buf->max_prefetch); uint64_t l = MIN(p->length - x_off, want); uint64_t eof_offset = ROUND_UP_TO(h->file->fnode.size, super.block_size); if (!h->ignore_eof && buf->bl_off + l > eof_offset) { l = eof_offset - buf->bl_off; } dout(20) << __func__ << " fetching 
0x" << std::hex << x_off << "~" << l << std::dec << " of " << *p << dendl; int r = bdev[p->bdev]->read(p->offset + x_off, l, &buf->bl, ioc[p->bdev], cct->_conf->bluefs_buffered_io); assert(r == 0); } left = buf->get_buf_remaining(off); dout(20) << __func__ << " left 0x" << std::hex << left << " len 0x" << len << std::dec << dendl; int r = MIN(len, left); if (outbl) { bufferlist t; t.substr_of(buf->bl, off - buf->bl_off, r); outbl->claim_append(t); } if (out) { // NOTE: h->bl is normally a contiguous buffer so c_str() is free. memcpy(out, buf->bl.c_str() + off - buf->bl_off, r); out += r; } dout(30) << __func__ << " result chunk (0x" << std::hex << r << std::dec << " bytes):\n"; bufferlist t; t.substr_of(buf->bl, off - buf->bl_off, r); t.hexdump(*_dout); *_dout << dendl; off += r; len -= r; ret += r; buf->pos += r; } dout(20) << __func__ << " got " << ret << dendl; assert(!outbl || (int)outbl->length() == ret); --h->file->num_reading; return ret; } void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length) { dout(10) << __func__ << " file " << f->fnode << " 0x" << std::hex << offset << "~" << length << std::dec << dendl; if (offset & ~super.block_mask()) { offset &= super.block_mask(); length = ROUND_UP_TO(length, super.block_size); } uint64_t x_off = 0; auto p = f->fnode.seek(offset, &x_off); while (length > 0 && p != f->fnode.extents.end()) { uint64_t x_len = MIN(p->length - x_off, length); bdev[p->bdev]->invalidate_cache(p->offset + x_off, x_len); dout(20) << __func__ << " 0x" << std::hex << x_off << "~" << x_len << std:: dec << " of " << *p << dendl; offset += x_len; length -= x_len; } } uint64_t BlueFS::_estimate_log_size() { int avg_dir_size = 40; // fixme int avg_file_size = 12; uint64_t size = 4096 * 2; size += file_map.size() * (1 + sizeof(bluefs_fnode_t)); for (auto& p : block_all) size += p.num_intervals() * (1 + 1 + sizeof(uint64_t) * 2); size += dir_map.size() + (1 + avg_dir_size); size += file_map.size() * (1 + avg_dir_size + avg_file_size); return ROUND_UP_TO(size, super.block_size); } void BlueFS::compact_log() { std::unique_lock l(lock); if (cct->_conf->bluefs_compact_log_sync) { _compact_log_sync(); } else { _compact_log_async(l); } } bool BlueFS::_should_compact_log() { uint64_t current = log_writer->file->fnode.size; uint64_t expected = _estimate_log_size(); float ratio = (float)current / (float)expected; dout(10) << __func__ << " current 0x" << std::hex << current << " expected " << expected << std::dec << " ratio " << ratio << (new_log ? 
" (async compaction in progress)" : "") << dendl; if (new_log || current < cct->_conf->bluefs_log_compact_min_size || ratio < cct->_conf->bluefs_log_compact_min_ratio) { return false; } return true; } void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t) { t->seq = 1; t->uuid = super.uuid; dout(20) << __func__ << " op_init" << dendl; t->op_init(); for (unsigned bdev = 0; bdev < MAX_BDEV; ++bdev) { interval_set& p = block_all[bdev]; for (interval_set::iterator q = p.begin(); q != p.end(); ++q) { dout(20) << __func__ << " op_alloc_add " << bdev << " 0x" << std::hex << q.get_start() << "~" << q.get_len() << std::dec << dendl; t->op_alloc_add(bdev, q.get_start(), q.get_len()); } } for (auto& p : file_map) { if (p.first == 1) continue; dout(20) << __func__ << " op_file_update " << p.second->fnode << dendl; assert(p.first > 1); t->op_file_update(p.second->fnode); } for (auto& p : dir_map) { dout(20) << __func__ << " op_dir_create " << p.first << dendl; t->op_dir_create(p.first); for (auto& q : p.second->file_map) { dout(20) << __func__ << " op_dir_link " << p.first << "/" << q.first << " to " << q.second->fnode.ino << dendl; t->op_dir_link(p.first, q.first, q.second->fnode.ino); } } } void BlueFS::_compact_log_sync() { dout(10) << __func__ << dendl; File *log_file = log_writer->file.get(); // clear out log (be careful who calls us!!!) log_t.clear(); bluefs_transaction_t t; _compact_log_dump_metadata(&t); dout(20) << __func__ << " op_jump_seq " << log_seq << dendl; t.op_jump_seq(log_seq); bufferlist bl; ::encode(t, bl); _pad_bl(bl); uint64_t need = bl.length() + cct->_conf->bluefs_max_log_runway; dout(20) << __func__ << " need " << need << dendl; mempool::bluefs::vector old_extents; old_extents.swap(log_file->fnode.extents); log_file->fnode.recalc_allocated(); while (log_file->fnode.get_allocated() < need) { int r = _allocate(log_file->fnode.prefer_bdev, need - log_file->fnode.get_allocated(), &log_file->fnode.extents); log_file->fnode.recalc_allocated(); assert(r == 0); } _close_writer(log_writer); log_file->fnode.size = bl.length(); log_writer = _create_writer(log_file); log_writer->append(bl); int r = _flush(log_writer, true); assert(r == 0); wait_for_aio(log_writer); list completed_ios; _claim_completed_aios(log_writer, &completed_ios); flush_bdev(); completed_ios.clear(); dout(10) << __func__ << " writing super" << dendl; super.log_fnode = log_file->fnode; ++super.version; _write_super(); flush_bdev(); dout(10) << __func__ << " release old log extents " << old_extents << dendl; for (auto& r : old_extents) { pending_release[r.bdev].insert(r.offset, r.length); } logger->inc(l_bluefs_log_compactions); } /* * 1. Allocate a new extent to continue the log, and then log an event * that jumps the log write position to the new extent. At this point, the * old extent(s) won't be written to, and reflect everything to compact. * New events will be written to the new region that we'll keep. * * 2. While still holding the lock, encode a bufferlist that dumps all of the * in-memory fnodes and names. This will become the new beginning of the * log. The last event will jump to the log continuation extent from #1. * * 3. Queue a write to a new extent for the new beginnging of the log. * * 4. Drop lock and wait * * 5. Retake the lock. * * 6. Update the log_fnode to splice in the new beginning. * * 7. Write the new superblock. * * 8. Release the old log space. Clean up. 
*/ void BlueFS::_compact_log_async(std::unique_lock& l) { dout(10) << __func__ << dendl; File *log_file = log_writer->file.get(); assert(!new_log); assert(!new_log_writer); // create a new log [writer] so that we know compaction is in progress // (see _should_compact_log) new_log = new File; new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode // 0. wait for any racing flushes to complete. (We do not want to block // in _flush_sync_log with jump_to set or else a racing thread might flush // our entries and our jump_to update won't be correct.) while (log_flushing) { dout(10) << __func__ << " log is currently flushing, waiting" << dendl; log_cond.wait(l); } // 1. allocate new log space and jump to it. old_log_jump_to = log_file->fnode.get_allocated(); uint64_t need = old_log_jump_to + cct->_conf->bluefs_max_log_runway; dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to << " need 0x" << need << std::dec << dendl; while (log_file->fnode.get_allocated() < need) { int r = _allocate(log_file->fnode.prefer_bdev, cct->_conf->bluefs_max_log_runway, &log_file->fnode.extents); assert(r == 0); log_file->fnode.recalc_allocated(); } dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl; // update the log file change and log a jump to the offset where we want to // write the new entries log_t.op_file_update(log_file->fnode); log_t.op_jump(log_seq, old_log_jump_to); flush_bdev(); // FIXME? _flush_and_sync_log(l, 0, old_log_jump_to); // 2. prepare compacted log bluefs_transaction_t t; //avoid record two times in log_t and _compact_log_dump_metadata. log_t.clear(); _compact_log_dump_metadata(&t); // conservative estimate for final encoded size new_log_jump_to = ROUND_UP_TO(t.op_bl.length() + super.block_size * 2, cct->_conf->bluefs_alloc_size); t.op_jump(log_seq, new_log_jump_to); bufferlist bl; ::encode(t, bl); _pad_bl(bl); dout(10) << __func__ << " new_log_jump_to 0x" << std::hex << new_log_jump_to << std::dec << dendl; // allocate int r = _allocate(BlueFS::BDEV_DB, new_log_jump_to, &new_log->fnode.extents); assert(r == 0); new_log->fnode.recalc_allocated(); new_log_writer = _create_writer(new_log); new_log_writer->append(bl); // 3. flush r = _flush(new_log_writer, true); assert(r == 0); lock.unlock(); // 4. wait dout(10) << __func__ << " waiting for compacted log to sync" << dendl; wait_for_aio(new_log_writer); list completed_ios; _claim_completed_aios(new_log_writer, &completed_ios); flush_bdev(); completed_ios.clear(); // 5. retake lock lock.lock(); // 6. 
update our log fnode // discard first old_log_jump_to extents dout(10) << __func__ << " remove 0x" << std::hex << old_log_jump_to << std::dec << " of " << log_file->fnode.extents << dendl; uint64_t discarded = 0; mempool::bluefs::vector old_extents; while (discarded < old_log_jump_to) { assert(!log_file->fnode.extents.empty()); bluefs_extent_t& e = log_file->fnode.extents.front(); bluefs_extent_t temp = e; if (discarded + e.length <= old_log_jump_to) { dout(10) << __func__ << " remove old log extent " << e << dendl; discarded += e.length; log_file->fnode.extents.erase(log_file->fnode.extents.begin()); } else { dout(10) << __func__ << " remove front of old log extent " << e << dendl; uint64_t drop = old_log_jump_to - discarded; temp.length = drop; e.offset += drop; e.length -= drop; discarded += drop; dout(10) << __func__ << " kept " << e << " removed " << temp << dendl; } old_extents.push_back(temp); } new_log->fnode.extents.insert(new_log->fnode.extents.end(), log_file->fnode.extents.begin(), log_file->fnode.extents.end()); // clear the extents from old log file, they are added to new log log_file->fnode.extents.clear(); // swap the log files. New log file is the log file now. log_file->fnode.extents.swap(new_log->fnode.extents); log_file->fnode.recalc_allocated(); new_log->fnode.recalc_allocated(); log_writer->pos = log_writer->file->fnode.size = log_writer->pos - old_log_jump_to + new_log_jump_to; // 7. write the super block to reflect the changes dout(10) << __func__ << " writing super" << dendl; super.log_fnode = log_file->fnode; ++super.version; _write_super(); lock.unlock(); flush_bdev(); lock.lock(); // 8. release old space dout(10) << __func__ << " release old log extents " << old_extents << dendl; for (auto& r : old_extents) { pending_release[r.bdev].insert(r.offset, r.length); } // delete the new log, remove from the dirty files list _close_writer(new_log_writer); if (new_log->dirty_seq) { assert(dirty_files.count(new_log->dirty_seq)); auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log); dirty_files[new_log->dirty_seq].erase(it); } new_log_writer = nullptr; new_log = nullptr; log_cond.notify_all(); dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl; logger->inc(l_bluefs_log_compactions); } void BlueFS::_pad_bl(bufferlist& bl) { uint64_t partial = bl.length() % super.block_size; if (partial) { dout(10) << __func__ << " padding with 0x" << std::hex << super.block_size - partial << " zeros" << std::dec << dendl; bl.append_zero(super.block_size - partial); } } void BlueFS::flush_log() { std::unique_lock l(lock); flush_bdev(); _flush_and_sync_log(l); } int BlueFS::_flush_and_sync_log(std::unique_lock& l, uint64_t want_seq, uint64_t jump_to) { while (log_flushing) { dout(10) << __func__ << " want_seq " << want_seq << " log is currently flushing, waiting" << dendl; assert(!jump_to); log_cond.wait(l); } if (want_seq && want_seq <= log_seq_stable) { dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable " << log_seq_stable << ", done" << dendl; assert(!jump_to); return 0; } if (log_t.empty() && dirty_files.empty()) { dout(10) << __func__ << " want_seq " << want_seq << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl; assert(!jump_to); return 0; } uint64_t seq = log_t.seq = ++log_seq; assert(want_seq == 0 || want_seq <= seq); log_t.uuid = super.uuid; // log dirty files auto lsi = dirty_files.find(seq); if (lsi != dirty_files.end()) { dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl; for (auto 
&f : lsi->second) { dout(20) << __func__ << " op_file_update " << f.fnode << dendl; log_t.op_file_update(f.fnode); } } dout(10) << __func__ << " " << log_t << dendl; assert(!log_t.empty()); // allocate some more space (before we run out)? int64_t runway = log_writer->file->fnode.get_allocated() - log_writer->get_effective_write_pos(); if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) { dout(10) << __func__ << " allocating more log runway (0x" << std::hex << runway << std::dec << " remaining)" << dendl; while (new_log_writer) { dout(10) << __func__ << " waiting for async compaction" << dendl; log_cond.wait(l); } int r = _allocate(log_writer->file->fnode.prefer_bdev, cct->_conf->bluefs_max_log_runway, &log_writer->file->fnode.extents); assert(r == 0); log_writer->file->fnode.recalc_allocated(); log_t.op_file_update(log_writer->file->fnode); } bufferlist bl; ::encode(log_t, bl); // pad to block boundary _pad_bl(bl); logger->inc(l_bluefs_logged_bytes, bl.length()); log_writer->append(bl); log_t.clear(); log_t.seq = 0; // just so debug output is less confusing log_flushing = true; int r = _flush(log_writer, true); assert(r == 0); if (jump_to) { dout(10) << __func__ << " jumping log offset from 0x" << std::hex << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl; log_writer->pos = jump_to; log_writer->file->fnode.size = jump_to; } _flush_bdev_safely(log_writer); log_flushing = false; log_cond.notify_all(); // clean dirty files if (seq > log_seq_stable) { log_seq_stable = seq; dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl; auto p = dirty_files.begin(); while (p != dirty_files.end()) { if (p->first > log_seq_stable) { dout(20) << __func__ << " done cleaning up dirty files" << dendl; break; } auto l = p->second.begin(); while (l != p->second.end()) { File *file = &*l; assert(file->dirty_seq > 0); assert(file->dirty_seq <= log_seq_stable); dout(20) << __func__ << " cleaned file " << file->fnode << dendl; file->dirty_seq = 0; p->second.erase(l++); } assert(p->second.empty()); dirty_files.erase(p++); } } else { dout(20) << __func__ << " log_seq_stable " << log_seq_stable << " already >= out seq " << seq << ", we lost a race against another log flush, done" << dendl; } _update_logger_stats(); return 0; } int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length) { dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos << " 0x" << offset << "~" << length << std::dec << " to " << h->file->fnode << dendl; assert(!h->file->deleted); assert(h->file->num_readers.load() == 0); h->buffer_appender.flush(); bool buffered; if (h->file->fnode.ino == 1) buffered = false; else buffered = cct->_conf->bluefs_buffered_io; if (offset + length <= h->pos) return 0; if (offset < h->pos) { length -= h->pos - offset; offset = h->pos; dout(10) << " still need 0x" << std::hex << offset << "~" << length << std::dec << dendl; } assert(offset <= h->file->fnode.size); uint64_t allocated = h->file->fnode.get_allocated(); // do not bother to dirty the file if we are overwriting // previously allocated extents. bool must_dirty = false; if (allocated < offset + length) { // we should never run out of log space here; see the min runway check // in _flush_and_sync_log. 
assert(h->file->fnode.ino != 1); int r = _allocate(h->file->fnode.prefer_bdev, offset + length - allocated, &h->file->fnode.extents); if (r < 0) { derr << __func__ << " allocated: 0x" << std::hex << allocated << " offset: 0x" << offset << " length: 0x" << length << std::dec << dendl; assert(0 == "bluefs enospc"); return r; } h->file->fnode.recalc_allocated(); if (cct->_conf->bluefs_preextend_wal_files && h->writer_type == WRITER_WAL) { // NOTE: this *requires* that rocksdb also has log recycling // enabled and is therefore doing robust CRCs on the log // records. otherwise, we will fail to reply the rocksdb log // properly due to garbage on the device. h->file->fnode.size = h->file->fnode.get_allocated(); dout(10) << __func__ << " extending WAL size to 0x" << std::hex << h->file->fnode.size << std::dec << " to include allocated" << dendl; } must_dirty = true; } if (h->file->fnode.size < offset + length) { h->file->fnode.size = offset + length; if (h->file->fnode.ino > 1) { // we do not need to dirty the log file (or it's compacting // replacement) when the file size changes because replay is // smart enough to discover it on its own. must_dirty = true; } } if (must_dirty) { h->file->fnode.mtime = ceph_clock_now(); assert(h->file->fnode.ino >= 1); if (h->file->dirty_seq == 0) { h->file->dirty_seq = log_seq + 1; dirty_files[h->file->dirty_seq].push_back(*h->file); dout(20) << __func__ << " dirty_seq = " << log_seq + 1 << " (was clean)" << dendl; } else { if (h->file->dirty_seq != log_seq + 1) { // need re-dirty, erase from list first assert(dirty_files.count(h->file->dirty_seq)); auto it = dirty_files[h->file->dirty_seq].iterator_to(*h->file); dirty_files[h->file->dirty_seq].erase(it); h->file->dirty_seq = log_seq + 1; dirty_files[h->file->dirty_seq].push_back(*h->file); dout(20) << __func__ << " dirty_seq = " << log_seq + 1 << " (was " << h->file->dirty_seq << ")" << dendl; } else { dout(20) << __func__ << " dirty_seq = " << log_seq + 1 << " (unchanged, do nothing) " << dendl; } } } dout(20) << __func__ << " file now " << h->file->fnode << dendl; uint64_t x_off = 0; auto p = h->file->fnode.seek(offset, &x_off); assert(p != h->file->fnode.extents.end()); dout(20) << __func__ << " in " << *p << " x_off 0x" << std::hex << x_off << std::dec << dendl; unsigned partial = x_off & ~super.block_mask(); bufferlist bl; if (partial) { dout(20) << __func__ << " using partial tail 0x" << std::hex << partial << std::dec << dendl; assert(h->tail_block.length() == partial); bl.claim_append_piecewise(h->tail_block); x_off -= partial; offset -= partial; length += partial; dout(20) << __func__ << " waiting for previous aio to complete" << dendl; for (auto p : h->iocv) { if (p) { p->aio_wait(); } } } if (length == partial + h->buffer.length()) { bl.claim_append_piecewise(h->buffer); } else { bufferlist t; h->buffer.splice(0, length, &t); bl.claim_append_piecewise(t); t.substr_of(h->buffer, length, h->buffer.length() - length); h->buffer.swap(t); dout(20) << " leaving 0x" << std::hex << h->buffer.length() << std::dec << " unflushed" << dendl; } assert(bl.length() == length); switch (h->writer_type) { case WRITER_WAL: logger->inc(l_bluefs_bytes_written_wal, length); break; case WRITER_SST: logger->inc(l_bluefs_bytes_written_sst, length); break; } dout(30) << "dump:\n"; bl.hexdump(*_dout); *_dout << dendl; h->pos = offset + length; h->tail_block.clear(); uint64_t bloff = 0; while (length > 0) { uint64_t x_len = MIN(p->length - x_off, length); bufferlist t; t.substr_of(bl, bloff, x_len); unsigned tail = x_len & 
~super.block_mask(); if (tail) { size_t zlen = super.block_size - tail; dout(20) << __func__ << " caching tail of 0x" << std::hex << tail << " and padding block with 0x" << zlen << std::dec << dendl; h->tail_block.substr_of(bl, bl.length() - tail, tail); if (h->file->fnode.ino > 1) { // we are using the page_aligned_appender, and can safely use // the tail of the raw buffer. const bufferptr &last = t.back(); if (last.unused_tail_length() < zlen) { derr << " wtf, last is " << last << " from " << t << dendl; assert(last.unused_tail_length() >= zlen); } bufferptr z = last; z.set_offset(last.offset() + last.length()); z.set_length(zlen); z.zero(); t.append(z, 0, zlen); } else { t.append_zero(zlen); } } if (cct->_conf->bluefs_sync_write) { bdev[p->bdev]->write(p->offset + x_off, t, buffered); } else { bdev[p->bdev]->aio_write(p->offset + x_off, t, h->iocv[p->bdev], buffered); } bloff += x_len; length -= x_len; ++p; x_off = 0; } for (unsigned i = 0; i < MAX_BDEV; ++i) { if (bdev[i]) { assert(h->iocv[i]); if (h->iocv[i]->has_pending_aios()) { bdev[i]->aio_submit(h->iocv[i]); } } } dout(20) << __func__ << " h " << h << " pos now 0x" << std::hex << h->pos << std::dec << dendl; return 0; } // we need to retire old completed aios so they don't stick around in // memory indefinitely (along with their bufferlist refs). void BlueFS::_claim_completed_aios(FileWriter *h, list *ls) { for (auto p : h->iocv) { if (p) { ls->splice(ls->end(), p->running_aios); } } dout(10) << __func__ << " got " << ls->size() << " aios" << dendl; } void BlueFS::wait_for_aio(FileWriter *h) { // NOTE: this is safe to call without a lock, as long as our reference is // stable. dout(10) << __func__ << " " << h << dendl; utime_t start = ceph_clock_now(); for (auto p : h->iocv) { if (p) { p->aio_wait(); } } utime_t end = ceph_clock_now(); utime_t dur = end - start; dout(10) << __func__ << " " << h << " done in " << dur << dendl; } int BlueFS::_flush(FileWriter *h, bool force) { h->buffer_appender.flush(); uint64_t length = h->buffer.length(); uint64_t offset = h->pos; if (!force && length < cct->_conf->bluefs_min_flush_size) { dout(10) << __func__ << " " << h << " ignoring, length " << length << " < min_flush_size " << cct->_conf->bluefs_min_flush_size << dendl; return 0; } if (length == 0) { dout(10) << __func__ << " " << h << " no dirty data on " << h->file->fnode << dendl; return 0; } dout(10) << __func__ << " " << h << " 0x" << std::hex << offset << "~" << length << std::dec << " to " << h->file->fnode << dendl; assert(h->pos <= h->file->fnode.size); return _flush_range(h, offset, length); } int BlueFS::_truncate(FileWriter *h, uint64_t offset) { dout(10) << __func__ << " 0x" << std::hex << offset << std::dec << " file " << h->file->fnode << dendl; if (h->file->deleted) { dout(10) << __func__ << " deleted, no-op" << dendl; return 0; } // we never truncate internal log files assert(h->file->fnode.ino > 1); h->buffer_appender.flush(); // truncate off unflushed data? if (h->pos < offset && h->pos + h->buffer.length() > offset) { bufferlist t; dout(20) << __func__ << " tossing out last " << offset - h->pos << " unflushed bytes" << dendl; t.substr_of(h->buffer, 0, offset - h->pos); h->buffer.swap(t); assert(0 == "actually this shouldn't happen"); } if (h->buffer.length()) { int r = _flush(h, true); if (r < 0) return r; } if (offset == h->file->fnode.size) { return 0; // no-op! 
} if (offset > h->file->fnode.size) { assert(0 == "truncate up not supported"); } assert(h->file->fnode.size >= offset); h->file->fnode.size = offset; log_t.op_file_update(h->file->fnode); return 0; } int BlueFS::_fsync(FileWriter *h, std::unique_lock& l) { dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl; int r = _flush(h, true); if (r < 0) return r; uint64_t old_dirty_seq = h->file->dirty_seq; _flush_bdev_safely(h); if (old_dirty_seq) { uint64_t s = log_seq; dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq << ") on " << h->file->fnode << ", flushing log" << dendl; _flush_and_sync_log(l, old_dirty_seq); assert(h->file->dirty_seq == 0 || // cleaned h->file->dirty_seq > s); // or redirtied by someone else } return 0; } void BlueFS::_flush_bdev_safely(FileWriter *h) { if (!cct->_conf->bluefs_sync_write) { list completed_ios; _claim_completed_aios(h, &completed_ios); lock.unlock(); wait_for_aio(h); completed_ios.clear(); flush_bdev(); lock.lock(); } else { lock.unlock(); flush_bdev(); lock.lock(); } } void BlueFS::flush_bdev() { // NOTE: this is safe to call without a lock. dout(20) << __func__ << dendl; for (auto p : bdev) { if (p) p->flush(); } } int BlueFS::_allocate(uint8_t id, uint64_t len, mempool::bluefs::vector *ev) { dout(10) << __func__ << " len 0x" << std::hex << len << std::dec << " from " << (int)id << dendl; assert(id < alloc.size()); uint64_t min_alloc_size = cct->_conf->bluefs_alloc_size; uint64_t left = ROUND_UP_TO(len, min_alloc_size); int r = -ENOSPC; if (alloc[id]) { r = alloc[id]->reserve(left); } if (r < 0) { if (id != BDEV_SLOW) { if (bdev[id]) { dout(1) << __func__ << " failed to allocate 0x" << std::hex << left << " on bdev " << (int)id << ", free 0x" << alloc[id]->get_free() << "; fallback to bdev " << (int)id + 1 << std::dec << dendl; } return _allocate(id + 1, len, ev); } if (bdev[id]) derr << __func__ << " failed to allocate 0x" << std::hex << left << " on bdev " << (int)id << ", free 0x" << alloc[id]->get_free() << std::dec << dendl; else derr << __func__ << " failed to allocate 0x" << std::hex << left << " on bdev " << (int)id << ", dne" << std::dec << dendl; return r; } uint64_t hint = 0; if (!ev->empty()) { hint = ev->back().end(); } AllocExtentVector extents; extents.reserve(4); // 4 should be (more than) enough for most allocations int64_t alloc_len = alloc[id]->allocate(left, min_alloc_size, hint, &extents); if (alloc_len < (int64_t)left) { derr << __func__ << " allocate failed on 0x" << std::hex << left << " min_alloc_size 0x" << min_alloc_size << " hint 0x" << hint << std::dec << dendl; alloc[id]->dump(); assert(0 == "allocate failed... 
wtf"); return -ENOSPC; } for (auto& p : extents) { bluefs_extent_t e = bluefs_extent_t(id, p.offset, p.length); if (!ev->empty() && ev->back().bdev == e.bdev && ev->back().end() == (uint64_t) e.offset) { ev->back().length += e.length; } else { ev->push_back(e); } } return 0; } int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len) { dout(10) << __func__ << " file " << f->fnode << " 0x" << std::hex << off << "~" << len << std::dec << dendl; if (f->deleted) { dout(10) << __func__ << " deleted, no-op" << dendl; return 0; } assert(f->fnode.ino > 1); uint64_t allocated = f->fnode.get_allocated(); if (off + len > allocated) { uint64_t want = off + len - allocated; int r = _allocate(f->fnode.prefer_bdev, want, &f->fnode.extents); if (r < 0) return r; f->fnode.recalc_allocated(); log_t.op_file_update(f->fnode); } return 0; } void BlueFS::sync_metadata() { std::unique_lock l(lock); if (log_t.empty()) { dout(10) << __func__ << " - no pending log events" << dendl; return; } dout(10) << __func__ << dendl; utime_t start = ceph_clock_now(); vector> to_release(pending_release.size()); to_release.swap(pending_release); flush_bdev(); // FIXME? _flush_and_sync_log(l); for (unsigned i = 0; i < to_release.size(); ++i) { for (auto p = to_release[i].begin(); p != to_release[i].end(); ++p) { alloc[i]->release(p.get_start(), p.get_len()); } } if (_should_compact_log()) { if (cct->_conf->bluefs_compact_log_sync) { _compact_log_sync(); } else { _compact_log_async(l); } } utime_t end = ceph_clock_now(); utime_t dur = end - start; dout(10) << __func__ << " done in " << dur << dendl; } int BlueFS::open_for_write( const string& dirname, const string& filename, FileWriter **h, bool overwrite) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); DirRef dir; if (p == dir_map.end()) { // implicitly create the dir dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl; return -ENOENT; } else { dir = p->second; } FileRef file; bool create = false; map::iterator q = dir->file_map.find(filename); if (q == dir->file_map.end()) { if (overwrite) { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " does not exist" << dendl; return -ENOENT; } file = new File; file->fnode.ino = ++ino_last; file_map[ino_last] = file; dir->file_map[filename] = file; ++file->refs; create = true; } else { // overwrite existing file? file = q->second; if (overwrite) { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " already exists, overwrite in place" << dendl; } else { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " already exists, truncate + overwrite" << dendl; file->fnode.size = 0; for (auto& p : file->fnode.extents) { pending_release[p.bdev].insert(p.offset, p.length); } file->fnode.extents.clear(); file->fnode.recalc_allocated(); } } assert(file->fnode.ino > 1); file->fnode.mtime = ceph_clock_now(); file->fnode.prefer_bdev = BlueFS::BDEV_DB; if (dirname.length() > 5) { // the "db.slow" and "db.wal" directory names are hard-coded at // match up with bluestore. the slow device is always the second // one (when a dedicated block.db device is present and used at // bdev 0). the wal device is always last. 
if (boost::algorithm::ends_with(dirname, ".slow")) { file->fnode.prefer_bdev = BlueFS::BDEV_SLOW; } else if (boost::algorithm::ends_with(dirname, ".wal")) { file->fnode.prefer_bdev = BlueFS::BDEV_WAL; } } dout(20) << __func__ << " mapping " << dirname << "/" << filename << " to bdev " << (int)file->fnode.prefer_bdev << dendl; log_t.op_file_update(file->fnode); if (create) log_t.op_dir_link(dirname, filename, file->fnode.ino); *h = _create_writer(file); if (boost::algorithm::ends_with(filename, ".log")) { (*h)->writer_type = BlueFS::WRITER_WAL; if (logger && !overwrite) { logger->inc(l_bluefs_files_written_wal); } } else if (boost::algorithm::ends_with(filename, ".sst")) { (*h)->writer_type = BlueFS::WRITER_SST; if (logger) { logger->inc(l_bluefs_files_written_sst); } } dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl; return 0; } BlueFS::FileWriter *BlueFS::_create_writer(FileRef f) { FileWriter *w = new FileWriter(f); for (unsigned i = 0; i < MAX_BDEV; ++i) { if (bdev[i]) { w->iocv[i] = new IOContext(cct, NULL); } else { w->iocv[i] = NULL; } } return w; } void BlueFS::_close_writer(FileWriter *h) { dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl; for (unsigned i=0; iiocv[i]); h->iocv[i]->aio_wait(); bdev[i]->queue_reap_ioc(h->iocv[i]); } } delete h; } int BlueFS::open_for_read( const string& dirname, const string& filename, FileReader **h, bool random) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << (random ? " (random)":" (sequential)") << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } DirRef dir = p->second; map::iterator q = dir->file_map.find(filename); if (q == dir->file_map.end()) { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " not found" << dendl; return -ENOENT; } File *file = q->second.get(); *h = new FileReader(file, random ? 
4096 : cct->_conf->bluefs_max_prefetch, random, false); dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl; return 0; } int BlueFS::rename( const string& old_dirname, const string& old_filename, const string& new_dirname, const string& new_filename) { std::lock_guard l(lock); dout(10) << __func__ << " " << old_dirname << "/" << old_filename << " -> " << new_dirname << "/" << new_filename << dendl; map::iterator p = dir_map.find(old_dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << old_dirname << " not found" << dendl; return -ENOENT; } DirRef old_dir = p->second; map::iterator q = old_dir->file_map.find(old_filename); if (q == old_dir->file_map.end()) { dout(20) << __func__ << " dir " << old_dirname << " (" << old_dir << ") file " << old_filename << " not found" << dendl; return -ENOENT; } FileRef file = q->second; p = dir_map.find(new_dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << new_dirname << " not found" << dendl; return -ENOENT; } DirRef new_dir = p->second; q = new_dir->file_map.find(new_filename); if (q != new_dir->file_map.end()) { dout(20) << __func__ << " dir " << new_dirname << " (" << old_dir << ") file " << new_filename << " already exists, unlinking" << dendl; assert(q->second != file); log_t.op_dir_unlink(new_dirname, new_filename); _drop_link(q->second); } dout(10) << __func__ << " " << new_dirname << "/" << new_filename << " " << " " << file->fnode << dendl; new_dir->file_map[new_filename] = file; old_dir->file_map.erase(old_filename); log_t.op_dir_link(new_dirname, new_filename, file->fnode.ino); log_t.op_dir_unlink(old_dirname, old_filename); return 0; } int BlueFS::mkdir(const string& dirname) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << dendl; map::iterator p = dir_map.find(dirname); if (p != dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " exists" << dendl; return -EEXIST; } dir_map[dirname] = new Dir; log_t.op_dir_create(dirname); return 0; } int BlueFS::rmdir(const string& dirname) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl; return -ENOENT; } DirRef dir = p->second; if (!dir->file_map.empty()) { dout(20) << __func__ << " dir " << dirname << " not empty" << dendl; return -ENOTEMPTY; } dir_map.erase(dirname); log_t.op_dir_remove(dirname); return 0; } bool BlueFS::dir_exists(const string& dirname) { std::lock_guard l(lock); map::iterator p = dir_map.find(dirname); bool exists = p != dir_map.end(); dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl; return exists; } int BlueFS::stat(const string& dirname, const string& filename, uint64_t *size, utime_t *mtime) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } DirRef dir = p->second; map::iterator q = dir->file_map.find(filename); if (q == dir->file_map.end()) { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " not found" << dendl; return -ENOENT; } File *file = q->second.get(); dout(10) << __func__ << " " << dirname << "/" << filename << " " << file->fnode << dendl; if (size) *size = file->fnode.size; if (mtime) *mtime = file->fnode.mtime; return 0; } int BlueFS::lock_file(const string& 
dirname, const string& filename, FileLock **plock) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } DirRef dir = p->second; map::iterator q = dir->file_map.find(filename); File *file; if (q == dir->file_map.end()) { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " not found, creating" << dendl; file = new File; file->fnode.ino = ++ino_last; file->fnode.mtime = ceph_clock_now(); file_map[ino_last] = file; dir->file_map[filename] = file; ++file->refs; log_t.op_file_update(file->fnode); log_t.op_dir_link(dirname, filename, file->fnode.ino); } else { file = q->second.get(); if (file->locked) { dout(10) << __func__ << " already locked" << dendl; return -EBUSY; } } file->locked = true; *plock = new FileLock(file); dout(10) << __func__ << " locked " << file->fnode << " with " << *plock << dendl; return 0; } int BlueFS::unlock_file(FileLock *fl) { std::lock_guard l(lock); dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl; assert(fl->file->locked); fl->file->locked = false; delete fl; return 0; } int BlueFS::readdir(const string& dirname, vector *ls) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << dendl; if (dirname.empty()) { // list dirs ls->reserve(dir_map.size() + 2); for (auto& q : dir_map) { ls->push_back(q.first); } } else { // list files in dir map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } DirRef dir = p->second; ls->reserve(dir->file_map.size() + 2); for (auto& q : dir->file_map) { ls->push_back(q.first); } } ls->push_back("."); ls->push_back(".."); return 0; } int BlueFS::unlink(const string& dirname, const string& filename) { std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } DirRef dir = p->second; map::iterator q = dir->file_map.find(filename); if (q == dir->file_map.end()) { dout(20) << __func__ << " file " << dirname << "/" << filename << " not found" << dendl; return -ENOENT; } FileRef file = q->second; if (file->locked) { dout(20) << __func__ << " file " << dirname << "/" << filename << " is locked" << dendl; return -EBUSY; } dir->file_map.erase(filename); log_t.op_dir_unlink(dirname, filename); _drop_link(file); return 0; } bool BlueFS::wal_is_rotational() { if (!bdev[BDEV_WAL] || bdev[BDEV_WAL]->is_rotational()) return true; return false; }
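
// ---------------------------------------------------------------------------
// Minimal illustrative sketch (not part of the upstream implementation) of
// how the public BlueFS entry points defined above fit together: register a
// device, gift it an extent, mkfs, mount, write and fsync a file, unmount.
// It assumes the usual public wrappers fsync() and close_writer() declared in
// BlueFS.h; the device path, offsets, sizes, and file names are placeholders.
// ---------------------------------------------------------------------------
#if 0
static int example_bluefs_roundtrip(CephContext *cct)
{
  BlueFS fs(cct);

  // register a block device and hand BlueFS everything past the first 1 MB
  int r = fs.add_block_device(BlueFS::BDEV_DB, "/dev/placeholder");
  if (r < 0)
    return r;
  uint64_t size = fs.get_block_device_size(BlueFS::BDEV_DB);
  fs.add_block_extent(BlueFS::BDEV_DB, 1ull << 20, size - (1ull << 20));

  // format, then mount (mount() replays the metadata log written by mkfs())
  uuid_d osd_uuid;
  osd_uuid.generate_random();
  r = fs.mkfs(osd_uuid);
  if (r < 0)
    return r;
  r = fs.mount();
  if (r < 0)
    return r;

  // create a directory and a file, append a little data, and sync it
  fs.mkdir("db");
  BlueFS::FileWriter *h = nullptr;
  r = fs.open_for_write("db", "example.sst", &h, false);
  if (r < 0)
    return r;
  bufferlist bl;
  bl.append("hello bluefs", 12);
  h->append(bl);
  fs.fsync(h);          // assumed public wrapper around _fsync()
  fs.close_writer(h);   // assumed public wrapper around _close_writer()

  fs.umount();
  return 0;
}
#endif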