24697933e84fafc30f1386a223c9f417ae928e5d
[onosfw.git] /
1 package org.onosproject.messagingperf;
2
3 import static com.google.common.base.Strings.isNullOrEmpty;
4 import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
5 import static org.onlab.util.Tools.get;
6 import static org.onlab.util.Tools.groupedThreads;
7 import static org.slf4j.LoggerFactory.getLogger;
8
9 import java.util.Dictionary;
10 import java.util.List;
11 import java.util.Objects;
12 import java.util.Set;
13 import java.util.concurrent.CompletableFuture;
14 import java.util.concurrent.Executor;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.ScheduledExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.function.Function;
21 import java.util.stream.IntStream;
22
23 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate;
26 import org.apache.felix.scr.annotations.Modified;
27 import org.apache.felix.scr.annotations.Property;
28 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.apache.felix.scr.annotations.Service;
31 import org.onlab.util.BoundedThreadPool;
32 import org.onlab.util.KryoNamespace;
33 import org.onosproject.cfg.ComponentConfigService;
34 import org.onosproject.cluster.ClusterService;
35 import org.onosproject.cluster.NodeId;
36 import org.onosproject.core.CoreService;
37 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38 import org.onosproject.store.cluster.messaging.MessageSubject;
39 import org.onosproject.store.serializers.KryoNamespaces;
40 import org.onosproject.store.serializers.KryoSerializer;
41 import org.osgi.service.component.ComponentContext;
42 import org.slf4j.Logger;
43
44 import com.google.common.collect.ImmutableList;
45 import com.google.common.collect.ImmutableSet;
46 import com.google.common.collect.Lists;
47 import com.google.common.collect.Sets;
48 import com.google.common.util.concurrent.MoreExecutors;
49
50 /**
51  * Application for measuring cluster messaging performance.
52  */
53 @Component(immediate = true, enabled = true)
54 @Service(value = MessagingPerfApp.class)
55 public class MessagingPerfApp {
56     private final Logger log = getLogger(getClass());
57
58     @Reference(cardinality = MANDATORY_UNARY)
59     protected ClusterService clusterService;
60
61     @Reference(cardinality = MANDATORY_UNARY)
62     protected ClusterCommunicationService communicationService;
63
64     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65     protected CoreService coreService;
66
67     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68     protected ComponentConfigService configService;
69
70     private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
71             new MessageSubject("net-perf-unicast-message");
72
73     private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
74             new MessageSubject("net-perf-rr-message");
75
76     private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
77     private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
78
79     @Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
80             label = "Number of sender threads")
81     protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
82
83     @Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
84             label = "Number of receiver threads")
85     protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
86
87     @Property(name = "serializationOn", boolValue = true,
88             label = "Turn serialization on/off")
89     private boolean serializationOn = true;
90
91     @Property(name = "receiveOnIOLoopThread", boolValue = false,
92             label = "Set this to true to handle message on IO thread")
93     private boolean receiveOnIOLoopThread = false;
94
95     protected int reportIntervalSeconds = 1;
96
97     private Executor messageReceivingExecutor;
98
99     private ExecutorService messageSendingExecutor =
100             BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
101                     groupedThreads("onos/messaging-perf-test", "sender-%d"));
102
103     private final ScheduledExecutorService reporter =
104             Executors.newSingleThreadScheduledExecutor(
105                     groupedThreads("onos/net-perf-test", "reporter"));
106
107     private AtomicInteger received = new AtomicInteger(0);
108     private AtomicInteger sent = new AtomicInteger(0);
109     private AtomicInteger attempted = new AtomicInteger(0);
110     private AtomicInteger completed = new AtomicInteger(0);
111
112     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
113         @Override
114         protected void setupKryoPool() {
115             serializerPool = KryoNamespace.newBuilder()
116                     .register(KryoNamespaces.BASIC)
117                     .register(KryoNamespaces.MISC)
118                     .register(byte[].class)
119                     .register(Data.class)
120                     .build();
121         }
122     };
123
124     private final Data data = new Data().withStringField("test")
125                                 .withListField(Lists.newArrayList("1", "2", "3"))
126                                 .withSetField(Sets.newHashSet("1", "2", "3"));
127     private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
128             .withListField(Lists.newArrayList("1", "2", "3"))
129             .withSetField(Sets.newHashSet("1", "2", "3")));
130
131     private Function<Data, byte[]> encoder;
132     private Function<byte[], Data> decoder;
133
134     @Activate
135     public void activate(ComponentContext context) {
136         configService.registerProperties(getClass());
137         setupCodecs();
138         messageReceivingExecutor = receiveOnIOLoopThread
139                 ? MoreExecutors.directExecutor()
140                 : Executors.newFixedThreadPool(
141                         totalReceiverThreads,
142                         groupedThreads("onos/net-perf-test", "receiver-%d"));
143         registerMessageHandlers();
144         startTest();
145         reporter.scheduleWithFixedDelay(this::reportPerformance,
146                 reportIntervalSeconds,
147                 reportIntervalSeconds,
148                 TimeUnit.SECONDS);
149         logConfig("Started");
150     }
151
152     @Deactivate
153     public void deactivate(ComponentContext context) {
154         configService.unregisterProperties(getClass(), false);
155         stopTest();
156         reporter.shutdown();
157         unregisterMessageHandlers();
158         log.info("Stopped.");
159     }
160
161     @Modified
162     public void modified(ComponentContext context) {
163         if (context == null) {
164             totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
165             totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
166             serializationOn = true;
167             receiveOnIOLoopThread = false;
168             return;
169         }
170
171         Dictionary properties = context.getProperties();
172
173         int newTotalSenderThreads = totalSenderThreads;
174         int newTotalReceiverThreads = totalReceiverThreads;
175         boolean newSerializationOn = serializationOn;
176         boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
177         try {
178             String s = get(properties, "totalSenderThreads");
179             newTotalSenderThreads = isNullOrEmpty(s)
180                     ? totalSenderThreads : Integer.parseInt(s.trim());
181
182             s = get(properties, "totalReceiverThreads");
183             newTotalReceiverThreads = isNullOrEmpty(s)
184                     ? totalReceiverThreads : Integer.parseInt(s.trim());
185
186             s = get(properties, "serializationOn");
187             newSerializationOn = isNullOrEmpty(s)
188                     ? serializationOn : Boolean.parseBoolean(s.trim());
189
190             s = get(properties, "receiveOnIOLoopThread");
191             newReceiveOnIOLoopThread = isNullOrEmpty(s)
192                     ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
193
194         } catch (NumberFormatException | ClassCastException e) {
195             return;
196         }
197
198         boolean modified = newTotalSenderThreads != totalSenderThreads ||
199                 newTotalReceiverThreads != totalReceiverThreads ||
200                 newSerializationOn != serializationOn ||
201                 newReceiveOnIOLoopThread != receiveOnIOLoopThread;
202
203         // If nothing has changed, simply return.
204         if (!modified) {
205             return;
206         }
207
208         totalSenderThreads = newTotalSenderThreads;
209         totalReceiverThreads = newTotalReceiverThreads;
210         serializationOn = newSerializationOn;
211         if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
212             ((ExecutorService) messageReceivingExecutor).shutdown();
213         }
214         receiveOnIOLoopThread = newReceiveOnIOLoopThread;
215
216         // restart test.
217
218         stopTest();
219         unregisterMessageHandlers();
220         setupCodecs();
221         messageSendingExecutor =
222                 BoundedThreadPool.newFixedThreadPool(
223                         totalSenderThreads,
224                         groupedThreads("onos/net-perf-test", "sender-%d"));
225         messageReceivingExecutor = receiveOnIOLoopThread
226                     ? MoreExecutors.directExecutor()
227                     : Executors.newFixedThreadPool(
228                             totalReceiverThreads,
229                             groupedThreads("onos/net-perf-test", "receiver-%d"));
230
231         registerMessageHandlers();
232         startTest();
233
234         logConfig("Reconfigured");
235     }
236
237
238     private void logConfig(String prefix) {
239         log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
240                 + " serializationOn = {}, receiveOnIOLoopThread = {}",
241                  prefix,
242                  totalSenderThreads,
243                  totalReceiverThreads,
244                  serializationOn,
245                  receiveOnIOLoopThread);
246     }
247
248     private void setupCodecs() {
249         encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
250         decoder = serializationOn ? SERIALIZER::decode : b -> data;
251     }
252
253     private void registerMessageHandlers() {
254         communicationService.<Data>addSubscriber(
255                 TEST_UNICAST_MESSAGE_TOPIC,
256                 decoder,
257                 d -> { received.incrementAndGet(); },
258                 messageReceivingExecutor);
259
260         communicationService.<Data, Data>addSubscriber(
261                 TEST_REQUEST_REPLY_TOPIC,
262                 decoder,
263                 Function.identity(),
264                 encoder,
265                 messageReceivingExecutor);
266     }
267
268     private void unregisterMessageHandlers() {
269         communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
270         communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
271     }
272
273     private void startTest() {
274         IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
275     }
276
277     private void stopTest() {
278         messageSendingExecutor.shutdown();
279     }
280
281     private void requestReply() {
282         try {
283             attempted.incrementAndGet();
284             CompletableFuture<Data> response =
285                     communicationService.<Data, Data>sendAndReceive(
286                             data,
287                             TEST_REQUEST_REPLY_TOPIC,
288                             encoder,
289                             decoder,
290                             randomPeer());
291             response.whenComplete((result, error) -> {
292                 if (Objects.equals(data, result)) {
293                     completed.incrementAndGet();
294                 }
295                 messageSendingExecutor.submit(this::requestReply);
296             });
297         } catch (Exception e) {
298             e.printStackTrace();
299         }
300     }
301
302     private void unicast() {
303         try {
304             sent.incrementAndGet();
305             communicationService.<Data>unicast(
306                     data,
307                     TEST_UNICAST_MESSAGE_TOPIC,
308                     encoder,
309                     randomPeer());
310         } catch (Exception e) {
311             e.printStackTrace();
312         }
313         messageSendingExecutor.submit(this::unicast);
314     }
315
316     private void broadcast() {
317         try {
318             sent.incrementAndGet();
319             communicationService.<Data>broadcast(
320                     data,
321                     TEST_UNICAST_MESSAGE_TOPIC,
322                     encoder);
323         } catch (Exception e) {
324             e.printStackTrace();
325         }
326         messageSendingExecutor.submit(this::broadcast);
327     }
328
329     private NodeId randomPeer() {
330         return clusterService.getNodes()
331                     .stream()
332                     .filter(node -> clusterService.getLocalNode().equals(node))
333                     .findAny()
334                     .get()
335                     .id();
336     }
337
338     private void reportPerformance() {
339         log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
340     }
341
342     private static class Data {
343         private String stringField;
344         private List<String> listField;
345         private Set<String> setField;
346
347         public Data withStringField(String value) {
348             stringField = value;
349             return this;
350         }
351
352         public Data withListField(List<String> value) {
353             listField = ImmutableList.copyOf(value);
354             return this;
355         }
356
357         public Data withSetField(Set<String> value) {
358             setField = ImmutableSet.copyOf(value);
359             return this;
360         }
361
362         @Override
363         public int hashCode() {
364             return Objects.hash(stringField, listField, setField);
365         }
366
367         @Override
368         public boolean equals(Object other) {
369             if (other instanceof Data) {
370                 Data that = (Data) other;
371                 return Objects.equals(this.stringField, that.stringField) &&
372                 Objects.equals(this.listField, that.listField) &&
373                 Objects.equals(this.setField, that.setField);
374             }
375             return false;
376         }
377     }
378 }