Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / io / ReadResult.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/io/ReadResult.h"
5 #include "include/buffer.h"
6 #include "common/dout.h"
7 #include "librbd/io/AioCompletion.h"
8 #include <boost/variant/apply_visitor.hpp>
9 #include <boost/variant/static_visitor.hpp>
10
11 #define dout_subsys ceph_subsys_rbd
12 #undef dout_prefix
13 #define dout_prefix *_dout << "librbd::io::ReadResult: " << this \
14                            << " " << __func__ << ": "
15
16 namespace librbd {
17 namespace io {
18
19 struct ReadResult::SetClipLengthVisitor : public boost::static_visitor<void> {
20   size_t length;
21
22   SetClipLengthVisitor(size_t length) : length(length) {
23   }
24
25   void operator()(Linear &linear) const {
26     assert(length <= linear.buf_len);
27     linear.buf_len = length;
28   }
29
30   template <typename T>
31   void operator()(T &t) const {
32   }
33 };
34
35 struct ReadResult::AssembleResultVisitor : public boost::static_visitor<void> {
36   CephContext *cct;
37   Striper::StripedReadResult &destriper;
38
39   AssembleResultVisitor(CephContext *cct, Striper::StripedReadResult &destriper)
40     : cct(cct), destriper(destriper) {
41   }
42
43   void operator()(Empty &empty) const {
44     ldout(cct, 20) << "dropping read result" << dendl;
45   }
46
47   void operator()(Linear &linear) const {
48     ldout(cct, 20) << "copying resulting bytes to "
49                    << reinterpret_cast<void*>(linear.buf) << dendl;
50     destriper.assemble_result(cct, linear.buf, linear.buf_len);
51   }
52
53   void operator()(Vector &vector) const {
54     bufferlist bl;
55     destriper.assemble_result(cct, bl, true);
56
57     ldout(cct, 20) << "copying resulting " << bl.length() << " bytes to iovec "
58                    << reinterpret_cast<const void*>(vector.iov) << dendl;
59
60     bufferlist::iterator it = bl.begin();
61     size_t length = bl.length();
62     size_t offset = 0;
63     int idx = 0;
64     for (; offset < length && idx < vector.iov_count; idx++) {
65       size_t len = MIN(vector.iov[idx].iov_len, length - offset);
66       it.copy(len, static_cast<char *>(vector.iov[idx].iov_base));
67       offset += len;
68     }
69     assert(offset == bl.length());
70   }
71
72   void operator()(Bufferlist &bufferlist) const {
73     bufferlist.bl->clear();
74     destriper.assemble_result(cct, *bufferlist.bl, true);
75
76     ldout(cct, 20) << "moved resulting " << bufferlist.bl->length() << " "
77                    << "bytes to bl " << reinterpret_cast<void*>(bufferlist.bl)
78                    << dendl;
79   }
80 };
81
82 ReadResult::C_ReadRequest::C_ReadRequest(AioCompletion *aio_completion)
83   : aio_completion(aio_completion) {
84   aio_completion->add_request();
85 }
86
87 void ReadResult::C_ReadRequest::finish(int r) {
88   aio_completion->complete_request(r);
89 }
90
91 void ReadResult::C_ImageReadRequest::finish(int r) {
92   CephContext *cct = aio_completion->ictx->cct;
93   ldout(cct, 10) << "C_ImageReadRequest: r=" << r
94                  << dendl;
95   if (r >= 0) {
96     size_t length = 0;
97     for (auto &image_extent : image_extents) {
98       length += image_extent.second;
99     }
100     assert(length == bl.length());
101
102     aio_completion->lock.Lock();
103     aio_completion->read_result.m_destriper.add_partial_result(
104       cct, bl, image_extents);
105     aio_completion->lock.Unlock();
106     r = length;
107   }
108
109   C_ReadRequest::finish(r);
110 }
111
112 void ReadResult::C_SparseReadRequestBase::finish(ExtentMap &extent_map,
113                                                  const Extents &buffer_extents,
114                                                  uint64_t offset, size_t length,
115                                                  bufferlist &bl, int r) {
116   aio_completion->lock.Lock();
117   CephContext *cct = aio_completion->ictx->cct;
118   ldout(cct, 10) << "C_SparseReadRequestBase: r = " << r
119                  << dendl;
120
121   if (r >= 0 || r == -ENOENT) { // this was a sparse_read operation
122     ldout(cct, 10) << " got " << extent_map
123                    << " for " << buffer_extents
124                    << " bl " << bl.length() << dendl;
125     // reads from the parent don't populate the m_ext_map and the overlap
126     // may not be the full buffer.  compensate here by filling in m_ext_map
127     // with the read extent when it is empty.
128     if (extent_map.empty()) {
129       extent_map[offset] = bl.length();
130     }
131
132     aio_completion->read_result.m_destriper.add_partial_sparse_result(
133       cct, bl, extent_map, offset, buffer_extents);
134     r = length;
135   }
136   aio_completion->lock.Unlock();
137
138   C_ReadRequest::finish(r);
139 }
140
141 ReadResult::ReadResult() : m_buffer(Empty()) {
142 }
143
144 ReadResult::ReadResult(char *buf, size_t buf_len)
145   : m_buffer(Linear(buf, buf_len)) {
146 }
147
148 ReadResult::ReadResult(const struct iovec *iov, int iov_count)
149   : m_buffer(Vector(iov, iov_count)) {
150 }
151
152 ReadResult::ReadResult(ceph::bufferlist *bl)
153   : m_buffer(Bufferlist(bl)) {
154 }
155
156 void ReadResult::set_clip_length(size_t length) {
157   boost::apply_visitor(SetClipLengthVisitor(length), m_buffer);
158 }
159
160 void ReadResult::assemble_result(CephContext *cct) {
161   boost::apply_visitor(AssembleResultVisitor(cct, m_destriper), m_buffer);
162 }
163
164 } // namespace io
165 } // namespace librbd
166