8c160e85ddee300283b1ef4b60c95c0d9a92c187
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.intentperf;
17
18 import com.google.common.collect.ImmutableList;
19 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate;
22 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.apache.felix.scr.annotations.Service;
25 import org.onosproject.cluster.ClusterService;
26 import org.onosproject.cluster.ControllerNode;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29 import org.onosproject.store.cluster.messaging.ClusterMessage;
30 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
31 import org.onosproject.store.cluster.messaging.MessageSubject;
32 import org.slf4j.Logger;
33
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.HashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Map;
40
41 import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
42 import static org.slf4j.LoggerFactory.getLogger;
43
44 /**
45  * Collects and distributes performance samples.
46  */
47 @Component(immediate = true)
48 @Service(value = IntentPerfCollector.class)
49 public class IntentPerfCollector {
50
51     private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
52     private final Logger log = getLogger(getClass());
53
54     private static final int MAX_SAMPLES = 1_000;
55
56     private final List<Sample> samples = new LinkedList<>();
57
58     private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
59
60     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61     protected ClusterCommunicationService communicationService;
62
63     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64     protected ClusterService clusterService;
65
66     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67     protected IntentPerfUi ui;
68
69     // Auxiliary structures used to accrue data for normalized time interval
70     // across all nodes.
71     private long newestTime;
72     private Sample overall;
73     private Sample current;
74
75     private ControllerNode[] nodes;
76     private Map<NodeId, Integer> nodeToIndex;
77
78     private NodeId nodeId;
79
80     @Activate
81     public void activate() {
82         nodeId = clusterService.getLocalNode().id();
83
84         communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
85                                            getPoolThreadExecutor());
86
87         nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
88         Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
89
90         nodeToIndex = new HashMap<>();
91         for (int i = 0; i < nodes.length; i++) {
92             nodeToIndex.put(nodes[i].id(), i);
93         }
94
95         clearSamples();
96         ui.setCollector(this);
97         log.info("Started");
98     }
99
100     @Deactivate
101     public void deactivate() {
102         communicationService.removeSubscriber(SAMPLE);
103         log.info("Stopped");
104     }
105
106     /**
107      * Clears all previously accumulated data.
108      */
109     public void clearSamples() {
110         newestTime = 0;
111         overall = new Sample(0, nodes.length);
112         current = new Sample(0, nodes.length);
113         samples.clear();
114     }
115
116
117     /**
118      * Records a sample point of data about intent operation rate.
119      *
120      * @param overallRate overall rate
121      * @param currentRate current rate
122      */
123     public void recordSample(double overallRate, double currentRate) {
124         long now = System.currentTimeMillis();
125         addSample(now, nodeId, overallRate, currentRate);
126         broadcastSample(now, nodeId, overallRate, currentRate);
127     }
128
129     /**
130      * Returns set of node ids as headers.
131      *
132      * @return node id headers
133      */
134     public List<String> getSampleHeaders() {
135         List<String> headers = new ArrayList<>();
136         for (ControllerNode node : nodes) {
137             headers.add(node.id().toString());
138         }
139         return headers;
140     }
141
142     /**
143      * Returns set of all accumulated samples normalized to the local set of
144      * samples.
145      *
146      * @return accumulated samples
147      */
148     public synchronized List<Sample> getSamples() {
149         return ImmutableList.copyOf(samples);
150     }
151
152     /**
153      * Returns overall throughput performance for each of the cluster nodes.
154      *
155      * @return overall intent throughput
156      */
157     public synchronized Sample getOverall() {
158         return overall;
159     }
160
161     // Records a new sample to our collection of samples
162     private synchronized void addSample(long time, NodeId nodeId,
163                                         double overallRate, double currentRate) {
164         Sample fullSample = createCurrentSampleIfNeeded(time);
165         setSampleData(current, nodeId, currentRate);
166         setSampleData(overall, nodeId, overallRate);
167         pruneSamplesIfNeeded();
168
169         if (fullSample != null && ui != null) {
170             ui.reportSample(fullSample);
171         }
172     }
173
174     private Sample createCurrentSampleIfNeeded(long time) {
175         Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
176         if (oldSample != null) {
177             newestTime = time;
178             current = new Sample(time, nodes.length);
179             if (oldSample.time > 0) {
180                 samples.add(oldSample);
181             }
182         }
183         return oldSample;
184     }
185
186     private void setSampleData(Sample sample, NodeId nodeId, double data) {
187         Integer index = nodeToIndex.get(nodeId);
188         if (index != null) {
189             sample.data[index] = data;
190         }
191     }
192
193     private void pruneSamplesIfNeeded() {
194         if (samples.size() > MAX_SAMPLES) {
195             samples.remove(0);
196         }
197     }
198
199     // Performance data sample.
200     static class Sample {
201         final long time;
202         final double[] data;
203
204         public Sample(long time, int nodeCount) {
205             this.time = time;
206             this.data = new double[nodeCount];
207             Arrays.fill(data, -1);
208         }
209
210         public boolean isComplete() {
211             for (int i = 0; i < data.length; i++) {
212                 if (data[i] < 0) {
213                     return false;
214                 }
215             }
216             return true;
217         }
218     }
219
220     private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
221         String data = String.format("%d|%f|%f", time, overallRate, currentRate);
222         communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
223     }
224
225     private class InternalSampleCollector implements ClusterMessageHandler {
226         @Override
227         public void handle(ClusterMessage message) {
228             String[] fields = new String(message.payload()).split("\\|");
229             log.debug("Received sample from {}: {}", message.sender(), fields);
230             addSample(Long.parseLong(fields[0]), message.sender(),
231                       Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
232         }
233     }
234 }