NFVBENCH-42 Add multiple fluentd aggregators support 99/45399/6
authorKerim Gokarslan <kgokarsl@cisco.com>
Tue, 17 Oct 2017 22:17:44 +0000 (15:17 -0700)
committerKerim Gokarslan <kgokarsl@cisco.com>
Wed, 18 Oct 2017 17:01:59 +0000 (10:01 -0700)
Change-Id: I5b752f9ad4f7b4a60f2678d22467db570e02ab82
Signed-off-by: Kerim Gokarslan <kgokarsl@cisco.com>
docs/testing/user/userguide/fluentd.rst
nfvbench/cfg.default.yaml
nfvbench/fluentd.py
nfvbench/nfvbench.py
nfvbench/nfvbenchd.py
test/test_nfvbench.py

index 465d0a3..889d598 100644 (file)
@@ -187,4 +187,4 @@ And the results of this command obtained from fluentd by elasticsearch:
       "sort": [
         1508264203755
       ]
-    }
\ No newline at end of file
+    }
index c26991c..2892d11 100644 (file)
@@ -367,24 +367,32 @@ debug: false
 # Defaults to disabled
 log_file:
 
-# When enabled, all logs will be sent to a fluentd server at the requested IP and port
-# The fluentd "tag" and "label" fields for every message will be set to "nfvbench"
+# When enabled, all results and/or logs will be sent to a fluentd servers at the requested IPs and ports
+# A list of one or more fluentd servers identified by their IPs and  port numbers should be given.
+# For each recipient it is possible to enable both sending logs and performance
+# results, or enable either logs or performance results. For enabling logs or results logging_tag or
+# result_tag should be set.
+
 fluentd:
-    # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd
-    # to enable logging to fluents, specify a valid fluentd tag name to be used for the
-    # log records
-    logging_tag:
+      # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd
+      # to enable logging to fluents, specify a valid fluentd tag name to be used for the
+      # log records
+    - logging_tag:
+
+      # by default (result_tag is empty) nfvbench results are not sent to fluentd
+      # to enable sending nfvbench results to fluentd, specify a valid fluentd tag name
+      # to be used for the results records, which is different than logging_tag
+      result_tag:
 
-    # by default (result_tag is empty) nfvbench results are not sent to fluentd
-    # to enable sending nfvbench results to fluentd, specify a valid fluentd tag name
-    # to be used for the results records, which is different than logging_tag
-    result_tag:
+      # IP address of the server, defaults to loopback
+      ip: 127.0.0.1
 
-    # IP address of the server, defaults to loopback
-    ip: 127.0.0.1
+      # port # to use, by default, use the default fluentd forward port
+      port: 24224
 
-    # port # to use, by default, use the default fluentd forward port
-    port: 24224
+      # by default (logging_tag is empty) nfvbench log messages are not sent to fluentd
+      # to enable logging to fluents, specify a valid fluentd tag name to be used for the
+      # log records
 
 # Module and class name of factory which will be used to provide classes dynamically for other components.
 factory_module: 'nfvbench.factory'
index 16ff33e..628b968 100644 (file)
@@ -28,24 +28,34 @@ class FluentLogHandler(logging.Handler):
     - the level name
     - the runlogdate (to tie multiple run-related logs together)
     The timestamp is retrieved by the fluentd library.
+    There will be only one instance of FluentLogHandler running.
     '''
 
-    def __init__(self, tag, fluentd_ip='127.0.0.1', fluentd_port=24224):
+    def __init__(self, fluentd_configs):
         logging.Handler.__init__(self)
-        self.tag = tag
-        self.formatter = logging.Formatter('%(message)s')
-        self.sender = sender.FluentSender(self.tag, host=fluentd_ip, port=fluentd_port)
+        self.log_senders = []
+        self.result_senders = []
         self.runlogdate = 0
+        self.formatter = logging.Formatter('%(message)s')
+        for fluentd_config in fluentd_configs:
+            if fluentd_config.logging_tag:
+                self.log_senders.append(
+                    sender.FluentSender(fluentd_config.logging_tag, host=fluentd_config.ip,
+                                        port=fluentd_config.port))
+            if fluentd_config.result_tag:
+                self.result_senders.append(
+                    sender.FluentSender(fluentd_config.result_tag, host=fluentd_config.ip,
+                                        port=fluentd_config.port))
         self.__warning_counter = 0
         self.__error_counter = 0
 
     def start_new_run(self):
         '''Delimitate a new run in the stream of records with a new timestamp
         '''
-        self.runlogdate = self.__get_timestamp()
         # reset counters
         self.__warning_counter = 0
         self.__error_counter = 0
+        self.runlogdate = self.__get_timestamp()
         # send start record
         self.__send_start_record()
 
@@ -60,13 +70,15 @@ class FluentLogHandler(logging.Handler):
             data["runlogdate"] = self.runlogdate
 
         self.__update_stats(record.levelno)
-        self.sender.emit(None, data)
+        for log_sender in self.log_senders:
+            log_sender.emit(None, data)
 
-    # this function is called by summarizer
+    # this function is called by summarizer, and used for sending results
     def record_send(self, record):
-        self.sender.emit(None, record)
+        for result_sender in self.result_senders:
+            result_sender.emit(None, record)
 
-    # send START record for each run
+    # send START log record for each run
     def __send_start_record(self):
         data = {
             "runlogdate": self.runlogdate,
@@ -77,7 +89,8 @@ class FluentLogHandler(logging.Handler):
             "numwarnings": 0,
             "@timestamp": self.__get_timestamp()
         }
-        self.sender.emit(None, data)
+        for log_sender in self.log_senders:
+            log_sender.emit(None, data)
 
     # send stats related to the current run and reset state for a new run
     def send_run_summary(self, run_summary_required):
@@ -94,7 +107,8 @@ class FluentLogHandler(logging.Handler):
             # so don't send runlogdate
             if self.runlogdate != 0:
                 data["runlogdate"] = self.runlogdate
-            self.sender.emit(None, data)
+            for log_sender in self.log_senders:
+                log_sender.emit(None, data)
 
     def __get_highest_level(self):
         if self.__error_counter > 0:
index bbee4f4..d1bd0d9 100644 (file)
@@ -131,13 +131,7 @@ class NFVBench(object):
     def prepare_summary(self, result):
         """Prepares summary of the result to print and send it to logger (eg: fluentd)"""
         global fluent_logger
-        sender = None
-        if self.config.fluentd.result_tag:
-            sender = FluentLogHandler(self.config.fluentd.result_tag,
-                                      fluentd_ip=self.config.fluentd.ip,
-                                      fluentd_port=self.config.fluentd.port)
-            sender.runlogdate = fluent_logger.runlogdate
-        summary = NFVBenchSummarizer(result, sender)
+        summary = NFVBenchSummarizer(result, fluent_logger)
         LOG.info(str(summary))
 
     def save(self, result):
@@ -446,18 +440,17 @@ def main():
         config = config_plugin.get_config()
         openstack_spec = config_plugin.get_openstack_spec()
 
-        # setup the fluent logger as soon as possible right after the config plugin is called
-        if config.fluentd.logging_tag:
-            fluent_logger = FluentLogHandler(config.fluentd.logging_tag,
-                                             fluentd_ip=config.fluentd.ip,
-                                             fluentd_port=config.fluentd.port)
-            LOG.addHandler(fluent_logger)
-        else:
-            fluent_logger = None
-
         opts, unknown_opts = parse_opts_from_cli()
         log.set_level(debug=opts.debug)
 
+        # setup the fluent logger as soon as possible right after the config plugin is called,
+        # if there is any logging or result tag is set then initialize the fluent logger
+        for fluentd in config.fluentd:
+            if fluentd.logging_tag or fluentd.result_tag:
+                fluent_logger = FluentLogHandler(config.fluentd)
+                LOG.addHandler(fluent_logger)
+                break
+
         if opts.version:
             print pbr.version.VersionInfo('nfvbench').version_string_with_vcs()
             sys.exit(0)
@@ -467,14 +460,7 @@ def main():
                 result = json.load(json_data)
                 if opts.user_label:
                     result['config']['user_label'] = opts.user_label
-                if config.fluentd.result_tag:
-                    sender = FluentLogHandler(config.fluentd.result_tag,
-                                              fluentd_ip=config.fluentd.ip,
-                                              fluentd_port=config.fluentd.port)
-                    sender.runlogdate = fluent_logger.runlogdate
-                    print NFVBenchSummarizer(result, sender)
-                else:
-                    print NFVBenchSummarizer(result, None)
+                print NFVBenchSummarizer(result, fluent_logger)
             sys.exit(0)
 
         # show default config in text/yaml format
@@ -531,8 +517,7 @@ def main():
 
         if opts.server:
             if os.path.isdir(opts.server):
-                server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger,
-                                           config.fluentd.result_tag)
+                server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger)
                 nfvbench_instance.set_notifier(server)
                 try:
                     port = int(opts.port)
index 1797496..4772700 100644 (file)
@@ -26,7 +26,6 @@ from flask import request
 
 from flask_socketio import emit
 from flask_socketio import SocketIO
-from fluentd import FluentLogHandler
 from summarizer import NFVBenchSummarizer
 
 from log import LOG
@@ -210,17 +209,10 @@ class WebSocketIoServer(object):
     of this class and pass a runner object then invoke the run method
     """
 
-    def __init__(self, http_root, runner, logger, result_tag):
+    def __init__(self, http_root, runner, fluent_logger):
         self.nfvbench_runner = runner
         setup_flask(http_root)
-        self.fluent_logger = logger
-        self.result_fluent_logger = None
-        if result_tag:
-            self.result_fluent_logger = \
-                FluentLogHandler(result_tag,
-                                 fluentd_ip=self.fluent_logger.sender.host,
-                                 fluentd_port=self.fluent_logger.sender.port)
-            self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
+        self.fluent_logger = fluent_logger
 
     def run(self, host='127.0.0.1', port=7556):
 
@@ -240,6 +232,8 @@ class WebSocketIoServer(object):
                 # remove unfilled values as we do not want them to override default values with None
                 config = {k: v for k, v in config.items() if v is not None}
                 with RunLock():
+                    if self.fluent_logger:
+                        self.fluent_logger.start_new_run()
                     results = self.nfvbench_runner.run(config, config)
             except Exception as exc:
                 print 'NFVbench runner exception:'
@@ -252,9 +246,7 @@ class WebSocketIoServer(object):
             else:
                 # this might overwrite a previously unfetched result
                 Ctx.set_result(results)
-            if self.fluent_logger:
-                self.result_fluent_logger.runlogdate = self.fluent_logger.runlogdate
-            summary = NFVBenchSummarizer(results['result'], self.result_fluent_logger)
+            summary = NFVBenchSummarizer(results['result'], self.fluent_logger)
             LOG.info(str(summary))
             Ctx.release()
             if self.fluent_logger:
index 85342bb..2578407 100644 (file)
@@ -786,11 +786,44 @@ def test_config():
 
 def test_fluentd():
     logger = logging.getLogger('fluent-logger')
-    handler = FluentLogHandler('nfvbench', fluentd_port=7081)
+
+    class FluentdConfig(dict):
+        def __getattr__(self, attr):
+            return self.get(attr)
+
+    fluentd_configs = [
+        FluentdConfig({
+            'logging_tag': 'nfvbench',
+            'result_tag': 'resultnfvbench',
+            'ip': '127.0.0.1',
+            'port': 7081
+        }),
+        FluentdConfig({
+            'logging_tag': 'nfvbench',
+            'result_tag': 'resultnfvbench',
+            'ip': '127.0.0.1',
+            'port': 24224
+        }),
+        FluentdConfig({
+            'logging_tag': None,
+            'result_tag': 'resultnfvbench',
+            'ip': '127.0.0.1',
+            'port': 7082
+        }),
+        FluentdConfig({
+            'logging_tag': 'nfvbench',
+            'result_tag': None,
+            'ip': '127.0.0.1',
+            'port': 7083
+        })
+    ]
+
+    handler = FluentLogHandler(fluentd_configs=fluentd_configs)
     logger.addHandler(handler)
     logger.setLevel(logging.INFO)
     logger.info('test')
     logger.warning('test %d', 100)
+
     try:
         raise Exception("test")
     except Exception: