Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / bluestore / KernelDevice.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20
21 #include "KernelDevice.h"
22 #include "include/types.h"
23 #include "include/compat.h"
24 #include "include/stringify.h"
25 #include "common/errno.h"
26 #include "common/debug.h"
27 #include "common/blkdev.h"
28 #include "common/align.h"
29 #include "common/blkdev.h"
30
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_bdev
33 #undef dout_prefix
34 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
35
36 KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
37   : BlockDevice(cct),
38     fd_direct(-1),
39     fd_buffered(-1),
40     size(0), block_size(0),
41     fs(NULL), aio(false), dio(false),
42     debug_lock("KernelDevice::debug_lock"),
43     aio_queue(cct->_conf->bdev_aio_max_queue_depth),
44     aio_callback(cb),
45     aio_callback_priv(cbpriv),
46     aio_stop(false),
47     aio_thread(this),
48     injecting_crash(0)
49 {
50 }
51
52 int KernelDevice::_lock()
53 {
54   struct flock l;
55   memset(&l, 0, sizeof(l));
56   l.l_type = F_WRLCK;
57   l.l_whence = SEEK_SET;
58   int r = ::fcntl(fd_direct, F_SETLK, &l);
59   if (r < 0)
60     return -errno;
61   return 0;
62 }
63
64 int KernelDevice::open(const string& p)
65 {
66   path = p;
67   int r = 0;
68   dout(1) << __func__ << " path " << path << dendl;
69
70   fd_direct = ::open(path.c_str(), O_RDWR | O_DIRECT);
71   if (fd_direct < 0) {
72     r = -errno;
73     derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
74     return r;
75   }
76   fd_buffered = ::open(path.c_str(), O_RDWR);
77   if (fd_buffered < 0) {
78     r = -errno;
79     derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
80     goto out_direct;
81   }
82   dio = true;
83   aio = cct->_conf->bdev_aio;
84   if (!aio) {
85     assert(0 == "non-aio not supported");
86   }
87
88   // disable readahead as it will wreak havoc on our mix of
89   // directio/aio and buffered io.
90   r = posix_fadvise(fd_buffered, 0, 0, POSIX_FADV_RANDOM);
91   if (r) {
92     r = -r;
93     derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
94     goto out_fail;
95   }
96
97   r = _lock();
98   if (r < 0) {
99     derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
100          << dendl;
101     goto out_fail;
102   }
103
104   struct stat st;
105   r = ::fstat(fd_direct, &st);
106   if (r < 0) {
107     r = -errno;
108     derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
109     goto out_fail;
110   }
111
112   // Operate as though the block size is 4 KB.  The backing file
113   // blksize doesn't strictly matter except that some file systems may
114   // require a read/modify/write if we write something smaller than
115   // it.
116   block_size = cct->_conf->bdev_block_size;
117   if (block_size != (unsigned)st.st_blksize) {
118     dout(1) << __func__ << " backing device/file reports st_blksize "
119             << st.st_blksize << ", using bdev_block_size "
120             << block_size << " anyway" << dendl;
121   }
122
123   if (S_ISBLK(st.st_mode)) {
124     int64_t s;
125     r = get_block_device_size(fd_direct, &s);
126     if (r < 0) {
127       goto out_fail;
128     }
129     size = s;
130   } else {
131     size = st.st_size;
132   }
133   if (cct->_conf->get_val<bool>("bdev_inject_bad_size")) {
134     derr << "injecting bad size; actual 0x" << std::hex << size
135          << " but using 0x" << (size & ~block_size) << std::dec << dendl;
136     size &= ~(block_size);
137   }
138
139   {
140     char partition[PATH_MAX], devname[PATH_MAX];
141     r = get_device_by_fd(fd_buffered, partition, devname, sizeof(devname));
142     if (r < 0) {
143       derr << "unable to get device name for " << path << ": "
144            << cpp_strerror(r) << dendl;
145       rotational = true;
146     } else {
147       dout(20) << __func__ << " devname " << devname << dendl;
148       rotational = block_device_is_rotational(devname);
149     }
150   }
151
152   r = _aio_start();
153   if (r < 0) {
154     goto out_fail;
155   }
156
157   fs = FS::create_by_fd(fd_direct);
158   assert(fs);
159
160   // round size down to an even block
161   size &= ~(block_size - 1);
162
163   dout(1) << __func__
164           << " size " << size
165           << " (0x" << std::hex << size << std::dec << ", "
166           << pretty_si_t(size) << "B)"
167           << " block_size " << block_size
168           << " (" << pretty_si_t(block_size) << "B)"
169           << " " << (rotational ? "rotational" : "non-rotational")
170           << dendl;
171   return 0;
172
173  out_fail:
174   VOID_TEMP_FAILURE_RETRY(::close(fd_buffered));
175   fd_buffered = -1;
176  out_direct:
177   VOID_TEMP_FAILURE_RETRY(::close(fd_direct));
178   fd_direct = -1;
179   return r;
180 }
181
182 void KernelDevice::close()
183 {
184   dout(1) << __func__ << dendl;
185   _aio_stop();
186
187   assert(fs);
188   delete fs;
189   fs = NULL;
190
191   assert(fd_direct >= 0);
192   VOID_TEMP_FAILURE_RETRY(::close(fd_direct));
193   fd_direct = -1;
194
195   assert(fd_buffered >= 0);
196   VOID_TEMP_FAILURE_RETRY(::close(fd_buffered));
197   fd_buffered = -1;
198
199   path.clear();
200 }
201
202 static string get_dev_property(const char *dev, const char *property)
203 {
204   char val[1024] = {0};
205   get_block_device_string_property(dev, property, val, sizeof(val));
206   return val;
207 }
208
209 int KernelDevice::collect_metadata(string prefix, map<string,string> *pm) const
210 {
211   (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
212   (*pm)[prefix + "size"] = stringify(get_size());
213   (*pm)[prefix + "block_size"] = stringify(get_block_size());
214   (*pm)[prefix + "driver"] = "KernelDevice";
215   if (rotational) {
216     (*pm)[prefix + "type"] = "hdd";
217   } else {
218     (*pm)[prefix + "type"] = "ssd";
219   }
220
221   struct stat st;
222   int r = ::fstat(fd_buffered, &st);
223   if (r < 0)
224     return -errno;
225   if (S_ISBLK(st.st_mode)) {
226     (*pm)[prefix + "access_mode"] = "blk";
227     char partition_path[PATH_MAX];
228     char dev_node[PATH_MAX];
229     int rc = get_device_by_fd(fd_buffered, partition_path, dev_node, PATH_MAX);
230     switch (rc) {
231     case -EOPNOTSUPP:
232     case -EINVAL:
233       (*pm)[prefix + "partition_path"] = "unknown";
234       (*pm)[prefix + "dev_node"] = "unknown";
235       break;
236     case -ENODEV:
237       (*pm)[prefix + "partition_path"] = string(partition_path);
238       (*pm)[prefix + "dev_node"] = "unknown";
239       break;
240     default:
241       {
242         (*pm)[prefix + "partition_path"] = string(partition_path);
243         (*pm)[prefix + "dev_node"] = string(dev_node);
244         (*pm)[prefix + "model"] = get_dev_property(dev_node, "device/model");
245         (*pm)[prefix + "dev"] = get_dev_property(dev_node, "dev");
246
247         // nvme exposes a serial number
248         string serial = get_dev_property(dev_node, "device/serial");
249         if (serial.length()) {
250           (*pm)[prefix + "serial"] = serial;
251         }
252
253         // nvme has a device/device/* structure; infer from that.  there
254         // is probably a better way?
255         string nvme_vendor = get_dev_property(dev_node, "device/device/vendor");
256         if (nvme_vendor.length()) {
257           (*pm)[prefix + "type"] = "nvme";
258         }
259       }
260     }
261   } else {
262     (*pm)[prefix + "access_mode"] = "file";
263     (*pm)[prefix + "path"] = path;
264   }
265   return 0;
266 }
267
268 int KernelDevice::flush()
269 {
270   // protect flush with a mutex.  note that we are not really protecting
271   // data here.  instead, we're ensuring that if any flush() caller
272   // sees that io_since_flush is true, they block any racing callers
273   // until the flush is observed.  that allows racing threads to be
274   // calling flush while still ensuring that *any* of them that got an
275   // aio completion notification will not return before that aio is
276   // stable on disk: whichever thread sees the flag first will block
277   // followers until the aio is stable.
278   std::lock_guard<std::mutex> l(flush_mutex);
279
280   bool expect = true;
281   if (!io_since_flush.compare_exchange_strong(expect, false)) {
282     dout(10) << __func__ << " no-op (no ios since last flush), flag is "
283              << (int)io_since_flush.load() << dendl;
284     return 0;
285   }
286
287   dout(10) << __func__ << " start" << dendl;
288   if (cct->_conf->bdev_inject_crash) {
289     ++injecting_crash;
290     // sleep for a moment to give other threads a chance to submit or
291     // wait on io that races with a flush.
292     derr << __func__ << " injecting crash. first we sleep..." << dendl;
293     sleep(cct->_conf->bdev_inject_crash_flush_delay);
294     derr << __func__ << " and now we die" << dendl;
295     cct->_log->flush();
296     _exit(1);
297   }
298   utime_t start = ceph_clock_now();
299   int r = ::fdatasync(fd_direct);
300   utime_t end = ceph_clock_now();
301   utime_t dur = end - start;
302   if (r < 0) {
303     r = -errno;
304     derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
305     ceph_abort();
306   }
307   dout(5) << __func__ << " in " << dur << dendl;;
308   return r;
309 }
310
311 int KernelDevice::_aio_start()
312 {
313   if (aio) {
314     dout(10) << __func__ << dendl;
315     int r = aio_queue.init();
316     if (r < 0) {
317       if (r == -EAGAIN) {
318         derr << __func__ << " io_setup(2) failed with EAGAIN; "
319              << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
320       } else {
321         derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
322       }
323       return r;
324     }
325     aio_thread.create("bstore_aio");
326   }
327   return 0;
328 }
329
330 void KernelDevice::_aio_stop()
331 {
332   if (aio) {
333     dout(10) << __func__ << dendl;
334     aio_stop = true;
335     aio_thread.join();
336     aio_stop = false;
337     aio_queue.shutdown();
338   }
339 }
340
341 void KernelDevice::_aio_thread()
342 {
343   dout(10) << __func__ << " start" << dendl;
344   int inject_crash_count = 0;
345   while (!aio_stop) {
346     dout(40) << __func__ << " polling" << dendl;
347     int max = cct->_conf->bdev_aio_reap_max;
348     aio_t *aio[max];
349     int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
350                                          aio, max);
351     if (r < 0) {
352       derr << __func__ << " got " << cpp_strerror(r) << dendl;
353     }
354     if (r > 0) {
355       dout(30) << __func__ << " got " << r << " completed aios" << dendl;
356       for (int i = 0; i < r; ++i) {
357         IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
358         _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
359         if (aio[i]->queue_item.is_linked()) {
360           std::lock_guard<std::mutex> l(debug_queue_lock);
361           debug_aio_unlink(*aio[i]);
362         }
363
364         // set flag indicating new ios have completed.  we do this *before*
365         // any completion or notifications so that any user flush() that
366         // follows the observed io completion will include this io.  Note
367         // that an earlier, racing flush() could observe and clear this
368         // flag, but that also ensures that the IO will be stable before the
369         // later flush() occurs.
370         io_since_flush.store(true);
371
372         int r = aio[i]->get_return_value();
373         dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
374                  << " ioc " << ioc
375                  << " with " << (ioc->num_running.load() - 1)
376                  << " aios left" << dendl;
377         assert(r >= 0);
378
379         // NOTE: once num_running and we either call the callback or
380         // call aio_wake we cannot touch ioc or aio[] as the caller
381         // may free it.
382         if (ioc->priv) {
383           if (--ioc->num_running == 0) {
384             aio_callback(aio_callback_priv, ioc->priv);
385           }
386         } else {
387           ioc->try_aio_wake();
388         }
389       }
390     }
391     if (cct->_conf->bdev_debug_aio) {
392       utime_t now = ceph_clock_now();
393       std::lock_guard<std::mutex> l(debug_queue_lock);
394       if (debug_oldest) {
395         if (debug_stall_since == utime_t()) {
396           debug_stall_since = now;
397         } else {
398           utime_t cutoff = now;
399           cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
400           if (debug_stall_since < cutoff) {
401             derr << __func__ << " stalled aio " << debug_oldest
402                  << " since " << debug_stall_since << ", timeout is "
403                  << cct->_conf->bdev_debug_aio_suicide_timeout
404                  << "s, suicide" << dendl;
405             assert(0 == "stalled aio... buggy kernel or bad device?");
406           }
407         }
408       }
409     }
410     reap_ioc();
411     if (cct->_conf->bdev_inject_crash) {
412       ++inject_crash_count;
413       if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
414           cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
415         derr << __func__ << " bdev_inject_crash trigger from aio thread"
416              << dendl;
417         cct->_log->flush();
418         _exit(1);
419       }
420     }
421   }
422   reap_ioc();
423   dout(10) << __func__ << " end" << dendl;
424 }
425
426 void KernelDevice::_aio_log_start(
427   IOContext *ioc,
428   uint64_t offset,
429   uint64_t length)
430 {
431   dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
432            << std::dec << dendl;
433   if (cct->_conf->bdev_debug_inflight_ios) {
434     Mutex::Locker l(debug_lock);
435     if (debug_inflight.intersects(offset, length)) {
436       derr << __func__ << " inflight overlap of 0x"
437            << std::hex
438            << offset << "~" << length << std::dec
439            << " with " << debug_inflight << dendl;
440       ceph_abort();
441     }
442     debug_inflight.insert(offset, length);
443   }
444 }
445
446 void KernelDevice::debug_aio_link(aio_t& aio)
447 {
448   if (debug_queue.empty()) {
449     debug_oldest = &aio;
450   }
451   debug_queue.push_back(aio);
452 }
453
454 void KernelDevice::debug_aio_unlink(aio_t& aio)
455 {
456   if (aio.queue_item.is_linked()) {
457     debug_queue.erase(debug_queue.iterator_to(aio));
458     if (debug_oldest == &aio) {
459       if (debug_queue.empty()) {
460         debug_oldest = nullptr;
461       } else {
462         debug_oldest = &debug_queue.front();
463       }
464       debug_stall_since = utime_t();
465     }
466   }
467 }
468
469 void KernelDevice::_aio_log_finish(
470   IOContext *ioc,
471   uint64_t offset,
472   uint64_t length)
473 {
474   dout(20) << __func__ << " " << aio << " 0x"
475            << std::hex << offset << "~" << length << std::dec << dendl;
476   if (cct->_conf->bdev_debug_inflight_ios) {
477     Mutex::Locker l(debug_lock);
478     debug_inflight.erase(offset, length);
479   }
480 }
481
482 void KernelDevice::aio_submit(IOContext *ioc)
483 {
484   dout(20) << __func__ << " ioc " << ioc
485            << " pending " << ioc->num_pending.load()
486            << " running " << ioc->num_running.load()
487            << dendl;
488
489   if (ioc->num_pending.load() == 0) {
490     return;
491   }
492
493   // move these aside, and get our end iterator position now, as the
494   // aios might complete as soon as they are submitted and queue more
495   // wal aio's.
496   list<aio_t>::iterator e = ioc->running_aios.begin();
497   ioc->running_aios.splice(e, ioc->pending_aios);
498
499   int pending = ioc->num_pending.load();
500   ioc->num_running += pending;
501   ioc->num_pending -= pending;
502   assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
503   assert(ioc->pending_aios.size() == 0);
504   
505   if (cct->_conf->bdev_debug_aio) {
506     list<aio_t>::iterator p = ioc->running_aios.begin();
507     while (p != e) {
508       for (auto& io : p->iov)
509         dout(30) << __func__ << "   iov " << (void*)io.iov_base
510                  << " len " << io.iov_len << dendl;
511
512       std::lock_guard<std::mutex> l(debug_queue_lock);
513       debug_aio_link(*p++);
514     }
515   }
516
517   void *priv = static_cast<void*>(ioc);
518   int r, retries = 0;
519   r = aio_queue.submit_batch(ioc->running_aios.begin(), e, 
520                              ioc->num_running.load(), priv, &retries);
521   
522   if (retries)
523     derr << __func__ << " retries " << retries << dendl;
524   if (r < 0) {
525     derr << " aio submit got " << cpp_strerror(r) << dendl;
526     assert(r == 0);
527   }
528 }
529
530 int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered)
531 {
532   uint64_t len = bl.length();
533   dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
534           << std::dec << " buffered" << dendl;
535   if (cct->_conf->bdev_inject_crash &&
536       rand() % cct->_conf->bdev_inject_crash == 0) {
537     derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
538          << off << "~" << len << std::dec << dendl;
539     ++injecting_crash;
540     return 0;
541   }
542   vector<iovec> iov;
543   bl.prepare_iov(&iov);
544   int r = ::pwritev(buffered ? fd_buffered : fd_direct,
545                     &iov[0], iov.size(), off);
546
547   if (r < 0) {
548     r = -errno;
549     derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
550     return r;
551   }
552   if (buffered) {
553     // initiate IO (but do not wait)
554     r = ::sync_file_range(fd_buffered, off, len, SYNC_FILE_RANGE_WRITE);
555     if (r < 0) {
556       r = -errno;
557       derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
558       return r;
559     }
560   }
561
562   io_since_flush.store(true);
563
564   return 0;
565 }
566
567 int KernelDevice::write(
568   uint64_t off,
569   bufferlist &bl,
570   bool buffered)
571 {
572   uint64_t len = bl.length();
573   dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
574            << (buffered ? " (buffered)" : " (direct)")
575            << dendl;
576   assert(off % block_size == 0);
577   assert(len % block_size == 0);
578   assert(len > 0);
579   assert(off < size);
580   assert(off + len <= size);
581
582   if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
583       bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
584     dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
585   }
586   dout(40) << "data: ";
587   bl.hexdump(*_dout);
588   *_dout << dendl;
589
590   return _sync_write(off, bl, buffered);
591 }
592
593 int KernelDevice::aio_write(
594   uint64_t off,
595   bufferlist &bl,
596   IOContext *ioc,
597   bool buffered)
598 {
599   uint64_t len = bl.length();
600   dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
601            << (buffered ? " (buffered)" : " (direct)")
602            << dendl;
603   assert(off % block_size == 0);
604   assert(len % block_size == 0);
605   assert(len > 0);
606   assert(off < size);
607   assert(off + len <= size);
608
609   if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
610       bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
611     dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
612   }
613   dout(40) << "data: ";
614   bl.hexdump(*_dout);
615   *_dout << dendl;
616
617   _aio_log_start(ioc, off, len);
618
619 #ifdef HAVE_LIBAIO
620   if (aio && dio && !buffered) {
621     ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
622     ++ioc->num_pending;
623     aio_t& aio = ioc->pending_aios.back();
624     if (cct->_conf->bdev_inject_crash &&
625         rand() % cct->_conf->bdev_inject_crash == 0) {
626       derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
627            << off << "~" << len << std::dec
628            << dendl;
629       // generate a real io so that aio_wait behaves properly, but make it
630       // a read instead of write, and toss the result.
631       aio.pread(off, len);
632       ++injecting_crash;
633     } else {
634       bl.prepare_iov(&aio.iov);
635       for (unsigned i=0; i<aio.iov.size(); ++i) {
636         dout(30) << "aio " << i << " " << aio.iov[i].iov_base
637                  << " " << aio.iov[i].iov_len << dendl;
638       }
639       aio.bl.claim_append(bl);
640       aio.pwritev(off, len);
641     }
642     dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
643             << std::dec << " aio " << &aio << dendl;
644   } else
645 #endif
646   {
647     int r = _sync_write(off, bl, buffered);
648     _aio_log_finish(ioc, off, len);
649     if (r < 0)
650       return r;
651   }
652   return 0;
653 }
654
655 int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
656                       IOContext *ioc,
657                       bool buffered)
658 {
659   dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
660           << (buffered ? " (buffered)" : " (direct)")
661           << dendl;
662   assert(off % block_size == 0);
663   assert(len % block_size == 0);
664   assert(len > 0);
665   assert(off < size);
666   assert(off + len <= size);
667
668   _aio_log_start(ioc, off, len);
669
670   bufferptr p = buffer::create_page_aligned(len);
671   int r = ::pread(buffered ? fd_buffered : fd_direct,
672                   p.c_str(), len, off);
673   if (r < 0) {
674     r = -errno;
675     goto out;
676   }
677   assert((uint64_t)r == len);
678   pbl->push_back(std::move(p));
679
680   dout(40) << "data: ";
681   pbl->hexdump(*_dout);
682   *_dout << dendl;
683
684  out:
685   _aio_log_finish(ioc, off, len);
686   return r < 0 ? r : 0;
687 }
688
689 int KernelDevice::aio_read(
690   uint64_t off,
691   uint64_t len,
692   bufferlist *pbl,
693   IOContext *ioc)
694 {
695   dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
696           << dendl;
697
698   int r = 0;
699 #ifdef HAVE_LIBAIO
700   if (aio && dio) {
701     _aio_log_start(ioc, off, len);
702     ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
703     ++ioc->num_pending;
704     aio_t& aio = ioc->pending_aios.back();
705     aio.pread(off, len);
706     for (unsigned i=0; i<aio.iov.size(); ++i) {
707       dout(30) << "aio " << i << " " << aio.iov[i].iov_base
708                << " " << aio.iov[i].iov_len << dendl;
709     }
710     pbl->append(aio.bl);
711     dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
712             << std::dec << " aio " << &aio << dendl;
713   } else
714 #endif
715   {
716     r = read(off, len, pbl, ioc, false);
717   }
718
719   return r;
720 }
721
722 int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
723 {
724   uint64_t aligned_off = align_down(off, block_size);
725   uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
726   bufferptr p = buffer::create_page_aligned(aligned_len);
727   int r = 0;
728
729   r = ::pread(fd_direct, p.c_str(), aligned_len, aligned_off);
730   if (r < 0) {
731     r = -errno;
732     derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec 
733       << " error: " << cpp_strerror(r) << dendl;
734     goto out;
735   }
736   assert((uint64_t)r == aligned_len);
737   memcpy(buf, p.c_str() + (off - aligned_off), len);
738
739   dout(40) << __func__ << " data: ";
740   bufferlist bl;
741   bl.append(buf, len);
742   bl.hexdump(*_dout);
743   *_dout << dendl;
744
745  out:
746   return r < 0 ? r : 0;
747 }
748
749 int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
750                        bool buffered)
751 {
752   dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
753           << dendl;
754   assert(len > 0);
755   assert(off < size);
756   assert(off + len <= size);
757   int r = 0;
758
759   //if it's direct io and unaligned, we have to use a internal buffer
760   if (!buffered && ((off % block_size != 0)
761                     || (len % block_size != 0)
762                     || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
763     return direct_read_unaligned(off, len, buf);
764
765   if (buffered) {
766     //buffered read
767     char *t = buf;
768     uint64_t left = len;
769     while (left > 0) {
770       r = ::pread(fd_buffered, t, left, off);
771       if (r < 0) {
772         r = -errno;
773         derr << __func__ << " 0x" << std::hex << off << "~" << left 
774           << std::dec << " error: " << cpp_strerror(r) << dendl;
775         goto out;
776       }
777       off += r;
778       t += r;
779       left -= r;
780     }
781   } else {
782     //direct and aligned read
783     r = ::pread(fd_direct, buf, len, off);
784     if (r < 0) {
785       r = -errno;
786       derr << __func__ << " direct_aligned_read" << " 0x" << std::hex 
787         << off << "~" << left << std::dec << " error: " << cpp_strerror(r) 
788         << dendl;
789       goto out;
790     }
791     assert((uint64_t)r == len);
792   }
793
794   dout(40) << __func__ << " data: ";
795   bufferlist bl;
796   bl.append(buf, len);
797   bl.hexdump(*_dout);
798   *_dout << dendl;
799
800  out:
801   return r < 0 ? r : 0;
802 }
803
804 int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
805 {
806   dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
807           << dendl;
808   assert(off % block_size == 0);
809   assert(len % block_size == 0);
810   int r = posix_fadvise(fd_buffered, off, len, POSIX_FADV_DONTNEED);
811   if (r) {
812     r = -r;
813     derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
814          << " error: " << cpp_strerror(r) << dendl;
815   }
816   return r;
817 }
818