collectd VES agent: collectd VES agent code
[barometer.git] / 3rd_party / collectd-ves-plugin / ves_plugin / ves_plugin.py
1 # MIT License
2 #
3 # Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a
6 # copy of this software and associated documentation files (the "Software"),
7 # to deal in the Software without restriction, including without limitation
8 # the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 # and/or sell copies of the Software, and to permit persons to whom the
10 # Software is furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 # DEALINGS IN THE SOFTWARE.
22
23 import collectd
24 import json
25 import sys
26 import base64
27 import urllib2
28 import socket
29 import time
30 from threading import Timer
31 from threading import Lock
32
class Event(object):
    """Common VES event header shared by all event domains.

    The class is abstract: a concrete event has to override get_name() and
    get_obj() to provide the domain specific part of the message.
    """

    def __init__(self):
        """Initialize every common header field with its default value."""
        self.version = 1.1
        # "Info" is used unless a notification is generated
        self.event_type = "Info"
        self.domain = ""
        self.event_id = ""
        self.source_id = ""
        self.source_name = ""
        self.functional_role = ""
        self.reporting_entity_id = ""
        self.reporting_entity_name = ""
        # will be derived from event if there is one
        self.priority = "Normal"
        self.start_epoch_microsec = 0
        self.last_epoch_micro_sec = 0
        self.sequence = 0

    def get_json(self):
        """Serialize the event to a JSON string.

        Returns:
            A JSON document of the form
            {"event": {"commonEventHeader": {...}, <get_name()>: <get_obj()>}}.

        Raises:
            NotImplementedError: if the subclass does not override
                get_name() or get_obj().
        """
        header = {
            'version': self.version,
            'eventType': self.event_type,
            'domain': self.domain,
            'eventId': self.event_id,
            'sourceId': self.source_id,
            'sourceName': self.source_name,
            'functionalRole': self.functional_role,
            'reportingEntityId': self.reporting_entity_id,
            'reportingEntityName': self.reporting_entity_name,
            'priority': self.priority,
            'startEpochMicrosec': self.start_epoch_microsec,
            'lastEpochMicrosec': self.last_epoch_micro_sec,
            'sequence': self.sequence,
        }
        return json.dumps({
            'event': {
                'commonEventHeader': header,
                self.get_name(): self.get_obj()
            }
        })

    def get_name(self):
        """Return the JSON key of the domain specific part (abstract)."""
        # raise instead of assert: asserts are stripped when Python runs with -O
        raise NotImplementedError('abstract method get_name() is not implemented')

    def get_obj(self):
        """Return the domain specific part as a JSON serializable object (abstract)."""
        raise NotImplementedError('abstract method get_obj() is not implemented')
80
class MeasurementGroup(object):
    """Named group of name/value measurements."""

    def __init__(self, name):
        """Create an empty group called `name`."""
        self.name = name
        self.measurement = []

    def add_measurement(self, name, value):
        """Append one measurement (a name/value pair) to the group."""
        entry = {'name': name, 'value': value}
        self.measurement.append(entry)

    def get_obj(self):
        """Return the group as a dict with the 'name' and 'measurements' keys."""
        group = {'name': self.name}
        group['measurements'] = self.measurement
        return group
100
class MeasurementsForVfScaling(Event):
    """Event of the "measurementsForVfScaling" domain."""

    def __init__(self, event_id):
        """Create an empty measurement event with the given event id."""
        super(MeasurementsForVfScaling, self).__init__()
        # common header attributes
        self.event_id = event_id
        self.domain = "measurementsForVfScaling"
        # measurement attributes: scalar values
        self.measurement_fields_version = 1.1
        self.aggregate_cpu_usage = 0
        self.concurrent_sessions = 0
        self.configured_entities = 0
        self.mean_request_latency = 0
        self.measurement_interval = 0
        self.memory_configured = 0
        self.memory_used = 0
        self.number_of_media_ports_in_use = 0
        self.request_rate = 0
        self.vnfc_scaling_metric = 0
        # measurement attributes: lists
        self.additional_measurements = []
        self.codec_usage_array = []
        self.cpu_usage_array = []
        self.errors = []
        self.feature_usage_array = []
        self.filesystem_usage_array = []
        self.latency_distribution = []
        self.v_nic_usage_array = []

    def add_measurement_group(self, group):
        """Append the object form (group.get_obj()) of a measurement group."""
        group_obj = group.get_obj()
        self.additional_measurements.append(group_obj)

    def add_cpu_usage(self, cpu_identifier, usage):
        """Append the usage of one CPU to the cpuUsageArray."""
        entry = {'cpuIdentifier': cpu_identifier, 'percentUsage': usage}
        self.cpu_usage_array.append(entry)

    def add_v_nic_usage(self, if_name, if_pkts, if_bytes):
        """Append the counters of one interface to the vNicUsageArray.

        if_pkts and if_bytes are indexable; element 0 is stored as the "In"
        counter and element 1 as the "Out" counter. The broadcast, multicast
        and unicast counters are always reported as 0.0.
        """
        nic = dict.fromkeys((
            'broadcastPacketsIn', 'broadcastPacketsOut',
            'multicastPacketsIn', 'multicastPacketsOut',
            'unicastPacketsIn', 'unicastPacketsOut'), 0.0)
        nic['vNicIdentifier'] = if_name
        nic['packetsIn'] = if_pkts[0]
        nic['packetsOut'] = if_pkts[1]
        nic['bytesIn'] = if_bytes[0]
        nic['bytesOut'] = if_bytes[1]
        self.v_nic_usage_array.append(nic)

    def get_obj(self):
        """Return the measurement fields as a JSON serializable dict."""
        return {
            'additionalMeasurements': self.additional_measurements,
            'aggregateCpuUsage': self.aggregate_cpu_usage,
            'codecUsageArray': self.codec_usage_array,
            'concurrentSessions': self.concurrent_sessions,
            'configuredEntities': self.configured_entities,
            'cpuUsageArray': self.cpu_usage_array,
            'errors': self.errors,
            'featureUsageArray': self.feature_usage_array,
            'filesystemUsageArray': self.filesystem_usage_array,
            'latencyDistribution': self.latency_distribution,
            'meanRequestLatency': self.mean_request_latency,
            'measurementFieldsVersion': self.measurement_fields_version,
            'measurementInterval': self.measurement_interval,
            'memoryConfigured': self.memory_configured,
            'memoryUsed': self.memory_used,
            'numberOfMediaPortsInUse': self.number_of_media_ports_in_use,
            'requestRate': self.request_rate,
            'vnfcScalingMetric': self.vnfc_scaling_metric,
            'vNicUsageArray': self.v_nic_usage_array,
        }

    def get_name(self):
        """Return the JSON key under which get_obj() is placed in the event."""
        return "measurementsForVfScalingFields"
182
class Fault(Event):
    """Event of the "fault" domain."""

    def __init__(self, event_id):
        """Create a fault event with the given event id and default fault fields."""
        super(Fault, self).__init__()
        # common header attributes
        self.event_id = event_id
        self.domain = "fault"
        self.event_type = "Fault"
        # fault attributes
        self.fault_fields_version = 1.1
        self.event_severity = 'NORMAL'
        self.event_source_type = 'other(0)'
        self.vf_status = 'Active'
        self.alarm_condition = ''
        self.specific_problem = ''
        self.alarm_interface_a = ''
        self.alarm_additional_information = []

    def get_name(self):
        """Return the JSON key under which get_obj() is placed in the event."""
        return 'faultFields'

    def get_obj(self):
        """Return the fault fields as a JSON serializable dict."""
        return {
            'faultFieldsVersion': self.fault_fields_version,
            'eventSeverity': self.event_severity,
            'eventSourceType': self.event_source_type,
            'alarmCondition': self.alarm_condition,
            'specificProblem': self.specific_problem,
            'vfStatus': self.vf_status,
            'alarmInterfaceA': self.alarm_interface_a,
            'alarmAdditionalInformation': self.alarm_additional_information,
        }
219
220 class VESPlugin(object):
221     """VES plugin with collectd callbacks"""
222
223     def __init__(self):
224         """Plugin initialization"""
225         self.__plugin_data_cache = {
226             'cpu' : {'interval' : 0.0, 'vls' : []},
227             'cpu-aggregation' : {'interval' : 0.0, 'vls' : []},
228             'virt' : {'interval' : 0.0, 'vls' : []},
229             'disk' : {'interval' : 0.0, 'vls' : []},
230             'interface' : {'interval' : 0.0, 'vls' : []},
231             'memory' : {'interval' : 0.0, 'vls' : []}
232         }
233         self.__plugin_config = {
234             'Domain' : '127.0.0.1',
235             'Port' : 30000.0,
236             'Path' : '',
237             'Username' : '',
238             'Password' : '',
239             'Topic' : '',
240             'UseHttps' : False,
241             'SendEventInterval' : 20.0,
242             'FunctionalRole' : 'Collectd VES Agent',
243             'GuestRunning' : False
244         }
245         self.__host_name = None
246         self.__ves_timer = None
247         self.__event_timer_interval = 20.0
248         self.__lock = Lock()
249         self.__event_id = 0
250
251     def get_event_id(self):
252         """get event id"""
253         self.__event_id += 1
254         return str(self.__event_id)
255
256     def lock(self):
257         """Lock the plugin"""
258         self.__lock.acquire()
259
260     def unlock(self):
261         """Unlock the plugin"""
262         self.__lock.release()
263
264     def start_timer(self):
265         """Start event timer"""
266         self.__ves_timer = Timer(self.__event_timer_interval, self.__on_time)
267         self.__ves_timer.start()
268
269     def stop_timer(self):
270         """Stop event timer"""
271         self.__ves_timer.cancel()
272
    def __on_time(self):
        """Timer callback: schedule the next run, then build and send the events."""
        # the timer is re-armed before the events are generated, so an
        # exception raised by event_timer() cannot prevent the next run
        self.start_timer()
        self.event_timer()
277
278     def event_send(self, event):
279         """Send event to VES"""
280         server_url = "http{}://{}:{}/{}eventListener/v1{}".format(
281             's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'],
282             int(self.__plugin_config['Port']), '{}/'.format(
283             '/{}'.format(self.__plugin_config['Path'])) if (len(self.__plugin_config['Path']) > 0) else '',
284             self.__plugin_config['Topic'])
285         collectd.info('Vendor Event Listener is at: {}'.format(server_url))
286         credentials = base64.b64encode('{}:{}'.format(
287             self.__plugin_config['Username'], self.__plugin_config['Password']))
288         collectd.info('Authentication credentials are: {}'.format(credentials))
289         try:
290             request = urllib2.Request(server_url)
291             request.add_header('Authorization', 'Basic {}'.format(credentials))
292             request.add_header('Content-Type', 'application/json')
293             collectd.debug("Sending {} to {}".format(event.get_json(), server_url))
294             vel = urllib2.urlopen(request, event.get_json(), timeout=1)
295         except urllib2.HTTPError as e:
296             collectd.error('Vendor Event Listener exception: {}'.format(e))
297         except urllib2.URLError as e:
298             collectd.error('Vendor Event Listener is is not reachable: {}'.format(e))
299
300     def bytes_to_gb(self, bytes):
301         """Convert bytes to GB"""
302         return round((bytes / 1073741824.0), 3)
303
304     def get_hostname(self):
305         if len(self.__host_name):
306             return self.__host_name
307         return socket.gethostname()
308
309     def event_timer(self):
310         """Event timer thread"""
311         self.lock()
312         try:
313             if (self.__plugin_config['GuestRunning']):
314                 # if we running on a guest only, send 'additionalMeasurements' only
315                 measurement = MeasurementsForVfScaling(self.get_event_id())
316                 measurement.functional_role = self.__plugin_config['FunctionalRole']
317                 # add host/guest values as additional measurements
318                 self.fill_additional_measurements(measurement, exclude_plugins=[
319                     'cpu', 'cpu-aggregation', 'memory', 'disk', 'interface', 'virt'])
320                 # fill out reporting & source entities
321                 reporting_entity = self.get_hostname()
322                 measurement.reporting_entity_id = reporting_entity
323                 measurement.reporting_entity_name = reporting_entity
324                 measurement.source_id = reporting_entity
325                 measurement.source_name = measurement.source_id
326                 measurement.start_epoch_microsec = (time.time() * 1000000)
327                 measurement.measurement_interval = self.__plugin_config['SendEventInterval']
328                 # total CPU
329                 total_cpu_system = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='system')
330                 total_cpu_user = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='user')
331                 measurement.aggregate_cpu_usage = round(total_cpu_system[0]['values'][0] +
332                                                     total_cpu_user[0]['values'][0], 2)
333                 # CPU per each instance
334                 cpux_system = self.cache_get_value(plugin_name='cpu', type_instance='system',
335                                                   mark_as_read = False)
336                 for cpu_inst in [x['plugin_instance'] for x in cpux_system]:
337                     cpu_system = self.cache_get_value(plugin_name='cpu',
338                                                       plugin_instance=cpu_inst, type_instance='system')
339                     cpu_user = self.cache_get_value(plugin_name='cpu',
340                                                       plugin_instance=cpu_inst, type_instance='user')
341                     cpu_usage = round(cpu_system[0]['values'][0] + cpu_user[0]['values'][0], 2)
342                     measurement.add_cpu_usage(cpu_inst, cpu_usage)
343                 # fill memory used
344                 memory_used = self.cache_get_value(plugin_name='memory', type_name='memory', type_instance='used')
345                 if len(memory_used) > 0:
346                     measurement.memory_used = self.bytes_to_gb(memory_used[0]['values'][0])
347                 # if_packets
348                 ifinfo = {}
349                 if_stats = self.cache_get_value(plugin_name='interface', type_name='if_packets')
350                 if len(if_stats) > 0:
351                     for if_stat in if_stats:
352                         ifinfo[if_stat['plugin_instance']] = {
353                             'pkts' : (if_stat['values'][0], if_stat['values'][1])
354                         }
355                 # go through all interfaces and get if_octets
356                 for if_name in ifinfo.keys():
357                     if_stats = self.cache_get_value(plugin_instance=if_name, plugin_name='interface',
358                                                     type_name='if_octets')
359                     if len(if_stats) > 0:
360                         ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
361                 # fill vNicUsageArray filed in the event
362                 for if_name in ifinfo.keys():
363                     measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
364                 # send event to the VES
365                 self.event_send(measurement)
366                 return
367             # get list of all VMs
368             virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
369                                                    mark_as_read=False)
370             vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
371             for vm_name in vm_names:
372                 # make sure that 'virt' plugin cache is up-to-date
373                 vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
374                                                  mark_as_read=False)
375                 us_up_to_date = True
376                 for vm_value in vm_values:
377                     if vm_value['updated'] == False:
378                         us_up_to_date = False
379                         break
380                 if not us_up_to_date:
381                         # one of the cache value is not up-to-date, break
382                         collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name))
383                         continue
384                 # if values are up-to-date, create an event message
385                 measurement = MeasurementsForVfScaling(self.get_event_id())
386                 measurement.functional_role = self.__plugin_config['FunctionalRole']
387                 # fill out reporting_entity
388                 reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name)
389                 measurement.reporting_entity_id = reporting_entity
390                 measurement.reporting_entity_name = reporting_entity
391                 # virt_cpu_total
392                 virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name,
393                                                        plugin_name='virt', type_name='virt_cpu_total')
394                 if len(virt_vcpu_total) > 0:
395                     measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0])
396                     # set source as a host for virt_vcpu_total value
397                     measurement.source_id = virt_vcpu_total[0]['host']
398                     measurement.source_name = measurement.source_id
399                     # fill out EpochMicrosec (convert to us)
400                     measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
401                 # virt_vcp
402                 virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
403                                                   plugin_name='virt', type_name='virt_vcpu')
404                 if len(virt_vcpus) > 0:
405                     for virt_vcpu in virt_vcpus:
406                         cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
407                         measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
408                 # plugin interval
409                 measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
410                 # memory-total
411                 memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
412                                                     type_name='memory', type_instance='total')
413                 if len(memory_total) > 0:
414                     measurement.memory_configured = self.bytes_to_gb(memory_total[0]['values'][0])
415                 # memory-rss
416                 memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
417                                                   type_name='memory', type_instance='rss')
418                 if len(memory_rss) > 0:
419                     measurement.memory_used = self.bytes_to_gb(memory_rss[0]['values'][0])
420                 # if_packets
421                 ifinfo = {}
422                 if_stats = self.cache_get_value(plugin_instance=vm_name,
423                                                 plugin_name='virt', type_name='if_packets')
424                 if len(if_stats) > 0:
425                     for if_stat in if_stats:
426                         ifinfo[if_stat['type_instance']] = {
427                             'pkts' : (if_stat['values'][0], if_stat['values'][1])
428                         }
429                 # go through all interfaces and get if_octets
430                 for if_name in ifinfo.keys():
431                     if_stats = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
432                                                     type_name='if_octets', type_instance=if_name)
433                     if len(if_stats) > 0:
434                         ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
435                 # fill vNicUsageArray filed in the event
436                 for if_name in ifinfo.keys():
437                     measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
438                 # add host/guest values as additional measurements
439                 self.fill_additional_measurements(measurement, ['virt'])
440                 # send event to the VES
441                 self.event_send(measurement)
442         finally:
443             self.unlock()
444
445     def fill_additional_measurements(self, measurement, exclude_plugins=None):
446         """Fill out addition measurement filed with host/guets values"""
447         # add host/guest values as additional measurements
448         for plugin_name in self.__plugin_data_cache.keys():
449             if (exclude_plugins != None and plugin_name in exclude_plugins):
450                 # skip host-only values
451                 continue;
452             for val in self.__plugin_data_cache[plugin_name]['vls']:
453                 if val['updated']:
454                     mgroup_name = '{}{}{}'.format(plugin_name,
455                         '-{}'.format(val['plugin_instance']) if len(val['plugin_instance']) else '',
456                         '-{}'.format(val['type_instance']) if len(val['type_instance']) else '')
457                     mgroup = MeasurementGroup(mgroup_name)
458                     ds = collectd.get_dataset(val['type'])
459                     for index in xrange(len(ds)):
460                         mname = '{}-{}'.format(val['type'], ds[index][0])
461                         mgroup.add_measurement(mname, str(val['values'][index]))
462                     measurement.add_measurement_group(mgroup);
463                     val['updated'] = False
464
465     def cpu_ns_to_percentage(self, vl):
466         """Convert CPU usage ns to CPU %"""
467         total = vl['values'][0]
468         total_time = vl['time']
469         pre_total = vl['pre_values'][0]
470         pre_total_time = vl['pre_time']
471         if (total_time - pre_total_time) == 0:
472             # return zero usage if time diff is zero
473             return 0.0
474         percent = (100.0 * (total - pre_total))/((total_time - pre_total_time) * 1000000000.0)
475         collectd.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
476             pre_total_time, pre_total, total_time, total, round(percent, 2)))
477         return round(percent, 2)
478
479     def config(self, config):
480         """Collectd config callback"""
481         for child in config.children:
482             # check the config entry name
483             if child.key not in self.__plugin_config:
484                 collectd.error("Key '{}' name is invalid".format(child.key))
485                 raise RuntimeError('Configuration key name error')
486             # check the config entry value type
487             if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]):
488                 collectd.error("Key '{}' value type should be {}".format(
489                                child.key, str(type(self.__plugin_config[child.key]))))
490                 raise RuntimeError('Configuration key value error')
491             # store the value in configuration
492             self.__plugin_config[child.key] = child.values[0]
493
494     def init(self):
495         """Collectd init callback"""
496         # start the VES timer
497         self.start_timer()
498
499     ##
500     # Please note, the cache should be locked before using this function
501     #
502     def update_cache_value(self, vl):
503         """Update value internal collectD cache values or create new one"""
504         found = False
505         if vl.plugin not in self.__plugin_data_cache:
506              self.__plugin_data_cache[vl.plugin] = {'vls': []}
507         plugin_vl = self.__plugin_data_cache[vl.plugin]['vls']
508         for index in xrange(len(plugin_vl)):
509             # record found, so just update time the values
510             if (plugin_vl[index]['plugin_instance'] ==
511                 vl.plugin_instance) and (plugin_vl[index]['type_instance'] ==
512                     vl.type_instance) and (plugin_vl[index]['type'] == vl.type):
513                 plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
514                 plugin_vl[index]['time'] = vl.time
515                 plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
516                 plugin_vl[index]['values'] = vl.values
517                 plugin_vl[index]['updated'] = True
518                 found = True
519                 break
520         if not found:
521             value = {}
522             # create new cache record
523             value['plugin_instance'] = vl.plugin_instance
524             value['type_instance'] = vl.type_instance
525             value['values'] = vl.values
526             value['pre_values'] = vl.values
527             value['type'] = vl.type
528             value['time'] = vl.time
529             value['pre_time'] = vl.time
530             value['host'] = vl.host
531             value['updated'] = True
532             self.__plugin_data_cache[vl.plugin]['vls'].append(value)
533             # update plugin interval based on one received in the value
534             self.__plugin_data_cache[vl.plugin]['interval'] = vl.interval
535
536     def cache_get_value(self, plugin_name=None, plugin_instance=None,
537                         type_name=None, type_instance=None, type_names=None, mark_as_read=True):
538         """Get cache value by given criteria"""
539         ret_list = []
540         if plugin_name in self.__plugin_data_cache:
541             for val in self.__plugin_data_cache[plugin_name]['vls']:
542                 #collectd.info("plugin={}, type={}, type_instance={}".format(
543                 #    plugin_name, val['type'], val['type_instance']))
544                 if (type_name == None or type_name == val['type']) and (plugin_instance == None
545                     or plugin_instance == val['plugin_instance']) and (type_instance == None
546                     or type_instance == val['type_instance']) and (type_names == None
547                     or val['type'] in type_names):
548                     if mark_as_read:
549                         val['updated'] = False
550                     ret_list.append(val)
551         return ret_list
552
553     def write(self, vl, data=None):
554         """Collectd write callback"""
555         self.lock()
556         try:
557             # Example of collectD Value format
558             # collectd.Values(type='cpu',type_instance='interrupt',
559             # plugin='cpu',plugin_instance='25',host='localhost',
560             # time=1476694097.022873,interval=10.0,values=[0])
561             if vl.plugin == 'ves_plugin':
562                 # store the host name and unregister callback
563                 self.__host_name = vl.host
564                 collectd.unregister_read(self.read)
565                 return
566             # update the cache values
567             self.update_cache_value(vl)
568         finally:
569             self.unlock()
570
571     def read(self, data=None):
572         """Collectd read callback. Use this callback to get host name"""
573         vl = collectd.Values(type='gauge')
574         vl.plugin='ves_plugin'
575         vl.dispatch(values=[0])
576
577     def notify(self, n):
578         """Collectd notification callback"""
579         collectd_event_severity_map = {
580             collectd.NOTIF_FAILURE : 'CRITICAL',
581             collectd.NOTIF_WARNING : 'WARNING',
582             collectd.NOTIF_OKAY : 'NORMAL'
583         }
584         fault = Fault(self.get_event_id())
585         # fill out common header
586         fault.event_type = "Notification"
587         fault.functional_role = self.__plugin_config['FunctionalRole']
588         fault.reporting_entity_id = self.get_hostname()
589         fault.reporting_entity_name = self.get_hostname()
590         fault.source_id = self.get_hostname()
591         fault.source_name = self.get_hostname()
592         fault.start_epoch_microsec = (n.time * 1000000)
593         fault.last_epoch_micro_sec = fault.start_epoch_microsec
594         # fill out fault header
595         fault.event_severity = collectd_event_severity_map[n.severity]
596         fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance
597             if len(n.plugin_instance) else ''), n.type_instance)
598         fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format(
599             n.plugin_instance if len(n.plugin_instance) else ''))
600         fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)'
601         fault.alarm_condition = n.message
602         self.event_send(fault)
603
604     def shutdown(self):
605         """Collectd shutdown callback"""
606         # stop the timer
607         self.stop_timer()
608
# Single plugin object whose bound methods serve as the collectd callbacks
plugin_instance = VESPlugin()

# Register the callbacks: config, init, read, write, notification, shutdown
for _register, _callback in (
        (collectd.register_config, plugin_instance.config),
        (collectd.register_init, plugin_instance.init),
        (collectd.register_read, plugin_instance.read),
        (collectd.register_write, plugin_instance.write),
        (collectd.register_notification, plugin_instance.notify),
        (collectd.register_shutdown, plugin_instance.shutdown)):
    _register(_callback)