ccf6ee714a32f69360baffae443a8ca7be7d987e
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.ecmap;
17
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.function.Consumer;
33 import java.util.function.Function;
34
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.onlab.packet.IpAddress;
39 import org.onlab.util.KryoNamespace;
40 import org.onosproject.cluster.ClusterService;
41 import org.onosproject.cluster.ControllerNode;
42 import org.onosproject.cluster.DefaultControllerNode;
43 import org.onosproject.cluster.NodeId;
44 import org.onosproject.event.AbstractEvent;
45 import org.onosproject.store.Timestamp;
46 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
47 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
48 import org.onosproject.store.cluster.messaging.MessageSubject;
49 import org.onosproject.store.impl.LogicalTimestamp;
50 import org.onosproject.store.serializers.KryoNamespaces;
51 import org.onosproject.store.serializers.KryoSerializer;
52 import org.onosproject.store.service.EventuallyConsistentMap;
53 import org.onosproject.store.service.EventuallyConsistentMapEvent;
54 import org.onosproject.store.service.EventuallyConsistentMapListener;
55 import org.onosproject.store.service.WallClockTimestamp;
56
57 import com.google.common.collect.ComparisonChain;
58 import com.google.common.collect.ImmutableList;
59 import com.google.common.collect.ImmutableSet;
60 import com.google.common.util.concurrent.MoreExecutors;
61
62 import static com.google.common.base.Preconditions.checkArgument;
63 import static junit.framework.TestCase.assertFalse;
64 import static org.easymock.EasyMock.anyObject;
65 import static org.easymock.EasyMock.createMock;
66 import static org.easymock.EasyMock.eq;
67 import static org.easymock.EasyMock.expect;
68 import static org.easymock.EasyMock.expectLastCall;
69 import static org.easymock.EasyMock.replay;
70 import static org.easymock.EasyMock.reset;
71 import static org.easymock.EasyMock.verify;
72 import static org.junit.Assert.assertEquals;
73 import static org.junit.Assert.assertNull;
74 import static org.junit.Assert.assertTrue;
75 import static org.junit.Assert.fail;
76
77 /**
78  * Unit tests for EventuallyConsistentMapImpl.
79  */
80 public class EventuallyConsistentMapImplTest {
81
82     private EventuallyConsistentMap<String, String> ecMap;
83
84     private ClusterService clusterService;
85     private ClusterCommunicationService clusterCommunicator;
86     private SequentialClockService<String, String> clockService;
87
88     private static final String MAP_NAME = "test";
89     private static final MessageSubject UPDATE_MESSAGE_SUBJECT
90             = new MessageSubject("ecm-" + MAP_NAME + "-update");
91     private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
92             = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
93
94     private static final String KEY1 = "one";
95     private static final String KEY2 = "two";
96     private static final String VALUE1 = "oneValue";
97     private static final String VALUE2 = "twoValue";
98
99     private final ControllerNode self =
100             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
101
102     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
103     private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
104
105     /*
106      * Serialization is a bit tricky here. We need to serialize in the tests
107      * to set the expectations, which will use this serializer here, but the
108      * EventuallyConsistentMap will use its own internal serializer. This means
109      * this serializer must be set up exactly the same as map's internal
110      * serializer.
111      */
112     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
113         @Override
114         protected void setupKryoPool() {
115             serializerPool = KryoNamespace.newBuilder()
116                     // Classes we give to the map
117                     .register(KryoNamespaces.API)
118                     .register(TestTimestamp.class)
119                     // Below is the classes that the map internally registers
120                     .register(LogicalTimestamp.class)
121                     .register(WallClockTimestamp.class)
122                     .register(ArrayList.class)
123                     .register(AntiEntropyAdvertisement.class)
124                     .register(HashMap.class)
125                     .register(Optional.class)
126                     .build();
127         }
128     };
129
130     @Before
131     public void setUp() throws Exception {
132         clusterService = createMock(ClusterService.class);
133         expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
134         expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
135         replay(clusterService);
136
137         clusterCommunicator = createMock(ClusterCommunicationService.class);
138
139         // Add expectation for adding cluster message subscribers which
140         // delegate to our ClusterCommunicationService implementation. This
141         // allows us to get a reference to the map's internal cluster message
142         // handlers so we can induce events coming in from a peer.
143         clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
144                 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
145         expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
146
147         replay(clusterCommunicator);
148
149         clockService = new SequentialClockService<>();
150
151         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
152                 .register(KryoNamespaces.API)
153                 .register(TestTimestamp.class);
154
155         ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
156                         clusterService, clusterCommunicator)
157                 .withName(MAP_NAME)
158                 .withSerializer(serializer)
159                 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
160                 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
161                 .build();
162
163         // Reset ready for tests to add their own expectations
164         reset(clusterCommunicator);
165     }
166
167     @After
168     public void tearDown() {
169         reset(clusterCommunicator);
170         ecMap.destroy();
171     }
172
173     @SuppressWarnings("unchecked")
174     private EventuallyConsistentMapListener<String, String> getListener() {
175         return createMock(EventuallyConsistentMapListener.class);
176     }
177
178     @Test
179     public void testSize() throws Exception {
180         expectPeerMessage(clusterCommunicator);
181
182         assertEquals(0, ecMap.size());
183         ecMap.put(KEY1, VALUE1);
184         assertEquals(1, ecMap.size());
185         ecMap.put(KEY1, VALUE2);
186         assertEquals(1, ecMap.size());
187         ecMap.put(KEY2, VALUE2);
188         assertEquals(2, ecMap.size());
189         for (int i = 0; i < 10; i++) {
190             ecMap.put("" + i, "" + i);
191         }
192         assertEquals(12, ecMap.size());
193         ecMap.remove(KEY1);
194         assertEquals(11, ecMap.size());
195         ecMap.remove(KEY1);
196         assertEquals(11, ecMap.size());
197     }
198
199     @Test
200     public void testIsEmpty() throws Exception {
201         expectPeerMessage(clusterCommunicator);
202
203         assertTrue(ecMap.isEmpty());
204         ecMap.put(KEY1, VALUE1);
205         assertFalse(ecMap.isEmpty());
206         ecMap.remove(KEY1);
207         assertTrue(ecMap.isEmpty());
208     }
209
210     @Test
211     public void testContainsKey() throws Exception {
212         expectPeerMessage(clusterCommunicator);
213
214         assertFalse(ecMap.containsKey(KEY1));
215         ecMap.put(KEY1, VALUE1);
216         assertTrue(ecMap.containsKey(KEY1));
217         assertFalse(ecMap.containsKey(KEY2));
218         ecMap.remove(KEY1);
219         assertFalse(ecMap.containsKey(KEY1));
220     }
221
222     @Test
223     public void testContainsValue() throws Exception {
224         expectPeerMessage(clusterCommunicator);
225
226         assertFalse(ecMap.containsValue(VALUE1));
227         ecMap.put(KEY1, VALUE1);
228         assertTrue(ecMap.containsValue(VALUE1));
229         assertFalse(ecMap.containsValue(VALUE2));
230         ecMap.put(KEY1, VALUE2);
231         assertFalse(ecMap.containsValue(VALUE1));
232         assertTrue(ecMap.containsValue(VALUE2));
233         ecMap.remove(KEY1);
234         assertFalse(ecMap.containsValue(VALUE2));
235     }
236
237     @Test
238     public void testGet() throws Exception {
239         expectPeerMessage(clusterCommunicator);
240
241         CountDownLatch latch;
242
243         // Local put
244         assertNull(ecMap.get(KEY1));
245         ecMap.put(KEY1, VALUE1);
246         assertEquals(VALUE1, ecMap.get(KEY1));
247
248         // Remote put
249         List<UpdateEntry<String, String>> message
250                 = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
251
252         // Create a latch so we know when the put operation has finished
253         latch = new CountDownLatch(1);
254         ecMap.addListener(new TestListener(latch));
255
256         assertNull(ecMap.get(KEY2));
257         updateHandler.accept(message);
258         assertTrue("External listener never got notified of internal event",
259                    latch.await(100, TimeUnit.MILLISECONDS));
260         assertEquals(VALUE2, ecMap.get(KEY2));
261
262         // Local remove
263         ecMap.remove(KEY2);
264         assertNull(ecMap.get(KEY2));
265
266         // Remote remove
267         message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
268
269         // Create a latch so we know when the remove operation has finished
270         latch = new CountDownLatch(1);
271         ecMap.addListener(new TestListener(latch));
272
273         updateHandler.accept(message);
274         assertTrue("External listener never got notified of internal event",
275                    latch.await(100, TimeUnit.MILLISECONDS));
276         assertNull(ecMap.get(KEY1));
277     }
278
279     @Test
280     public void testPut() throws Exception {
281         // Set up expectations of external events to be sent to listeners during
282         // the test. These don't use timestamps so we can set them all up at once.
283         EventuallyConsistentMapListener<String, String> listener
284                 = getListener();
285         listener.event(new EventuallyConsistentMapEvent<>(
286                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
287         listener.event(new EventuallyConsistentMapEvent<>(
288                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
289         replay(listener);
290
291         ecMap.addListener(listener);
292
293         // Set up expected internal message to be broadcast to peers on first put
294         expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
295                 .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
296
297         // Put first value
298         assertNull(ecMap.get(KEY1));
299         ecMap.put(KEY1, VALUE1);
300         assertEquals(VALUE1, ecMap.get(KEY1));
301
302         verify(clusterCommunicator);
303
304         // Set up expected internal message to be broadcast to peers on second put
305         expectSpecificMulticastMessage(generatePutMessage(
306                 KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
307
308         // Update same key to a new value
309         ecMap.put(KEY1, VALUE2);
310         assertEquals(VALUE2, ecMap.get(KEY1));
311
312         verify(clusterCommunicator);
313
314         // Do a put with a older timestamp than the value already there.
315         // The map data should not be changed and no notifications should be sent.
316         reset(clusterCommunicator);
317         replay(clusterCommunicator);
318
319         clockService.turnBackTime();
320         ecMap.put(KEY1, VALUE1);
321         // Value should not have changed.
322         assertEquals(VALUE2, ecMap.get(KEY1));
323
324         verify(clusterCommunicator);
325
326         // Check that our listener received the correct events during the test
327         verify(listener);
328     }
329
330     @Test
331     public void testRemove() throws Exception {
332         // Set up expectations of external events to be sent to listeners during
333         // the test. These don't use timestamps so we can set them all up at once.
334         EventuallyConsistentMapListener<String, String> listener
335                 = getListener();
336         listener.event(new EventuallyConsistentMapEvent<>(
337                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
338         listener.event(new EventuallyConsistentMapEvent<>(
339                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
340         listener.event(new EventuallyConsistentMapEvent<>(
341                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
342         replay(listener);
343
344         ecMap.addListener(listener);
345
346         // Put in an initial value
347         expectPeerMessage(clusterCommunicator);
348         ecMap.put(KEY1, VALUE1);
349         assertEquals(VALUE1, ecMap.get(KEY1));
350
351         // Remove the value and check the correct internal cluster messages
352         // are sent
353         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
354                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
355
356         ecMap.remove(KEY1);
357         assertNull(ecMap.get(KEY1));
358
359         verify(clusterCommunicator);
360
361         // Remove the same value again. Even though the value is no longer in
362         // the map, we expect that the tombstone is updated and another remove
363         // event is sent to the cluster and external listeners.
364         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
365                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
366
367         ecMap.remove(KEY1);
368         assertNull(ecMap.get(KEY1));
369
370         verify(clusterCommunicator);
371
372
373         // Put in a new value for us to try and remove
374         expectPeerMessage(clusterCommunicator);
375
376         ecMap.put(KEY2, VALUE2);
377
378         clockService.turnBackTime();
379
380         // Remove should have no effect, since it has an older timestamp than
381         // the put. Expect no notifications to be sent out
382         reset(clusterCommunicator);
383         replay(clusterCommunicator);
384
385         ecMap.remove(KEY2);
386
387         verify(clusterCommunicator);
388
389         // Check that our listener received the correct events during the test
390         verify(listener);
391     }
392
393     @Test
394     public void testCompute() throws Exception {
395         // Set up expectations of external events to be sent to listeners during
396         // the test. These don't use timestamps so we can set them all up at once.
397         EventuallyConsistentMapListener<String, String> listener
398                 = getListener();
399         listener.event(new EventuallyConsistentMapEvent<>(
400                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
401         listener.event(new EventuallyConsistentMapEvent<>(
402                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
403         listener.event(new EventuallyConsistentMapEvent<>(
404                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
405         replay(listener);
406
407         ecMap.addListener(listener);
408
409         // Put in an initial value
410         expectPeerMessage(clusterCommunicator);
411         ecMap.compute(KEY1, (k, v) -> VALUE1);
412         assertEquals(VALUE1, ecMap.get(KEY1));
413
414         // Remove the value and check the correct internal cluster messages
415         // are sent
416         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
417                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
418
419         ecMap.compute(KEY1, (k, v) -> null);
420         assertNull(ecMap.get(KEY1));
421
422         verify(clusterCommunicator);
423
424         // Remove the same value again. Even though the value is no longer in
425         // the map, we expect that the tombstone is updated and another remove
426         // event is sent to the cluster and external listeners.
427         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
428                 UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
429
430         ecMap.compute(KEY1, (k, v) -> null);
431         assertNull(ecMap.get(KEY1));
432
433         verify(clusterCommunicator);
434
435         // Put in a new value for us to try and remove
436         expectPeerMessage(clusterCommunicator);
437
438         ecMap.compute(KEY2, (k, v) -> VALUE2);
439
440         clockService.turnBackTime();
441
442         // Remove should have no effect, since it has an older timestamp than
443         // the put. Expect no notifications to be sent out
444         reset(clusterCommunicator);
445         replay(clusterCommunicator);
446
447         ecMap.compute(KEY2, (k, v) -> null);
448
449         verify(clusterCommunicator);
450
451         // Check that our listener received the correct events during the test
452         verify(listener);
453     }
454
455     @Test
456     public void testPutAll() throws Exception {
457         // putAll() with an empty map is a no-op - no messages will be sent
458         reset(clusterCommunicator);
459         replay(clusterCommunicator);
460
461         ecMap.putAll(new HashMap<>());
462
463         verify(clusterCommunicator);
464
465         // Set up the listener with our expected events
466         EventuallyConsistentMapListener<String, String> listener
467                 = getListener();
468         listener.event(new EventuallyConsistentMapEvent<>(
469                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
470         listener.event(new EventuallyConsistentMapEvent<>(
471                 MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
472         replay(listener);
473
474         ecMap.addListener(listener);
475
476         // Expect a multi-update inter-instance message
477         expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
478                                        clusterCommunicator);
479
480         Map<String, String> putAllValues = new HashMap<>();
481         putAllValues.put(KEY1, VALUE1);
482         putAllValues.put(KEY2, VALUE2);
483
484         // Put the values in the map
485         ecMap.putAll(putAllValues);
486
487         // Check the correct messages and events were sent
488         verify(clusterCommunicator);
489         verify(listener);
490     }
491
492     @Test
493     public void testClear() throws Exception {
494         EventuallyConsistentMapListener<String, String> listener
495                 = getListener();
496         listener.event(new EventuallyConsistentMapEvent<>(
497                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
498         listener.event(new EventuallyConsistentMapEvent<>(
499                 MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
500         replay(listener);
501
502         // clear() on an empty map is a no-op - no messages will be sent
503         reset(clusterCommunicator);
504         replay(clusterCommunicator);
505
506         assertTrue(ecMap.isEmpty());
507         ecMap.clear();
508         verify(clusterCommunicator);
509
510         // Put some items in the map
511         expectPeerMessage(clusterCommunicator);
512         ecMap.put(KEY1, VALUE1);
513         ecMap.put(KEY2, VALUE2);
514
515         ecMap.addListener(listener);
516         expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
517
518         ecMap.clear();
519
520         verify(clusterCommunicator);
521         verify(listener);
522     }
523
524     @Test
525     public void testKeySet() throws Exception {
526         expectPeerMessage(clusterCommunicator);
527
528         assertTrue(ecMap.keySet().isEmpty());
529
530         // Generate some keys
531         Set<String> keys = new HashSet<>();
532         for (int i = 1; i <= 10; i++) {
533             keys.add("" + i);
534         }
535
536         // Put each key in the map
537         keys.forEach(k -> ecMap.put(k, "value" + k));
538
539         // Check keySet() returns the correct value
540         assertEquals(keys, ecMap.keySet());
541
542         // Update the value for one of the keys
543         ecMap.put(keys.iterator().next(), "new-value");
544
545         // Check the key set is still the same
546         assertEquals(keys, ecMap.keySet());
547
548         // Remove a key
549         String removeKey = keys.iterator().next();
550         keys.remove(removeKey);
551         ecMap.remove(removeKey);
552
553         // Check the key set is still correct
554         assertEquals(keys, ecMap.keySet());
555     }
556
557     @Test
558     public void testValues() throws Exception {
559         expectPeerMessage(clusterCommunicator);
560
561         assertTrue(ecMap.values().isEmpty());
562
563         // Generate some values
564         Map<String, String> expectedValues = new HashMap<>();
565         for (int i = 1; i <= 10; i++) {
566             expectedValues.put("" + i, "value" + i);
567         }
568
569         // Add them into the map
570         expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
571
572         // Check the values collection is correct
573         assertEquals(expectedValues.values().size(), ecMap.values().size());
574         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
575
576         // Update the value for one of the keys
577         Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
578         expectedValues.put(first.getKey(), "new-value");
579         ecMap.put(first.getKey(), "new-value");
580
581         // Check the values collection is still correct
582         assertEquals(expectedValues.values().size(), ecMap.values().size());
583         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
584
585         // Remove a key
586         String removeKey = expectedValues.keySet().iterator().next();
587         expectedValues.remove(removeKey);
588         ecMap.remove(removeKey);
589
590         // Check the values collection is still correct
591         assertEquals(expectedValues.values().size(), ecMap.values().size());
592         expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
593     }
594
595     @Test
596     public void testEntrySet() throws Exception {
597         expectPeerMessage(clusterCommunicator);
598
599         assertTrue(ecMap.entrySet().isEmpty());
600
601         // Generate some values
602         Map<String, String> expectedValues = new HashMap<>();
603         for (int i = 1; i <= 10; i++) {
604             expectedValues.put("" + i, "value" + i);
605         }
606
607         // Add them into the map
608         expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
609
610         // Check the entry set is correct
611         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
612
613         // Update the value for one of the keys
614         Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
615         expectedValues.put(first.getKey(), "new-value");
616         ecMap.put(first.getKey(), "new-value");
617
618         // Check the entry set is still correct
619         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
620
621         // Remove a key
622         String removeKey = expectedValues.keySet().iterator().next();
623         expectedValues.remove(removeKey);
624         ecMap.remove(removeKey);
625
626         // Check the entry set is still correct
627         assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
628     }
629
630     private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
631         if (expectedMap.entrySet().size() != actual.size()) {
632             return false;
633         }
634
635         for (Map.Entry<String, String> e : actual) {
636             if (!expectedMap.containsKey(e.getKey())) {
637                 return false;
638             }
639             if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
640                 return false;
641             }
642         }
643         return true;
644     }
645
646     @Test
647     public void testDestroy() throws Exception {
648         clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
649         clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
650
651         replay(clusterCommunicator);
652
653         ecMap.destroy();
654
655         verify(clusterCommunicator);
656
657         try {
658             ecMap.get(KEY1);
659             fail("get after destroy should throw exception");
660         } catch (IllegalStateException e) {
661             assertTrue(true);
662         }
663
664         try {
665             ecMap.put(KEY1, VALUE1);
666             fail("put after destroy should throw exception");
667         } catch (IllegalStateException e) {
668             assertTrue(true);
669         }
670     }
671
672     private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
673         return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
674     }
675
676     private List<UpdateEntry<String, String>> generatePutMessage(
677             String key1, String value1, String key2, String value2) {
678         List<UpdateEntry<String, String>> list = new ArrayList<>();
679
680         Timestamp timestamp1 = clockService.peek(1);
681         Timestamp timestamp2 = clockService.peek(2);
682
683         list.add(generatePutMessage(key1, value1, timestamp1));
684         list.add(generatePutMessage(key2, value2, timestamp2));
685
686         return list;
687     }
688
689     private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
690         return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
691     }
692
693     private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
694         List<UpdateEntry<String, String>> list = new ArrayList<>();
695
696         Timestamp timestamp1 = clockService.peek(1);
697         Timestamp timestamp2 = clockService.peek(2);
698
699         list.add(generateRemoveMessage(key1, timestamp1));
700         list.add(generateRemoveMessage(key2, timestamp2));
701
702         return list;
703     }
704
705     /**
706      * Sets up a mock ClusterCommunicationService to expect a specific cluster
707      * message to be broadcast to the cluster.
708      *
709      * @param message message we expect to be sent
710      * @param clusterCommunicator a mock ClusterCommunicationService to set up
711      */
712     //FIXME rename
713     private static <T> void expectSpecificBroadcastMessage(
714             T message,
715             MessageSubject subject,
716             ClusterCommunicationService clusterCommunicator) {
717         reset(clusterCommunicator);
718         clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
719         expectLastCall().anyTimes();
720         replay(clusterCommunicator);
721     }
722
723     /**
724      * Sets up a mock ClusterCommunicationService to expect a specific cluster
725      * message to be multicast to the cluster.
726      *
727      * @param message message we expect to be sent
728      * @param subject subject we expect to be sent to
729      * @param clusterCommunicator a mock ClusterCommunicationService to set up
730      */
731     //FIXME rename
732     private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
733                            ClusterCommunicationService clusterCommunicator) {
734         reset(clusterCommunicator);
735         clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
736         expectLastCall().anyTimes();
737         replay(clusterCommunicator);
738     }
739
740
741     /**
742      * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
743      * that is sent to it. This is useful for unit tests where we aren't
744      * interested in testing the messaging component.
745      *
746      * @param clusterCommunicator a mock ClusterCommunicationService to set up
747      */
748     //FIXME rename
749     private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
750         reset(clusterCommunicator);
751 //        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
752 //                                             anyObject(Iterable.class)))
753         expect(clusterCommunicator.<T>unicast(
754                     anyObject(),
755                     anyObject(MessageSubject.class),
756                     anyObject(Function.class),
757                     anyObject(NodeId.class)))
758                 .andReturn(CompletableFuture.completedFuture(null))
759                 .anyTimes();
760         replay(clusterCommunicator);
761     }
762
763     /**
764      * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
765      * that is sent to it. This is useful for unit tests where we aren't
766      * interested in testing the messaging component.
767      *
768      * @param clusterCommunicator a mock ClusterCommunicationService to set up
769      */
770     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
771         reset(clusterCommunicator);
772         clusterCommunicator.<AbstractEvent>multicast(
773                 anyObject(AbstractEvent.class),
774                 anyObject(MessageSubject.class),
775                 anyObject(Function.class),
776                 anyObject(Set.class));
777         expectLastCall().anyTimes();
778         replay(clusterCommunicator);
779     }
780
781     /**
782      * ClusterCommunicationService implementation that the map's addSubscriber
783      * call will delegate to. This means we can get a reference to the
784      * internal cluster message handler used by the map, so that we can simulate
785      * events coming in from other instances.
786      */
787     private final class TestClusterCommunicationService
788             extends ClusterCommunicationServiceAdapter {
789
790         @Override
791         public <M> void addSubscriber(MessageSubject subject,
792                 Function<byte[], M> decoder, Consumer<M> handler,
793                 Executor executor) {
794             if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
795                 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
796             } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
797                 antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
798             } else {
799                 throw new RuntimeException("Unexpected message subject " + subject.toString());
800             }
801         }
802     }
803
804     /**
805      * ClockService implementation that gives out timestamps based on a
806      * sequential counter. This clock service enables more control over the
807      * timestamps that are given out, including being able to "turn back time"
808      * to give out timestamps from the past.
809      *
810      * @param <T> Type that the clock service will give out timestamps for
811      * @param <U> Second type that the clock service will give out values for
812      */
813     private class SequentialClockService<T, U> {
814
815         private static final long INITIAL_VALUE = 1;
816         private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
817
818         public Timestamp getTimestamp(T object, U object2) {
819             return new TestTimestamp(counter.getAndIncrement());
820         }
821
822         /**
823          * Returns what the next timestamp will be without consuming the
824          * timestamp. This allows test code to set expectations correctly while
825          * still allowing the CUT to get the same timestamp.
826          *
827          * @return timestamp equal to the timestamp that will be returned by the
828          * next call to {@link #getTimestamp(T, U)}.
829          */
830         public Timestamp peekAtNextTimestamp() {
831             return peek(1);
832         }
833
834         /**
835          * Returns the ith timestamp to be given out in the future without
836          * consuming the timestamp. For example, i=1 returns the next timestamp,
837          * i=2 returns the timestamp after that, and so on.
838          *
839          * @param i number of the timestamp to peek at
840          * @return the ith timestamp that will be given out
841          */
842         public Timestamp peek(int i) {
843             checkArgument(i > 0, "i must be a positive integer");
844
845             return new TestTimestamp(counter.get() + i - 1);
846         }
847
848         /**
849          * Turns the clock back two ticks, so the next call to getTimestamp will
850          * return an older timestamp than the previous call to getTimestamp.
851          */
852         public void turnBackTime() {
853             // Not atomic, but should be OK for these tests.
854             counter.decrementAndGet();
855             counter.decrementAndGet();
856         }
857
858     }
859
860     /**
861      * Timestamp implementation where the value of the timestamp can be
862      * specified explicitly at creation time.
863      */
864     private class TestTimestamp implements Timestamp {
865
866         private final long timestamp;
867
868         /**
869          * Creates a new timestamp that has the specified value.
870          *
871          * @param timestamp value of the timestamp
872          */
873         public TestTimestamp(long timestamp) {
874             this.timestamp = timestamp;
875         }
876
877         @Override
878         public int compareTo(Timestamp o) {
879             checkArgument(o instanceof TestTimestamp);
880             TestTimestamp otherTimestamp = (TestTimestamp) o;
881             return ComparisonChain.start()
882                     .compare(this.timestamp, otherTimestamp.timestamp)
883                     .result();
884         }
885     }
886
887     /**
888      * EventuallyConsistentMapListener implementation which triggers a latch
889      * when it receives an event.
890      */
891     private class TestListener implements EventuallyConsistentMapListener<String, String> {
892         private CountDownLatch latch;
893
894         /**
895          * Creates a new listener that will trigger the specified latch when it
896          * receives and event.
897          *
898          * @param latch the latch to trigger on events
899          */
900         public TestListener(CountDownLatch latch) {
901             this.latch = latch;
902         }
903
904         @Override
905         public void event(EventuallyConsistentMapEvent<String, String> event) {
906             latch.countDown();
907         }
908     }
909 }