2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.device.impl;
18 import com.google.common.collect.Iterables;
19 import com.google.common.collect.Sets;
21 import org.easymock.Capture;
22 import org.junit.After;
23 import org.junit.AfterClass;
24 import org.junit.Before;
25 import org.junit.BeforeClass;
26 import org.junit.Ignore;
27 import org.junit.Test;
28 import org.onlab.packet.ChassisId;
29 import org.onlab.packet.IpAddress;
30 import org.onosproject.cluster.ClusterService;
31 import org.onosproject.cluster.ControllerNode;
32 import org.onosproject.cluster.DefaultControllerNode;
33 import org.onosproject.cluster.NodeId;
34 import org.onosproject.mastership.MastershipServiceAdapter;
35 import org.onosproject.net.Annotations;
36 import org.onosproject.net.DefaultAnnotations;
37 import org.onosproject.net.Device;
38 import org.onosproject.net.DeviceId;
39 import org.onosproject.net.MastershipRole;
40 import org.onosproject.net.Port;
41 import org.onosproject.net.PortNumber;
42 import org.onosproject.net.SparseAnnotations;
43 import org.onosproject.net.device.DefaultDeviceDescription;
44 import org.onosproject.net.device.DefaultPortDescription;
45 import org.onosproject.net.device.DeviceClockService;
46 import org.onosproject.net.device.DeviceClockServiceAdapter;
47 import org.onosproject.net.device.DeviceDescription;
48 import org.onosproject.net.device.DeviceEvent;
49 import org.onosproject.net.device.DeviceStore;
50 import org.onosproject.net.device.DeviceStoreDelegate;
51 import org.onosproject.net.device.PortDescription;
52 import org.onosproject.net.provider.ProviderId;
53 import org.onosproject.store.Timestamp;
54 import org.onosproject.store.cluster.StaticClusterService;
55 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56 import org.onosproject.store.cluster.messaging.ClusterMessage;
57 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
58 import org.onosproject.store.cluster.messaging.MessageSubject;
59 import org.onosproject.store.consistent.impl.DatabaseManager;
60 import org.onosproject.store.impl.MastershipBasedTimestamp;
62 import java.io.IOException;
63 import java.util.Arrays;
64 import java.util.Collections;
65 import java.util.HashMap;
66 import java.util.List;
69 import java.util.concurrent.CompletableFuture;
70 import java.util.concurrent.CountDownLatch;
71 import java.util.concurrent.ExecutorService;
72 import java.util.concurrent.TimeUnit;
73 import java.util.concurrent.atomic.AtomicLong;
74 import java.util.function.Function;
76 import static java.util.Arrays.asList;
77 import static org.easymock.EasyMock.*;
78 import static org.junit.Assert.*;
79 import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
80 import static org.onosproject.net.DefaultAnnotations.union;
81 import static org.onosproject.net.Device.Type.SWITCH;
82 import static org.onosproject.net.DeviceId.deviceId;
83 import static org.onosproject.net.device.DeviceEvent.Type.*;
86 // TODO add tests for remote replication
88 * Test of the gossip based distributed DeviceStore implementation.
90 public class GossipDeviceStoreTest {
92 private static final ProviderId PID = new ProviderId("of", "foo");
93 private static final ProviderId PIDA = new ProviderId("of", "bar", true);
94 private static final DeviceId DID1 = deviceId("of:foo");
95 private static final DeviceId DID2 = deviceId("of:bar");
96 private static final String MFR = "whitebox";
97 private static final String HW = "1.1.x";
98 private static final String SW1 = "3.8.1";
99 private static final String SW2 = "3.9.5";
100 private static final String SN = "43311-12345";
101 private static final ChassisId CID = new ChassisId();
103 private static final PortNumber P1 = PortNumber.portNumber(1);
104 private static final PortNumber P2 = PortNumber.portNumber(2);
105 private static final PortNumber P3 = PortNumber.portNumber(3);
107 private static final SparseAnnotations A1 = DefaultAnnotations.builder()
111 private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
115 private static final SparseAnnotations A2 = DefaultAnnotations.builder()
119 private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
125 private static final NodeId NID1 = new NodeId("local");
126 private static final ControllerNode ONOS1 =
127 new DefaultControllerNode(NID1, IpAddress.valueOf("127.0.0.1"));
130 private static final NodeId NID2 = new NodeId("remote");
131 private static final ControllerNode ONOS2 =
132 new DefaultControllerNode(NID2, IpAddress.valueOf("127.0.0.2"));
133 private static final List<SparseAnnotations> NO_ANNOTATION = Collections.<SparseAnnotations>emptyList();
136 private TestGossipDeviceStore testGossipDeviceStore;
137 private GossipDeviceStore gossipDeviceStore;
138 private DeviceStore deviceStore;
140 private DeviceClockService deviceClockService = new TestDeviceClockService();
141 private ClusterCommunicationService clusterCommunicator;
144 public static void setUpBeforeClass() throws Exception {
148 public static void tearDownAfterClass() throws Exception {
153 public void setUp() throws Exception {
154 clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
155 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
156 anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
157 expectLastCall().anyTimes();
158 replay(clusterCommunicator);
159 ClusterService clusterService = new TestClusterService();
161 testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator);
162 testGossipDeviceStore.mastershipService = new TestMastershipService();
164 TestDatabaseManager testDatabaseManager = new TestDatabaseManager();
165 testDatabaseManager.init(clusterService, clusterCommunicator);
166 testGossipDeviceStore.storageService = testDatabaseManager;
167 testGossipDeviceStore.deviceClockService = deviceClockService;
169 gossipDeviceStore = testGossipDeviceStore;
170 gossipDeviceStore.activate();
171 deviceStore = gossipDeviceStore;
172 verify(clusterCommunicator);
173 reset(clusterCommunicator);
177 public void tearDown() throws Exception {
178 gossipDeviceStore.deactivate();
181 private void putDevice(DeviceId deviceId, String swVersion,
182 SparseAnnotations... annotations) {
183 DeviceDescription description =
184 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
185 HW, swVersion, SN, CID, annotations);
186 reset(clusterCommunicator);
187 clusterCommunicator.<InternalDeviceEvent>broadcast(
188 anyObject(InternalDeviceEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
189 expectLastCall().anyTimes();
190 replay(clusterCommunicator);
191 deviceStore.createOrUpdateDevice(PID, deviceId, description);
192 verify(clusterCommunicator);
195 private void putDeviceAncillary(DeviceId deviceId, String swVersion,
196 SparseAnnotations... annotations) {
197 DeviceDescription description =
198 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
199 HW, swVersion, SN, CID, annotations);
200 deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
203 private static void assertDevice(DeviceId id, String swVersion, Device device) {
204 assertNotNull(device);
205 assertEquals(id, device.id());
206 assertEquals(MFR, device.manufacturer());
207 assertEquals(HW, device.hwVersion());
208 assertEquals(swVersion, device.swVersion());
209 assertEquals(SN, device.serialNumber());
213 * Verifies that Annotations created by merging {@code annotations} is
214 * equal to actual Annotations.
216 * @param actual Annotations to check
219 private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
220 SparseAnnotations expected = DefaultAnnotations.builder().build();
221 for (SparseAnnotations a : annotations) {
222 expected = DefaultAnnotations.union(expected, a);
224 assertEquals(expected.keys(), actual.keys());
225 for (String key : expected.keys()) {
226 assertEquals(expected.value(key), actual.value(key));
230 private static void assertDeviceDescriptionEquals(DeviceDescription expected,
231 DeviceDescription actual) {
232 if (expected == actual) {
235 assertEquals(expected.deviceURI(), actual.deviceURI());
236 assertEquals(expected.hwVersion(), actual.hwVersion());
237 assertEquals(expected.manufacturer(), actual.manufacturer());
238 assertEquals(expected.serialNumber(), actual.serialNumber());
239 assertEquals(expected.swVersion(), actual.swVersion());
241 assertAnnotationsEquals(actual.annotations(), expected.annotations());
244 private static void assertDeviceDescriptionEquals(DeviceDescription expected,
245 List<SparseAnnotations> expectedAnnotations,
246 DeviceDescription actual) {
247 if (expected == actual) {
250 assertEquals(expected.deviceURI(), actual.deviceURI());
251 assertEquals(expected.hwVersion(), actual.hwVersion());
252 assertEquals(expected.manufacturer(), actual.manufacturer());
253 assertEquals(expected.serialNumber(), actual.serialNumber());
254 assertEquals(expected.swVersion(), actual.swVersion());
256 assertAnnotationsEquals(actual.annotations(),
257 expectedAnnotations.toArray(new SparseAnnotations[0]));
261 public final void testGetDeviceCount() {
262 assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
264 putDevice(DID1, SW1);
265 putDevice(DID2, SW2);
266 putDevice(DID1, SW1);
268 assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount());
272 public final void testGetDevices() {
273 assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices()));
275 putDevice(DID1, SW1);
276 putDevice(DID2, SW2);
277 putDevice(DID1, SW1);
279 assertEquals("expect 2 uniq devices",
280 2, Iterables.size(deviceStore.getDevices()));
282 Map<DeviceId, Device> devices = new HashMap<>();
283 for (Device device : deviceStore.getDevices()) {
284 devices.put(device.id(), device);
287 assertDevice(DID1, SW1, devices.get(DID1));
288 assertDevice(DID2, SW2, devices.get(DID2));
290 // add case for new node?
294 public final void testGetDevice() {
296 putDevice(DID1, SW1);
298 assertDevice(DID1, SW1, deviceStore.getDevice(DID1));
299 assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
302 private void assertInternalDeviceEvent(NodeId sender,
304 ProviderId providerId,
305 DeviceDescription expectedDesc,
306 Capture<InternalDeviceEvent> actualEvent,
307 Capture<MessageSubject> actualSubject,
308 Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
309 assertTrue(actualEvent.hasCaptured());
310 assertTrue(actualSubject.hasCaptured());
311 assertTrue(actualEncoder.hasCaptured());
313 assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
314 actualSubject.getValue());
315 assertEquals(deviceId, actualEvent.getValue().deviceId());
316 assertEquals(providerId, actualEvent.getValue().providerId());
317 assertDeviceDescriptionEquals(expectedDesc, actualEvent.getValue().deviceDescription().value());
320 private void assertInternalDeviceEvent(NodeId sender,
322 ProviderId providerId,
323 DeviceDescription expectedDesc,
324 List<SparseAnnotations> expectedAnnotations,
325 Capture<InternalDeviceEvent> actualEvent,
326 Capture<MessageSubject> actualSubject,
327 Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
328 assertTrue(actualEvent.hasCaptured());
329 assertTrue(actualSubject.hasCaptured());
330 assertTrue(actualEncoder.hasCaptured());
332 assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
333 actualSubject.getValue());
334 assertEquals(deviceId, actualEvent.getValue().deviceId());
335 assertEquals(providerId, actualEvent.getValue().providerId());
336 assertDeviceDescriptionEquals(
339 actualEvent.getValue().deviceDescription().value());
343 public final void testCreateOrUpdateDevice() throws IOException {
344 DeviceDescription description =
345 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
347 Capture<InternalDeviceEvent> message = new Capture<>();
348 Capture<MessageSubject> subject = new Capture<>();
349 Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
351 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
352 DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
353 assertEquals(DEVICE_ADDED, event.type());
354 assertDevice(DID1, SW1, event.subject());
355 verify(clusterCommunicator);
356 assertInternalDeviceEvent(NID1, DID1, PID, description, message, subject, encoder);
359 DeviceDescription description2 =
360 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
362 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
363 DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
364 assertEquals(DEVICE_UPDATED, event2.type());
365 assertDevice(DID1, SW2, event2.subject());
367 verify(clusterCommunicator);
368 assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
369 reset(clusterCommunicator);
371 assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
375 public final void testCreateOrUpdateDeviceAncillary() throws IOException {
377 DeviceDescription description =
378 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
379 HW, SW1, SN, CID, A2);
380 Capture<ClusterMessage> bcast = new Capture<>();
382 Capture<InternalDeviceEvent> message = new Capture<>();
383 Capture<MessageSubject> subject = new Capture<>();
384 Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
386 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
387 DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
388 assertEquals(DEVICE_ADDED, event.type());
389 assertDevice(DID1, SW1, event.subject());
390 assertEquals(PIDA, event.subject().providerId());
391 assertAnnotationsEquals(event.subject().annotations(), A2);
392 assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
393 verify(clusterCommunicator);
394 assertInternalDeviceEvent(NID1, DID1, PIDA, description, message, subject, encoder);
396 // update from primary
397 DeviceDescription description2 =
398 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
399 HW, SW2, SN, CID, A1);
400 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
402 DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
403 assertEquals(DEVICE_UPDATED, event2.type());
404 assertDevice(DID1, SW2, event2.subject());
405 assertEquals(PID, event2.subject().providerId());
406 assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
407 assertTrue(deviceStore.isAvailable(DID1));
408 verify(clusterCommunicator);
409 assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
411 // no-op update from primary
412 resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
413 assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
415 verify(clusterCommunicator);
416 assertFalse("no broadcast expected", bcast.hasCaptured());
418 // For now, Ancillary is ignored once primary appears
419 resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
421 assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
423 verify(clusterCommunicator);
424 assertFalse("no broadcast expected", bcast.hasCaptured());
426 // But, Ancillary annotations will be in effect
427 DeviceDescription description3 =
428 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
429 HW, SW1, SN, CID, A2_2);
430 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
432 DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
433 assertEquals(DEVICE_UPDATED, event3.type());
434 // basic information will be the one from Primary
435 assertDevice(DID1, SW2, event3.subject());
436 assertEquals(PID, event3.subject().providerId());
437 // but annotation from Ancillary will be merged
438 assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
439 assertTrue(deviceStore.isAvailable(DID1));
440 verify(clusterCommunicator);
441 // note: only annotation from PIDA is sent over the wire
442 assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
443 asList(union(A2, A2_2)), message, subject, encoder);
449 public final void testMarkOffline() {
451 putDevice(DID1, SW1);
452 assertTrue(deviceStore.isAvailable(DID1));
454 Capture<InternalDeviceEvent> message = new Capture<>();
455 Capture<MessageSubject> subject = new Capture<>();
456 Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
458 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
459 DeviceEvent event = deviceStore.markOffline(DID1);
460 assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
461 assertDevice(DID1, SW1, event.subject());
462 assertFalse(deviceStore.isAvailable(DID1));
463 verify(clusterCommunicator);
464 // TODO: verify broadcast message
465 assertTrue(message.hasCaptured());
468 resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
469 DeviceEvent event2 = deviceStore.markOffline(DID1);
470 assertNull("No change, no event", event2);
471 verify(clusterCommunicator);
472 assertFalse(message.hasCaptured());
476 public final void testUpdatePorts() {
477 putDevice(DID1, SW1);
478 List<PortDescription> pds = Arrays.<PortDescription>asList(
479 new DefaultPortDescription(P1, true),
480 new DefaultPortDescription(P2, true)
482 Capture<InternalDeviceEvent> message = new Capture<>();
483 Capture<MessageSubject> subject = new Capture<>();
484 Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
486 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
487 List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
488 verify(clusterCommunicator);
489 // TODO: verify broadcast message
490 assertTrue(message.hasCaptured());
492 Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
493 for (DeviceEvent event : events) {
494 assertEquals(PORT_ADDED, event.type());
495 assertDevice(DID1, SW1, event.subject());
496 assertTrue("PortNumber is one of expected",
497 expectedPorts.remove(event.port().number()));
498 assertTrue("Port is enabled", event.port().isEnabled());
500 assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
503 List<PortDescription> pds2 = Arrays.<PortDescription>asList(
504 new DefaultPortDescription(P1, false),
505 new DefaultPortDescription(P2, true),
506 new DefaultPortDescription(P3, true)
509 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
510 events = deviceStore.updatePorts(PID, DID1, pds2);
511 verify(clusterCommunicator);
512 // TODO: verify broadcast message
513 assertTrue(message.hasCaptured());
515 assertFalse("event should be triggered", events.isEmpty());
516 for (DeviceEvent event : events) {
517 PortNumber num = event.port().number();
518 if (P1.equals(num)) {
519 assertEquals(PORT_UPDATED, event.type());
520 assertDevice(DID1, SW1, event.subject());
521 assertFalse("Port is disabled", event.port().isEnabled());
522 } else if (P2.equals(num)) {
523 fail("P2 event not expected.");
524 } else if (P3.equals(num)) {
525 assertEquals(PORT_ADDED, event.type());
526 assertDevice(DID1, SW1, event.subject());
527 assertTrue("Port is enabled", event.port().isEnabled());
529 fail("Unknown port number encountered: " + num);
533 List<PortDescription> pds3 = Arrays.<PortDescription>asList(
534 new DefaultPortDescription(P1, false),
535 new DefaultPortDescription(P2, true)
537 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
538 events = deviceStore.updatePorts(PID, DID1, pds3);
539 verify(clusterCommunicator);
540 // TODO: verify broadcast message
541 assertTrue(message.hasCaptured());
543 assertFalse("event should be triggered", events.isEmpty());
544 for (DeviceEvent event : events) {
545 PortNumber num = event.port().number();
546 if (P1.equals(num)) {
547 fail("P1 event not expected.");
548 } else if (P2.equals(num)) {
549 fail("P2 event not expected.");
550 } else if (P3.equals(num)) {
551 assertEquals(PORT_REMOVED, event.type());
552 assertDevice(DID1, SW1, event.subject());
553 assertTrue("Port was enabled", event.port().isEnabled());
555 fail("Unknown port number encountered: " + num);
561 public final void testUpdatePortStatus() {
562 putDevice(DID1, SW1);
563 List<PortDescription> pds = Arrays.<PortDescription>asList(
564 new DefaultPortDescription(P1, true)
566 deviceStore.updatePorts(PID, DID1, pds);
568 Capture<InternalPortStatusEvent> message = new Capture<>();
569 Capture<MessageSubject> subject = new Capture<>();
570 Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
572 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
573 final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
574 DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
575 assertEquals(PORT_UPDATED, event.type());
576 assertDevice(DID1, SW1, event.subject());
577 assertEquals(P1, event.port().number());
578 assertFalse("Port is disabled", event.port().isEnabled());
579 verify(clusterCommunicator);
580 assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, message, subject, encoder);
581 assertTrue(message.hasCaptured());
585 public final void testUpdatePortStatusAncillary() throws IOException {
586 putDeviceAncillary(DID1, SW1);
587 putDevice(DID1, SW1);
588 List<PortDescription> pds = Arrays.<PortDescription>asList(
589 new DefaultPortDescription(P1, true, A1)
591 deviceStore.updatePorts(PID, DID1, pds);
593 Capture<InternalPortStatusEvent> message = new Capture<>();
594 Capture<MessageSubject> subject = new Capture<>();
595 Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
597 // update port from primary
598 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
600 final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
601 DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
602 assertEquals(PORT_UPDATED, event.type());
603 assertDevice(DID1, SW1, event.subject());
604 assertEquals(P1, event.port().number());
605 assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
606 assertFalse("Port is disabled", event.port().isEnabled());
607 verify(clusterCommunicator);
608 assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), message, subject, encoder);
609 assertTrue(message.hasCaptured());
611 // update port from ancillary with no attributes
612 resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
613 final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
614 DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
615 assertNull("Ancillary is ignored if primary exists", event2);
616 verify(clusterCommunicator);
617 assertFalse(message.hasCaptured());
619 // but, Ancillary annotation update will be notified
620 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
621 final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
622 DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
623 assertEquals(PORT_UPDATED, event3.type());
624 assertDevice(DID1, SW1, event3.subject());
625 assertEquals(P1, event3.port().number());
626 assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
627 assertFalse("Port is disabled", event3.port().isEnabled());
628 verify(clusterCommunicator);
629 assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), message, subject, encoder);
630 assertTrue(message.hasCaptured());
632 // port only reported from Ancillary will be notified as down
633 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
634 final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
635 DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
636 assertEquals(PORT_ADDED, event4.type());
637 assertDevice(DID1, SW1, event4.subject());
638 assertEquals(P2, event4.port().number());
639 assertAnnotationsEquals(event4.port().annotations());
640 assertFalse("Port is disabled if not given from primary provider",
641 event4.port().isEnabled());
642 verify(clusterCommunicator);
643 // TODO: verify broadcast message content
644 assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, message, subject, encoder);
645 assertTrue(message.hasCaptured());
648 private void assertInternalPortStatusEvent(NodeId sender,
651 DefaultPortDescription expectedDesc,
652 List<SparseAnnotations> expectedAnnotations,
653 Capture<InternalPortStatusEvent> actualEvent,
654 Capture<MessageSubject> actualSubject,
655 Capture<Function<InternalPortStatusEvent, byte[]>> actualEncoder) {
657 assertTrue(actualEvent.hasCaptured());
658 assertTrue(actualSubject.hasCaptured());
659 assertTrue(actualEncoder.hasCaptured());
661 assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
662 actualSubject.getValue());
663 assertEquals(did, actualEvent.getValue().deviceId());
664 assertEquals(pid, actualEvent.getValue().providerId());
665 assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
666 actualEvent.getValue().portDescription().value());
669 private void assertPortDescriptionEquals(
670 PortDescription expectedDesc,
671 List<SparseAnnotations> expectedAnnotations,
672 PortDescription actual) {
674 assertEquals(expectedDesc.portNumber(), actual.portNumber());
675 assertEquals(expectedDesc.isEnabled(), actual.isEnabled());
677 assertAnnotationsEquals(actual.annotations(),
678 expectedAnnotations.toArray(new SparseAnnotations[0]));
681 private <T> void resetCommunicatorExpectingNoBroadcast(
683 Capture<MessageSubject> subject,
684 Capture<Function<T, byte[]>> encoder) {
688 reset(clusterCommunicator);
689 replay(clusterCommunicator);
692 private <T> void resetCommunicatorExpectingSingleBroadcast(
694 Capture<MessageSubject> subject,
695 Capture<Function<T, byte[]>> encoder) {
700 reset(clusterCommunicator);
701 clusterCommunicator.broadcast(
705 expectLastCall().once();
706 replay(clusterCommunicator);
710 public final void testGetPorts() {
711 putDevice(DID1, SW1);
712 putDevice(DID2, SW1);
713 List<PortDescription> pds = Arrays.<PortDescription>asList(
714 new DefaultPortDescription(P1, true),
715 new DefaultPortDescription(P2, true)
717 deviceStore.updatePorts(PID, DID1, pds);
719 Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
720 List<Port> ports = deviceStore.getPorts(DID1);
721 for (Port port : ports) {
722 assertTrue("Port is enabled", port.isEnabled());
723 assertTrue("PortNumber is one of expected",
724 expectedPorts.remove(port.number()));
726 assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
729 assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty());
733 public final void testGetPort() {
734 putDevice(DID1, SW1);
735 putDevice(DID2, SW1);
736 List<PortDescription> pds = Arrays.<PortDescription>asList(
737 new DefaultPortDescription(P1, true),
738 new DefaultPortDescription(P2, false)
740 deviceStore.updatePorts(PID, DID1, pds);
742 Port port1 = deviceStore.getPort(DID1, P1);
743 assertEquals(P1, port1.number());
744 assertTrue("Port is enabled", port1.isEnabled());
746 Port port2 = deviceStore.getPort(DID1, P2);
747 assertEquals(P2, port2.number());
748 assertFalse("Port is disabled", port2.isEnabled());
750 Port port3 = deviceStore.getPort(DID1, P3);
751 assertNull("P3 not expected", port3);
755 public final void testRemoveDevice() {
756 putDevice(DID1, SW1, A1);
757 List<PortDescription> pds = Arrays.<PortDescription>asList(
758 new DefaultPortDescription(P1, true, A2)
760 deviceStore.updatePorts(PID, DID1, pds);
761 putDevice(DID2, SW1);
763 assertEquals(2, deviceStore.getDeviceCount());
764 assertEquals(1, deviceStore.getPorts(DID1).size());
765 assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
766 assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
768 Capture<InternalDeviceEvent> message = new Capture<>();
769 Capture<MessageSubject> subject = new Capture<>();
770 Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
772 resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
774 DeviceEvent event = deviceStore.removeDevice(DID1);
775 assertEquals(DEVICE_REMOVED, event.type());
776 assertDevice(DID1, SW1, event.subject());
778 assertEquals(1, deviceStore.getDeviceCount());
779 assertEquals(0, deviceStore.getPorts(DID1).size());
780 verify(clusterCommunicator);
781 // TODO: verify broadcast message
782 assertTrue(message.hasCaptured());
784 // putBack Device, Port w/o annotation
785 putDevice(DID1, SW1);
786 List<PortDescription> pds2 = Arrays.<PortDescription>asList(
787 new DefaultPortDescription(P1, true)
789 deviceStore.updatePorts(PID, DID1, pds2);
791 // annotations should not survive
792 assertEquals(2, deviceStore.getDeviceCount());
793 assertEquals(1, deviceStore.getPorts(DID1).size());
794 assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
795 assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
798 // If Delegates should be called only on remote events,
799 // then Simple* should never call them, thus not test required.
800 // TODO add test for Port events when we have them
801 @Ignore("Ignore until Delegate spec. is clear.")
803 public final void testEvents() throws InterruptedException {
804 final CountDownLatch addLatch = new CountDownLatch(1);
805 DeviceStoreDelegate checkAdd = event -> {
806 assertEquals(DEVICE_ADDED, event.type());
807 assertDevice(DID1, SW1, event.subject());
808 addLatch.countDown();
810 final CountDownLatch updateLatch = new CountDownLatch(1);
811 DeviceStoreDelegate checkUpdate = event -> {
812 assertEquals(DEVICE_UPDATED, event.type());
813 assertDevice(DID1, SW2, event.subject());
814 updateLatch.countDown();
816 final CountDownLatch removeLatch = new CountDownLatch(1);
817 DeviceStoreDelegate checkRemove = event -> {
818 assertEquals(DEVICE_REMOVED, event.type());
819 assertDevice(DID1, SW2, event.subject());
820 removeLatch.countDown();
823 DeviceDescription description =
824 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
826 deviceStore.setDelegate(checkAdd);
827 deviceStore.createOrUpdateDevice(PID, DID1, description);
828 assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
831 DeviceDescription description2 =
832 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
834 deviceStore.unsetDelegate(checkAdd);
835 deviceStore.setDelegate(checkUpdate);
836 deviceStore.createOrUpdateDevice(PID, DID1, description2);
837 assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS));
839 deviceStore.unsetDelegate(checkUpdate);
840 deviceStore.setDelegate(checkRemove);
841 deviceStore.removeDevice(DID1);
842 assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS));
845 private final class TestMastershipService extends MastershipServiceAdapter {
847 public NodeId getMasterFor(DeviceId deviceId) {
851 public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
852 return CompletableFuture.completedFuture(null);
856 private static final class TestGossipDeviceStore extends GossipDeviceStore {
858 public TestGossipDeviceStore(
859 DeviceClockService deviceClockService,
860 ClusterService clusterService,
861 ClusterCommunicationService clusterCommunicator) {
862 this.deviceClockService = deviceClockService;
863 this.clusterService = clusterService;
864 this.clusterCommunicator = clusterCommunicator;
868 private static final class TestClusterService extends StaticClusterService {
870 public TestClusterService() {
872 nodes.put(NID1, ONOS1);
873 nodeStates.put(NID1, ACTIVE);
875 nodes.put(NID2, ONOS2);
876 nodeStates.put(NID2, ACTIVE);
880 private final class TestDeviceClockService extends DeviceClockServiceAdapter {
882 private final AtomicLong ticker = new AtomicLong();
885 public Timestamp getTimestamp(DeviceId deviceId) {
886 if (DID1.equals(deviceId)) {
887 return new MastershipBasedTimestamp(1, ticker.getAndIncrement());
888 } else if (DID2.equals(deviceId)) {
889 return new MastershipBasedTimestamp(2, ticker.getAndIncrement());
891 throw new IllegalStateException();
896 public boolean isTimestampAvailable(DeviceId deviceId) {
897 return DID1.equals(deviceId) || DID2.equals(deviceId);
901 private class TestDatabaseManager extends DatabaseManager {
902 void init(ClusterService clusterService,
903 ClusterCommunicationService clusterCommunicator) {
904 this.clusterService = clusterService;
905 this.clusterCommunicator = clusterCommunicator;