complete the release-notes
[stor4nfv.git] / src / ceph / 0008-librbd-implement-async-cache-lookup-and-read.patch
1 From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001
2 From: Yuan Zhou <yuan.zhou@intel.com>
3 Date: Thu, 16 Aug 2018 17:28:46 +0800
4 Subject: [PATCH 08/10] librbd: implement async cache lookup and read
5
6 Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
7 ---
8  .../SharedPersistentObjectCacherObjectDispatch.cc  | 63 ++++++++++++----------
9  .../SharedPersistentObjectCacherObjectDispatch.h   |  7 +++
10  src/tools/rbd_cache/CacheController.cc             |  9 ++--
11  src/tools/rbd_cache/CacheControllerSocket.hpp      |  8 ++-
12  .../rbd_cache/CacheControllerSocketClient.hpp      | 49 +++++++++--------
13  src/tools/rbd_cache/CacheControllerSocketCommon.h  | 12 +++++
14  6 files changed, 94 insertions(+), 54 deletions(-)
15
16 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
17 index 2aa5cad..407ce49 100644
18 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
19 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
20 @@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec
21  
22  template <typename I>
23  SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
24 -  if (m_object_store) {
25      delete m_object_store;
26 -  }
27 -
28 -  if (m_cache_client) {
29      delete m_cache_client;
30 -  }
31  }
32  
33  template <typename I>
34 @@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
35    ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
36                   << object_len << dendl;
37  
38 -  // ensure we aren't holding the cache lock post-read
39    on_dispatched = util::create_async_context_callback(*m_image_ctx,
40                                                        on_dispatched);
41 +  auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
42 +    handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
43 +  });
44  
45    if (m_cache_client && m_cache_client->connected && m_object_store) {
46 -    bool exists;
47      m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
48 -      m_image_ctx->id, oid, &exists);
49 -
50 -    // try to read from parent image
51 -    ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
52 -    if (exists) {
53 -      int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
54 -      if (r != 0) {
55 -        *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
56 -       on_dispatched->complete(r);
57 -        return true;
58 -      }
59 -    }
60 +      m_image_ctx->id, oid, ctx);
61    }
62 -
63 -  ldout(cct, 20) << "Continue read from RADOS" << dendl;
64 -  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
65 -  on_dispatched->complete(0);
66    return true;
67  }
68  
69  template <typename I>
70 +int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
71 +    bool cache,
72 +    const std::string &oid, uint64_t object_off, uint64_t object_len,
73 +    ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
74 +    Context* on_dispatched) {
75 +  // IO chained in reverse order
76 +  auto cct = m_image_ctx->cct;
77 +  ldout(cct, 20) << dendl;
78 +
79 +  // try to read from parent image
80 +  if (cache) {
81 +    int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
82 +    if (r != 0) {
83 +      *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
84 +      //TODO(): complete in syncfile
85 +      on_dispatched->complete(r);
86 +      ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
87 +      return true;
88 +    }
89 +  } else {
90 +    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
91 +    on_dispatched->complete(0);
92 +    ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
93 +    return false;
94 +  }
95 +}
96 +
97 +template <typename I>
98  void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
99    auto cct = m_image_ctx->cct;
100    ldout(cct, 20) << dendl;
101 @@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
102    rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
103  
104    switch (io_ctx->type) {
105 -    case RBDSC_REGISTER_REPLY: {
106 +    case rbd::cache::RBDSC_REGISTER_REPLY: {
107        // open cache handler for volume        
108        ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
109        m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
110  
111        break;
112      }
113 -    case RBDSC_READ_REPLY: {
114 +    case rbd::cache::RBDSC_READ_REPLY: {
115        ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
116        //TODO(): should call read here
117  
118        break;
119      }
120 -    case RBDSC_READ_RADOS: {
121 +    case rbd::cache::RBDSC_READ_RADOS: {
122        ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
123        //TODO(): should call read here
124  
125        break;
126      }
127 -    default: ldout(cct, 20) << "nothing" << dendl;
128 +    default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
129        break;
130      
131    }
132 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
133 index 200688f..36b868a 100644
134 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
135 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
136 @@ -112,6 +112,13 @@ public:
137  
138  private:
139  
140 +  int handle_read_cache(
141 +      bool cache,
142 +      const std::string &oid, uint64_t object_off,
143 +      uint64_t object_len, ceph::bufferlist* read_data,
144 +      io::DispatchResult* dispatch_result,
145 +      Context* on_dispatched);
146 +
147    ImageCtxT* m_image_ctx;
148  
149    void client_handle_request(std::string msg);
150 diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
151 index cefcf28..c9d674b 100644
152 --- a/src/tools/rbd_cache/CacheController.cc
153 +++ b/src/tools/rbd_cache/CacheController.cc
154 @@ -76,7 +76,7 @@ void CacheController::run() {
155    }
156  }
157  
158 -void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
159 +void CacheController::handle_request(uint64_t session_id, std::string msg){
160    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
161  
162    int ret = 0;
163 @@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
164        // init cache layout for volume        
165        m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
166        io_ctx->type = RBDSC_REGISTER_REPLY;
167 -      m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
168 +      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
169  
170        break;
171      }
172 @@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
173        } else {
174          io_ctx->type = RBDSC_READ_REPLY;
175        }
176 -      m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
177 +      if (io_ctx->type != RBDSC_READ_REPLY) {
178 +        assert(0);
179 +      }
180 +      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
181  
182        break;
183      }
184 diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
185 index 967af1d..d178b58 100644
186 --- a/src/tools/rbd_cache/CacheControllerSocket.hpp
187 +++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
188 @@ -43,7 +43,9 @@ public:
189    void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
190  
191      if (!error) {
192 -     
193 +      if(bytes_transferred != 544){
194 +       assert(0);
195 +      }
196        process_msg(session_id, std::string(data_, bytes_transferred));
197  
198      }
199 @@ -51,7 +53,8 @@ public:
200  
201    void handle_write(const boost::system::error_code& error) {
202      if (!error) {
203 -      socket_.async_read_some(boost::asio::buffer(data_),
204 +    boost::asio::async_read(socket_, boost::asio::buffer(data_),
205 +                            boost::asio::transfer_exactly(544),
206            boost::bind(&session::handle_read,
207              shared_from_this(),
208              boost::asio::placeholders::error,
209 @@ -63,6 +66,7 @@ public:
210  
211        boost::asio::async_write(socket_,
212            boost::asio::buffer(msg.c_str(), msg.size()),
213 +          boost::asio::transfer_exactly(544),
214            boost::bind(&session::handle_write,
215              shared_from_this(),
216              boost::asio::placeholders::error));
217 diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
218 index 56b79ce..3b0ca00 100644
219 --- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
220 +++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
221 @@ -8,6 +8,7 @@
222  #include <boost/bind.hpp>
223  #include <boost/algorithm/string.hpp>
224  #include "include/assert.h"
225 +#include "include/Context.h"
226  #include "CacheControllerSocketCommon.h"
227  
228  
229 @@ -26,8 +27,12 @@ public:
230        m_client_process_msg(processmsg),
231        ep_(stream_protocol::endpoint(file))
232    {
233 -     std::thread thd([this](){io_service_.run(); });
234 -     thd.detach();
235 +     io_thread.reset(new std::thread([this](){io_service_.run(); }));
236 +  }
237 +
238 +  ~CacheClient() {
239 +    io_service_.stop();
240 +    io_thread->join();
241    }
242  
243    void run(){
244 @@ -53,7 +58,8 @@ public:
245      message->offset = 0;
246      message->length = 0;
247      boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
248 -        [this](const boost::system::error_code& err, size_t cb) {
249 +        [this, message](const boost::system::error_code& err, size_t cb) {
250 +        delete message;
251          if (!err) {
252            boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
253                boost::asio::transfer_exactly(544),
254 @@ -72,7 +78,7 @@ public:
255      return 0;
256    }
257  
258 -  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
259 +  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
260      rbdsc_req_type_t *message = new rbdsc_req_type_t();
261      message->type = RBDSC_READ;
262      memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
263 @@ -82,49 +88,48 @@ public:
264      message->length = 0;
265  
266      boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
267 -        [this, result](const boost::system::error_code& err, size_t cb) {
268 +        [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
269 +        delete message;
270          if (!err) {
271 -          get_result(result);
272 +          get_result(on_finish);
273          } else {
274            return -1;
275          }
276      });
277 -    std::unique_lock<std::mutex> lk(m);
278 -    //cv.wait(lk);
279 -    cv.wait_for(lk, std::chrono::milliseconds(100));
280 +
281      return 0;
282    }
283  
284 -  void get_result(bool* result) {
285 +  void get_result(Context* on_finish) {
286      boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
287          boost::asio::transfer_exactly(544),
288 -        [this, result](const boost::system::error_code& err, size_t cb) {
289 +        [this, on_finish](const boost::system::error_code& err, size_t cb) {
290 +        if (cb != 544) {
291 +         assert(0);
292 +        }
293          if (!err) {
294             rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
295              if (io_ctx->type == RBDSC_READ_REPLY) {
296 -             *result = true;
297 +             on_finish->complete(true);
298 +              return;
299              } else {
300 -             *result = false;
301 +             on_finish->complete(false);
302 +              return;
303              }
304 -            cv.notify_one();
305 -            m_client_process_msg(std::string(buffer_, cb));
306          } else {
307 -            return -1;
308 +           assert(0);
309 +            return on_finish->complete(false);
310          }
311      });
312    }
313  
314 -  void handle_connect(const boost::system::error_code& error) {
315 -    //TODO(): open librbd snap
316 -  }
317 -
318 -  void handle_write(const boost::system::error_code& error) {
319 -  }
320  
321  private:
322    boost::asio::io_service& io_service_;
323    boost::asio::io_service::work io_service_work_;
324    stream_protocol::socket socket_;
325 +
326 +  std::shared_ptr<std::thread> io_thread;
327    ClientProcessMsg m_client_process_msg;
328    stream_protocol::endpoint ep_;
329    char buffer_[1024];
330 diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
331 index a9d73a8..e026ec8 100644
332 --- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
333 +++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
334 @@ -4,6 +4,7 @@
335  #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
336  #define CACHE_CONTROLLER_SOCKET_COMMON_H
337  
338 +/*
339  #define RBDSC_REGISTER         0X11
340  #define RBDSC_READ             0X12
341  #define RBDSC_LOOKUP           0X13
342 @@ -11,10 +12,21 @@
343  #define RBDSC_READ_REPLY       0X15
344  #define RBDSC_LOOKUP_REPLY     0X16
345  #define RBDSC_READ_RADOS       0X17
346 +*/
347  
348  namespace rbd {
349  namespace cache {
350  
351 +static const int RBDSC_REGISTER        =  0X11;
352 +static const int RBDSC_READ            =  0X12;
353 +static const int RBDSC_LOOKUP          =  0X13;
354 +static const int RBDSC_REGISTER_REPLY  =  0X14;
355 +static const int RBDSC_READ_REPLY      =  0X15;
356 +static const int RBDSC_LOOKUP_REPLY    =  0X16;
357 +static const int RBDSC_READ_RADOS      =  0X17;
358 +
359 +
360 +
361  typedef std::function<void(uint64_t, std::string)> ProcessMsg;
362  typedef std::function<void(std::string)> ClientProcessMsg;
363  typedef uint8_t rbdsc_req_type;
364 -- 
365 2.7.4
366