2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.group.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Sets;
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Reference;
26 import org.apache.felix.scr.annotations.ReferenceCardinality;
27 import org.apache.felix.scr.annotations.Service;
28 import org.onlab.util.KryoNamespace;
29 import org.onlab.util.NewConcurrentHashMap;
30 import org.onosproject.cluster.ClusterService;
31 import org.onosproject.core.DefaultGroupId;
32 import org.onosproject.core.GroupId;
33 import org.onosproject.mastership.MastershipService;
34 import org.onosproject.net.DeviceId;
35 import org.onosproject.net.MastershipRole;
36 import org.onosproject.net.group.DefaultGroup;
37 import org.onosproject.net.group.DefaultGroupBucket;
38 import org.onosproject.net.group.DefaultGroupDescription;
39 import org.onosproject.net.group.DefaultGroupKey;
40 import org.onosproject.net.group.Group;
41 import org.onosproject.net.group.Group.GroupState;
42 import org.onosproject.net.group.GroupBucket;
43 import org.onosproject.net.group.GroupBuckets;
44 import org.onosproject.net.group.GroupDescription;
45 import org.onosproject.net.group.GroupEvent;
46 import org.onosproject.net.group.GroupEvent.Type;
47 import org.onosproject.net.group.GroupKey;
48 import org.onosproject.net.group.GroupOperation;
49 import org.onosproject.net.group.GroupStore;
50 import org.onosproject.net.group.GroupStoreDelegate;
51 import org.onosproject.net.group.StoredGroupBucketEntry;
52 import org.onosproject.net.group.StoredGroupEntry;
53 import org.onosproject.store.AbstractStore;
54 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
55 import org.onosproject.store.service.MultiValuedTimestamp;
56 import org.onosproject.store.serializers.KryoNamespaces;
57 import org.onosproject.store.service.EventuallyConsistentMap;
58 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
59 import org.onosproject.store.service.EventuallyConsistentMapEvent;
60 import org.onosproject.store.service.EventuallyConsistentMapListener;
61 import org.onosproject.store.service.StorageService;
62 import org.slf4j.Logger;
64 import java.util.ArrayList;
65 import java.util.Collection;
66 import java.util.HashMap;
67 import java.util.Iterator;
68 import java.util.List;
69 import java.util.Objects;
70 import java.util.Optional;
72 import java.util.concurrent.ConcurrentHashMap;
73 import java.util.concurrent.ConcurrentMap;
74 import java.util.concurrent.ExecutorService;
75 import java.util.concurrent.Executors;
76 import java.util.concurrent.atomic.AtomicInteger;
77 import java.util.concurrent.atomic.AtomicLong;
78 import java.util.stream.Collectors;
80 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
81 import static org.onlab.util.Tools.groupedThreads;
82 import static org.slf4j.LoggerFactory.getLogger;
85 * Manages inventory of group entries using eventually consistent maps from the storage service and cluster messaging to the device master.
87 @Component(immediate = true)
89 public class DistributedGroupStore
90 extends AbstractStore<GroupEvent, GroupStoreDelegate>
91 implements GroupStore {
// ---- Instance state of the store ----
93 private final Logger log = getLogger(getClass());
// 0xffffffff has all bits set (i.e. -1 as a signed int). It backs the placeholder
// group id given to requests that are queued while a device audit is pending.
95 private final int dummyId = 0xffffffff;
96 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
// Used to subscribe to, and unicast, REMOTE_GROUP_OP_REQUEST messages.
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService clusterCommunicator;
// NOTE(review): no use of clusterService is visible in this excerpt.
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterService clusterService;
// Supplies the builders for the two eventually consistent maps below.
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected StorageService storageService;
// Consulted to decide whether an operation is handled locally or forwarded to the master.
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected MastershipService mastershipService;
110 // Per device group table with (device id + app cookie) as key
// Assigned in activate(); null before that.
111 private EventuallyConsistentMap<GroupStoreKeyMapKey,
112 StoredGroupEntry> groupStoreEntriesByKey = null;
113 // Per device group table with (device id + group id) as key
// Plain local map; entries are written by the store paths and by GroupStoreKeyMapListener.
114 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
115 groupEntriesById = new ConcurrentHashMap<>();
// Group ADD requests queued (by device id + app cookie) until the device audit completes.
// Assigned in activate(); null before that.
116 private EventuallyConsistentMap<GroupStoreKeyMapKey,
117 StoredGroupEntry> auditPendingReqQueue = null;
// Per device table of extraneous groups, keyed by group id.
118 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
119 extraneousGroupEntriesById = new ConcurrentHashMap<>();
// Executor handed to the cluster communicator for incoming remote group operation requests.
120 private ExecutorService messageHandlingExecutor;
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
// Per device audit flag. It is a plain HashMap; deviceInitialAuditCompleted() and
// deviceInitialAuditStatus() access it inside synchronized (deviceAuditStatus).
// NOTE(review): the read in storeGroupDescriptionInternal() shows no such lock in the
// visible lines - confirm whether that access is guarded.
123 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
// Source of candidate group id values for getFreeGroupIdValue().
125 private final AtomicInteger groupIdGen = new AtomicInteger();
// Kryo registrations; built in activate() and reused as the serializer of both maps.
127 private KryoNamespace.Builder kryoBuilder = null;
// Second component of the MultiValuedTimestamp produced for every map update.
129 private final AtomicLong sequenceNumber = new AtomicLong(0);
// Built from kryoBuilder; serializes and deserializes GroupStoreMessage payloads.
131 private KryoNamespace clusterMsgSerializer;
// Activation hook: builds the Kryo namespace, starts the message handling executor,
// subscribes to remote group operation requests and creates the two eventually
// consistent maps ("groupstorekeymap" and "pendinggroupkeymap").
// NOTE(review): several short lines of this method are not visible in this excerpt
// (remaining registered classes, the subscriber handler argument, the map build calls).
134 public void activate() {
// Registrations shared by the cluster message serializer and by both maps below.
135 kryoBuilder = new KryoNamespace.Builder()
136 .register(KryoNamespaces.API)
137 .register(DefaultGroup.class,
138 DefaultGroupBucket.class,
139 DefaultGroupDescription.class,
140 DefaultGroupKey.class,
141 GroupDescription.Type.class,
142 Group.GroupState.class,
144 DefaultGroupId.class,
145 GroupStoreMessage.class,
146 GroupStoreMessage.Type.class,
148 GroupStoreMessageSubjects.class,
149 MultiValuedTimestamp.class,
150 GroupStoreKeyMapKey.class,
151 GroupStoreIdMapKey.class,
152 GroupStoreMapKey.class
155 clusterMsgSerializer = kryoBuilder.build();
// Fixed pool of MESSAGE_HANDLER_THREAD_POOL_SIZE thread(s) for incoming cluster messages.
157 messageHandlingExecutor = Executors.
158 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
159 groupedThreads("onos/store/group",
160 "message-handlers"));
// Incoming REMOTE_GROUP_OP_REQUEST payloads are decoded with clusterMsgSerializer and
// dispatched on messageHandlingExecutor.
// NOTE(review): the handler argument is not visible here - presumably this::process; confirm.
162 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
163 clusterMsgSerializer::deserialize,
165 messageHandlingExecutor);
167 log.debug("Creating EC map groupstorekeymap");
168 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
169 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
// Each update is stamped with (wall-clock millis, local sequence number).
171 groupStoreEntriesByKey = keyMapBuilder
172 .withName("groupstorekeymap")
173 .withSerializer(kryoBuilder)
174 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
175 sequenceNumber.getAndIncrement()))
// The listener mirrors key-map changes into the local group id table and raises group events.
177 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
178 log.debug("Current size of groupstorekeymap:{}",
179 groupStoreEntriesByKey.size());
181 log.debug("Creating EC map pendinggroupkeymap");
182 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
183 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
// Same serializer and timestamp scheme as the key map; no listener is attached in the visible lines.
185 auditPendingReqQueue = auditMapBuilder
186 .withName("pendinggroupkeymap")
187 .withSerializer(kryoBuilder)
188 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
189 sequenceNumber.getAndIncrement()))
191 log.debug("Current size of pendinggroupkeymap:{}",
192 auditPendingReqQueue.size());
198 public void deactivate() {
199 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
200 groupStoreEntriesByKey.destroy();
201 auditPendingReqQueue.destroy();
// Returns the NewConcurrentHashMap.ifNeeded() initializer that
// getExtraneousGroupIdTable() passes to createIfAbsentUnchecked for a device's
// extraneous group table.
205 private static NewConcurrentHashMap<GroupId, Group>
206 lazyEmptyExtraneousGroupIdTable() {
207 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
// Returns the NewConcurrentHashMap.ifNeeded() initializer that getGroupIdTable()
// passes to createIfAbsentUnchecked for a device's group id table.
210 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
211 lazyEmptyGroupIdTable() {
212 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
216 * Returns the eventually consistent map that stores group entries by (device id + app cookie).
218 * @return Map representing group key table.
220 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
221 getGroupStoreKeyMap() {
// Plain accessor; the field is null until activate() has built the map.
222 return groupStoreEntriesByKey;
226 * Returns the group id table for specified device.
228 * @param deviceId identifier of the device
229 * @return Map representing group id table of given device.
231 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
// Looks up the table for deviceId in groupEntriesById, installing the initializer's
// map when the device has no table yet.
232 return createIfAbsentUnchecked(groupEntriesById,
233 deviceId, lazyEmptyGroupIdTable());
237 * Returns the pending group request table.
239 * @return Map representing pending group request table.
241 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
242 getPendingGroupKeyTable() {
// Plain accessor; the field is null until activate() has built the map.
243 return auditPendingReqQueue;
247 * Returns the extraneous group id table for specified device.
249 * @param deviceId identifier of the device
250 * @return Map representing extraneous group id table of given device.
252 private ConcurrentMap<GroupId, Group>
253 getExtraneousGroupIdTable(DeviceId deviceId) {
// Looks up (or installs) the device's table in extraneousGroupEntriesById.
// NOTE(review): the key argument line is not visible in this excerpt - presumably deviceId.
254 return createIfAbsentUnchecked(extraneousGroupEntriesById,
256 lazyEmptyExtraneousGroupIdTable());
260 * Returns the number of groups for the specified device in the store.
262 * @return number of groups for the specified device
265 public int getGroupCount(DeviceId deviceId) {
266 return (getGroups(deviceId) != null) ?
267 Iterables.size(getGroups(deviceId)) : 0;
271 * Returns the groups associated with a device.
273 * @param deviceId the device ID
275 * @return the group entries
278 public Iterable<Group> getGroups(DeviceId deviceId) {
279 // flatten and make iterator unmodifiable
// View over all key-map values, narrowed to entries whose deviceId equals the argument.
280 return FluentIterable.from(getGroupStoreKeyMap().values())
281 .filter(input -> input.deviceId().equals(deviceId))
// Identity mapping: only widens the element type from StoredGroupEntry to Group.
282 .transform(input -> input);
// Same filtered view as getGroups(), but typed as StoredGroupEntry.
// NOTE(review): no caller of this method is visible in this excerpt.
285 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
286 // flatten and make iterator unmodifiable
287 return FluentIterable.from(getGroupStoreKeyMap().values())
288 .filter(input -> input.deviceId().equals(deviceId));
292 * Returns the stored group entry.
294 * @param deviceId the device ID
295 * @param appCookie the group key
297 * @return a group associated with the key
300 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
// Delegates to the key-based lookup; the result is null when no entry is stored.
301 return getStoredGroupEntry(deviceId, appCookie);
// Looks up a stored entry in the eventually consistent key map using a
// GroupStoreKeyMapKey built from the device id.
// NOTE(review): the second key argument line is not visible - presumably appCookie.
304 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
305 GroupKey appCookie) {
306 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
// Returns the group with the given id on the given device, resolved through the
// local group id table; null when the table has no such entry.
311 public Group getGroup(DeviceId deviceId, GroupId groupId) {
312 return getStoredGroupEntry(deviceId, groupId);
// Id-based lookup. Unlike the key-based overload this reads the local per-device
// table (groupEntriesById), not the eventually consistent map.
// NOTE(review): the second parameter line is not visible - presumably GroupId groupId.
315 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
317 return getGroupIdTable(deviceId).get(groupId);
// Picks a group id value for the device that is not already in use.
// Candidates come from groupIdGen and are checked against the group id table
// and the extraneous group table.
// NOTE(review): the loop construct and the return statement are not visible in this
// excerpt; presumably the checks repeat until a candidate is free - confirm.
320 private int getFreeGroupIdValue(DeviceId deviceId) {
321 int freeId = groupIdGen.incrementAndGet();
// First check: a stored group with this id.
324 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
325 if (existing == null) {
// Second check: an extraneous group with this id. The device's extraneous table is
// read directly (not through getExtraneousGroupIdTable), so no table is created here.
327 extraneousGroupEntriesById.get(deviceId) != null) ?
328 extraneousGroupEntriesById.get(deviceId).
329 get(new DefaultGroupId(freeId)) :
// Candidate is taken: move on to the next generated value.
332 if (existing != null) {
333 freeId = groupIdGen.incrementAndGet();
338 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
343 * Stores a new group entry using the information from group description.
345 * @param groupDesc group description to be used to create group entry
348 public void storeGroupDescription(GroupDescription groupDesc) {
349 log.debug("In storeGroupDescription");
350 // Check if a group is existing with the same key
351 Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
352 if (existingGroup != null) {
353 log.warn("Group already exists with the same key {} in dev:{} with id:{}",
354 groupDesc.appCookie(), groupDesc.deviceId(),
355 Integer.toHexString(existingGroup.id().id()));
// NOTE(review): the lines that follow this warning are not visible - presumably an
// early return for duplicates; confirm.
359 // Check if group to be created by a remote instance
360 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
361 log.debug("storeGroupDescription: Device {} local role is not MASTER",
362 groupDesc.deviceId());
363 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
364 log.error("No Master for device {}..."
365 + "Can not perform add group operation",
366 groupDesc.deviceId());
367 //TODO: Send Group operation failure event
// Forward the ADD request to the node that is currently master for the device.
370 GroupStoreMessage groupOp = GroupStoreMessage.
371 createGroupAddRequestMsg(groupDesc.deviceId(),
// NOTE(review): getMasterFor() is queried again for the unicast target and for each log
// call, so the reported master can differ from the one checked above if mastership changes.
374 clusterCommunicator.unicast(groupOp,
375 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
376 clusterMsgSerializer::serialize,
377 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
// NOTE(review): the update and delete variants of this warning pass `error` as the last
// argument; the visible last argument here does not - confirm whether the cause is logged.
379 log.warn("Failed to send request to master: {} to {}",
381 mastershipService.getMasterFor(groupDesc.deviceId()));
382 //TODO: Send Group operation failure event
384 log.debug("Sent Group operation request for device {} "
385 + "to remote MASTER {}",
386 groupDesc.deviceId(),
387 mastershipService.getMasterFor(groupDesc.deviceId()));
// Local node is master: handle the request directly.
393 log.debug("Store group for device {} is getting handled locally",
394 groupDesc.deviceId());
395 storeGroupDescriptionInternal(groupDesc);
// Looks up an extraneous group of the device by numeric group id.
// Reads extraneousGroupEntriesById directly, so no table is created for unknown devices.
// NOTE(review): the body of the null branch is not visible - presumably returns null.
398 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
399 ConcurrentMap<GroupId, Group> extraneousMap =
400 extraneousGroupEntriesById.get(deviceId);
401 if (extraneousMap == null) {
404 return extraneousMap.get(new DefaultGroupId(groupId));
// Returns the first extraneous group of the device whose buckets equal the given buckets.
// NOTE(review): the null-branch body and the fall-through return are not visible -
// presumably both return null.
407 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
408 GroupBuckets buckets) {
409 ConcurrentMap<GroupId, Group> extraneousMap =
410 extraneousGroupEntriesById.get(deviceId);
411 if (extraneousMap == null) {
// Linear scan; iteration order is whatever the concurrent map's values() yields.
415 for (Group extraneousGroup:extraneousMap.values()) {
416 if (extraneousGroup.buckets().equals(buckets)) {
417 return extraneousGroup;
// Stores a group description on the node handling the request. Visible steps:
//  1. check for an existing group with the same device id + app cookie;
//  2. queue the request in the pending table while the device has no audit status;
//  3. reuse an extraneous group that matches by given id (and buckets) or by buckets;
//  4. otherwise create a new entry under a given or freshly generated group id
//     and raise GROUP_ADD_REQUESTED.
// NOTE(review): many short lines (returns, else branches, closing braces) are not
// visible in this excerpt, so the exact control flow between the steps cannot be confirmed.
423 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
424 // Check if a group is existing with the same key
425 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
// NOTE(review): this read of deviceAuditStatus shows no synchronized block in the visible
// lines, while the other accesses lock on the map. Also, only a missing entry counts as
// "audit pending": deviceInitialAuditCompleted() clears the status by storing false,
// which this check treats as audited - confirm that is intended.
429 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
430 // Device group audit has not completed yet
431 // Add this group description to pending group key table
432 // Create a group entry object with Dummy Group ID
433 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
434 groupDesc.deviceId());
435 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
436 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
437 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
438 getPendingGroupKeyTable();
439 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
440 groupDesc.appCookie()),
445 Group matchingExtraneousGroup = null;
446 if (groupDesc.givenGroupId() != null) {
447 //Check if there is a extraneous group existing with the same Id
448 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
449 groupDesc.deviceId(), groupDesc.givenGroupId());
450 if (matchingExtraneousGroup != null) {
451 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
452 groupDesc.deviceId(),
453 groupDesc.givenGroupId());
454 //Check if the group buckets matches with user provided buckets
455 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
456 //Group is already existing with the same buckets and Id
457 // Create a group entry object
458 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
459 groupDesc.deviceId(),
460 groupDesc.givenGroupId());
461 StoredGroupEntry group = new DefaultGroup(
462 matchingExtraneousGroup.id(), groupDesc);
463 // Insert the newly created group entry into key and id maps
464 getGroupStoreKeyMap().
465 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
466 groupDesc.appCookie()), group);
467 // Ensure it also inserted into group id based table to
468 // avoid any chances of duplication in group id generation
469 getGroupIdTable(groupDesc.deviceId()).
470 put(matchingExtraneousGroup.id(), group);
// The extraneous group is fed through the normal add/update path for the entry just
// stored under its id, then dropped from the extraneous table.
471 addOrUpdateGroupEntry(matchingExtraneousGroup);
472 removeExtraneousGroupEntry(matchingExtraneousGroup);
475 //Group buckets are not matching. Update group
476 //with user provided buckets.
478 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
479 groupDesc.deviceId(),
480 groupDesc.givenGroupId());
484 //Check if there is an extraneous group with user provided buckets
485 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
486 groupDesc.deviceId(), groupDesc.buckets());
487 if (matchingExtraneousGroup != null) {
488 //Group is already existing with the same buckets.
489 //So reuse this group.
490 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
491 groupDesc.deviceId());
492 //Create a group entry object
493 StoredGroupEntry group = new DefaultGroup(
494 matchingExtraneousGroup.id(), groupDesc);
495 // Insert the newly created group entry into key and id maps
496 getGroupStoreKeyMap().
497 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
498 groupDesc.appCookie()), group);
499 // Ensure it also inserted into group id based table to
500 // avoid any chances of duplication in group id generation
501 getGroupIdTable(groupDesc.deviceId()).
502 put(matchingExtraneousGroup.id(), group);
503 addOrUpdateGroupEntry(matchingExtraneousGroup);
504 removeExtraneousGroupEntry(matchingExtraneousGroup);
507 //TODO: Check if there are any empty groups that can be used here
508 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
509 groupDesc.deviceId());
// No reusable extraneous group: choose the id for a brand new entry.
514 if (groupDesc.givenGroupId() == null) {
515 // Get a new group identifier
516 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
518 id = new DefaultGroupId(groupDesc.givenGroupId());
520 // Create a group entry object
521 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
522 // Insert the newly created group entry into key and id maps
523 getGroupStoreKeyMap().
524 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
525 groupDesc.appCookie()), group);
526 // Ensure it also inserted into group id based table to
527 // avoid any chances of duplication in group id generation
528 getGroupIdTable(groupDesc.deviceId()).
530 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
532 groupDesc.deviceId());
533 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
538 * Updates the existing group entry with the information
539 * from group description.
541 * @param deviceId the device ID
542 * @param oldAppCookie the current group key
543 * @param type update type
544 * @param newBuckets group buckets for updates
545 * @param newAppCookie optional new group key
// NOTE(review): the third parameter line is not visible - presumably UpdateType type.
548 public void updateGroupDescription(DeviceId deviceId,
549 GroupKey oldAppCookie,
551 GroupBuckets newBuckets,
552 GroupKey newAppCookie) {
553 // Check if group update to be done by a remote instance
// NOTE(review): unlike storeGroupDescription() and deleteGroupDescription(), this branch
// also requires a non-null master. With no master and a non-MASTER local role the request
// therefore falls through to local handling, and the "No Master" check below can only
// fire if mastership changes between the two getMasterFor() calls - confirm intent.
554 if (mastershipService.getMasterFor(deviceId) != null &&
555 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
556 log.debug("updateGroupDescription: Device {} local role is not MASTER",
558 if (mastershipService.getMasterFor(deviceId) == null) {
559 log.error("No Master for device {}..."
560 + "Can not perform update group operation",
562 //TODO: Send Group operation failure event
// Forward the UPDATE request to the node that is currently master for the device.
565 GroupStoreMessage groupOp = GroupStoreMessage.
566 createGroupUpdateRequestMsg(deviceId,
572 clusterCommunicator.unicast(groupOp,
573 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
574 clusterMsgSerializer::serialize,
575 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
577 log.warn("Failed to send request to master: {} to {}",
579 mastershipService.getMasterFor(deviceId), error);
581 //TODO: Send Group operation failure event
// Handled on this node.
585 log.debug("updateGroupDescription for device {} is getting handled locally",
587 updateGroupDescriptionInternal(deviceId,
// Applies a bucket update to the group stored under oldAppCookie. When the bucket list
// actually changes, a new DefaultGroup in PENDING_UPDATE state (carrying over life,
// packets and bytes from the old group) is put into the key map and
// GROUP_UPDATE_REQUESTED is raised; otherwise only a warning is logged.
// NOTE(review): the third parameter line is not visible - presumably UpdateType type.
594 private void updateGroupDescriptionInternal(DeviceId deviceId,
595 GroupKey oldAppCookie,
597 GroupBuckets newBuckets,
598 GroupKey newAppCookie) {
599 // Check if a group is existing with the provided key
600 Group oldGroup = getGroup(deviceId, oldAppCookie);
601 if (oldGroup == null) {
602 log.warn("updateGroupDescriptionInternal: Group not found...strange");
// getUpdatedBucketList() signals "nothing changed" with a null result.
606 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
609 if (newBucketList != null) {
610 // Create a new group object from the old group
611 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
// Keep the old app cookie unless the caller supplied a replacement.
612 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
613 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
618 oldGroup.givenGroupId(),
620 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
622 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
626 newGroup.setState(GroupState.PENDING_UPDATE);
627 newGroup.setLife(oldGroup.life());
628 newGroup.setPackets(oldGroup.packets());
629 newGroup.setBytes(oldGroup.bytes());
630 //Update the group entry in groupkey based map.
631 //Update to groupid based map will happen in the
632 //groupkey based map update listener
633 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
// NOTE(review): the put below uses newGroup.appCookie(); no removal of an entry stored
// under oldAppCookie is visible when the cookie changes - confirm.
635 getGroupStoreKeyMap().
636 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
637 newGroup.appCookie()), newGroup);
638 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
640 log.warn("updateGroupDescriptionInternal with type {}: No "
641 + "change in the buckets in update", type);
// Computes the bucket list that results from applying an ADD or REMOVE update to the
// old group's buckets. Works on a copy of the old list; returns that copy only if at
// least one bucket was added or removed.
// NOTE(review): the second parameter line and the final else/return are not visible -
// presumably UpdateType type and a null return when nothing changed (the caller checks
// the result against null).
645 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
647 GroupBuckets buckets) {
648 GroupBuckets oldBuckets = oldGroup.buckets();
649 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
650 boolean groupDescUpdated = false;
652 if (type == UpdateType.ADD) {
653 // Check if the any of the new buckets are part of
654 // the old bucket list
// Buckets already present are skipped, so an ADD never introduces duplicates.
655 for (GroupBucket addBucket:buckets.buckets()) {
656 if (!newBucketList.contains(addBucket)) {
657 newBucketList.add(addBucket);
658 groupDescUpdated = true;
661 } else if (type == UpdateType.REMOVE) {
662 // Check if the to be removed buckets are part of the
// Buckets that are not present are ignored.
664 for (GroupBucket removeBucket:buckets.buckets()) {
665 if (newBucketList.contains(removeBucket)) {
666 newBucketList.remove(removeBucket);
667 groupDescUpdated = true;
672 if (groupDescUpdated) {
673 return newBucketList;
680 * Triggers deleting the existing group entry.
682 * @param deviceId the device ID
683 * @param appCookie the group key
686 public void deleteGroupDescription(DeviceId deviceId,
687 GroupKey appCookie) {
688 // Check if group to be deleted by a remote instance
689 if (mastershipService.
690 getLocalRole(deviceId) != MastershipRole.MASTER) {
691 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
693 if (mastershipService.getMasterFor(deviceId) == null) {
694 log.error("No Master for device {}..."
695 + "Can not perform delete group operation",
697 //TODO: Send Group operation failure event
// Forward the DELETE request to the node that is currently master for the device.
700 GroupStoreMessage groupOp = GroupStoreMessage.
701 createGroupDeleteRequestMsg(deviceId,
// NOTE(review): getMasterFor() is queried again for the target and for the log call, so
// the values can differ from the one checked above if mastership changes in between.
704 clusterCommunicator.unicast(groupOp,
705 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
706 clusterMsgSerializer::serialize,
707 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
709 log.warn("Failed to send request to master: {} to {}",
711 mastershipService.getMasterFor(deviceId), error);
713 //TODO: Send Group operation failure event
// Local node is master: handle the request directly.
717 log.debug("deleteGroupDescription in device {} is getting handled locally",
719 deleteGroupDescriptionInternal(deviceId, appCookie);
// Marks the stored group PENDING_DELETE and raises GROUP_REMOVE_REQUESTED. The entry
// itself is not removed here; removeGroupEntry() does that.
722 private void deleteGroupDescriptionInternal(DeviceId deviceId,
723 GroupKey appCookie) {
724 // Check if a group is existing with the provided key
725 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
726 if (existing == null) {
730 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
// The state is changed in place on the object read from the key map.
// NOTE(review): no re-put into the key map is visible here, unlike addOrUpdateGroupEntry();
// confirm whether the new state is meant to reach other nodes.
734 synchronized (existing) {
735 existing.setState(GroupState.PENDING_DELETE);
737 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
739 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
743 * Stores a new group entry, or updates an existing entry.
745 * @param group group entry
748 public void addOrUpdateGroupEntry(Group group) {
749 // check if this new entry is an update to an existing entry
// NOTE(review): the second lookup argument line is not visible - presumably group.id(),
// i.e. a lookup through the local group id table.
750 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
752 GroupEvent event = null;
754 if (existing != null) {
755 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
// All changes to the stored entry are made while holding its monitor.
758 synchronized (existing) {
// Copy per-bucket statistics onto the stored bucket that equals the reported one.
759 for (GroupBucket bucket:group.buckets().buckets()) {
760 Optional<GroupBucket> matchingBucket =
761 existing.buckets().buckets()
763 .filter((existingBucket)->(existingBucket.equals(bucket)))
765 if (matchingBucket.isPresent()) {
766 ((StoredGroupBucketEntry) matchingBucket.
767 get()).setPackets(bucket.packets());
768 ((StoredGroupBucketEntry) matchingBucket.
769 get()).setBytes(bucket.bytes());
771 log.warn("addOrUpdateGroupEntry: No matching "
772 + "buckets to update stats");
// Group-level statistics are taken over from the reported group.
775 existing.setLife(group.life());
776 existing.setPackets(group.packets());
777 existing.setBytes(group.bytes());
// First confirmation of a pending add raises GROUP_ADDED; the other visible branch
// raises GROUP_UPDATED. Both move the entry to ADDED.
778 if ((existing.state() == GroupState.PENDING_ADD) ||
779 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
780 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
784 existing.setState(GroupState.ADDED);
785 existing.setIsGroupStateAddedFirstTime(true);
786 event = new GroupEvent(Type.GROUP_ADDED, existing);
// NOTE(review): this log names PENDING_UPDATE as the previous state, but the condition
// of this branch is not visible in this excerpt - confirm which states reach it.
788 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
791 GroupState.PENDING_UPDATE);
792 existing.setState(GroupState.ADDED);
793 existing.setIsGroupStateAddedFirstTime(false);
794 event = new GroupEvent(Type.GROUP_UPDATED, existing);
796 //Re-PUT map entries to trigger map update events
797 getGroupStoreKeyMap().
798 put(new GroupStoreKeyMapKey(existing.deviceId(),
799 existing.appCookie()), existing);
802 log.warn("addOrUpdateGroupEntry: Group update "
803 + "happening for a non-existing entry in the map");
// NOTE(review): event is still null when no entry existed; the guard around this call,
// if any, is not visible in this excerpt.
807 notifyDelegate(event);
812 * Removes the group entry from store.
814 * @param group group entry
817 public void removeGroupEntry(Group group) {
// NOTE(review): the second lookup argument line is not visible - presumably group.id().
818 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
821 if (existing != null) {
822 log.debug("removeGroupEntry: removing group entry {} in device {}",
825 //Removal from groupid based map will happen in the
826 //map update listener
// Only the key map is touched here; GROUP_REMOVED is raised right after the remove call.
827 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
828 existing.appCookie()));
829 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
831 log.warn("removeGroupEntry for {} in device{} is "
832 + "not existing in our maps",
// Records the audit status of a device. On completion, all group ADD requests queued
// for the device in the pending table are replayed through
// storeGroupDescriptionInternal() and removed from the pending table. The other
// visible branch clears a previously set status.
// NOTE(review): the second parameter line and the branch conditions are not visible -
// presumably a boolean "completed" flag selects between the two branches.
839 public void deviceInitialAuditCompleted(DeviceId deviceId,
// The whole update, including the replay of pending requests, runs under this lock.
841 synchronized (deviceAuditStatus) {
843 log.debug("AUDIT completed for device {}",
845 deviceAuditStatus.put(deviceId, true);
846 // Execute all pending group requests
// Snapshot the matching pending entries first, so the removals below do not disturb
// the iteration.
847 List<StoredGroupEntry> pendingGroupRequests =
848 getPendingGroupKeyTable().values()
850 .filter(g-> g.deviceId().equals(deviceId))
851 .collect(Collectors.toList());
852 log.debug("processing pending group add requests for device {} and number of pending requests {}",
854 pendingGroupRequests.size());
855 for (Group group:pendingGroupRequests) {
// Rebuild a description from the queued entry (it was stored with the dummy group id).
856 GroupDescription tmp = new DefaultGroupDescription(
861 group.givenGroupId(),
863 storeGroupDescriptionInternal(tmp);
864 getPendingGroupKeyTable().
865 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
// Clearing stores false rather than removing the entry.
// NOTE(review): storeGroupDescriptionInternal() only queues requests when the entry is
// absent (null), so a cleared device is not treated as "audit pending" there - confirm.
868 Boolean audited = deviceAuditStatus.get(deviceId);
869 if (audited != null && audited) {
870 log.debug("Clearing AUDIT status for device {}", deviceId);
871 deviceAuditStatus.put(deviceId, false);
// Returns true only when a status is recorded for the device and it is true;
// an absent entry and a false entry both yield false.
878 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
879 synchronized (deviceAuditStatus) {
880 Boolean audited = deviceAuditStatus.get(deviceId);
881 return audited != null && audited;
// Reacts to a failed group operation reported for a device by raising the matching
// *_FAILED event for the stored group; a failed add in PENDING_ADD state also removes
// the entry from the key map.
// NOTE(review): the case labels of the switch are not visible in this excerpt; the
// mapping of operation types to branches is inferred from the event types raised.
886 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
// Lookup by group id through the local group id table.
888 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
889 operation.groupId());
891 if (existing == null) {
892 log.warn("No group entry with ID {} found ", operation.groupId());
896 log.warn("groupOperationFailed: group operation {} failed"
897 + "for group {} in device {}",
900 existing.deviceId());
901 switch (operation.opType()) {
903 if (existing.state() == GroupState.PENDING_ADD) {
904 //TODO: Need to add support for passing the group
905 //operation failure reason from group provider.
906 //If the error type is anything other than GROUP_EXISTS,
907 //then the GROUP_ADD_FAILED event should be raised even
908 //in PENDING_ADD_RETRY state also.
909 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
910 log.warn("groupOperationFailed: cleaningup "
911 + "group {} from store in device {}....",
913 existing.deviceId());
914 //Removal from groupid based map will happen in the
915 //map update listener
916 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
917 existing.appCookie()));
// Failed update and delete only notify; the stored entry is left untouched in the visible lines.
921 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
924 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
927 log.warn("Unknown group operation type {}", operation.opType());
// Records (or overwrites) an extraneous group in the device's extraneous table,
// keyed by its group id. The table is created on first use for the device.
932 public void addOrUpdateExtraneousGroupEntry(Group group) {
933 log.debug("add/update extraneous group entry {} in device {}",
936 ConcurrentMap<GroupId, Group> extraneousIdTable =
937 getExtraneousGroupIdTable(group.deviceId());
938 extraneousIdTable.put(group.id(), group);
939 // Don't remove the extraneous groups, instead re-use it when
940 // a group request comes with the same set of buckets
// Drops the group with the same id from the device's extraneous table.
// getExtraneousGroupIdTable() creates an empty table if the device had none, so this
// never fails on an unknown device.
944 public void removeExtraneousGroupEntry(Group group) {
945 log.debug("remove extraneous group entry {} of device {} from store",
948 ConcurrentMap<GroupId, Group> extraneousIdTable =
949 getExtraneousGroupIdTable(group.deviceId());
950 extraneousIdTable.remove(group.id());
// Returns a FluentIterable over the device's extraneous groups (empty when none are recorded).
954 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
955 // flatten and make iterator unmodifiable
956 return FluentIterable.from(
957 getExtraneousGroupIdTable(deviceId).values());
961 * Map handler to receive any events when the group key map is updated.
// Listener registered on the key map in activate(). It keeps the local group id table
// in step with the key map and turns map events into group events for the delegate.
963 private class GroupStoreKeyMapListener implements
964 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
967 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
968 StoredGroupEntry> mapEvent) {
969 GroupEvent groupEvent = null;
970 GroupStoreKeyMapKey key = mapEvent.key();
971 StoredGroupEntry group = mapEvent.value();
972 if ((key == null) && (group == null)) {
973 log.error("GroupStoreKeyMapListener: Received "
974 + "event {} with null entry", mapEvent.type());
976 } else if (group == null) {
// The event carried no value: recover the entry from the local id table by matching
// the app cookie of the event key.
977 group = getGroupIdTable(key.deviceId()).values()
979 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
982 log.error("GroupStoreKeyMapListener: Received "
983 + "event {} with null entry... can not process", mapEvent.type());
987 log.trace("received groupid map event {} for id {} in device {}",
991 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
992 // Update the group ID table
993 getGroupIdTable(group.deviceId()).put(group.id(), group);
// NOTE(review): the checks below read mapEvent.value() rather than the local `group`;
// a PUT event with a null value would bypass the fallback lookup above - confirm that
// PUT events always carry a value.
994 if (mapEvent.value().state() == Group.GroupState.ADDED) {
// The flag set by addOrUpdateGroupEntry() distinguishes a first ADDED from later updates.
995 if (mapEvent.value().isGroupStateAddedFirstTime()) {
996 groupEvent = new GroupEvent(Type.GROUP_ADDED,
998 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1002 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1004 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1009 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
1010 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
1011 // Remove the entry from the group ID table
// Two-argument remove: the id is only removed if it still maps to this entry.
1012 getGroupIdTable(group.deviceId()).remove(group.id(), group);
1015 if (groupEvent != null) {
1016 notifyDelegate(groupEvent);
// Applies a group operation message received from another node.
//
// The message is logged, then mastership for groupOp.deviceId() is checked
// with mastershipService.isLocalMaster; when this node is not master a warning
// is logged (the statement following the warning is not visible in this
// listing - presumably an early return, confirm). Otherwise the operation is
// dispatched on groupOp.type():
//   ADD    -> storeGroupDescriptionInternal(groupOp.groupDesc())
//   UPDATE -> updateGroupDescriptionInternal(deviceId, appCookie, updateType,
//             updateBuckets, newAppCookie)
//   DELETE -> deleteGroupDescriptionInternal(deviceId, appCookie)
// No branch is visible for any other message type.
1021 private void process(GroupStoreMessage groupOp) {
1022 log.debug("Received remote group operation {} request for device {}",
1024 groupOp.deviceId());
1025 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1026 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1029 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1030 storeGroupDescriptionInternal(groupOp.groupDesc());
1031 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1032 updateGroupDescriptionInternal(groupOp.deviceId(),
1033 groupOp.appCookie(),
1034 groupOp.updateType(),
1035 groupOp.updateBuckets(),
1036 groupOp.newAppCookie());
1037 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1038 deleteGroupDescriptionInternal(groupOp.deviceId(),
1039 groupOp.appCookie());
1044 * Flattened map key to be used to store group entries.
// Base map key that carries only the device identifier. GroupStoreKeyMapKey
// and GroupStoreIdMapKey extend it with an additional field.
// NOTE(review): the bodies of deviceId(), equals() and hashCode() are only
// partly visible in this listing (return statements and the initial value of
// 'result' are missing).
1046 protected static class GroupStoreMapKey {
1047 private final DeviceId deviceId;
// Stores the device identifier; no null check is visible here.
1049 public GroupStoreMapKey(DeviceId deviceId) {
1050 this.deviceId = deviceId;
// Accessor for the device identifier (its return statement is not visible).
1053 public DeviceId deviceId() {
// Equality: the argument must be a GroupStoreMapKey (subclasses included, since
// instanceof is used) and the device identifiers must be equal.
1058 public boolean equals(Object o) {
1062 if (!(o instanceof GroupStoreMapKey)) {
1065 GroupStoreMapKey that = (GroupStoreMapKey) o;
1066 return this.deviceId.equals(that.deviceId);
// Hash: folds Objects.hash(deviceId) into 'result' with a multiplier of 31.
1070 public int hashCode() {
1073 result = 31 * result + Objects.hash(this.deviceId);
// Map key made of a device identifier (inherited from GroupStoreMapKey) plus
// the application cookie (GroupKey) of a group.
// NOTE(review): the line between 1082 and 1084 is missing from this listing;
// presumably it forwards deviceId to the superclass constructor - confirm.
1079 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
1080 private final GroupKey appCookie;
1081 public GroupStoreKeyMapKey(DeviceId deviceId,
1082 GroupKey appCookie) {
1084 this.appCookie = appCookie;
// Equality: the argument must be a GroupStoreKeyMapKey, the superclass
// comparison must succeed, and the application cookies must be equal.
1088 public boolean equals(Object o) {
1092 if (!(o instanceof GroupStoreKeyMapKey)) {
1095 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1096 return (super.equals(that) &&
1097 this.appCookie.equals(that.appCookie));
// Hash: combines the superclass hash with Objects.hash(appCookie); the initial
// value of 'result' is not visible in this listing.
1101 public int hashCode() {
1104 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
// Map key made of a device identifier (inherited from GroupStoreMapKey) plus a
// group identifier.
// NOTE(review): the second constructor parameter and the line before the field
// assignment are missing from this listing; presumably a GroupId parameter and
// a call to the superclass constructor - confirm.
1110 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
1111 private final GroupId groupId;
1112 public GroupStoreIdMapKey(DeviceId deviceId,
1115 this.groupId = groupId;
// Equality: the argument must be a GroupStoreIdMapKey, the superclass
// comparison must succeed, and the group identifiers must be equal.
1119 public boolean equals(Object o) {
1123 if (!(o instanceof GroupStoreIdMapKey)) {
1126 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1127 return (super.equals(that) &&
1128 this.groupId.equals(that.groupId));
// Hash: combines the superclass hash with Objects.hash(groupId); the initial
// value of 'result' is not visible in this listing.
1132 public int hashCode() {
1135 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
// Audits the groups reported for a device (groupEntries, referred to below as
// the "southbound" entries) against the groups held in the store.
//
// Visible steps, in order:
//   1. Copy the reported groups, the stored groups and the stored extraneous
//      groups of the device into three working sets.
//   2. Trace-log the reported and stored groups.
//   3. For every reported group that is also in the stored set, remove it from
//      the stored set (the rest of that branch is not visible in this listing).
//   4. For the reported groups iterated next: if a stored group with the same
//      id exists, drop it from the stored set (warning when it was not there);
//      the visible statements after that treat the group as extraneous.
//   5. Whatever is left in the stored set is passed to groupMissing.
//   6. Whatever is left in the extraneous set is removed from the store.
//   7. If the initial audit had not completed when the method started, mark it
//      completed.
// NOTE(review): several lines are missing from this listing (for example after
// 1175 and between 1189 and 1192), so branch boundaries in steps 3 and 4 are
// inferred from the comments and log messages - confirm against the full file.
1142 public void pushGroupMetrics(DeviceId deviceId,
1143 Collection<Group> groupEntries) {
// Read the audit flag up front; it is consulted again at the very end.
1144 boolean deviceInitialAuditStatus =
1145 deviceInitialAuditStatus(deviceId);
// Working copies: entries are removed from these sets as they are matched.
1146 Set<Group> southboundGroupEntries =
1147 Sets.newHashSet(groupEntries);
1148 Set<StoredGroupEntry> storedGroupEntries =
1149 Sets.newHashSet(getStoredGroups(deviceId));
1150 Set<Group> extraneousStoredEntries =
1151 Sets.newHashSet(getExtraneousGroups(deviceId));
1153 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1154 southboundGroupEntries.size(),
1156 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1157 Group group = it.next();
1158 log.trace("Group {} in device {}", group, deviceId);
1161 log.trace("Displaying all ({}) stored group entries for device {}",
1162 storedGroupEntries.size(),
1164 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1166 Group group = it1.next();
1167 log.trace("Stored Group {} for device {}", group, deviceId);
// Pass 1: groups known to both the device and the store.
1170 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1171 Group group = it2.next();
1172 if (storedGroupEntries.remove(group)) {
1173 // we both have the group, let's update some info then.
1174 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1175 group.id(), deviceId);
// Pass 2: reported groups iterated again (presumably only the ones not matched
// in pass 1, if the hidden lines remove matched entries - confirm).
1180 for (Group group : southboundGroupEntries) {
1181 if (getGroup(group.deviceId(), group.id()) != null) {
1182 // There is a group existing with the same id
1183 // It is possible that group update is
1184 // in progress while we got a stale info from switch
1185 if (!storedGroupEntries.remove(getGroup(
1186 group.deviceId(), group.id()))) {
// NOTE(review): the concatenated message below has no space after
// "Inconsistent state:"; it is runtime text and is left unchanged here.
1187 log.warn("Group AUDIT: Inconsistent state:"
1188 + "Group exists in ID based table while "
1189 + "not present in key based table");
1192 // there are groups in the switch that aren't in the store
1193 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1194 group.id(), deviceId);
// Keep this group out of the clean-up in the extraneous loop further down,
// then record it through extraneousGroup.
1195 extraneousStoredEntries.remove(group);
1196 extraneousGroup(group);
// Pass 3: stored groups that were never matched above.
1199 for (Group group : storedGroupEntries) {
1200 // there are groups in the store that aren't in the switch
1201 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1202 group.id(), deviceId);
1203 groupMissing(group);
// Pass 4: stored extraneous groups that were not removed from the set in pass 2.
1205 for (Group group : extraneousStoredEntries) {
1206 // there are groups in the extraneous store that
1207 // aren't in the switch
// NOTE(review): "extransoeus" in the message below is a typo for "extraneous";
// it is runtime text and is left unchanged here.
1208 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1209 group.id(), deviceId);
1210 removeExtraneousGroupEntry(group);
// Mark the initial audit as done only if it was not already done on entry.
1213 if (!deviceInitialAuditStatus) {
1214 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1216 deviceInitialAuditCompleted(deviceId, true);
// Handles a group that is in the store but was not reported by the device.
//
// Behaviour depends on group.state():
//   - PENDING_DELETE: the absence is taken as delete confirmation and
//     removeGroupEntry(group) is called.
//   - PENDING_ADD_RETRY / PENDING_UPDATE (other case labels may sit on the
//     lines missing from this listing): the stored entry is moved to
//     PENDING_ADD_RETRY, re-put into the key map and a GROUP_ADD_REQUESTED
//     event is sent through notifyDelegate.
//   - The final debug message presumably belongs to a default branch whose
//     label is not visible - confirm.
1220 private void groupMissing(Group group) {
1221 switch (group.state()) {
1222 case PENDING_DELETE:
1223 log.debug("Group {} delete confirmation from device {}",
1224 group, group.deviceId());
1225 removeGroupEntry(group);
1229 case PENDING_ADD_RETRY:
1230 case PENDING_UPDATE:
1231 log.debug("Group {} is in store but not on device {}",
1232 group, group.deviceId());
// NOTE(review): 'existing' is used below without a visible null check; confirm
// that getStoredGroupEntry cannot return null for a group taken from the store.
1233 StoredGroupEntry existing =
1234 getStoredGroupEntry(group.deviceId(), group.id());
1235 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
1237 existing.deviceId(),
1239 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
1240 //Re-PUT map entries to trigger map update events
1241 getGroupStoreKeyMap().
1242 put(new GroupStoreKeyMapKey(existing.deviceId(),
1243 existing.appCookie()), existing);
1244 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1248 log.debug("Group {} has not been installed.", group);
// Records a group that the device reported but the store does not hold: logs
// it at debug level and passes it to addOrUpdateExtraneousGroupEntry.
1253 private void extraneousGroup(Group group) {
1254 log.debug("Group {} is on device {} but not in store.",
1255 group, group.deviceId());
1256 addOrUpdateExtraneousGroupEntry(group);
// Handles a group reported as added or updated on the device: logs it at trace
// level and passes it to addOrUpdateGroupEntry.
// NOTE(review): this method sits at the end of the excerpt; anything after the
// addOrUpdateGroupEntry call is not visible here.
1259 private void groupAdded(Group group) {
1260 log.trace("Group {} Added or Updated in device {}",
1261 group, group.deviceId());
1262 addOrUpdateGroupEntry(group);