2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.group.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Sets;
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Reference;
26 import org.apache.felix.scr.annotations.ReferenceCardinality;
27 import org.apache.felix.scr.annotations.Service;
28 import org.onlab.util.KryoNamespace;
29 import org.onlab.util.NewConcurrentHashMap;
30 import org.onosproject.cluster.ClusterService;
31 import org.onosproject.core.DefaultApplicationId;
32 import org.onosproject.core.DefaultGroupId;
33 import org.onosproject.core.GroupId;
34 import org.onosproject.mastership.MastershipService;
35 import org.onosproject.net.DeviceId;
36 import org.onosproject.net.MastershipRole;
37 import org.onosproject.net.PortNumber;
38 import org.onosproject.net.flow.DefaultTrafficTreatment;
39 import org.onosproject.net.flow.FlowRule;
40 import org.onosproject.net.flow.instructions.Instructions;
41 import org.onosproject.net.flow.instructions.L0ModificationInstruction;
42 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
43 import org.onosproject.net.flow.instructions.L3ModificationInstruction;
44 import org.onosproject.net.group.DefaultGroup;
45 import org.onosproject.net.group.DefaultGroupBucket;
46 import org.onosproject.net.group.DefaultGroupDescription;
47 import org.onosproject.net.group.DefaultGroupKey;
48 import org.onosproject.net.group.Group;
49 import org.onosproject.net.group.Group.GroupState;
50 import org.onosproject.net.group.GroupBucket;
51 import org.onosproject.net.group.GroupBuckets;
52 import org.onosproject.net.group.GroupDescription;
53 import org.onosproject.net.group.GroupEvent;
54 import org.onosproject.net.group.GroupEvent.Type;
55 import org.onosproject.net.group.GroupKey;
56 import org.onosproject.net.group.GroupOperation;
57 import org.onosproject.net.group.GroupStore;
58 import org.onosproject.net.group.GroupStoreDelegate;
59 import org.onosproject.net.group.StoredGroupBucketEntry;
60 import org.onosproject.net.group.StoredGroupEntry;
61 import org.onosproject.store.AbstractStore;
62 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
63 import org.onosproject.store.service.MultiValuedTimestamp;
64 import org.onosproject.store.serializers.DeviceIdSerializer;
65 import org.onosproject.store.serializers.KryoNamespaces;
66 import org.onosproject.store.serializers.URISerializer;
67 import org.onosproject.store.service.EventuallyConsistentMap;
68 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
69 import org.onosproject.store.service.EventuallyConsistentMapEvent;
70 import org.onosproject.store.service.EventuallyConsistentMapListener;
71 import org.onosproject.store.service.StorageService;
72 import org.slf4j.Logger;
75 import java.util.ArrayList;
76 import java.util.Collection;
77 import java.util.HashMap;
78 import java.util.Iterator;
79 import java.util.List;
80 import java.util.Objects;
81 import java.util.Optional;
83 import java.util.concurrent.ConcurrentHashMap;
84 import java.util.concurrent.ConcurrentMap;
85 import java.util.concurrent.ExecutorService;
86 import java.util.concurrent.Executors;
87 import java.util.concurrent.atomic.AtomicInteger;
88 import java.util.concurrent.atomic.AtomicLong;
89 import java.util.stream.Collectors;
91 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
92 import static org.onlab.util.Tools.groupedThreads;
93 import static org.slf4j.LoggerFactory.getLogger;
96 * Manages inventory of group entries using trivial in-memory implementation.
98 @Component(immediate = true)
100 public class DistributedGroupStore
101 extends AbstractStore<GroupEvent, GroupStoreDelegate>
102 implements GroupStore {
104 private final Logger log = getLogger(getClass());
106 private final int dummyId = 0xffffffff;
107 private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected StorageService storageService;
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected MastershipService mastershipService;
121 // Per device group table with (device id + app cookie) as key
122 private EventuallyConsistentMap<GroupStoreKeyMapKey,
123 StoredGroupEntry> groupStoreEntriesByKey = null;
124 // Per device group table with (device id + group id) as key
125 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
126 groupEntriesById = new ConcurrentHashMap<>();
127 private EventuallyConsistentMap<GroupStoreKeyMapKey,
128 StoredGroupEntry> auditPendingReqQueue = null;
129 private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
130 extraneousGroupEntriesById = new ConcurrentHashMap<>();
131 private ExecutorService messageHandlingExecutor;
132 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
134 private final HashMap<DeviceId, Boolean> deviceAuditStatus =
135 new HashMap<DeviceId, Boolean>();
137 private final AtomicInteger groupIdGen = new AtomicInteger();
139 private KryoNamespace.Builder kryoBuilder = null;
141 private final AtomicLong sequenceNumber = new AtomicLong(0);
144 public void activate() {
145 kryoBuilder = new KryoNamespace.Builder()
146 .register(DefaultGroup.class,
147 DefaultGroupBucket.class,
148 DefaultGroupDescription.class,
149 DefaultGroupKey.class,
150 GroupDescription.Type.class,
151 Group.GroupState.class,
153 DefaultGroupId.class,
154 GroupStoreMessage.class,
155 GroupStoreMessage.Type.class,
157 GroupStoreMessageSubjects.class,
158 MultiValuedTimestamp.class,
159 GroupStoreKeyMapKey.class,
160 GroupStoreIdMapKey.class,
161 GroupStoreMapKey.class
163 .register(new URISerializer(), URI.class)
164 .register(new DeviceIdSerializer(), DeviceId.class)
165 .register(PortNumber.class)
166 .register(DefaultApplicationId.class)
167 .register(DefaultTrafficTreatment.class,
168 Instructions.DropInstruction.class,
169 Instructions.OutputInstruction.class,
170 Instructions.GroupInstruction.class,
171 Instructions.TableTypeTransition.class,
173 L0ModificationInstruction.class,
174 L0ModificationInstruction.L0SubType.class,
175 L0ModificationInstruction.ModLambdaInstruction.class,
176 L2ModificationInstruction.class,
177 L2ModificationInstruction.L2SubType.class,
178 L2ModificationInstruction.ModEtherInstruction.class,
179 L2ModificationInstruction.PushHeaderInstructions.class,
180 L2ModificationInstruction.ModVlanIdInstruction.class,
181 L2ModificationInstruction.ModVlanPcpInstruction.class,
182 L2ModificationInstruction.ModMplsLabelInstruction.class,
183 L2ModificationInstruction.ModMplsTtlInstruction.class,
184 L3ModificationInstruction.class,
185 L3ModificationInstruction.L3SubType.class,
186 L3ModificationInstruction.ModIPInstruction.class,
187 L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
188 L3ModificationInstruction.ModTtlInstruction.class,
189 org.onlab.packet.MplsLabel.class
191 .register(org.onosproject.cluster.NodeId.class)
192 .register(KryoNamespaces.BASIC)
193 .register(KryoNamespaces.MISC);
195 messageHandlingExecutor = Executors.
196 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
197 groupedThreads("onos/store/group",
198 "message-handlers"));
200 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
201 kryoBuilder.build()::deserialize,
203 messageHandlingExecutor);
205 log.debug("Creating EC map groupstorekeymap");
206 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
207 keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
209 groupStoreEntriesByKey = keyMapBuilder
210 .withName("groupstorekeymap")
211 .withSerializer(kryoBuilder)
212 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
213 sequenceNumber.getAndIncrement()))
215 groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
216 log.debug("Current size of groupstorekeymap:{}",
217 groupStoreEntriesByKey.size());
219 log.debug("Creating EC map pendinggroupkeymap");
220 EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
221 auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
223 auditPendingReqQueue = auditMapBuilder
224 .withName("pendinggroupkeymap")
225 .withSerializer(kryoBuilder)
226 .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
227 sequenceNumber.getAndIncrement()))
229 log.debug("Current size of pendinggroupkeymap:{}",
230 auditPendingReqQueue.size());
236 public void deactivate() {
237 groupStoreEntriesByKey.destroy();
238 auditPendingReqQueue.destroy();
242 private static NewConcurrentHashMap<GroupId, Group>
243 lazyEmptyExtraneousGroupIdTable() {
244 return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
247 private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
248 lazyEmptyGroupIdTable() {
249 return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
253 * Returns the group store eventual consistent key map.
255 * @return Map representing group key table.
257 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
258 getGroupStoreKeyMap() {
259 return groupStoreEntriesByKey;
263 * Returns the group id table for specified device.
265 * @param deviceId identifier of the device
266 * @return Map representing group key table of given device.
268 private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
269 return createIfAbsentUnchecked(groupEntriesById,
270 deviceId, lazyEmptyGroupIdTable());
274 * Returns the pending group request table.
276 * @return Map representing group key table.
278 private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
279 getPendingGroupKeyTable() {
280 return auditPendingReqQueue;
284 * Returns the extraneous group id table for specified device.
286 * @param deviceId identifier of the device
287 * @return Map representing group key table of given device.
289 private ConcurrentMap<GroupId, Group>
290 getExtraneousGroupIdTable(DeviceId deviceId) {
291 return createIfAbsentUnchecked(extraneousGroupEntriesById,
293 lazyEmptyExtraneousGroupIdTable());
297 * Returns the number of groups for the specified device in the store.
299 * @return number of groups for the specified device
302 public int getGroupCount(DeviceId deviceId) {
303 return (getGroups(deviceId) != null) ?
304 Iterables.size(getGroups(deviceId)) : 0;
308 * Returns the groups associated with a device.
310 * @param deviceId the device ID
312 * @return the group entries
315 public Iterable<Group> getGroups(DeviceId deviceId) {
316 // flatten and make iterator unmodifiable
317 log.debug("getGroups: for device {} total number of groups {}",
318 deviceId, getGroupStoreKeyMap().values().size());
319 return FluentIterable.from(getGroupStoreKeyMap().values())
320 .filter(input -> input.deviceId().equals(deviceId))
321 .transform(input -> input);
324 private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
325 // flatten and make iterator unmodifiable
326 log.debug("getGroups: for device {} total number of groups {}",
327 deviceId, getGroupStoreKeyMap().values().size());
328 return FluentIterable.from(getGroupStoreKeyMap().values())
329 .filter(input -> input.deviceId().equals(deviceId));
333 * Returns the stored group entry.
335 * @param deviceId the device ID
336 * @param appCookie the group key
338 * @return a group associated with the key
341 public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
342 return getStoredGroupEntry(deviceId, appCookie);
345 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
346 GroupKey appCookie) {
347 return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
352 public Group getGroup(DeviceId deviceId, GroupId groupId) {
353 return getStoredGroupEntry(deviceId, groupId);
356 private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
358 return getGroupIdTable(deviceId).get(groupId);
361 private int getFreeGroupIdValue(DeviceId deviceId) {
362 int freeId = groupIdGen.incrementAndGet();
365 Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
366 if (existing == null) {
368 extraneousGroupEntriesById.get(deviceId) != null) ?
369 extraneousGroupEntriesById.get(deviceId).
370 get(new DefaultGroupId(freeId)) :
373 if (existing != null) {
374 freeId = groupIdGen.incrementAndGet();
379 log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
384 * Stores a new group entry using the information from group description.
386 * @param groupDesc group description to be used to create group entry
389 public void storeGroupDescription(GroupDescription groupDesc) {
390 log.debug("In storeGroupDescription");
391 // Check if a group is existing with the same key
392 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
393 log.warn("Group already exists with the same key {}",
394 groupDesc.appCookie());
398 // Check if group to be created by a remote instance
399 if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
400 log.debug("storeGroupDescription: Device {} local role is not MASTER",
401 groupDesc.deviceId());
402 if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
403 log.error("No Master for device {}..."
404 + "Can not perform add group operation",
405 groupDesc.deviceId());
406 //TODO: Send Group operation failure event
409 GroupStoreMessage groupOp = GroupStoreMessage.
410 createGroupAddRequestMsg(groupDesc.deviceId(),
413 clusterCommunicator.unicast(groupOp,
414 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
415 m -> kryoBuilder.build().serialize(m),
416 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
418 log.warn("Failed to send request to master: {} to {}",
420 mastershipService.getMasterFor(groupDesc.deviceId()));
421 //TODO: Send Group operation failure event
423 log.debug("Sent Group operation request for device {} "
424 + "to remote MASTER {}",
425 groupDesc.deviceId(),
426 mastershipService.getMasterFor(groupDesc.deviceId()));
432 log.debug("Store group for device {} is getting handled locally",
433 groupDesc.deviceId());
434 storeGroupDescriptionInternal(groupDesc);
437 private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) {
438 ConcurrentMap<GroupId, Group> extraneousMap =
439 extraneousGroupEntriesById.get(deviceId);
440 if (extraneousMap == null) {
443 return extraneousMap.get(new DefaultGroupId(groupId));
446 private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId,
447 GroupBuckets buckets) {
448 ConcurrentMap<GroupId, Group> extraneousMap =
449 extraneousGroupEntriesById.get(deviceId);
450 if (extraneousMap == null) {
454 for (Group extraneousGroup:extraneousMap.values()) {
455 if (extraneousGroup.buckets().equals(buckets)) {
456 return extraneousGroup;
462 private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
463 // Check if a group is existing with the same key
464 if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
468 if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
469 // Device group audit has not completed yet
470 // Add this group description to pending group key table
471 // Create a group entry object with Dummy Group ID
472 log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
473 groupDesc.deviceId());
474 StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
475 group.setState(GroupState.WAITING_AUDIT_COMPLETE);
476 EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
477 getPendingGroupKeyTable();
478 pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
479 groupDesc.appCookie()),
484 Group matchingExtraneousGroup = null;
485 if (groupDesc.givenGroupId() != null) {
486 //Check if there is a extraneous group existing with the same Id
487 matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
488 groupDesc.deviceId(), groupDesc.givenGroupId());
489 if (matchingExtraneousGroup != null) {
490 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}",
491 groupDesc.deviceId(),
492 groupDesc.givenGroupId());
493 //Check if the group buckets matches with user provided buckets
494 if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) {
495 //Group is already existing with the same buckets and Id
496 // Create a group entry object
497 log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}",
498 groupDesc.deviceId(),
499 groupDesc.givenGroupId());
500 StoredGroupEntry group = new DefaultGroup(
501 matchingExtraneousGroup.id(), groupDesc);
502 // Insert the newly created group entry into key and id maps
503 getGroupStoreKeyMap().
504 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
505 groupDesc.appCookie()), group);
506 // Ensure it also inserted into group id based table to
507 // avoid any chances of duplication in group id generation
508 getGroupIdTable(groupDesc.deviceId()).
509 put(matchingExtraneousGroup.id(), group);
510 addOrUpdateGroupEntry(matchingExtraneousGroup);
511 removeExtraneousGroupEntry(matchingExtraneousGroup);
514 //Group buckets are not matching. Update group
515 //with user provided buckets.
517 log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}",
518 groupDesc.deviceId(),
519 groupDesc.givenGroupId());
523 //Check if there is an extraneous group with user provided buckets
524 matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
525 groupDesc.deviceId(), groupDesc.buckets());
526 if (matchingExtraneousGroup != null) {
527 //Group is already existing with the same buckets.
528 //So reuse this group.
529 log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}",
530 groupDesc.deviceId());
531 //Create a group entry object
532 StoredGroupEntry group = new DefaultGroup(
533 matchingExtraneousGroup.id(), groupDesc);
534 // Insert the newly created group entry into key and id maps
535 getGroupStoreKeyMap().
536 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
537 groupDesc.appCookie()), group);
538 // Ensure it also inserted into group id based table to
539 // avoid any chances of duplication in group id generation
540 getGroupIdTable(groupDesc.deviceId()).
541 put(matchingExtraneousGroup.id(), group);
542 addOrUpdateGroupEntry(matchingExtraneousGroup);
543 removeExtraneousGroupEntry(matchingExtraneousGroup);
546 //TODO: Check if there are any empty groups that can be used here
547 log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}",
548 groupDesc.deviceId());
553 if (groupDesc.givenGroupId() == null) {
554 // Get a new group identifier
555 id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
557 id = new DefaultGroupId(groupDesc.givenGroupId());
559 // Create a group entry object
560 StoredGroupEntry group = new DefaultGroup(id, groupDesc);
561 // Insert the newly created group entry into key and id maps
562 getGroupStoreKeyMap().
563 put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
564 groupDesc.appCookie()), group);
565 // Ensure it also inserted into group id based table to
566 // avoid any chances of duplication in group id generation
567 getGroupIdTable(groupDesc.deviceId()).
569 log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
571 groupDesc.deviceId());
572 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
577 * Updates the existing group entry with the information
578 * from group description.
580 * @param deviceId the device ID
581 * @param oldAppCookie the current group key
582 * @param type update type
583 * @param newBuckets group buckets for updates
584 * @param newAppCookie optional new group key
587 public void updateGroupDescription(DeviceId deviceId,
588 GroupKey oldAppCookie,
590 GroupBuckets newBuckets,
591 GroupKey newAppCookie) {
592 // Check if group update to be done by a remote instance
593 if (mastershipService.getMasterFor(deviceId) != null &&
594 mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
595 log.debug("updateGroupDescription: Device {} local role is not MASTER",
597 if (mastershipService.getMasterFor(deviceId) == null) {
598 log.error("No Master for device {}..."
599 + "Can not perform update group operation",
601 //TODO: Send Group operation failure event
604 GroupStoreMessage groupOp = GroupStoreMessage.
605 createGroupUpdateRequestMsg(deviceId,
611 clusterCommunicator.unicast(groupOp,
612 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
613 m -> kryoBuilder.build().serialize(m),
614 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
616 log.warn("Failed to send request to master: {} to {}",
618 mastershipService.getMasterFor(deviceId), error);
620 //TODO: Send Group operation failure event
624 log.debug("updateGroupDescription for device {} is getting handled locally",
626 updateGroupDescriptionInternal(deviceId,
633 private void updateGroupDescriptionInternal(DeviceId deviceId,
634 GroupKey oldAppCookie,
636 GroupBuckets newBuckets,
637 GroupKey newAppCookie) {
638 // Check if a group is existing with the provided key
639 Group oldGroup = getGroup(deviceId, oldAppCookie);
640 if (oldGroup == null) {
641 log.warn("updateGroupDescriptionInternal: Group not found...strange");
645 List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
648 if (newBucketList != null) {
649 // Create a new group object from the old group
650 GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
651 GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
652 GroupDescription updatedGroupDesc = new DefaultGroupDescription(
657 oldGroup.givenGroupId(),
659 StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
661 log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
665 newGroup.setState(GroupState.PENDING_UPDATE);
666 newGroup.setLife(oldGroup.life());
667 newGroup.setPackets(oldGroup.packets());
668 newGroup.setBytes(oldGroup.bytes());
669 //Update the group entry in groupkey based map.
670 //Update to groupid based map will happen in the
671 //groupkey based map update listener
672 log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
674 getGroupStoreKeyMap().
675 put(new GroupStoreKeyMapKey(newGroup.deviceId(),
676 newGroup.appCookie()), newGroup);
677 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
679 log.warn("updateGroupDescriptionInternal with type {}: No "
680 + "change in the buckets in update", type);
684 private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
686 GroupBuckets buckets) {
687 GroupBuckets oldBuckets = oldGroup.buckets();
688 List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
689 oldBuckets.buckets());
690 boolean groupDescUpdated = false;
692 if (type == UpdateType.ADD) {
693 // Check if the any of the new buckets are part of
694 // the old bucket list
695 for (GroupBucket addBucket:buckets.buckets()) {
696 if (!newBucketList.contains(addBucket)) {
697 newBucketList.add(addBucket);
698 groupDescUpdated = true;
701 } else if (type == UpdateType.REMOVE) {
702 // Check if the to be removed buckets are part of the
704 for (GroupBucket removeBucket:buckets.buckets()) {
705 if (newBucketList.contains(removeBucket)) {
706 newBucketList.remove(removeBucket);
707 groupDescUpdated = true;
712 if (groupDescUpdated) {
713 return newBucketList;
720 * Triggers deleting the existing group entry.
722 * @param deviceId the device ID
723 * @param appCookie the group key
726 public void deleteGroupDescription(DeviceId deviceId,
727 GroupKey appCookie) {
728 // Check if group to be deleted by a remote instance
729 if (mastershipService.
730 getLocalRole(deviceId) != MastershipRole.MASTER) {
731 log.debug("deleteGroupDescription: Device {} local role is not MASTER",
733 if (mastershipService.getMasterFor(deviceId) == null) {
734 log.error("No Master for device {}..."
735 + "Can not perform delete group operation",
737 //TODO: Send Group operation failure event
740 GroupStoreMessage groupOp = GroupStoreMessage.
741 createGroupDeleteRequestMsg(deviceId,
744 clusterCommunicator.unicast(groupOp,
745 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
746 m -> kryoBuilder.build().serialize(m),
747 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
749 log.warn("Failed to send request to master: {} to {}",
751 mastershipService.getMasterFor(deviceId), error);
753 //TODO: Send Group operation failure event
757 log.debug("deleteGroupDescription in device {} is getting handled locally",
759 deleteGroupDescriptionInternal(deviceId, appCookie);
762 private void deleteGroupDescriptionInternal(DeviceId deviceId,
763 GroupKey appCookie) {
764 // Check if a group is existing with the provided key
765 StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
766 if (existing == null) {
770 log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
774 synchronized (existing) {
775 existing.setState(GroupState.PENDING_DELETE);
777 log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
779 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
783 * Stores a new group entry, or updates an existing entry.
785 * @param group group entry
788 public void addOrUpdateGroupEntry(Group group) {
789 // check if this new entry is an update to an existing entry
790 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
792 GroupEvent event = null;
794 if (existing != null) {
795 log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
798 synchronized (existing) {
799 for (GroupBucket bucket:group.buckets().buckets()) {
800 Optional<GroupBucket> matchingBucket =
801 existing.buckets().buckets()
803 .filter((existingBucket)->(existingBucket.equals(bucket)))
805 if (matchingBucket.isPresent()) {
806 ((StoredGroupBucketEntry) matchingBucket.
807 get()).setPackets(bucket.packets());
808 ((StoredGroupBucketEntry) matchingBucket.
809 get()).setBytes(bucket.bytes());
811 log.warn("addOrUpdateGroupEntry: No matching "
812 + "buckets to update stats");
815 existing.setLife(group.life());
816 existing.setPackets(group.packets());
817 existing.setBytes(group.bytes());
818 if ((existing.state() == GroupState.PENDING_ADD) ||
819 (existing.state() == GroupState.PENDING_ADD_RETRY)) {
820 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
824 existing.setState(GroupState.ADDED);
825 existing.setIsGroupStateAddedFirstTime(true);
826 event = new GroupEvent(Type.GROUP_ADDED, existing);
828 log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
831 GroupState.PENDING_UPDATE);
832 existing.setState(GroupState.ADDED);
833 existing.setIsGroupStateAddedFirstTime(false);
834 event = new GroupEvent(Type.GROUP_UPDATED, existing);
836 //Re-PUT map entries to trigger map update events
837 getGroupStoreKeyMap().
838 put(new GroupStoreKeyMapKey(existing.deviceId(),
839 existing.appCookie()), existing);
842 log.warn("addOrUpdateGroupEntry: Group update "
843 + "happening for a non-existing entry in the map");
847 notifyDelegate(event);
852 * Removes the group entry from store.
854 * @param group group entry
857 public void removeGroupEntry(Group group) {
858 StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
861 if (existing != null) {
862 log.debug("removeGroupEntry: removing group entry {} in device {}",
865 //Removal from groupid based map will happen in the
866 //map update listener
867 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
868 existing.appCookie()));
869 notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
871 log.warn("removeGroupEntry for {} in device{} is "
872 + "not existing in our maps",
879 public void deviceInitialAuditCompleted(DeviceId deviceId,
881 synchronized (deviceAuditStatus) {
883 log.debug("AUDIT completed for device {}",
885 deviceAuditStatus.put(deviceId, true);
886 // Execute all pending group requests
887 List<StoredGroupEntry> pendingGroupRequests =
888 getPendingGroupKeyTable().values()
890 .filter(g-> g.deviceId().equals(deviceId))
891 .collect(Collectors.toList());
892 log.debug("processing pending group add requests for device {} and number of pending requests {}",
894 pendingGroupRequests.size());
895 for (Group group:pendingGroupRequests) {
896 GroupDescription tmp = new DefaultGroupDescription(
901 group.givenGroupId(),
903 storeGroupDescriptionInternal(tmp);
904 getPendingGroupKeyTable().
905 remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
908 Boolean audited = deviceAuditStatus.get(deviceId);
909 if (audited != null && audited) {
910 log.debug("Clearing AUDIT status for device {}", deviceId);
911 deviceAuditStatus.put(deviceId, false);
918 public boolean deviceInitialAuditStatus(DeviceId deviceId) {
919 synchronized (deviceAuditStatus) {
920 Boolean audited = deviceAuditStatus.get(deviceId);
921 return audited != null && audited;
926 public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
928 StoredGroupEntry existing = getStoredGroupEntry(deviceId,
929 operation.groupId());
931 if (existing == null) {
932 log.warn("No group entry with ID {} found ", operation.groupId());
936 log.warn("groupOperationFailed: group operation {} failed"
937 + "for group {} in device {}",
940 existing.deviceId());
941 switch (operation.opType()) {
943 if (existing.state() == GroupState.PENDING_ADD) {
944 //TODO: Need to add support for passing the group
945 //operation failure reason from group provider.
946 //If the error type is anything other than GROUP_EXISTS,
947 //then the GROUP_ADD_FAILED event should be raised even
948 //in PENDING_ADD_RETRY state also.
949 notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
950 log.warn("groupOperationFailed: cleaningup "
951 + "group {} from store in device {}....",
953 existing.deviceId());
954 //Removal from groupid based map will happen in the
955 //map update listener
956 getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
957 existing.appCookie()));
961 notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
964 notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
967 log.warn("Unknown group operation type {}", operation.opType());
972 public void addOrUpdateExtraneousGroupEntry(Group group) {
973 log.debug("add/update extraneous group entry {} in device {}",
976 ConcurrentMap<GroupId, Group> extraneousIdTable =
977 getExtraneousGroupIdTable(group.deviceId());
978 extraneousIdTable.put(group.id(), group);
979 // Don't remove the extraneous groups, instead re-use it when
980 // a group request comes with the same set of buckets
984 public void removeExtraneousGroupEntry(Group group) {
985 log.debug("remove extraneous group entry {} of device {} from store",
988 ConcurrentMap<GroupId, Group> extraneousIdTable =
989 getExtraneousGroupIdTable(group.deviceId());
990 extraneousIdTable.remove(group.id());
994 public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
995 // flatten and make iterator unmodifiable
996 return FluentIterable.from(
997 getExtraneousGroupIdTable(deviceId).values());
1001 * Map handler to receive any events when the group key map is updated.
1003 private class GroupStoreKeyMapListener implements
1004 EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
1007 public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
1008 StoredGroupEntry> mapEvent) {
1009 GroupEvent groupEvent = null;
1010 GroupStoreKeyMapKey key = mapEvent.key();
1011 StoredGroupEntry group = mapEvent.value();
1012 if ((key == null) && (group == null)) {
1013 log.error("GroupStoreKeyMapListener: Received "
1014 + "event {} with null entry", mapEvent.type());
1016 } else if (group == null) {
1017 group = getGroupIdTable(key.deviceId()).values()
1019 .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
1021 if (group == null) {
1022 log.error("GroupStoreKeyMapListener: Received "
1023 + "event {} with null entry... can not process", mapEvent.type());
1027 log.trace("received groupid map event {} for id {} in device {}",
1031 if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
1032 // Update the group ID table
1033 getGroupIdTable(group.deviceId()).put(group.id(), group);
1034 if (mapEvent.value().state() == Group.GroupState.ADDED) {
1035 if (mapEvent.value().isGroupStateAddedFirstTime()) {
1036 groupEvent = new GroupEvent(Type.GROUP_ADDED,
1038 log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
1042 groupEvent = new GroupEvent(Type.GROUP_UPDATED,
1044 log.trace("Received following GROUP_ADDED state update for id {} in device {}",
1049 } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
1050 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
1051 // Remove the entry from the group ID table
1052 getGroupIdTable(group.deviceId()).remove(group.id(), group);
1055 if (groupEvent != null) {
1056 notifyDelegate(groupEvent);
1061 private void process(GroupStoreMessage groupOp) {
1062 log.debug("Received remote group operation {} request for device {}",
1064 groupOp.deviceId());
1065 if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
1066 log.warn("This node is not MASTER for device {}", groupOp.deviceId());
1069 if (groupOp.type() == GroupStoreMessage.Type.ADD) {
1070 storeGroupDescriptionInternal(groupOp.groupDesc());
1071 } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
1072 updateGroupDescriptionInternal(groupOp.deviceId(),
1073 groupOp.appCookie(),
1074 groupOp.updateType(),
1075 groupOp.updateBuckets(),
1076 groupOp.newAppCookie());
1077 } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
1078 deleteGroupDescriptionInternal(groupOp.deviceId(),
1079 groupOp.appCookie());
1084 * Flattened map key to be used to store group entries.
1086 protected static class GroupStoreMapKey {
1087 private final DeviceId deviceId;
1089 public GroupStoreMapKey(DeviceId deviceId) {
1090 this.deviceId = deviceId;
1093 public DeviceId deviceId() {
1098 public boolean equals(Object o) {
1102 if (!(o instanceof GroupStoreMapKey)) {
1105 GroupStoreMapKey that = (GroupStoreMapKey) o;
1106 return this.deviceId.equals(that.deviceId);
1110 public int hashCode() {
1113 result = 31 * result + Objects.hash(this.deviceId);
1119 protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
1120 private final GroupKey appCookie;
1121 public GroupStoreKeyMapKey(DeviceId deviceId,
1122 GroupKey appCookie) {
1124 this.appCookie = appCookie;
1128 public boolean equals(Object o) {
1132 if (!(o instanceof GroupStoreKeyMapKey)) {
1135 GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
1136 return (super.equals(that) &&
1137 this.appCookie.equals(that.appCookie));
1141 public int hashCode() {
1144 result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
1150 protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
1151 private final GroupId groupId;
1152 public GroupStoreIdMapKey(DeviceId deviceId,
1155 this.groupId = groupId;
1159 public boolean equals(Object o) {
1163 if (!(o instanceof GroupStoreIdMapKey)) {
1166 GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
1167 return (super.equals(that) &&
1168 this.groupId.equals(that.groupId));
1172 public int hashCode() {
1175 result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
1182 public void pushGroupMetrics(DeviceId deviceId,
1183 Collection<Group> groupEntries) {
1184 boolean deviceInitialAuditStatus =
1185 deviceInitialAuditStatus(deviceId);
1186 Set<Group> southboundGroupEntries =
1187 Sets.newHashSet(groupEntries);
1188 Set<StoredGroupEntry> storedGroupEntries =
1189 Sets.newHashSet(getStoredGroups(deviceId));
1190 Set<Group> extraneousStoredEntries =
1191 Sets.newHashSet(getExtraneousGroups(deviceId));
1193 log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
1194 southboundGroupEntries.size(),
1196 for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
1197 Group group = it.next();
1198 log.trace("Group {} in device {}", group, deviceId);
1201 log.trace("Displaying all ({}) stored group entries for device {}",
1202 storedGroupEntries.size(),
1204 for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
1206 Group group = it1.next();
1207 log.trace("Stored Group {} for device {}", group, deviceId);
1210 for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
1211 Group group = it2.next();
1212 if (storedGroupEntries.remove(group)) {
1213 // we both have the group, let's update some info then.
1214 log.trace("Group AUDIT: group {} exists in both planes for device {}",
1215 group.id(), deviceId);
1220 for (Group group : southboundGroupEntries) {
1221 if (getGroup(group.deviceId(), group.id()) != null) {
1222 // There is a group existing with the same id
1223 // It is possible that group update is
1224 // in progress while we got a stale info from switch
1225 if (!storedGroupEntries.remove(getGroup(
1226 group.deviceId(), group.id()))) {
1227 log.warn("Group AUDIT: Inconsistent state:"
1228 + "Group exists in ID based table while "
1229 + "not present in key based table");
1232 // there are groups in the switch that aren't in the store
1233 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
1234 group.id(), deviceId);
1235 extraneousStoredEntries.remove(group);
1236 extraneousGroup(group);
1239 for (Group group : storedGroupEntries) {
1240 // there are groups in the store that aren't in the switch
1241 log.debug("Group AUDIT: group {} missing in data plane for device {}",
1242 group.id(), deviceId);
1243 groupMissing(group);
1245 for (Group group : extraneousStoredEntries) {
1246 // there are groups in the extraneous store that
1247 // aren't in the switch
1248 log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
1249 group.id(), deviceId);
1250 removeExtraneousGroupEntry(group);
1253 if (!deviceInitialAuditStatus) {
1254 log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
1256 deviceInitialAuditCompleted(deviceId, true);
1260 private void groupMissing(Group group) {
1261 switch (group.state()) {
1262 case PENDING_DELETE:
1263 log.debug("Group {} delete confirmation from device {}",
1264 group, group.deviceId());
1265 removeGroupEntry(group);
1269 case PENDING_ADD_RETRY:
1270 case PENDING_UPDATE:
1271 log.debug("Group {} is in store but not on device {}",
1272 group, group.deviceId());
1273 StoredGroupEntry existing =
1274 getStoredGroupEntry(group.deviceId(), group.id());
1275 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
1277 existing.deviceId(),
1279 existing.setState(Group.GroupState.PENDING_ADD_RETRY);
1280 //Re-PUT map entries to trigger map update events
1281 getGroupStoreKeyMap().
1282 put(new GroupStoreKeyMapKey(existing.deviceId(),
1283 existing.appCookie()), existing);
1284 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
1288 log.debug("Group {} has not been installed.", group);
1293 private void extraneousGroup(Group group) {
1294 log.debug("Group {} is on device {} but not in store.",
1295 group, group.deviceId());
1296 addOrUpdateExtraneousGroupEntry(group);
1299 private void groupAdded(Group group) {
1300 log.trace("Group {} Added or Updated in device {}",
1301 group, group.deviceId());
1302 addOrUpdateGroupEntry(group);