1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "test/librados_test_stub/TestRadosClient.h"
5 #include "test/librados_test_stub/TestIoCtxImpl.h"
6 #include "librados/AioCompletionImpl.h"
7 #include "include/assert.h"
8 #include "common/ceph_json.h"
9 #include "common/Finisher.h"
10 #include <boost/bind.hpp>
11 #include <boost/thread.hpp>
16 static int get_concurrency() {
18 char *env = getenv("LIBRADOS_CONCURRENCY");
20 concurrency = atoi(env);
22 if (concurrency == 0) {
23 concurrency = boost::thread::thread::hardware_concurrency();
25 if (concurrency == 0) {
33 static void finish_aio_completion(AioCompletionImpl *c, int r) {
39 rados_callback_t cb_complete = c->callback_complete;
40 void *cb_complete_arg = c->callback_complete_arg;
42 cb_complete(c, cb_complete_arg);
45 rados_callback_t cb_safe = c->callback_safe;
46 void *cb_safe_arg = c->callback_safe_arg;
48 cb_safe(c, cb_safe_arg);
52 c->callback_complete = NULL;
53 c->callback_safe = NULL;
58 class AioFunctionContext : public Context {
60 AioFunctionContext(const TestRadosClient::AioFunction &callback,
61 Finisher *finisher, AioCompletionImpl *c)
62 : m_callback(callback), m_finisher(finisher), m_comp(c)
69 void finish(int r) override {
70 int ret = m_callback();
72 if (m_finisher != NULL) {
73 m_finisher->queue(new FunctionContext(boost::bind(
74 &finish_aio_completion, m_comp, ret)));
76 finish_aio_completion(m_comp, ret);
81 TestRadosClient::AioFunction m_callback;
83 AioCompletionImpl *m_comp;
86 TestRadosClient::TestRadosClient(CephContext *cct,
87 TestWatchNotify *watch_notify)
88 : m_cct(cct->get()), m_watch_notify(watch_notify),
89 m_aio_finisher(new Finisher(m_cct))
93 // simulate multiple OSDs
94 int concurrency = get_concurrency();
95 for (int i = 0; i < concurrency; ++i) {
96 m_finishers.push_back(new Finisher(m_cct));
97 m_finishers.back()->start();
100 // replicate AIO callback processing
101 m_aio_finisher->start();
104 TestRadosClient::~TestRadosClient() {
105 flush_aio_operations();
107 for (size_t i = 0; i < m_finishers.size(); ++i) {
108 m_finishers[i]->stop();
109 delete m_finishers[i];
111 m_aio_finisher->stop();
112 delete m_aio_finisher;
118 void TestRadosClient::get() {
122 void TestRadosClient::put() {
123 if (--m_refcount == 0) {
129 CephContext *TestRadosClient::cct() {
133 int TestRadosClient::connect() {
137 void TestRadosClient::shutdown() {
140 int TestRadosClient::wait_for_latest_osdmap() {
144 int TestRadosClient::mon_command(const std::vector<std::string>& cmd,
145 const bufferlist &inbl,
146 bufferlist *outbl, std::string *outs) {
147 for (std::vector<std::string>::const_iterator it = cmd.begin();
148 it != cmd.end(); ++it) {
150 if (!parser.parse(it->c_str(), it->length())) {
154 JSONObjIter j_it = parser.find("prefix");
159 if ((*j_it)->get_data() == "osd tier add") {
161 } else if ((*j_it)->get_data() == "osd tier cache-mode") {
163 } else if ((*j_it)->get_data() == "osd tier set-overlay") {
165 } else if ((*j_it)->get_data() == "osd tier remove-overlay") {
167 } else if ((*j_it)->get_data() == "osd tier remove") {
174 void TestRadosClient::add_aio_operation(const std::string& oid,
176 const AioFunction &aio_function,
177 AioCompletionImpl *c) {
178 AioFunctionContext *ctx = new AioFunctionContext(
179 aio_function, queue_callback ? m_aio_finisher : NULL, c);
180 get_finisher(oid)->queue(ctx);
183 struct WaitForFlush {
186 aio_finisher->queue(new FunctionContext(boost::bind(
187 &finish_aio_completion, c, 0)));
193 std::atomic<int64_t> count = { 0 };
194 Finisher *aio_finisher;
195 AioCompletionImpl *c;
198 void TestRadosClient::flush_aio_operations() {
199 AioCompletionImpl *comp = new AioCompletionImpl();
200 flush_aio_operations(comp);
201 comp->wait_for_safe();
205 void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) {
208 WaitForFlush *wait_for_flush = new WaitForFlush();
209 wait_for_flush->count = m_finishers.size();
210 wait_for_flush->aio_finisher = m_aio_finisher;
211 wait_for_flush->c = c;
213 for (size_t i = 0; i < m_finishers.size(); ++i) {
214 AioFunctionContext *ctx = new AioFunctionContext(
215 boost::bind(&WaitForFlush::flushed, wait_for_flush),
217 m_finishers[i]->queue(ctx);
221 int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
223 Context *ctx = new FunctionContext(boost::bind(
224 &TestRadosClient::finish_aio_completion, this, c, _1));
225 get_watch_notify()->aio_flush(this, ctx);
229 void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
230 librados::finish_aio_completion(c, r);
233 Finisher *TestRadosClient::get_finisher(const std::string &oid) {
234 std::size_t h = m_hash(oid);
235 return m_finishers[h % m_finishers.size()];
238 } // namespace librados