cf48dcb804e2c8fb820fdc7b13447be3ba7f5144
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.group.impl;
17
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Sets;
21
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Reference;
26 import org.apache.felix.scr.annotations.ReferenceCardinality;
27 import org.apache.felix.scr.annotations.Service;
28 import org.onlab.util.KryoNamespace;
29 import org.onlab.util.NewConcurrentHashMap;
30 import org.onosproject.cluster.ClusterService;
31 import org.onosproject.core.DefaultApplicationId;
32 import org.onosproject.core.DefaultGroupId;
33 import org.onosproject.core.GroupId;
34 import org.onosproject.mastership.MastershipService;
35 import org.onosproject.net.DeviceId;
36 import org.onosproject.net.MastershipRole;
37 import org.onosproject.net.PortNumber;
38 import org.onosproject.net.flow.DefaultTrafficTreatment;
39 import org.onosproject.net.flow.FlowRule;
40 import org.onosproject.net.flow.instructions.Instructions;
41 import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43 import org.onosproject.net.flow.instructions.L3ModificationInstruction;
44 import org.onosproject.net.group.DefaultGroup;
45 import org.onosproject.net.group.DefaultGroupBucket;
46 import org.onosproject.net.group.DefaultGroupDescription;
47 import org.onosproject.net.group.DefaultGroupKey;
48 import org.onosproject.net.group.Group;
49 import org.onosproject.net.group.Group.GroupState;
50 import org.onosproject.net.group.GroupBucket;
51 import org.onosproject.net.group.GroupBuckets;
52 import org.onosproject.net.group.GroupDescription;
53 import org.onosproject.net.group.GroupEvent;
54 import org.onosproject.net.group.GroupEvent.Type;
55 import org.onosproject.net.group.GroupKey;
56 import org.onosproject.net.group.GroupOperation;
57 import org.onosproject.net.group.GroupStore;
58 import org.onosproject.net.group.GroupStoreDelegate;
59 import org.onosproject.net.group.StoredGroupBucketEntry;
60 import org.onosproject.net.group.StoredGroupEntry;
61 import org.onosproject.store.AbstractStore;
62 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
63 import org.onosproject.store.service.MultiValuedTimestamp;
64 import org.onosproject.store.serializers.DeviceIdSerializer;
65 import org.onosproject.store.serializers.KryoNamespaces;
66 import org.onosproject.store.serializers.URISerializer;
67 import org.onosproject.store.service.EventuallyConsistentMap;
68 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
69 import org.onosproject.store.service.EventuallyConsistentMapEvent;
70 import org.onosproject.store.service.EventuallyConsistentMapListener;
71 import org.onosproject.store.service.StorageService;
72 import org.slf4j.Logger;
73
74 import java.net.URI;
75 import java.util.ArrayList;
76 import java.util.Collection;
77 import java.util.HashMap;
78 import java.util.Iterator;
79 import java.util.List;
80 import java.util.Objects;
81 import java.util.Optional;
82 import java.util.Set;
83 import java.util.concurrent.ConcurrentHashMap;
84 import java.util.concurrent.ConcurrentMap;
85 import java.util.concurrent.ExecutorService;
86 import java.util.concurrent.Executors;
87 import java.util.concurrent.atomic.AtomicInteger;
88 import java.util.concurrent.atomic.AtomicLong;
89 import java.util.stream.Collectors;
90
91 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
92 import static org.onlab.util.Tools.groupedThreads;
93 import static org.slf4j.LoggerFactory.getLogger;
94
/**
 * Manages inventory of group entries using a distributed implementation
 * backed by eventually consistent maps.
 */
98 @Component(immediate = true)
99 @Service
100 public class DistributedGroupStore
101         extends AbstractStore<GroupEvent, GroupStoreDelegate>
102         implements GroupStore {
103
    private final Logger log = getLogger(getClass());

    // Placeholder id assigned to group entries that are queued while the
    // device audit is still pending (see storeGroupDescriptionInternal)
    private final int dummyId = 0xffffffff;
    private final GroupId dummyGroupId = new DefaultGroupId(dummyId);

    // Used to subscribe to, and to send, remote group operation requests
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Factory for the eventually consistent maps created in activate()
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    // Consulted to decide whether an operation is handled locally or
    // forwarded to the device master
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    // Per device group table with (device id + app cookie) as key
    private EventuallyConsistentMap<GroupStoreKeyMapKey,
        StoredGroupEntry> groupStoreEntriesByKey = null;
    // Per device group table with (device id + group id) as key
    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
        groupEntriesById = new ConcurrentHashMap<>();
    // Group ADD requests queued while the device audit has not completed,
    // keyed by (device id + app cookie)
    private EventuallyConsistentMap<GroupStoreKeyMapKey,
        StoredGroupEntry> auditPendingReqQueue = null;
    // Per device table of extraneous groups keyed by group id
    // NOTE(review): presumably groups present on the device but unknown to
    // this store; the code that populates the table is not in this section
    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
            extraneousGroupEntriesById = new ConcurrentHashMap<>();
    // Executor on which REMOTE_GROUP_OP_REQUEST messages are processed
    private ExecutorService messageHandlingExecutor;
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;

    // A missing entry for a device is treated as "audit not completed yet"
    // NOTE(review): plain HashMap; confirm that every access is synchronized
    private final HashMap<DeviceId, Boolean> deviceAuditStatus =
            new HashMap<DeviceId, Boolean>();

    // Source of candidate group ids for getFreeGroupIdValue()
    private final AtomicInteger groupIdGen = new AtomicInteger();

    // Serializer namespace builder, populated in activate()
    private KryoNamespace.Builder kryoBuilder = null;

    // Second component of the timestamps attached to map updates
    private final AtomicLong sequenceNumber = new AtomicLong(0);
142
    /**
     * Activates the store: builds the serializer namespace, starts the
     * message handling executor, subscribes to remote group operation
     * requests and creates the two eventually consistent maps.
     */
    @Activate
    public void activate() {
        // NOTE(review): registration order presumably determines the type
        // ids used on the wire; confirm before reordering any entry below
        kryoBuilder = new KryoNamespace.Builder()
            .register(DefaultGroup.class,
                      DefaultGroupBucket.class,
                      DefaultGroupDescription.class,
                      DefaultGroupKey.class,
                      GroupDescription.Type.class,
                      Group.GroupState.class,
                      GroupBuckets.class,
                      DefaultGroupId.class,
                      GroupStoreMessage.class,
                      GroupStoreMessage.Type.class,
                      UpdateType.class,
                      GroupStoreMessageSubjects.class,
                      MultiValuedTimestamp.class,
                      GroupStoreKeyMapKey.class,
                      GroupStoreIdMapKey.class,
                      GroupStoreMapKey.class
                    )
            .register(new URISerializer(), URI.class)
            .register(new DeviceIdSerializer(), DeviceId.class)
            .register(PortNumber.class)
            .register(DefaultApplicationId.class)
            .register(DefaultTrafficTreatment.class,
                      Instructions.DropInstruction.class,
                      Instructions.OutputInstruction.class,
                      Instructions.GroupInstruction.class,
                      Instructions.TableTypeTransition.class,
                      FlowRule.Type.class,
                      L0ModificationInstruction.class,
                      L0ModificationInstruction.L0SubType.class,
                      L0ModificationInstruction.ModLambdaInstruction.class,
                      L2ModificationInstruction.class,
                      L2ModificationInstruction.L2SubType.class,
                      L2ModificationInstruction.ModEtherInstruction.class,
                      L2ModificationInstruction.PushHeaderInstructions.class,
                      L2ModificationInstruction.ModVlanIdInstruction.class,
                      L2ModificationInstruction.ModVlanPcpInstruction.class,
                      L2ModificationInstruction.ModMplsLabelInstruction.class,
                      L2ModificationInstruction.ModMplsTtlInstruction.class,
                      L3ModificationInstruction.class,
                      L3ModificationInstruction.L3SubType.class,
                      L3ModificationInstruction.ModIPInstruction.class,
                      L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
                      L3ModificationInstruction.ModTtlInstruction.class,
                      org.onlab.packet.MplsLabel.class
                    )
            .register(org.onosproject.cluster.NodeId.class)
            .register(KryoNamespaces.BASIC)
            .register(KryoNamespaces.MISC);

        // Fixed-size pool on which incoming remote requests are processed
        messageHandlingExecutor = Executors.
                newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
                                   groupedThreads("onos/store/group",
                                                  "message-handlers"));

        // The receiver of the method reference (kryoBuilder.build()) is
        // evaluated once, here, and reused for every incoming message
        clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
                kryoBuilder.build()::deserialize,
                this::process,
                messageHandlingExecutor);

        log.debug("Creating EC map groupstorekeymap");
        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
                keyMapBuilder = storageService.eventuallyConsistentMapBuilder();

        // Updates are stamped with (wall clock millis, local sequence number)
        groupStoreEntriesByKey = keyMapBuilder
                .withName("groupstorekeymap")
                .withSerializer(kryoBuilder)
                .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
                                                                            sequenceNumber.getAndIncrement()))
                .build();
        groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
        log.debug("Current size of groupstorekeymap:{}",
                  groupStoreEntriesByKey.size());

        log.debug("Creating EC map pendinggroupkeymap");
        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
                auditMapBuilder = storageService.eventuallyConsistentMapBuilder();

        // Same timestamp scheme; shares sequenceNumber with the key map
        auditPendingReqQueue = auditMapBuilder
                .withName("pendinggroupkeymap")
                .withSerializer(kryoBuilder)
                .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
                                                                            sequenceNumber.getAndIncrement()))
                .build();
        log.debug("Current size of pendinggroupkeymap:{}",
                  auditPendingReqQueue.size());

        log.info("Started");
    }
234
235     @Deactivate
236     public void deactivate() {
237         groupStoreEntriesByKey.destroy();
238         auditPendingReqQueue.destroy();
239         log.info("Stopped");
240     }
241
242     private static NewConcurrentHashMap<GroupId, Group>
243         lazyEmptyExtraneousGroupIdTable() {
244         return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
245     }
246
247     private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
248         lazyEmptyGroupIdTable() {
249         return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
250     }
251
252     /**
253      * Returns the group store eventual consistent key map.
254      *
255      * @return Map representing group key table.
256      */
257     private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
258         getGroupStoreKeyMap() {
259         return groupStoreEntriesByKey;
260     }
261
262     /**
263      * Returns the group id table for specified device.
264      *
265      * @param deviceId identifier of the device
266      * @return Map representing group key table of given device.
267      */
268     private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
269         return createIfAbsentUnchecked(groupEntriesById,
270                                        deviceId, lazyEmptyGroupIdTable());
271     }
272
273     /**
274      * Returns the pending group request table.
275      *
276      * @return Map representing group key table.
277      */
278     private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
279         getPendingGroupKeyTable() {
280         return auditPendingReqQueue;
281     }
282
283     /**
284      * Returns the extraneous group id table for specified device.
285      *
286      * @param deviceId identifier of the device
287      * @return Map representing group key table of given device.
288      */
289     private ConcurrentMap<GroupId, Group>
290     getExtraneousGroupIdTable(DeviceId deviceId) {
291         return createIfAbsentUnchecked(extraneousGroupEntriesById,
292                                        deviceId,
293                                        lazyEmptyExtraneousGroupIdTable());
294     }
295
296     /**
297      * Returns the number of groups for the specified device in the store.
298      *
299      * @return number of groups for the specified device
300      */
301     @Override
302     public int getGroupCount(DeviceId deviceId) {
303         return (getGroups(deviceId) != null) ?
304                          Iterables.size(getGroups(deviceId)) : 0;
305     }
306
307     /**
308      * Returns the groups associated with a device.
309      *
310      * @param deviceId the device ID
311      *
312      * @return the group entries
313      */
314     @Override
315     public Iterable<Group> getGroups(DeviceId deviceId) {
316         // flatten and make iterator unmodifiable
317         log.debug("getGroups: for device {} total number of groups {}",
318                   deviceId, getGroupStoreKeyMap().values().size());
319         return FluentIterable.from(getGroupStoreKeyMap().values())
320                 .filter(input -> input.deviceId().equals(deviceId))
321                 .transform(input -> input);
322     }
323
324     private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
325         // flatten and make iterator unmodifiable
326         log.debug("getGroups: for device {} total number of groups {}",
327                   deviceId, getGroupStoreKeyMap().values().size());
328         return FluentIterable.from(getGroupStoreKeyMap().values())
329                 .filter(input -> input.deviceId().equals(deviceId));
330     }
331
332     /**
333      * Returns the stored group entry.
334      *
335      * @param deviceId the device ID
336      * @param appCookie the group key
337      *
338      * @return a group associated with the key
339      */
340     @Override
341     public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
342         return getStoredGroupEntry(deviceId, appCookie);
343     }
344
345     private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
346                                                  GroupKey appCookie) {
347         return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
348                                                                  appCookie));
349     }
350
351     @Override
352     public Group getGroup(DeviceId deviceId, GroupId groupId) {
353         return getStoredGroupEntry(deviceId, groupId);
354     }
355
356     private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
357                                                  GroupId groupId) {
358         return getGroupIdTable(deviceId).get(groupId);
359     }
360
361     private int getFreeGroupIdValue(DeviceId deviceId) {
362         int freeId = groupIdGen.incrementAndGet();
363
364         while (true) {
365             Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
366             if (existing == null) {
367                 existing = (
368                         extraneousGroupEntriesById.get(deviceId) != null) ?
369                         extraneousGroupEntriesById.get(deviceId).
370                                 get(new DefaultGroupId(freeId)) :
371                         null;
372             }
373             if (existing != null) {
374                 freeId = groupIdGen.incrementAndGet();
375             } else {
376                 break;
377             }
378         }
379         log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
380         return freeId;
381     }
382
383     /**
384      * Stores a new group entry using the information from group description.
385      *
386      * @param groupDesc group description to be used to create group entry
387      */
388     @Override
389     public void storeGroupDescription(GroupDescription groupDesc) {
390         log.debug("In storeGroupDescription");
391         // Check if a group is existing with the same key
392         if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
393             log.warn("Group already exists with the same key {}",
394                      groupDesc.appCookie());
395             return;
396         }
397
398         // Check if group to be created by a remote instance
399         if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
400             log.debug("storeGroupDescription: Device {} local role is not MASTER",
401                       groupDesc.deviceId());
402             if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
403                 log.error("No Master for device {}..."
404                         + "Can not perform add group operation",
405                         groupDesc.deviceId());
406                 //TODO: Send Group operation failure event
407                 return;
408             }
409             GroupStoreMessage groupOp = GroupStoreMessage.
410                     createGroupAddRequestMsg(groupDesc.deviceId(),
411                                              groupDesc);
412
413             clusterCommunicator.unicast(groupOp,
414                     GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
415                     m -> kryoBuilder.build().serialize(m),
416                     mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
417                         if (error != null) {
418                             log.warn("Failed to send request to master: {} to {}",
419                                     groupOp,
420                                     mastershipService.getMasterFor(groupDesc.deviceId()));
421                             //TODO: Send Group operation failure event
422                         } else {
423                             log.debug("Sent Group operation request for device {} "
424                                     + "to remote MASTER {}",
425                                     groupDesc.deviceId(),
426                                     mastershipService.getMasterFor(groupDesc.deviceId()));
427                         }
428                     });
429             return;
430         }
431
432         log.debug("Store group for device {} is getting handled locally",
433                   groupDesc.deviceId());
434         storeGroupDescriptionInternal(groupDesc);
435     }
436
437     private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
438         ConcurrentMap<GroupId, Group> extraneousMap =
439                 extraneousGroupEntriesById.get(deviceId);
440         if (extraneousMap == null) {
441             return null;
442         }
443         return extraneousMap.get(new DefaultGroupId(groupId));
444     }
445
446     private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
447                                                       GroupBuckets buckets) {
448         ConcurrentMap<GroupId, Group> extraneousMap =
449                 extraneousGroupEntriesById.get(deviceId);
450         if (extraneousMap == null) {
451             return null;
452         }
453
454         for (Group extraneousGroup:extraneousMap.values()) {
455             if (extraneousGroup.buckets().equals(buckets)) {
456                 return extraneousGroup;
457             }
458         }
459         return null;
460     }
461
462     private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
463         // Check if a group is existing with the same key
464         if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
465             return;
466         }
467
468         if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
469             // Device group audit has not completed yet
470             // Add this group description to pending group key table
471             // Create a group entry object with Dummy Group ID
472             log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
473                     groupDesc.deviceId());
474             StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
475             group.setState(GroupState.WAITING_AUDIT_COMPLETE);
476             EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
477                     getPendingGroupKeyTable();
478             pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
479                                                         groupDesc.appCookie()),
480                                 group);
481             return;
482         }
483
484         Group matchingExtraneousGroup = null;
485         if (groupDesc.givenGroupId() != null) {
486             //Check if there is a extraneous group existing with the same Id
487             matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
488                                 groupDesc.deviceId(), groupDesc.givenGroupId());
489             if (matchingExtraneousGroup != null) {
490                 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
491                           groupDesc.deviceId(),
492                           groupDesc.givenGroupId());
493                 //Check if the group buckets matches with user provided buckets
494                 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
495                     //Group is already existing with the same buckets and Id
496                     // Create a group entry object
497                     log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
498                               groupDesc.deviceId(),
499                               groupDesc.givenGroupId());
500                     StoredGroupEntry group = new DefaultGroup(
501                               matchingExtraneousGroup.id(), groupDesc);
502                     // Insert the newly created group entry into key and id maps
503                     getGroupStoreKeyMap().
504                         put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
505                                                     groupDesc.appCookie()), group);
506                     // Ensure it also inserted into group id based table to
507                     // avoid any chances of duplication in group id generation
508                     getGroupIdTable(groupDesc.deviceId()).
509                         put(matchingExtraneousGroup.id(), group);
510                     addOrUpdateGroupEntry(matchingExtraneousGroup);
511                     removeExtraneousGroupEntry(matchingExtraneousGroup);
512                     return;
513                 } else {
514                     //Group buckets are not matching. Update group
515                     //with user provided buckets.
516                     //TODO
517                     log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
518                               groupDesc.deviceId(),
519                               groupDesc.givenGroupId());
520                 }
521             }
522         } else {
523             //Check if there is an extraneous group with user provided buckets
524             matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
525                                         groupDesc.deviceId(), groupDesc.buckets());
526             if (matchingExtraneousGroup != null) {
527                 //Group is already existing with the same buckets.
528                 //So reuse this group.
529                 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
530                           groupDesc.deviceId());
531                 //Create a group entry object
532                 StoredGroupEntry group = new DefaultGroup(
533                          matchingExtraneousGroup.id(), groupDesc);
534                 // Insert the newly created group entry into key and id maps
535                 getGroupStoreKeyMap().
536                     put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
537                                                 groupDesc.appCookie()), group);
538                 // Ensure it also inserted into group id based table to
539                 // avoid any chances of duplication in group id generation
540                 getGroupIdTable(groupDesc.deviceId()).
541                     put(matchingExtraneousGroup.id(), group);
542                 addOrUpdateGroupEntry(matchingExtraneousGroup);
543                 removeExtraneousGroupEntry(matchingExtraneousGroup);
544                 return;
545             } else {
546                 //TODO: Check if there are any empty groups that can be used here
547                 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
548                           groupDesc.deviceId());
549             }
550         }
551
552         GroupId id = null;
553         if (groupDesc.givenGroupId() == null) {
554             // Get a new group identifier
555             id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
556         } else {
557             id = new DefaultGroupId(groupDesc.givenGroupId());
558         }
559         // Create a group entry object
560         StoredGroupEntry group = new DefaultGroup(id, groupDesc);
561         // Insert the newly created group entry into key and id maps
562         getGroupStoreKeyMap().
563             put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
564                                         groupDesc.appCookie()), group);
565         // Ensure it also inserted into group id based table to
566         // avoid any chances of duplication in group id generation
567         getGroupIdTable(groupDesc.deviceId()).
568             put(id, group);
569         log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
570                 id,
571                 groupDesc.deviceId());
572         notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
573                                       group));
574     }
575
576     /**
577      * Updates the existing group entry with the information
578      * from group description.
579      *
580      * @param deviceId the device ID
581      * @param oldAppCookie the current group key
582      * @param type update type
583      * @param newBuckets group buckets for updates
584      * @param newAppCookie optional new group key
585      */
586     @Override
587     public void updateGroupDescription(DeviceId deviceId,
588                                        GroupKey oldAppCookie,
589                                        UpdateType type,
590                                        GroupBuckets newBuckets,
591                                        GroupKey newAppCookie) {
592         // Check if group update to be done by a remote instance
593         if (mastershipService.getMasterFor(deviceId) != null &&
594                 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
595             log.debug("updateGroupDescription: Device {} local role is not MASTER",
596                       deviceId);
597             if (mastershipService.getMasterFor(deviceId) == null) {
598                 log.error("No Master for device {}..."
599                         + "Can not perform update group operation",
600                         deviceId);
601                 //TODO: Send Group operation failure event
602                 return;
603             }
604             GroupStoreMessage groupOp = GroupStoreMessage.
605                     createGroupUpdateRequestMsg(deviceId,
606                                                 oldAppCookie,
607                                                 type,
608                                                 newBuckets,
609                                                 newAppCookie);
610
611             clusterCommunicator.unicast(groupOp,
612                     GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
613                     m -> kryoBuilder.build().serialize(m),
614                     mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
615                         if (error !=  null) {
616                             log.warn("Failed to send request to master: {} to {}",
617                                     groupOp,
618                                     mastershipService.getMasterFor(deviceId), error);
619                         }
620                         //TODO: Send Group operation failure event
621                     });
622             return;
623         }
624         log.debug("updateGroupDescription for device {} is getting handled locally",
625                   deviceId);
626         updateGroupDescriptionInternal(deviceId,
627                                        oldAppCookie,
628                                        type,
629                                        newBuckets,
630                                        newAppCookie);
631     }
632
633     private void updateGroupDescriptionInternal(DeviceId deviceId,
634                                        GroupKey oldAppCookie,
635                                        UpdateType type,
636                                        GroupBuckets newBuckets,
637                                        GroupKey newAppCookie) {
638         // Check if a group is existing with the provided key
639         Group oldGroup = getGroup(deviceId, oldAppCookie);
640         if (oldGroup == null) {
641             log.warn("updateGroupDescriptionInternal: Group not found...strange");
642             return;
643         }
644
645         List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
646                                                                type,
647                                                                newBuckets);
648         if (newBucketList != null) {
649             // Create a new group object from the old group
650             GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
651             GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
652             GroupDescription updatedGroupDesc = new DefaultGroupDescription(
653                     oldGroup.deviceId(),
654                     oldGroup.type(),
655                     updatedBuckets,
656                     newCookie,
657                     oldGroup.givenGroupId(),
658                     oldGroup.appId());
659             StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
660                                                          updatedGroupDesc);
661             log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
662                     oldGroup.id(),
663                     oldGroup.deviceId(),
664                     oldGroup.state());
665             newGroup.setState(GroupState.PENDING_UPDATE);
666             newGroup.setLife(oldGroup.life());
667             newGroup.setPackets(oldGroup.packets());
668             newGroup.setBytes(oldGroup.bytes());
669             //Update the group entry in groupkey based map.
670             //Update to groupid based map will happen in the
671             //groupkey based map update listener
672             log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
673                       type);
674             getGroupStoreKeyMap().
675                 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
676                                             newGroup.appCookie()), newGroup);
677             notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
678         } else {
679             log.warn("updateGroupDescriptionInternal with type {}: No "
680                     + "change in the buckets in update", type);
681         }
682     }
683
684     private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
685                                                    UpdateType type,
686                                                    GroupBuckets buckets) {
687         GroupBuckets oldBuckets = oldGroup.buckets();
688         List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
689                 oldBuckets.buckets());
690         boolean groupDescUpdated = false;
691
692         if (type == UpdateType.ADD) {
693             // Check if the any of the new buckets are part of
694             // the old bucket list
695             for (GroupBucket addBucket:buckets.buckets()) {
696                 if (!newBucketList.contains(addBucket)) {
697                     newBucketList.add(addBucket);
698                     groupDescUpdated = true;
699                 }
700             }
701         } else if (type == UpdateType.REMOVE) {
702             // Check if the to be removed buckets are part of the
703             // old bucket list
704             for (GroupBucket removeBucket:buckets.buckets()) {
705                 if (newBucketList.contains(removeBucket)) {
706                     newBucketList.remove(removeBucket);
707                     groupDescUpdated = true;
708                 }
709             }
710         }
711
712         if (groupDescUpdated) {
713             return newBucketList;
714         } else {
715             return null;
716         }
717     }
718
719     /**
720      * Triggers deleting the existing group entry.
721      *
722      * @param deviceId the device ID
723      * @param appCookie the group key
724      */
725     @Override
726     public void deleteGroupDescription(DeviceId deviceId,
727                                        GroupKey appCookie) {
728         // Check if group to be deleted by a remote instance
729         if (mastershipService.
730                 getLocalRole(deviceId) != MastershipRole.MASTER) {
731             log.debug("deleteGroupDescription: Device {} local role is not MASTER",
732                       deviceId);
733             if (mastershipService.getMasterFor(deviceId) == null) {
734                 log.error("No Master for device {}..."
735                         + "Can not perform delete group operation",
736                         deviceId);
737                 //TODO: Send Group operation failure event
738                 return;
739             }
740             GroupStoreMessage groupOp = GroupStoreMessage.
741                     createGroupDeleteRequestMsg(deviceId,
742                                                 appCookie);
743
744             clusterCommunicator.unicast(groupOp,
745                     GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
746                     m -> kryoBuilder.build().serialize(m),
747                     mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
748                         if (error != null) {
749                             log.warn("Failed to send request to master: {} to {}",
750                                     groupOp,
751                                     mastershipService.getMasterFor(deviceId), error);
752                         }
753                         //TODO: Send Group operation failure event
754                     });
755             return;
756         }
757         log.debug("deleteGroupDescription in device {} is getting handled locally",
758                   deviceId);
759         deleteGroupDescriptionInternal(deviceId, appCookie);
760     }
761
762     private void deleteGroupDescriptionInternal(DeviceId deviceId,
763                                                 GroupKey appCookie) {
764         // Check if a group is existing with the provided key
765         StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
766         if (existing == null) {
767             return;
768         }
769
770         log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
771                 existing.id(),
772                 existing.deviceId(),
773                 existing.state());
774         synchronized (existing) {
775             existing.setState(GroupState.PENDING_DELETE);
776         }
777         log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
778                   deviceId);
779         notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
780     }
781
782     /**
783      * Stores a new group entry, or updates an existing entry.
784      *
785      * @param group group entry
786      */
787     @Override
788     public void addOrUpdateGroupEntry(Group group) {
789         // check if this new entry is an update to an existing entry
790         StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
791                                                         group.id());
792         GroupEvent event = null;
793
794         if (existing != null) {
795             log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
796                     group.id(),
797                     group.deviceId());
798             synchronized (existing) {
799                 for (GroupBucket bucket:group.buckets().buckets()) {
800                     Optional<GroupBucket> matchingBucket =
801                             existing.buckets().buckets()
802                             .stream()
803                             .filter((existingBucket)->(existingBucket.equals(bucket)))
804                             .findFirst();
805                     if (matchingBucket.isPresent()) {
806                         ((StoredGroupBucketEntry) matchingBucket.
807                                 get()).setPackets(bucket.packets());
808                         ((StoredGroupBucketEntry) matchingBucket.
809                                 get()).setBytes(bucket.bytes());
810                     } else {
811                         log.warn("addOrUpdateGroupEntry: No matching "
812                                 + "buckets to update stats");
813                     }
814                 }
815                 existing.setLife(group.life());
816                 existing.setPackets(group.packets());
817                 existing.setBytes(group.bytes());
818                 if ((existing.state() == GroupState.PENDING_ADD) ||
819                     (existing.state() == GroupState.PENDING_ADD_RETRY)) {
820                     log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
821                             existing.id(),
822                             existing.deviceId(),
823                             existing.state());
824                     existing.setState(GroupState.ADDED);
825                     existing.setIsGroupStateAddedFirstTime(true);
826                     event = new GroupEvent(Type.GROUP_ADDED, existing);
827                 } else {
828                     log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
829                             existing.id(),
830                             existing.deviceId(),
831                             GroupState.PENDING_UPDATE);
832                     existing.setState(GroupState.ADDED);
833                     existing.setIsGroupStateAddedFirstTime(false);
834                     event = new GroupEvent(Type.GROUP_UPDATED, existing);
835                 }
836                 //Re-PUT map entries to trigger map update events
837                 getGroupStoreKeyMap().
838                     put(new GroupStoreKeyMapKey(existing.deviceId(),
839                                                 existing.appCookie()), existing);
840             }
841         } else {
842             log.warn("addOrUpdateGroupEntry: Group update "
843                     + "happening for a non-existing entry in the map");
844         }
845
846         if (event != null) {
847             notifyDelegate(event);
848         }
849     }
850
851     /**
852      * Removes the group entry from store.
853      *
854      * @param group group entry
855      */
856     @Override
857     public void removeGroupEntry(Group group) {
858         StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
859                                                         group.id());
860
861         if (existing != null) {
862             log.debug("removeGroupEntry: removing group entry {} in device {}",
863                     group.id(),
864                     group.deviceId());
865             //Removal from groupid based map will happen in the
866             //map update listener
867             getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
868                                                                  existing.appCookie()));
869             notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
870         } else {
871             log.warn("removeGroupEntry for {} in device{} is "
872                     + "not existing in our maps",
873                     group.id(),
874                     group.deviceId());
875         }
876     }
877
878     @Override
879     public void deviceInitialAuditCompleted(DeviceId deviceId,
880                                             boolean completed) {
881         synchronized (deviceAuditStatus) {
882             if (completed) {
883                 log.debug("AUDIT completed for device {}",
884                           deviceId);
885                 deviceAuditStatus.put(deviceId, true);
886                 // Execute all pending group requests
887                 List<StoredGroupEntry> pendingGroupRequests =
888                         getPendingGroupKeyTable().values()
889                         .stream()
890                         .filter(g-> g.deviceId().equals(deviceId))
891                         .collect(Collectors.toList());
892                 log.debug("processing pending group add requests for device {} and number of pending requests {}",
893                         deviceId,
894                         pendingGroupRequests.size());
895                 for (Group group:pendingGroupRequests) {
896                     GroupDescription tmp = new DefaultGroupDescription(
897                             group.deviceId(),
898                             group.type(),
899                             group.buckets(),
900                             group.appCookie(),
901                             group.givenGroupId(),
902                             group.appId());
903                     storeGroupDescriptionInternal(tmp);
904                     getPendingGroupKeyTable().
905                         remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
906                 }
907             } else {
908                 Boolean audited = deviceAuditStatus.get(deviceId);
909                 if (audited != null && audited) {
910                     log.debug("Clearing AUDIT status for device {}", deviceId);
911                     deviceAuditStatus.put(deviceId, false);
912                 }
913             }
914         }
915     }
916
917     @Override
918     public boolean deviceInitialAuditStatus(DeviceId deviceId) {
919         synchronized (deviceAuditStatus) {
920             Boolean audited = deviceAuditStatus.get(deviceId);
921             return audited != null && audited;
922         }
923     }
924
925     @Override
926     public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
927
928         StoredGroupEntry existing = getStoredGroupEntry(deviceId,
929                                                         operation.groupId());
930
931         if (existing == null) {
932             log.warn("No group entry with ID {} found ", operation.groupId());
933             return;
934         }
935
936         log.warn("groupOperationFailed: group operation {} failed"
937                 + "for group {} in device {}",
938                 operation.opType(),
939                 existing.id(),
940                 existing.deviceId());
941         switch (operation.opType()) {
942             case ADD:
943                 if (existing.state() == GroupState.PENDING_ADD) {
944                     //TODO: Need to add support for passing the group
945                     //operation failure reason from group provider.
946                     //If the error type is anything other than GROUP_EXISTS,
947                     //then the GROUP_ADD_FAILED event should be raised even
948                     //in PENDING_ADD_RETRY state also.
949                     notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
950                     log.warn("groupOperationFailed: cleaningup "
951                             + "group {} from store in device {}....",
952                             existing.id(),
953                             existing.deviceId());
954                     //Removal from groupid based map will happen in the
955                     //map update listener
956                     getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
957                                                                          existing.appCookie()));
958                 }
959                 break;
960             case MODIFY:
961                 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
962                 break;
963             case DELETE:
964                 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
965                 break;
966             default:
967                 log.warn("Unknown group operation type {}", operation.opType());
968         }
969     }
970
971     @Override
972     public void addOrUpdateExtraneousGroupEntry(Group group) {
973         log.debug("add/update extraneous group entry {} in device {}",
974                 group.id(),
975                 group.deviceId());
976         ConcurrentMap<GroupId, Group> extraneousIdTable =
977                 getExtraneousGroupIdTable(group.deviceId());
978         extraneousIdTable.put(group.id(), group);
979         // Don't remove the extraneous groups, instead re-use it when
980         // a group request comes with the same set of buckets
981     }
982
983     @Override
984     public void removeExtraneousGroupEntry(Group group) {
985         log.debug("remove extraneous group entry {} of device {} from store",
986                 group.id(),
987                 group.deviceId());
988         ConcurrentMap<GroupId, Group> extraneousIdTable =
989                 getExtraneousGroupIdTable(group.deviceId());
990         extraneousIdTable.remove(group.id());
991     }
992
993     @Override
994     public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
995         // flatten and make iterator unmodifiable
996         return FluentIterable.from(
997                 getExtraneousGroupIdTable(deviceId).values());
998     }
999
1000     /**
1001      * Map handler to receive any events when the group key map is updated.
1002      */
1003     private class GroupStoreKeyMapListener implements
1004             EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
1005
1006         @Override
1007         public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
1008                                   StoredGroupEntry> mapEvent) {
1009             GroupEvent groupEvent = null;
1010             GroupStoreKeyMapKey key = mapEvent.key();
1011             StoredGroupEntry group = mapEvent.value();
1012             if ((key == null) && (group == null)) {
1013                 log.error("GroupStoreKeyMapListener: Received "
1014                         + "event {} with null entry", mapEvent.type());
1015                 return;
1016             } else if (group == null) {
1017                 group = getGroupIdTable(key.deviceId()).values()
1018                         .stream()
1019                         .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1020                         .findFirst().get();
1021                 if (group == null) {
1022                     log.error("GroupStoreKeyMapListener: Received "
1023                             + "event {} with null entry... can not process", mapEvent.type());
1024                     return;
1025                 }
1026             }
1027             log.trace("received groupid map event {} for id {} in device {}",
1028                       mapEvent.type(),
1029                       group.id(),
1030                       key.deviceId());
1031             if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
1032                 // Update the group ID table
1033                 getGroupIdTable(group.deviceId()).put(group.id(), group);
1034                 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1035                     if (mapEvent.value().isGroupStateAddedFirstTime()) {
1036                         groupEvent = new GroupEvent(Type.GROUP_ADDED,
1037                                                     mapEvent.value());
1038                         log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1039                                 group.id(),
1040                                 group.deviceId());
1041                     } else {
1042                         groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1043                                                     mapEvent.value());
1044                         log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1045                                 group.id(),
1046                                 group.deviceId());
1047                     }
1048                 }
1049             } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
1050                 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
1051                 // Remove the entry from the group ID table
1052                 getGroupIdTable(group.deviceId()).remove(group.id(), group);
1053             }
1054
1055             if (groupEvent != null) {
1056                 notifyDelegate(groupEvent);
1057             }
1058         }
1059     }
1060
1061     private void process(GroupStoreMessage groupOp) {
1062         log.debug("Received remote group operation {} request for device {}",
1063                 groupOp.type(),
1064                 groupOp.deviceId());
1065       if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1066           log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1067           return;
1068       }
1069       if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1070           storeGroupDescriptionInternal(groupOp.groupDesc());
1071       } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1072           updateGroupDescriptionInternal(groupOp.deviceId(),
1073                                          groupOp.appCookie(),
1074                                          groupOp.updateType(),
1075                                          groupOp.updateBuckets(),
1076                                          groupOp.newAppCookie());
1077       } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1078           deleteGroupDescriptionInternal(groupOp.deviceId(),
1079                                          groupOp.appCookie());
1080       }
1081     }
1082
1083     /**
1084      * Flattened map key to be used to store group entries.
1085      */
1086     protected static class GroupStoreMapKey {
1087         private final DeviceId deviceId;
1088
1089         public GroupStoreMapKey(DeviceId deviceId) {
1090             this.deviceId = deviceId;
1091         }
1092
1093         public DeviceId deviceId() {
1094             return deviceId;
1095         }
1096
1097         @Override
1098         public boolean equals(Object o) {
1099             if (this == o) {
1100                 return true;
1101             }
1102             if (!(o instanceof GroupStoreMapKey)) {
1103                 return false;
1104             }
1105             GroupStoreMapKey that = (GroupStoreMapKey) o;
1106             return this.deviceId.equals(that.deviceId);
1107         }
1108
1109         @Override
1110         public int hashCode() {
1111             int result = 17;
1112
1113             result = 31 * result + Objects.hash(this.deviceId);
1114
1115             return result;
1116         }
1117     }
1118
1119     protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
1120         private final GroupKey appCookie;
1121         public GroupStoreKeyMapKey(DeviceId deviceId,
1122                                    GroupKey appCookie) {
1123             super(deviceId);
1124             this.appCookie = appCookie;
1125         }
1126
1127         @Override
1128         public boolean equals(Object o) {
1129             if (this == o) {
1130                 return true;
1131             }
1132             if (!(o instanceof GroupStoreKeyMapKey)) {
1133                 return false;
1134             }
1135             GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1136             return (super.equals(that) &&
1137                     this.appCookie.equals(that.appCookie));
1138         }
1139
1140         @Override
1141         public int hashCode() {
1142             int result = 17;
1143
1144             result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1145
1146             return result;
1147         }
1148     }
1149
1150     protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
1151         private final GroupId groupId;
1152         public GroupStoreIdMapKey(DeviceId deviceId,
1153                                   GroupId groupId) {
1154             super(deviceId);
1155             this.groupId = groupId;
1156         }
1157
1158         @Override
1159         public boolean equals(Object o) {
1160             if (this == o) {
1161                 return true;
1162             }
1163             if (!(o instanceof GroupStoreIdMapKey)) {
1164                 return false;
1165             }
1166             GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1167             return (super.equals(that) &&
1168                     this.groupId.equals(that.groupId));
1169         }
1170
1171         @Override
1172         public int hashCode() {
1173             int result = 17;
1174
1175             result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1176
1177             return result;
1178         }
1179     }
1180
1181     @Override
1182     public void pushGroupMetrics(DeviceId deviceId,
1183                                  Collection<Group> groupEntries) {
1184         boolean deviceInitialAuditStatus =
1185                 deviceInitialAuditStatus(deviceId);
1186         Set<Group> southboundGroupEntries =
1187                 Sets.newHashSet(groupEntries);
1188         Set<StoredGroupEntry> storedGroupEntries =
1189                 Sets.newHashSet(getStoredGroups(deviceId));
1190         Set<Group> extraneousStoredEntries =
1191                 Sets.newHashSet(getExtraneousGroups(deviceId));
1192
1193         log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1194                   southboundGroupEntries.size(),
1195                   deviceId);
1196         for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1197             Group group = it.next();
1198             log.trace("Group {} in device {}", group, deviceId);
1199         }
1200
1201         log.trace("Displaying all ({}) stored group entries for device {}",
1202                   storedGroupEntries.size(),
1203                   deviceId);
1204         for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1205                 it1.hasNext();) {
1206             Group group = it1.next();
1207             log.trace("Stored Group {} for device {}", group, deviceId);
1208         }
1209
1210         for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1211             Group group = it2.next();
1212             if (storedGroupEntries.remove(group)) {
1213                 // we both have the group, let's update some info then.
1214                 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1215                         group.id(), deviceId);
1216                 groupAdded(group);
1217                 it2.remove();
1218             }
1219         }
1220         for (Group group : southboundGroupEntries) {
1221             if (getGroup(group.deviceId(), group.id()) != null) {
1222                 // There is a group existing with the same id
1223                 // It is possible that group update is
1224                 // in progress while we got a stale info from switch
1225                 if (!storedGroupEntries.remove(getGroup(
1226                              group.deviceId(), group.id()))) {
1227                     log.warn("Group AUDIT: Inconsistent state:"
1228                             + "Group exists in ID based table while "
1229                             + "not present in key based table");
1230                 }
1231             } else {
1232                 // there are groups in the switch that aren't in the store
1233                 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1234                         group.id(), deviceId);
1235                 extraneousStoredEntries.remove(group);
1236                 extraneousGroup(group);
1237             }
1238         }
1239         for (Group group : storedGroupEntries) {
1240             // there are groups in the store that aren't in the switch
1241             log.debug("Group AUDIT: group {} missing in data plane for device {}",
1242                     group.id(), deviceId);
1243             groupMissing(group);
1244         }
1245         for (Group group : extraneousStoredEntries) {
1246             // there are groups in the extraneous store that
1247             // aren't in the switch
1248             log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1249                     group.id(), deviceId);
1250             removeExtraneousGroupEntry(group);
1251         }
1252
1253         if (!deviceInitialAuditStatus) {
1254             log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1255                       deviceId);
1256             deviceInitialAuditCompleted(deviceId, true);
1257         }
1258     }
1259
1260     private void groupMissing(Group group) {
1261         switch (group.state()) {
1262             case PENDING_DELETE:
1263                 log.debug("Group {} delete confirmation from device {}",
1264                           group, group.deviceId());
1265                 removeGroupEntry(group);
1266                 break;
1267             case ADDED:
1268             case PENDING_ADD:
1269             case PENDING_ADD_RETRY:
1270             case PENDING_UPDATE:
1271                 log.debug("Group {} is in store but not on device {}",
1272                           group, group.deviceId());
1273                 StoredGroupEntry existing =
1274                         getStoredGroupEntry(group.deviceId(), group.id());
1275                 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
1276                         existing.id(),
1277                         existing.deviceId(),
1278                         existing.state());
1279                 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
1280                 //Re-PUT map entries to trigger map update events
1281                 getGroupStoreKeyMap().
1282                     put(new GroupStoreKeyMapKey(existing.deviceId(),
1283                                                 existing.appCookie()), existing);
1284                 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1285                                               group));
1286                 break;
1287             default:
1288                 log.debug("Group {} has not been installed.", group);
1289                 break;
1290         }
1291     }
1292
1293     private void extraneousGroup(Group group) {
1294         log.debug("Group {} is on device {} but not in store.",
1295                   group, group.deviceId());
1296         addOrUpdateExtraneousGroupEntry(group);
1297     }
1298
1299     private void groupAdded(Group group) {
1300         log.trace("Group {} Added or Updated in device {}",
1301                   group, group.deviceId());
1302         addOrUpdateGroupEntry(group);
1303     }
1304 }