/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org.onosproject.intentperf;
import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
import static org.slf4j.LoggerFactory.getLogger;
45 * Collects and distributes performance samples.
47 @Component(immediate = true)
48 @Service(value = IntentPerfCollector.class)
49 public class IntentPerfCollector {
51 private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
52 private final Logger log = getLogger(getClass());
54 private static final int MAX_SAMPLES = 1_000;
56 private final List<Sample> samples = new LinkedList<>();
58 private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 protected ClusterCommunicationService communicationService;
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected ClusterService clusterService;
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected IntentPerfUi ui;
69 // Auxiliary structures used to accrue data for normalized time interval
71 private long newestTime;
72 private Sample overall;
73 private Sample current;
75 private ControllerNode[] nodes;
76 private Map<NodeId, Integer> nodeToIndex;
78 private NodeId nodeId;
81 public void activate() {
82 nodeId = clusterService.getLocalNode().id();
84 communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
85 getPoolThreadExecutor());
87 nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
88 Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
90 nodeToIndex = new HashMap<>();
91 for (int i = 0; i < nodes.length; i++) {
92 nodeToIndex.put(nodes[i].id(), i);
96 ui.setCollector(this);
101 public void deactivate() {
102 communicationService.removeSubscriber(SAMPLE);
107 * Clears all previously accumulated data.
109 public void clearSamples() {
111 overall = new Sample(0, nodes.length);
112 current = new Sample(0, nodes.length);
118 * Records a sample point of data about intent operation rate.
120 * @param overallRate overall rate
121 * @param currentRate current rate
123 public void recordSample(double overallRate, double currentRate) {
124 long now = System.currentTimeMillis();
125 addSample(now, nodeId, overallRate, currentRate);
126 broadcastSample(now, nodeId, overallRate, currentRate);
130 * Returns set of node ids as headers.
132 * @return node id headers
134 public List<String> getSampleHeaders() {
135 List<String> headers = new ArrayList<>();
136 for (ControllerNode node : nodes) {
137 headers.add(node.id().toString());
143 * Returns set of all accumulated samples normalized to the local set of
146 * @return accumulated samples
148 public synchronized List<Sample> getSamples() {
149 return ImmutableList.copyOf(samples);
153 * Returns overall throughput performance for each of the cluster nodes.
155 * @return overall intent throughput
157 public synchronized Sample getOverall() {
161 // Records a new sample to our collection of samples
162 private synchronized void addSample(long time, NodeId nodeId,
163 double overallRate, double currentRate) {
164 Sample fullSample = createCurrentSampleIfNeeded(time);
165 setSampleData(current, nodeId, currentRate);
166 setSampleData(overall, nodeId, overallRate);
167 pruneSamplesIfNeeded();
169 if (fullSample != null && ui != null) {
170 ui.reportSample(fullSample);
174 private Sample createCurrentSampleIfNeeded(long time) {
175 Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
176 if (oldSample != null) {
178 current = new Sample(time, nodes.length);
179 if (oldSample.time > 0) {
180 samples.add(oldSample);
186 private void setSampleData(Sample sample, NodeId nodeId, double data) {
187 Integer index = nodeToIndex.get(nodeId);
189 sample.data[index] = data;
193 private void pruneSamplesIfNeeded() {
194 if (samples.size() > MAX_SAMPLES) {
199 // Performance data sample.
200 static class Sample {
204 public Sample(long time, int nodeCount) {
206 this.data = new double[nodeCount];
207 Arrays.fill(data, -1);
210 public boolean isComplete() {
211 for (int i = 0; i < data.length; i++) {
220 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
221 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
222 communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
225 private class InternalSampleCollector implements ClusterMessageHandler {
227 public void handle(ClusterMessage message) {
228 String[] fields = new String(message.payload()).split("\\|");
229 log.debug("Received sample from {}: {}", message.sender(), fields);
230 addSample(Long.parseLong(fields[0]), message.sender(),
231 Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));