Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / LogClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "common/LogClient.h"
16 #include "include/str_map.h"
17 #include "messages/MLog.h"
18 #include "messages/MLogAck.h"
19 #include "mon/MonMap.h"
20 #include "common/Graylog.h"
21
22 #ifdef DARWIN
23 #include <sys/param.h>
24 #include <sys/mount.h>
25 #endif // DARWIN
26
27 #define dout_subsys ceph_subsys_monc
28
29 int parse_log_client_options(CephContext *cct,
30                              map<string,string> &log_to_monitors,
31                              map<string,string> &log_to_syslog,
32                              map<string,string> &log_channels,
33                              map<string,string> &log_prios,
34                              map<string,string> &log_to_graylog,
35                              map<string,string> &log_to_graylog_host,
36                              map<string,string> &log_to_graylog_port,
37                              uuid_d &fsid,
38                              string &host)
39 {
40   ostringstream oss;
41
42   int r = get_conf_str_map_helper(cct->_conf->clog_to_monitors, oss,
43                                   &log_to_monitors, CLOG_CONFIG_DEFAULT_KEY);
44   if (r < 0) {
45     lderr(cct) << __func__ << " error parsing 'clog_to_monitors'" << dendl;
46     return r;
47   }
48
49   r = get_conf_str_map_helper(cct->_conf->clog_to_syslog, oss,
50                               &log_to_syslog, CLOG_CONFIG_DEFAULT_KEY);
51   if (r < 0) {
52     lderr(cct) << __func__ << " error parsing 'clog_to_syslog'" << dendl;
53     return r;
54   }
55
56   r = get_conf_str_map_helper(cct->_conf->clog_to_syslog_facility, oss,
57                               &log_channels, CLOG_CONFIG_DEFAULT_KEY);
58   if (r < 0) {
59     lderr(cct) << __func__ << " error parsing 'clog_to_syslog_facility'" << dendl;
60     return r;
61   }
62
63   r = get_conf_str_map_helper(cct->_conf->clog_to_syslog_level, oss,
64                               &log_prios, CLOG_CONFIG_DEFAULT_KEY);
65   if (r < 0) {
66     lderr(cct) << __func__ << " error parsing 'clog_to_syslog_level'" << dendl;
67     return r;
68   }
69
70   r = get_conf_str_map_helper(cct->_conf->clog_to_graylog, oss,
71                               &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY);
72   if (r < 0) {
73     lderr(cct) << __func__ << " error parsing 'clog_to_graylog'" << dendl;
74     return r;
75   }
76
77   r = get_conf_str_map_helper(cct->_conf->clog_to_graylog_host, oss,
78                               &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY);
79   if (r < 0) {
80     lderr(cct) << __func__ << " error parsing 'clog_to_graylog_host'" << dendl;
81     return r;
82   }
83
84   r = get_conf_str_map_helper(cct->_conf->clog_to_graylog_port, oss,
85                               &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY);
86   if (r < 0) {
87     lderr(cct) << __func__ << " error parsing 'clog_to_graylog_port'" << dendl;
88     return r;
89   }
90
91   fsid = cct->_conf->get_val<uuid_d>("fsid");
92   host = cct->_conf->host;
93   return 0;
94 }
95
96 #undef dout_prefix
97 #define dout_prefix _prefix(_dout, this)
98 static ostream& _prefix(std::ostream *_dout, LogClient *logc) {
99   return *_dout << "log_client ";
100 }
101
102 static ostream& _prefix(std::ostream *_dout, LogChannel *lc) {
103   return *_dout << "log_channel(" << lc->get_log_channel() << ") ";
104 }
105
106 LogChannel::LogChannel(CephContext *cct, LogClient *lc, const string &channel)
107   : cct(cct), parent(lc), channel_lock("LogChannel::channel_lock"),
108     log_channel(channel), log_to_syslog(false), log_to_monitors(false)
109 {
110 }
111
112 LogChannel::LogChannel(CephContext *cct, LogClient *lc,
113                        const string &channel, const string &facility,
114                        const string &prio)
115   : cct(cct), parent(lc), channel_lock("LogChannel::channel_lock"),
116     log_channel(channel), log_prio(prio), syslog_facility(facility),
117     log_to_syslog(false), log_to_monitors(false)
118 {
119 }
120
121 LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm,
122                      enum logclient_flag_t flags)
123   : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON),
124     log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0)
125 {
126 }
127
128 LogClientTemp::LogClientTemp(clog_type type_, LogChannel &parent_)
129   : type(type_), parent(parent_)
130 {
131 }
132
133 LogClientTemp::LogClientTemp(const LogClientTemp &rhs)
134   : type(rhs.type), parent(rhs.parent)
135 {
136   // don't want to-- nor can we-- copy the ostringstream
137 }
138
139 LogClientTemp::~LogClientTemp()
140 {
141   if (ss.peek() != EOF)
142     parent.do_log(type, ss);
143 }
144
145 void LogChannel::update_config(map<string,string> &log_to_monitors,
146                                map<string,string> &log_to_syslog,
147                                map<string,string> &log_channels,
148                                map<string,string> &log_prios,
149                                map<string,string> &log_to_graylog,
150                                map<string,string> &log_to_graylog_host,
151                                map<string,string> &log_to_graylog_port,
152                                uuid_d &fsid,
153                                string &host)
154 {
155   ldout(cct, 20) << __func__ << " log_to_monitors " << log_to_monitors
156                  << " log_to_syslog " << log_to_syslog
157                  << " log_channels " << log_channels
158                  << " log_prios " << log_prios
159                  << dendl;
160   bool to_monitors = (get_str_map_key(log_to_monitors, log_channel,
161                                       &CLOG_CONFIG_DEFAULT_KEY) == "true");
162   bool to_syslog = (get_str_map_key(log_to_syslog, log_channel,
163                                     &CLOG_CONFIG_DEFAULT_KEY) == "true");
164   string syslog_facility = get_str_map_key(log_channels, log_channel,
165                                            &CLOG_CONFIG_DEFAULT_KEY);
166   string prio = get_str_map_key(log_prios, log_channel,
167                                 &CLOG_CONFIG_DEFAULT_KEY);
168   bool to_graylog = (get_str_map_key(log_to_graylog, log_channel,
169                                      &CLOG_CONFIG_DEFAULT_KEY) == "true");
170   string graylog_host = get_str_map_key(log_to_graylog_host, log_channel,
171                                        &CLOG_CONFIG_DEFAULT_KEY);
172   string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel,
173                                             &CLOG_CONFIG_DEFAULT_KEY);
174   int graylog_port = atoi(graylog_port_str.c_str());
175
176   set_log_to_monitors(to_monitors);
177   set_log_to_syslog(to_syslog);
178   set_syslog_facility(syslog_facility);
179   set_log_prio(prio);
180
181   if (to_graylog && !graylog) { /* should but isn't */
182     graylog = std::make_shared<ceph::logging::Graylog>("clog");
183   } else if (!to_graylog && graylog) { /* shouldn't but is */
184     graylog.reset();
185   }
186
187   if (to_graylog && graylog) {
188     graylog->set_fsid(fsid);
189     graylog->set_hostname(host);
190   }
191
192   if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) {
193     graylog->set_destination(graylog_host, graylog_port);
194   }
195
196   ldout(cct, 10) << __func__
197                  << " to_monitors: " << (to_monitors ? "true" : "false")
198                  << " to_syslog: " << (to_syslog ? "true" : "false")
199                  << " syslog_facility: " << syslog_facility
200                  << " prio: " << prio
201                  << " to_graylog: " << (to_graylog ? "true" : "false")
202                  << " graylog_host: " << graylog_host
203                  << " graylog_port: " << graylog_port
204                  << ")" << dendl;
205 }
206
207 void LogChannel::do_log(clog_type prio, std::stringstream& ss)
208 {
209   while (!ss.eof()) {
210     string s;
211     getline(ss, s);
212     if (!s.empty())
213       do_log(prio, s);
214   }
215 }
216
217 void LogChannel::do_log(clog_type prio, const std::string& s)
218 {
219   Mutex::Locker l(channel_lock);
220   int lvl = (prio == CLOG_ERROR ? -1 : 0);
221   ldout(cct,lvl) << "log " << prio << " : " << s << dendl;
222   LogEntry e;
223   e.stamp = ceph_clock_now();
224   // seq and who should be set for syslog/graylog/log_to_mon
225   e.who = parent->get_myinst();
226   e.name = parent->get_myname();
227   e.prio = prio;
228   e.msg = s;
229   e.channel = get_log_channel();
230
231   // log to monitor?
232   if (log_to_monitors) {
233     e.seq = parent->queue(e);
234   } else {
235     e.seq = parent->get_next_seq();
236   }
237
238   // log to syslog?
239   if (do_log_to_syslog()) {
240     ldout(cct,0) << __func__ << " log to syslog"  << dendl;
241     e.log_to_syslog(get_log_prio(), get_syslog_facility());
242   }
243
244   // log to graylog?
245   if (do_log_to_graylog()) {
246     ldout(cct,0) << __func__ << " log to graylog"  << dendl;
247     graylog->log_log_entry(&e);
248   }
249 }
250
251 Message *LogClient::get_mon_log_message(bool flush)
252 {
253   Mutex::Locker l(log_lock);
254   if (flush) {
255     if (log_queue.empty())
256       return nullptr;
257     // reset session
258     last_log_sent = log_queue.front().seq;
259   }
260   return _get_mon_log_message();
261 }
262
263 bool LogClient::are_pending()
264 {
265   Mutex::Locker l(log_lock);
266   return last_log > last_log_sent;
267 }
268
269 Message *LogClient::_get_mon_log_message()
270 {
271   assert(log_lock.is_locked());
272   if (log_queue.empty())
273     return NULL;
274
275   // only send entries that haven't been sent yet during this mon
276   // session!  monclient needs to call reset_session() on mon session
277   // reset for this to work right.
278
279   if (last_log_sent == last_log)
280     return NULL;
281
282   // limit entries per message
283   unsigned num_unsent = last_log - last_log_sent;
284   unsigned num_send;
285   if (cct->_conf->mon_client_max_log_entries_per_message > 0)
286     num_send = MIN(num_unsent, (unsigned)cct->_conf->mon_client_max_log_entries_per_message);
287   else
288     num_send = num_unsent;
289
290   ldout(cct,10) << " log_queue is " << log_queue.size() << " last_log " << last_log << " sent " << last_log_sent
291                 << " num " << log_queue.size()
292                 << " unsent " << num_unsent
293                 << " sending " << num_send << dendl;
294   assert(num_unsent <= log_queue.size());
295   std::deque<LogEntry>::iterator p = log_queue.begin();
296   std::deque<LogEntry> o;
297   while (p->seq <= last_log_sent) {
298     ++p;
299     assert(p != log_queue.end());
300   }
301   while (num_send--) {
302     assert(p != log_queue.end());
303     o.push_back(*p);
304     last_log_sent = p->seq;
305     ldout(cct,10) << " will send " << *p << dendl;
306     ++p;
307   }
308   
309   MLog *log = new MLog(monmap->get_fsid());
310   log->entries.swap(o);
311
312   return log;
313 }
314
315 void LogClient::_send_to_mon()
316 {
317   assert(log_lock.is_locked());
318   assert(is_mon);
319   assert(messenger->get_myname().is_mon());
320   ldout(cct,10) << __func__ << "log to self" << dendl;
321   Message *log = _get_mon_log_message();
322   messenger->get_loopback_connection()->send_message(log);
323 }
324
325 version_t LogClient::queue(LogEntry &entry)
326 {
327   Mutex::Locker l(log_lock);
328   entry.seq = ++last_log;
329   log_queue.push_back(entry);
330
331   if (is_mon) {
332     _send_to_mon();
333   }
334
335   return entry.seq;
336 }
337
338 uint64_t LogClient::get_next_seq()
339 {
340   Mutex::Locker l(log_lock);
341   return ++last_log;
342 }
343
344 const entity_inst_t& LogClient::get_myinst()
345 {
346   return messenger->get_myinst();
347 }
348
349 const EntityName& LogClient::get_myname()
350 {
351   return cct->_conf->name;
352 }
353
354 bool LogClient::handle_log_ack(MLogAck *m)
355 {
356   Mutex::Locker l(log_lock);
357   ldout(cct,10) << "handle_log_ack " << *m << dendl;
358
359   version_t last = m->last;
360
361   deque<LogEntry>::iterator q = log_queue.begin();
362   while (q != log_queue.end()) {
363     const LogEntry &entry(*q);
364     if (entry.seq > last)
365       break;
366     ldout(cct,10) << " logged " << entry << dendl;
367     q = log_queue.erase(q);
368   }
369   return true;
370 }
371