Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / LogMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <boost/algorithm/string/predicate.hpp>
16
17 #include <sstream>
18 #include <syslog.h>
19
20 #include "LogMonitor.h"
21 #include "Monitor.h"
22 #include "MonitorDBStore.h"
23
24 #include "messages/MMonCommand.h"
25 #include "messages/MLog.h"
26 #include "messages/MLogAck.h"
27 #include "common/Graylog.h"
28 #include "common/errno.h"
29 #include "common/strtol.h"
30 #include "include/assert.h"
31 #include "include/str_list.h"
32 #include "include/str_map.h"
33 #include "include/compat.h"
34
35 #define dout_subsys ceph_subsys_mon
36 #undef dout_prefix
37 #define dout_prefix _prefix(_dout, mon, get_last_committed())
38 static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) {
39   return *_dout << "mon." << mon->name << "@" << mon->rank
40                 << "(" << mon->get_state_name()
41                 << ").log v" << v << " ";
42 }
43
44 ostream& operator<<(ostream &out, const LogMonitor &pm)
45 {
46   return out << "log";
47 }
48
49 /*
50  Tick function to update the map based on performance every N seconds
51 */
52
53 void LogMonitor::tick() 
54 {
55   if (!is_active()) return;
56
57   dout(10) << *this << dendl;
58
59 }
60
61 void LogMonitor::create_initial()
62 {
63   dout(10) << "create_initial -- creating initial map" << dendl;
64   LogEntry e;
65   memset(&e.who, 0, sizeof(e.who));
66   e.name = g_conf->name;
67   e.stamp = ceph_clock_now();
68   e.prio = CLOG_INFO;
69   std::stringstream ss;
70   ss << "mkfs " << mon->monmap->get_fsid();
71   e.msg = ss.str();
72   e.seq = 0;
73   pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
74 }
75
76 void LogMonitor::update_from_paxos(bool *need_bootstrap)
77 {
78   dout(10) << __func__ << dendl;
79   version_t version = get_last_committed();
80   dout(10) << __func__ << " version " << version
81            << " summary v " << summary.version << dendl;
82   if (version == summary.version)
83     return;
84   assert(version >= summary.version);
85
86   map<string,bufferlist> channel_blog;
87
88   version_t latest_full = get_version_latest_full();
89   dout(10) << __func__ << " latest full " << latest_full << dendl;
90   if ((latest_full > 0) && (latest_full > summary.version)) {
91     bufferlist latest_bl;
92     get_version_full(latest_full, latest_bl);
93     assert(latest_bl.length() != 0);
94     dout(7) << __func__ << " loading summary e" << latest_full << dendl;
95     bufferlist::iterator p = latest_bl.begin();
96     ::decode(summary, p);
97     dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
98   }
99
100   // walk through incrementals
101   while (version > summary.version) {
102     bufferlist bl;
103     int err = get_version(summary.version+1, bl);
104     assert(err == 0);
105     assert(bl.length());
106
107     bufferlist::iterator p = bl.begin();
108     __u8 v;
109     ::decode(v, p);
110     while (!p.end()) {
111       LogEntry le;
112       le.decode(p);
113       dout(7) << "update_from_paxos applying incremental log " << summary.version+1 <<  " " << le << dendl;
114
115       string channel = le.channel;
116       if (channel.empty()) // keep retrocompatibility
117         channel = CLOG_CHANNEL_CLUSTER;
118
119       if (channels.do_log_to_syslog(channel)) {
120         string level = channels.get_level(channel);
121         string facility = channels.get_facility(channel);
122         if (level.empty() || facility.empty()) {
123           derr << __func__ << " unable to log to syslog -- level or facility"
124                << " not defined (level: " << level << ", facility: "
125                << facility << ")" << dendl;
126           continue;
127         }
128         le.log_to_syslog(channels.get_level(channel),
129                          channels.get_facility(channel));
130       }
131
132       if (channels.do_log_to_graylog(channel)) {
133         ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
134         if (graylog) {
135           graylog->log_log_entry(&le);
136         }
137         dout(7) << "graylog: " << channel << " " << graylog
138                 << " host:" << channels.log_to_graylog_host << dendl;
139       }
140
141       string log_file = channels.get_log_file(channel);
142       dout(20) << __func__ << " logging for channel '" << channel
143                << "' to file '" << log_file << "'" << dendl;
144
145       if (!log_file.empty()) {
146         string log_file_level = channels.get_log_file_level(channel);
147         if (log_file_level.empty()) {
148           dout(1) << __func__ << " warning: log file level not defined for"
149                   << " channel '" << channel << "' yet a log file is --"
150                   << " will assume lowest level possible" << dendl;
151         }
152
153         int min = string_to_syslog_level(log_file_level);
154         int l = clog_type_to_syslog_level(le.prio);
155         if (l <= min) {
156           stringstream ss;
157           ss << le << "\n";
158           // init entry if DNE
159           bufferlist &blog = channel_blog[channel];
160           blog.append(ss.str());
161         }
162       }
163
164       summary.add(le);
165     }
166
167     summary.version++;
168     summary.prune(g_conf->mon_log_max_summary);
169   }
170
171   dout(15) << __func__ << " logging for "
172            << channel_blog.size() << " channels" << dendl;
173   for(map<string,bufferlist>::iterator p = channel_blog.begin();
174       p != channel_blog.end(); ++p) {
175     if (!p->second.length()) {
176       dout(15) << __func__ << " channel '" << p->first
177                << "': nothing to log" << dendl;
178       continue;
179     }
180
181     dout(15) << __func__ << " channel '" << p->first
182              << "' logging " << p->second.length() << " bytes" << dendl;
183     string log_file = channels.get_log_file(p->first);
184
185     int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600);
186     if (fd < 0) {
187       int err = -errno;
188       dout(1) << "unable to write to '" << log_file << "' for channel '"
189               << p->first << "': " << cpp_strerror(err) << dendl;
190     } else {
191       int err = p->second.write_fd(fd);
192       if (err < 0) {
193         dout(1) << "error writing to '" << log_file << "' for channel '"
194                 << p->first << ": " << cpp_strerror(err) << dendl;
195       }
196       VOID_TEMP_FAILURE_RETRY(::close(fd));
197     }
198   }
199
200   check_subs();
201 }
202
203 void LogMonitor::create_pending()
204 {
205   pending_log.clear();
206   pending_summary = summary;
207   dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
208 }
209
210 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
211 {
212   version_t version = get_last_committed() + 1;
213   bufferlist bl;
214   dout(10) << __func__ << " v" << version << dendl;
215   __u8 v = 1;
216   ::encode(v, bl);
217   multimap<utime_t,LogEntry>::iterator p;
218   for (p = pending_log.begin(); p != pending_log.end(); ++p)
219     p->second.encode(bl, mon->get_quorum_con_features());
220
221   put_version(t, version, bl);
222   put_last_committed(t, version);
223 }
224
225 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
226 {
227   dout(10) << __func__ << " log v " << summary.version << dendl;
228   assert(get_last_committed() == summary.version);
229
230   bufferlist summary_bl;
231   ::encode(summary, summary_bl, mon->get_quorum_con_features());
232
233   put_version_full(t, summary.version, summary_bl);
234   put_version_latest_full(t, summary.version);
235 }
236
237 version_t LogMonitor::get_trim_to()
238 {
239   if (!mon->is_leader())
240     return 0;
241
242   unsigned max = g_conf->mon_max_log_epochs;
243   version_t version = get_last_committed();
244   if (version > max)
245     return version - max;
246   return 0;
247 }
248
249 bool LogMonitor::preprocess_query(MonOpRequestRef op)
250 {
251   op->mark_logmon_event("preprocess_query");
252   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
253   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
254   switch (m->get_type()) {
255   case MSG_MON_COMMAND:
256     return preprocess_command(op);
257
258   case MSG_LOG:
259     return preprocess_log(op);
260
261   default:
262     ceph_abort();
263     return true;
264   }
265 }
266
267 bool LogMonitor::prepare_update(MonOpRequestRef op)
268 {
269   op->mark_logmon_event("prepare_update");
270   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
271   dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
272   switch (m->get_type()) {
273   case MSG_MON_COMMAND:
274     return prepare_command(op);
275   case MSG_LOG:
276     return prepare_log(op);
277   default:
278     ceph_abort();
279     return false;
280   }
281 }
282
283 bool LogMonitor::preprocess_log(MonOpRequestRef op)
284 {
285   op->mark_logmon_event("preprocess_log");
286   MLog *m = static_cast<MLog*>(op->get_req());
287   dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
288   int num_new = 0;
289
290   MonSession *session = m->get_session();
291   if (!session)
292     goto done;
293   if (!session->is_capable("log", MON_CAP_W)) {
294     dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
295             << session->caps << dendl;
296     goto done;
297   }
298   
299   for (deque<LogEntry>::iterator p = m->entries.begin();
300        p != m->entries.end();
301        ++p) {
302     if (!pending_summary.contains(p->key()))
303       num_new++;
304   }
305   if (!num_new) {
306     dout(10) << "  nothing new" << dendl;
307     goto done;
308   }
309
310   return false;
311
312  done:
313   return true;
314 }
315
316 struct LogMonitor::C_Log : public C_MonOp {
317   LogMonitor *logmon;
318   C_Log(LogMonitor *p, MonOpRequestRef o) :
319     C_MonOp(o), logmon(p) {}
320   void _finish(int r) override {
321     if (r == -ECANCELED) {
322       return;
323     }
324     logmon->_updated_log(op);
325   }
326 };
327
328 bool LogMonitor::prepare_log(MonOpRequestRef op) 
329 {
330   op->mark_logmon_event("prepare_log");
331   MLog *m = static_cast<MLog*>(op->get_req());
332   dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
333
334   if (m->fsid != mon->monmap->fsid) {
335     dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid 
336             << dendl;
337     return false;
338   }
339
340   for (deque<LogEntry>::iterator p = m->entries.begin();
341        p != m->entries.end();
342        ++p) {
343     dout(10) << " logging " << *p << dendl;
344     if (!pending_summary.contains(p->key())) {
345       pending_summary.add(*p);
346       pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
347     }
348   }
349   pending_summary.prune(g_conf->mon_log_max_summary);
350   wait_for_finished_proposal(op, new C_Log(this, op));
351   return true;
352 }
353
354 void LogMonitor::_updated_log(MonOpRequestRef op)
355 {
356   MLog *m = static_cast<MLog*>(op->get_req());
357   dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
358   mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
359 }
360
361 bool LogMonitor::should_propose(double& delay)
362 {
363   // commit now if we have a lot of pending events
364   if (g_conf->mon_max_log_entries_per_event > 0 &&
365       pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event)
366     return true;
367
368   // otherwise fall back to generic policy
369   return PaxosService::should_propose(delay);
370 }
371
372
373 bool LogMonitor::preprocess_command(MonOpRequestRef op)
374 {
375   op->mark_logmon_event("preprocess_command");
376   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
377   int r = -EINVAL;
378   bufferlist rdata;
379   stringstream ss;
380
381   map<string, cmd_vartype> cmdmap;
382   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
383     string rs = ss.str();
384     mon->reply_command(op, -EINVAL, rs, get_last_committed());
385     return true;
386   }
387   MonSession *session = m->get_session();
388   if (!session) {
389     mon->reply_command(op, -EACCES, "access denied", get_last_committed());
390     return true;
391   }
392
393   string prefix;
394   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
395
396   string format;
397   cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
398   boost::scoped_ptr<Formatter> f(Formatter::create(format));
399
400   if (prefix == "log last") {
401     int64_t num = 20;
402     cmd_getval(g_ceph_context, cmdmap, "num", num);
403     if (f) {
404       f->open_array_section("tail");
405     }
406
407     std::string level_str;
408     clog_type level;
409     if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
410       level = LogEntry::str_to_level(level_str);
411       if (level == CLOG_UNKNOWN) {
412         ss << "Invalid severity '" << level_str << "'";
413         mon->reply_command(op, -EINVAL, ss.str(), get_last_committed());
414         return true;
415       }
416     } else {
417       level = CLOG_INFO;
418     }
419
420     std::string channel;
421     if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
422       channel = CLOG_CHANNEL_DEFAULT;
423     }
424
425     // We'll apply this twice, once while counting out lines
426     // and once while outputting them.
427     auto match = [level, channel](const LogEntry &entry) {
428       return entry.prio >= level && (entry.channel == channel || channel == "*");
429     };
430
431     auto rp = summary.tail.rbegin();
432     for (; num > 0 && rp != summary.tail.rend(); ++rp) {
433       if (match(*rp)) {
434         num--;
435       }
436     }
437     if (rp == summary.tail.rend()) {
438       --rp;
439     }
440     ostringstream ss;
441     for (; rp != summary.tail.rbegin(); --rp) {
442       if (!match(*rp)) {
443         continue;
444       }
445
446       if (f) {
447         f->dump_object("entry", *rp);
448       } else {
449         ss << *rp << "\n";
450       }
451     }
452     if (f) {
453       f->close_section();
454       f->flush(rdata);
455     } else {
456       rdata.append(ss.str());
457     }
458     r = 0;
459   } else {
460     return false;
461   }
462
463   string rs;
464   getline(ss, rs);
465   mon->reply_command(op, r, rs, rdata, get_last_committed());
466   return true;
467 }
468
469
470 bool LogMonitor::prepare_command(MonOpRequestRef op)
471 {
472   op->mark_logmon_event("prepare_command");
473   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
474   stringstream ss;
475   string rs;
476   int err = -EINVAL;
477
478   map<string, cmd_vartype> cmdmap;
479   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
480     // ss has reason for failure
481     string rs = ss.str();
482     mon->reply_command(op, -EINVAL, rs, get_last_committed());
483     return true;
484   }
485
486   string prefix;
487   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
488
489   MonSession *session = m->get_session();
490   if (!session) {
491     mon->reply_command(op, -EACCES, "access denied", get_last_committed());
492     return true;
493   }
494
495   if (prefix == "log") {
496     vector<string> logtext;
497     cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
498     LogEntry le;
499     le.who = m->get_orig_source_inst();
500     le.name = session->entity_name;
501     le.stamp = m->get_recv_stamp();
502     le.seq = 0;
503     le.prio = CLOG_INFO;
504     le.channel = CLOG_CHANNEL_DEFAULT;
505     le.msg = str_join(logtext, " ");
506     pending_summary.add(le);
507     pending_summary.prune(g_conf->mon_log_max_summary);
508     pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
509     wait_for_finished_proposal(op, new Monitor::C_Command(
510           mon, op, 0, string(), get_last_committed() + 1));
511     return true;
512   }
513
514   getline(ss, rs);
515   mon->reply_command(op, err, rs, get_last_committed());
516   return false;
517 }
518
519
520 int LogMonitor::sub_name_to_id(const string& n)
521 {
522   if (n.substr(0, 4) == "log-" && n.size() > 4) {
523     return LogEntry::str_to_level(n.substr(4));
524   } else {
525     return CLOG_UNKNOWN;
526   }
527 }
528
529 void LogMonitor::check_subs()
530 {
531   dout(10) << __func__ << dendl;
532   for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin();
533        i != mon->session_map.subs.end();
534        ++i) {
535     for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
536       if (sub_name_to_id((*j)->type) >= 0)
537         check_sub(*j);
538     }
539   }
540 }
541
542 void LogMonitor::check_sub(Subscription *s)
543 {
544   dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
545
546   int sub_level = sub_name_to_id(s->type);
547   assert(sub_level >= 0);
548
549   version_t summary_version = summary.version;
550   if (s->next > summary_version) {
551     dout(10) << __func__ << " client " << s->session->inst 
552             << " requested version (" << s->next << ") is greater than ours (" 
553             << summary_version << "), which means we already sent him" 
554             << " everything we have." << dendl;
555     return;
556   } 
557  
558   MLog *mlog = new MLog(mon->monmap->fsid);
559
560   if (s->next == 0) { 
561     /* First timer, heh? */
562     bool ret = _create_sub_summary(mlog, sub_level);
563     if (!ret) {
564       dout(1) << __func__ << " ret = " << ret << dendl;
565       mlog->put();
566       return;
567     }
568   } else {
569     /* let us send you an incremental log... */
570     _create_sub_incremental(mlog, sub_level, s->next);
571   }
572
573   dout(1) << __func__ << " sending message to " << s->session->inst 
574           << " with " << mlog->entries.size() << " entries"
575           << " (version " << mlog->version << ")" << dendl;
576   
577   if (!mlog->entries.empty()) {
578     s->session->con->send_message(mlog);
579   } else {
580     mlog->put();
581   }
582   if (s->onetime)
583     mon->session_map.remove_sub(s);
584   else
585     s->next = summary_version+1;
586 }
587
588 /**
589  * Create a log message containing only the last message in the summary.
590  *
591  * @param mlog  Log message we'll send to the client.
592  * @param level Maximum log level the client is interested in.
593  * @return      'true' if we consider we successfully populated @mlog;
594  *              'false' otherwise.
595  */
596 bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
597 {
598   dout(10) << __func__ << dendl;
599
600   assert(mlog != NULL);
601
602   if (!summary.tail.size())
603     return false;
604
605   list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
606   for (; it != summary.tail.rend(); ++it) {
607     LogEntry e = *it;
608     if (e.prio < level)
609       continue;
610
611     mlog->entries.push_back(e);
612     mlog->version = summary.version;
613     break;
614   }
615
616   return true;
617 }
618
619 /**
620  * Create an incremental log message from version \p sv to \p summary.version
621  *
622  * @param mlog  Log message we'll send to the client with the messages received
623  *              since version \p sv, inclusive.
624  * @param level The max log level of the messages the client is interested in.
625  * @param sv    The version the client is looking for.
626  */
627 void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
628 {
629   dout(10) << __func__ << " level " << level << " ver " << sv 
630           << " cur summary ver " << summary.version << dendl; 
631
632   if (sv < get_first_committed()) {
633     dout(10) << __func__ << " skipped from " << sv
634              << " to first_committed " << get_first_committed() << dendl;
635     LogEntry le;
636     le.stamp = ceph_clock_now();
637     le.prio = CLOG_WARN;
638     ostringstream ss;
639     ss << "skipped log messages from " << sv << " to " << get_first_committed();
640     le.msg = ss.str();
641     mlog->entries.push_back(le);
642     sv = get_first_committed();
643   }
644
645   version_t summary_ver = summary.version;
646   while (sv <= summary_ver) {
647     bufferlist bl;
648     int err = get_version(sv, bl);
649     assert(err == 0);
650     assert(bl.length());
651     bufferlist::iterator p = bl.begin();
652     __u8 v;
653     ::decode(v,p);
654     while (!p.end()) {
655       LogEntry le;
656       le.decode(p);
657
658       if (le.prio < level) {
659         dout(20) << __func__ << " requested " << level 
660                  << " entry " << le.prio << dendl;
661         continue;
662       }
663
664       mlog->entries.push_back(le);
665     }
666     mlog->version = sv++;
667   }
668
669   dout(10) << __func__ << " incremental message ready (" 
670            << mlog->entries.size() << " entries)" << dendl;
671 }
672
673 void LogMonitor::update_log_channels()
674 {
675   ostringstream oss;
676
677   channels.clear();
678
679   int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog,
680                                   oss, &channels.log_to_syslog,
681                                   CLOG_CONFIG_DEFAULT_KEY);
682   if (r < 0) {
683     derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
684     return;
685   }
686
687   r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level,
688                               oss, &channels.syslog_level,
689                               CLOG_CONFIG_DEFAULT_KEY);
690   if (r < 0) {
691     derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
692          << dendl;
693     return;
694   }
695
696   r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility,
697                               oss, &channels.syslog_facility,
698                               CLOG_CONFIG_DEFAULT_KEY);
699   if (r < 0) {
700     derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
701          << dendl;
702     return;
703   }
704
705   r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss,
706                               &channels.log_file,
707                               CLOG_CONFIG_DEFAULT_KEY);
708   if (r < 0) {
709     derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
710     return;
711   }
712
713   r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss,
714                               &channels.log_file_level,
715                               CLOG_CONFIG_DEFAULT_KEY);
716   if (r < 0) {
717     derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
718          << dendl;
719     return;
720   }
721
722   r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
723                               &channels.log_to_graylog,
724                               CLOG_CONFIG_DEFAULT_KEY);
725   if (r < 0) {
726     derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
727          << dendl;
728     return;
729   }
730
731   r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
732                               &channels.log_to_graylog_host,
733                               CLOG_CONFIG_DEFAULT_KEY);
734   if (r < 0) {
735     derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
736          << dendl;
737     return;
738   }
739
740   r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
741                               &channels.log_to_graylog_port,
742                               CLOG_CONFIG_DEFAULT_KEY);
743   if (r < 0) {
744     derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
745          << dendl;
746     return;
747   }
748
749   channels.expand_channel_meta();
750 }
751
752 void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
753 {
754   generic_dout(20) << __func__ << " expand map: " << m << dendl;
755   for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
756     m[p->first] = expand_channel_meta(p->second, p->first);
757   }
758   generic_dout(20) << __func__ << " expanded map: " << m << dendl;
759 }
760
761 string LogMonitor::log_channel_info::expand_channel_meta(
762     const string &input,
763     const string &change_to)
764 {
765   size_t pos = string::npos;
766   string s(input);
767   while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
768     string tmp = s.substr(0, pos) + change_to;
769     if (pos+LOG_META_CHANNEL.length() < s.length())
770       tmp += s.substr(pos+LOG_META_CHANNEL.length());
771     s = tmp;
772   }
773   generic_dout(20) << __func__ << " from '" << input
774                    << "' to '" << s << "'" << dendl;
775
776   return s;
777 }
778
779 bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
780   string v = get_str_map_key(log_to_syslog, channel,
781                              &CLOG_CONFIG_DEFAULT_KEY);
782   // We expect booleans, but they are in k/v pairs, kept
783   // as strings, in 'log_to_syslog'. We must ensure
784   // compatibility with existing boolean handling, and so
785   // we are here using a modified version of how
786   // md_config_t::set_val_raw() handles booleans. We will
787   // accept both 'true' and 'false', but will also check for
788   // '1' and '0'. The main distiction between this and the
789   // original code is that we will assume everything not '1',
790   // '0', 'true' or 'false' to be 'false'.
791   bool ret = false;
792
793   if (boost::iequals(v, "false")) {
794     ret = false;
795   } else if (boost::iequals(v, "true")) {
796     ret = true;
797   } else {
798     std::string err;
799     int b = strict_strtol(v.c_str(), 10, &err);
800     ret = (err.empty() && b == 1);
801   }
802
803   return ret;
804 }
805
806 ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
807     const string &channel)
808 {
809   generic_dout(25) << __func__ << " for channel '"
810                    << channel << "'" << dendl;
811
812   if (graylogs.count(channel) == 0) {
813     auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
814
815     graylog->set_fsid(g_conf->get_val<uuid_d>("fsid"));
816     graylog->set_hostname(g_conf->host);
817     graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
818                                              &CLOG_CONFIG_DEFAULT_KEY),
819                              atoi(get_str_map_key(log_to_graylog_port, channel,
820                                                   &CLOG_CONFIG_DEFAULT_KEY).c_str()));
821
822     graylogs[channel] = graylog;
823     generic_dout(20) << __func__ << " for channel '"
824                      << channel << "' to graylog host '"
825                      << log_to_graylog_host[channel] << ":"
826                      << log_to_graylog_port[channel]
827                      << "'" << dendl;
828   }
829   return graylogs[channel];
830 }
831
832 void LogMonitor::handle_conf_change(const struct md_config_t *conf,
833                                     const std::set<std::string> &changed)
834 {
835   if (changed.count("mon_cluster_log_to_syslog") ||
836       changed.count("mon_cluster_log_to_syslog_level") ||
837       changed.count("mon_cluster_log_to_syslog_facility") ||
838       changed.count("mon_cluster_log_file") ||
839       changed.count("mon_cluster_log_file_level") ||
840       changed.count("mon_cluster_log_to_graylog") ||
841       changed.count("mon_cluster_log_to_graylog_host") ||
842       changed.count("mon_cluster_log_to_graylog_port")) {
843     update_log_channels();
844   }
845 }