Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/Clock.h"
5 #include "common/Timer.h"
6 #include "common/utf8.h"
7 #include "common/OutputDataSocket.h"
8 #include "common/Formatter.h"
9
10 #include "rgw_bucket.h"
11 #include "rgw_log.h"
12 #include "rgw_acl.h"
13 #include "rgw_rados.h"
14 #include "rgw_client_io.h"
15 #include "rgw_rest.h"
16
17 #define dout_subsys ceph_subsys_rgw
18
19 static void set_param_str(struct req_state *s, const char *name, string& str)
20 {
21   const char *p = s->info.env->get(name);
22   if (p)
23     str = p;
24 }
25
/*
 * Expand a date(1)-like template into the name of a log object.
 *
 * Recognised escapes (everything else is copied through verbatim):
 *   %%  literal '%'
 *   %Y  four-digit year             %y  two-digit year
 *   %m  month, 01-12                %d  day of month, zero padded
 *   %H  hour, 00-23                 %k  hour, 0-23, no padding
 *   %I  hour, 01-12                 %l  hour, 1-12, no padding
 *   %M  minute, zero padded
 *   %i  bucket_id                   %n  bucket_name
 *
 * An unknown escape is emitted unchanged ('%' followed by the character),
 * and a lone '%' at the very end of the format is kept as a literal '%'.
 *
 * @param format       template string
 * @param dt           broken-down time supplying the date/time fields
 * @param bucket_id    substituted for %i (only read, never modified)
 * @param bucket_name  substituted for %n
 * @return the expanded object name
 */
std::string render_log_object_name(const std::string& format,
                                   struct tm *dt, std::string& bucket_id,
                                   const std::string& bucket_name)
{
  std::string o;
  const size_t len = format.size();

  for (size_t i = 0; i < len; i++) {
    // Plain characters, and a trailing '%' with nothing after it, are
    // copied as they are.
    if (format[i] != '%' || i + 1 >= len) {
      o += format[i];
      continue;
    }

    i++;  // step onto the conversion character
    char buf[32];

    // 12-hour clock value in the range 1..12: midnight and noon map to 12,
    // 13:00 maps to 1.  (A plain "(hour % 12) + 1" is off by one.)
    const int hour12 = (dt->tm_hour + 11) % 12 + 1;

    switch (format[i]) {
    case '%':
      snprintf(buf, sizeof(buf), "%%");
      break;
    case 'Y':
      snprintf(buf, sizeof(buf), "%.4d", dt->tm_year + 1900);
      break;
    case 'y':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_year % 100);
      break;
    case 'm':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mon + 1);
      break;
    case 'd':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_mday);
      break;
    case 'H':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_hour);
      break;
    case 'I':
      snprintf(buf, sizeof(buf), "%.2d", hour12);
      break;
    case 'k':
      snprintf(buf, sizeof(buf), "%d", dt->tm_hour);
      break;
    case 'l':
      snprintf(buf, sizeof(buf), "%d", hour12);
      break;
    case 'M':
      snprintf(buf, sizeof(buf), "%.2d", dt->tm_min);
      break;

    case 'i':
      // Appended directly: the id may be longer than the scratch buffer.
      o += bucket_id;
      continue;
    case 'n':
      o += bucket_name;
      continue;
    default:
      // unknown code: reproduce it untouched
      snprintf(buf, sizeof(buf), "%%%c", format[i]);
      break;
    }
    o += buf;
  }
  return o;
}
85
86 /* usage logger */
87 class UsageLogger {
88   CephContext *cct;
89   RGWRados *store;
90   map<rgw_user_bucket, RGWUsageBatch> usage_map;
91   Mutex lock;
92   int32_t num_entries;
93   Mutex timer_lock;
94   SafeTimer timer;
95   utime_t round_timestamp;
96
97   class C_UsageLogTimeout : public Context {
98     UsageLogger *logger;
99   public:
100     explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {}
101     void finish(int r) override {
102       logger->flush();
103       logger->set_timer();
104     }
105   };
106
107   void set_timer() {
108     timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this));
109   }
110 public:
111
112   UsageLogger(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) {
113     timer.init();
114     Mutex::Locker l(timer_lock);
115     set_timer();
116     utime_t ts = ceph_clock_now();
117     recalc_round_timestamp(ts);
118   }
119
120   ~UsageLogger() {
121     Mutex::Locker l(timer_lock);
122     flush();
123     timer.cancel_all_events();
124     timer.shutdown();
125   }
126
127   void recalc_round_timestamp(utime_t& ts) {
128     round_timestamp = ts.round_to_hour();
129   }
130
131   void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
132     lock.Lock();
133     if (timestamp.sec() > round_timestamp + 3600)
134       recalc_round_timestamp(timestamp);
135     entry.epoch = round_timestamp.sec();
136     bool account;
137     string u = user.to_str();
138     rgw_user_bucket ub(u, entry.bucket);
139     real_time rt = round_timestamp.to_real_time();
140     usage_map[ub].insert(rt, entry, &account);
141     if (account)
142       num_entries++;
143     bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
144     lock.Unlock();
145     if (need_flush) {
146       Mutex::Locker l(timer_lock);
147       flush();
148     }
149   }
150
151   void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
152     if (entry.payer.empty()) {
153       insert_user(timestamp, entry.owner, entry);
154     } else {
155       insert_user(timestamp, entry.payer, entry);
156     }
157   }
158
159   void flush() {
160     map<rgw_user_bucket, RGWUsageBatch> old_map;
161     lock.Lock();
162     old_map.swap(usage_map);
163     num_entries = 0;
164     lock.Unlock();
165
166     store->log_usage(old_map);
167   }
168 };
169
170 static UsageLogger *usage_logger = NULL;
171
172 void rgw_log_usage_init(CephContext *cct, RGWRados *store)
173 {
174   usage_logger = new UsageLogger(cct, store);
175 }
176
177 void rgw_log_usage_finalize()
178 {
179   delete usage_logger;
180   usage_logger = NULL;
181 }
182
183 static void log_usage(struct req_state *s, const string& op_name)
184 {
185   if (s->system_request) /* don't log system user operations */
186     return;
187
188   if (!usage_logger)
189     return;
190
191   rgw_user user;
192   rgw_user payer;
193   string bucket_name;
194
195   bucket_name = s->bucket_name;
196
197   if (!bucket_name.empty()) {
198     user = s->bucket_owner.get_id();
199     if (s->bucket_info.requester_pays) {
200       payer = s->user->user_id;
201     }
202   } else {
203       user = s->user->user_id;
204   }
205
206   bool error = s->err.is_err();
207   if (error && s->err.http_ret == 404) {
208     bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */
209   }
210
211   string u = user.to_str();
212   string p = payer.to_str();
213   rgw_usage_log_entry entry(u, p, bucket_name);
214
215   uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
216   uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
217
218   rgw_usage_data data(bytes_sent, bytes_received);
219
220   data.ops = 1;
221   if (!s->is_err())
222     data.successful_ops = 1;
223
224   ldout(s->cct, 30) << "log_usage: bucket_name=" << bucket_name
225         << " tenant=" << s->bucket_tenant
226         << ", bytes_sent=" << bytes_sent << ", bytes_received="
227         << bytes_received << ", success=" << data.successful_ops << dendl;
228
229   entry.add(op_name, data);
230
231   utime_t ts = ceph_clock_now();
232
233   usage_logger->insert(ts, entry);
234 }
235
236 void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
237 {
238   formatter->open_object_section("log_entry");
239   formatter->dump_string("bucket", entry.bucket);
240   entry.time.gmtime(formatter->dump_stream("time"));      // UTC
241   entry.time.localtime(formatter->dump_stream("time_local"));
242   formatter->dump_string("remote_addr", entry.remote_addr);
243   string obj_owner = entry.object_owner.to_str();
244   if (obj_owner.length())
245     formatter->dump_string("object_owner", obj_owner);
246   formatter->dump_string("user", entry.user);
247   formatter->dump_string("operation", entry.op);
248   formatter->dump_string("uri", entry.uri);
249   formatter->dump_string("http_status", entry.http_status);
250   formatter->dump_string("error_code", entry.error_code);
251   formatter->dump_int("bytes_sent", entry.bytes_sent);
252   formatter->dump_int("bytes_received", entry.bytes_received);
253   formatter->dump_int("object_size", entry.obj_size);
254   uint64_t total_time =  entry.total_time.sec() * 1000000LL + entry.total_time.usec();
255
256   formatter->dump_int("total_time", total_time);
257   formatter->dump_string("user_agent",  entry.user_agent);
258   formatter->dump_string("referrer",  entry.referrer);
259   if (entry.x_headers.size() > 0) {
260     formatter->open_array_section("http_x_headers");
261     for (const auto& iter: entry.x_headers) {
262       formatter->open_object_section(iter.first.c_str());
263       formatter->dump_string(iter.first.c_str(), iter.second);
264       formatter->close_section();
265     }
266     formatter->close_section();
267   }
268   formatter->close_section();
269 }
270
271 void OpsLogSocket::formatter_to_bl(bufferlist& bl)
272 {
273   stringstream ss;
274   formatter->flush(ss);
275   const string& s = ss.str();
276
277   bl.append(s);
278 }
279
280 void OpsLogSocket::init_connection(bufferlist& bl)
281 {
282   bl.append("[");
283 }
284
285 OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog), lock("OpsLogSocket")
286 {
287   formatter = new JSONFormatter;
288   delim.append(",\n");
289 }
290
291 OpsLogSocket::~OpsLogSocket()
292 {
293   delete formatter;
294 }
295
296 void OpsLogSocket::log(struct rgw_log_entry& entry)
297 {
298   bufferlist bl;
299
300   lock.Lock();
301   rgw_format_ops_log_entry(entry, formatter);
302   formatter_to_bl(bl);
303   lock.Unlock();
304
305   append_output(bl);
306 }
307
308 int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
309                const string& op_name, OpsLogSocket *olog)
310 {
311   struct rgw_log_entry entry;
312   string bucket_id;
313
314   if (s->enable_usage_log)
315     log_usage(s, op_name);
316
317   if (!s->enable_ops_log)
318     return 0;
319
320   if (s->bucket_name.empty()) {
321     ldout(s->cct, 5) << "nothing to log for operation" << dendl;
322     return -EINVAL;
323   }
324   if (s->err.ret == -ERR_NO_SUCH_BUCKET) {
325     if (!s->cct->_conf->rgw_log_nonexistent_bucket) {
326       ldout(s->cct, 5) << "bucket " << s->bucket << " doesn't exist, not logging" << dendl;
327       return 0;
328     }
329     bucket_id = "";
330   } else {
331     bucket_id = s->bucket.bucket_id;
332   }
333   rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, entry.bucket);
334
335   if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) {
336     ldout(s->cct, 5) << "not logging op on bucket with non-utf8 name" << dendl;
337     return 0;
338   }
339
340   if (!s->object.empty()) {
341     entry.obj = s->object;
342   } else {
343     entry.obj = rgw_obj_key("-");
344   }
345
346   entry.obj_size = s->obj_size;
347
348   if (s->cct->_conf->rgw_remote_addr_param.length())
349     set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(),
350                   entry.remote_addr);
351   else
352     set_param_str(s, "REMOTE_ADDR", entry.remote_addr);
353   set_param_str(s, "HTTP_USER_AGENT", entry.user_agent);
354   set_param_str(s, "HTTP_REFERRER", entry.referrer);
355   set_param_str(s, "REQUEST_URI", entry.uri);
356   set_param_str(s, "REQUEST_METHOD", entry.op);
357
358   /* custom header logging */
359   if (rest) {
360     if (rest->log_x_headers()) {
361       for (const auto& iter : s->info.env->get_map()) {
362         if (rest->log_x_header(iter.first)) {
363           entry.x_headers.insert(
364             rgw_log_entry::headers_map::value_type(iter.first, iter.second));
365         }
366       }
367     }
368   }
369
370   entry.user = s->user->user_id.to_str();
371   if (s->object_acl)
372     entry.object_owner = s->object_acl->get_owner().get_id();
373   entry.bucket_owner = s->bucket_owner.get_id();
374
375   uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent();
376   uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();
377
378   entry.time = s->time;
379   entry.total_time = ceph_clock_now() - s->time;
380   entry.bytes_sent = bytes_sent;
381   entry.bytes_received = bytes_received;
382   if (s->err.http_ret) {
383     char buf[16];
384     snprintf(buf, sizeof(buf), "%d", s->err.http_ret);
385     entry.http_status = buf;
386   } else
387     entry.http_status = "200"; // default
388
389   entry.error_code = s->err.err_code;
390   entry.bucket_id = bucket_id;
391
392   bufferlist bl;
393   ::encode(entry, bl);
394
395   struct tm bdt;
396   time_t t = entry.time.sec();
397   if (s->cct->_conf->rgw_log_object_name_utc)
398     gmtime_r(&t, &bdt);
399   else
400     localtime_r(&t, &bdt);
401
402   int ret = 0;
403
404   if (s->cct->_conf->rgw_ops_log_rados) {
405     string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt,
406                                         s->bucket.bucket_id, entry.bucket);
407
408     rgw_raw_obj obj(store->get_zone_params().log_pool, oid);
409
410     ret = store->append_async(obj, bl.length(), bl);
411     if (ret == -ENOENT) {
412       ret = store->create_pool(store->get_zone_params().log_pool);
413       if (ret < 0)
414         goto done;
415       // retry
416       ret = store->append_async(obj, bl.length(), bl);
417     }
418   }
419
420   if (olog) {
421     olog->log(entry);
422   }
423 done:
424   if (ret < 0)
425     ldout(s->cct, 0) << "ERROR: failed to log entry" << dendl;
426
427   return ret;
428 }
429