2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.group.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Sets;
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Reference;
26 import org.apache.felix.scr.annotations.ReferenceCardinality;
27 import org.apache.felix.scr.annotations.Service;
28 import org.onlab.util.KryoNamespace;
29 import org.onlab.util.NewConcurrentHashMap;
30 import org.onosproject.cluster.ClusterService;
31 import org.onosproject.core.DefaultGroupId;
32 import org.onosproject.core.GroupId;
33 import org.onosproject.mastership.MastershipService;
34 import org.onosproject.net.DeviceId;
35 import org.onosproject.net.MastershipRole;
36 import org.onosproject.net.group.DefaultGroup;
37 import org.onosproject.net.group.DefaultGroupBucket;
38 import org.onosproject.net.group.DefaultGroupDescription;
39 import org.onosproject.net.group.DefaultGroupKey;
40 import org.onosproject.net.group.Group;
41 import org.onosproject.net.group.Group.GroupState;
42 import org.onosproject.net.group.GroupBucket;
43 import org.onosproject.net.group.GroupBuckets;
44 import org.onosproject.net.group.GroupDescription;
45 import org.onosproject.net.group.GroupEvent;
46 import org.onosproject.net.group.GroupEvent.Type;
47 import org.onosproject.net.group.GroupKey;
48 import org.onosproject.net.group.GroupOperation;
49 import org.onosproject.net.group.GroupStore;
50 import org.onosproject.net.group.GroupStoreDelegate;
51 import org.onosproject.net.group.StoredGroupBucketEntry;
52 import org.onosproject.net.group.StoredGroupEntry;
53 import org.onosproject.store.AbstractStore;
54 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
55 import org.onosproject.store.service.MultiValuedTimestamp;
56 import org.onosproject.store.serializers.KryoNamespaces;
57 import org.onosproject.store.service.EventuallyConsistentMap;
58 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
59 import org.onosproject.store.service.EventuallyConsistentMapEvent;
60 import org.onosproject.store.service.EventuallyConsistentMapListener;
61 import org.onosproject.store.service.StorageService;
62 import org.slf4j.Logger;
64 import java.util.ArrayList;
65 import java.util.Collection;
66 import java.util.HashMap;
67 import java.util.Iterator;
68 import java.util.List;
69 import java.util.Objects;
70 import java.util.Optional;
72 import java.util.concurrent.ConcurrentHashMap;
73 import java.util.concurrent.ConcurrentMap;
74 import java.util.concurrent.ExecutorService;
75 import java.util.concurrent.Executors;
76 import java.util.concurrent.atomic.AtomicInteger;
77 import java.util.concurrent.atomic.AtomicLong;
78 import java.util.stream.Collectors;
80 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
81 import static org.onlab.util.Tools.groupedThreads;
82 import static org.slf4j.LoggerFactory.getLogger;
85 * Manages inventory of group entries using trivial in-memory implementation.
87 @Component(immediate = true)
89 public class DistributedGroupStore
90 extends AbstractStore<GroupEvent, GroupStoreDelegate>
91 implements GroupStore {
93 private final Logger log = getLogger(getClass());
95 private final int dummyId = 0xffffffff;
96 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ClusterCommunicationService clusterCommunicator;
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClusterService clusterService;
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected StorageService storageService;
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected MastershipService mastershipService;
110 // Per device group table with (device id + app cookie) as key
111 private EventuallyConsistentMap<GroupStoreKeyMapKey,
112 StoredGroupEntry> groupStoreEntriesByKey = null;
113 // Per device group table with (device id + group id) as key
114 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
115 groupEntriesById = new ConcurrentHashMap<>();
116 private EventuallyConsistentMap<GroupStoreKeyMapKey,
117 StoredGroupEntry> auditPendingReqQueue = null;
118 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
119 extraneousGroupEntriesById = new ConcurrentHashMap<>();
120 private ExecutorService messageHandlingExecutor;
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
123 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
125 private final AtomicInteger groupIdGen = new AtomicInteger();
127 private KryoNamespace.Builder kryoBuilder = null;
129 private final AtomicLong sequenceNumber = new AtomicLong(0);
131 private KryoNamespace clusterMsgSerializer;
134 public void activate() {
135 kryoBuilder = new KryoNamespace.Builder()
136 .register(KryoNamespaces.API)
137 .register(DefaultGroup.class,
138 DefaultGroupBucket.class,
139 DefaultGroupDescription.class,
140 DefaultGroupKey.class,
141 GroupDescription.Type.class,
142 Group.GroupState.class,
144 DefaultGroupId.class,
145 GroupStoreMessage.class,
146 GroupStoreMessage.Type.class,
148 GroupStoreMessageSubjects.class,
149 MultiValuedTimestamp.class,
150 GroupStoreKeyMapKey.class,
151 GroupStoreIdMapKey.class,
152 GroupStoreMapKey.class
155 clusterMsgSerializer = kryoBuilder.build();
157 messageHandlingExecutor = Executors.
158 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
159 groupedThreads("onos/store/group",
160 "message-handlers"));
162 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
163 clusterMsgSerializer::deserialize,
165 messageHandlingExecutor);
167 log.debug("Creating EC map groupstorekeymap");
168 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
169 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
171 groupStoreEntriesByKey = keyMapBuilder
172 .withName("groupstorekeymap")
173 .withSerializer(kryoBuilder)
174 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
175 sequenceNumber.getAndIncrement()))
177 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
178 log.debug("Current size of groupstorekeymap:{}",
179 groupStoreEntriesByKey.size());
181 log.debug("Creating EC map pendinggroupkeymap");
182 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
183 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
185 auditPendingReqQueue = auditMapBuilder
186 .withName("pendinggroupkeymap")
187 .withSerializer(kryoBuilder)
188 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
189 sequenceNumber.getAndIncrement()))
191 log.debug("Current size of pendinggroupkeymap:{}",
192 auditPendingReqQueue.size());
198 public void deactivate() {
199 clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
200 groupStoreEntriesByKey.destroy();
201 auditPendingReqQueue.destroy();
205 private static NewConcurrentHashMap<GroupId, Group>
206 lazyEmptyExtraneousGroupIdTable() {
207 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
210 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
211 lazyEmptyGroupIdTable() {
212 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
216 * Returns the group store eventual consistent key map.
218 * @return Map representing group key table.
220 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
221 getGroupStoreKeyMap() {
222 return groupStoreEntriesByKey;
226 * Returns the group id table for specified device.
228 * @param deviceId identifier of the device
229 * @return Map representing group key table of given device.
231 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
232 return createIfAbsentUnchecked(groupEntriesById,
233 deviceId, lazyEmptyGroupIdTable());
237 * Returns the pending group request table.
239 * @return Map representing group key table.
241 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
242 getPendingGroupKeyTable() {
243 return auditPendingReqQueue;
247 * Returns the extraneous group id table for specified device.
249 * @param deviceId identifier of the device
250 * @return Map representing group key table of given device.
252 private ConcurrentMap<GroupId, Group>
253 getExtraneousGroupIdTable(DeviceId deviceId) {
254 return createIfAbsentUnchecked(extraneousGroupEntriesById,
256 lazyEmptyExtraneousGroupIdTable());
260 * Returns the number of groups for the specified device in the store.
262 * @return number of groups for the specified device
265 public int getGroupCount(DeviceId deviceId) {
266 return (getGroups(deviceId) != null) ?
267 Iterables.size(getGroups(deviceId)) : 0;
271 * Returns the groups associated with a device.
273 * @param deviceId the device ID
275 * @return the group entries
278 public Iterable<Group> getGroups(DeviceId deviceId) {
279 // flatten and make iterator unmodifiable
280 return FluentIterable.from(getGroupStoreKeyMap().values())
281 .filter(input -> input.deviceId().equals(deviceId))
282 .transform(input -> input);
285 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
286 // flatten and make iterator unmodifiable
287 return FluentIterable.from(getGroupStoreKeyMap().values())
288 .filter(input -> input.deviceId().equals(deviceId));
292 * Returns the stored group entry.
294 * @param deviceId the device ID
295 * @param appCookie the group key
297 * @return a group associated with the key
300 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
301 return getStoredGroupEntry(deviceId, appCookie);
304 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
305 GroupKey appCookie) {
306 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
311 public Group getGroup(DeviceId deviceId, GroupId groupId) {
312 return getStoredGroupEntry(deviceId, groupId);
315 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
317 return getGroupIdTable(deviceId).get(groupId);
320 private int getFreeGroupIdValue(DeviceId deviceId) {
321 int freeId = groupIdGen.incrementAndGet();
324 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
325 if (existing == null) {
327 extraneousGroupEntriesById.get(deviceId) != null) ?
328 extraneousGroupEntriesById.get(deviceId).
329 get(new DefaultGroupId(freeId)) :
332 if (existing != null) {
333 freeId = groupIdGen.incrementAndGet();
338 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
343 * Stores a new group entry using the information from group description.
345 * @param groupDesc group description to be used to create group entry
348 public void storeGroupDescription(GroupDescription groupDesc) {
349 log.debug("In storeGroupDescription");
350 // Check if a group is existing with the same key
351 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
352 log.warn("Group already exists with the same key {}",
353 groupDesc.appCookie());
357 // Check if group to be created by a remote instance
358 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
359 log.debug("storeGroupDescription: Device {} local role is not MASTER",
360 groupDesc.deviceId());
361 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
362 log.error("No Master for device {}..."
363 + "Can not perform add group operation",
364 groupDesc.deviceId());
365 //TODO: Send Group operation failure event
368 GroupStoreMessage groupOp = GroupStoreMessage.
369 createGroupAddRequestMsg(groupDesc.deviceId(),
372 clusterCommunicator.unicast(groupOp,
373 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
374 clusterMsgSerializer::serialize,
375 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
377 log.warn("Failed to send request to master: {} to {}",
379 mastershipService.getMasterFor(groupDesc.deviceId()));
380 //TODO: Send Group operation failure event
382 log.debug("Sent Group operation request for device {} "
383 + "to remote MASTER {}",
384 groupDesc.deviceId(),
385 mastershipService.getMasterFor(groupDesc.deviceId()));
391 log.debug("Store group for device {} is getting handled locally",
392 groupDesc.deviceId());
393 storeGroupDescriptionInternal(groupDesc);
396 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
397 ConcurrentMap<GroupId, Group> extraneousMap =
398 extraneousGroupEntriesById.get(deviceId);
399 if (extraneousMap == null) {
402 return extraneousMap.get(new DefaultGroupId(groupId));
405 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
406 GroupBuckets buckets) {
407 ConcurrentMap<GroupId, Group> extraneousMap =
408 extraneousGroupEntriesById.get(deviceId);
409 if (extraneousMap == null) {
413 for (Group extraneousGroup:extraneousMap.values()) {
414 if (extraneousGroup.buckets().equals(buckets)) {
415 return extraneousGroup;
421 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
422 // Check if a group is existing with the same key
423 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
427 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
428 // Device group audit has not completed yet
429 // Add this group description to pending group key table
430 // Create a group entry object with Dummy Group ID
431 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
432 groupDesc.deviceId());
433 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
434 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
435 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
436 getPendingGroupKeyTable();
437 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
438 groupDesc.appCookie()),
443 Group matchingExtraneousGroup = null;
444 if (groupDesc.givenGroupId() != null) {
445 //Check if there is a extraneous group existing with the same Id
446 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
447 groupDesc.deviceId(), groupDesc.givenGroupId());
448 if (matchingExtraneousGroup != null) {
449 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
450 groupDesc.deviceId(),
451 groupDesc.givenGroupId());
452 //Check if the group buckets matches with user provided buckets
453 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
454 //Group is already existing with the same buckets and Id
455 // Create a group entry object
456 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
457 groupDesc.deviceId(),
458 groupDesc.givenGroupId());
459 StoredGroupEntry group = new DefaultGroup(
460 matchingExtraneousGroup.id(), groupDesc);
461 // Insert the newly created group entry into key and id maps
462 getGroupStoreKeyMap().
463 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
464 groupDesc.appCookie()), group);
465 // Ensure it also inserted into group id based table to
466 // avoid any chances of duplication in group id generation
467 getGroupIdTable(groupDesc.deviceId()).
468 put(matchingExtraneousGroup.id(), group);
469 addOrUpdateGroupEntry(matchingExtraneousGroup);
470 removeExtraneousGroupEntry(matchingExtraneousGroup);
473 //Group buckets are not matching. Update group
474 //with user provided buckets.
476 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
477 groupDesc.deviceId(),
478 groupDesc.givenGroupId());
482 //Check if there is an extraneous group with user provided buckets
483 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
484 groupDesc.deviceId(), groupDesc.buckets());
485 if (matchingExtraneousGroup != null) {
486 //Group is already existing with the same buckets.
487 //So reuse this group.
488 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
489 groupDesc.deviceId());
490 //Create a group entry object
491 StoredGroupEntry group = new DefaultGroup(
492 matchingExtraneousGroup.id(), groupDesc);
493 // Insert the newly created group entry into key and id maps
494 getGroupStoreKeyMap().
495 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
496 groupDesc.appCookie()), group);
497 // Ensure it also inserted into group id based table to
498 // avoid any chances of duplication in group id generation
499 getGroupIdTable(groupDesc.deviceId()).
500 put(matchingExtraneousGroup.id(), group);
501 addOrUpdateGroupEntry(matchingExtraneousGroup);
502 removeExtraneousGroupEntry(matchingExtraneousGroup);
505 //TODO: Check if there are any empty groups that can be used here
506 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
507 groupDesc.deviceId());
512 if (groupDesc.givenGroupId() == null) {
513 // Get a new group identifier
514 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
516 id = new DefaultGroupId(groupDesc.givenGroupId());
518 // Create a group entry object
519 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
520 // Insert the newly created group entry into key and id maps
521 getGroupStoreKeyMap().
522 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
523 groupDesc.appCookie()), group);
524 // Ensure it also inserted into group id based table to
525 // avoid any chances of duplication in group id generation
526 getGroupIdTable(groupDesc.deviceId()).
528 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
530 groupDesc.deviceId());
531 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
536 * Updates the existing group entry with the information
537 * from group description.
539 * @param deviceId the device ID
540 * @param oldAppCookie the current group key
541 * @param type update type
542 * @param newBuckets group buckets for updates
543 * @param newAppCookie optional new group key
546 public void updateGroupDescription(DeviceId deviceId,
547 GroupKey oldAppCookie,
549 GroupBuckets newBuckets,
550 GroupKey newAppCookie) {
551 // Check if group update to be done by a remote instance
552 if (mastershipService.getMasterFor(deviceId) != null &&
553 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
554 log.debug("updateGroupDescription: Device {} local role is not MASTER",
556 if (mastershipService.getMasterFor(deviceId) == null) {
557 log.error("No Master for device {}..."
558 + "Can not perform update group operation",
560 //TODO: Send Group operation failure event
563 GroupStoreMessage groupOp = GroupStoreMessage.
564 createGroupUpdateRequestMsg(deviceId,
570 clusterCommunicator.unicast(groupOp,
571 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
572 clusterMsgSerializer::serialize,
573 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
575 log.warn("Failed to send request to master: {} to {}",
577 mastershipService.getMasterFor(deviceId), error);
579 //TODO: Send Group operation failure event
583 log.debug("updateGroupDescription for device {} is getting handled locally",
585 updateGroupDescriptionInternal(deviceId,
592 private void updateGroupDescriptionInternal(DeviceId deviceId,
593 GroupKey oldAppCookie,
595 GroupBuckets newBuckets,
596 GroupKey newAppCookie) {
597 // Check if a group is existing with the provided key
598 Group oldGroup = getGroup(deviceId, oldAppCookie);
599 if (oldGroup == null) {
600 log.warn("updateGroupDescriptionInternal: Group not found...strange");
604 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
607 if (newBucketList != null) {
608 // Create a new group object from the old group
609 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
610 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
611 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
616 oldGroup.givenGroupId(),
618 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
620 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
624 newGroup.setState(GroupState.PENDING_UPDATE);
625 newGroup.setLife(oldGroup.life());
626 newGroup.setPackets(oldGroup.packets());
627 newGroup.setBytes(oldGroup.bytes());
628 //Update the group entry in groupkey based map.
629 //Update to groupid based map will happen in the
630 //groupkey based map update listener
631 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
633 getGroupStoreKeyMap().
634 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
635 newGroup.appCookie()), newGroup);
636 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
638 log.warn("updateGroupDescriptionInternal with type {}: No "
639 + "change in the buckets in update", type);
643 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
645 GroupBuckets buckets) {
646 GroupBuckets oldBuckets = oldGroup.buckets();
647 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
648 boolean groupDescUpdated = false;
650 if (type == UpdateType.ADD) {
651 // Check if the any of the new buckets are part of
652 // the old bucket list
653 for (GroupBucket addBucket:buckets.buckets()) {
654 if (!newBucketList.contains(addBucket)) {
655 newBucketList.add(addBucket);
656 groupDescUpdated = true;
659 } else if (type == UpdateType.REMOVE) {
660 // Check if the to be removed buckets are part of the
662 for (GroupBucket removeBucket:buckets.buckets()) {
663 if (newBucketList.contains(removeBucket)) {
664 newBucketList.remove(removeBucket);
665 groupDescUpdated = true;
670 if (groupDescUpdated) {
671 return newBucketList;
678 * Triggers deleting the existing group entry.
680 * @param deviceId the device ID
681 * @param appCookie the group key
684 public void deleteGroupDescription(DeviceId deviceId,
685 GroupKey appCookie) {
686 // Check if group to be deleted by a remote instance
687 if (mastershipService.
688 getLocalRole(deviceId) != MastershipRole.MASTER) {
689 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
691 if (mastershipService.getMasterFor(deviceId) == null) {
692 log.error("No Master for device {}..."
693 + "Can not perform delete group operation",
695 //TODO: Send Group operation failure event
698 GroupStoreMessage groupOp = GroupStoreMessage.
699 createGroupDeleteRequestMsg(deviceId,
702 clusterCommunicator.unicast(groupOp,
703 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
704 clusterMsgSerializer::serialize,
705 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
707 log.warn("Failed to send request to master: {} to {}",
709 mastershipService.getMasterFor(deviceId), error);
711 //TODO: Send Group operation failure event
715 log.debug("deleteGroupDescription in device {} is getting handled locally",
717 deleteGroupDescriptionInternal(deviceId, appCookie);
720 private void deleteGroupDescriptionInternal(DeviceId deviceId,
721 GroupKey appCookie) {
722 // Check if a group is existing with the provided key
723 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
724 if (existing == null) {
728 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
732 synchronized (existing) {
733 existing.setState(GroupState.PENDING_DELETE);
735 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
737 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
741 * Stores a new group entry, or updates an existing entry.
743 * @param group group entry
746 public void addOrUpdateGroupEntry(Group group) {
747 // check if this new entry is an update to an existing entry
748 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
750 GroupEvent event = null;
752 if (existing != null) {
753 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
756 synchronized (existing) {
757 for (GroupBucket bucket:group.buckets().buckets()) {
758 Optional<GroupBucket> matchingBucket =
759 existing.buckets().buckets()
761 .filter((existingBucket)->(existingBucket.equals(bucket)))
763 if (matchingBucket.isPresent()) {
764 ((StoredGroupBucketEntry) matchingBucket.
765 get()).setPackets(bucket.packets());
766 ((StoredGroupBucketEntry) matchingBucket.
767 get()).setBytes(bucket.bytes());
769 log.warn("addOrUpdateGroupEntry: No matching "
770 + "buckets to update stats");
773 existing.setLife(group.life());
774 existing.setPackets(group.packets());
775 existing.setBytes(group.bytes());
776 if ((existing.state() == GroupState.PENDING_ADD) ||
777 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
778 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
782 existing.setState(GroupState.ADDED);
783 existing.setIsGroupStateAddedFirstTime(true);
784 event = new GroupEvent(Type.GROUP_ADDED, existing);
786 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
789 GroupState.PENDING_UPDATE);
790 existing.setState(GroupState.ADDED);
791 existing.setIsGroupStateAddedFirstTime(false);
792 event = new GroupEvent(Type.GROUP_UPDATED, existing);
794 //Re-PUT map entries to trigger map update events
795 getGroupStoreKeyMap().
796 put(new GroupStoreKeyMapKey(existing.deviceId(),
797 existing.appCookie()), existing);
800 log.warn("addOrUpdateGroupEntry: Group update "
801 + "happening for a non-existing entry in the map");
805 notifyDelegate(event);
810 * Removes the group entry from store.
812 * @param group group entry
815 public void removeGroupEntry(Group group) {
816 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
819 if (existing != null) {
820 log.debug("removeGroupEntry: removing group entry {} in device {}",
823 //Removal from groupid based map will happen in the
824 //map update listener
825 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
826 existing.appCookie()));
827 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
829 log.warn("removeGroupEntry for {} in device{} is "
830 + "not existing in our maps",
837 public void deviceInitialAuditCompleted(DeviceId deviceId,
839 synchronized (deviceAuditStatus) {
841 log.debug("AUDIT completed for device {}",
843 deviceAuditStatus.put(deviceId, true);
844 // Execute all pending group requests
845 List<StoredGroupEntry> pendingGroupRequests =
846 getPendingGroupKeyTable().values()
848 .filter(g-> g.deviceId().equals(deviceId))
849 .collect(Collectors.toList());
850 log.debug("processing pending group add requests for device {} and number of pending requests {}",
852 pendingGroupRequests.size());
853 for (Group group:pendingGroupRequests) {
854 GroupDescription tmp = new DefaultGroupDescription(
859 group.givenGroupId(),
861 storeGroupDescriptionInternal(tmp);
862 getPendingGroupKeyTable().
863 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
866 Boolean audited = deviceAuditStatus.get(deviceId);
867 if (audited != null && audited) {
868 log.debug("Clearing AUDIT status for device {}", deviceId);
869 deviceAuditStatus.put(deviceId, false);
876 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
877 synchronized (deviceAuditStatus) {
878 Boolean audited = deviceAuditStatus.get(deviceId);
879 return audited != null && audited;
884 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
886 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
887 operation.groupId());
889 if (existing == null) {
890 log.warn("No group entry with ID {} found ", operation.groupId());
894 log.warn("groupOperationFailed: group operation {} failed"
895 + "for group {} in device {}",
898 existing.deviceId());
899 switch (operation.opType()) {
901 if (existing.state() == GroupState.PENDING_ADD) {
902 //TODO: Need to add support for passing the group
903 //operation failure reason from group provider.
904 //If the error type is anything other than GROUP_EXISTS,
905 //then the GROUP_ADD_FAILED event should be raised even
906 //in PENDING_ADD_RETRY state also.
907 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
908 log.warn("groupOperationFailed: cleaningup "
909 + "group {} from store in device {}....",
911 existing.deviceId());
912 //Removal from groupid based map will happen in the
913 //map update listener
914 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
915 existing.appCookie()));
919 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
922 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
925 log.warn("Unknown group operation type {}", operation.opType());
930 public void addOrUpdateExtraneousGroupEntry(Group group) {
931 log.debug("add/update extraneous group entry {} in device {}",
934 ConcurrentMap<GroupId, Group> extraneousIdTable =
935 getExtraneousGroupIdTable(group.deviceId());
936 extraneousIdTable.put(group.id(), group);
937 // Don't remove the extraneous groups, instead re-use it when
938 // a group request comes with the same set of buckets
942 public void removeExtraneousGroupEntry(Group group) {
943 log.debug("remove extraneous group entry {} of device {} from store",
946 ConcurrentMap<GroupId, Group> extraneousIdTable =
947 getExtraneousGroupIdTable(group.deviceId());
948 extraneousIdTable.remove(group.id());
952 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
953 // flatten and make iterator unmodifiable
954 return FluentIterable.from(
955 getExtraneousGroupIdTable(deviceId).values());
959 * Map handler to receive any events when the group key map is updated.
961 private class GroupStoreKeyMapListener implements
962 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
965 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
966 StoredGroupEntry> mapEvent) {
967 GroupEvent groupEvent = null;
968 GroupStoreKeyMapKey key = mapEvent.key();
969 StoredGroupEntry group = mapEvent.value();
970 if ((key == null) && (group == null)) {
971 log.error("GroupStoreKeyMapListener: Received "
972 + "event {} with null entry", mapEvent.type());
974 } else if (group == null) {
975 group = getGroupIdTable(key.deviceId()).values()
977 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
980 log.error("GroupStoreKeyMapListener: Received "
981 + "event {} with null entry... can not process", mapEvent.type());
985 log.trace("received groupid map event {} for id {} in device {}",
989 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
990 // Update the group ID table
991 getGroupIdTable(group.deviceId()).put(group.id(), group);
992 if (mapEvent.value().state() == Group.GroupState.ADDED) {
993 if (mapEvent.value().isGroupStateAddedFirstTime()) {
994 groupEvent = new GroupEvent(Type.GROUP_ADDED,
996 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1000 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1002 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1007 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
1008 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
1009 // Remove the entry from the group ID table
1010 getGroupIdTable(group.deviceId()).remove(group.id(), group);
1013 if (groupEvent != null) {
1014 notifyDelegate(groupEvent);
1019 private void process(GroupStoreMessage groupOp) {
1020 log.debug("Received remote group operation {} request for device {}",
1022 groupOp.deviceId());
1023 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1024 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1027 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1028 storeGroupDescriptionInternal(groupOp.groupDesc());
1029 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1030 updateGroupDescriptionInternal(groupOp.deviceId(),
1031 groupOp.appCookie(),
1032 groupOp.updateType(),
1033 groupOp.updateBuckets(),
1034 groupOp.newAppCookie());
1035 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1036 deleteGroupDescriptionInternal(groupOp.deviceId(),
1037 groupOp.appCookie());
1042 * Flattened map key to be used to store group entries.
1044 protected static class GroupStoreMapKey {
1045 private final DeviceId deviceId;
1047 public GroupStoreMapKey(DeviceId deviceId) {
1048 this.deviceId = deviceId;
1051 public DeviceId deviceId() {
1056 public boolean equals(Object o) {
1060 if (!(o instanceof GroupStoreMapKey)) {
1063 GroupStoreMapKey that = (GroupStoreMapKey) o;
1064 return this.deviceId.equals(that.deviceId);
1068 public int hashCode() {
1071 result = 31 * result + Objects.hash(this.deviceId);
1077 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
1078 private final GroupKey appCookie;
1079 public GroupStoreKeyMapKey(DeviceId deviceId,
1080 GroupKey appCookie) {
1082 this.appCookie = appCookie;
1086 public boolean equals(Object o) {
1090 if (!(o instanceof GroupStoreKeyMapKey)) {
1093 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1094 return (super.equals(that) &&
1095 this.appCookie.equals(that.appCookie));
1099 public int hashCode() {
1102 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1108 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
1109 private final GroupId groupId;
1110 public GroupStoreIdMapKey(DeviceId deviceId,
1113 this.groupId = groupId;
1117 public boolean equals(Object o) {
1121 if (!(o instanceof GroupStoreIdMapKey)) {
1124 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1125 return (super.equals(that) &&
1126 this.groupId.equals(that.groupId));
1130 public int hashCode() {
1133 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1140 public void pushGroupMetrics(DeviceId deviceId,
1141 Collection<Group> groupEntries) {
1142 boolean deviceInitialAuditStatus =
1143 deviceInitialAuditStatus(deviceId);
1144 Set<Group> southboundGroupEntries =
1145 Sets.newHashSet(groupEntries);
1146 Set<StoredGroupEntry> storedGroupEntries =
1147 Sets.newHashSet(getStoredGroups(deviceId));
1148 Set<Group> extraneousStoredEntries =
1149 Sets.newHashSet(getExtraneousGroups(deviceId));
1151 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1152 southboundGroupEntries.size(),
1154 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1155 Group group = it.next();
1156 log.trace("Group {} in device {}", group, deviceId);
1159 log.trace("Displaying all ({}) stored group entries for device {}",
1160 storedGroupEntries.size(),
1162 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1164 Group group = it1.next();
1165 log.trace("Stored Group {} for device {}", group, deviceId);
1168 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1169 Group group = it2.next();
1170 if (storedGroupEntries.remove(group)) {
1171 // we both have the group, let's update some info then.
1172 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1173 group.id(), deviceId);
1178 for (Group group : southboundGroupEntries) {
1179 if (getGroup(group.deviceId(), group.id()) != null) {
1180 // There is a group existing with the same id
1181 // It is possible that group update is
1182 // in progress while we got a stale info from switch
1183 if (!storedGroupEntries.remove(getGroup(
1184 group.deviceId(), group.id()))) {
1185 log.warn("Group AUDIT: Inconsistent state:"
1186 + "Group exists in ID based table while "
1187 + "not present in key based table");
1190 // there are groups in the switch that aren't in the store
1191 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1192 group.id(), deviceId);
1193 extraneousStoredEntries.remove(group);
1194 extraneousGroup(group);
1197 for (Group group : storedGroupEntries) {
1198 // there are groups in the store that aren't in the switch
1199 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1200 group.id(), deviceId);
1201 groupMissing(group);
1203 for (Group group : extraneousStoredEntries) {
1204 // there are groups in the extraneous store that
1205 // aren't in the switch
1206 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1207 group.id(), deviceId);
1208 removeExtraneousGroupEntry(group);
1211 if (!deviceInitialAuditStatus) {
1212 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1214 deviceInitialAuditCompleted(deviceId, true);
1218 private void groupMissing(Group group) {
1219 switch (group.state()) {
1220 case PENDING_DELETE:
1221 log.debug("Group {} delete confirmation from device {}",
1222 group, group.deviceId());
1223 removeGroupEntry(group);
1227 case PENDING_ADD_RETRY:
1228 case PENDING_UPDATE:
1229 log.debug("Group {} is in store but not on device {}",
1230 group, group.deviceId());
1231 StoredGroupEntry existing =
1232 getStoredGroupEntry(group.deviceId(), group.id());
1233 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
1235 existing.deviceId(),
1237 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
1238 //Re-PUT map entries to trigger map update events
1239 getGroupStoreKeyMap().
1240 put(new GroupStoreKeyMapKey(existing.deviceId(),
1241 existing.appCookie()), existing);
1242 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1246 log.debug("Group {} has not been installed.", group);
1251 private void extraneousGroup(Group group) {
1252 log.debug("Group {} is on device {} but not in store.",
1253 group, group.deviceId());
1254 addOrUpdateExtraneousGroupEntry(group);
1257 private void groupAdded(Group group) {
1258 log.trace("Group {} Added or Updated in device {}",
1259 group, group.deviceId());
1260 addOrUpdateGroupEntry(group);