X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_log.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_log.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=588aa0a26fde84b8b626c0864ed5977f06fb7129;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_log.cc b/src/ceph/src/rgw/rgw_log.cc deleted file mode 100644 index 588aa0a..0000000 --- a/src/ceph/src/rgw/rgw_log.cc +++ /dev/null @@ -1,429 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "common/Clock.h" -#include "common/Timer.h" -#include "common/utf8.h" -#include "common/OutputDataSocket.h" -#include "common/Formatter.h" - -#include "rgw_bucket.h" -#include "rgw_log.h" -#include "rgw_acl.h" -#include "rgw_rados.h" -#include "rgw_client_io.h" -#include "rgw_rest.h" - -#define dout_subsys ceph_subsys_rgw - -static void set_param_str(struct req_state *s, const char *name, string& str) -{ - const char *p = s->info.env->get(name); - if (p) - str = p; -} - -string render_log_object_name(const string& format, - struct tm *dt, string& bucket_id, - const string& bucket_name) -{ - string o; - for (unsigned i=0; itm_year + 1900); - break; - case 'y': - sprintf(buf, "%.2d", dt->tm_year % 100); - break; - case 'm': - sprintf(buf, "%.2d", dt->tm_mon + 1); - break; - case 'd': - sprintf(buf, "%.2d", dt->tm_mday); - break; - case 'H': - sprintf(buf, "%.2d", dt->tm_hour); - break; - case 'I': - sprintf(buf, "%.2d", (dt->tm_hour % 12) + 1); - break; - case 'k': - sprintf(buf, "%d", dt->tm_hour); - break; - case 'l': - sprintf(buf, "%d", (dt->tm_hour % 12) + 1); - break; - case 'M': - sprintf(buf, "%.2d", dt->tm_min); - break; - - case 'i': - o += bucket_id; - continue; - case 'n': - o += bucket_name; - continue; - default: - // unknown code - sprintf(buf, "%%%c", format[i]); - break; - } - o += buf; - continue; - } - o += format[i]; - } - return o; -} - -/* usage logger */ -class UsageLogger { - CephContext *cct; - RGWRados *store; - map usage_map; - Mutex lock; - int32_t num_entries; - Mutex timer_lock; - SafeTimer timer; - utime_t round_timestamp; - - class C_UsageLogTimeout : public Context { - UsageLogger *logger; - public: - explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {} - void finish(int r) override { - logger->flush(); - logger->set_timer(); - } - }; - - void set_timer() { - timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this)); - } -public: - - UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) { - timer.init(); - Mutex::Locker l(timer_lock); - set_timer(); - utime_t ts = ceph_clock_now(); - recalc_round_timestamp(ts); - } - - ~UsageLogger() { - Mutex::Locker l(timer_lock); - flush(); - timer.cancel_all_events(); - timer.shutdown(); - } - - void recalc_round_timestamp(utime_t& ts) { - round_timestamp = ts.round_to_hour(); - } - - void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) { - lock.Lock(); - if (timestamp.sec() > round_timestamp + 3600) - recalc_round_timestamp(timestamp); - entry.epoch = round_timestamp.sec(); - bool account; - string u = user.to_str(); - rgw_user_bucket ub(u, entry.bucket); - real_time rt = round_timestamp.to_real_time(); - usage_map[ub].insert(rt, entry, &account); - if (account) - num_entries++; - bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold); - lock.Unlock(); - if (need_flush) { - Mutex::Locker l(timer_lock); - flush(); - } - } - - void insert(utime_t& timestamp, rgw_usage_log_entry& entry) { - if (entry.payer.empty()) { - insert_user(timestamp, entry.owner, entry); - } else { - insert_user(timestamp, entry.payer, entry); - } - } - - void flush() { - map old_map; - lock.Lock(); - old_map.swap(usage_map); - num_entries = 0; - lock.Unlock(); - - store->log_usage(old_map); - } -}; - -static UsageLogger *usage_logger = NULL; - -void rgw_log_usage_init(CephContext *cct, RGWRados *store) -{ - usage_logger = new UsageLogger(cct, store); -} - -void rgw_log_usage_finalize() -{ - delete usage_logger; - usage_logger = NULL; -} - -static void log_usage(struct req_state *s, const string& op_name) -{ - if (s->system_request) /* don't log system user operations */ - return; - - if (!usage_logger) - return; - - rgw_user user; - rgw_user payer; - string bucket_name; - - bucket_name = s->bucket_name; - - if (!bucket_name.empty()) { - user = s->bucket_owner.get_id(); - if (s->bucket_info.requester_pays) { - payer = s->user->user_id; - } - } else { - user = s->user->user_id; - } - - bool error = s->err.is_err(); - if (error && s->err.http_ret == 404) { - bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */ - } - - string u = user.to_str(); - string p = payer.to_str(); - rgw_usage_log_entry entry(u, p, bucket_name); - - uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent(); - uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received(); - - rgw_usage_data data(bytes_sent, bytes_received); - - data.ops = 1; - if (!s->is_err()) - data.successful_ops = 1; - - ldout(s->cct, 30) << "log_usage: bucket_name=" << bucket_name - << " tenant=" << s->bucket_tenant - << ", bytes_sent=" << bytes_sent << ", bytes_received=" - << bytes_received << ", success=" << data.successful_ops << dendl; - - entry.add(op_name, data); - - utime_t ts = ceph_clock_now(); - - usage_logger->insert(ts, entry); -} - -void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter) -{ - formatter->open_object_section("log_entry"); - formatter->dump_string("bucket", entry.bucket); - entry.time.gmtime(formatter->dump_stream("time")); // UTC - entry.time.localtime(formatter->dump_stream("time_local")); - formatter->dump_string("remote_addr", entry.remote_addr); - string obj_owner = entry.object_owner.to_str(); - if (obj_owner.length()) - formatter->dump_string("object_owner", obj_owner); - formatter->dump_string("user", entry.user); - formatter->dump_string("operation", entry.op); - formatter->dump_string("uri", entry.uri); - formatter->dump_string("http_status", entry.http_status); - formatter->dump_string("error_code", entry.error_code); - formatter->dump_int("bytes_sent", entry.bytes_sent); - formatter->dump_int("bytes_received", entry.bytes_received); - formatter->dump_int("object_size", entry.obj_size); - uint64_t total_time = entry.total_time.sec() * 1000000LL + entry.total_time.usec(); - - formatter->dump_int("total_time", total_time); - formatter->dump_string("user_agent", entry.user_agent); - formatter->dump_string("referrer", entry.referrer); - if (entry.x_headers.size() > 0) { - formatter->open_array_section("http_x_headers"); - for (const auto& iter: entry.x_headers) { - formatter->open_object_section(iter.first.c_str()); - formatter->dump_string(iter.first.c_str(), iter.second); - formatter->close_section(); - } - formatter->close_section(); - } - formatter->close_section(); -} - -void OpsLogSocket::formatter_to_bl(bufferlist& bl) -{ - stringstream ss; - formatter->flush(ss); - const string& s = ss.str(); - - bl.append(s); -} - -void OpsLogSocket::init_connection(bufferlist& bl) -{ - bl.append("["); -} - -OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog), lock("OpsLogSocket") -{ - formatter = new JSONFormatter; - delim.append(",\n"); -} - -OpsLogSocket::~OpsLogSocket() -{ - delete formatter; -} - -void OpsLogSocket::log(struct rgw_log_entry& entry) -{ - bufferlist bl; - - lock.Lock(); - rgw_format_ops_log_entry(entry, formatter); - formatter_to_bl(bl); - lock.Unlock(); - - append_output(bl); -} - -int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s, - const string& op_name, OpsLogSocket *olog) -{ - struct rgw_log_entry entry; - string bucket_id; - - if (s->enable_usage_log) - log_usage(s, op_name); - - if (!s->enable_ops_log) - return 0; - - if (s->bucket_name.empty()) { - ldout(s->cct, 5) << "nothing to log for operation" << dendl; - return -EINVAL; - } - if (s->err.ret == -ERR_NO_SUCH_BUCKET) { - if (!s->cct->_conf->rgw_log_nonexistent_bucket) { - ldout(s->cct, 5) << "bucket " << s->bucket << " doesn't exist, not logging" << dendl; - return 0; - } - bucket_id = ""; - } else { - bucket_id = s->bucket.bucket_id; - } - rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, entry.bucket); - - if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) { - ldout(s->cct, 5) << "not logging op on bucket with non-utf8 name" << dendl; - return 0; - } - - if (!s->object.empty()) { - entry.obj = s->object; - } else { - entry.obj = rgw_obj_key("-"); - } - - entry.obj_size = s->obj_size; - - if (s->cct->_conf->rgw_remote_addr_param.length()) - set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(), - entry.remote_addr); - else - set_param_str(s, "REMOTE_ADDR", entry.remote_addr); - set_param_str(s, "HTTP_USER_AGENT", entry.user_agent); - set_param_str(s, "HTTP_REFERRER", entry.referrer); - set_param_str(s, "REQUEST_URI", entry.uri); - set_param_str(s, "REQUEST_METHOD", entry.op); - - /* custom header logging */ - if (rest) { - if (rest->log_x_headers()) { - for (const auto& iter : s->info.env->get_map()) { - if (rest->log_x_header(iter.first)) { - entry.x_headers.insert( - rgw_log_entry::headers_map::value_type(iter.first, iter.second)); - } - } - } - } - - entry.user = s->user->user_id.to_str(); - if (s->object_acl) - entry.object_owner = s->object_acl->get_owner().get_id(); - entry.bucket_owner = s->bucket_owner.get_id(); - - uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent(); - uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received(); - - entry.time = s->time; - entry.total_time = ceph_clock_now() - s->time; - entry.bytes_sent = bytes_sent; - entry.bytes_received = bytes_received; - if (s->err.http_ret) { - char buf[16]; - snprintf(buf, sizeof(buf), "%d", s->err.http_ret); - entry.http_status = buf; - } else - entry.http_status = "200"; // default - - entry.error_code = s->err.err_code; - entry.bucket_id = bucket_id; - - bufferlist bl; - ::encode(entry, bl); - - struct tm bdt; - time_t t = entry.time.sec(); - if (s->cct->_conf->rgw_log_object_name_utc) - gmtime_r(&t, &bdt); - else - localtime_r(&t, &bdt); - - int ret = 0; - - if (s->cct->_conf->rgw_ops_log_rados) { - string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt, - s->bucket.bucket_id, entry.bucket); - - rgw_raw_obj obj(store->get_zone_params().log_pool, oid); - - ret = store->append_async(obj, bl.length(), bl); - if (ret == -ENOENT) { - ret = store->create_pool(store->get_zone_params().log_pool); - if (ret < 0) - goto done; - // retry - ret = store->append_async(obj, bl.length(), bl); - } - } - - if (olog) { - olog->log(entry); - } -done: - if (ret < 0) - ldout(s->cct, 0) << "ERROR: failed to log entry" << dendl; - - return ret; -} -