1 From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001
2 From: Yuan Zhou <yuan.zhou@intel.com>
3 Date: Thu, 16 Aug 2018 17:28:46 +0800
4 Subject: [PATCH 08/10] librbd: implement async cache lookup and read
6 Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
8 .../SharedPersistentObjectCacherObjectDispatch.cc | 63 ++++++++++++----------
9 .../SharedPersistentObjectCacherObjectDispatch.h | 7 +++
10 src/tools/rbd_cache/CacheController.cc | 9 ++--
11 src/tools/rbd_cache/CacheControllerSocket.hpp | 8 ++-
12 .../rbd_cache/CacheControllerSocketClient.hpp | 49 +++++++++--------
13 src/tools/rbd_cache/CacheControllerSocketCommon.h | 12 +++++
14 6 files changed, 94 insertions(+), 54 deletions(-)
16 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
17 index 2aa5cad..407ce49 100644
18 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
19 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
20 @@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec
23 SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
24 - if (m_object_store) {
25 delete m_object_store;
28 - if (m_cache_client) {
29 delete m_cache_client;
34 @@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
35 ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
36 << object_len << dendl;
38 - // ensure we aren't holding the cache lock post-read
39 on_dispatched = util::create_async_context_callback(*m_image_ctx,
41 + auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
42 + handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
45 if (m_cache_client && m_cache_client->connected && m_object_store) {
47 m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
48 - m_image_ctx->id, oid, &exists);
50 - // try to read from parent image
51 - ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
53 - int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
55 - *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
56 - on_dispatched->complete(r);
60 + m_image_ctx->id, oid, ctx);
63 - ldout(cct, 20) << "Continue read from RADOS" << dendl;
64 - *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
65 - on_dispatched->complete(0);
70 +int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
72 + const std::string &oid, uint64_t object_off, uint64_t object_len,
73 + ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
74 + Context* on_dispatched) {
75 + // IO chained in reverse order
76 + auto cct = m_image_ctx->cct;
77 + ldout(cct, 20) << dendl;
79 + // try to read from parent image
81 + int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
83 + *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
84 + //TODO(): complete in syncfile
85 + on_dispatched->complete(r);
86 + ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
90 + *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
91 + on_dispatched->complete(0);
92 + ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
97 +template <typename I>
98 void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
99 auto cct = m_image_ctx->cct;
100 ldout(cct, 20) << dendl;
101 @@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
102 rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
104 switch (io_ctx->type) {
105 - case RBDSC_REGISTER_REPLY: {
106 + case rbd::cache::RBDSC_REGISTER_REPLY: {
107 // open cache handler for volume
108 ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
109 m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
113 - case RBDSC_READ_REPLY: {
114 + case rbd::cache::RBDSC_READ_REPLY: {
115 ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
116 //TODO(): should call read here
120 - case RBDSC_READ_RADOS: {
121 + case rbd::cache::RBDSC_READ_RADOS: {
122 ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
123 //TODO(): should call read here
127 - default: ldout(cct, 20) << "nothing" << dendl;
128 + default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
132 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
133 index 200688f..36b868a 100644
134 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
135 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
136 @@ -112,6 +112,13 @@ public:
140 + int handle_read_cache(
142 + const std::string &oid, uint64_t object_off,
143 + uint64_t object_len, ceph::bufferlist* read_data,
144 + io::DispatchResult* dispatch_result,
145 + Context* on_dispatched);
147 ImageCtxT* m_image_ctx;
149 void client_handle_request(std::string msg);
150 diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
151 index cefcf28..c9d674b 100644
152 --- a/src/tools/rbd_cache/CacheController.cc
153 +++ b/src/tools/rbd_cache/CacheController.cc
154 @@ -76,7 +76,7 @@ void CacheController::run() {
158 -void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
159 +void CacheController::handle_request(uint64_t session_id, std::string msg){
160 rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
163 @@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
164 // init cache layout for volume
165 m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
166 io_ctx->type = RBDSC_REGISTER_REPLY;
167 - m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
168 + m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
172 @@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
174 io_ctx->type = RBDSC_READ_REPLY;
176 - m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
177 + if (io_ctx->type != RBDSC_READ_REPLY) {
180 + m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
184 diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
185 index 967af1d..d178b58 100644
186 --- a/src/tools/rbd_cache/CacheControllerSocket.hpp
187 +++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
188 @@ -43,7 +43,9 @@ public:
189 void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
193 + if(bytes_transferred != 544){
196 process_msg(session_id, std::string(data_, bytes_transferred));
199 @@ -51,7 +53,8 @@ public:
201 void handle_write(const boost::system::error_code& error) {
203 - socket_.async_read_some(boost::asio::buffer(data_),
204 + boost::asio::async_read(socket_, boost::asio::buffer(data_),
205 + boost::asio::transfer_exactly(544),
206 boost::bind(&session::handle_read,
208 boost::asio::placeholders::error,
209 @@ -63,6 +66,7 @@ public:
211 boost::asio::async_write(socket_,
212 boost::asio::buffer(msg.c_str(), msg.size()),
213 + boost::asio::transfer_exactly(544),
214 boost::bind(&session::handle_write,
216 boost::asio::placeholders::error));
217 diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
218 index 56b79ce..3b0ca00 100644
219 --- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
220 +++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
222 #include <boost/bind.hpp>
223 #include <boost/algorithm/string.hpp>
224 #include "include/assert.h"
225 +#include "include/Context.h"
226 #include "CacheControllerSocketCommon.h"
229 @@ -26,8 +27,12 @@ public:
230 m_client_process_msg(processmsg),
231 ep_(stream_protocol::endpoint(file))
233 - std::thread thd([this](){io_service_.run(); });
235 + io_thread.reset(new std::thread([this](){io_service_.run(); }));
239 + io_service_.stop();
244 @@ -53,7 +58,8 @@ public:
247 boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
248 - [this](const boost::system::error_code& err, size_t cb) {
249 + [this, message](const boost::system::error_code& err, size_t cb) {
252 boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
253 boost::asio::transfer_exactly(544),
254 @@ -72,7 +78,7 @@ public:
258 - int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
259 + int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
260 rbdsc_req_type_t *message = new rbdsc_req_type_t();
261 message->type = RBDSC_READ;
262 memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
263 @@ -82,49 +88,48 @@ public:
266 boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
267 - [this, result](const boost::system::error_code& err, size_t cb) {
268 + [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
271 - get_result(result);
272 + get_result(on_finish);
277 - std::unique_lock<std::mutex> lk(m);
279 - cv.wait_for(lk, std::chrono::milliseconds(100));
284 - void get_result(bool* result) {
285 + void get_result(Context* on_finish) {
286 boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
287 boost::asio::transfer_exactly(544),
288 - [this, result](const boost::system::error_code& err, size_t cb) {
289 + [this, on_finish](const boost::system::error_code& err, size_t cb) {
294 rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
295 if (io_ctx->type == RBDSC_READ_REPLY) {
297 + on_finish->complete(true);
301 + on_finish->complete(false);
305 - m_client_process_msg(std::string(buffer_, cb));
309 + return on_finish->complete(false);
314 - void handle_connect(const boost::system::error_code& error) {
315 - //TODO(): open librbd snap
318 - void handle_write(const boost::system::error_code& error) {
322 boost::asio::io_service& io_service_;
323 boost::asio::io_service::work io_service_work_;
324 stream_protocol::socket socket_;
326 + std::shared_ptr<std::thread> io_thread;
327 ClientProcessMsg m_client_process_msg;
328 stream_protocol::endpoint ep_;
330 diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
331 index a9d73a8..e026ec8 100644
332 --- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
333 +++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
335 #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
336 #define CACHE_CONTROLLER_SOCKET_COMMON_H
339 #define RBDSC_REGISTER 0X11
340 #define RBDSC_READ 0X12
341 #define RBDSC_LOOKUP 0X13
343 #define RBDSC_READ_REPLY 0X15
344 #define RBDSC_LOOKUP_REPLY 0X16
345 #define RBDSC_READ_RADOS 0X17
351 +static const int RBDSC_REGISTER = 0X11;
352 +static const int RBDSC_READ = 0X12;
353 +static const int RBDSC_LOOKUP = 0X13;
354 +static const int RBDSC_REGISTER_REPLY = 0X14;
355 +static const int RBDSC_READ_REPLY = 0X15;
356 +static const int RBDSC_LOOKUP_REPLY = 0X16;
357 +static const int RBDSC_READ_RADOS = 0X17;
361 typedef std::function<void(uint64_t, std::string)> ProcessMsg;
362 typedef std::function<void(std::string)> ClientProcessMsg;
363 typedef uint8_t rbdsc_req_type;