2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.group.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Sets;
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Reference;
26 import org.apache.felix.scr.annotations.ReferenceCardinality;
27 import org.apache.felix.scr.annotations.Service;
28 import org.onlab.util.KryoNamespace;
29 import org.onlab.util.NewConcurrentHashMap;
30 import org.onosproject.cluster.ClusterService;
31 import org.onosproject.core.DefaultApplicationId;
32 import org.onosproject.core.DefaultGroupId;
33 import org.onosproject.core.GroupId;
34 import org.onosproject.mastership.MastershipService;
35 import org.onosproject.net.DeviceId;
36 import org.onosproject.net.MastershipRole;
37 import org.onosproject.net.PortNumber;
38 import org.onosproject.net.flow.DefaultTrafficTreatment;
39 import org.onosproject.net.flow.FlowRule;
40 import org.onosproject.net.flow.instructions.Instructions;
41 import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43 import org.onosproject.net.flow.instructions.L3ModificationInstruction;
44 import org.onosproject.net.group.DefaultGroup;
45 import org.onosproject.net.group.DefaultGroupBucket;
46 import org.onosproject.net.group.DefaultGroupDescription;
47 import org.onosproject.net.group.DefaultGroupKey;
48 import org.onosproject.net.group.Group;
49 import org.onosproject.net.group.Group.GroupState;
50 import org.onosproject.net.group.GroupBucket;
51 import org.onosproject.net.group.GroupBuckets;
52 import org.onosproject.net.group.GroupDescription;
53 import org.onosproject.net.group.GroupEvent;
54 import org.onosproject.net.group.GroupEvent.Type;
55 import org.onosproject.net.group.GroupKey;
56 import org.onosproject.net.group.GroupOperation;
57 import org.onosproject.net.group.GroupStore;
58 import org.onosproject.net.group.GroupStoreDelegate;
59 import org.onosproject.net.group.StoredGroupBucketEntry;
60 import org.onosproject.net.group.StoredGroupEntry;
61 import org.onosproject.store.AbstractStore;
62 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
63 import org.onosproject.store.service.MultiValuedTimestamp;
64 import org.onosproject.store.serializers.DeviceIdSerializer;
65 import org.onosproject.store.serializers.KryoNamespaces;
66 import org.onosproject.store.serializers.URISerializer;
67 import org.onosproject.store.service.EventuallyConsistentMap;
68 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
69 import org.onosproject.store.service.EventuallyConsistentMapEvent;
70 import org.onosproject.store.service.EventuallyConsistentMapListener;
71 import org.onosproject.store.service.StorageService;
72 import org.slf4j.Logger;
75 import java.util.ArrayList;
76 import java.util.Collection;
77 import java.util.HashMap;
78 import java.util.Iterator;
79 import java.util.List;
80 import java.util.Objects;
81 import java.util.Optional;
83 import java.util.concurrent.ConcurrentHashMap;
84 import java.util.concurrent.ConcurrentMap;
85 import java.util.concurrent.ExecutorService;
86 import java.util.concurrent.Executors;
87 import java.util.concurrent.atomic.AtomicInteger;
88 import java.util.concurrent.atomic.AtomicLong;
89 import java.util.stream.Collectors;
91 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
92 import static org.onlab.util.Tools.groupedThreads;
93 import static org.slf4j.LoggerFactory.getLogger;
96 * Manages inventory of group entries using eventually consistent maps shared across the cluster.
98 @Component(immediate = true)
100 public class DistributedGroupStore
101 extends AbstractStore<GroupEvent, GroupStoreDelegate>
102 implements GroupStore {
// Logger named after the runtime class of this store.
104 private final Logger log = getLogger(getClass());
// Placeholder id value; dummyGroupId wraps it and is given to group entries
// that are queued while the device audit is still pending.
106 private final int dummyId = 0xffffffff;
107 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
// Cluster messaging used to forward group operations to the device master
// and to receive such requests from other instances.
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
// Supplies the builders for the eventually consistent maps created in activate().
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected StorageService storageService;
// Consulted for the local role and the current master of a device.
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected MastershipService mastershipService;
121 // Per device group table with (device id + app cookie) as key
122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> groupStoreEntriesByKey = null;
124 // Per device group table with (device id + group id) as key
125 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
126 groupEntriesById = new ConcurrentHashMap<>();
// Group ADD requests queued until the initial audit of their device completes.
127 private EventuallyConsistentMap<GroupStoreKeyMapKey,
128 StoredGroupEntry> auditPendingReqQueue = null;
// Per device table of extraneous groups, keyed by group id.
129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
130 extraneousGroupEntriesById = new ConcurrentHashMap<>();
// Executor handed to the cluster communicator for incoming group operation requests.
131 private ExecutorService messageHandlingExecutor;
132 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
// Audit-completed flag per device. This is a plain HashMap: the audit
// methods access it inside synchronized (deviceAuditStatus).
// NOTE(review): storeGroupDescriptionInternal reads it with no lock visible
// in this view - confirm whether that read needs the same guard.
134 private final HashMap<DeviceId, Boolean> deviceAuditStatus = new HashMap<>();
// Source of candidate group id values for getFreeGroupIdValue().
136 private final AtomicInteger groupIdGen = new AtomicInteger();
// Kryo namespace builder populated in activate(); used as the serializer of
// both maps and for cluster messages.
// NOTE(review): senders call kryoBuilder.build() for every message - confirm
// whether a single built namespace could be reused.
138 private KryoNamespace.Builder kryoBuilder = null;
// Second component of the MultiValuedTimestamp attached to map updates.
140 private final AtomicLong sequenceNumber = new AtomicLong(0);
// Sets the store up: builds the Kryo namespace, creates the message handling
// executor, subscribes to remote group operation requests and creates the two
// eventually consistent maps. Some lines of this method (closing parentheses,
// one subscriber argument, the map build calls) are not visible in this view.
143 public void activate() {
// Register every type that travels inside map entries or cluster messages.
144 kryoBuilder = new KryoNamespace.Builder()
145 .register(DefaultGroup.class,
146 DefaultGroupBucket.class,
147 DefaultGroupDescription.class,
148 DefaultGroupKey.class,
149 GroupDescription.Type.class,
150 Group.GroupState.class,
152 DefaultGroupId.class,
153 GroupStoreMessage.class,
154 GroupStoreMessage.Type.class,
156 GroupStoreMessageSubjects.class,
157 MultiValuedTimestamp.class,
158 GroupStoreKeyMapKey.class,
159 GroupStoreIdMapKey.class,
160 GroupStoreMapKey.class
162 .register(new URISerializer(), URI.class)
163 .register(new DeviceIdSerializer(), DeviceId.class)
164 .register(PortNumber.class)
165 .register(DefaultApplicationId.class)
166 .register(DefaultTrafficTreatment.class,
167 Instructions.DropInstruction.class,
168 Instructions.OutputInstruction.class,
169 Instructions.GroupInstruction.class,
170 Instructions.TableTypeTransition.class,
172 L0ModificationInstruction.class,
173 L0ModificationInstruction.L0SubType.class,
174 L0ModificationInstruction.ModLambdaInstruction.class,
175 L2ModificationInstruction.class,
176 L2ModificationInstruction.L2SubType.class,
177 L2ModificationInstruction.ModEtherInstruction.class,
178 L2ModificationInstruction.PushHeaderInstructions.class,
179 L2ModificationInstruction.ModVlanIdInstruction.class,
180 L2ModificationInstruction.ModVlanPcpInstruction.class,
181 L2ModificationInstruction.ModMplsLabelInstruction.class,
182 L2ModificationInstruction.ModMplsTtlInstruction.class,
183 L3ModificationInstruction.class,
184 L3ModificationInstruction.L3SubType.class,
185 L3ModificationInstruction.ModIPInstruction.class,
186 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
187 L3ModificationInstruction.ModTtlInstruction.class,
188 org.onlab.packet.MplsLabel.class
190 .register(org.onosproject.cluster.NodeId.class)
191 .register(KryoNamespaces.BASIC)
192 .register(KryoNamespaces.MISC);
// Fixed pool of MESSAGE_HANDLER_THREAD_POOL_SIZE thread(s) for incoming requests.
194 messageHandlingExecutor = Executors.
195 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
196 groupedThreads("onos/store/group",
197 "message-handlers"));
// Requests forwarded by non-master instances arrive on this subject and are
// decoded with the namespace built above; the handler argument is not
// visible in this view.
199 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
200 kryoBuilder.build()::deserialize,
202 messageHandlingExecutor);
// Map of group entries keyed by (device id + app cookie). Each update is
// stamped with wall-clock millis plus a local sequence number.
204 log.debug("Creating EC map groupstorekeymap");
205 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
206 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
208 groupStoreEntriesByKey = keyMapBuilder
209 .withName("groupstorekeymap")
210 .withSerializer(kryoBuilder)
211 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
212 sequenceNumber.getAndIncrement()))
// The listener reacts to changes of the key map.
214 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
215 log.debug("Current size of groupstorekeymap:{}",
216 groupStoreEntriesByKey.size());
// Map holding group requests that wait for the device audit to complete;
// configured the same way as the key map but without a listener.
218 log.debug("Creating EC map pendinggroupkeymap");
219 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
220 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
222 auditPendingReqQueue = auditMapBuilder
223 .withName("pendinggroupkeymap")
224 .withSerializer(kryoBuilder)
225 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
226 sequenceNumber.getAndIncrement()))
228 log.debug("Current size of pendinggroupkeymap:{}",
229 auditPendingReqQueue.size());
// Tears the store down by destroying both eventually consistent maps.
// NOTE(review): no shutdown of messageHandlingExecutor and no removal of the
// REMOTE_GROUP_OP_REQUEST subscriber is visible here - confirm they are
// released, otherwise the thread pool outlives the component.
235 public void deactivate() {
236 groupStoreEntriesByKey.destroy();
237 auditPendingReqQueue.destroy();
// Returns the lazy initializer passed to createIfAbsentUnchecked when a
// device has no extraneous group id table yet.
241 private static NewConcurrentHashMap<GroupId, Group>
242 lazyEmptyExtraneousGroupIdTable() {
243 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
// Returns the lazy initializer passed to createIfAbsentUnchecked when a
// device has no group id table yet.
246 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
247 lazyEmptyGroupIdTable() {
248 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
252 * Returns the eventually consistent group store key map.
254 * @return Map representing group key table.
// Accessor for the map keyed by (device id + app cookie). The field is
// assigned in activate() and is null before that.
256 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
257 getGroupStoreKeyMap() {
258 return groupStoreEntriesByKey;
262 * Returns the group id table for specified device.
264 * @param deviceId identifier of the device
265 * @return Map representing group id table of given device.
// Looks up the group id table of the device in groupEntriesById; when the
// device has no table yet, one is created through the lazy initializer and
// registered before being returned.
267 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
268 return createIfAbsentUnchecked(groupEntriesById,
269 deviceId, lazyEmptyGroupIdTable());
273 * Returns the pending group request table.
275 * @return Map representing the pending group request table.
// Accessor for the map of group requests waiting for the device audit. The
// field is assigned in activate() and is null before that.
277 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
278 getPendingGroupKeyTable() {
279 return auditPendingReqQueue;
283 * Returns the extraneous group id table for specified device.
285 * @param deviceId identifier of the device
286 * @return Map representing extraneous group id table of given device.
// Looks up the extraneous group table in extraneousGroupEntriesById, creating
// one through the lazy initializer when absent. The key argument line is not
// visible in this view; presumably it is deviceId - confirm.
288 private ConcurrentMap<GroupId, Group>
289 getExtraneousGroupIdTable(DeviceId deviceId) {
290 return createIfAbsentUnchecked(extraneousGroupEntriesById,
292 lazyEmptyExtraneousGroupIdTable());
296 * Returns the number of groups for the specified device in the store.
298 * @return number of groups for the specified device
// Counts the groups of the device by walking the iterable from getGroups.
// NOTE(review): getGroups(deviceId) is evaluated twice, so its debug log is
// emitted twice and a second filtered view is built; the visible getGroups
// always returns a FluentIterable, so the null branch looks unreachable
// unless a subclass overrides it - confirm before simplifying.
301 public int getGroupCount(DeviceId deviceId) {
302 return (getGroups(deviceId) != null) ?
303 Iterables.size(getGroups(deviceId)) : 0;
307 * Returns the groups associated with a device.
309 * @param deviceId the device ID
311 * @return the group entries
// Returns a filtered view over the key map values that belong to the device.
// The logged number is the size of the whole key map, not the per-device count.
314 public Iterable<Group> getGroups(DeviceId deviceId) {
315 // flatten and make iterator unmodifiable
316 log.debug("getGroups: for device {} total number of groups {}",
317 deviceId, getGroupStoreKeyMap().values().size());
// The identity transform only widens the element type from StoredGroupEntry to Group.
318 return FluentIterable.from(getGroupStoreKeyMap().values())
319 .filter(input -> input.deviceId().equals(deviceId))
320 .transform(input -> input);
// Same filtering as getGroups, but keeps the StoredGroupEntry element type.
// The log message is labelled "getGroups" here as well.
323 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
324 // flatten and make iterator unmodifiable
325 log.debug("getGroups: for device {} total number of groups {}",
326 deviceId, getGroupStoreKeyMap().values().size());
327 return FluentIterable.from(getGroupStoreKeyMap().values())
328 .filter(input -> input.deviceId().equals(deviceId));
332 * Returns the stored group entry.
334 * @param deviceId the device ID
335 * @param appCookie the group key
337 * @return a group associated with the key
// Looks the group up by (device id + app cookie); delegates to the
// key-based getStoredGroupEntry.
340 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
341 return getStoredGroupEntry(deviceId, appCookie);
// Reads the entry from the eventually consistent key map using a
// GroupStoreKeyMapKey built from the device id; the remaining constructor
// argument is not visible in this view (presumably appCookie - confirm).
344 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
345 GroupKey appCookie) {
346 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
// Looks the group up by (device id + group id); delegates to the
// id-based getStoredGroupEntry.
351 public Group getGroup(DeviceId deviceId, GroupId groupId) {
352 return getStoredGroupEntry(deviceId, groupId);
// Reads the entry from the local per-device group id table. Calling
// getGroupIdTable creates an empty table for the device as a side effect
// when none exists yet.
355 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
357 return getGroupIdTable(deviceId).get(groupId);
// Picks a group id value that is not in use on the device. Candidates come
// from the shared groupIdGen counter, so values are not reused across devices.
// The loop, break and return lines of this method are not visible in this view.
360 private int getFreeGroupIdValue(DeviceId deviceId) {
361 int freeId = groupIdGen.incrementAndGet();
// A candidate is taken when it is present in the group id table ...
364 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
365 if (existing == null) {
// ... or, failing that, in the extraneous group table of the device.
367 extraneousGroupEntriesById.get(deviceId) != null) ?
368 extraneousGroupEntriesById.get(deviceId).
369 get(new DefaultGroupId(freeId)) :
// Taken: draw the next candidate from the counter.
372 if (existing != null) {
373 freeId = groupIdGen.incrementAndGet();
378 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
383 * Stores a new group entry using the information from group description.
385 * @param groupDesc group description to be used to create group entry
// Entry point for group creation. A request whose key already exists is
// logged and - presumably, the return line is not visible - dropped. When this
// instance is not MASTER for the device the request is sent to the master
// over the cluster; otherwise it is stored locally.
388 public void storeGroupDescription(GroupDescription groupDesc) {
389 log.debug("In storeGroupDescription");
390 // Check if a group is existing with the same key
391 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
392 log.warn("Group already exists with the same key {}",
393 groupDesc.appCookie());
397 // Check if group to be created by a remote instance
398 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
399 log.debug("storeGroupDescription: Device {} local role is not MASTER",
400 groupDesc.deviceId());
401 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
402 log.error("No Master for device {}..."
403 + "Can not perform add group operation",
404 groupDesc.deviceId());
405 //TODO: Send Group operation failure event
408 GroupStoreMessage groupOp = GroupStoreMessage.
409 createGroupAddRequestMsg(groupDesc.deviceId(),
// NOTE(review): the master is looked up again for the unicast target and for
// each log call, so it may differ from (or be null after) the check above;
// kryoBuilder.build() is also invoked for every message - confirm both.
412 clusterCommunicator.unicast(groupOp,
413 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
414 m -> kryoBuilder.build().serialize(m),
415 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
417 log.warn("Failed to send request to master: {} to {}",
419 mastershipService.getMasterFor(groupDesc.deviceId()));
420 //TODO: Send Group operation failure event
422 log.debug("Sent Group operation request for device {} "
423 + "to remote MASTER {}",
424 groupDesc.deviceId(),
425 mastershipService.getMasterFor(groupDesc.deviceId()));
// Local instance is MASTER: store directly.
431 log.debug("Store group for device {} is getting handled locally",
432 groupDesc.deviceId());
433 storeGroupDescriptionInternal(groupDesc);
// Finds an extraneous group of the device by its group id. Reads
// extraneousGroupEntriesById directly, so no table is created for unknown
// devices. The result for a missing table is not visible (presumably null).
436 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
437 ConcurrentMap<GroupId, Group> extraneousMap =
438 extraneousGroupEntriesById.get(deviceId);
439 if (extraneousMap == null) {
442 return extraneousMap.get(new DefaultGroupId(groupId));
// Finds the first extraneous group of the device whose buckets equal the
// given buckets, scanning all extraneous groups of that device. The results
// for "no table" and "no match" are not visible (presumably null).
445 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
446 GroupBuckets buckets) {
447 ConcurrentMap<GroupId, Group> extraneousMap =
448 extraneousGroupEntriesById.get(deviceId);
449 if (extraneousMap == null) {
453 for (Group extraneousGroup:extraneousMap.values()) {
454 if (extraneousGroup.buckets().equals(buckets)) {
455 return extraneousGroup;
// Stores a group on the instance that handles the device. In order:
//  1. a request whose key already exists is ignored (return not visible);
//  2. while no audit status is recorded for the device, the request is queued
//     in the pending table with the dummy group id;
//  3. an extraneous group matching by id or by buckets is adopted;
//  4. otherwise a new entry is created with the given or a freshly generated
//     id and GROUP_ADD_REQUESTED is raised.
// Several return/else/brace lines are not visible, so the exact exits of
// each step must be confirmed against the full file.
461 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
462 // Check if a group is existing with the same key
463 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
// NOTE(review): deviceAuditStatus is a plain HashMap that other methods access
// under synchronized (deviceAuditStatus); no lock is visible around this read.
// Only null counts as "audit pending" here: once deviceInitialAuditCompleted
// has reset the flag to false the entry is non-null and requests are no
// longer queued - confirm this is intended.
467 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
468 // Device group audit has not completed yet
469 // Add this group description to pending group key table
470 // Create a group entry object with Dummy Group ID
471 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
472 groupDesc.deviceId());
473 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
474 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
475 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
476 getPendingGroupKeyTable();
477 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
478 groupDesc.appCookie()),
483 Group matchingExtraneousGroup = null;
484 if (groupDesc.givenGroupId() != null) {
485 //Check if there is a extraneous group existing with the same Id
486 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
487 groupDesc.deviceId(), groupDesc.givenGroupId());
488 if (matchingExtraneousGroup != null) {
489 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
490 groupDesc.deviceId(),
491 groupDesc.givenGroupId());
492 //Check if the group buckets matches with user provided buckets
493 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
494 //Group is already existing with the same buckets and Id
495 // Create a group entry object
496 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
497 groupDesc.deviceId(),
498 groupDesc.givenGroupId());
499 StoredGroupEntry group = new DefaultGroup(
500 matchingExtraneousGroup.id(), groupDesc);
501 // Insert the newly created group entry into key and id maps
502 getGroupStoreKeyMap().
503 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
504 groupDesc.appCookie()), group);
505 // Ensure it also inserted into group id based table to
506 // avoid any chances of duplication in group id generation
507 getGroupIdTable(groupDesc.deviceId()).
508 put(matchingExtraneousGroup.id(), group);
// The adopted group is fed through the regular add/update path and then
// dropped from the extraneous table.
509 addOrUpdateGroupEntry(matchingExtraneousGroup);
510 removeExtraneousGroupEntry(matchingExtraneousGroup);
513 //Group buckets are not matching. Update group
514 //with user provided buckets.
516 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
517 groupDesc.deviceId(),
518 groupDesc.givenGroupId());
522 //Check if there is an extraneous group with user provided buckets
523 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
524 groupDesc.deviceId(), groupDesc.buckets());
525 if (matchingExtraneousGroup != null) {
526 //Group is already existing with the same buckets.
527 //So reuse this group.
528 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
529 groupDesc.deviceId());
530 //Create a group entry object
531 StoredGroupEntry group = new DefaultGroup(
532 matchingExtraneousGroup.id(), groupDesc);
533 // Insert the newly created group entry into key and id maps
534 getGroupStoreKeyMap().
535 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
536 groupDesc.appCookie()), group);
537 // Ensure it also inserted into group id based table to
538 // avoid any chances of duplication in group id generation
539 getGroupIdTable(groupDesc.deviceId()).
540 put(matchingExtraneousGroup.id(), group);
541 addOrUpdateGroupEntry(matchingExtraneousGroup);
542 removeExtraneousGroupEntry(matchingExtraneousGroup);
545 //TODO: Check if there are any empty groups that can be used here
546 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
547 groupDesc.deviceId());
// No extraneous group could be adopted: choose the id for a new entry.
552 if (groupDesc.givenGroupId() == null) {
553 // Get a new group identifier
554 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
556 id = new DefaultGroupId(groupDesc.givenGroupId());
558 // Create a group entry object
559 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
560 // Insert the newly created group entry into key and id maps
561 getGroupStoreKeyMap().
562 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
563 groupDesc.appCookie()), group);
564 // Ensure it also inserted into group id based table to
565 // avoid any chances of duplication in group id generation
566 getGroupIdTable(groupDesc.deviceId()).
568 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
570 groupDesc.deviceId());
571 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
576 * Updates the existing group entry with the information
577 * from group description.
579 * @param deviceId the device ID
580 * @param oldAppCookie the current group key
581 * @param type update type
582 * @param newBuckets group buckets for updates
583 * @param newAppCookie optional new group key
// Entry point for bucket updates. When the device has a master and this
// instance is not MASTER, the update is sent to the master over the cluster;
// otherwise it is applied locally. One parameter line (the update type) and
// several argument lines are not visible in this view.
586 public void updateGroupDescription(DeviceId deviceId,
587 GroupKey oldAppCookie,
589 GroupBuckets newBuckets,
590 GroupKey newAppCookie) {
591 // Check if group update to be done by a remote instance
592 if (mastershipService.getMasterFor(deviceId) != null &&
593 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
594 log.debug("updateGroupDescription: Device {} local role is not MASTER",
// NOTE(review): the enclosing condition already required a non-null master,
// so this check only fires if mastership changes between the two lookups.
// Unlike store/delete, a device without a master is handled locally here -
// confirm that is intended.
596 if (mastershipService.getMasterFor(deviceId) == null) {
597 log.error("No Master for device {}..."
598 + "Can not perform update group operation",
600 //TODO: Send Group operation failure event
603 GroupStoreMessage groupOp = GroupStoreMessage.
604 createGroupUpdateRequestMsg(deviceId,
610 clusterCommunicator.unicast(groupOp,
611 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
612 m -> kryoBuilder.build().serialize(m),
613 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
615 log.warn("Failed to send request to master: {} to {}",
617 mastershipService.getMasterFor(deviceId), error);
619 //TODO: Send Group operation failure event
// Local handling.
623 log.debug("updateGroupDescription for device {} is getting handled locally",
625 updateGroupDescriptionInternal(deviceId,
// Applies a bucket update locally. A missing group is logged. When
// getUpdatedBucketList reports a change, a new DefaultGroup is built with the
// old group id and statistics, put into the key map in PENDING_UPDATE state
// and GROUP_UPDATE_REQUESTED is raised. With no change only a warning is
// logged. Several argument lines are not visible in this view.
632 private void updateGroupDescriptionInternal(DeviceId deviceId,
633 GroupKey oldAppCookie,
635 GroupBuckets newBuckets,
636 GroupKey newAppCookie) {
637 // Check if a group is existing with the provided key
638 Group oldGroup = getGroup(deviceId, oldAppCookie);
639 if (oldGroup == null) {
640 log.warn("updateGroupDescriptionInternal: Group not found...strange");
644 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
// A non-null list means the buckets changed.
647 if (newBucketList != null) {
648 // Create a new group object from the old group
649 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
// The entry keeps the old key unless a new one was supplied.
// NOTE(review): the entry is put under the new cookie; no removal of the
// entry stored under oldAppCookie is visible in this view - confirm.
650 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
651 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
656 oldGroup.givenGroupId(),
658 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
660 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
// Carry the statistics of the old entry over to its replacement.
664 newGroup.setState(GroupState.PENDING_UPDATE);
665 newGroup.setLife(oldGroup.life());
666 newGroup.setPackets(oldGroup.packets());
667 newGroup.setBytes(oldGroup.bytes());
668 //Update the group entry in groupkey based map.
669 //Update to groupid based map will happen in the
670 //groupkey based map update listener
671 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
673 getGroupStoreKeyMap().
674 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
675 newGroup.appCookie()), newGroup);
676 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
678 log.warn("updateGroupDescriptionInternal with type {}: No "
679 + "change in the buckets in update", type);
// Computes the bucket list after an ADD or REMOVE update, working on a copy
// of the old group's buckets. ADD appends the given buckets that are not
// already present; REMOVE deletes those that are present. The changed list
// is returned when at least one bucket was added or removed. The "no change"
// return is not visible here; the caller tests the result for null.
683 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
685 GroupBuckets buckets) {
686 GroupBuckets oldBuckets = oldGroup.buckets();
687 List<GroupBucket> newBucketList = new ArrayList<>(oldBuckets.buckets());
688 boolean groupDescUpdated = false;
690 if (type == UpdateType.ADD) {
691 // Check if the any of the new buckets are part of
692 // the old bucket list
693 for (GroupBucket addBucket:buckets.buckets()) {
694 if (!newBucketList.contains(addBucket)) {
695 newBucketList.add(addBucket);
696 groupDescUpdated = true;
699 } else if (type == UpdateType.REMOVE) {
700 // Check if the to be removed buckets are part of the
702 for (GroupBucket removeBucket:buckets.buckets()) {
703 if (newBucketList.contains(removeBucket)) {
704 newBucketList.remove(removeBucket);
705 groupDescUpdated = true;
710 if (groupDescUpdated) {
711 return newBucketList;
718 * Triggers deleting the existing group entry.
720 * @param deviceId the device ID
721 * @param appCookie the group key
// Entry point for group deletion. When this instance is not MASTER for the
// device the request is sent to the master over the cluster (an error is
// logged when no master exists); otherwise the deletion is started locally.
// Several argument and branch lines are not visible in this view.
724 public void deleteGroupDescription(DeviceId deviceId,
725 GroupKey appCookie) {
726 // Check if group to be deleted by a remote instance
727 if (mastershipService.
728 getLocalRole(deviceId) != MastershipRole.MASTER) {
729 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
731 if (mastershipService.getMasterFor(deviceId) == null) {
732 log.error("No Master for device {}..."
733 + "Can not perform delete group operation",
735 //TODO: Send Group operation failure event
738 GroupStoreMessage groupOp = GroupStoreMessage.
739 createGroupDeleteRequestMsg(deviceId,
// NOTE(review): the master is looked up again for the unicast target, so it
// may differ from (or be null after) the check above - confirm.
742 clusterCommunicator.unicast(groupOp,
743 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
744 m -> kryoBuilder.build().serialize(m),
745 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
747 log.warn("Failed to send request to master: {} to {}",
749 mastershipService.getMasterFor(deviceId), error);
751 //TODO: Send Group operation failure event
// Local handling.
755 log.debug("deleteGroupDescription in device {} is getting handled locally",
757 deleteGroupDescriptionInternal(deviceId, appCookie);
// Starts the deletion of a group locally: the stored entry is switched to
// PENDING_DELETE and GROUP_REMOVE_REQUESTED is raised. The entry itself is
// not removed from the key map here (see removeGroupEntry).
// NOTE(review): the state is changed on the object returned by the map; no
// re-put into the key map is visible in this view - confirm the state change
// reaches other instances.
760 private void deleteGroupDescriptionInternal(DeviceId deviceId,
761 GroupKey appCookie) {
762 // Check if a group is existing with the provided key
763 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
764 if (existing == null) {
768 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
772 synchronized (existing) {
773 existing.setState(GroupState.PENDING_DELETE);
775 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
777 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
781 * Stores a new group entry, or updates an existing entry.
783 * @param group group entry
// Merges a group reported for a device into the stored entry: copies bucket
// and group statistics, moves the entry to ADDED, re-puts it into the key map
// and notifies the delegate with GROUP_ADDED (entry was PENDING_ADD or
// PENDING_ADD_RETRY) or GROUP_UPDATED (the other visible branch). A group
// with no stored entry only produces a warning. The lookup argument and
// several branch lines are not visible in this view.
786 public void addOrUpdateGroupEntry(Group group) {
787 // check if this new entry is an update to an existing entry
788 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
790 GroupEvent event = null;
792 if (existing != null) {
793 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
796 synchronized (existing) {
// Copy per-bucket counters onto the stored bucket that equals the reported one.
797 for (GroupBucket bucket:group.buckets().buckets()) {
798 Optional<GroupBucket> matchingBucket =
799 existing.buckets().buckets()
801 .filter((existingBucket)->(existingBucket.equals(bucket)))
803 if (matchingBucket.isPresent()) {
804 ((StoredGroupBucketEntry) matchingBucket.
805 get()).setPackets(bucket.packets());
806 ((StoredGroupBucketEntry) matchingBucket.
807 get()).setBytes(bucket.bytes());
809 log.warn("addOrUpdateGroupEntry: No matching "
810 + "buckets to update stats");
// Copy the group level counters.
813 existing.setLife(group.life());
814 existing.setPackets(group.packets());
815 existing.setBytes(group.bytes());
816 if ((existing.state() == GroupState.PENDING_ADD) ||
817 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
818 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
822 existing.setState(GroupState.ADDED);
823 existing.setIsGroupStateAddedFirstTime(true);
824 event = new GroupEvent(Type.GROUP_ADDED, existing);
826 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
829 GroupState.PENDING_UPDATE);
830 existing.setState(GroupState.ADDED);
831 existing.setIsGroupStateAddedFirstTime(false);
832 event = new GroupEvent(Type.GROUP_UPDATED, existing);
834 //Re-PUT map entries to trigger map update events
835 getGroupStoreKeyMap().
836 put(new GroupStoreKeyMapKey(existing.deviceId(),
837 existing.appCookie()), existing);
840 log.warn("addOrUpdateGroupEntry: Group update "
841 + "happening for a non-existing entry in the map");
// event stays null when no stored entry was found; the guard around this
// call is not visible in this view.
845 notifyDelegate(event);
850 * Removes the group entry from store.
852 * @param group group entry
// Removes the stored entry of the group from the key map and raises
// GROUP_REMOVED; an unknown group only produces a warning. The lookup
// argument line is not visible in this view.
855 public void removeGroupEntry(Group group) {
856 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
859 if (existing != null) {
860 log.debug("removeGroupEntry: removing group entry {} in device {}",
863 //Removal from groupid based map will happen in the
864 //map update listener
865 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
866 existing.appCookie()));
867 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
869 log.warn("removeGroupEntry for {} in device{} is "
870 + "not existing in our maps",
// Records the outcome of the initial group audit of a device, holding the
// deviceAuditStatus monitor throughout. Two paths are visible: one marks the
// device as audited and replays its queued group requests, the other resets
// an audited device to false. The second parameter and the condition that
// separates the two paths are not visible in this view.
877 public void deviceInitialAuditCompleted(DeviceId deviceId,
879 synchronized (deviceAuditStatus) {
881 log.debug("AUDIT completed for device {}",
883 deviceAuditStatus.put(deviceId, true);
884 // Execute all pending group requests
// Snapshot the device's pending requests first, so entries can be removed
// from the pending table while they are processed.
885 List<StoredGroupEntry> pendingGroupRequests =
886 getPendingGroupKeyTable().values()
888 .filter(g-> g.deviceId().equals(deviceId))
889 .collect(Collectors.toList());
890 log.debug("processing pending group add requests for device {} and number of pending requests {}",
892 pendingGroupRequests.size());
// Each queued entry is turned back into a description and stored through the
// normal local path, then removed from the pending table.
893 for (Group group:pendingGroupRequests) {
894 GroupDescription tmp = new DefaultGroupDescription(
899 group.givenGroupId(),
901 storeGroupDescriptionInternal(tmp);
902 getPendingGroupKeyTable().
903 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
// Reset path: the flag is set to false rather than removed, so the map still
// holds a non-null entry for the device afterwards.
906 Boolean audited = deviceAuditStatus.get(deviceId);
907 if (audited != null && audited) {
908 log.debug("Clearing AUDIT status for device {}", deviceId);
909 deviceAuditStatus.put(deviceId, false);
// Reports whether the device is currently marked as audited. A device with
// no recorded status and a device whose status is false both yield false.
916 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
917 synchronized (deviceAuditStatus) {
918 Boolean audited = deviceAuditStatus.get(deviceId);
919 return audited != null && audited;
// Handles a failed group operation reported for a device. The entry is looked
// up by group id in the local id table; an unknown id is logged. The delegate
// is then notified with GROUP_ADD_FAILED, GROUP_UPDATE_FAILED or
// GROUP_REMOVE_FAILED depending on the operation type. The case labels of
// the switch are not visible in this view.
924 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
926 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
927 operation.groupId());
929 if (existing == null) {
930 log.warn("No group entry with ID {} found ", operation.groupId());
934 log.warn("groupOperationFailed: group operation {} failed"
935 + "for group {} in device {}",
938 existing.deviceId());
939 switch (operation.opType()) {
// Visible add-failure handling: for an entry still in PENDING_ADD the failure
// event is raised and the entry is removed from the key map.
941 if (existing.state() == GroupState.PENDING_ADD) {
942 //TODO: Need to add support for passing the group
943 //operation failure reason from group provider.
944 //If the error type is anything other than GROUP_EXISTS,
945 //then the GROUP_ADD_FAILED event should be raised even
946 //in PENDING_ADD_RETRY state also.
947 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
948 log.warn("groupOperationFailed: cleaningup "
949 + "group {} from store in device {}....",
951 existing.deviceId());
952 //Removal from groupid based map will happen in the
953 //map update listener
954 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
955 existing.appCookie()));
959 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
962 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
965 log.warn("Unknown group operation type {}", operation.opType());
// Records a group as extraneous for its device, replacing any extraneous
// entry with the same group id. The table is local to this instance (a
// ConcurrentHashMap field, not an eventually consistent map).
970 public void addOrUpdateExtraneousGroupEntry(Group group) {
971 log.debug("add/update extraneous group entry {} in device {}",
974 ConcurrentMap<GroupId, Group> extraneousIdTable =
975 getExtraneousGroupIdTable(group.deviceId());
976 extraneousIdTable.put(group.id(), group);
977 // Don't remove the extraneous groups, instead re-use it when
978 // a group request comes with the same set of buckets
982 public void removeExtraneousGroupEntry(Group group) {
983 log.debug("remove extraneous group entry {} of device {} from store",
986 ConcurrentMap<GroupId, Group> extraneousIdTable =
987 getExtraneousGroupIdTable(group.deviceId());
988 extraneousIdTable.remove(group.id());
992 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
993 // flatten and make iterator unmodifiable
994 return FluentIterable.from(
995 getExtraneousGroupIdTable(deviceId).values());
999 * Map handler to receive any events when the group key map is updated.
1001 private class GroupStoreKeyMapListener implements
1002 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
1005 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
1006 StoredGroupEntry> mapEvent) {
1007 GroupEvent groupEvent = null;
1008 GroupStoreKeyMapKey key = mapEvent.key();
1009 StoredGroupEntry group = mapEvent.value();
1010 if ((key == null) && (group == null)) {
1011 log.error("GroupStoreKeyMapListener: Received "
1012 + "event {} with null entry", mapEvent.type());
1014 } else if (group == null) {
1015 group = getGroupIdTable(key.deviceId()).values()
1017 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1019 if (group == null) {
1020 log.error("GroupStoreKeyMapListener: Received "
1021 + "event {} with null entry... can not process", mapEvent.type());
1025 log.trace("received groupid map event {} for id {} in device {}",
1029 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
1030 // Update the group ID table
1031 getGroupIdTable(group.deviceId()).put(group.id(), group);
1032 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1033 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1034 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1036 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1040 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1042 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1047 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
1048 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
1049 // Remove the entry from the group ID table
1050 getGroupIdTable(group.deviceId()).remove(group.id(), group);
1053 if (groupEvent != null) {
1054 notifyDelegate(groupEvent);
1059 private void process(GroupStoreMessage groupOp) {
1060 log.debug("Received remote group operation {} request for device {}",
1062 groupOp.deviceId());
1063 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1064 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1067 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1068 storeGroupDescriptionInternal(groupOp.groupDesc());
1069 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1070 updateGroupDescriptionInternal(groupOp.deviceId(),
1071 groupOp.appCookie(),
1072 groupOp.updateType(),
1073 groupOp.updateBuckets(),
1074 groupOp.newAppCookie());
1075 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1076 deleteGroupDescriptionInternal(groupOp.deviceId(),
1077 groupOp.appCookie());
1082 * Flattened map key to be used to store group entries.
1084 protected static class GroupStoreMapKey {
1085 private final DeviceId deviceId;
1087 public GroupStoreMapKey(DeviceId deviceId) {
1088 this.deviceId = deviceId;
1091 public DeviceId deviceId() {
1096 public boolean equals(Object o) {
1100 if (!(o instanceof GroupStoreMapKey)) {
1103 GroupStoreMapKey that = (GroupStoreMapKey) o;
1104 return this.deviceId.equals(that.deviceId);
1108 public int hashCode() {
1111 result = 31 * result + Objects.hash(this.deviceId);
1117 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
1118 private final GroupKey appCookie;
1119 public GroupStoreKeyMapKey(DeviceId deviceId,
1120 GroupKey appCookie) {
1122 this.appCookie = appCookie;
1126 public boolean equals(Object o) {
1130 if (!(o instanceof GroupStoreKeyMapKey)) {
1133 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1134 return (super.equals(that) &&
1135 this.appCookie.equals(that.appCookie));
1139 public int hashCode() {
1142 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1148 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
1149 private final GroupId groupId;
1150 public GroupStoreIdMapKey(DeviceId deviceId,
1153 this.groupId = groupId;
1157 public boolean equals(Object o) {
1161 if (!(o instanceof GroupStoreIdMapKey)) {
1164 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1165 return (super.equals(that) &&
1166 this.groupId.equals(that.groupId));
1170 public int hashCode() {
1173 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1180 public void pushGroupMetrics(DeviceId deviceId,
1181 Collection<Group> groupEntries) {
1182 boolean deviceInitialAuditStatus =
1183 deviceInitialAuditStatus(deviceId);
1184 Set<Group> southboundGroupEntries =
1185 Sets.newHashSet(groupEntries);
1186 Set<StoredGroupEntry> storedGroupEntries =
1187 Sets.newHashSet(getStoredGroups(deviceId));
1188 Set<Group> extraneousStoredEntries =
1189 Sets.newHashSet(getExtraneousGroups(deviceId));
1191 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1192 southboundGroupEntries.size(),
1194 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1195 Group group = it.next();
1196 log.trace("Group {} in device {}", group, deviceId);
1199 log.trace("Displaying all ({}) stored group entries for device {}",
1200 storedGroupEntries.size(),
1202 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1204 Group group = it1.next();
1205 log.trace("Stored Group {} for device {}", group, deviceId);
1208 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1209 Group group = it2.next();
1210 if (storedGroupEntries.remove(group)) {
1211 // we both have the group, let's update some info then.
1212 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1213 group.id(), deviceId);
1218 for (Group group : southboundGroupEntries) {
1219 if (getGroup(group.deviceId(), group.id()) != null) {
1220 // There is a group existing with the same id
1221 // It is possible that group update is
1222 // in progress while we got a stale info from switch
1223 if (!storedGroupEntries.remove(getGroup(
1224 group.deviceId(), group.id()))) {
1225 log.warn("Group AUDIT: Inconsistent state:"
1226 + "Group exists in ID based table while "
1227 + "not present in key based table");
1230 // there are groups in the switch that aren't in the store
1231 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1232 group.id(), deviceId);
1233 extraneousStoredEntries.remove(group);
1234 extraneousGroup(group);
1237 for (Group group : storedGroupEntries) {
1238 // there are groups in the store that aren't in the switch
1239 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1240 group.id(), deviceId);
1241 groupMissing(group);
1243 for (Group group : extraneousStoredEntries) {
1244 // there are groups in the extraneous store that
1245 // aren't in the switch
1246 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1247 group.id(), deviceId);
1248 removeExtraneousGroupEntry(group);
1251 if (!deviceInitialAuditStatus) {
1252 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1254 deviceInitialAuditCompleted(deviceId, true);
1258 private void groupMissing(Group group) {
1259 switch (group.state()) {
1260 case PENDING_DELETE:
1261 log.debug("Group {} delete confirmation from device {}",
1262 group, group.deviceId());
1263 removeGroupEntry(group);
1267 case PENDING_ADD_RETRY:
1268 case PENDING_UPDATE:
1269 log.debug("Group {} is in store but not on device {}",
1270 group, group.deviceId());
1271 StoredGroupEntry existing =
1272 getStoredGroupEntry(group.deviceId(), group.id());
1273 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
1275 existing.deviceId(),
1277 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
1278 //Re-PUT map entries to trigger map update events
1279 getGroupStoreKeyMap().
1280 put(new GroupStoreKeyMapKey(existing.deviceId(),
1281 existing.appCookie()), existing);
1282 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1286 log.debug("Group {} has not been installed.", group);
1291 private void extraneousGroup(Group group) {
1292 log.debug("Group {} is on device {} but not in store.",
1293 group, group.deviceId());
1294 addOrUpdateExtraneousGroupEntry(group);
1297 private void groupAdded(Group group) {
1298 log.trace("Group {} Added or Updated in device {}",
1299 group, group.deviceId());
1300 addOrUpdateGroupEntry(group);