NFVBENCH-42 Add multiple fluentd aggregators support
[nfvbench.git] / nfvbench / fluentd.py
index 16ff33e..628b968 100644 (file)
@@ -28,24 +28,34 @@ class FluentLogHandler(logging.Handler):
     - the level name
     - the runlogdate (to tie multiple run-related logs together)
     The timestamp is retrieved by the fluentd library.
+    There will be only one instance of FluentLogHandler running.
     '''
 
-    def __init__(self, tag, fluentd_ip='127.0.0.1', fluentd_port=24224):
+    def __init__(self, fluentd_configs):
         logging.Handler.__init__(self)
-        self.tag = tag
-        self.formatter = logging.Formatter('%(message)s')
-        self.sender = sender.FluentSender(self.tag, host=fluentd_ip, port=fluentd_port)
+        self.log_senders = []
+        self.result_senders = []
         self.runlogdate = 0
+        self.formatter = logging.Formatter('%(message)s')
+        for fluentd_config in fluentd_configs:
+            if fluentd_config.logging_tag:
+                self.log_senders.append(
+                    sender.FluentSender(fluentd_config.logging_tag, host=fluentd_config.ip,
+                                        port=fluentd_config.port))
+            if fluentd_config.result_tag:
+                self.result_senders.append(
+                    sender.FluentSender(fluentd_config.result_tag, host=fluentd_config.ip,
+                                        port=fluentd_config.port))
         self.__warning_counter = 0
         self.__error_counter = 0
 
     def start_new_run(self):
         '''Delimitate a new run in the stream of records with a new timestamp
         '''
-        self.runlogdate = self.__get_timestamp()
         # reset counters
         self.__warning_counter = 0
         self.__error_counter = 0
+        self.runlogdate = self.__get_timestamp()
         # send start record
         self.__send_start_record()
 
@@ -60,13 +70,15 @@ class FluentLogHandler(logging.Handler):
             data["runlogdate"] = self.runlogdate
 
         self.__update_stats(record.levelno)
-        self.sender.emit(None, data)
+        for log_sender in self.log_senders:
+            log_sender.emit(None, data)
 
-    # this function is called by summarizer
+    # this function is called by summarizer, and used for sending results
     def record_send(self, record):
-        self.sender.emit(None, record)
+        for result_sender in self.result_senders:
+            result_sender.emit(None, record)
 
-    # send START record for each run
+    # send START log record for each run
     def __send_start_record(self):
         data = {
             "runlogdate": self.runlogdate,
@@ -77,7 +89,8 @@ class FluentLogHandler(logging.Handler):
             "numwarnings": 0,
             "@timestamp": self.__get_timestamp()
         }
-        self.sender.emit(None, data)
+        for log_sender in self.log_senders:
+            log_sender.emit(None, data)
 
     # send stats related to the current run and reset state for a new run
     def send_run_summary(self, run_summary_required):
@@ -94,7 +107,8 @@ class FluentLogHandler(logging.Handler):
             # so don't send runlogdate
             if self.runlogdate != 0:
                 data["runlogdate"] = self.runlogdate
-            self.sender.emit(None, data)
+            for log_sender in self.log_senders:
+                log_sender.emit(None, data)
 
     def __get_highest_level(self):
         if self.__error_counter > 0: