// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <map>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <vector>
#include <xmmintrin.h>

#include <spdk/nvme.h>

#include <rte_lcore.h>
#include "include/stringify.h"
#include "include/types.h"
#include "include/compat.h"
#include "common/align.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/perf_counters.h"
#include "common/io_priority.h"

#include "NVMEDevice.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << sn << ") "
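
// Each I/O queue keeps a private pool of small DMA-safe buffers allocated via
// spdk_zmalloc(); requests are split into data_buffer_size segments drawn from
// this pool before being handed to the controller.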
static constexpr uint16_t data_buffer_default_num = 2048;

static constexpr uint32_t data_buffer_size = 8192;

static constexpr uint16_t inline_segment_num = 32;
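
// Lazily binds each submitting thread to one of the per-core I/O queues;
// -1 means the calling thread has not picked a queue yet (see flush() and
// aio_submit(), which fall back to ceph_gettid()).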
static thread_local int queue_id = -1;
enum {
  l_bluestore_nvmedevice_first = 632430,
  l_bluestore_nvmedevice_aio_write_lat,
  l_bluestore_nvmedevice_read_lat,
  l_bluestore_nvmedevice_flush_lat,
  l_bluestore_nvmedevice_aio_write_queue_lat,
  l_bluestore_nvmedevice_read_queue_lat,
  l_bluestore_nvmedevice_flush_queue_lat,
  l_bluestore_nvmedevice_queue_ops,
  l_bluestore_nvmedevice_polling_lat,
  l_bluestore_nvmedevice_buffer_alloc_failed,
  l_bluestore_nvmedevice_last
};
static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
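
// Trampoline so a std::function can run on a DPDK lcore:
// rte_eal_remote_launch() only accepts a plain int (*)(void *) entry point.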
int dpdk_thread_adaptor(void *f)
{
  (*static_cast<std::function<void ()>*>(f))();
  return 0;
}
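
// Scatter-gather state for a single in-flight command. Requests spanning at
// most inline_segment_num data buffers use the inline array; larger requests
// allocate extra_segs on the heap (released in Task::release_segs()).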
struct IORequest {
  uint16_t cur_seg_idx = 0;
  uint16_t nseg = 0;
  uint32_t cur_seg_left = 0;
  void *inline_segs[inline_segment_num];
  void **extra_segs = nullptr;
};
class SharedDriverQueueData {
  SharedDriverData *driver;
  spdk_nvme_ctrlr *ctrlr;
  spdk_nvme_ns *ns;
  std::string sn;
  uint64_t block_size;
  uint32_t sector_size;
  uint32_t core_id;
  uint32_t queueid;
  struct spdk_nvme_qpair *qpair;
  std::function<void ()> run_func;
  friend class AioCompletionThread;

  bool aio_stop = false;
  void _aio_thread();
  int alloc_buf_from_pool(Task *t, bool write);

  std::atomic_bool queue_empty;
  Mutex queue_lock;
  Cond queue_cond;
  std::queue<Task*> task_queue;

  Mutex flush_lock;
  Cond flush_cond;
  std::atomic_int flush_waiters;
  std::set<uint64_t> flush_waiter_seqs;

 public:
  std::atomic_ulong completed_op_seq, queue_op_seq;
  std::vector<void*> data_buf_mempool;
  PerfCounters *logger = nullptr;
  SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
                        const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
      : driver(driver),
        ctrlr(c),
        ns(ns),
        sn(sn_tag),
        block_size(block_size),
        sector_size(sector_size),
        core_id(core),
        queueid(queue_id),
        run_func([this]() { _aio_thread(); }),
        queue_empty(false),
        queue_lock("NVMEDevice::queue_lock"),
        flush_lock("NVMEDevice::flush_lock"),
        flush_waiters(0),
        completed_op_seq(0), queue_op_seq(0) {
    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
    PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
                          l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
    b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completion latency");
    b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completion latency");
    b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completion latency");
    b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
    b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
    b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
    b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
    b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
    b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
    logger = b.create_perf_counters();
    g_ceph_context->get_perfcounters_collection()->add(logger);
  }
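
  // Producer side: submitting threads append tasks under queue_lock and wake
  // the polling thread if it had gone idle.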
  void queue_task(Task *t, uint64_t ops = 1) {
    queue_op_seq += ops;
    Mutex::Locker l(queue_lock);
    task_queue.push(t);
    if (queue_empty.load()) {
      queue_empty = false;
      queue_cond.Signal();
    }
  }
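
  // Blocks the caller until every op queued before this call has completed;
  // the polling thread signals flush_cond once completed_op_seq catches up
  // with the sequence recorded here.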
  void flush_wait() {
    uint64_t cur_seq = queue_op_seq.load();
    uint64_t left = cur_seq - completed_op_seq.load();
    if (cur_seq > completed_op_seq.load()) {
      // TODO: the inflight count may include read ops
      dout(10) << __func__ << " existing inflight ops " << left << dendl;
      Mutex::Locker l(flush_lock);
      ++flush_waiters;
      flush_waiter_seqs.insert(cur_seq);
      while (cur_seq > completed_op_seq.load()) {
        flush_cond.Wait(flush_lock);
      }
      flush_waiter_seqs.erase(cur_seq);
      --flush_waiters;
    }
  }
  void start() {
    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
                                  core_id);
    assert(r == 0);
  }

  void stop() {
    {
      Mutex::Locker l(queue_lock);
      aio_stop = true;
      queue_cond.Signal();
    }
    int r = rte_eal_wait_lcore(core_id);
    assert(r == 0);
    aio_stop = false;
  }
  ~SharedDriverQueueData() {
    g_ceph_context->get_perfcounters_collection()->remove(logger);
    if (qpair)
      spdk_nvme_ctrlr_free_io_qpair(qpair);
    delete logger;
  }
};
class SharedDriverData {
  unsigned id;
  std::string sn;
  spdk_nvme_ctrlr *ctrlr;
  spdk_nvme_ns *ns;
  uint64_t block_size = 0;
  uint32_t sector_size = 0;
  uint64_t size = 0;
  uint32_t queue_number;
  std::vector<SharedDriverQueueData*> queues;

  void _aio_start() {
    for (auto &&it : queues)
      it->start();
  }
  void _aio_stop() {
    for (auto &&it : queues)
      it->stop();
  }

 public:
  std::vector<NVMEDevice*> registered_devices;
  SharedDriverData(unsigned _id, const std::string &sn_tag,
                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
      : id(_id),
        sn(sn_tag),
        ctrlr(c),
        ns(ns),
        queue_number(0) {
    sector_size = spdk_nvme_ns_get_sector_size(ns);
    block_size = std::max<uint64_t>(CEPH_PAGE_SIZE, sector_size);
    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
    unsigned i;
    RTE_LCORE_FOREACH_SLAVE(i) {
      queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
    }
    _aio_start();
  }
  bool is_equal(const string &tag) const { return sn == tag; }
  ~SharedDriverData() {
    for (auto p : queues) {
      delete p;
    }
  }

  SharedDriverQueueData *get_queue(uint32_t i) {
    return queues.at(i % queue_number);
  }
  void register_device(NVMEDevice *device) {
    // releasing a device is a rare event, so it is acceptable to stop the
    // polling threads while the registered_devices list is modified
    registered_devices.push_back(device);
  }
  void remove_device(NVMEDevice *device) {
    _aio_stop();
    std::vector<NVMEDevice*> new_devices;
    for (auto &&it : registered_devices) {
      if (it != device)
        new_devices.push_back(it);
    }
    registered_devices.swap(new_devices);
    _aio_start();
  }
  uint64_t get_block_size() {
    return block_size;
  }
  uint64_t get_size() {
    return size;
  }
};
struct Task {
  NVMEDevice *device;
  IOContext *ctx = nullptr;
  IOCommand command;
  uint64_t offset;
  uint64_t len;
  bufferlist write_bl;
  std::function<void()> fill_cb;
  Task *next = nullptr;
  int64_t return_code;
  ceph::coarse_real_clock::time_point start;
  IORequest io_request;
  std::mutex lock;
  std::condition_variable cond;
  SharedDriverQueueData *queue = nullptr;
  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
      : device(dev), command(c), offset(off), len(l),
        return_code(rc),
        start(ceph::coarse_real_clock::now()) {}
  ~Task() {
    assert(!io_request.nseg);
  }
  void release_segs(SharedDriverQueueData *queue_data) {
    if (io_request.extra_segs) {
      for (uint16_t i = 0; i < io_request.nseg; i++)
        queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
      delete[] io_request.extra_segs;
      io_request.extra_segs = nullptr;
    } else if (io_request.nseg) {
      for (uint16_t i = 0; i < io_request.nseg; i++)
        queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
    }
    io_request.nseg = 0;
  }
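
  // Gather the segment contents back into a flat buffer. For example, with
  // data_buffer_size = 8192, a 20 KiB read spans three segments:
  // 8192 + 8192 + a 4096-byte tail.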
  void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
    uint64_t copied = 0;
    uint64_t left = len;
    void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
    uint16_t i = 0;
    while (left > 0) {
      char *src = static_cast<char*>(segs[i++]);
      uint64_t need_copy = std::min(left, data_buffer_size - off);
      memcpy(buf + copied, src + off, need_copy);
      off = 0;
      left -= need_copy;
      copied += need_copy;
    }
  }
  void io_wait() {
    std::unique_lock<std::mutex> l(lock);
    cond.wait(l);
  }

  void io_wake() {
    std::lock_guard<std::mutex> l(lock);
    cond.notify_all();
  }
};
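
// SPDK scatter-gather callbacks: data_buf_reset_sgl() repositions the segment
// cursor at an arbitrary byte offset into the request, and data_buf_next_sge()
// then hands out one (address, length) segment pair per call.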
static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
{
  Task *t = static_cast<Task*>(cb_arg);
  uint32_t i = sgl_offset / data_buffer_size;
  uint32_t offset = i * data_buffer_size;
  assert(i <= t->io_request.nseg);

  for (; i < t->io_request.nseg; i++) {
    offset += data_buffer_size;
    if (offset > sgl_offset) {
      if (offset > t->len)
        offset = t->len;
      break;
    }
  }

  t->io_request.cur_seg_idx = i;
  t->io_request.cur_seg_left = offset - sgl_offset;
}
static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
{
  uint32_t size;
  void *addr;
  Task *t = static_cast<Task*>(cb_arg);
  if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
    *length = 0;
    *address = 0;
    return 0;
  }

  addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx]
                                  : t->io_request.inline_segs[t->io_request.cur_seg_idx];

  size = data_buffer_size;
  if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
    uint64_t tail = t->len % data_buffer_size;
    if (tail)
      size = (uint32_t) tail;
  }

  if (t->io_request.cur_seg_left) {
    *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
    *length = t->io_request.cur_seg_left;
    t->io_request.cur_seg_left = 0;
  } else {
    *address = addr;
    *length = size;
  }

  t->io_request.cur_seg_idx++;
  return 0;
}
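
// Carve t->len into data_buffer_size segments taken from the per-queue
// mempool; returns -ENOMEM when the pool cannot cover the request, in which
// case the caller retries after reaping completions. For writes, the
// bufferlist payload is copied into the DMA-safe segments here.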
int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
{
  uint64_t count = t->len / data_buffer_size;
  if (t->len % data_buffer_size)
    ++count;
  void **segs;
  if (count > data_buf_mempool.size())
    return -ENOMEM;
  if (count <= inline_segment_num) {
    segs = t->io_request.inline_segs;
  } else {
    t->io_request.extra_segs = new void*[count];
    segs = t->io_request.extra_segs;
  }
  for (uint16_t i = 0; i < count; i++) {
    segs[i] = data_buf_mempool.back();
    data_buf_mempool.pop_back();
  }
  t->io_request.nseg = count;
  if (write) {
    auto blp = t->write_bl.begin();
    uint32_t len = 0;
    uint16_t i = 0;
    for (; i < count - 1; ++i) {
      blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
      len += data_buffer_size;
    }
    blp.copy(t->write_bl.length() - len, static_cast<char*>(segs[i]));
  }

  return 0;
}
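
// Polling loop pinned to a DPDK lcore. Each pass reaps completions from the
// qpair, issues every task on the current chain, refills the chain from
// task_queue, wakes flush waiters, and finally sleeps on queue_cond once the
// queue stays empty.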
void SharedDriverQueueData::_aio_thread()
{
  dout(1) << __func__ << " start" << dendl;

  if (data_buf_mempool.empty()) {
    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
      void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
      if (!b) {
        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
        assert(b);
      }
      data_buf_mempool.push_back(b);
    }
  }

  Task *t = nullptr;
  int r = 0;
  uint64_t lba_off, lba_count;

  ceph::coarse_real_clock::time_point cur, start
    = ceph::coarse_real_clock::now();
  while (true) {
    bool inflight = queue_op_seq.load() - completed_op_seq.load();
 again:
    dout(40) << __func__ << " polling" << dendl;
    if (inflight) {
      if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
        dout(30) << __func__ << " idle, have a pause" << dendl;
        _mm_pause();
      }
    }

    for (; t; t = t->next) {
      t->queue = this;
      lba_off = t->offset / sector_size;
      lba_count = t->len / sector_size;
      switch (t->command) {
        case IOCommand::WRITE_COMMAND:
        {
          dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
          r = alloc_buf_from_pool(t, true);
          if (r < 0) {
            logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
            goto again;
          }

          r = spdk_nvme_ns_cmd_writev(
              ns, qpair, lba_off, lba_count, io_complete, t, 0,
              data_buf_reset_sgl, data_buf_next_sge);
          if (r < 0) {
            derr << __func__ << " failed to do write command" << dendl;
            t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
            t->release_segs(this);
            delete t;
            ceph_abort();
          }
          cur = ceph::coarse_real_clock::now();
          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
          logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, dur);
          break;
        }
        case IOCommand::READ_COMMAND:
        {
          dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
          r = alloc_buf_from_pool(t, false);
          if (r < 0) {
            logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
            goto again;
          }

          r = spdk_nvme_ns_cmd_readv(
              ns, qpair, lba_off, lba_count, io_complete, t, 0,
              data_buf_reset_sgl, data_buf_next_sge);
          if (r < 0) {
            derr << __func__ << " failed to read" << dendl;
            t->release_segs(this);
            delete t;
            ceph_abort();
          }
          cur = ceph::coarse_real_clock::now();
          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
          logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
          break;
        }
        case IOCommand::FLUSH_COMMAND:
        {
          dout(20) << __func__ << " flush command issued" << dendl;
          r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
          if (r < 0) {
            derr << __func__ << " failed to flush" << dendl;
            t->release_segs(this);
            delete t;
            ceph_abort();
          }
          cur = ceph::coarse_real_clock::now();
          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
          logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
          break;
        }
      }
    }

    if (!queue_empty.load()) {
      Mutex::Locker l(queue_lock);
      if (!task_queue.empty()) {
        t = task_queue.front();
        task_queue.pop();
        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
      }
      if (!t)
        queue_empty = true;
    }

    // check whether any flush waiter can be woken up
    if (flush_waiters.load()) {
      Mutex::Locker l(flush_lock);
      if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
        flush_cond.Signal();
    }

    // be careful: each thread needs to reap its own completions; currently it
    // is done by only one dedicated dpdk thread
    if (!queueid) {
      for (auto &&it : driver->registered_devices)
        it->reap_ioc();
    }

    Mutex::Locker l(queue_lock);
    if (queue_empty.load()) {
      cur = ceph::coarse_real_clock::now();
      auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
      logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
      if (aio_stop)
        break;
      queue_cond.Wait(queue_lock);
      start = ceph::coarse_real_clock::now();
    }
  }
  assert(data_buf_mempool.size() == data_buffer_default_num);
  dout(1) << __func__ << " end" << dendl;
}
#undef dout_subsys
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev "
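
// Owns the process-wide SPDK/DPDK environment. Probing has to run on the
// dedicated DPDK thread, so try_get() queues a ProbeContext and waits for the
// probe thread to fill in ctx->driver. Usage sketch (the path is
// hypothetical): NVMEDevice::open("/.../block") reads the controller serial
// number from that file and calls manager.try_get(sn, &driver).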
class NVMEManager {
 public:
  struct ProbeContext {
    string sn_tag;
    NVMEManager *manager;
    SharedDriverData *driver;
    bool done;
  };

 private:
  Mutex lock;
  bool init = false;
  std::vector<SharedDriverData*> shared_driver_datas;
  std::thread dpdk_thread;
  std::mutex probe_queue_lock;
  std::condition_variable probe_queue_cond;
  std::list<ProbeContext*> probe_queue;

 public:
  NVMEManager()
      : lock("NVMEDevice::NVMEManager::lock") {}
  int try_get(const string &sn_tag, SharedDriverData **driver);
  void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
                      SharedDriverData **driver) {
    assert(lock.is_locked());
    spdk_nvme_ns *ns;
    int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
    assert(num_ns >= 1);
    if (num_ns > 1) {
      dout(0) << __func__ << " namespace count larger than 1, only the first namespace will be used" << dendl;
    }
    ns = spdk_nvme_ctrlr_get_ns(c, 1);
    if (!ns) {
      derr << __func__ << " failed to get namespace at 1" << dendl;
      ceph_abort();
    }
    dout(1) << __func__ << " successfully attached nvme device at " << spdk_pci_device_get_bus(pci_dev)
            << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;

    // only support one device per osd now!
    assert(shared_driver_datas.empty());
    // index 0 is occupied by the master thread
    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
    *driver = shared_driver_datas.back();
  }
};
static NVMEManager manager;
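
// probe_cb decides whether to attach a controller: it accepts only local PCIe
// transports and a serial number matching the requested tag; attach_cb then
// registers the accepted controller with the manager.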
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
  char serial_number[128];
  struct spdk_pci_addr pci_addr;
  struct spdk_pci_device *pci_dev = NULL;
  int result = 0;

  if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
    dout(0) << __func__ << " only probe local nvme device" << dendl;
    return false;
  }

  result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
  if (result) {
    dout(0) << __func__ << " failed to parse pci address from " << trid->traddr
            << ", result: " << result << dendl;
    return false;
  }

  pci_dev = spdk_pci_get_device(&pci_addr);
  if (!pci_dev) {
    dout(0) << __func__ << " failed to get pci device" << dendl;
    return false;
  }

  dout(0) << __func__ << " found device at bus: " << spdk_pci_device_get_bus(pci_dev)
          << ":" << spdk_pci_device_get_dev(pci_dev) << ":"
          << spdk_pci_device_get_func(pci_dev) << " vendor:0x" << spdk_pci_device_get_vendor_id(pci_dev) << " device:0x" << spdk_pci_device_get_device_id(pci_dev)
          << dendl;
  result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
  if (result < 0) {
    dout(10) << __func__ << " failed to get serial number from " << pci_dev << dendl;
    return false;
  }

  if (ctx->sn_tag.compare(string(serial_number, 16))) {
    dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") does not match " << serial_number << dendl;
    return false;
  }

  return true;
}
static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
                      struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
  struct spdk_pci_addr pci_addr;
  struct spdk_pci_device *pci_dev = NULL;

  spdk_pci_addr_parse(&pci_addr, trid->traddr);

  pci_dev = spdk_pci_get_device(&pci_addr);
  if (!pci_dev) {
    dout(0) << __func__ << " failed to get pci device" << dendl;
    assert(pci_dev);
  }

  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
  ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
}
int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
{
  Mutex::Locker l(lock);
  int r = 0;
  unsigned long long core_value;
  uint32_t core_num = 0;
  int m_core_arg = -1;
  uint32_t mem_size_arg = g_conf->bluestore_spdk_mem;
  char *coremask_arg = (char *)g_conf->bluestore_spdk_coremask.c_str();

  if (sn_tag.empty()) {
    r = -ENOENT;
    derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
    return r;
  }

  core_value = strtoll(coremask_arg, NULL, 16);
  for (uint32_t i = 0; i < sizeof(long long) * 8; i++) {
    bool tmp = (core_value >> i) & 0x1;
    if (tmp) {
      core_num += 1;
      // select the least significant bit as the master core
      if (m_core_arg < 0)
        m_core_arg = i;
    }
  }

  // at least two cores are needed for using spdk
  if (core_num < 2) {
    r = -ENOENT;
    derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
         << cpp_strerror(r) << dendl;
    return r;
  }

  for (auto &&it : shared_driver_datas) {
    if (it->is_equal(sn_tag)) {
      *driver = it;
      return 0;
    }
  }

  if (!init) {
    init = true;
    dpdk_thread = std::thread(
      [this, coremask_arg, m_core_arg, mem_size_arg]() {
        static struct spdk_env_opts opts;
        int r;

        spdk_env_opts_init(&opts);
        opts.name = "ceph-osd";
        opts.core_mask = coremask_arg;
        opts.dpdk_master_core = m_core_arg;
        opts.dpdk_mem_size = mem_size_arg;
        spdk_env_init(&opts);

        spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
        if (spdk_nvme_retry_count < 0)
          spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;

        std::unique_lock<std::mutex> l(probe_queue_lock);
        while (true) {
          if (!probe_queue.empty()) {
            ProbeContext* ctxt = probe_queue.front();
            probe_queue.pop_front();
            r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
            if (r < 0) {
              assert(!ctxt->driver);
              derr << __func__ << " device probe nvme failed" << dendl;
            }
            ctxt->done = true;
            probe_queue_cond.notify_all();
          } else {
            probe_queue_cond.wait(l);
          }
        }
      }
    );
    dpdk_thread.detach();
  }

  ProbeContext ctx = {sn_tag, this, nullptr, false};
  {
    std::unique_lock<std::mutex> l(probe_queue_lock);
    probe_queue.push_back(&ctx);
    while (!ctx.done)
      probe_queue_cond.wait(l);
  }
  if (!ctx.driver)
    return -1;
  *driver = ctx.driver;

  return 0;
}
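
// Completion callback invoked from the polling thread by
// spdk_nvme_qpair_process_completions(). Writes and AIO reads complete their
// IOContext and may fire the device's aio_callback; sync reads and flushes
// instead clear return_code to release the waiting thread.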
void io_complete(void *t, const struct spdk_nvme_cpl *completion)
{
  Task *task = static_cast<Task*>(t);
  IOContext *ctx = task->ctx;
  SharedDriverQueueData *queue = task->queue;

  assert(queue != NULL);
  ++queue->completed_op_seq;
  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
      ceph::coarse_real_clock::now() - task->start);
  if (task->command == IOCommand::WRITE_COMMAND) {
    queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
    assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " write/zero op completed, left "
             << queue->queue_op_seq - queue->completed_op_seq << dendl;
    // check waiting count before doing callback (which may
    // destroy this ioc).
    if (!--ctx->num_running) {
      task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
    }
    task->release_segs(queue);
    delete task;
  } else if (task->command == IOCommand::READ_COMMAND) {
    queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
    assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " read op completed" << dendl;
    task->fill_cb();
    task->release_segs(queue);
    // read submitted by AIO
    if (!task->return_code) {
      if (!--ctx->num_running) {
        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
      }
      delete task;
    } else {
      task->return_code = 0;
      if (!--ctx->num_running) {
        task->io_wake();
      }
    }
  } else {
    assert(task->command == IOCommand::FLUSH_COMMAND);
    assert(!spdk_nvme_cpl_is_error(completion));
    queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
    dout(20) << __func__ << " flush op completed" << dendl;
    task->return_code = 0;
  }
}
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << name << ") "
NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
    : BlockDevice(cct),
      driver(nullptr),
      buffer_lock("NVMEDevice::buffer_lock"),
      aio_callback(cb),
      aio_callback_priv(cbpriv)
{
}
int NVMEDevice::open(const string& p)
{
  int r = 0;
  dout(1) << __func__ << " path " << p << dendl;

  string serial_number;
  int fd = ::open(p.c_str(), O_RDONLY);
  if (fd < 0) {
    r = -errno;
    derr << __func__ << " unable to open " << p << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }
  char buf[100];
  r = ::read(fd, buf, sizeof(buf));
  VOID_TEMP_FAILURE_RETRY(::close(fd));
  fd = -1; // defensive
  if (r <= 0) {
    if (r == 0)
      r = -EINVAL;
    else
      r = -errno;
    derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  /* scan buf from the beginning with isxdigit. */
  int i = 0;
  while (i < r && isxdigit(buf[i])) {
    i++;
  }
  serial_number = string(buf, i);
  r = manager.try_get(serial_number, &driver);
  if (r < 0) {
    derr << __func__ << " failed to get nvme device with sn " << serial_number << dendl;
    return r;
  }

  driver->register_device(this);
  block_size = driver->get_block_size();
  size = driver->get_size();
  name = serial_number;

  // NVMe is a non-rotational device
  rotational = false;

  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
          << " block_size " << block_size << " (" << pretty_si_t(block_size)
          << "B)" << dendl;

  return 0;
}
void NVMEDevice::close()
{
  dout(1) << __func__ << dendl;

  name.clear();
  driver->remove_device(this);

  dout(1) << __func__ << " end" << dendl;
}
int NVMEDevice::collect_metadata(string prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "rotational"] = "0";
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "driver"] = "NVMEDevice";
  (*pm)[prefix + "type"] = "nvme";
  (*pm)[prefix + "access_mode"] = "spdk";
  (*pm)[prefix + "nvme_serial_number"] = name;

  return 0;
}
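
// flush() blocks the calling thread until all previously queued ops on its
// bound queue have completed (see SharedDriverQueueData::flush_wait()).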
int NVMEDevice::flush()
{
  dout(10) << __func__ << " start" << dendl;
  auto start = ceph::coarse_real_clock::now();

  if (queue_id == -1)
    queue_id = ceph_gettid();
  SharedDriverQueueData *queue = driver->get_queue(queue_id);
  assert(queue != NULL);
  queue->flush_wait();
  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
      ceph::coarse_real_clock::now() - start);
  queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
  return 0;
}
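
// Tasks are chained via Task::next between aio_write()/aio_read() calls, so
// only the head of the chain needs to be pushed; the polling thread walks the
// whole chain.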
void NVMEDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc << " pending "
           << ioc->num_pending.load() << " running "
           << ioc->num_running.load() << dendl;
  int pending = ioc->num_pending.load();
  Task *t = static_cast<Task*>(ioc->nvme_task_first);
  if (pending && t) {
    ioc->num_running += pending;
    ioc->num_pending -= pending;
    assert(ioc->num_pending.load() == 0);  // we should be the only thread doing this
    // Only need to push the first entry
    if (queue_id == -1)
      queue_id = ceph_gettid();
    driver->get_queue(queue_id)->queue_task(t, pending);
    ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
  }
}
int NVMEDevice::aio_write(
    uint64_t off,
    bufferlist &bl,
    IOContext *ioc,
    bool buffered)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
           << " buffered " << buffered << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);

  // TODO: if the upper layer allocated memory with a known physical address,
  // this copy could be avoided
  t->write_bl = std::move(bl);

  if (buffered) {
    // Only need to push the first entry
    if (queue_id == -1)
      queue_id = ceph_gettid();
    driver->get_queue(queue_id)->queue_task(t);
  } else {
    t->ctx = ioc;
    Task *first = static_cast<Task*>(ioc->nvme_task_first);
    Task *last = static_cast<Task*>(ioc->nvme_task_last);
    if (last)
      last->next = t;
    if (!first)
      ioc->nvme_task_first = t;
    ioc->nvme_task_last = t;
    ++ioc->num_pending;
  }

  dout(5) << __func__ << " " << off << "~" << len << dendl;

  return 0;
}
int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered)
{
  // FIXME: there is presumably a more efficient way to do this...
  IOContext ioc(cct, NULL);
  aio_write(off, bl, &ioc, buffered);
  aio_submit(&ioc);
  ioc.aio_wait();
  return 0;
}
int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                     IOContext *ioc,
                     bool buffered)
{
  dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
  bufferptr p = buffer::create_page_aligned(len);
  int r = 0;
  t->ctx = ioc;
  char *buf = p.c_str();
  t->fill_cb = [buf, t]() {
    t->copy_to_buf(buf, 0, t->len);
  };

  ++ioc->num_running;
  if (queue_id == -1)
    queue_id = ceph_gettid();
  driver->get_queue(queue_id)->queue_task(t);

  while (t->return_code > 0) {
    t->io_wait();
  }
  pbl->push_back(std::move(p));
  r = t->return_code;
  delete t;
  return r;
}
int NVMEDevice::aio_read(
    uint64_t off,
    uint64_t len,
    bufferlist *pbl,
    IOContext *ioc)
{
  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);

  bufferptr p = buffer::create_page_aligned(len);
  pbl->append(p);
  t->ctx = ioc;
  char *buf = p.c_str();
  t->fill_cb = [buf, t]() {
    t->copy_to_buf(buf, 0, t->len);
  };

  Task *first = static_cast<Task*>(ioc->nvme_task_first);
  Task *last = static_cast<Task*>(ioc->nvme_task_last);
  if (last)
    last->next = t;
  if (!first)
    ioc->nvme_task_first = t;
  ioc->nvme_task_last = t;
  ++ioc->num_pending;

  return 0;
}
int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  uint64_t aligned_off = align_down(off, block_size);
  uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
  dout(5) << __func__ << " " << off << "~" << len
          << " aligned " << aligned_off << "~" << aligned_len << dendl;
  IOContext ioc(g_ceph_context, nullptr);
  Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
  t->ctx = &ioc;
  t->fill_cb = [buf, t, off, len]() {
    t->copy_to_buf(buf, off - t->offset, len);
  };

  ++ioc.num_running;
  if (queue_id == -1)
    queue_id = ceph_gettid();
  driver->get_queue(queue_id)->queue_task(t);

  while (t->return_code > 0) {
    t->io_wait();
  }
  int r = t->return_code;
  delete t;
  return r;
}
int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  return 0;
}