X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_sync_module_es_rest.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_sync_module_es_rest.cc;h=200335ffa6377778d282e25f312db7b63022cc43;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_sync_module_es_rest.cc b/src/ceph/src/rgw/rgw_sync_module_es_rest.cc new file mode 100644 index 0000000..200335f --- /dev/null +++ b/src/ceph/src/rgw/rgw_sync_module_es_rest.cc @@ -0,0 +1,412 @@ +#include "rgw_sync_module_es.h" +#include "rgw_sync_module_es_rest.h" +#include "rgw_es_query.h" +#include "rgw_op.h" +#include "rgw_rest.h" +#include "rgw_rest_s3.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +struct es_index_obj_response { + string bucket; + rgw_obj_key key; + uint64_t versioned_epoch{0}; + ACLOwner owner; + set read_permissions; + + struct { + uint64_t size{0}; + ceph::real_time mtime; + string etag; + string content_type; + map custom_str; + map custom_int; + map custom_date; + + template + struct _custom_entry { + string name; + T value; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("name", name, obj); + JSONDecoder::decode_json("value", value, obj); + } + }; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("size", size, obj); + string mtime_str; + JSONDecoder::decode_json("mtime", mtime_str, obj); + parse_time(mtime_str.c_str(), &mtime); + JSONDecoder::decode_json("etag", etag, obj); + JSONDecoder::decode_json("content_type", content_type, obj); + list<_custom_entry > str_entries; + JSONDecoder::decode_json("custom-string", str_entries, obj); + for (auto& e : str_entries) { + custom_str[e.name] = e.value; + } + list<_custom_entry > int_entries; + JSONDecoder::decode_json("custom-int", int_entries, obj); + for (auto& e : int_entries) { + custom_int[e.name] = e.value; + } + list<_custom_entry > date_entries; + JSONDecoder::decode_json("custom-date", date_entries, obj); + for (auto& e : date_entries) { + custom_date[e.name] = e.value; + } + } + } meta; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("bucket", bucket, obj); + JSONDecoder::decode_json("name", key.name, obj); + JSONDecoder::decode_json("instance", key.instance, obj); + JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj); + JSONDecoder::decode_json("permissions", read_permissions, obj); + JSONDecoder::decode_json("owner", owner, obj); + JSONDecoder::decode_json("meta", meta, obj); + } +}; + +struct es_search_response { + uint32_t took; + bool timed_out; + struct { + uint32_t total; + uint32_t successful; + uint32_t failed; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + JSONDecoder::decode_json("successful", successful, obj); + JSONDecoder::decode_json("failed", failed, obj); + } + } shards; + struct obj_hit { + string index; + string type; + string id; + // double score + es_index_obj_response source; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("_index", index, obj); + JSONDecoder::decode_json("_type", type, obj); + JSONDecoder::decode_json("_id", id, obj); + JSONDecoder::decode_json("_source", source, obj); + } + }; + struct { + uint32_t total; + // double max_score; + list hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + // JSONDecoder::decode_json("max_score", max_score, obj); + JSONDecoder::decode_json("hits", hits, obj); + } + } hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("took", took, obj); + JSONDecoder::decode_json("timed_out", timed_out, obj); + JSONDecoder::decode_json("_shards", shards, obj); + JSONDecoder::decode_json("hits", hits, obj); + } +}; + +class RGWMetadataSearchOp : public RGWOp { + RGWSyncModuleInstanceRef sync_module_ref; + RGWElasticSyncModuleInstance *es_module; +protected: + string expression; + string custom_prefix; +#define MAX_KEYS_DEFAULT 100 + uint64_t max_keys{MAX_KEYS_DEFAULT}; + string marker_str; + uint64_t marker{0}; + string next_marker; + bool is_truncated{false}; + string err; + + es_search_response response; + +public: + RGWMetadataSearchOp(const RGWSyncModuleInstanceRef& sync_module) : sync_module_ref(sync_module) { + es_module = static_cast(sync_module_ref.get()); + } + + int verify_permission() { + return 0; + } + virtual int get_params() = 0; + void pre_exec(); + void execute(); + + virtual void send_response() = 0; + virtual const string name() { return "metadata_search"; } + virtual RGWOpType get_type() { return RGW_OP_METADATA_SEARCH; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; } +}; + +void RGWMetadataSearchOp::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWMetadataSearchOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) + return; + + list > conds; + + if (!s->user->system) { + conds.push_back(make_pair("permissions", s->user->user_id.to_str())); + } + + if (!s->bucket_name.empty()) { + conds.push_back(make_pair("bucket", s->bucket_name)); + } + + ESQueryCompiler es_query(expression, &conds, custom_prefix); + + static map aliases = { + { "bucket", "bucket" }, /* forces lowercase */ + { "name", "name" }, + { "key", "name" }, + { "instance", "instance" }, + { "etag", "meta.etag" }, + { "size", "meta.size" }, + { "mtime", "meta.mtime" }, + { "lastmodified", "meta.mtime" }, + { "contenttype", "meta.contenttype" }, + }; + es_query.set_field_aliases(&aliases); + + static map generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR}, + {"name", ESEntityTypeMap::ES_ENTITY_STR}, + {"instance", ESEntityTypeMap::ES_ENTITY_STR}, + {"permissions", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.contenttype", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE}, + {"meta.size", ESEntityTypeMap::ES_ENTITY_INT} }; + ESEntityTypeMap gm(generic_map); + es_query.set_generic_type_map(&gm); + + static set restricted_fields = { {"permissions"} }; + es_query.set_restricted_fields(&restricted_fields); + + map custom_map; + for (auto& i : s->bucket_info.mdsearch_config) { + custom_map[i.first] = (ESEntityTypeMap::EntityType)i.second; + } + + ESEntityTypeMap em(custom_map); + es_query.set_custom_type_map(&em); + + bool valid = es_query.compile(&err); + if (!valid) { + ldout(s->cct, 10) << "invalid query, failed generating request json" << dendl; + op_ret = -EINVAL; + return; + } + + JSONFormatter f; + encode_json("root", es_query, &f); + + RGWRESTConn *conn = es_module->get_rest_conn(); + + bufferlist in; + bufferlist out; + + stringstream ss; + + f.flush(ss); + in.append(ss.str()); + + string resource = es_module->get_index_path() + "/_search"; + param_vec_t params; + static constexpr int BUFSIZE = 32; + char buf[BUFSIZE]; + snprintf(buf, sizeof(buf), "%lld", (long long)max_keys); + params.push_back(param_pair_t("size", buf)); + if (marker > 0) { + params.push_back(param_pair_t("from", marker_str.c_str())); + } + ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl; + op_ret = conn->get_resource(resource, ¶ms, nullptr, out, &in); + if (op_ret < 0) { + ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl; + return; + } + + ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl; + + JSONParser jparser; + if (!jparser.parse(out.c_str(), out.length())) { + ldout(s->cct, 0) << "ERROR: failed to parse elasticsearch response" << dendl; + op_ret = -EINVAL; + return; + } + + try { + decode_json_obj(response, &jparser); + } catch (JSONDecoder::err& e) { + ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl; + op_ret = -EINVAL; + return; + } + +} + +class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp { +public: + RGWMetadataSearch_ObjStore_S3(const RGWSyncModuleInstanceRef& _sync_module) : RGWMetadataSearchOp(_sync_module) { + custom_prefix = "x-amz-meta-"; + } + + int get_params() override { + expression = s->info.args.get("query"); + bool exists; + string max_keys_str = s->info.args.get("max-keys", &exists); +#define MAX_KEYS_MAX 10000 + if (exists) { + string err; + max_keys = strict_strtoll(max_keys_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + if (max_keys > MAX_KEYS_MAX) { + max_keys = MAX_KEYS_MAX; + } + } + marker_str = s->info.args.get("marker", &exists); + if (exists) { + string err; + marker = strict_strtoll(marker_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + } + uint64_t nm = marker + max_keys; + static constexpr int BUFSIZE = 32; + char buf[BUFSIZE]; + snprintf(buf, sizeof(buf), "%lld", (long long)nm); + next_marker = buf; + return 0; + } + void send_response() override { + if (op_ret) { + s->err.message = err; + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/xml"); + + if (op_ret < 0) { + return; + } + + is_truncated = (response.hits.hits.size() >= max_keys); + + s->formatter->open_object_section("SearchMetadataResponse"); + s->formatter->dump_string("Marker", marker_str); + s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false")); + if (is_truncated) { + s->formatter->dump_string("NextMarker", next_marker); + } + if (s->format == RGW_FORMAT_JSON) { + s->formatter->open_array_section("Objects"); + } + for (auto& i : response.hits.hits) { + s->formatter->open_object_section("Contents"); + es_index_obj_response& e = i.source; + s->formatter->dump_string("Bucket", e.bucket); + s->formatter->dump_string("Key", e.key.name); + string instance = (!e.key.instance.empty() ? e.key.instance : "null"); + s->formatter->dump_string("Instance", instance.c_str()); + s->formatter->dump_int("VersionedEpoch", e.versioned_epoch); + dump_time(s, "LastModified", &e.meta.mtime); + s->formatter->dump_int("Size", e.meta.size); + s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str()); + s->formatter->dump_string("ContentType", e.meta.content_type.c_str()); + dump_owner(s, e.owner.get_id(), e.owner.get_display_name()); + s->formatter->open_array_section("CustomMetadata"); + for (auto& m : e.meta.custom_str) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_string("Value", m.second); + s->formatter->close_section(); + } + for (auto& m : e.meta.custom_int) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_int("Value", m.second); + s->formatter->close_section(); + } + for (auto& m : e.meta.custom_date) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_string("Value", m.second); + s->formatter->close_section(); + } + s->formatter->close_section(); + rgw_flush_formatter(s, s->formatter); + s->formatter->close_section(); + }; + if (s->format == RGW_FORMAT_JSON) { + s->formatter->close_section(); + } + s->formatter->close_section(); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 { +protected: + RGWOp *op_get() { + if (s->info.args.exists("query")) { + return new RGWMetadataSearch_ObjStore_S3(store->get_sync_module()); + } + if (!s->init_state.url_bucket.empty() && + s->info.args.exists("mdsearch")) { + return new RGWGetBucketMetaSearch_ObjStore_S3; + } + return nullptr; + } + RGWOp *op_head() { + return nullptr; + } + RGWOp *op_post() { + return nullptr; + } +public: + RGWHandler_REST_MDSearch_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_MDSearch_S3() {} +}; + + +RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(struct req_state* const s, + const rgw::auth::StrategyRegistry& auth_registry, + const std::string& frontend_prefix) +{ + int ret = + RGWHandler_REST_S3::init_from_header(s, + RGW_FORMAT_XML, true); + if (ret < 0) { + return nullptr; + } + + if (!s->object.empty()) { + return nullptr; + } + + RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3(auth_registry); + + ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name() + << dendl; + return handler; +} +