ef8d9924ae9faf01f7eafe6741f378982c331eb6
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.ecmap;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.function.Consumer;
33 import java.util.function.Function;
34
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.onlab.packet.IpAddress;
39 import org.onlab.util.KryoNamespace;
40 import org.onosproject.cluster.ClusterService;
41 import org.onosproject.cluster.ControllerNode;
42 import org.onosproject.cluster.DefaultControllerNode;
43 import org.onosproject.cluster.NodeId;
44 import org.onosproject.event.AbstractEvent;
45 import org.onosproject.persistence.impl.PersistenceManager;
46 import org.onosproject.store.Timestamp;
47 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
48 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
49 import org.onosproject.store.cluster.messaging.MessageSubject;
50 import org.onosproject.store.impl.LogicalTimestamp;
51 import org.onosproject.store.serializers.KryoNamespaces;
52 import org.onosproject.store.serializers.KryoSerializer;
53 import org.onosproject.store.service.EventuallyConsistentMap;
54 import org.onosproject.store.service.EventuallyConsistentMapEvent;
55 import org.onosproject.store.service.EventuallyConsistentMapListener;
56 import org.onosproject.store.service.WallClockTimestamp;
57
58 import com.google.common.collect.ComparisonChain;
59 import com.google.common.collect.ImmutableList;
60 import com.google.common.collect.ImmutableSet;
61 import com.google.common.util.concurrent.MoreExecutors;
62
63 import static com.google.common.base.Preconditions.checkArgument;
64 import static junit.framework.TestCase.assertFalse;
65 import static org.easymock.EasyMock.anyObject;
66 import static org.easymock.EasyMock.createMock;
67 import static org.easymock.EasyMock.eq;
68 import static org.easymock.EasyMock.expect;
69 import static org.easymock.EasyMock.expectLastCall;
70 import static org.easymock.EasyMock.replay;
71 import static org.easymock.EasyMock.reset;
72 import static org.easymock.EasyMock.verify;
73 import static org.junit.Assert.assertEquals;
74 import static org.junit.Assert.assertNull;
75 import static org.junit.Assert.assertTrue;
76 import static org.junit.Assert.fail;
77
78 /**
79  * Unit tests for EventuallyConsistentMapImpl.
80  */
81 public class EventuallyConsistentMapImplTest {
82
83     private EventuallyConsistentMap<String, String> ecMap;
84
85     private PersistenceManager persistenceService;
86     private ClusterService clusterService;
87     private ClusterCommunicationService clusterCommunicator;
88     private SequentialClockService<String, String> clockService;
89
90     private static final String MAP_NAME = "test";
91     private static final MessageSubject UPDATE_MESSAGE_SUBJECT
92             = new MessageSubject("ecm-" + MAP_NAME + "-update");
93     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
94             = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
95
96     private static final String KEY1 = "one";
97     private static final String KEY2 = "two";
98     private static final String VALUE1 = "oneValue";
99     private static final String VALUE2 = "twoValue";
100
101     private final ControllerNode self =
102             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
103
104     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
105     private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
106
107     /*
108      * Serialization is a bit tricky here. We need to serialize in the tests
109      * to set the expectations, which will use this serializer here, but the
110      * EventuallyConsistentMap will use its own internal serializer. This means
111      * this serializer must be set up exactly the same as map's internal
112      * serializer.
113      */
114     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
115         @Override
116         protected void setupKryoPool() {
117             serializerPool = KryoNamespace.newBuilder()
118                     // Classes we give to the map
119                     .register(KryoNamespaces.API)
120                     .register(TestTimestamp.class)
121                     // Below is the classes that the map internally registers
122                     .register(LogicalTimestamp.class)
123                     .register(WallClockTimestamp.class)
124                     .register(ArrayList.class)
125                     .register(AntiEntropyAdvertisement.class)
126                     .register(HashMap.class)
127                     .register(Optional.class)
128                     .build();
129         }
130     };
131
132     @Before
133     public void setUp() throws Exception {
134         clusterService = createMock(ClusterService.class);
135         expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
136         expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
137         replay(clusterService);
138
139         clusterCommunicator = createMock(ClusterCommunicationService.class);
140
141         persistenceService = new PersistenceManager();
142         persistenceService.activate();
143         // Add expectation for adding cluster message subscribers which
144         // delegate to our ClusterCommunicationService implementation. This
145         // allows us to get a reference to the map's internal cluster message
146         // handlers so we can induce events coming in from a peer.
147         clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
148                 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
149         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
150
151         replay(clusterCommunicator);
152
153         clockService = new SequentialClockService<>();
154
155         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
156                 .register(KryoNamespaces.API)
157                 .register(TestTimestamp.class);
158
159         ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
160                         clusterService, clusterCommunicator, persistenceService)
161                 .withName(MAP_NAME)
162                 .withSerializer(serializer)
163                 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
164                 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
165                 .withPersistence()
166                 .build();
167
168         // Reset ready for tests to add their own expectations
169         reset(clusterCommunicator);
170     }
171
172     @After
173     public void tearDown() {
174         reset(clusterCommunicator);
175         ecMap.destroy();
176     }
177
178     @SuppressWarnings("unchecked")
179     private EventuallyConsistentMapListener<String, String> getListener() {
180         return createMock(EventuallyConsistentMapListener.class);
181     }
182
183     @Test
184     public void testSize() throws Exception {
185         expectPeerMessage(clusterCommunicator);
186
187         assertEquals(0, ecMap.size());
188         ecMap.put(KEY1, VALUE1);
189         assertEquals(1, ecMap.size());
190         ecMap.put(KEY1, VALUE2);
191         assertEquals(1, ecMap.size());
192         ecMap.put(KEY2, VALUE2);
193         assertEquals(2, ecMap.size());
194         for (int i = 0; i < 10; i++) {
195             ecMap.put("" + i, "" + i);
196         }
197         assertEquals(12, ecMap.size());
198         ecMap.remove(KEY1);
199         assertEquals(11, ecMap.size());
200         ecMap.remove(KEY1);
201         assertEquals(11, ecMap.size());
202     }
203
204     @Test
205     public void testIsEmpty() throws Exception {
206         expectPeerMessage(clusterCommunicator);
207
208         assertTrue(ecMap.isEmpty());
209         ecMap.put(KEY1, VALUE1);
210         assertFalse(ecMap.isEmpty());
211         ecMap.remove(KEY1);
212         assertTrue(ecMap.isEmpty());
213     }
214
215     @Test
216     public void testContainsKey() throws Exception {
217         expectPeerMessage(clusterCommunicator);
218
219         assertFalse(ecMap.containsKey(KEY1));
220         ecMap.put(KEY1, VALUE1);
221         assertTrue(ecMap.containsKey(KEY1));
222         assertFalse(ecMap.containsKey(KEY2));
223         ecMap.remove(KEY1);
224         assertFalse(ecMap.containsKey(KEY1));
225     }
226
227     @Test
228     public void testContainsValue() throws Exception {
229         expectPeerMessage(clusterCommunicator);
230
231         assertFalse(ecMap.containsValue(VALUE1));
232         ecMap.put(KEY1, VALUE1);
233         assertTrue(ecMap.containsValue(VALUE1));
234         assertFalse(ecMap.containsValue(VALUE2));
235         ecMap.put(KEY1, VALUE2);
236         assertFalse(ecMap.containsValue(VALUE1));
237         assertTrue(ecMap.containsValue(VALUE2));
238         ecMap.remove(KEY1);
239         assertFalse(ecMap.containsValue(VALUE2));
240     }
241
242     @Test
243     public void testGet() throws Exception {
244         expectPeerMessage(clusterCommunicator);
245
246         CountDownLatch latch;
247
248         // Local put
249         assertNull(ecMap.get(KEY1));
250         ecMap.put(KEY1, VALUE1);
251         assertEquals(VALUE1, ecMap.get(KEY1));
252
253         // Remote put
254         List<UpdateEntry<String, String>> message
255                 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
256
257         // Create a latch so we know when the put operation has finished
258         latch = new CountDownLatch(1);
259         ecMap.addListener(new TestListener(latch));
260
261         assertNull(ecMap.get(KEY2));
262         updateHandler.accept(message);
263         assertTrue("External listener never got notified of internal event",
264                    latch.await(100, TimeUnit.MILLISECONDS));
265         assertEquals(VALUE2, ecMap.get(KEY2));
266
267         // Local remove
268         ecMap.remove(KEY2);
269         assertNull(ecMap.get(KEY2));
270
271         // Remote remove
272         message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
273
274         // Create a latch so we know when the remove operation has finished
275         latch = new CountDownLatch(1);
276         ecMap.addListener(new TestListener(latch));
277
278         updateHandler.accept(message);
279         assertTrue("External listener never got notified of internal event",
280                    latch.await(100, TimeUnit.MILLISECONDS));
281         assertNull(ecMap.get(KEY1));
282     }
283
284     @Test
285     public void testPut() throws Exception {
286         // Set up expectations of external events to be sent to listeners during
287         // the test. These don't use timestamps so we can set them all up at once.
288         EventuallyConsistentMapListener<String, String> listener
289                 = getListener();
290         listener.event(new EventuallyConsistentMapEvent<>(
291                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
292         listener.event(new EventuallyConsistentMapEvent<>(
293                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
294         replay(listener);
295
296         ecMap.addListener(listener);
297
298         // Set up expected internal message to be broadcast to peers on first put
299         expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
300                 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
301
302         // Put first value
303         assertNull(ecMap.get(KEY1));
304         ecMap.put(KEY1, VALUE1);
305         assertEquals(VALUE1, ecMap.get(KEY1));
306
307         verify(clusterCommunicator);
308
309         // Set up expected internal message to be broadcast to peers on second put
310         expectSpecificMulticastMessage(generatePutMessage(
311                 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
312
313         // Update same key to a new value
314         ecMap.put(KEY1, VALUE2);
315         assertEquals(VALUE2, ecMap.get(KEY1));
316
317         verify(clusterCommunicator);
318
319         // Do a put with a older timestamp than the value already there.
320         // The map data should not be changed and no notifications should be sent.
321         reset(clusterCommunicator);
322         replay(clusterCommunicator);
323
324         clockService.turnBackTime();
325         ecMap.put(KEY1, VALUE1);
326         // Value should not have changed.
327         assertEquals(VALUE2, ecMap.get(KEY1));
328
329         verify(clusterCommunicator);
330
331         // Check that our listener received the correct events during the test
332         verify(listener);
333     }
334
335     @Test
336     public void testRemove() throws Exception {
337         // Set up expectations of external events to be sent to listeners during
338         // the test. These don't use timestamps so we can set them all up at once.
339         EventuallyConsistentMapListener<String, String> listener
340                 = getListener();
341         listener.event(new EventuallyConsistentMapEvent<>(
342                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
343         listener.event(new EventuallyConsistentMapEvent<>(
344                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
345         listener.event(new EventuallyConsistentMapEvent<>(
346                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
347         replay(listener);
348
349         ecMap.addListener(listener);
350
351         // Put in an initial value
352         expectPeerMessage(clusterCommunicator);
353         ecMap.put(KEY1, VALUE1);
354         assertEquals(VALUE1, ecMap.get(KEY1));
355
356         // Remove the value and check the correct internal cluster messages
357         // are sent
358         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
359                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
360
361         ecMap.remove(KEY1);
362         assertNull(ecMap.get(KEY1));
363
364         verify(clusterCommunicator);
365
366         // Remove the same value again. Even though the value is no longer in
367         // the map, we expect that the tombstone is updated and another remove
368         // event is sent to the cluster and external listeners.
369         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
370                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
371
372         ecMap.remove(KEY1);
373         assertNull(ecMap.get(KEY1));
374
375         verify(clusterCommunicator);
376
377
378         // Put in a new value for us to try and remove
379         expectPeerMessage(clusterCommunicator);
380
381         ecMap.put(KEY2, VALUE2);
382
383         clockService.turnBackTime();
384
385         // Remove should have no effect, since it has an older timestamp than
386         // the put. Expect no notifications to be sent out
387         reset(clusterCommunicator);
388         replay(clusterCommunicator);
389
390         ecMap.remove(KEY2);
391
392         verify(clusterCommunicator);
393
394         // Check that our listener received the correct events during the test
395         verify(listener);
396     }
397
398     @Test
399     public void testCompute() throws Exception {
400         // Set up expectations of external events to be sent to listeners during
401         // the test. These don't use timestamps so we can set them all up at once.
402         EventuallyConsistentMapListener<String, String> listener
403                 = getListener();
404         listener.event(new EventuallyConsistentMapEvent<>(
405                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
406         listener.event(new EventuallyConsistentMapEvent<>(
407                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
408         listener.event(new EventuallyConsistentMapEvent<>(
409                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
410         replay(listener);
411
412         ecMap.addListener(listener);
413
414         // Put in an initial value
415         expectPeerMessage(clusterCommunicator);
416         ecMap.compute(KEY1, (k, v) -> VALUE1);
417         assertEquals(VALUE1, ecMap.get(KEY1));
418
419         // Remove the value and check the correct internal cluster messages
420         // are sent
421         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
422                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
423
424         ecMap.compute(KEY1, (k, v) -> null);
425         assertNull(ecMap.get(KEY1));
426
427         verify(clusterCommunicator);
428
429         // Remove the same value again. Even though the value is no longer in
430         // the map, we expect that the tombstone is updated and another remove
431         // event is sent to the cluster and external listeners.
432         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
433                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
434
435         ecMap.compute(KEY1, (k, v) -> null);
436         assertNull(ecMap.get(KEY1));
437
438         verify(clusterCommunicator);
439
440         // Put in a new value for us to try and remove
441         expectPeerMessage(clusterCommunicator);
442
443         ecMap.compute(KEY2, (k, v) -> VALUE2);
444
445         clockService.turnBackTime();
446
447         // Remove should have no effect, since it has an older timestamp than
448         // the put. Expect no notifications to be sent out
449         reset(clusterCommunicator);
450         replay(clusterCommunicator);
451
452         ecMap.compute(KEY2, (k, v) -> null);
453
454         verify(clusterCommunicator);
455
456         // Check that our listener received the correct events during the test
457         verify(listener);
458     }
459
460     @Test
461     public void testPutAll() throws Exception {
462         // putAll() with an empty map is a no-op - no messages will be sent
463         reset(clusterCommunicator);
464         replay(clusterCommunicator);
465
466         ecMap.putAll(new HashMap<>());
467
468         verify(clusterCommunicator);
469
470         // Set up the listener with our expected events
471         EventuallyConsistentMapListener<String, String> listener
472                 = getListener();
473         listener.event(new EventuallyConsistentMapEvent<>(
474                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
475         listener.event(new EventuallyConsistentMapEvent<>(
476                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
477         replay(listener);
478
479         ecMap.addListener(listener);
480
481         // Expect a multi-update inter-instance message
482         expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
483                                        clusterCommunicator);
484
485         Map<String, String> putAllValues = new HashMap<>();
486         putAllValues.put(KEY1, VALUE1);
487         putAllValues.put(KEY2, VALUE2);
488
489         // Put the values in the map
490         ecMap.putAll(putAllValues);
491
492         // Check the correct messages and events were sent
493         verify(clusterCommunicator);
494         verify(listener);
495     }
496
497     @Test
498     public void testClear() throws Exception {
499         EventuallyConsistentMapListener<String, String> listener
500                 = getListener();
501         listener.event(new EventuallyConsistentMapEvent<>(
502                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
503         listener.event(new EventuallyConsistentMapEvent<>(
504                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
505         replay(listener);
506
507         // clear() on an empty map is a no-op - no messages will be sent
508         reset(clusterCommunicator);
509         replay(clusterCommunicator);
510
511         assertTrue(ecMap.isEmpty());
512         ecMap.clear();
513         verify(clusterCommunicator);
514
515         // Put some items in the map
516         expectPeerMessage(clusterCommunicator);
517         ecMap.put(KEY1, VALUE1);
518         ecMap.put(KEY2, VALUE2);
519
520         ecMap.addListener(listener);
521         expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
522
523         ecMap.clear();
524
525         verify(clusterCommunicator);
526         verify(listener);
527     }
528
529     @Test
530     public void testKeySet() throws Exception {
531         expectPeerMessage(clusterCommunicator);
532
533         assertTrue(ecMap.keySet().isEmpty());
534
535         // Generate some keys
536         Set<String> keys = new HashSet<>();
537         for (int i = 1; i <= 10; i++) {
538             keys.add("" + i);
539         }
540
541         // Put each key in the map
542         keys.forEach(k -> ecMap.put(k, "value" + k));
543
544         // Check keySet() returns the correct value
545         assertEquals(keys, ecMap.keySet());
546
547         // Update the value for one of the keys
548         ecMap.put(keys.iterator().next(), "new-value");
549
550         // Check the key set is still the same
551         assertEquals(keys, ecMap.keySet());
552
553         // Remove a key
554         String removeKey = keys.iterator().next();
555         keys.remove(removeKey);
556         ecMap.remove(removeKey);
557
558         // Check the key set is still correct
559         assertEquals(keys, ecMap.keySet());
560     }
561
562     @Test
563     public void testValues() throws Exception {
564         expectPeerMessage(clusterCommunicator);
565
566         assertTrue(ecMap.values().isEmpty());
567
568         // Generate some values
569         Map<String, String> expectedValues = new HashMap<>();
570         for (int i = 1; i <= 10; i++) {
571             expectedValues.put("" + i, "value" + i);
572         }
573
574         // Add them into the map
575         expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
576
577         // Check the values collection is correct
578         assertEquals(expectedValues.values().size(), ecMap.values().size());
579         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
580
581         // Update the value for one of the keys
582         Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
583         expectedValues.put(first.getKey(), "new-value");
584         ecMap.put(first.getKey(), "new-value");
585
586         // Check the values collection is still correct
587         assertEquals(expectedValues.values().size(), ecMap.values().size());
588         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
589
590         // Remove a key
591         String removeKey = expectedValues.keySet().iterator().next();
592         expectedValues.remove(removeKey);
593         ecMap.remove(removeKey);
594
595         // Check the values collection is still correct
596         assertEquals(expectedValues.values().size(), ecMap.values().size());
597         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
598     }
599
600     @Test
601     public void testEntrySet() throws Exception {
602         expectPeerMessage(clusterCommunicator);
603
604         assertTrue(ecMap.entrySet().isEmpty());
605
606         // Generate some values
607         Map<String, String> expectedValues = new HashMap<>();
608         for (int i = 1; i <= 10; i++) {
609             expectedValues.put("" + i, "value" + i);
610         }
611
612         // Add them into the map
613         expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
614
615         // Check the entry set is correct
616         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
617
618         // Update the value for one of the keys
619         Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
620         expectedValues.put(first.getKey(), "new-value");
621         ecMap.put(first.getKey(), "new-value");
622
623         // Check the entry set is still correct
624         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
625
626         // Remove a key
627         String removeKey = expectedValues.keySet().iterator().next();
628         expectedValues.remove(removeKey);
629         ecMap.remove(removeKey);
630
631         // Check the entry set is still correct
632         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
633     }
634
635     private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
636         if (expectedMap.entrySet().size() != actual.size()) {
637             return false;
638         }
639
640         for (Map.Entry<String, String> e : actual) {
641             if (!expectedMap.containsKey(e.getKey())) {
642                 return false;
643             }
644             if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
645                 return false;
646             }
647         }
648         return true;
649     }
650
651     @Test
652     public void testDestroy() throws Exception {
653         clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
654         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
655
656         replay(clusterCommunicator);
657
658         ecMap.destroy();
659
660         verify(clusterCommunicator);
661
662         try {
663             ecMap.get(KEY1);
664             fail("get after destroy should throw exception");
665         } catch (IllegalStateException e) {
666             assertTrue(true);
667         }
668
669         try {
670             ecMap.put(KEY1, VALUE1);
671             fail("put after destroy should throw exception");
672         } catch (IllegalStateException e) {
673             assertTrue(true);
674         }
675     }
676
677     private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
678         return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
679     }
680
681     private List<UpdateEntry<String, String>> generatePutMessage(
682             String key1, String value1, String key2, String value2) {
683         List<UpdateEntry<String, String>> list = new ArrayList<>();
684
685         Timestamp timestamp1 = clockService.peek(1);
686         Timestamp timestamp2 = clockService.peek(2);
687
688         list.add(generatePutMessage(key1, value1, timestamp1));
689         list.add(generatePutMessage(key2, value2, timestamp2));
690
691         return list;
692     }
693
694     private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
695         return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
696     }
697
698     private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
699         List<UpdateEntry<String, String>> list = new ArrayList<>();
700
701         Timestamp timestamp1 = clockService.peek(1);
702         Timestamp timestamp2 = clockService.peek(2);
703
704         list.add(generateRemoveMessage(key1, timestamp1));
705         list.add(generateRemoveMessage(key2, timestamp2));
706
707         return list;
708     }
709
710     /**
711      * Sets up a mock ClusterCommunicationService to expect a specific cluster
712      * message to be broadcast to the cluster.
713      *
714      * @param message message we expect to be sent
715      * @param clusterCommunicator a mock ClusterCommunicationService to set up
716      */
717     //FIXME rename
718     private static <T> void expectSpecificBroadcastMessage(
719             T message,
720             MessageSubject subject,
721             ClusterCommunicationService clusterCommunicator) {
722         reset(clusterCommunicator);
723         clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
724         expectLastCall().anyTimes();
725         replay(clusterCommunicator);
726     }
727
728     /**
729      * Sets up a mock ClusterCommunicationService to expect a specific cluster
730      * message to be multicast to the cluster.
731      *
732      * @param message message we expect to be sent
733      * @param subject subject we expect to be sent to
734      * @param clusterCommunicator a mock ClusterCommunicationService to set up
735      */
736     //FIXME rename
737     private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
738                            ClusterCommunicationService clusterCommunicator) {
739         reset(clusterCommunicator);
740         clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
741         expectLastCall().anyTimes();
742         replay(clusterCommunicator);
743     }
744
745
746     /**
747      * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
748      * that is sent to it. This is useful for unit tests where we aren't
749      * interested in testing the messaging component.
750      *
751      * @param clusterCommunicator a mock ClusterCommunicationService to set up
752      */
753     //FIXME rename
754     private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
755         reset(clusterCommunicator);
756 //        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
757 //                                             anyObject(Iterable.class)))
758         expect(clusterCommunicator.<T>unicast(
759                     anyObject(),
760                     anyObject(MessageSubject.class),
761                     anyObject(Function.class),
762                     anyObject(NodeId.class)))
763                 .andReturn(CompletableFuture.completedFuture(null))
764                 .anyTimes();
765         replay(clusterCommunicator);
766     }
767
768     /**
769      * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
770      * that is sent to it. This is useful for unit tests where we aren't
771      * interested in testing the messaging component.
772      *
773      * @param clusterCommunicator a mock ClusterCommunicationService to set up
774      */
775     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
776         reset(clusterCommunicator);
777         clusterCommunicator.<AbstractEvent>multicast(
778                 anyObject(AbstractEvent.class),
779                 anyObject(MessageSubject.class),
780                 anyObject(Function.class),
781                 anyObject(Set.class));
782         expectLastCall().anyTimes();
783         replay(clusterCommunicator);
784     }
785
786     /**
787      * ClusterCommunicationService implementation that the map's addSubscriber
788      * call will delegate to. This means we can get a reference to the
789      * internal cluster message handler used by the map, so that we can simulate
790      * events coming in from other instances.
791      */
792     private final class TestClusterCommunicationService
793             extends ClusterCommunicationServiceAdapter {
794
795         @Override
796         public <M> void addSubscriber(MessageSubject subject,
797                 Function<byte[], M> decoder, Consumer<M> handler,
798                 Executor executor) {
799             if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
800                 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
801             } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
802                 antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
803             } else {
804                 throw new RuntimeException("Unexpected message subject " + subject.toString());
805             }
806         }
807     }
808
809     /**
810      * ClockService implementation that gives out timestamps based on a
811      * sequential counter. This clock service enables more control over the
812      * timestamps that are given out, including being able to "turn back time"
813      * to give out timestamps from the past.
814      *
815      * @param <T> Type that the clock service will give out timestamps for
816      * @param <U> Second type that the clock service will give out values for
817      */
818     private class SequentialClockService<T, U> {
819
820         private static final long INITIAL_VALUE = 1;
821         private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
822
823         public Timestamp getTimestamp(T object, U object2) {
824             return new TestTimestamp(counter.getAndIncrement());
825         }
826
827         /**
828          * Returns what the next timestamp will be without consuming the
829          * timestamp. This allows test code to set expectations correctly while
830          * still allowing the CUT to get the same timestamp.
831          *
832          * @return timestamp equal to the timestamp that will be returned by the
833          * next call to {@link #getTimestamp(T, U)}.
834          */
835         public Timestamp peekAtNextTimestamp() {
836             return peek(1);
837         }
838
839         /**
840          * Returns the ith timestamp to be given out in the future without
841          * consuming the timestamp. For example, i=1 returns the next timestamp,
842          * i=2 returns the timestamp after that, and so on.
843          *
844          * @param i number of the timestamp to peek at
845          * @return the ith timestamp that will be given out
846          */
847         public Timestamp peek(int i) {
848             checkArgument(i > 0, "i must be a positive integer");
849
850             return new TestTimestamp(counter.get() + i - 1);
851         }
852
853         /**
854          * Turns the clock back two ticks, so the next call to getTimestamp will
855          * return an older timestamp than the previous call to getTimestamp.
856          */
857         public void turnBackTime() {
858             // Not atomic, but should be OK for these tests.
859             counter.decrementAndGet();
860             counter.decrementAndGet();
861         }
862
863     }
864
865     /**
866      * Timestamp implementation where the value of the timestamp can be
867      * specified explicitly at creation time.
868      */
869     private class TestTimestamp implements Timestamp {
870
871         private final long timestamp;
872
873         /**
874          * Creates a new timestamp that has the specified value.
875          *
876          * @param timestamp value of the timestamp
877          */
878         public TestTimestamp(long timestamp) {
879             this.timestamp = timestamp;
880         }
881
882         @Override
883         public int compareTo(Timestamp o) {
884             checkArgument(o instanceof TestTimestamp);
885             TestTimestamp otherTimestamp = (TestTimestamp) o;
886             return ComparisonChain.start()
887                     .compare(this.timestamp, otherTimestamp.timestamp)
888                     .result();
889         }
890     }
891
892     /**
893      * EventuallyConsistentMapListener implementation which triggers a latch
894      * when it receives an event.
895      */
896     private class TestListener implements EventuallyConsistentMapListener<String, String> {
897         private CountDownLatch latch;
898
899         /**
900          * Creates a new listener that will trigger the specified latch when it
901          * receives and event.
902          *
903          * @param latch the latch to trigger on events
904          */
905         public TestListener(CountDownLatch latch) {
906             this.latch = latch;
907         }
908
909         @Override
910         public void event(EventuallyConsistentMapEvent<String, String> event) {
911             latch.countDown();
912         }
913     }
914 }