Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / bluestore / NVMEDevice.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 //
3 // vim: ts=8 sw=2 smarttab
4 /*
5  * Ceph - scalable distributed file system
6   *
7  * Copyright (C) 2015 XSky <haomai@xsky.com>
8  *
9  * Author: Haomai Wang <haomaiwang@gmail.com>
10  *
11  * This is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License version 2.1, as published by the Free Software
14  * Foundation.  See file COPYING.
15  *
16  */
17
18 #include <unistd.h>
19 #include <stdlib.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24
25 #include <chrono>
26 #include <functional>
27 #include <map>
28 #include <thread>
29 #include <xmmintrin.h>
30
31 #include <spdk/nvme.h>
32
33 #include <rte_lcore.h>
34
35 #include "include/stringify.h"
36 #include "include/types.h"
37 #include "include/compat.h"
38 #include "common/align.h"
39 #include "common/errno.h"
40 #include "common/debug.h"
41 #include "common/perf_counters.h"
42 #include "common/io_priority.h"
43
44 #include "NVMEDevice.h"
45
46 #define dout_context g_ceph_context
47 #define dout_subsys ceph_subsys_bdev
48 #undef dout_prefix
49 #define dout_prefix *_dout << "bdev(" << sn << ") "
50
51 static constexpr uint16_t data_buffer_default_num = 2048;
52
53 static constexpr uint32_t data_buffer_size = 8192;
54
55 static constexpr uint16_t inline_segment_num = 32;
56
57 static thread_local int queue_id = -1;
58
59 enum {
60   l_bluestore_nvmedevice_first = 632430,
61   l_bluestore_nvmedevice_aio_write_lat,
62   l_bluestore_nvmedevice_read_lat,
63   l_bluestore_nvmedevice_flush_lat,
64   l_bluestore_nvmedevice_aio_write_queue_lat,
65   l_bluestore_nvmedevice_read_queue_lat,
66   l_bluestore_nvmedevice_flush_queue_lat,
67   l_bluestore_nvmedevice_queue_ops,
68   l_bluestore_nvmedevice_polling_lat,
69   l_bluestore_nvmedevice_buffer_alloc_failed,
70   l_bluestore_nvmedevice_last
71 };
72
73 static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
74
75 int dpdk_thread_adaptor(void *f)
76 {
77   (*static_cast<std::function<void ()>*>(f))();
78   return 0;
79 }
80
81 struct IOSegment {
82   uint32_t len;
83   void *addr;
84 };
85
86 struct IORequest {
87   uint16_t cur_seg_idx = 0;
88   uint16_t nseg;
89   uint32_t cur_seg_left = 0;
90   void *inline_segs[inline_segment_num];
91   void **extra_segs = nullptr;
92 };
93
94 class SharedDriverQueueData {
95   SharedDriverData *driver;
96   spdk_nvme_ctrlr *ctrlr;
97   spdk_nvme_ns *ns;
98   std::string sn;
99   uint64_t block_size;
100   uint32_t sector_size;
101   uint32_t core_id;
102   uint32_t queueid;
103   struct spdk_nvme_qpair *qpair;
104   std::function<void ()> run_func;
105   friend class AioCompletionThread;
106
107   bool aio_stop = false;
108   void _aio_thread();
109   int alloc_buf_from_pool(Task *t, bool write);
110
111   std::atomic_bool queue_empty;
112   Mutex queue_lock;
113   Cond queue_cond;
114   std::queue<Task*> task_queue;
115
116   Mutex flush_lock;
117   Cond flush_cond;
118   std::atomic_int flush_waiters;
119   std::set<uint64_t> flush_waiter_seqs;
120
121   public:
122     std::atomic_ulong completed_op_seq, queue_op_seq;
123     std::vector<void*> data_buf_mempool;
124     PerfCounters *logger = nullptr;
125
126     SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
127                           const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
128       : driver(driver),
129         ctrlr(c),
130         ns(ns),
131         sn(sn_tag),
132         block_size(block_size),
133         sector_size(sector_size),
134         core_id(core),
135         queueid(queue_id),
136         run_func([this]() { _aio_thread(); }),
137         queue_empty(false),
138         queue_lock("NVMEDevice::queue_lock"),
139         flush_lock("NVMEDevice::flush_lock"),
140         flush_waiters(0),
141         completed_op_seq(0), queue_op_seq(0) {
142
143     qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
144     PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
145                           l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
146     b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
147     b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
148     b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
149     b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
150     b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
151     b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
152     b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
153     b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
154     b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
155     logger = b.create_perf_counters();
156     g_ceph_context->get_perfcounters_collection()->add(logger);
157    }
158
159    void queue_task(Task *t, uint64_t ops = 1) {
160     queue_op_seq += ops;
161     Mutex::Locker l(queue_lock);
162     task_queue.push(t);
163     if (queue_empty.load()) {
164       queue_empty = false;
165       queue_cond.Signal();
166     }
167   }
168
169   void flush_wait() {
170     uint64_t cur_seq = queue_op_seq.load();
171     uint64_t left = cur_seq - completed_op_seq.load();
172     if (cur_seq > completed_op_seq) {
173       // TODO: this may contains read op
174       dout(10) << __func__ << " existed inflight ops " << left << dendl;
175       Mutex::Locker l(flush_lock);
176       ++flush_waiters;
177       flush_waiter_seqs.insert(cur_seq);
178       while (cur_seq > completed_op_seq.load()) {
179        flush_cond.Wait(flush_lock);
180       }
181       flush_waiter_seqs.erase(cur_seq);
182       --flush_waiters;
183     }
184   }
185
186   void start() {
187     int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
188                                   core_id);
189     assert(r == 0);
190
191   }
192
193   void stop() {
194     {
195       Mutex::Locker l(queue_lock);
196       aio_stop = true;
197       queue_cond.Signal();
198     }
199     int r = rte_eal_wait_lcore(core_id);
200     assert(r == 0);
201     aio_stop = false;
202   }
203
204   ~SharedDriverQueueData() {
205     g_ceph_context->get_perfcounters_collection()->remove(logger);
206     if (!qpair) {
207       spdk_nvme_ctrlr_free_io_qpair(qpair); 
208     }
209     delete logger;
210   }
211 };
212
213 class SharedDriverData {
214   unsigned id;
215   uint32_t core_id;
216
217   std::string sn;
218   spdk_nvme_ctrlr *ctrlr;
219   spdk_nvme_ns *ns;
220   uint64_t block_size = 0;
221   uint32_t sector_size = 0;
222   uint64_t size = 0;
223   uint32_t queue_number;
224   std::vector<SharedDriverQueueData*> queues;
225
226   void _aio_start() {
227      for (auto &&it : queues)
228               it->start();
229   }
230   void _aio_stop() {
231       for (auto &&it : queues)
232               it->stop();
233   }
234
235   public:
236   std::vector<NVMEDevice*> registered_devices;
237   SharedDriverData(unsigned _id, const std::string &sn_tag,
238                    spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
239       : id(_id),
240         sn(sn_tag),
241         ctrlr(c),
242         ns(ns) {
243     int i;
244     sector_size = spdk_nvme_ns_get_sector_size(ns);
245     block_size = std::max(CEPH_PAGE_SIZE, sector_size);
246     size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
247
248     RTE_LCORE_FOREACH_SLAVE(i) {
249       queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
250    }
251
252    _aio_start();
253   }
254
255   bool is_equal(const string &tag) const { return sn == tag; }
256   ~SharedDriverData() {
257     for (auto p : queues) {
258       delete p;
259    }
260   }
261
262   SharedDriverQueueData *get_queue(uint32_t i) {
263         return queues.at(i%queue_number);
264   }
265
266   void register_device(NVMEDevice *device) {
267     // in case of registered_devices, we stop thread now.
268     // Because release is really a rare case, we could bear this
269     _aio_stop();
270     registered_devices.push_back(device);
271     _aio_start();
272   }
273
274   void remove_device(NVMEDevice *device) {
275     _aio_stop();
276     std::vector<NVMEDevice*> new_devices;
277     for (auto &&it : registered_devices) {
278       if (it != device)
279         new_devices.push_back(it);
280     }
281     registered_devices.swap(new_devices);
282     _aio_start();
283   }
284
285   uint64_t get_block_size() {
286     return block_size;
287   }
288   uint64_t get_size() {
289     return size;
290   }
291 };
292
293 struct Task {
294   NVMEDevice *device;
295   IOContext *ctx = nullptr;
296   IOCommand command;
297   uint64_t offset;
298   uint64_t len;
299   bufferlist write_bl;
300   std::function<void()> fill_cb;
301   Task *next = nullptr;
302   int64_t return_code;
303   ceph::coarse_real_clock::time_point start;
304   IORequest io_request;
305   std::mutex lock;
306   std::condition_variable cond;
307   SharedDriverQueueData *queue;
308   Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
309     : device(dev), command(c), offset(off), len(l),
310       return_code(rc),
311       start(ceph::coarse_real_clock::now()) {}
312   ~Task() {
313     assert(!io_request.nseg);
314   }
315   void release_segs(SharedDriverQueueData *queue_data) {
316     if (io_request.extra_segs) {
317       for (uint16_t i = 0; i < io_request.nseg; i++)
318         queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
319       delete io_request.extra_segs;
320     } else if (io_request.nseg) {
321       for (uint16_t i = 0; i < io_request.nseg; i++)
322         queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
323     }
324     io_request.nseg = 0;
325   }
326
327   void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
328     uint64_t copied = 0;
329     uint64_t left = len;
330     void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
331     uint16_t i = 0;
332     while (left > 0) {
333       char *src = static_cast<char*>(segs[i++]);
334       uint64_t need_copy = std::min(left, data_buffer_size-off);
335       memcpy(buf+copied, src+off, need_copy);
336       off = 0;
337       left -= need_copy;
338       copied += need_copy;
339     }
340   }
341
342   void io_wait() {
343     std::unique_lock<std::mutex> l(lock);
344     cond.wait(l);
345   }
346
347   void io_wake() {
348     std::lock_guard<std::mutex> l(lock);
349     cond.notify_all();
350   }
351 };
352
353 static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
354 {
355   Task *t = static_cast<Task*>(cb_arg);
356   uint32_t i = sgl_offset / data_buffer_size;
357   uint32_t offset = i * data_buffer_size;
358   assert(i <= t->io_request.nseg);
359
360   for (; i < t->io_request.nseg; i++) {
361     offset += data_buffer_size;
362     if (offset > sgl_offset) {
363       if (offset > t->len)
364         offset = t->len;
365       break;
366     }
367   }
368
369   t->io_request.cur_seg_idx = i;
370   t->io_request.cur_seg_left = offset - sgl_offset;
371   return ;
372 }
373
374 static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
375 {
376   uint32_t size;
377   void *addr;
378   Task *t = static_cast<Task*>(cb_arg);
379   if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
380     *length = 0;
381     *address = 0;
382     return 0;
383   }
384
385   addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx];
386
387   size = data_buffer_size;
388   if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
389       uint64_t tail = t->len % data_buffer_size;
390       if (tail) {
391         size = (uint32_t) tail;
392       }
393   }
394  
395   if (t->io_request.cur_seg_left) {
396     *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
397     *length = t->io_request.cur_seg_left;
398     t->io_request.cur_seg_left = 0;
399   } else {
400     *address = addr;
401     *length = size;
402   }
403   
404   t->io_request.cur_seg_idx++;
405   return 0;
406 }
407
408 int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
409 {
410   uint64_t count = t->len / data_buffer_size;
411   if (t->len % data_buffer_size)
412     ++count;
413   void **segs;
414   if (count > data_buf_mempool.size())
415     return -ENOMEM;
416   if (count <= inline_segment_num) {
417     segs = t->io_request.inline_segs;
418   } else {
419     t->io_request.extra_segs = new void*[count];
420     segs = t->io_request.extra_segs;
421   }
422   for (uint16_t i = 0; i < count; i++) {
423     segs[i] = data_buf_mempool.back();
424     data_buf_mempool.pop_back();
425   }
426   t->io_request.nseg = count;
427   if (write) {
428     auto blp = t->write_bl.begin();
429     uint32_t len = 0;
430     uint16_t i = 0;
431     for (; i < count - 1; ++i) {
432       blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
433       len += data_buffer_size;
434     }
435     blp.copy(t->write_bl.length() - len, static_cast<char*>(segs[i]));
436   }
437
438   return 0;
439 }
440
441 void SharedDriverQueueData::_aio_thread()
442 {
443   dout(1) << __func__ << " start" << dendl;
444
445   if (data_buf_mempool.empty()) {
446     for (uint16_t i = 0; i < data_buffer_default_num; i++) {
447       void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
448       if (!b) {
449         derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
450         assert(b);
451       }
452       data_buf_mempool.push_back(b);
453     }
454   }
455
456   Task *t = nullptr;
457   int r = 0;
458   uint64_t lba_off, lba_count;
459
460   ceph::coarse_real_clock::time_point cur, start
461     = ceph::coarse_real_clock::now();
462   while (true) {
463     bool inflight = queue_op_seq.load() - completed_op_seq.load();
464  again:
465     dout(40) << __func__ << " polling" << dendl;
466     if (inflight) {
467       if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
468         dout(30) << __func__ << " idle, have a pause" << dendl;
469         _mm_pause();
470       }
471     }
472
473     for (; t; t = t->next) {
474       t->queue = this;
475       lba_off = t->offset / sector_size;
476       lba_count = t->len / sector_size;
477       switch (t->command) {
478         case IOCommand::WRITE_COMMAND:
479         {
480           dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
481           r = alloc_buf_from_pool(t, true);
482           if (r < 0) {
483             logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
484             goto again;
485           }
486
487           r = spdk_nvme_ns_cmd_writev(
488               ns, qpair, lba_off, lba_count, io_complete, t, 0,
489               data_buf_reset_sgl, data_buf_next_sge);
490           if (r < 0) {
491             derr << __func__ << " failed to do write command" << dendl;
492             t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
493             t->release_segs(this);
494             delete t;
495             ceph_abort();
496           }
497           cur = ceph::coarse_real_clock::now();
498           auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
499           logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, dur);
500           break;
501         }
502         case IOCommand::READ_COMMAND:
503         {
504           dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
505           r = alloc_buf_from_pool(t, false);
506           if (r < 0) {
507             logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
508             goto again;
509           }
510
511           r = spdk_nvme_ns_cmd_readv(
512               ns, qpair, lba_off, lba_count, io_complete, t, 0,
513               data_buf_reset_sgl, data_buf_next_sge);
514           if (r < 0) {
515             derr << __func__ << " failed to read" << dendl;
516             t->release_segs(this);
517             delete t;
518             ceph_abort();
519           } else {
520             cur = ceph::coarse_real_clock::now();
521             auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
522             logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
523           }
524           break;
525         }
526         case IOCommand::FLUSH_COMMAND:
527         {
528           dout(20) << __func__ << " flush command issueed " << dendl;
529           r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
530           if (r < 0) {
531             derr << __func__ << " failed to flush" << dendl;
532             t->release_segs(this);
533             delete t;
534             ceph_abort();
535           } else {
536             cur = ceph::coarse_real_clock::now();
537             auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
538             logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
539           }
540           break;
541         }
542       }
543     }
544
545     if (!queue_empty.load()) {
546       Mutex::Locker l(queue_lock);
547       if (!task_queue.empty()) {
548         t = task_queue.front();
549         task_queue.pop();
550         logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
551       }
552       if (!t)
553         queue_empty = true;
554     } else {
555       if (flush_waiters.load()) {
556         Mutex::Locker l(flush_lock);
557         if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
558           flush_cond.Signal();
559       }
560
561       if (!inflight) {
562         // be careful, here we need to let each thread reap its own, currently it is done
563         // by only one dedicatd dpdk thread
564         if(!queueid) {
565           for (auto &&it : driver->registered_devices)
566             it->reap_ioc();
567         }
568
569         Mutex::Locker l(queue_lock);
570         if (queue_empty.load()) {
571           cur = ceph::coarse_real_clock::now();
572           auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
573           logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
574           if (aio_stop)
575             break;
576           queue_cond.Wait(queue_lock);
577           start = ceph::coarse_real_clock::now();
578         }
579       }
580     }
581   }
582   assert(data_buf_mempool.size() == data_buffer_default_num);
583   dout(1) << __func__ << " end" << dendl;
584 }
585
586 #define dout_subsys ceph_subsys_bdev
587 #undef dout_prefix
588 #define dout_prefix *_dout << "bdev "
589
590 class NVMEManager {
591  public:
592   struct ProbeContext {
593     string sn_tag;
594     NVMEManager *manager;
595     SharedDriverData *driver;
596     bool done;
597   };
598
599  private:
600   Mutex lock;
601   bool init = false;
602   std::vector<SharedDriverData*> shared_driver_datas;
603   std::thread dpdk_thread;
604   std::mutex probe_queue_lock;
605   std::condition_variable probe_queue_cond;
606   std::list<ProbeContext*> probe_queue;
607
608  public:
609   NVMEManager()
610       : lock("NVMEDevice::NVMEManager::lock") {}
611   int try_get(const string &sn_tag, SharedDriverData **driver);
612   void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
613                       SharedDriverData **driver) {
614     assert(lock.is_locked());
615     spdk_nvme_ns *ns;
616     int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
617     assert(num_ns >= 1);
618     if (num_ns > 1) {
619       dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
620     }
621     ns = spdk_nvme_ctrlr_get_ns(c, 1);
622     if (!ns) {
623       derr << __func__ << " failed to get namespace at 1" << dendl;
624       ceph_abort();
625     }
626     dout(1) << __func__ << " successfully attach nvme device at" << spdk_pci_device_get_bus(pci_dev)
627             << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;
628
629     // only support one device per osd now!
630     assert(shared_driver_datas.empty());
631     // index 0 is occured by master thread
632     shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
633     *driver = shared_driver_datas.back();
634   }
635 };
636
637 static NVMEManager manager;
638
639 static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
640 {
641   NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
642   char serial_number[128];
643   struct spdk_pci_addr pci_addr;
644   struct spdk_pci_device *pci_dev = NULL;
645   int result = 0;
646
647   if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
648     dout(0) << __func__ << " only probe local nvme device" << dendl;
649     return false;
650   }
651
652   result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
653   if (result) {
654     dout(0) << __func__ << " failed to get pci address from %s, " << trid->traddr << " return value is: %d" << result << dendl;
655     return false;
656   }
657
658   pci_dev = spdk_pci_get_device(&pci_addr);
659   if (!pci_dev) {
660     dout(0) << __func__ << " failed to get pci device" << dendl; 
661     return false;
662   }
663
664   dout(0) << __func__ << " found device at bus: " << spdk_pci_device_get_bus(pci_dev)
665           << ":" << spdk_pci_device_get_dev(pci_dev) << ":"
666           << spdk_pci_device_get_func(pci_dev) << " vendor:0x" << spdk_pci_device_get_vendor_id(pci_dev) << " device:0x" << spdk_pci_device_get_device_id(pci_dev)
667           << dendl;
668   result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
669   if (result < 0) {
670     dout(10) << __func__ << " failed to get serial number from %p" << pci_dev << dendl;
671     return false;
672   }
673
674   if (ctx->sn_tag.compare(string(serial_number, 16))) {
675     dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") not match " << serial_number << dendl;
676     return false;
677   }
678
679   return true;
680 }
681
682 static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
683                       struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
684 {
685   struct spdk_pci_addr pci_addr;
686   struct spdk_pci_device *pci_dev = NULL;
687
688   spdk_pci_addr_parse(&pci_addr, trid->traddr);
689
690   pci_dev = spdk_pci_get_device(&pci_addr);
691   if (!pci_dev) {
692     dout(0) << __func__ << " failed to get pci device" << dendl; 
693     assert(pci_dev);
694   }
695
696   NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
697   ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
698 }
699
700 int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
701 {
702   Mutex::Locker l(lock);
703   int r = 0;
704   unsigned long long core_value;
705   uint32_t core_num = 0;
706   int m_core_arg = -1;
707   uint32_t mem_size_arg = g_conf->bluestore_spdk_mem;
708   char *coremask_arg = (char *)g_conf->bluestore_spdk_coremask.c_str();
709
710   if (sn_tag.empty()) {
711     r = -ENOENT;
712     derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
713     return r;
714   }
715
716   core_value = strtoll(coremask_arg, NULL, 16);
717   for (uint32_t i = 0; i < sizeof(long long) * 8; i++) {
718     bool tmp = (core_value >> i) & 0x1;
719     if (tmp) {
720       core_num += 1;
721       // select the least signficant bit as the master core
722       if(m_core_arg < 0) {
723         m_core_arg = i;
724       }
725     }
726   }
727
728   // at least two cores are needed for using spdk
729   if (core_num < 2) {
730     r = -ENOENT;
731     derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
732          << cpp_strerror(r) << dendl;
733     return r;
734   }
735
736   for (auto &&it : shared_driver_datas) {
737     if (it->is_equal(sn_tag)) {
738       *driver = it;
739       return 0;
740     }
741   }
742
743   if (!init) {
744     init = true;
745     dpdk_thread = std::thread(
746       [this, coremask_arg, m_core_arg, mem_size_arg]() {
747         static struct spdk_env_opts opts;
748         int r;
749
750         spdk_env_opts_init(&opts);
751         opts.name = "ceph-osd";
752         opts.core_mask = coremask_arg;
753         opts.dpdk_master_core = m_core_arg;
754         opts.dpdk_mem_size = mem_size_arg;
755         spdk_env_init(&opts);
756
757         spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
758         if (spdk_nvme_retry_count < 0)
759           spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
760
761         std::unique_lock<std::mutex> l(probe_queue_lock);
762         while (true) {
763           if (!probe_queue.empty()) {
764             ProbeContext* ctxt = probe_queue.front();
765             probe_queue.pop_front();
766             r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
767             if (r < 0) {
768               assert(!ctxt->driver);
769               derr << __func__ << " device probe nvme failed" << dendl;
770             }
771             ctxt->done = true;
772             probe_queue_cond.notify_all();
773           } else {
774             probe_queue_cond.wait(l);
775           }
776         }
777       }
778     );
779     dpdk_thread.detach();
780   }
781
782   ProbeContext ctx = {sn_tag, this, nullptr, false};
783   {
784     std::unique_lock<std::mutex> l(probe_queue_lock);
785     probe_queue.push_back(&ctx);
786     while (!ctx.done)
787       probe_queue_cond.wait(l);
788   }
789   if (!ctx.driver)
790     return -1;
791   *driver = ctx.driver;
792
793   return 0;
794 }
795
796 void io_complete(void *t, const struct spdk_nvme_cpl *completion)
797 {
798   Task *task = static_cast<Task*>(t);
799   IOContext *ctx = task->ctx;
800   SharedDriverQueueData *queue = task->queue;
801
802   assert(queue != NULL);
803   assert(ctx != NULL);
804   ++queue->completed_op_seq;
805   auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
806       ceph::coarse_real_clock::now() - task->start);
807   if (task->command == IOCommand::WRITE_COMMAND) {
808     queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
809     assert(!spdk_nvme_cpl_is_error(completion));
810     dout(20) << __func__ << " write/zero op successfully, left "
811              << queue->queue_op_seq - queue->completed_op_seq << dendl;
812     // check waiting count before doing callback (which may
813     // destroy this ioc).
814     if (ctx->priv) {
815       if (!--ctx->num_running) {
816         task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
817       }
818     } else {
819       ctx->try_aio_wake();
820     }
821     task->release_segs(queue);
822     delete task;
823   } else if (task->command == IOCommand::READ_COMMAND) {
824     queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
825     assert(!spdk_nvme_cpl_is_error(completion));
826     dout(20) << __func__ << " read op successfully" << dendl;
827     task->fill_cb();
828     task->release_segs(queue);
829     // read submitted by AIO
830     if(!task->return_code) {
831       if (ctx->priv) {
832         if (!--ctx->num_running) {
833           task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
834         }
835       } else {
836         ctx->try_aio_wake();
837       }
838       delete task;
839     } else {
840       task->return_code = 0;
841       if (!--ctx->num_running) {
842         task->io_wake();
843       }
844     }
845   } else {
846     assert(task->command == IOCommand::FLUSH_COMMAND);
847     assert(!spdk_nvme_cpl_is_error(completion));
848     queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
849     dout(20) << __func__ << " flush op successfully" << dendl;
850     task->return_code = 0;
851   }
852 }
853
854 // ----------------
855 #undef dout_prefix
856 #define dout_prefix *_dout << "bdev(" << name << ") "
857
858 NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
859   :   BlockDevice(cct),
860       driver(nullptr),
861       size(0),
862       block_size(0),
863       aio_stop(false),
864       buffer_lock("NVMEDevice::buffer_lock"),
865       aio_callback(cb),
866       aio_callback_priv(cbpriv)
867 {
868 }
869
870
871 int NVMEDevice::open(const string& p)
872 {
873   int r = 0;
874   dout(1) << __func__ << " path " << p << dendl;
875
876   string serial_number;
877   int fd = ::open(p.c_str(), O_RDONLY);
878   if (fd < 0) {
879     r = -errno;
880     derr << __func__ << " unable to open " << p << ": " << cpp_strerror(r)
881          << dendl;
882     return r;
883   }
884   char buf[100];
885   r = ::read(fd, buf, sizeof(buf));
886   VOID_TEMP_FAILURE_RETRY(::close(fd));
887   fd = -1; // defensive
888   if (r <= 0) {
889     if (r == 0) {
890       r = -EINVAL;
891     } else {
892       r = -errno;
893     }
894     derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r) << dendl;
895     return r;
896   }
897   /* scan buf from the beginning with isxdigit. */
898   int i = 0;
899   while (i < r && isxdigit(buf[i])) {
900     i++;
901   }
902   serial_number = string(buf, i);
903   r = manager.try_get(serial_number, &driver);
904   if (r < 0) {
905     derr << __func__ << " failed to get nvme device with sn " << serial_number << dendl;
906     return r;
907   }
908
909   driver->register_device(this);
910   block_size = driver->get_block_size();
911   size = driver->get_size();
912   name = serial_number;
913
914   //nvme is non-rotational device.
915   rotational = false;
916
917   // round size down to an even block
918   size &= ~(block_size - 1);
919
920   dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
921           << " block_size " << block_size << " (" << pretty_si_t(block_size)
922           << "B)" << dendl;
923
924   return 0;
925 }
926
927 void NVMEDevice::close()
928 {
929   dout(1) << __func__ << dendl;
930
931   name.clear();
932   driver->remove_device(this);
933
934   dout(1) << __func__ << " end" << dendl;
935 }
936
937 int NVMEDevice::collect_metadata(string prefix, map<string,string> *pm) const
938 {
939   (*pm)[prefix + "rotational"] = "0";
940   (*pm)[prefix + "size"] = stringify(get_size());
941   (*pm)[prefix + "block_size"] = stringify(get_block_size());
942   (*pm)[prefix + "driver"] = "NVMEDevice";
943   (*pm)[prefix + "type"] = "nvme";
944   (*pm)[prefix + "access_mode"] = "spdk";
945   (*pm)[prefix + "nvme_serial_number"] = name;
946
947   return 0;
948 }
949
950 int NVMEDevice::flush()
951 {
952   dout(10) << __func__ << " start" << dendl;
953   auto start = ceph::coarse_real_clock::now();
954
955   if(queue_id == -1)
956     queue_id = ceph_gettid();
957   SharedDriverQueueData *queue = driver->get_queue(queue_id);
958   assert(queue != NULL);
959   queue->flush_wait();
960   auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
961       ceph::coarse_real_clock::now() - start);
962   queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
963   return 0;
964 }
965
966 void NVMEDevice::aio_submit(IOContext *ioc)
967 {
968   dout(20) << __func__ << " ioc " << ioc << " pending "
969            << ioc->num_pending.load() << " running "
970            << ioc->num_running.load() << dendl;
971   int pending = ioc->num_pending.load();
972   Task *t = static_cast<Task*>(ioc->nvme_task_first);
973   if (pending && t) {
974     ioc->num_running += pending;
975     ioc->num_pending -= pending;
976     assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
977     // Only need to push the first entry
978   if(queue_id == -1)
979     queue_id = ceph_gettid();
980     driver->get_queue(queue_id)->queue_task(t, pending);
981     ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
982   }
983 }
984
985 int NVMEDevice::aio_write(
986     uint64_t off,
987     bufferlist &bl,
988     IOContext *ioc,
989     bool buffered)
990 {
991   uint64_t len = bl.length();
992   dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
993            << " buffered " << buffered << dendl;
994   assert(off % block_size == 0);
995   assert(len % block_size == 0);
996   assert(len > 0);
997   assert(off < size);
998   assert(off + len <= size);
999
1000   Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
1001
1002   // TODO: if upper layer alloc memory with known physical address,
1003   // we can reduce this copy
1004   t->write_bl = std::move(bl);
1005
1006   if (buffered) {
1007     // Only need to push the first entry
1008     if(queue_id == -1)
1009       queue_id = ceph_gettid();
1010     driver->get_queue(queue_id)->queue_task(t);
1011   } else {
1012     t->ctx = ioc;
1013     Task *first = static_cast<Task*>(ioc->nvme_task_first);
1014     Task *last = static_cast<Task*>(ioc->nvme_task_last);
1015     if (last)
1016       last->next = t;
1017     if (!first)
1018       ioc->nvme_task_first = t;
1019     ioc->nvme_task_last = t;
1020     ++ioc->num_pending;
1021   }
1022
1023   dout(5) << __func__ << " " << off << "~" << len << dendl;
1024
1025   return 0;
1026 }
1027
1028 int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered)
1029 {
1030   // FIXME: there is presumably a more efficient way to do this...
1031   IOContext ioc(cct, NULL);
1032   aio_write(off, bl, &ioc, buffered);
1033   ioc.aio_wait();
1034   return 0;
1035 }
1036
1037 int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
1038                      IOContext *ioc,
1039                      bool buffered)
1040 {
1041   dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
1042   assert(off % block_size == 0);
1043   assert(len % block_size == 0);
1044   assert(len > 0);
1045   assert(off < size);
1046   assert(off + len <= size);
1047
1048   Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
1049   bufferptr p = buffer::create_page_aligned(len);
1050   int r = 0;
1051   t->ctx = ioc;
1052   char *buf = p.c_str();
1053   t->fill_cb = [buf, t]() {
1054     t->copy_to_buf(buf, 0, t->len);
1055   };
1056   ++ioc->num_running;
1057   if(queue_id == -1)
1058     queue_id = ceph_gettid();
1059   driver->get_queue(queue_id)->queue_task(t);
1060
1061   while(t->return_code > 0) {
1062     t->io_wait();
1063   }
1064   pbl->push_back(std::move(p));
1065   r = t->return_code;
1066   delete t;
1067   return r;
1068 }
1069
1070 int NVMEDevice::aio_read(
1071     uint64_t off,
1072     uint64_t len,
1073     bufferlist *pbl,
1074     IOContext *ioc)
1075 {
1076   dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
1077   assert(off % block_size == 0);
1078   assert(len % block_size == 0);
1079   assert(len > 0);
1080   assert(off < size);
1081   assert(off + len <= size);
1082
1083   Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
1084
1085   bufferptr p = buffer::create_page_aligned(len);
1086   pbl->append(p);
1087   t->ctx = ioc;
1088   char *buf = p.c_str();
1089   t->fill_cb = [buf, t]() {
1090     t->copy_to_buf(buf, 0, t->len);
1091   };
1092
1093   Task *first = static_cast<Task*>(ioc->nvme_task_first);
1094   Task *last = static_cast<Task*>(ioc->nvme_task_last);
1095   if (last)
1096     last->next = t;
1097   if (!first)
1098     ioc->nvme_task_first = t;
1099   ioc->nvme_task_last = t;
1100   ++ioc->num_pending;
1101
1102   return 0;
1103 }
1104
1105 int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
1106 {
1107   assert(len > 0);
1108   assert(off < size);
1109   assert(off + len <= size);
1110
1111   uint64_t aligned_off = align_down(off, block_size);
1112   uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
1113   dout(5) << __func__ << " " << off << "~" << len
1114           << " aligned " << aligned_off << "~" << aligned_len << dendl;
1115   IOContext ioc(g_ceph_context, nullptr);
1116   Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
1117   int r = 0;
1118   t->ctx = &ioc;
1119   t->fill_cb = [buf, t, off, len]() {
1120     t->copy_to_buf(buf, off-t->offset, len);
1121   };
1122   ++ioc.num_running;
1123   if(queue_id == -1)
1124     queue_id = ceph_gettid();
1125   driver->get_queue(queue_id)->queue_task(t);
1126
1127   while(t->return_code > 0) {
1128     t->io_wait();
1129   }
1130   r = t->return_code;
1131   delete t;
1132   return r;
1133 }
1134
1135 int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
1136 {
1137   dout(5) << __func__ << " " << off << "~" << len << dendl;
1138   return 0;
1139 }