3 # Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
5 # Permission is hereby granted, free of charge, to any person obtaining a
6 # copy of this software and associated documentation files (the "Software"),
7 # to deal in the Software without restriction, including without limitation
8 # the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 # and/or sell copies of the Software, and to permit persons to whom the
10 # Software is furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 # DEALINGS IN THE SOFTWARE.
30 from threading import Timer
31 from threading import Lock
37 """Construct the common header"""
39 self.event_type = "Info" # use "Info" unless a notification is generated
44 self.functional_role = ""
45 self.reporting_entity_id = ""
46 self.reporting_entity_name = ""
47 self.priority = "Normal" # will be derived from event if there is one
48 self.start_epoch_microsec = 0
49 self.last_epoch_micro_sec = 0
53 """Get the object of the datatype"""
55 obj['version'] = self.version
56 obj['eventType'] = self.event_type
57 obj['domain'] = self.domain
58 obj['eventId'] = self.event_id
59 obj['sourceId'] = self.source_id
60 obj['sourceName'] = self.source_name
61 obj['functionalRole'] = self.functional_role
62 obj['reportingEntityId'] = self.reporting_entity_id
63 obj['reportingEntityName'] = self.reporting_entity_name
64 obj['priority'] = self.priority
65 obj['startEpochMicrosec'] = self.start_epoch_microsec
66 obj['lastEpochMicrosec'] = self.last_epoch_micro_sec
67 obj['sequence'] = self.sequence
70 'commonEventHeader' : obj,
71 self.get_name() : self.get_obj()
76 assert False, 'abstract method get_name() is not implemented'
79 assert False, 'abstract method get_obj() is not implemented'
81 class MeasurementGroup(object):
82 """MeasurementGroup datatype"""
84 def __init__(self, name):
89 def add_measurement(self, name, value):
90 self.measurement.append({
98 'measurements' : self.measurement
101 class MeasurementsForVfScaling(Event):
102 """MeasurementsForVfScaling datatype"""
104 def __init__(self, event_id):
105 """Construct the header"""
106 super(MeasurementsForVfScaling, self).__init__()
108 self.domain = "measurementsForVfScaling"
109 self.event_id = event_id
110 # measurement attributes
111 self.additional_measurements = []
112 self.aggregate_cpu_usage = 0
113 self.codec_usage_array = []
114 self.concurrent_sessions = 0
115 self.configured_entities = 0
116 self.cpu_usage_array = []
118 self.feature_usage_array = []
119 self.filesystem_usage_array = []
120 self.latency_distribution = []
121 self.mean_request_latency = 0
122 self.measurement_fields_version = 1.1
123 self.measurement_interval = 0
124 self.memory_configured = 0
126 self.number_of_media_ports_in_use = 0
127 self.request_rate = 0
128 self.vnfc_scaling_metric = 0
129 self.v_nic_usage_array = []
131 def add_measurement_group(self, group):
132 self.additional_measurements.append(group.get_obj())
134 def add_cpu_usage(self, cpu_identifier, usage):
135 self.cpu_usage_array.append({
136 'cpuIdentifier' : cpu_identifier,
137 'percentUsage' : usage
140 def add_v_nic_usage(self, if_name, if_pkts, if_bytes):
141 self.v_nic_usage_array.append({
142 'broadcastPacketsIn' : 0.0,
143 'broadcastPacketsOut' : 0.0,
144 'multicastPacketsIn' : 0.0,
145 'multicastPacketsOut' : 0.0,
146 'unicastPacketsIn' : 0.0,
147 'unicastPacketsOut' : 0.0,
148 'vNicIdentifier' : if_name,
149 'packetsIn' : if_pkts[0],
150 'packetsOut' : if_pkts[1],
151 'bytesIn' : if_bytes[0],
152 'bytesOut' : if_bytes[1]
156 """Get the object of the datatype"""
158 obj['additionalMeasurements'] = self.additional_measurements
159 obj['aggregateCpuUsage'] = self.aggregate_cpu_usage
160 obj['codecUsageArray'] = self.codec_usage_array
161 obj['concurrentSessions'] = self.concurrent_sessions
162 obj['configuredEntities'] = self.configured_entities
163 obj['cpuUsageArray'] = self.cpu_usage_array
164 obj['errors'] = self.errors
165 obj['featureUsageArray'] = self.feature_usage_array
166 obj['filesystemUsageArray'] = self.filesystem_usage_array
167 obj['latencyDistribution'] = self.latency_distribution
168 obj['meanRequestLatency'] = self.mean_request_latency
169 obj['measurementFieldsVersion'] = self.measurement_fields_version
170 obj['measurementInterval'] = self.measurement_interval
171 obj['memoryConfigured'] = self.memory_configured
172 obj['memoryUsed'] = self.memory_used
173 obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use
174 obj['requestRate'] = self.request_rate
175 obj['vnfcScalingMetric'] = self.vnfc_scaling_metric
176 obj['vNicUsageArray'] = self.v_nic_usage_array
180 """Name of datatype"""
181 return "measurementsForVfScalingFields"
186 def __init__(self, event_id):
187 """Construct the header"""
188 super(Fault, self).__init__()
190 self.domain = "fault"
191 self.event_id = event_id
192 self.event_type = "Fault"
194 self.fault_fields_version = 1.1
195 self.event_severity = 'NORMAL'
196 self.event_source_type = 'other(0)'
197 self.alarm_condition = ''
198 self.specific_problem = ''
199 self.vf_status = 'Active'
200 self.alarm_interface_a = ''
201 self.alarm_additional_information = []
204 """Name of datatype"""
208 """Get the object of the datatype"""
210 obj['faultFieldsVersion'] = self.fault_fields_version
211 obj['eventSeverity'] = self.event_severity
212 obj['eventSourceType'] = self.event_source_type
213 obj['alarmCondition'] = self.alarm_condition
214 obj['specificProblem'] = self.specific_problem
215 obj['vfStatus'] = self.vf_status
216 obj['alarmInterfaceA'] = self.alarm_interface_a
217 obj['alarmAdditionalInformation'] = self.alarm_additional_information
220 class VESPlugin(object):
221 """VES plugin with collectd callbacks"""
224 """Plugin initialization"""
225 self.__plugin_data_cache = {
226 'cpu' : {'interval' : 0.0, 'vls' : []},
227 'cpu-aggregation' : {'interval' : 0.0, 'vls' : []},
228 'virt' : {'interval' : 0.0, 'vls' : []},
229 'disk' : {'interval' : 0.0, 'vls' : []},
230 'interface' : {'interval' : 0.0, 'vls' : []},
231 'memory' : {'interval' : 0.0, 'vls' : []}
233 self.__plugin_config = {
234 'Domain' : '127.0.0.1',
241 'SendEventInterval' : 20.0,
242 'FunctionalRole' : 'Collectd VES Agent',
243 'GuestRunning' : False
245 self.__host_name = None
246 self.__ves_timer = None
247 self.__event_timer_interval = 20.0
251 def get_event_id(self):
254 return str(self.__event_id)
257 """Lock the plugin"""
258 self.__lock.acquire()
261 """Unlock the plugin"""
262 self.__lock.release()
def start_timer(self):
    """Arm the one-shot VES event timer.

    A fresh threading.Timer is created that invokes the private on-time
    handler after the configured event interval (in seconds), stored on the
    instance so stop_timer() can cancel it, and then started.
    """
    timer = Timer(self.__event_timer_interval, self.__on_time)
    self.__ves_timer = timer
    timer.start()
def stop_timer(self):
    """Cancel the pending VES event timer, if any.

    The timer attribute is initialised to None and only assigned by
    start_timer(), so this is a no-op when the timer was never started
    instead of raising AttributeError on None.cancel().
    """
    timer = self.__ves_timer
    if timer is not None:
        timer.cancel()
def event_send(self, event):
    """Send event to VES.

    POSTs ``event.get_json()`` to the configured Vendor Event Listener with
    HTTP Basic authentication. HTTP errors, unreachable-server errors and
    socket timeouts are logged via collectd and not re-raised.

    The URL has the form
    ``http[s]://<Domain>:<Port>/[<Path>/]eventListener/v1[/<Topic>]``.
    Slashes around ``Path`` are stripped so the URL never contains ``//``.
    A ``/`` is inserted before a non-empty ``Topic`` that lacks one; a
    ``Topic`` already starting with ``/`` is used unchanged.

    :param event: object exposing ``get_json()`` returning the request body.
    """
    plugin_config = self.__plugin_config
    path = plugin_config['Path'].strip('/')
    topic = plugin_config['Topic']
    if topic and not topic.startswith('/'):
        topic = '/' + topic
    server_url = "http{}://{}:{}/{}eventListener/v1{}".format(
        's' if plugin_config['UseHttps'] else '', plugin_config['Domain'],
        int(plugin_config['Port']), '{}/'.format(path) if path else '',
        topic)
    collectd.info('Vendor Event Listener is at: {}'.format(server_url))
    credentials = base64.b64encode('{}:{}'.format(
        plugin_config['Username'], plugin_config['Password']))
    # Never log the encoded credentials: Base64 is trivially reversible.
    try:
        request = urllib2.Request(server_url)
        request.add_header('Authorization', 'Basic {}'.format(credentials))
        request.add_header('Content-Type', 'application/json')
        payload = event.get_json()
        collectd.debug("Sending {} to {}".format(payload, server_url))
        response = urllib2.urlopen(request, payload, timeout=1)
        # Release the underlying connection; the body is not used.
        response.close()
    except urllib2.HTTPError as e:
        collectd.error('Vendor Event Listener exception: {}'.format(e))
    except urllib2.URLError as e:
        collectd.error('Vendor Event Listener is is not reachable: {}'.format(e))
    except socket.timeout as e:
        # A timeout while waiting for the response is not wrapped in
        # URLError; log it like the other transport failures.
        collectd.error('Vendor Event Listener timed out: {}'.format(e))
def bytes_to_gb(self, bytes):
    """Return a byte count expressed in binary gigabytes (2**30 bytes).

    The result is a float rounded to three decimal places.
    """
    bytes_per_gib = float(1 << 30)
    return round(bytes / bytes_per_gib, 3)
def get_hostname(self):
    """Return the host name to put into events.

    Uses the host name recorded by the write callback from the plugin's own
    'ves_plugin' value when one is known. Falls back to
    socket.gethostname() while it is still None (its initial value) or
    empty.
    """
    # The attribute starts as None, so test truthiness rather than len(),
    # which raised TypeError if the timer fired before write() ran.
    if self.__host_name:
        return self.__host_name
    return socket.gethostname()
309 def event_timer(self):
310 """Event timer thread"""
313 if (self.__plugin_config['GuestRunning']):
314 # if we running on a guest only, send 'additionalMeasurements' only
315 measurement = MeasurementsForVfScaling(self.get_event_id())
316 measurement.functional_role = self.__plugin_config['FunctionalRole']
317 # add host/guest values as additional measurements
318 self.fill_additional_measurements(measurement, exclude_plugins=[
319 'cpu', 'cpu-aggregation', 'memory', 'disk', 'interface', 'virt'])
320 # fill out reporting & source entities
321 reporting_entity = self.get_hostname()
322 measurement.reporting_entity_id = reporting_entity
323 measurement.reporting_entity_name = reporting_entity
324 measurement.source_id = reporting_entity
325 measurement.source_name = measurement.source_id
326 measurement.start_epoch_microsec = (time.time() * 1000000)
327 measurement.measurement_interval = self.__plugin_config['SendEventInterval']
329 total_cpu_system = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='system')
330 total_cpu_user = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='user')
331 measurement.aggregate_cpu_usage = round(total_cpu_system[0]['values'][0] +
332 total_cpu_user[0]['values'][0], 2)
333 # CPU per each instance
334 cpux_system = self.cache_get_value(plugin_name='cpu', type_instance='system',
335 mark_as_read = False)
336 for cpu_inst in [x['plugin_instance'] for x in cpux_system]:
337 cpu_system = self.cache_get_value(plugin_name='cpu',
338 plugin_instance=cpu_inst, type_instance='system')
339 cpu_user = self.cache_get_value(plugin_name='cpu',
340 plugin_instance=cpu_inst, type_instance='user')
341 cpu_usage = round(cpu_system[0]['values'][0] + cpu_user[0]['values'][0], 2)
342 measurement.add_cpu_usage(cpu_inst, cpu_usage)
344 memory_used = self.cache_get_value(plugin_name='memory', type_name='memory', type_instance='used')
345 if len(memory_used) > 0:
346 measurement.memory_used = self.bytes_to_gb(memory_used[0]['values'][0])
349 if_stats = self.cache_get_value(plugin_name='interface', type_name='if_packets')
350 if len(if_stats) > 0:
351 for if_stat in if_stats:
352 ifinfo[if_stat['plugin_instance']] = {
353 'pkts' : (if_stat['values'][0], if_stat['values'][1])
355 # go through all interfaces and get if_octets
356 for if_name in ifinfo.keys():
357 if_stats = self.cache_get_value(plugin_instance=if_name, plugin_name='interface',
358 type_name='if_octets')
359 if len(if_stats) > 0:
360 ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
361 # fill vNicUsageArray filed in the event
362 for if_name in ifinfo.keys():
363 measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
364 # send event to the VES
365 self.event_send(measurement)
367 # get list of all VMs
368 virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
370 vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
371 for vm_name in vm_names:
372 # make sure that 'virt' plugin cache is up-to-date
373 vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
376 for vm_value in vm_values:
377 if vm_value['updated'] == False:
378 us_up_to_date = False
380 if not us_up_to_date:
381 # one of the cache value is not up-to-date, break
382 collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name))
384 # if values are up-to-date, create an event message
385 measurement = MeasurementsForVfScaling(self.get_event_id())
386 measurement.functional_role = self.__plugin_config['FunctionalRole']
387 # fill out reporting_entity
388 reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name)
389 measurement.reporting_entity_id = reporting_entity
390 measurement.reporting_entity_name = reporting_entity
392 virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name,
393 plugin_name='virt', type_name='virt_cpu_total')
394 if len(virt_vcpu_total) > 0:
395 measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0])
396 # set source as a host for virt_vcpu_total value
397 measurement.source_id = virt_vcpu_total[0]['host']
398 measurement.source_name = measurement.source_id
399 # fill out EpochMicrosec (convert to us)
400 measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
402 virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
403 plugin_name='virt', type_name='virt_vcpu')
404 if len(virt_vcpus) > 0:
405 for virt_vcpu in virt_vcpus:
406 cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
407 measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
409 measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
411 memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
412 type_name='memory', type_instance='total')
413 if len(memory_total) > 0:
414 measurement.memory_configured = self.bytes_to_gb(memory_total[0]['values'][0])
416 memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
417 type_name='memory', type_instance='rss')
418 if len(memory_rss) > 0:
419 measurement.memory_used = self.bytes_to_gb(memory_rss[0]['values'][0])
422 if_stats = self.cache_get_value(plugin_instance=vm_name,
423 plugin_name='virt', type_name='if_packets')
424 if len(if_stats) > 0:
425 for if_stat in if_stats:
426 ifinfo[if_stat['type_instance']] = {
427 'pkts' : (if_stat['values'][0], if_stat['values'][1])
429 # go through all interfaces and get if_octets
430 for if_name in ifinfo.keys():
431 if_stats = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
432 type_name='if_octets', type_instance=if_name)
433 if len(if_stats) > 0:
434 ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
435 # fill vNicUsageArray filed in the event
436 for if_name in ifinfo.keys():
437 measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
438 # add host/guest values as additional measurements
439 self.fill_additional_measurements(measurement, ['virt'])
440 # send event to the VES
441 self.event_send(measurement)
445 def fill_additional_measurements(self, measurement, exclude_plugins=None):
446 """Fill out addition measurement filed with host/guets values"""
447 # add host/guest values as additional measurements
448 for plugin_name in self.__plugin_data_cache.keys():
449 if (exclude_plugins != None and plugin_name in exclude_plugins):
450 # skip host-only values
452 for val in self.__plugin_data_cache[plugin_name]['vls']:
454 mgroup_name = '{}{}{}'.format(plugin_name,
455 '-{}'.format(val['plugin_instance']) if len(val['plugin_instance']) else '',
456 '-{}'.format(val['type_instance']) if len(val['type_instance']) else '')
457 mgroup = MeasurementGroup(mgroup_name)
458 ds = collectd.get_dataset(val['type'])
459 for index in xrange(len(ds)):
460 mname = '{}-{}'.format(val['type'], ds[index][0])
461 mgroup.add_measurement(mname, str(val['values'][index]))
462 measurement.add_measurement_group(mgroup);
463 val['updated'] = False
465 def cpu_ns_to_percentage(self, vl):
466 """Convert CPU usage ns to CPU %"""
467 total = vl['values'][0]
468 total_time = vl['time']
469 pre_total = vl['pre_values'][0]
470 pre_total_time = vl['pre_time']
471 if (total_time - pre_total_time) == 0:
472 # return zero usage if time diff is zero
474 percent = (100.0 * (total - pre_total))/((total_time - pre_total_time) * 1000000000.0)
475 collectd.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
476 pre_total_time, pre_total, total_time, total, round(percent, 2)))
477 return round(percent, 2)
def config(self, config):
    """Collectd config callback: validate and store plugin settings.

    Each child node must name a key that already exists in the plugin's
    default configuration. Its first value must have exactly the same
    Python type as that default. On a violation the problem is logged via
    collectd.error and RuntimeError is raised; otherwise the first value
    replaces the default.
    """
    settings = self.__plugin_config
    for node in config.children:
        key = node.key
        # Reject unknown configuration keys.
        if key not in settings:
            collectd.error("Key '{}' name is invalid".format(key))
            raise RuntimeError('Configuration key name error')
        # Require a value whose type matches the default's type exactly.
        expected_type = type(settings[key])
        values = node.values
        if len(values) == 0 or type(values[0]) is not expected_type:
            collectd.error("Key '{}' value type should be {}".format(
                key, str(expected_type)))
            raise RuntimeError('Configuration key value error')
        settings[key] = values[0]
495 """Collectd init callback"""
496 # start the VES timer
500 # Please note, the cache should be locked before using this function
502 def update_cache_value(self, vl):
503 """Update value internal collectD cache values or create new one"""
505 if vl.plugin not in self.__plugin_data_cache:
506 self.__plugin_data_cache[vl.plugin] = {'vls': []}
507 plugin_vl = self.__plugin_data_cache[vl.plugin]['vls']
508 for index in xrange(len(plugin_vl)):
509 # record found, so just update time the values
510 if (plugin_vl[index]['plugin_instance'] ==
511 vl.plugin_instance) and (plugin_vl[index]['type_instance'] ==
512 vl.type_instance) and (plugin_vl[index]['type'] == vl.type):
513 plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
514 plugin_vl[index]['time'] = vl.time
515 plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
516 plugin_vl[index]['values'] = vl.values
517 plugin_vl[index]['updated'] = True
522 # create new cache record
523 value['plugin_instance'] = vl.plugin_instance
524 value['type_instance'] = vl.type_instance
525 value['values'] = vl.values
526 value['pre_values'] = vl.values
527 value['type'] = vl.type
528 value['time'] = vl.time
529 value['pre_time'] = vl.time
530 value['host'] = vl.host
531 value['updated'] = True
532 self.__plugin_data_cache[vl.plugin]['vls'].append(value)
533 # update plugin interval based on one received in the value
534 self.__plugin_data_cache[vl.plugin]['interval'] = vl.interval
536 def cache_get_value(self, plugin_name=None, plugin_instance=None,
537 type_name=None, type_instance=None, type_names=None, mark_as_read=True):
538 """Get cache value by given criteria"""
540 if plugin_name in self.__plugin_data_cache:
541 for val in self.__plugin_data_cache[plugin_name]['vls']:
542 #collectd.info("plugin={}, type={}, type_instance={}".format(
543 # plugin_name, val['type'], val['type_instance']))
544 if (type_name == None or type_name == val['type']) and (plugin_instance == None
545 or plugin_instance == val['plugin_instance']) and (type_instance == None
546 or type_instance == val['type_instance']) and (type_names == None
547 or val['type'] in type_names):
549 val['updated'] = False
553 def write(self, vl, data=None):
554 """Collectd write callback"""
557 # Example of collectD Value format
558 # collectd.Values(type='cpu',type_instance='interrupt',
559 # plugin='cpu',plugin_instance='25',host='localhost',
560 # time=1476694097.022873,interval=10.0,values=[0])
561 if vl.plugin == 'ves_plugin':
562 # store the host name and unregister callback
563 self.__host_name = vl.host
564 collectd.unregister_read(self.read)
566 # update the cache values
567 self.update_cache_value(vl)
def read(self, data=None):
    """Collectd read callback, used only to discover the host name.

    Dispatches a dummy gauge value of 0 tagged with plugin 'ves_plugin'.
    The write callback recognises that plugin name, stores the host
    collectd attached to the value and unregisters this read callback.
    """
    marker = collectd.Values(type='gauge')
    marker.plugin = 'ves_plugin'
    marker.dispatch(values=[0])
578 """Collectd notification callback"""
579 collectd_event_severity_map = {
580 collectd.NOTIF_FAILURE : 'CRITICAL',
581 collectd.NOTIF_WARNING : 'WARNING',
582 collectd.NOTIF_OKAY : 'NORMAL'
584 fault = Fault(self.get_event_id())
585 # fill out common header
586 fault.event_type = "Notification"
587 fault.functional_role = self.__plugin_config['FunctionalRole']
588 fault.reporting_entity_id = self.get_hostname()
589 fault.reporting_entity_name = self.get_hostname()
590 fault.source_id = self.get_hostname()
591 fault.source_name = self.get_hostname()
592 fault.start_epoch_microsec = (n.time * 1000000)
593 fault.last_epoch_micro_sec = fault.start_epoch_microsec
594 # fill out fault header
595 fault.event_severity = collectd_event_severity_map[n.severity]
596 fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance
597 if len(n.plugin_instance) else ''), n.type_instance)
598 fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format(
599 n.plugin_instance if len(n.plugin_instance) else ''))
600 fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)'
601 fault.alarm_condition = n.message
602 self.event_send(fault)
605 """Collectd shutdown callback"""
# Single plugin object whose bound methods serve as the collectd callbacks.
plugin_instance = VESPlugin()

# Register every callback with collectd, in the order config, init, read,
# write, notification, shutdown.
for _register, _callback in (
        (collectd.register_config, plugin_instance.config),
        (collectd.register_init, plugin_instance.init),
        (collectd.register_read, plugin_instance.read),
        (collectd.register_write, plugin_instance.write),
        (collectd.register_notification, plugin_instance.notify),
        (collectd.register_shutdown, plugin_instance.shutdown)):
    _register(_callback)
del _register, _callback