2  * Copyright 2015 Open Networking Laboratory
 
   4  * Licensed under the Apache License, Version 2.0 (the "License");
 
   5  * you may not use this file except in compliance with the License.
 
   6  * You may obtain a copy of the License at
 
   8  *     http://www.apache.org/licenses/LICENSE-2.0
 
  10  * Unless required by applicable law or agreed to in writing, software
 
  11  * distributed under the License is distributed on an "AS IS" BASIS,
 
  12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  13  * See the License for the specific language governing permissions and
 
  14  * limitations under the License.
 
  16 package org.onosproject.store.ecmap;
 
  18 import java.util.ArrayList;
 
  19 import java.util.Collection;
 
  20 import java.util.HashMap;
 
  21 import java.util.HashSet;
 
  22 import java.util.List;
 
  24 import java.util.Objects;
 
  25 import java.util.Optional;
 
  27 import java.util.concurrent.CompletableFuture;
 
  28 import java.util.concurrent.CountDownLatch;
 
  29 import java.util.concurrent.Executor;
 
  30 import java.util.concurrent.TimeUnit;
 
  31 import java.util.concurrent.atomic.AtomicLong;
 
  32 import java.util.function.Consumer;
 
  33 import java.util.function.Function;
 
  35 import org.junit.After;
 
  36 import org.junit.Before;
 
  37 import org.junit.Test;
 
  38 import org.onlab.packet.IpAddress;
 
  39 import org.onlab.util.KryoNamespace;
 
  40 import org.onosproject.cluster.ClusterService;
 
  41 import org.onosproject.cluster.ControllerNode;
 
  42 import org.onosproject.cluster.DefaultControllerNode;
 
  43 import org.onosproject.cluster.NodeId;
 
  44 import org.onosproject.event.AbstractEvent;
 
  45 import org.onosproject.store.Timestamp;
 
  46 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 
  47 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 
  48 import org.onosproject.store.cluster.messaging.MessageSubject;
 
  49 import org.onosproject.store.impl.LogicalTimestamp;
 
  50 import org.onosproject.store.serializers.KryoNamespaces;
 
  51 import org.onosproject.store.serializers.KryoSerializer;
 
  52 import org.onosproject.store.service.EventuallyConsistentMap;
 
  53 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 
  54 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
  55 import org.onosproject.store.service.WallClockTimestamp;
 
  57 import com.google.common.collect.ComparisonChain;
 
  58 import com.google.common.collect.ImmutableList;
 
  59 import com.google.common.collect.ImmutableSet;
 
  60 import com.google.common.util.concurrent.MoreExecutors;
 
  62 import static com.google.common.base.Preconditions.checkArgument;
 
  63 import static junit.framework.TestCase.assertFalse;
 
  64 import static org.easymock.EasyMock.anyObject;
 
  65 import static org.easymock.EasyMock.createMock;
 
  66 import static org.easymock.EasyMock.eq;
 
  67 import static org.easymock.EasyMock.expect;
 
  68 import static org.easymock.EasyMock.expectLastCall;
 
  69 import static org.easymock.EasyMock.replay;
 
  70 import static org.easymock.EasyMock.reset;
 
  71 import static org.easymock.EasyMock.verify;
 
  72 import static org.junit.Assert.assertEquals;
 
  73 import static org.junit.Assert.assertNull;
 
  74 import static org.junit.Assert.assertTrue;
 
  75 import static org.junit.Assert.fail;
 
  78  * Unit tests for EventuallyConsistentMapImpl.
 
  80 public class EventuallyConsistentMapImplTest {
 
  82     private EventuallyConsistentMap<String, String> ecMap;
 
  84     private ClusterService clusterService;
 
  85     private ClusterCommunicationService clusterCommunicator;
 
  86     private SequentialClockService<String, String> clockService;
 
  88     private static final String MAP_NAME = "test";
 
  89     private static final MessageSubject UPDATE_MESSAGE_SUBJECT
 
  90             = new MessageSubject("ecm-" + MAP_NAME + "-update");
 
  91     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
 
  92             = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
 
  94     private static final String KEY1 = "one";
 
  95     private static final String KEY2 = "two";
 
  96     private static final String VALUE1 = "oneValue";
 
  97     private static final String VALUE2 = "twoValue";
 
  99     private final ControllerNode self =
 
 100             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
 102     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
 
 103     private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
 
 106      * Serialization is a bit tricky here. We need to serialize in the tests
 
 107      * to set the expectations, which will use this serializer here, but the
 
 108      * EventuallyConsistentMap will use its own internal serializer. This means
 
 109      * this serializer must be set up exactly the same as map's internal
 
 112     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
 
 114         protected void setupKryoPool() {
 
 115             serializerPool = KryoNamespace.newBuilder()
 
 116                     // Classes we give to the map
 
 117                     .register(KryoNamespaces.API)
 
 118                     .register(TestTimestamp.class)
 
 119                     // Below is the classes that the map internally registers
 
 120                     .register(LogicalTimestamp.class)
 
 121                     .register(WallClockTimestamp.class)
 
 122                     .register(ArrayList.class)
 
 123                     .register(AntiEntropyAdvertisement.class)
 
 124                     .register(HashMap.class)
 
 125                     .register(Optional.class)
 
 131     public void setUp() throws Exception {
 
 132         clusterService = createMock(ClusterService.class);
 
 133         expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
 
 134         expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
 
 135         replay(clusterService);
 
 137         clusterCommunicator = createMock(ClusterCommunicationService.class);
 
 139         // Add expectation for adding cluster message subscribers which
 
 140         // delegate to our ClusterCommunicationService implementation. This
 
 141         // allows us to get a reference to the map's internal cluster message
 
 142         // handlers so we can induce events coming in from a peer.
 
 143         clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
 
 144                 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
 
 145         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
 
 147         replay(clusterCommunicator);
 
 149         clockService = new SequentialClockService<>();
 
 151         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
 
 152                 .register(KryoNamespaces.API)
 
 153                 .register(TestTimestamp.class);
 
 155         ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
 
 156                         clusterService, clusterCommunicator)
 
 158                 .withSerializer(serializer)
 
 159                 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
 
 160                 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
 
 163         // Reset ready for tests to add their own expectations
 
 164         reset(clusterCommunicator);
 
 168     public void tearDown() {
 
 169         reset(clusterCommunicator);
 
 173     @SuppressWarnings("unchecked")
 
 174     private EventuallyConsistentMapListener<String, String> getListener() {
 
 175         return createMock(EventuallyConsistentMapListener.class);
 
 179     public void testSize() throws Exception {
 
 180         expectPeerMessage(clusterCommunicator);
 
 182         assertEquals(0, ecMap.size());
 
 183         ecMap.put(KEY1, VALUE1);
 
 184         assertEquals(1, ecMap.size());
 
 185         ecMap.put(KEY1, VALUE2);
 
 186         assertEquals(1, ecMap.size());
 
 187         ecMap.put(KEY2, VALUE2);
 
 188         assertEquals(2, ecMap.size());
 
 189         for (int i = 0; i < 10; i++) {
 
 190             ecMap.put("" + i, "" + i);
 
 192         assertEquals(12, ecMap.size());
 
 194         assertEquals(11, ecMap.size());
 
 196         assertEquals(11, ecMap.size());
 
 200     public void testIsEmpty() throws Exception {
 
 201         expectPeerMessage(clusterCommunicator);
 
 203         assertTrue(ecMap.isEmpty());
 
 204         ecMap.put(KEY1, VALUE1);
 
 205         assertFalse(ecMap.isEmpty());
 
 207         assertTrue(ecMap.isEmpty());
 
 211     public void testContainsKey() throws Exception {
 
 212         expectPeerMessage(clusterCommunicator);
 
 214         assertFalse(ecMap.containsKey(KEY1));
 
 215         ecMap.put(KEY1, VALUE1);
 
 216         assertTrue(ecMap.containsKey(KEY1));
 
 217         assertFalse(ecMap.containsKey(KEY2));
 
 219         assertFalse(ecMap.containsKey(KEY1));
 
 223     public void testContainsValue() throws Exception {
 
 224         expectPeerMessage(clusterCommunicator);
 
 226         assertFalse(ecMap.containsValue(VALUE1));
 
 227         ecMap.put(KEY1, VALUE1);
 
 228         assertTrue(ecMap.containsValue(VALUE1));
 
 229         assertFalse(ecMap.containsValue(VALUE2));
 
 230         ecMap.put(KEY1, VALUE2);
 
 231         assertFalse(ecMap.containsValue(VALUE1));
 
 232         assertTrue(ecMap.containsValue(VALUE2));
 
 234         assertFalse(ecMap.containsValue(VALUE2));
 
 238     public void testGet() throws Exception {
 
 239         expectPeerMessage(clusterCommunicator);
 
 241         CountDownLatch latch;
 
 244         assertNull(ecMap.get(KEY1));
 
 245         ecMap.put(KEY1, VALUE1);
 
 246         assertEquals(VALUE1, ecMap.get(KEY1));
 
 249         List<UpdateEntry<String, String>> message
 
 250                 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
 
 252         // Create a latch so we know when the put operation has finished
 
 253         latch = new CountDownLatch(1);
 
 254         ecMap.addListener(new TestListener(latch));
 
 256         assertNull(ecMap.get(KEY2));
 
 257         updateHandler.accept(message);
 
 258         assertTrue("External listener never got notified of internal event",
 
 259                    latch.await(100, TimeUnit.MILLISECONDS));
 
 260         assertEquals(VALUE2, ecMap.get(KEY2));
 
 264         assertNull(ecMap.get(KEY2));
 
 267         message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
 
 269         // Create a latch so we know when the remove operation has finished
 
 270         latch = new CountDownLatch(1);
 
 271         ecMap.addListener(new TestListener(latch));
 
 273         updateHandler.accept(message);
 
 274         assertTrue("External listener never got notified of internal event",
 
 275                    latch.await(100, TimeUnit.MILLISECONDS));
 
 276         assertNull(ecMap.get(KEY1));
 
 280     public void testPut() throws Exception {
 
 281         // Set up expectations of external events to be sent to listeners during
 
 282         // the test. These don't use timestamps so we can set them all up at once.
 
 283         EventuallyConsistentMapListener<String, String> listener
 
 285         listener.event(new EventuallyConsistentMapEvent<>(
 
 286                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
 
 287         listener.event(new EventuallyConsistentMapEvent<>(
 
 288                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
 
 291         ecMap.addListener(listener);
 
 293         // Set up expected internal message to be broadcast to peers on first put
 
 294         expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
 
 295                 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 298         assertNull(ecMap.get(KEY1));
 
 299         ecMap.put(KEY1, VALUE1);
 
 300         assertEquals(VALUE1, ecMap.get(KEY1));
 
 302         verify(clusterCommunicator);
 
 304         // Set up expected internal message to be broadcast to peers on second put
 
 305         expectSpecificMulticastMessage(generatePutMessage(
 
 306                 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 308         // Update same key to a new value
 
 309         ecMap.put(KEY1, VALUE2);
 
 310         assertEquals(VALUE2, ecMap.get(KEY1));
 
 312         verify(clusterCommunicator);
 
 314         // Do a put with a older timestamp than the value already there.
 
 315         // The map data should not be changed and no notifications should be sent.
 
 316         reset(clusterCommunicator);
 
 317         replay(clusterCommunicator);
 
 319         clockService.turnBackTime();
 
 320         ecMap.put(KEY1, VALUE1);
 
 321         // Value should not have changed.
 
 322         assertEquals(VALUE2, ecMap.get(KEY1));
 
 324         verify(clusterCommunicator);
 
 326         // Check that our listener received the correct events during the test
 
 331     public void testRemove() throws Exception {
 
 332         // Set up expectations of external events to be sent to listeners during
 
 333         // the test. These don't use timestamps so we can set them all up at once.
 
 334         EventuallyConsistentMapListener<String, String> listener
 
 336         listener.event(new EventuallyConsistentMapEvent<>(
 
 337                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
 
 338         listener.event(new EventuallyConsistentMapEvent<>(
 
 339                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
 
 340         listener.event(new EventuallyConsistentMapEvent<>(
 
 341                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
 
 344         ecMap.addListener(listener);
 
 346         // Put in an initial value
 
 347         expectPeerMessage(clusterCommunicator);
 
 348         ecMap.put(KEY1, VALUE1);
 
 349         assertEquals(VALUE1, ecMap.get(KEY1));
 
 351         // Remove the value and check the correct internal cluster messages
 
 353         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
 
 354                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 357         assertNull(ecMap.get(KEY1));
 
 359         verify(clusterCommunicator);
 
 361         // Remove the same value again. Even though the value is no longer in
 
 362         // the map, we expect that the tombstone is updated and another remove
 
 363         // event is sent to the cluster and external listeners.
 
 364         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
 
 365                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 368         assertNull(ecMap.get(KEY1));
 
 370         verify(clusterCommunicator);
 
 373         // Put in a new value for us to try and remove
 
 374         expectPeerMessage(clusterCommunicator);
 
 376         ecMap.put(KEY2, VALUE2);
 
 378         clockService.turnBackTime();
 
 380         // Remove should have no effect, since it has an older timestamp than
 
 381         // the put. Expect no notifications to be sent out
 
 382         reset(clusterCommunicator);
 
 383         replay(clusterCommunicator);
 
 387         verify(clusterCommunicator);
 
 389         // Check that our listener received the correct events during the test
 
 394     public void testCompute() throws Exception {
 
 395         // Set up expectations of external events to be sent to listeners during
 
 396         // the test. These don't use timestamps so we can set them all up at once.
 
 397         EventuallyConsistentMapListener<String, String> listener
 
 399         listener.event(new EventuallyConsistentMapEvent<>(
 
 400                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
 
 401         listener.event(new EventuallyConsistentMapEvent<>(
 
 402                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
 
 403         listener.event(new EventuallyConsistentMapEvent<>(
 
 404                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
 
 407         ecMap.addListener(listener);
 
 409         // Put in an initial value
 
 410         expectPeerMessage(clusterCommunicator);
 
 411         ecMap.compute(KEY1, (k, v) -> VALUE1);
 
 412         assertEquals(VALUE1, ecMap.get(KEY1));
 
 414         // Remove the value and check the correct internal cluster messages
 
 416         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
 
 417                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 419         ecMap.compute(KEY1, (k, v) -> null);
 
 420         assertNull(ecMap.get(KEY1));
 
 422         verify(clusterCommunicator);
 
 424         // Remove the same value again. Even though the value is no longer in
 
 425         // the map, we expect that the tombstone is updated and another remove
 
 426         // event is sent to the cluster and external listeners.
 
 427         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
 
 428                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 430         ecMap.compute(KEY1, (k, v) -> null);
 
 431         assertNull(ecMap.get(KEY1));
 
 433         verify(clusterCommunicator);
 
 435         // Put in a new value for us to try and remove
 
 436         expectPeerMessage(clusterCommunicator);
 
 438         ecMap.compute(KEY2, (k, v) -> VALUE2);
 
 440         clockService.turnBackTime();
 
 442         // Remove should have no effect, since it has an older timestamp than
 
 443         // the put. Expect no notifications to be sent out
 
 444         reset(clusterCommunicator);
 
 445         replay(clusterCommunicator);
 
 447         ecMap.compute(KEY2, (k, v) -> null);
 
 449         verify(clusterCommunicator);
 
 451         // Check that our listener received the correct events during the test
 
 456     public void testPutAll() throws Exception {
 
 457         // putAll() with an empty map is a no-op - no messages will be sent
 
 458         reset(clusterCommunicator);
 
 459         replay(clusterCommunicator);
 
 461         ecMap.putAll(new HashMap<>());
 
 463         verify(clusterCommunicator);
 
 465         // Set up the listener with our expected events
 
 466         EventuallyConsistentMapListener<String, String> listener
 
 468         listener.event(new EventuallyConsistentMapEvent<>(
 
 469                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
 
 470         listener.event(new EventuallyConsistentMapEvent<>(
 
 471                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
 
 474         ecMap.addListener(listener);
 
 476         // Expect a multi-update inter-instance message
 
 477         expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
 
 478                                        clusterCommunicator);
 
 480         Map<String, String> putAllValues = new HashMap<>();
 
 481         putAllValues.put(KEY1, VALUE1);
 
 482         putAllValues.put(KEY2, VALUE2);
 
 484         // Put the values in the map
 
 485         ecMap.putAll(putAllValues);
 
 487         // Check the correct messages and events were sent
 
 488         verify(clusterCommunicator);
 
 493     public void testClear() throws Exception {
 
 494         EventuallyConsistentMapListener<String, String> listener
 
 496         listener.event(new EventuallyConsistentMapEvent<>(
 
 497                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
 
 498         listener.event(new EventuallyConsistentMapEvent<>(
 
 499                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
 
 502         // clear() on an empty map is a no-op - no messages will be sent
 
 503         reset(clusterCommunicator);
 
 504         replay(clusterCommunicator);
 
 506         assertTrue(ecMap.isEmpty());
 
 508         verify(clusterCommunicator);
 
 510         // Put some items in the map
 
 511         expectPeerMessage(clusterCommunicator);
 
 512         ecMap.put(KEY1, VALUE1);
 
 513         ecMap.put(KEY2, VALUE2);
 
 515         ecMap.addListener(listener);
 
 516         expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
 520         verify(clusterCommunicator);
 
 525     public void testKeySet() throws Exception {
 
 526         expectPeerMessage(clusterCommunicator);
 
 528         assertTrue(ecMap.keySet().isEmpty());
 
 530         // Generate some keys
 
 531         Set<String> keys = new HashSet<>();
 
 532         for (int i = 1; i <= 10; i++) {
 
 536         // Put each key in the map
 
 537         keys.forEach(k -> ecMap.put(k, "value" + k));
 
 539         // Check keySet() returns the correct value
 
 540         assertEquals(keys, ecMap.keySet());
 
 542         // Update the value for one of the keys
 
 543         ecMap.put(keys.iterator().next(), "new-value");
 
 545         // Check the key set is still the same
 
 546         assertEquals(keys, ecMap.keySet());
 
 549         String removeKey = keys.iterator().next();
 
 550         keys.remove(removeKey);
 
 551         ecMap.remove(removeKey);
 
 553         // Check the key set is still correct
 
 554         assertEquals(keys, ecMap.keySet());
 
 558     public void testValues() throws Exception {
 
 559         expectPeerMessage(clusterCommunicator);
 
 561         assertTrue(ecMap.values().isEmpty());
 
 563         // Generate some values
 
 564         Map<String, String> expectedValues = new HashMap<>();
 
 565         for (int i = 1; i <= 10; i++) {
 
 566             expectedValues.put("" + i, "value" + i);
 
 569         // Add them into the map
 
 570         expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
 
 572         // Check the values collection is correct
 
 573         assertEquals(expectedValues.values().size(), ecMap.values().size());
 
 574         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
 
 576         // Update the value for one of the keys
 
 577         Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
 
 578         expectedValues.put(first.getKey(), "new-value");
 
 579         ecMap.put(first.getKey(), "new-value");
 
 581         // Check the values collection is still correct
 
 582         assertEquals(expectedValues.values().size(), ecMap.values().size());
 
 583         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
 
 586         String removeKey = expectedValues.keySet().iterator().next();
 
 587         expectedValues.remove(removeKey);
 
 588         ecMap.remove(removeKey);
 
 590         // Check the values collection is still correct
 
 591         assertEquals(expectedValues.values().size(), ecMap.values().size());
 
 592         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
 
 596     public void testEntrySet() throws Exception {
 
 597         expectPeerMessage(clusterCommunicator);
 
 599         assertTrue(ecMap.entrySet().isEmpty());
 
 601         // Generate some values
 
 602         Map<String, String> expectedValues = new HashMap<>();
 
 603         for (int i = 1; i <= 10; i++) {
 
 604             expectedValues.put("" + i, "value" + i);
 
 607         // Add them into the map
 
 608         expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
 
 610         // Check the entry set is correct
 
 611         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
 
 613         // Update the value for one of the keys
 
 614         Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
 
 615         expectedValues.put(first.getKey(), "new-value");
 
 616         ecMap.put(first.getKey(), "new-value");
 
 618         // Check the entry set is still correct
 
 619         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
 
 622         String removeKey = expectedValues.keySet().iterator().next();
 
 623         expectedValues.remove(removeKey);
 
 624         ecMap.remove(removeKey);
 
 626         // Check the entry set is still correct
 
 627         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
 
 630     private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
 
 631         if (expectedMap.entrySet().size() != actual.size()) {
 
 635         for (Map.Entry<String, String> e : actual) {
 
 636             if (!expectedMap.containsKey(e.getKey())) {
 
 639             if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
 
 647     public void testDestroy() throws Exception {
 
 648         clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
 
 649         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
 
 651         replay(clusterCommunicator);
 
 655         verify(clusterCommunicator);
 
 659             fail("get after destroy should throw exception");
 
 660         } catch (IllegalStateException e) {
 
 665             ecMap.put(KEY1, VALUE1);
 
 666             fail("put after destroy should throw exception");
 
 667         } catch (IllegalStateException e) {
 
 672     private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
 
 673         return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
 
 676     private List<UpdateEntry<String, String>> generatePutMessage(
 
 677             String key1, String value1, String key2, String value2) {
 
 678         List<UpdateEntry<String, String>> list = new ArrayList<>();
 
 680         Timestamp timestamp1 = clockService.peek(1);
 
 681         Timestamp timestamp2 = clockService.peek(2);
 
 683         list.add(generatePutMessage(key1, value1, timestamp1));
 
 684         list.add(generatePutMessage(key2, value2, timestamp2));
 
 689     private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
 
 690         return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
 
 693     private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
 
 694         List<UpdateEntry<String, String>> list = new ArrayList<>();
 
 696         Timestamp timestamp1 = clockService.peek(1);
 
 697         Timestamp timestamp2 = clockService.peek(2);
 
 699         list.add(generateRemoveMessage(key1, timestamp1));
 
 700         list.add(generateRemoveMessage(key2, timestamp2));
 
 706      * Sets up a mock ClusterCommunicationService to expect a specific cluster
 
 707      * message to be broadcast to the cluster.
 
 709      * @param message message we expect to be sent
 
 710      * @param clusterCommunicator a mock ClusterCommunicationService to set up
 
 713     private static <T> void expectSpecificBroadcastMessage(
 
 715             MessageSubject subject,
 
 716             ClusterCommunicationService clusterCommunicator) {
 
 717         reset(clusterCommunicator);
 
 718         clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
 
 719         expectLastCall().anyTimes();
 
 720         replay(clusterCommunicator);
 
 724      * Sets up a mock ClusterCommunicationService to expect a specific cluster
 
 725      * message to be multicast to the cluster.
 
 727      * @param message message we expect to be sent
 
 728      * @param subject subject we expect to be sent to
 
 729      * @param clusterCommunicator a mock ClusterCommunicationService to set up
 
 732     private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
 
 733                            ClusterCommunicationService clusterCommunicator) {
 
 734         reset(clusterCommunicator);
 
 735         clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
 
 736         expectLastCall().anyTimes();
 
 737         replay(clusterCommunicator);
 
 742      * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
 
 743      * that is sent to it. This is useful for unit tests where we aren't
 
 744      * interested in testing the messaging component.
 
 746      * @param clusterCommunicator a mock ClusterCommunicationService to set up
 
 749     private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
 
 750         reset(clusterCommunicator);
 
 751 //        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
 
 752 //                                             anyObject(Iterable.class)))
 
 753         expect(clusterCommunicator.<T>unicast(
 
 755                     anyObject(MessageSubject.class),
 
 756                     anyObject(Function.class),
 
 757                     anyObject(NodeId.class)))
 
 758                 .andReturn(CompletableFuture.completedFuture(null))
 
 760         replay(clusterCommunicator);
 
 764      * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
 
 765      * that is sent to it. This is useful for unit tests where we aren't
 
 766      * interested in testing the messaging component.
 
 768      * @param clusterCommunicator a mock ClusterCommunicationService to set up
 
 770     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
 
 771         reset(clusterCommunicator);
 
 772         clusterCommunicator.<AbstractEvent>multicast(
 
 773                 anyObject(AbstractEvent.class),
 
 774                 anyObject(MessageSubject.class),
 
 775                 anyObject(Function.class),
 
 776                 anyObject(Set.class));
 
 777         expectLastCall().anyTimes();
 
 778         replay(clusterCommunicator);
 
 782      * ClusterCommunicationService implementation that the map's addSubscriber
 
 783      * call will delegate to. This means we can get a reference to the
 
 784      * internal cluster message handler used by the map, so that we can simulate
 
 785      * events coming in from other instances.
 
 787     private final class TestClusterCommunicationService
 
 788             extends ClusterCommunicationServiceAdapter {
 
 791         public <M> void addSubscriber(MessageSubject subject,
 
 792                 Function<byte[], M> decoder, Consumer<M> handler,
 
 794             if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
 
 795                 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
 
 796             } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
 
 797                 antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
 
 799                 throw new RuntimeException("Unexpected message subject " + subject.toString());
 
 805      * ClockService implementation that gives out timestamps based on a
 
 806      * sequential counter. This clock service enables more control over the
 
 807      * timestamps that are given out, including being able to "turn back time"
 
 808      * to give out timestamps from the past.
 
 810      * @param <T> Type that the clock service will give out timestamps for
 
 811      * @param <U> Second type that the clock service will give out values for
 
 813     private class SequentialClockService<T, U> {
 
 815         private static final long INITIAL_VALUE = 1;
 
 816         private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
 
 818         public Timestamp getTimestamp(T object, U object2) {
 
 819             return new TestTimestamp(counter.getAndIncrement());
 
 823          * Returns what the next timestamp will be without consuming the
 
 824          * timestamp. This allows test code to set expectations correctly while
 
 825          * still allowing the CUT to get the same timestamp.
 
 827          * @return timestamp equal to the timestamp that will be returned by the
 
 828          * next call to {@link #getTimestamp(T, U)}.
 
 830         public Timestamp peekAtNextTimestamp() {
 
 835          * Returns the ith timestamp to be given out in the future without
 
 836          * consuming the timestamp. For example, i=1 returns the next timestamp,
 
 837          * i=2 returns the timestamp after that, and so on.
 
 839          * @param i number of the timestamp to peek at
 
 840          * @return the ith timestamp that will be given out
 
 842         public Timestamp peek(int i) {
 
 843             checkArgument(i > 0, "i must be a positive integer");
 
 845             return new TestTimestamp(counter.get() + i - 1);
 
 849          * Turns the clock back two ticks, so the next call to getTimestamp will
 
 850          * return an older timestamp than the previous call to getTimestamp.
 
 852         public void turnBackTime() {
 
 853             // Not atomic, but should be OK for these tests.
 
 854             counter.decrementAndGet();
 
 855             counter.decrementAndGet();
 
 861      * Timestamp implementation where the value of the timestamp can be
 
 862      * specified explicitly at creation time.
 
 864     private class TestTimestamp implements Timestamp {
 
 866         private final long timestamp;
 
 869          * Creates a new timestamp that has the specified value.
 
 871          * @param timestamp value of the timestamp
 
 873         public TestTimestamp(long timestamp) {
 
 874             this.timestamp = timestamp;
 
 878         public int compareTo(Timestamp o) {
 
 879             checkArgument(o instanceof TestTimestamp);
 
 880             TestTimestamp otherTimestamp = (TestTimestamp) o;
 
 881             return ComparisonChain.start()
 
 882                     .compare(this.timestamp, otherTimestamp.timestamp)
 
 888      * EventuallyConsistentMapListener implementation which triggers a latch
 
 889      * when it receives an event.
 
 891     private class TestListener implements EventuallyConsistentMapListener<String, String> {
 
 892         private CountDownLatch latch;
 
 895          * Creates a new listener that will trigger the specified latch when it
 
 896          * receives and event.
 
 898          * @param latch the latch to trigger on events
 
 900         public TestListener(CountDownLatch latch) {
 
 905         public void event(EventuallyConsistentMapEvent<String, String> event) {