X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?p=stor4nfv.git;a=blobdiff_plain;f=src%2Fceph%2F0008-librbd-implement-async-cache-lookup-and-read.patch;fp=src%2Fceph%2F0008-librbd-implement-async-cache-lookup-and-read.patch;h=16dd41fb9d5870d24999a347cc00353460b66570;hp=0000000000000000000000000000000000000000;hb=d65e22d27ab305d38059046dae60d7a66ff4a4e0;hpb=828acdd1d5c5c2aeef287aa69e473bf44fcbce70 diff --git a/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch new file mode 100644 index 0000000..16dd41f --- /dev/null +++ b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch @@ -0,0 +1,366 @@ +From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou +Date: Thu, 16 Aug 2018 17:28:46 +0800 +Subject: [PATCH 08/10] librbd: implement async cache lookup and read + +Signed-off-by: Yuan Zhou +--- + .../SharedPersistentObjectCacherObjectDispatch.cc | 63 ++++++++++++---------- + .../SharedPersistentObjectCacherObjectDispatch.h | 7 +++ + src/tools/rbd_cache/CacheController.cc | 9 ++-- + src/tools/rbd_cache/CacheControllerSocket.hpp | 8 ++- + .../rbd_cache/CacheControllerSocketClient.hpp | 49 +++++++++-------- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 12 +++++ + 6 files changed, 94 insertions(+), 54 deletions(-) + +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +index 2aa5cad..407ce49 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch::SharedPersistentObjectCacherObjec + + template + SharedPersistentObjectCacherObjectDispatch::~SharedPersistentObjectCacherObjectDispatch() { +- if (m_object_store) { + delete m_object_store; +- } +- +- if (m_cache_client) { + delete m_cache_client; +- } + } + + template +@@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch::read( + ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" + << object_len << dendl; + +- // ensure we aren't holding the cache lock post-read + on_dispatched = util::create_async_context_callback(*m_image_ctx, + on_dispatched); ++ auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { ++ handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); ++ }); + + if (m_cache_client && m_cache_client->connected && m_object_store) { +- bool exists; + m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), +- m_image_ctx->id, oid, &exists); +- +- // try to read from parent image +- ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; +- if (exists) { +- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); +- if (r != 0) { +- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; +- on_dispatched->complete(r); +- return true; +- } +- } ++ m_image_ctx->id, oid, ctx); + } +- +- ldout(cct, 20) << "Continue read from RADOS" << dendl; +- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; +- on_dispatched->complete(0); + return true; + } + + template ++int SharedPersistentObjectCacherObjectDispatch::handle_read_cache( ++ bool cache, ++ const std::string &oid, uint64_t object_off, uint64_t object_len, ++ ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, ++ Context* on_dispatched) { ++ // IO chained in reverse order ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ // try to read from parent image ++ if (cache) { ++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ if (r != 0) { ++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; ++ //TODO(): complete in syncfile ++ on_dispatched->complete(r); ++ ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <complete(0); ++ ldout(cct, 20) << "BBB no cache" << *dispatch_result < + void SharedPersistentObjectCacherObjectDispatch::client_handle_request(std::string msg) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; +@@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch::client_handle_request(std::s + rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); + + switch (io_ctx->type) { +- case RBDSC_REGISTER_REPLY: { ++ case rbd::cache::RBDSC_REGISTER_REPLY: { + // open cache handler for volume + ldout(cct, 20) << "SRO cache client open cache handler" << dendl; + m_object_store = new SharedPersistentObjectCacher(m_image_ctx, m_image_ctx->shared_cache_path); + + break; + } +- case RBDSC_READ_REPLY: { ++ case rbd::cache::RBDSC_READ_REPLY: { + ldout(cct, 20) << "SRO cache client start to read cache" << dendl; + //TODO(): should call read here + + break; + } +- case RBDSC_READ_RADOS: { ++ case rbd::cache::RBDSC_READ_RADOS: { + ldout(cct, 20) << "SRO cache client start to read rados" << dendl; + //TODO(): should call read here + + break; + } +- default: ldout(cct, 20) << "nothing" << dendl; ++ default: ldout(cct, 20) << "nothing" << io_ctx->type <init_cache(io_ctx->vol_name, io_ctx->vol_size); + io_ctx->type = RBDSC_REGISTER_REPLY; +- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); + + break; + } +@@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ + } else { + io_ctx->type = RBDSC_READ_REPLY; + } +- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ if (io_ctx->type != RBDSC_READ_REPLY) { ++ assert(0); ++ } ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); + + break; + } +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +index 967af1d..d178b58 100644 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -43,7 +43,9 @@ public: + void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { + + if (!error) { +- ++ if(bytes_transferred != 544){ ++ assert(0); ++ } + process_msg(session_id, std::string(data_, bytes_transferred)); + + } +@@ -51,7 +53,8 @@ public: + + void handle_write(const boost::system::error_code& error) { + if (!error) { +- socket_.async_read_some(boost::asio::buffer(data_), ++ boost::asio::async_read(socket_, boost::asio::buffer(data_), ++ boost::asio::transfer_exactly(544), + boost::bind(&session::handle_read, + shared_from_this(), + boost::asio::placeholders::error, +@@ -63,6 +66,7 @@ public: + + boost::asio::async_write(socket_, + boost::asio::buffer(msg.c_str(), msg.size()), ++ boost::asio::transfer_exactly(544), + boost::bind(&session::handle_write, + shared_from_this(), + boost::asio::placeholders::error)); +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 56b79ce..3b0ca00 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -8,6 +8,7 @@ + #include + #include + #include "include/assert.h" ++#include "include/Context.h" + #include "CacheControllerSocketCommon.h" + + +@@ -26,8 +27,12 @@ public: + m_client_process_msg(processmsg), + ep_(stream_protocol::endpoint(file)) + { +- std::thread thd([this](){io_service_.run(); }); +- thd.detach(); ++ io_thread.reset(new std::thread([this](){io_service_.run(); })); ++ } ++ ++ ~CacheClient() { ++ io_service_.stop(); ++ io_thread->join(); + } + + void run(){ +@@ -53,7 +58,8 @@ public: + message->offset = 0; + message->length = 0; + boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), +- [this](const boost::system::error_code& err, size_t cb) { ++ [this, message](const boost::system::error_code& err, size_t cb) { ++ delete message; + if (!err) { + boost::asio::async_read(socket_, boost::asio::buffer(buffer_), + boost::asio::transfer_exactly(544), +@@ -72,7 +78,7 @@ public: + return 0; + } + +- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { ++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { + rbdsc_req_type_t *message = new rbdsc_req_type_t(); + message->type = RBDSC_READ; + memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); +@@ -82,49 +88,48 @@ public: + message->length = 0; + + boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), +- [this, result](const boost::system::error_code& err, size_t cb) { ++ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { ++ delete message; + if (!err) { +- get_result(result); ++ get_result(on_finish); + } else { + return -1; + } + }); +- std::unique_lock lk(m); +- //cv.wait(lk); +- cv.wait_for(lk, std::chrono::milliseconds(100)); ++ + return 0; + } + +- void get_result(bool* result) { ++ void get_result(Context* on_finish) { + boost::asio::async_read(socket_, boost::asio::buffer(buffer_), + boost::asio::transfer_exactly(544), +- [this, result](const boost::system::error_code& err, size_t cb) { ++ [this, on_finish](const boost::system::error_code& err, size_t cb) { ++ if (cb != 544) { ++ assert(0); ++ } + if (!err) { + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); + if (io_ctx->type == RBDSC_READ_REPLY) { +- *result = true; ++ on_finish->complete(true); ++ return; + } else { +- *result = false; ++ on_finish->complete(false); ++ return; + } +- cv.notify_one(); +- m_client_process_msg(std::string(buffer_, cb)); + } else { +- return -1; ++ assert(0); ++ return on_finish->complete(false); + } + }); + } + +- void handle_connect(const boost::system::error_code& error) { +- //TODO(): open librbd snap +- } +- +- void handle_write(const boost::system::error_code& error) { +- } + + private: + boost::asio::io_service& io_service_; + boost::asio::io_service::work io_service_work_; + stream_protocol::socket socket_; ++ ++ std::shared_ptr io_thread; + ClientProcessMsg m_client_process_msg; + stream_protocol::endpoint ep_; + char buffer_[1024]; +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +index a9d73a8..e026ec8 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -4,6 +4,7 @@ + #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H + #define CACHE_CONTROLLER_SOCKET_COMMON_H + ++/* + #define RBDSC_REGISTER 0X11 + #define RBDSC_READ 0X12 + #define RBDSC_LOOKUP 0X13 +@@ -11,10 +12,21 @@ + #define RBDSC_READ_REPLY 0X15 + #define RBDSC_LOOKUP_REPLY 0X16 + #define RBDSC_READ_RADOS 0X17 ++*/ + + namespace rbd { + namespace cache { + ++static const int RBDSC_REGISTER = 0X11; ++static const int RBDSC_READ = 0X12; ++static const int RBDSC_LOOKUP = 0X13; ++static const int RBDSC_REGISTER_REPLY = 0X14; ++static const int RBDSC_READ_REPLY = 0X15; ++static const int RBDSC_LOOKUP_REPLY = 0X16; ++static const int RBDSC_READ_RADOS = 0X17; ++ ++ ++ + typedef std::function ProcessMsg; + typedef std::function ClientProcessMsg; + typedef uint8_t rbdsc_req_type; +-- +2.7.4 +