diff --git a/src/ceph/src/os/bluestore/NVMEDevice.cc b/src/ceph/src/os/bluestore/NVMEDevice.cc
new file mode 100644
index 0000000..2eb278a
--- /dev/null
+++ b/src/ceph/src/os/bluestore/NVMEDevice.cc
@@ -0,0 +1,1139 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+//
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky
+ *
+ * Author: Haomai Wang
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <chrono>
+#include <functional>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include <spdk/nvme.h>
+
+#include <rte_lcore.h>
+
+#include "include/stringify.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "common/align.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/perf_counters.h"
+#include "common/io_priority.h"
+
+#include "NVMEDevice.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << sn << ") "
+
+static constexpr uint16_t data_buffer_default_num = 2048;
+
+static constexpr uint32_t data_buffer_size = 8192;
+
+static constexpr uint16_t inline_segment_num = 32;
+
+static thread_local int queue_id = -1;
+
+enum {
+  l_bluestore_nvmedevice_first = 632430,
+  l_bluestore_nvmedevice_aio_write_lat,
+  l_bluestore_nvmedevice_read_lat,
+  l_bluestore_nvmedevice_flush_lat,
+  l_bluestore_nvmedevice_aio_write_queue_lat,
+  l_bluestore_nvmedevice_read_queue_lat,
+  l_bluestore_nvmedevice_flush_queue_lat,
+  l_bluestore_nvmedevice_queue_ops,
+  l_bluestore_nvmedevice_polling_lat,
+  l_bluestore_nvmedevice_buffer_alloc_failed,
+  l_bluestore_nvmedevice_last
+};
+
+static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
+
+int dpdk_thread_adaptor(void *f)
+{
+  (*static_cast<std::function<void ()>*>(f))();
+  return 0;
+}
+
+struct IOSegment {
+  uint32_t len;
+  void *addr;
+};
+
+struct IORequest {
+  uint16_t cur_seg_idx = 0;
+  uint16_t nseg;
+  uint32_t cur_seg_left = 0;
+  void *inline_segs[inline_segment_num];
+  void **extra_segs = nullptr;
+};
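+
+// Task payloads are staged in fixed-size (data_buffer_size = 8K) segments
+// of DMA-safe memory allocated via spdk_zmalloc() and recycled through a
+// per-queue pool.  A request needing at most inline_segment_num segments
+// keeps its pointers inline in IORequest; larger requests spill the
+// pointer array into heap-allocated extra_segs.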
+class SharedDriverQueueData {
+  SharedDriverData *driver;
+  spdk_nvme_ctrlr *ctrlr;
+  spdk_nvme_ns *ns;
+  std::string sn;
+  uint64_t block_size;
+  uint32_t sector_size;
+  uint32_t core_id;
+  uint32_t queueid;
+  struct spdk_nvme_qpair *qpair;
+  std::function<void ()> run_func;
+  friend class AioCompletionThread;
+
+  bool aio_stop = false;
+  void _aio_thread();
+  int alloc_buf_from_pool(Task *t, bool write);
+
+  std::atomic_bool queue_empty;
+  Mutex queue_lock;
+  Cond queue_cond;
+  std::queue<Task*> task_queue;
+
+  Mutex flush_lock;
+  Cond flush_cond;
+  std::atomic_int flush_waiters;
+  std::set<uint64_t> flush_waiter_seqs;
+
+ public:
+  std::atomic_ulong completed_op_seq, queue_op_seq;
+  std::vector<void*> data_buf_mempool;
+  PerfCounters *logger = nullptr;
+
+  SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
+                        const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
+    : driver(driver),
+      ctrlr(c),
+      ns(ns),
+      sn(sn_tag),
+      block_size(block_size),
+      sector_size(sector_size),
+      core_id(core),
+      queueid(queue_id),
+      run_func([this]() { _aio_thread(); }),
+      queue_empty(false),
+      queue_lock("NVMEDevice::queue_lock"),
+      flush_lock("NVMEDevice::flush_lock"),
+      flush_waiters(0),
+      completed_op_seq(0), queue_op_seq(0) {
+    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
+    PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
+                          l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
+    b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
+    b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
+    b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
+    b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
+    b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
+    b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
+    b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
+    b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
+    b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
+    logger = b.create_perf_counters();
+    g_ceph_context->get_perfcounters_collection()->add(logger);
+  }
+
+  void queue_task(Task *t, uint64_t ops = 1) {
+    queue_op_seq += ops;
+    Mutex::Locker l(queue_lock);
+    task_queue.push(t);
+    if (queue_empty.load()) {
+      queue_empty = false;
+      queue_cond.Signal();
+    }
+  }
+
+  void flush_wait() {
+    uint64_t cur_seq = queue_op_seq.load();
+    uint64_t left = cur_seq - completed_op_seq.load();
+    if (cur_seq > completed_op_seq.load()) {
+      // TODO: this count may include in-flight read ops
+      dout(10) << __func__ << " existing inflight ops " << left << dendl;
+      Mutex::Locker l(flush_lock);
+      ++flush_waiters;
+      flush_waiter_seqs.insert(cur_seq);
+      while (cur_seq > completed_op_seq.load()) {
+        flush_cond.Wait(flush_lock);
+      }
+      flush_waiter_seqs.erase(cur_seq);
+      --flush_waiters;
+    }
+  }
+
+  void start() {
+    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
+                                  core_id);
+    assert(r == 0);
+  }
+
+  void stop() {
+    {
+      Mutex::Locker l(queue_lock);
+      aio_stop = true;
+      queue_cond.Signal();
+    }
+    int r = rte_eal_wait_lcore(core_id);
+    assert(r == 0);
+    aio_stop = false;
+  }
+
+  ~SharedDriverQueueData() {
+    g_ceph_context->get_perfcounters_collection()->remove(logger);
+    if (qpair) {
+      spdk_nvme_ctrlr_free_io_qpair(qpair);
+    }
+    delete logger;
+  }
+};
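+
+// Each SharedDriverQueueData owns one SPDK I/O qpair and runs on its own
+// DPDK slave lcore.  Producers enqueue Tasks under queue_lock; the lcore's
+// _aio_thread() drains the queue, submits commands and polls the qpair for
+// completions.  flush_wait() blocks until completed_op_seq catches up with
+// the queue_op_seq snapshot taken on entry.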
+class SharedDriverData {
+  unsigned id;
+  uint32_t core_id;
+
+  std::string sn;
+  spdk_nvme_ctrlr *ctrlr;
+  spdk_nvme_ns *ns;
+  uint64_t block_size = 0;
+  uint32_t sector_size = 0;
+  uint64_t size = 0;
+  uint32_t queue_number = 0;
+  std::vector<SharedDriverQueueData*> queues;
+
+  void _aio_start() {
+    for (auto &&it : queues)
+      it->start();
+  }
+  void _aio_stop() {
+    for (auto &&it : queues)
+      it->stop();
+  }
+
+ public:
+  std::vector<NVMEDevice*> registered_devices;
+  SharedDriverData(unsigned _id, const std::string &sn_tag,
+                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+    : id(_id),
+      sn(sn_tag),
+      ctrlr(c),
+      ns(ns) {
+    int i;
+    sector_size = spdk_nvme_ns_get_sector_size(ns);
+    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
+    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
+
+    RTE_LCORE_FOREACH_SLAVE(i) {
+      queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
+    }
+
+    _aio_start();
+  }
+
+  bool is_equal(const string &tag) const { return sn == tag; }
+  ~SharedDriverData() {
+    for (auto p : queues) {
+      delete p;
+    }
+  }
+
+  SharedDriverQueueData *get_queue(uint32_t i) {
+    return queues.at(i % queue_number);
+  }
+
+  void register_device(NVMEDevice *device) {
+    // stop the queue threads while mutating registered_devices; device
+    // registration/release is rare, so the brief stall is acceptable
+    _aio_stop();
+    registered_devices.push_back(device);
+    _aio_start();
+  }
+
+  void remove_device(NVMEDevice *device) {
+    _aio_stop();
+    std::vector<NVMEDevice*> new_devices;
+    for (auto &&it : registered_devices) {
+      if (it != device)
+        new_devices.push_back(it);
+    }
+    registered_devices.swap(new_devices);
+    _aio_start();
+  }
+
+  uint64_t get_block_size() {
+    return block_size;
+  }
+  uint64_t get_size() {
+    return size;
+  }
+};
+
+struct Task {
+  NVMEDevice *device;
+  IOContext *ctx = nullptr;
+  IOCommand command;
+  uint64_t offset;
+  uint64_t len;
+  bufferlist write_bl;
+  std::function<void ()> fill_cb;
+  Task *next = nullptr;
+  int64_t return_code;
+  ceph::coarse_real_clock::time_point start;
+  IORequest io_request;
+  std::mutex lock;
+  std::condition_variable cond;
+  SharedDriverQueueData *queue;
+  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+    : device(dev), command(c), offset(off), len(l),
+      return_code(rc),
+      start(ceph::coarse_real_clock::now()) {}
+  ~Task() {
+    assert(!io_request.nseg);
+  }
+  void release_segs(SharedDriverQueueData *queue_data) {
+    if (io_request.extra_segs) {
+      for (uint16_t i = 0; i < io_request.nseg; i++)
+        queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
+      delete[] io_request.extra_segs;
+    } else if (io_request.nseg) {
+      for (uint16_t i = 0; i < io_request.nseg; i++)
+        queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
+    }
+    io_request.nseg = 0;
+  }
+
+  void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
+    uint64_t copied = 0;
+    uint64_t left = len;
+    void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
+    uint16_t i = 0;
+    while (left > 0) {
+      char *src = static_cast<char*>(segs[i++]);
+      uint64_t need_copy = std::min(left, data_buffer_size - off);
+      memcpy(buf + copied, src + off, need_copy);
+      off = 0;
+      left -= need_copy;
+      copied += need_copy;
+    }
+  }
+
+  void io_wait() {
+    std::unique_lock<std::mutex> l(lock);
+    cond.wait(l);
+  }
+
+  void io_wake() {
+    std::lock_guard<std::mutex> l(lock);
+    cond.notify_all();
+  }
+};
+
+static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
+{
+  Task *t = static_cast<Task*>(cb_arg);
+  uint32_t i = sgl_offset / data_buffer_size;
+  uint32_t offset = i * data_buffer_size;
+  assert(i <= t->io_request.nseg);
+
+  for (; i < t->io_request.nseg; i++) {
+    offset += data_buffer_size;
+    if (offset > sgl_offset) {
+      if (offset > t->len)
+        offset = t->len;
+      break;
+    }
+  }
+
+  t->io_request.cur_seg_idx = i;
+  t->io_request.cur_seg_left = offset - sgl_offset;
+  return;
+}
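+
+// SPDK scatter-gather callbacks: before (re)walking a payload the driver
+// calls data_buf_reset_sgl() to position the segment cursor at sgl_offset,
+// then calls data_buf_next_sge() repeatedly to obtain successive
+// (address, length) pairs, one pool segment at a time.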
+static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
+{
+  uint32_t size;
+  void *addr;
+  Task *t = static_cast<Task*>(cb_arg);
+  if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
+    *length = 0;
+    *address = 0;
+    return 0;
+  }
+
+  addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx];
+
+  size = data_buffer_size;
+  if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
+    uint64_t tail = t->len % data_buffer_size;
+    if (tail) {
+      size = (uint32_t) tail;
+    }
+  }
+
+  if (t->io_request.cur_seg_left) {
+    *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
+    *length = t->io_request.cur_seg_left;
+    t->io_request.cur_seg_left = 0;
+  } else {
+    *address = addr;
+    *length = size;
+  }
+
+  t->io_request.cur_seg_idx++;
+  return 0;
+}
+
+int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
+{
+  uint64_t count = t->len / data_buffer_size;
+  if (t->len % data_buffer_size)
+    ++count;
+  void **segs;
+  if (count > data_buf_mempool.size())
+    return -ENOMEM;
+  if (count <= inline_segment_num) {
+    segs = t->io_request.inline_segs;
+  } else {
+    t->io_request.extra_segs = new void*[count];
+    segs = t->io_request.extra_segs;
+  }
+  for (uint16_t i = 0; i < count; i++) {
+    segs[i] = data_buf_mempool.back();
+    data_buf_mempool.pop_back();
+  }
+  t->io_request.nseg = count;
+  if (write) {
+    auto blp = t->write_bl.begin();
+    uint32_t len = 0;
+    uint16_t i = 0;
+    for (; i < count - 1; ++i) {
+      blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
+      len += data_buffer_size;
+    }
+    blp.copy(t->write_bl.length() - len, static_cast<char*>(segs[i]));
+  }
+
+  return 0;
+}
+
+void SharedDriverQueueData::_aio_thread()
+{
+  dout(1) << __func__ << " start" << dendl;
+
+  if (data_buf_mempool.empty()) {
+    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+      void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+      if (!b) {
+        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+        assert(b);
+      }
+      data_buf_mempool.push_back(b);
+    }
+  }
+
+  Task *t = nullptr;
+  int r = 0;
+  uint64_t lba_off, lba_count;
+
+  ceph::coarse_real_clock::time_point cur, start
+    = ceph::coarse_real_clock::now();
+  while (true) {
+    bool inflight = queue_op_seq.load() != completed_op_seq.load();
+ again:
+    dout(40) << __func__ << " polling" << dendl;
+    if (inflight) {
+      if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
+        dout(30) << __func__ << " idle, have a pause" << dendl;
+        _mm_pause();
+      }
+    }
+
+    for (; t; t = t->next) {
+      t->queue = this;
+      lba_off = t->offset / sector_size;
+      lba_count = t->len / sector_size;
+      switch (t->command) {
+        case IOCommand::WRITE_COMMAND:
+        {
+          dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
+          r = alloc_buf_from_pool(t, true);
+          if (r < 0) {
+            logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
+            goto again;
+          }
+
+          r = spdk_nvme_ns_cmd_writev(
+              ns, qpair, lba_off, lba_count, io_complete, t, 0,
+              data_buf_reset_sgl, data_buf_next_sge);
+          if (r < 0) {
+            derr << __func__ << " failed to do write command" << dendl;
+            t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
+            t->release_segs(this);
+            delete t;
+            ceph_abort();
+          }
+          cur = ceph::coarse_real_clock::now();
+          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+          logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, dur);
+          break;
+        }
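+        // The READ and FLUSH cases below mirror the write path.  Note the
+        // -ENOMEM handling above: when the segment pool is exhausted,
+        // `goto again` re-polls the qpair so finished tasks can
+        // release_segs() back into the pool before submission is retried.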
+        case IOCommand::READ_COMMAND:
+        {
+          dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
+          r = alloc_buf_from_pool(t, false);
+          if (r < 0) {
+            logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
+            goto again;
+          }
+
+          r = spdk_nvme_ns_cmd_readv(
+              ns, qpair, lba_off, lba_count, io_complete, t, 0,
+              data_buf_reset_sgl, data_buf_next_sge);
+          if (r < 0) {
+            derr << __func__ << " failed to read" << dendl;
+            t->release_segs(this);
+            delete t;
+            ceph_abort();
+          } else {
+            cur = ceph::coarse_real_clock::now();
+            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+            logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
+          }
+          break;
+        }
+        case IOCommand::FLUSH_COMMAND:
+        {
+          dout(20) << __func__ << " flush command issued" << dendl;
+          r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
+          if (r < 0) {
+            derr << __func__ << " failed to flush" << dendl;
+            t->release_segs(this);
+            delete t;
+            ceph_abort();
+          } else {
+            cur = ceph::coarse_real_clock::now();
+            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+            logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
+          }
+          break;
+        }
+      }
+    }
+
+    if (!queue_empty.load()) {
+      Mutex::Locker l(queue_lock);
+      if (!task_queue.empty()) {
+        t = task_queue.front();
+        task_queue.pop();
+        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
+      }
+      if (!t)
+        queue_empty = true;
+    } else {
+      if (flush_waiters.load()) {
+        Mutex::Locker l(flush_lock);
+        if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
+          flush_cond.Signal();
+      }
+
+      if (!inflight) {
+        // be careful: each thread must reap its own I/O contexts; currently
+        // this is done by a single dedicated dpdk thread (queue 0)
+        if (!queueid) {
+          for (auto &&it : driver->registered_devices)
+            it->reap_ioc();
+        }
+
+        Mutex::Locker l(queue_lock);
+        if (queue_empty.load()) {
+          cur = ceph::coarse_real_clock::now();
+          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+          logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
+          if (aio_stop)
+            break;
+          queue_cond.Wait(queue_lock);
+          start = ceph::coarse_real_clock::now();
+        }
+      }
+    }
+  }
+  assert(data_buf_mempool.size() == data_buffer_default_num);
+  dout(1) << __func__ << " end" << dendl;
+}
+
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
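+
+// NVMEManager serializes SPDK/DPDK environment setup.  The first try_get()
+// call spawns a detached thread that initializes the environment once and
+// then services probe requests from probe_queue; callers block on
+// probe_queue_cond until their ProbeContext is marked done.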
+class NVMEManager {
+ public:
+  struct ProbeContext {
+    string sn_tag;
+    NVMEManager *manager;
+    SharedDriverData *driver;
+    bool done;
+  };
+
+ private:
+  Mutex lock;
+  bool init = false;
+  std::vector<SharedDriverData*> shared_driver_datas;
+  std::thread dpdk_thread;
+  std::mutex probe_queue_lock;
+  std::condition_variable probe_queue_cond;
+  std::list<ProbeContext*> probe_queue;
+
+ public:
+  NVMEManager()
+    : lock("NVMEDevice::NVMEManager::lock") {}
+  int try_get(const string &sn_tag, SharedDriverData **driver);
+  void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
+                      SharedDriverData **driver) {
+    assert(lock.is_locked());
+    spdk_nvme_ns *ns;
+    int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
+    assert(num_ns >= 1);
+    if (num_ns > 1) {
+      dout(0) << __func__ << " namespace count larger than 1, currently only the first namespace is used" << dendl;
+    }
+    ns = spdk_nvme_ctrlr_get_ns(c, 1);
+    if (!ns) {
+      derr << __func__ << " failed to get namespace at 1" << dendl;
+      ceph_abort();
+    }
+    dout(1) << __func__ << " successfully attached nvme device at " << spdk_pci_device_get_bus(pci_dev)
+            << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;
+
+    // only support one device per osd now!
+    assert(shared_driver_datas.empty());
+    // index 0 is occupied by the master thread
+    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
+    *driver = shared_driver_datas.back();
+  }
+};
+
+static NVMEManager manager;
+
+static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
+{
+  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+  char serial_number[128];
+  struct spdk_pci_addr pci_addr;
+  struct spdk_pci_device *pci_dev = NULL;
+  int result = 0;
+
+  if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
+    dout(0) << __func__ << " only probe local nvme device" << dendl;
+    return false;
+  }
+
+  result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
+  if (result) {
+    dout(0) << __func__ << " failed to parse pci address from " << trid->traddr
+            << ", return value: " << result << dendl;
+    return false;
+  }
+
+  pci_dev = spdk_pci_get_device(&pci_addr);
+  if (!pci_dev) {
+    dout(0) << __func__ << " failed to get pci device" << dendl;
+    return false;
+  }
+
+  dout(0) << __func__ << " found device at bus: " << spdk_pci_device_get_bus(pci_dev)
+          << ":" << spdk_pci_device_get_dev(pci_dev) << ":"
+          << spdk_pci_device_get_func(pci_dev) << " vendor:0x" << spdk_pci_device_get_vendor_id(pci_dev) << " device:0x" << spdk_pci_device_get_device_id(pci_dev)
+          << dendl;
+  result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
+  if (result < 0) {
+    dout(10) << __func__ << " failed to get serial number from " << pci_dev << dendl;
+    return false;
+  }
+
+  if (ctx->sn_tag.compare(string(serial_number, 16))) {
+    dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") does not match " << serial_number << dendl;
+    return false;
+  }
+
+  return true;
+}
+
+static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
+                      struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
+{
+  struct spdk_pci_addr pci_addr;
+  struct spdk_pci_device *pci_dev = NULL;
+
+  spdk_pci_addr_parse(&pci_addr, trid->traddr);
+
+  pci_dev = spdk_pci_get_device(&pci_addr);
+  if (!pci_dev) {
+    dout(0) << __func__ << " failed to get pci device" << dendl;
+    assert(pci_dev);
+  }
+
+  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+  ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
+}
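+
+// probe_cb filters controllers by PCIe transport and by comparing each
+// candidate's serial number against the tag parsed from the bdev path;
+// attach_cb then registers the matching controller with the manager.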
+int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
+{
+  Mutex::Locker l(lock);
+  int r = 0;
+  unsigned long long core_value;
+  uint32_t core_num = 0;
+  int m_core_arg = -1;
+  uint32_t mem_size_arg = g_conf->bluestore_spdk_mem;
+  char *coremask_arg = (char *)g_conf->bluestore_spdk_coremask.c_str();
+
+  if (sn_tag.empty()) {
+    r = -ENOENT;
+    derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  core_value = strtoull(coremask_arg, NULL, 16);
+  for (uint32_t i = 0; i < sizeof(unsigned long long) * 8; i++) {
+    bool tmp = (core_value >> i) & 0x1;
+    if (tmp) {
+      core_num += 1;
+      // select the least significant bit as the master core
+      if (m_core_arg < 0) {
+        m_core_arg = i;
+      }
+    }
+  }
+
+  // at least two cores are needed for using spdk
+  if (core_num < 2) {
+    r = -ENOENT;
+    derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  for (auto &&it : shared_driver_datas) {
+    if (it->is_equal(sn_tag)) {
+      *driver = it;
+      return 0;
+    }
+  }
+
+  if (!init) {
+    init = true;
+    dpdk_thread = std::thread(
+      [this, coremask_arg, m_core_arg, mem_size_arg]() {
+        static struct spdk_env_opts opts;
+        int r;
+
+        spdk_env_opts_init(&opts);
+        opts.name = "ceph-osd";
+        opts.core_mask = coremask_arg;
+        opts.dpdk_master_core = m_core_arg;
+        opts.dpdk_mem_size = mem_size_arg;
+        spdk_env_init(&opts);
+
+        spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
+        if (spdk_nvme_retry_count < 0)
+          spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
+
+        std::unique_lock<std::mutex> l(probe_queue_lock);
+        while (true) {
+          if (!probe_queue.empty()) {
+            ProbeContext* ctxt = probe_queue.front();
+            probe_queue.pop_front();
+            r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
+            if (r < 0) {
+              assert(!ctxt->driver);
+              derr << __func__ << " device probe nvme failed" << dendl;
+            }
+            ctxt->done = true;
+            probe_queue_cond.notify_all();
+          } else {
+            probe_queue_cond.wait(l);
+          }
+        }
+      }
+    );
+    dpdk_thread.detach();
+  }
+
+  ProbeContext ctx = {sn_tag, this, nullptr, false};
+  {
+    std::unique_lock<std::mutex> l(probe_queue_lock);
+    probe_queue.push_back(&ctx);
+    while (!ctx.done)
+      probe_queue_cond.wait(l);
+  }
+  if (!ctx.driver)
+    return -1;
+  *driver = ctx.driver;
+
+  return 0;
+}
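+
+// Completion callback, invoked on the polling lcore for every finished
+// qpair entry.  Writes and AIO-submitted reads either fire the device-level
+// aio_callback (when ctx->priv is set) or wake an aio_wait()er;
+// synchronously submitted reads flip Task::return_code to 0 and wake the
+// caller blocked in io_wait(); a completed flush just records its result.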
+void io_complete(void *t, const struct spdk_nvme_cpl *completion)
+{
+  Task *task = static_cast<Task*>(t);
+  IOContext *ctx = task->ctx;
+  SharedDriverQueueData *queue = task->queue;
+
+  assert(queue != NULL);
+  assert(ctx != NULL);
+  ++queue->completed_op_seq;
+  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
+      ceph::coarse_real_clock::now() - task->start);
+  if (task->command == IOCommand::WRITE_COMMAND) {
+    queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
+    assert(!spdk_nvme_cpl_is_error(completion));
+    dout(20) << __func__ << " write/zero op completed successfully, left "
+             << queue->queue_op_seq - queue->completed_op_seq << dendl;
+    // check waiting count before doing callback (which may
+    // destroy this ioc).
+    if (ctx->priv) {
+      if (!--ctx->num_running) {
+        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+      }
+    } else {
+      ctx->try_aio_wake();
+    }
+    task->release_segs(queue);
+    delete task;
+  } else if (task->command == IOCommand::READ_COMMAND) {
+    queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
+    assert(!spdk_nvme_cpl_is_error(completion));
+    dout(20) << __func__ << " read op completed successfully" << dendl;
+    task->fill_cb();
+    task->release_segs(queue);
+    // read submitted by AIO
+    if (!task->return_code) {
+      if (ctx->priv) {
+        if (!--ctx->num_running) {
+          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+        }
+      } else {
+        ctx->try_aio_wake();
+      }
+      delete task;
+    } else {
+      task->return_code = 0;
+      if (!--ctx->num_running) {
+        task->io_wake();
+      }
+    }
+  } else {
+    assert(task->command == IOCommand::FLUSH_COMMAND);
+    assert(!spdk_nvme_cpl_is_error(completion));
+    queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
+    dout(20) << __func__ << " flush op completed successfully" << dendl;
+    task->return_code = 0;
+  }
+}
+
+// ----------------
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << name << ") "
+
+NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+  : BlockDevice(cct),
+    driver(nullptr),
+    size(0),
+    block_size(0),
+    aio_stop(false),
+    buffer_lock("NVMEDevice::buffer_lock"),
+    aio_callback(cb),
+    aio_callback_priv(cbpriv)
+{
+}
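+
+// open() expects the given path to name a regular file whose leading hex
+// digits hold the target controller's serial number; the parsed tag is
+// later matched against PCI device serials in probe_cb().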
+int NVMEDevice::open(const string& p)
+{
+  int r = 0;
+  dout(1) << __func__ << " path " << p << dendl;
+
+  string serial_number;
+  int fd = ::open(p.c_str(), O_RDONLY);
+  if (fd < 0) {
+    r = -errno;
+    derr << __func__ << " unable to open " << p << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+  char buf[100];
+  r = ::read(fd, buf, sizeof(buf));
+  VOID_TEMP_FAILURE_RETRY(::close(fd));
+  fd = -1; // defensive
+  if (r <= 0) {
+    if (r == 0) {
+      r = -EINVAL;
+    } else {
+      r = -errno;
+    }
+    derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  /* scan buf from the beginning with isxdigit. */
+  int i = 0;
+  while (i < r && isxdigit(buf[i])) {
+    i++;
+  }
+  serial_number = string(buf, i);
+  r = manager.try_get(serial_number, &driver);
+  if (r < 0) {
+    derr << __func__ << " failed to get nvme device with sn " << serial_number << dendl;
+    return r;
+  }
+
+  driver->register_device(this);
+  block_size = driver->get_block_size();
+  size = driver->get_size();
+  name = serial_number;
+
+  // nvme is a non-rotational device
+  rotational = false;
+
+  // round size down to an even block
+  size &= ~(block_size - 1);
+
+  dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
+          << " block_size " << block_size << " (" << pretty_si_t(block_size)
+          << "B)" << dendl;
+
+  return 0;
+}
+
+void NVMEDevice::close()
+{
+  dout(1) << __func__ << dendl;
+
+  name.clear();
+  driver->remove_device(this);
+
+  dout(1) << __func__ << " end" << dendl;
+}
+
+int NVMEDevice::collect_metadata(string prefix, map<string,string> *pm) const
+{
+  (*pm)[prefix + "rotational"] = "0";
+  (*pm)[prefix + "size"] = stringify(get_size());
+  (*pm)[prefix + "block_size"] = stringify(get_block_size());
+  (*pm)[prefix + "driver"] = "NVMEDevice";
+  (*pm)[prefix + "type"] = "nvme";
+  (*pm)[prefix + "access_mode"] = "spdk";
+  (*pm)[prefix + "nvme_serial_number"] = name;
+
+  return 0;
+}
+
+int NVMEDevice::flush()
+{
+  dout(10) << __func__ << " start" << dendl;
+  auto start = ceph::coarse_real_clock::now();
+
+  if (queue_id == -1)
+    queue_id = ceph_gettid();
+  SharedDriverQueueData *queue = driver->get_queue(queue_id);
+  assert(queue != NULL);
+  queue->flush_wait();
+  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
+      ceph::coarse_real_clock::now() - start);
+  queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
+  return 0;
+}
+
+void NVMEDevice::aio_submit(IOContext *ioc)
+{
+  dout(20) << __func__ << " ioc " << ioc << " pending "
+           << ioc->num_pending.load() << " running "
+           << ioc->num_running.load() << dendl;
+  int pending = ioc->num_pending.load();
+  Task *t = static_cast<Task*>(ioc->nvme_task_first);
+  if (pending && t) {
+    ioc->num_running += pending;
+    ioc->num_pending -= pending;
+    assert(ioc->num_pending.load() == 0);  // we should be the only thread doing this
+    // only need to push the first entry; the rest are chained via Task::next
+    if (queue_id == -1)
+      queue_id = ceph_gettid();
+    driver->get_queue(queue_id)->queue_task(t, pending);
+    ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+  }
+}
+
+int NVMEDevice::aio_write(
+    uint64_t off,
+    bufferlist &bl,
+    IOContext *ioc,
+    bool buffered)
+{
+  uint64_t len = bl.length();
+  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
+           << " buffered " << buffered << dendl;
+  assert(off % block_size == 0);
+  assert(len % block_size == 0);
+  assert(len > 0);
+  assert(off < size);
+  assert(off + len <= size);
+
+  Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
+
+  // TODO: if the upper layer allocates memory with a known physical address,
+  // this copy can be avoided
+  t->write_bl = std::move(bl);
+
+  if (buffered) {
+    // a buffered write is queued directly, bypassing the IOContext chain
+    if (queue_id == -1)
+      queue_id = ceph_gettid();
+    driver->get_queue(queue_id)->queue_task(t);
+  } else {
+    t->ctx = ioc;
+    Task *first = static_cast<Task*>(ioc->nvme_task_first);
+    Task *last = static_cast<Task*>(ioc->nvme_task_last);
+    if (last)
+      last->next = t;
+    if (!first)
+      ioc->nvme_task_first = t;
+    ioc->nvme_task_last = t;
+    ++ioc->num_pending;
+  }
+
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+
+  return 0;
+}
+
+int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered)
+{
+  // FIXME: there is presumably a more efficient way to do this...
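+  // Synchronous write: wrap the payload in a private IOContext, submit it
+  // through the aio path, and block in aio_wait() until io_complete()
+  // signals the context.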
+  IOContext ioc(cct, NULL);
+  aio_write(off, bl, &ioc, buffered);
+  ioc.aio_wait();
+  return 0;
+}
+
+int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+                     IOContext *ioc,
+                     bool buffered)
+{
+  dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+  assert(off % block_size == 0);
+  assert(len % block_size == 0);
+  assert(len > 0);
+  assert(off < size);
+  assert(off + len <= size);
+
+  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
+  bufferptr p = buffer::create_page_aligned(len);
+  int r = 0;
+  t->ctx = ioc;
+  char *buf = p.c_str();
+  t->fill_cb = [buf, t]() {
+    t->copy_to_buf(buf, 0, t->len);
+  };
+  ++ioc->num_running;
+  if (queue_id == -1)
+    queue_id = ceph_gettid();
+  driver->get_queue(queue_id)->queue_task(t);
+
+  while (t->return_code > 0) {
+    t->io_wait();
+  }
+  pbl->push_back(std::move(p));
+  r = t->return_code;
+  delete t;
+  return r;
+}
+
+int NVMEDevice::aio_read(
+    uint64_t off,
+    uint64_t len,
+    bufferlist *pbl,
+    IOContext *ioc)
+{
+  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+  assert(off % block_size == 0);
+  assert(len % block_size == 0);
+  assert(len > 0);
+  assert(off < size);
+  assert(off + len <= size);
+
+  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
+
+  bufferptr p = buffer::create_page_aligned(len);
+  pbl->append(p);
+  t->ctx = ioc;
+  char *buf = p.c_str();
+  t->fill_cb = [buf, t]() {
+    t->copy_to_buf(buf, 0, t->len);
+  };
+
+  Task *first = static_cast<Task*>(ioc->nvme_task_first);
+  Task *last = static_cast<Task*>(ioc->nvme_task_last);
+  if (last)
+    last->next = t;
+  if (!first)
+    ioc->nvme_task_first = t;
+  ioc->nvme_task_last = t;
+  ++ioc->num_pending;
+
+  return 0;
+}
+
+int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+  assert(len > 0);
+  assert(off < size);
+  assert(off + len <= size);
+
+  uint64_t aligned_off = align_down(off, block_size);
+  uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
+  dout(5) << __func__ << " " << off << "~" << len
+          << " aligned " << aligned_off << "~" << aligned_len << dendl;
+  IOContext ioc(g_ceph_context, nullptr);
+  Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
+  int r = 0;
+  t->ctx = &ioc;
+  t->fill_cb = [buf, t, off, len]() {
+    t->copy_to_buf(buf, off - t->offset, len);
+  };
+  ++ioc.num_running;
+  if (queue_id == -1)
+    queue_id = ceph_gettid();
+  driver->get_queue(queue_id)->queue_task(t);
+
+  while (t->return_code > 0) {
+    t->io_wait();
+  }
+  r = t->return_code;
+  delete t;
+  return r;
+}
+
+int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  return 0;
+}