de9e9f217025398e579701fcf4885964d5f67e1d
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.intentperf;
17
18 import com.google.common.collect.ArrayListMultimap;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Multimap;
22 import com.google.common.collect.Sets;
23 import org.apache.commons.lang.math.RandomUtils;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Modified;
28 import org.apache.felix.scr.annotations.Property;
29 import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.apache.felix.scr.annotations.Service;
32 import org.onlab.packet.MacAddress;
33 import org.onlab.util.Counter;
34 import org.onosproject.cfg.ComponentConfigService;
35 import org.onosproject.cluster.ClusterService;
36 import org.onosproject.cluster.ControllerNode;
37 import org.onosproject.cluster.NodeId;
38 import org.onosproject.core.ApplicationId;
39 import org.onosproject.core.CoreService;
40 import org.onosproject.mastership.MastershipService;
41 import org.onosproject.net.ConnectPoint;
42 import org.onosproject.net.Device;
43 import org.onosproject.net.PortNumber;
44 import org.onosproject.net.device.DeviceService;
45 import org.onosproject.net.flow.DefaultTrafficSelector;
46 import org.onosproject.net.flow.DefaultTrafficTreatment;
47 import org.onosproject.net.flow.TrafficSelector;
48 import org.onosproject.net.flow.TrafficTreatment;
49 import org.onosproject.net.intent.Intent;
50 import org.onosproject.net.intent.IntentEvent;
51 import org.onosproject.net.intent.IntentListener;
52 import org.onosproject.net.intent.IntentService;
53 import org.onosproject.net.intent.Key;
54 import org.onosproject.net.intent.PartitionService;
55 import org.onosproject.net.intent.PointToPointIntent;
56 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
57 import org.onosproject.store.cluster.messaging.ClusterMessage;
58 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
59 import org.onosproject.store.cluster.messaging.MessageSubject;
60 import org.osgi.service.component.ComponentContext;
61 import org.slf4j.Logger;
62
63 import java.util.ArrayList;
64 import java.util.Collections;
65 import java.util.Dictionary;
66 import java.util.List;
67 import java.util.Map;
68 import java.util.Set;
69 import java.util.Timer;
70 import java.util.TimerTask;
71 import java.util.concurrent.ExecutorService;
72 import java.util.concurrent.Executors;
73 import java.util.concurrent.TimeUnit;
74 import java.util.stream.Collectors;
75
76 import static com.google.common.base.Preconditions.checkState;
77 import static com.google.common.base.Strings.isNullOrEmpty;
78 import static java.lang.String.format;
79 import static java.lang.System.currentTimeMillis;
80 import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
81 import static org.onlab.util.Tools.*;
82 import static org.onosproject.net.intent.IntentEvent.Type.*;
83 import static org.slf4j.LoggerFactory.getLogger;
84
85 /**
86  * Application to test sustained intent throughput.
87  */
88 @Component(immediate = true)
89 @Service(value = IntentPerfInstaller.class)
90 public class IntentPerfInstaller {
91
92     private final Logger log = getLogger(getClass());
93
94     private static final int DEFAULT_NUM_WORKERS = 1;
95
96     private static final int DEFAULT_NUM_KEYS = 40000;
97     private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
98
99     private static final int DEFAULT_NUM_NEIGHBORS = 0;
100
101     private static final int START_DELAY = 5_000; // ms
102     private static final int REPORT_PERIOD = 1_000; //ms
103
104     private static final String START = "start";
105     private static final String STOP = "stop";
106     private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
107
108     //FIXME add path length
109
110     @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
111             label = "Number of keys (i.e. unique intents) to generate per instance")
112     private int numKeys = DEFAULT_NUM_KEYS;
113
114     //TODO implement numWorkers property
115 //    @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
116 //              label = "Number of installer threads per instance")
117 //    private int numWokers = DEFAULT_NUM_WORKERS;
118
119     @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
120             label = "Goal for cycle period (in ms)")
121     private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
122
123     @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
124             label = "Number of neighbors to generate intents for")
125     private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
126
127     @Reference(cardinality = MANDATORY_UNARY)
128     protected CoreService coreService;
129
130     @Reference(cardinality = MANDATORY_UNARY)
131     protected IntentService intentService;
132
133     @Reference(cardinality = MANDATORY_UNARY)
134     protected ClusterService clusterService;
135
136     @Reference(cardinality = MANDATORY_UNARY)
137     protected DeviceService deviceService;
138
139     @Reference(cardinality = MANDATORY_UNARY)
140     protected MastershipService mastershipService;
141
142     @Reference(cardinality = MANDATORY_UNARY)
143     protected PartitionService partitionService;
144
145     @Reference(cardinality = MANDATORY_UNARY)
146     protected ComponentConfigService configService;
147
148     @Reference(cardinality = MANDATORY_UNARY)
149     protected IntentPerfCollector sampleCollector;
150
151     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152     protected ClusterCommunicationService communicationService;
153
154     private ExecutorService messageHandlingExecutor;
155
156     private ExecutorService workers;
157     private ApplicationId appId;
158     private Listener listener;
159     private boolean stopped = true;
160
161     private Timer reportTimer;
162
163     // FIXME this variable isn't shared properly between multiple worker threads
164     private int lastKey = 0;
165
166     private IntentPerfUi perfUi;
167     private NodeId nodeId;
168     private TimerTask reporterTask;
169
170     @Activate
171     public void activate(ComponentContext context) {
172         configService.registerProperties(getClass());
173
174         nodeId = clusterService.getLocalNode().id();
175         appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
176
177         // TODO: replace with shared timer
178         reportTimer = new Timer("onos-intent-perf-reporter");
179         workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
180
181         // disable flow backups for testing
182         configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
183                                   "backupEnabled", "false");
184
185         // TODO: replace with shared executor
186         messageHandlingExecutor = Executors.newSingleThreadExecutor(
187                 groupedThreads("onos/perf", "command-handler"));
188
189         communicationService.addSubscriber(CONTROL, new InternalControl(),
190                                            messageHandlingExecutor);
191
192         listener = new Listener();
193         intentService.addListener(listener);
194
195         // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
196         modify(context);
197     }
198
199     @Deactivate
200     public void deactivate() {
201         stopTestRun();
202
203         configService.unregisterProperties(getClass(), false);
204         messageHandlingExecutor.shutdown();
205         communicationService.removeSubscriber(CONTROL);
206
207         if (listener != null) {
208             reportTimer.cancel();
209             intentService.removeListener(listener);
210             listener = null;
211             reportTimer = null;
212         }
213     }
214
215     @Modified
216     public void modify(ComponentContext context) {
217         if (context == null) {
218             logConfig("Reconfigured");
219             return;
220         }
221
222         Dictionary<?, ?> properties = context.getProperties();
223         int newNumKeys, newCyclePeriod, newNumNeighbors;
224         try {
225             String s = get(properties, "numKeys");
226             newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
227
228             s = get(properties, "cyclePeriod");
229             newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
230
231             s = get(properties, "numNeighbors");
232             newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
233
234         } catch (NumberFormatException | ClassCastException e) {
235             log.warn("Malformed configuration detected; using defaults", e);
236             newNumKeys = DEFAULT_NUM_KEYS;
237             newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
238             newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
239         }
240
241         if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
242             numKeys = newNumKeys;
243             cyclePeriod = newCyclePeriod;
244             numNeighbors = newNumNeighbors;
245             logConfig("Reconfigured");
246         }
247     }
248
249     public void start() {
250         if (stopped) {
251             stopped = false;
252             communicationService.broadcast(START, CONTROL, str -> str.getBytes());
253             startTestRun();
254         }
255     }
256
257     public void stop() {
258         if (!stopped) {
259             communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
260             stopTestRun();
261         }
262     }
263
264     private void logConfig(String prefix) {
265         log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
266                  prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
267     }
268
269     private void startTestRun() {
270         sampleCollector.clearSamples();
271
272         // adjust numNeighbors and generate list of neighbors
273         numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
274
275         // Schedule reporter task on report period boundary
276         reporterTask = new ReporterTask();
277         reportTimer.scheduleAtFixedRate(reporterTask,
278                                         REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
279                                         REPORT_PERIOD);
280
281         // Submit workers
282         stopped = false;
283         for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
284             workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
285         }
286         log.info("Started test run");
287     }
288
289     private void stopTestRun() {
290         if (reporterTask != null) {
291             reporterTask.cancel();
292             reporterTask = null;
293         }
294
295         try {
296             workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
297         } catch (InterruptedException e) {
298             log.warn("Failed to stop worker", e);
299         }
300
301         sampleCollector.recordSample(0, 0);
302         sampleCollector.recordSample(0, 0);
303         stopped = true;
304
305         log.info("Stopped test run");
306     }
307
308     private List<NodeId> getNeighbors() {
309         List<NodeId> nodes = clusterService.getNodes().stream()
310                 .map(ControllerNode::id)
311                 .collect(Collectors.toCollection(ArrayList::new));
312         // sort neighbors by id
313         Collections.sort(nodes, (node1, node2) ->
314                 node1.toString().compareTo(node2.toString()));
315         // rotate the local node to index 0
316         Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
317         log.debug("neighbors (raw): {}", nodes); //TODO remove
318         // generate the sub-list that will contain local node and selected neighbors
319         nodes = nodes.subList(0, numNeighbors + 1);
320         log.debug("neighbors: {}", nodes); //TODO remove
321         return nodes;
322     }
323
324     private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
325         // choose a random device for which this node is master
326         List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
327         Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
328
329         //FIXME we currently ignore the path length and always use the same device
330         TrafficSelector selector = DefaultTrafficSelector.builder()
331                 .matchEthDst(MacAddress.valueOf(mac)).build();
332         TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
333         ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
334         ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
335
336         return PointToPointIntent.builder()
337                 .appId(appId)
338                 .key(key)
339                 .selector(selector)
340                 .treatment(treatment)
341                 .ingressPoint(ingress)
342                 .egressPoint(egress)
343                 .build();
344     }
345
346     /**
347      * Creates a specified number of intents for testing purposes.
348      *
349      * @param numberOfKeys number of intents
350      * @param pathLength   path depth
351      * @param firstKey     first key to attempt
352      * @return set of intents
353      */
354     private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
355         List<NodeId> neighbors = getNeighbors();
356
357         Multimap<NodeId, Device> devices = ArrayListMultimap.create();
358         deviceService.getAvailableDevices()
359                 .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
360
361         // ensure that we have at least one device per neighbor
362         neighbors.forEach(node -> checkState(devices.get(node).size() > 0,
363                                              "There are no devices for {}", node));
364
365         // TODO pull this outside so that createIntent can use it
366         // prefix based on node id for keys generated on this instance
367         long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
368
369         int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
370         Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
371
372         for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
373             Key key = Key.of(keyPrefix + k, appId);
374
375             NodeId leader = partitionService.getLeader(key);
376             if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
377                 // Bail if we are not sending to this node or we have enough for this node
378                 continue;
379             }
380             intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
381
382             // Bump up the counter and remember this as the last key used.
383             count++;
384             lastKey = k;
385             if (count % 1000 == 0) {
386                 log.info("Building intents... {} (attempt: {})", count, lastKey);
387             }
388         }
389         checkState(intents.values().size() == numberOfKeys,
390                    "Generated wrong number of intents");
391         log.info("Created {} intents", numberOfKeys);
392         intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
393
394         return Sets.newHashSet(intents.values());
395     }
396
397     // Submits intent operations.
398     final class Submitter implements Runnable {
399
400         private long lastDuration;
401         private int lastCount;
402
403         private Set<Intent> intents = Sets.newHashSet();
404         private Set<Intent> submitted = Sets.newHashSet();
405         private Set<Intent> withdrawn = Sets.newHashSet();
406
407         private Submitter(Set<Intent> intents) {
408             this.intents = intents;
409             lastCount = numKeys / 4;
410             lastDuration = 1_000; // 1 second
411         }
412
413         @Override
414         public void run() {
415             prime();
416             while (!stopped) {
417                 try {
418                     cycle();
419                 } catch (Exception e) {
420                     log.warn("Exception during cycle", e);
421                 }
422             }
423             clear();
424         }
425
426         private Iterable<Intent> subset(Set<Intent> intents) {
427             List<Intent> subset = Lists.newArrayList(intents);
428             Collections.shuffle(subset);
429             return subset.subList(0, lastCount);
430         }
431
432         // Submits the specified intent.
433         private void submit(Intent intent) {
434             intentService.submit(intent);
435             submitted.add(intent);
436             withdrawn.remove(intent); //TODO could check result here...
437         }
438
439         // Withdraws the specified intent.
440         private void withdraw(Intent intent) {
441             intentService.withdraw(intent);
442             withdrawn.add(intent);
443             submitted.remove(intent); //TODO could check result here...
444         }
445
446         // Primes the cycle.
447         private void prime() {
448             int i = 0;
449             withdrawn.addAll(intents);
450             for (Intent intent : intents) {
451                 submit(intent);
452                 // only submit half of the intents to start
453                 if (i++ >= intents.size() / 2) {
454                     break;
455                 }
456             }
457         }
458
459         private void clear() {
460             submitted.forEach(this::withdraw);
461         }
462
463         // Runs a single operation cycle.
464         private void cycle() {
465             //TODO consider running without rate adjustment
466             adjustRates();
467
468             long start = currentTimeMillis();
469             subset(submitted).forEach(this::withdraw);
470             subset(withdrawn).forEach(this::submit);
471             long delta = currentTimeMillis() - start;
472
473             if (delta > cyclePeriod * 3 || delta < 0) {
474                 log.warn("Cycle took {} ms", delta);
475             }
476
477             int difference = cyclePeriod - (int) delta;
478             if (difference > 0) {
479                 delay(difference);
480             }
481
482             lastDuration = delta;
483         }
484
485         int cycleCount = 0;
486
487         private void adjustRates() {
488
489             int addDelta = Math.max(1000 - cycleCount, 10);
490             double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
491
492             //FIXME need to iron out the rate adjustment
493             //FIXME we should taper the adjustments over time
494             //FIXME don't just use the lastDuration, take an average
495             if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
496                 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
497                         lastDuration <= cyclePeriod) {
498                     lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
499                 } else {
500                     lastCount *= multRatio;
501                 }
502                 log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
503                          lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
504             }
505
506         }
507     }
508
509     // Event listener to monitor throughput.
510     final class Listener implements IntentListener {
511
512         private final Counter runningTotal = new Counter();
513         private volatile Map<IntentEvent.Type, Counter> counters;
514
515         private volatile double processedThroughput = 0;
516         private volatile double requestThroughput = 0;
517
518         public Listener() {
519             counters = initCounters();
520         }
521
522         private Map<IntentEvent.Type, Counter> initCounters() {
523             Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
524             for (IntentEvent.Type type : IntentEvent.Type.values()) {
525                 map.put(type, new Counter());
526             }
527             return map;
528         }
529
530         public double processedThroughput() {
531             return processedThroughput;
532         }
533
534         public double requestThroughput() {
535             return requestThroughput;
536         }
537
538         @Override
539         public void event(IntentEvent event) {
540             if (event.subject().appId().equals(appId)) {
541                 counters.get(event.type()).add(1);
542             }
543         }
544
545         public void report() {
546             Map<IntentEvent.Type, Counter> reportCounters = counters;
547             counters = initCounters();
548
549             // update running total and latest throughput
550             Counter installed = reportCounters.get(INSTALLED);
551             Counter withdrawn = reportCounters.get(WITHDRAWN);
552             processedThroughput = installed.throughput() + withdrawn.throughput();
553             runningTotal.add(installed.total() + withdrawn.total());
554
555             Counter installReq = reportCounters.get(INSTALL_REQ);
556             Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
557             requestThroughput = installReq.throughput() + withdrawReq.throughput();
558
559             // build the string to report
560             StringBuilder stringBuilder = new StringBuilder();
561             for (IntentEvent.Type type : IntentEvent.Type.values()) {
562                 Counter counter = reportCounters.get(type);
563                 stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
564             }
565             log.info("Throughput: OVERALL={}; CURRENT={}; {}",
566                      format("%.2f", runningTotal.throughput()),
567                      format("%.2f", processedThroughput),
568                      stringBuilder);
569
570             sampleCollector.recordSample(runningTotal.throughput(),
571                                          processedThroughput);
572         }
573     }
574
575     private class InternalControl implements ClusterMessageHandler {
576         @Override
577         public void handle(ClusterMessage message) {
578             String cmd = new String(message.payload());
579             log.info("Received command {}", cmd);
580             if (cmd.equals(START)) {
581                 startTestRun();
582             } else {
583                 stopTestRun();
584             }
585         }
586     }
587
588     private class ReporterTask extends TimerTask {
589         @Override
590         public void run() {
591             //adjustRates(); // FIXME we currently adjust rates in the cycle thread
592             listener.report();
593         }
594     }
595
596 }