// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "cls/journal/cls_journal_client.h" #include "include/rados/librados.hpp" #include "include/buffer.h" #include "include/Context.h" #include "common/Cond.h" #include namespace cls { namespace journal { namespace client { namespace { struct C_AioExec : public Context { librados::IoCtx &ioctx; std::string oid; C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid) : ioctx(_ioctx), oid(_oid) { } static void rados_callback(rados_completion_t c, void *arg) { Context *ctx = reinterpret_cast(arg); ctx->complete(rados_aio_get_return_value(c)); } }; struct C_ClientList : public C_AioExec { std::set *clients; Context *on_finish; bufferlist outbl; C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid, std::set *_clients, Context *_on_finish) : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {} void send(const std::string &start_after) { bufferlist inbl; ::encode(start_after, inbl); ::encode(JOURNAL_MAX_RETURN, inbl); librados::ObjectReadOperation op; op.exec("journal", "client_list", inbl); outbl.clear(); librados::AioCompletion *rados_completion = librados::Rados::aio_create_completion(this, rados_callback, NULL); int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); assert(r == 0); rados_completion->release(); } void complete(int r) override { if (r < 0) { finish(r); return; } try { bufferlist::iterator iter = outbl.begin(); std::set partial_clients; ::decode(partial_clients, iter); std::string start_after; if (!partial_clients.empty()) { start_after = partial_clients.rbegin()->id; clients->insert(partial_clients.begin(), partial_clients.end()); } if (partial_clients.size() < JOURNAL_MAX_RETURN) { finish(0); } else { send(start_after); } } catch (const buffer::error &err) { finish(-EBADMSG); } } void finish(int r) override { on_finish->complete(r); delete this; } }; struct C_ImmutableMetadata : public C_AioExec { uint8_t *order; uint8_t *splay_width; int64_t *pool_id; Context *on_finish; bufferlist outbl; C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid, uint8_t *_order, uint8_t *_splay_width, int64_t *_pool_id, Context *_on_finish) : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width), pool_id(_pool_id), on_finish(_on_finish) { } void send() { librados::ObjectReadOperation op; bufferlist inbl; op.exec("journal", "get_order", inbl); op.exec("journal", "get_splay_width", inbl); op.exec("journal", "get_pool_id", inbl); librados::AioCompletion *rados_completion = librados::Rados::aio_create_completion(this, rados_callback, NULL); int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); assert(r == 0); rados_completion->release(); } void finish(int r) override { if (r == 0) { try { bufferlist::iterator iter = outbl.begin(); ::decode(*order, iter); ::decode(*splay_width, iter); ::decode(*pool_id, iter); } catch (const buffer::error &err) { r = -EBADMSG; } } on_finish->complete(r); } }; struct C_MutableMetadata : public C_AioExec { uint64_t *minimum_set; uint64_t *active_set; C_ClientList *client_list; bufferlist outbl; C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid, uint64_t *_minimum_set, uint64_t *_active_set, C_ClientList *_client_list) : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set), active_set(_active_set), client_list(_client_list) {} void send() { librados::ObjectReadOperation op; bufferlist inbl; op.exec("journal", "get_minimum_set", inbl); op.exec("journal", "get_active_set", inbl); librados::AioCompletion *rados_completion = librados::Rados::aio_create_completion(this, rados_callback, NULL); int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); assert(r == 0); rados_completion->release(); } void finish(int r) override { if (r == 0) { try { bufferlist::iterator iter = outbl.begin(); ::decode(*minimum_set, iter); ::decode(*active_set, iter); client_list->send(""); } catch (const buffer::error &err) { r = -EBADMSG; } } if (r < 0) { client_list->complete(r); } } }; } // anonymous namespace void create(librados::ObjectWriteOperation *op, uint8_t order, uint8_t splay, int64_t pool_id) { bufferlist bl; ::encode(order, bl); ::encode(splay, bl); ::encode(pool_id, bl); op->exec("journal", "create", bl); } int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order, uint8_t splay, int64_t pool_id) { librados::ObjectWriteOperation op; create(&op, order, splay, pool_id); int r = ioctx.operate(oid, &op); if (r < 0) { return r; } return 0; } void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid, uint8_t *order, uint8_t *splay_width, int64_t *pool_id, Context *on_finish) { C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order, splay_width, pool_id, on_finish); metadata->send(); } void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid, uint64_t *minimum_set, uint64_t *active_set, std::set *clients, Context *on_finish) { C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish); C_MutableMetadata *metadata = new C_MutableMetadata( ioctx, oid, minimum_set, active_set, client_list); metadata->send(); } void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) { bufferlist bl; ::encode(object_set, bl); op->exec("journal", "set_minimum_set", bl); } void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) { bufferlist bl; ::encode(object_set, bl); op->exec("journal", "set_active_set", bl); } int get_client(librados::IoCtx &ioctx, const std::string &oid, const std::string &id, cls::journal::Client *client) { librados::ObjectReadOperation op; get_client_start(&op, id); bufferlist out_bl; int r = ioctx.operate(oid, &op, &out_bl); if (r < 0) { return r; } bufferlist::iterator iter = out_bl.begin(); r = get_client_finish(&iter, client); if (r < 0) { return r; } return 0; } void get_client_start(librados::ObjectReadOperation *op, const std::string &id) { bufferlist bl; ::encode(id, bl); op->exec("journal", "get_client", bl); } int get_client_finish(bufferlist::iterator *iter, cls::journal::Client *client) { try { ::decode(*client, *iter); } catch (const buffer::error &err) { return -EBADMSG; } return 0; } int client_register(librados::IoCtx &ioctx, const std::string &oid, const std::string &id, const bufferlist &data) { librados::ObjectWriteOperation op; client_register(&op, id, data); return ioctx.operate(oid, &op); } void client_register(librados::ObjectWriteOperation *op, const std::string &id, const bufferlist &data) { bufferlist bl; ::encode(id, bl); ::encode(data, bl); op->exec("journal", "client_register", bl); } int client_update_data(librados::IoCtx &ioctx, const std::string &oid, const std::string &id, const bufferlist &data) { librados::ObjectWriteOperation op; client_update_data(&op, id, data); return ioctx.operate(oid, &op); } void client_update_data(librados::ObjectWriteOperation *op, const std::string &id, const bufferlist &data) { bufferlist bl; ::encode(id, bl); ::encode(data, bl); op->exec("journal", "client_update_data", bl); } int client_update_state(librados::IoCtx &ioctx, const std::string &oid, const std::string &id, cls::journal::ClientState state) { librados::ObjectWriteOperation op; client_update_state(&op, id, state); return ioctx.operate(oid, &op); } void client_update_state(librados::ObjectWriteOperation *op, const std::string &id, cls::journal::ClientState state) { bufferlist bl; ::encode(id, bl); ::encode(static_cast(state), bl); op->exec("journal", "client_update_state", bl); } int client_unregister(librados::IoCtx &ioctx, const std::string &oid, const std::string &id) { librados::ObjectWriteOperation op; client_unregister(&op, id); return ioctx.operate(oid, &op); } void client_unregister(librados::ObjectWriteOperation *op, const std::string &id) { bufferlist bl; ::encode(id, bl); op->exec("journal", "client_unregister", bl); } void client_commit(librados::ObjectWriteOperation *op, const std::string &id, const cls::journal::ObjectSetPosition &commit_position) { bufferlist bl; ::encode(id, bl); ::encode(commit_position, bl); op->exec("journal", "client_commit", bl); } int client_list(librados::IoCtx &ioctx, const std::string &oid, std::set *clients) { C_SaferCond cond; client_list(ioctx, oid, clients, &cond); return cond.wait(); } void client_list(librados::IoCtx &ioctx, const std::string &oid, std::set *clients, Context *on_finish) { C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish); client_list->send(""); } int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid, uint64_t *tag_tid) { librados::ObjectReadOperation op; get_next_tag_tid_start(&op); bufferlist out_bl; int r = ioctx.operate(oid, &op, &out_bl); if (r < 0) { return r; } bufferlist::iterator iter = out_bl.begin(); r = get_next_tag_tid_finish(&iter, tag_tid); if (r < 0) { return r; } return 0; } void get_next_tag_tid_start(librados::ObjectReadOperation *op) { bufferlist bl; op->exec("journal", "get_next_tag_tid", bl); } int get_next_tag_tid_finish(bufferlist::iterator *iter, uint64_t *tag_tid) { try { ::decode(*tag_tid, *iter); } catch (const buffer::error &err) { return -EBADMSG; } return 0; } int get_tag(librados::IoCtx &ioctx, const std::string &oid, uint64_t tag_tid, cls::journal::Tag *tag) { librados::ObjectReadOperation op; get_tag_start(&op, tag_tid); bufferlist out_bl; int r = ioctx.operate(oid, &op, &out_bl); if (r < 0) { return r; } bufferlist::iterator iter = out_bl.begin(); r = get_tag_finish(&iter, tag); if (r < 0) { return r; } return 0; } void get_tag_start(librados::ObjectReadOperation *op, uint64_t tag_tid) { bufferlist bl; ::encode(tag_tid, bl); op->exec("journal", "get_tag", bl); } int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag) { try { ::decode(*tag, *iter); } catch (const buffer::error &err) { return -EBADMSG; } return 0; } int tag_create(librados::IoCtx &ioctx, const std::string &oid, uint64_t tag_tid, uint64_t tag_class, const bufferlist &data) { librados::ObjectWriteOperation op; tag_create(&op, tag_tid, tag_class, data); return ioctx.operate(oid, &op); } void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid, uint64_t tag_class, const bufferlist &data) { bufferlist bl; ::encode(tag_tid, bl); ::encode(tag_class, bl); ::encode(data, bl); op->exec("journal", "tag_create", bl); } int tag_list(librados::IoCtx &ioctx, const std::string &oid, const std::string &client_id, boost::optional tag_class, std::set *tags) { tags->clear(); uint64_t start_after_tag_tid = 0; while (true) { librados::ObjectReadOperation op; tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id, tag_class); bufferlist out_bl; int r = ioctx.operate(oid, &op, &out_bl); if (r < 0) { return r; } bufferlist::iterator iter = out_bl.begin(); std::set decode_tags; r = tag_list_finish(&iter, &decode_tags); if (r < 0) { return r; } tags->insert(decode_tags.begin(), decode_tags.end()); if (decode_tags.size() < JOURNAL_MAX_RETURN) { break; } } return 0; } void tag_list_start(librados::ObjectReadOperation *op, uint64_t start_after_tag_tid, uint64_t max_return, const std::string &client_id, boost::optional tag_class) { bufferlist bl; ::encode(start_after_tag_tid, bl); ::encode(max_return, bl); ::encode(client_id, bl); ::encode(tag_class, bl); op->exec("journal", "tag_list", bl); } int tag_list_finish(bufferlist::iterator *iter, std::set *tags) { try { ::decode(*tags, *iter); } catch (const buffer::error &err) { return -EBADMSG; } return 0; } void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) { bufferlist bl; ::encode(soft_max_size, bl); op->exec("journal", "guard_append", bl); } } // namespace client } // namespace journal } // namespace cls