/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.flow.impl;

import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowEntry.FlowEntryState;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
@Component(immediate = true, enabled = true)
@Service
public class NewDistributedFlowRuleStore
        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
        implements FlowRuleStore {
    private final Logger log = getLogger(getClass());

    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
    private static final boolean DEFAULT_BACKUP_ENABLED = true;
    private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
    // number of devices whose flow entries will be backed up in one communication round
    private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;

    @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
            label = "Number of threads in the message handler pool")
    private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;

    @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
            label = "Indicates whether backups are enabled or not")
    private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;

    @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
            label = "Delay in ms between successive backup runs")
    private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;

    private InternalFlowTable flowTable = new InternalFlowTable();
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService configService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
    private ExecutorService messageHandlingExecutor;

    private ScheduledFuture<?> backupTask;
    private final ScheduledExecutorService backupSenderExecutor =
            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));

    protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
        @Override
        protected void setupKryoPool() {
            serializerPool = KryoNamespace.newBuilder()
                    .register(DistributedStoreSerializers.STORE_COMMON)
                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
                    .build();
        }
    };

    private IdGenerator idGenerator;
    private NodeId local;
    @Activate
    public void activate(ComponentContext context) {
        configService.registerProperties(getClass());

        idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);

        local = clusterService.getLocalNode().id();

        messageHandlingExecutor = Executors.newFixedThreadPool(
                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));

        registerMessageHandlers(messageHandlingExecutor);

        if (backupEnabled) {
            replicaInfoManager.addListener(flowTable);
            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
                    flowTable::backup,
                    0,
                    backupPeriod,
                    TimeUnit.MILLISECONDS);
        }

        logConfig("Started");
    }
    @Deactivate
    public void deactivate(ComponentContext context) {
        if (backupEnabled) {
            replicaInfoManager.removeListener(flowTable);
            backupTask.cancel(true);
        }
        configService.unregisterProperties(getClass(), false);
        unregisterMessageHandlers();
        messageHandlingExecutor.shutdownNow();
        backupSenderExecutor.shutdownNow();
        log.info("Stopped");
    }
    @SuppressWarnings("rawtypes")
    @Modified
    public void modified(ComponentContext context) {
        if (context == null) {
            backupEnabled = DEFAULT_BACKUP_ENABLED;
            logConfig("Default config");
            return;
        }

        Dictionary properties = context.getProperties();
        int newPoolSize;
        boolean newBackupEnabled;
        int newBackupPeriod;
        try {
            String s = get(properties, "msgHandlerPoolSize");
            newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());

            s = get(properties, "backupEnabled");
            newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());

            s = get(properties, "backupPeriod");
            newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
        } catch (NumberFormatException | ClassCastException e) {
            newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
            newBackupEnabled = DEFAULT_BACKUP_ENABLED;
            newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
        }

        boolean restartBackupTask = false;
        if (newBackupEnabled != backupEnabled) {
            backupEnabled = newBackupEnabled;
            if (!backupEnabled) {
                replicaInfoManager.removeListener(flowTable);
                if (backupTask != null) {
                    backupTask.cancel(false);
                    backupTask = null;
                }
            } else {
                replicaInfoManager.addListener(flowTable);
            }
            restartBackupTask = backupEnabled;
        }
        if (newBackupPeriod != backupPeriod) {
            backupPeriod = newBackupPeriod;
            restartBackupTask = backupEnabled;
        }
        if (restartBackupTask) {
            if (backupTask != null) {
                // cancel previously running task
                backupTask.cancel(false);
            }
            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
                    flowTable::backup,
                    0,
                    backupPeriod,
                    TimeUnit.MILLISECONDS);
        }
        if (newPoolSize != msgHandlerPoolSize) {
            msgHandlerPoolSize = newPoolSize;
            ExecutorService oldMsgHandler = messageHandlingExecutor;
            messageHandlingExecutor = Executors.newFixedThreadPool(
                    msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));

            // replace previously registered handlers.
            registerMessageHandlers(messageHandlingExecutor);
            oldMsgHandler.shutdown();
        }
        logConfig("Reconfigured");
    }
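    /**
     * Registers handlers for batch application, flow queries, flow removal
     * and flow table backup messages exchanged between cluster nodes.
     */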
    private void registerMessageHandlers(ExecutorService executor) {

        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
        clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
                REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
        clusterCommunicator.addSubscriber(
                GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
        clusterCommunicator.addSubscriber(
                GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
        clusterCommunicator.addSubscriber(
                REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
        clusterCommunicator.addSubscriber(
                FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
    }
    private void unregisterMessageHandlers() {
        clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
        clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
        clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
        clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
    }

    private void logConfig(String prefix) {
        log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
                 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
    }
    // This is not an efficient operation on a distributed sharded
    // flow store. We need to revisit the need for this operation or at least
    // make it device specific.
    @Override
    public int getFlowRuleCount() {
        AtomicInteger sum = new AtomicInteger(0);
        deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
        return sum.get();
    }
    @Override
    public FlowEntry getFlowEntry(FlowRule rule) {
        NodeId master = mastershipService.getMasterFor(rule.deviceId());

        if (master == null) {
            log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
            return null;
        }

        if (Objects.equal(local, master)) {
            return flowTable.getFlowEntry(rule);
        }

        log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                  master, rule.deviceId());

        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
                                         FlowStoreMessageSubjects.GET_FLOW_ENTRY,
                                         SERIALIZER::encode,
                                         SERIALIZER::decode,
                                         master),
                                     FLOW_RULE_STORE_TIMEOUT_MILLIS,
                                     TimeUnit.MILLISECONDS,
                                     null);
    }
    @Override
    public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
        NodeId master = mastershipService.getMasterFor(deviceId);

        if (master == null) {
            log.debug("Failed to getFlowEntries: No master for {}", deviceId);
            return Collections.emptyList();
        }

        if (Objects.equal(local, master)) {
            return flowTable.getFlowEntries(deviceId);
        }

        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                  master, deviceId);

        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
                                         FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
                                         SERIALIZER::encode,
                                         SERIALIZER::decode,
                                         master),
                                     FLOW_RULE_STORE_TIMEOUT_MILLIS,
                                     TimeUnit.MILLISECONDS,
                                     Collections.emptyList());
    }
    @Override
    public void storeFlowRule(FlowRule rule) {
        storeBatch(new FlowRuleBatchOperation(
                Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
                rule.deviceId(), idGenerator.getNewId()));
    }
    @Override
    public void storeBatch(FlowRuleBatchOperation operation) {
        if (operation.getOperations().isEmpty()) {
            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        DeviceId deviceId = operation.deviceId();
        NodeId master = mastershipService.getMasterFor(deviceId);

        if (master == null) {
            log.warn("No master for {} : flows will be marked for removal", deviceId);

            updateStoreInternal(operation);

            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        if (Objects.equal(local, master)) {
            storeBatchInternal(operation);
            return;
        }

        log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                  master, deviceId);

        clusterCommunicator.unicast(operation,
                                    APPLY_BATCH_FLOWS,
                                    SERIALIZER::encode,
                                    master)
                           .whenComplete((result, error) -> {
                               if (error != null) {
                                   log.warn("Failed to storeBatch: {} to {}", operation, master, error);

                                   Set<FlowRule> allFailures = operation.getOperations()
                                           .stream()
                                           .map(op -> op.target())
                                           .collect(Collectors.toSet());

                                   notifyDelegate(FlowRuleBatchEvent.completed(
                                           new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                                           new CompletedBatchOperation(false, allFailures, deviceId)));
                               }
                           });
    }
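    /**
     * Applies a batch operation on the local node, which is expected to be the
     * master for the device, and notifies the delegate of the operations that
     * still need to be pushed to the device.
     */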
    private void storeBatchInternal(FlowRuleBatchOperation operation) {

        final DeviceId did = operation.deviceId();
        //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
        Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
        if (currentOps.isEmpty()) {
            batchOperationComplete(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
            return;
        }

        notifyDelegate(FlowRuleBatchEvent.requested(
                new FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
    }
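    /**
     * Updates the in-memory flow table with the ADD and REMOVE operations in the
     * batch and returns the subset of operations that should be sent to the device.
     */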
    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
        return operation.getOperations().stream().map(
                op -> {
                    StoredFlowEntry entry;
                    switch (op.operator()) {
                        case ADD:
                            entry = new DefaultFlowEntry(op.target());
                            // always add requested FlowRule
                            // Note: two equal FlowEntries may have different treatment
                            flowTable.remove(entry.deviceId(), entry);
                            flowTable.add(entry);
                            return op;
                        case REMOVE:
                            entry = flowTable.getFlowEntry(op.target());
                            if (entry != null) {
                                entry.setState(FlowEntryState.PENDING_REMOVE);
                                return op;
                            }
                            break;
                        case MODIFY:
                            //TODO: figure this out at some point
                            break;
                        default:
                            log.warn("Unknown flow operation operator: {}", op.operator());
                    }
                    return null;
                }
        ).filter(op -> op != null).collect(Collectors.toSet());
    }
    @Override
    public void deleteFlowRule(FlowRule rule) {
        storeBatch(
                new FlowRuleBatchOperation(
                        Collections.singletonList(
                                new FlowRuleBatchEntry(
                                        FlowRuleOperation.REMOVE,
                                        rule)), rule.deviceId(), idGenerator.getNewId()));
    }
    @Override
    public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
        NodeId master = mastershipService.getMasterFor(rule.deviceId());
        if (Objects.equal(local, master)) {
            return addOrUpdateFlowRuleInternal(rule);
        }

        log.warn("Tried to update FlowRule {} state,"
                         + " while the node was not the master.", rule);
        return null;
    }
    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
        // check if this new rule is an update to an existing entry
        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
        if (stored != null) {
            stored.setBytes(rule.bytes());
            stored.setLife(rule.life());
            stored.setPackets(rule.packets());
            if (stored.state() == FlowEntryState.PENDING_ADD) {
                stored.setState(FlowEntryState.ADDED);
                return new FlowRuleEvent(Type.RULE_ADDED, rule);
            }
            return new FlowRuleEvent(Type.RULE_UPDATED, rule);
        }

        // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
        // TODO: also update backup if the behavior is correct.
        flowTable.add(rule);
        return null;
    }
    @Override
    public FlowRuleEvent removeFlowRule(FlowEntry rule) {
        final DeviceId deviceId = rule.deviceId();
        NodeId master = mastershipService.getMasterFor(deviceId);

        if (Objects.equal(local, master)) {
            // bypass and handle it locally
            return removeFlowRuleInternal(rule);
        }

        if (master == null) {
            log.warn("Failed to removeFlowRule: No master for {}", deviceId);
            // TODO: revisit if this should be null (="no-op") or Exception
            return null;
        }

        log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
                  master, deviceId);

        return Futures.get(clusterCommunicator.sendAndReceive(
                               rule,
                               REMOVE_FLOW_ENTRY,
                               SERIALIZER::encode,
                               SERIALIZER::decode,
                               master),
                           FLOW_RULE_STORE_TIMEOUT_MILLIS,
                           TimeUnit.MILLISECONDS,
                           RuntimeException.class);
    }
    private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
        final DeviceId deviceId = rule.deviceId();
        // This is where one could mark a rule as removed and still keep it in the store.
        final boolean removed = flowTable.remove(deviceId, rule);
        return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
    }
    @Override
    public void batchOperationComplete(FlowRuleBatchEvent event) {
        //FIXME: need a per device pending response
        NodeId nodeId = pendingResponses.remove(event.subject().batchId());
        if (nodeId == null) {
            notifyDelegate(event);
        } else {
            // TODO check unicast return value
            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
            //error log: log.warn("Failed to respond to peer for batch operation result");
        }
    }
    private final class OnStoreBatch implements ClusterMessageHandler {

        @Override
        public void handle(final ClusterMessage message) {
            FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
            log.debug("received batch request {}", operation);

            final DeviceId deviceId = operation.deviceId();
            NodeId master = mastershipService.getMasterFor(deviceId);
            if (!Objects.equal(local, master)) {
                Set<FlowRule> failures = new HashSet<>(operation.size());
                for (FlowRuleBatchEntry op : operation.getOperations()) {
                    failures.add(op.target());
                }
                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
                // This node is no longer the master, respond as all failed.
                // TODO: we might want to wrap the response in an envelope to distinguish
                // a switch programming failure from a mastership hand-over; in the latter
                // case it makes sense to retry immediately.
                message.respond(SERIALIZER.encode(allFailed));
                return;
            }

            pendingResponses.put(operation.id(), message.sender());
            storeBatchInternal(operation);
        }
    }
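    /**
     * In-memory flow table, sharded by device. The master for a device keeps the
     * authoritative copy and periodically backs it up to the first standby node.
     */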
    private class InternalFlowTable implements ReplicaInfoEventListener {

        private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
                flowEntries = Maps.newConcurrentMap();

        private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
        private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
        private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
        @Override
        public void event(ReplicaInfoEvent event) {
            if (!backupEnabled) {
                return;
            }
            if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
                DeviceId deviceId = event.subject();
                NodeId master = mastershipService.getMasterFor(deviceId);
                if (!Objects.equal(local, master)) {
                    // ignore since this event is for a device this node does not manage.
                    return;
                }
                NodeId newBackupNode = getBackupNode(deviceId);
                NodeId currentBackupNode = lastBackupNodes.get(deviceId);
                if (Objects.equal(newBackupNode, currentBackupNode)) {
                    // ignore since backup location hasn't changed.
                    return;
                }
                if (currentBackupNode != null && newBackupNode == null) {
                    // Current backup node is most likely down and no alternate backup node
                    // has been chosen. Clear current backup location so that we can resume
                    // backups when either the current backup comes online or a different
                    // backup node is chosen.
                    log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
                            + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
                    lastBackupNodes.remove(deviceId);
                    lastBackupTimes.remove(deviceId);
                    return;
                    // TODO: Pick any available node as backup and ensure hand-off occurs when
                    // a new master is elected.
                }
                log.debug("Backup location for {} has changed from {} to {}.",
                        deviceId, currentBackupNode, newBackupNode);
                backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
                        0, TimeUnit.SECONDS);
            }
        }
        private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
            // split up the devices into smaller batches and send them separately.
            Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
                    .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
        }
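        /**
         * Sends the flow tables of the given devices to the specified node over the
         * FLOW_TABLE_BACKUP subject and records the backup time and location for
         * each device that was successfully backed up.
         */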
        private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
            if (deviceIds.isEmpty()) {
                return;
            }
            log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
            Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
                    Maps.newConcurrentMap();
            deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
            clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
                    deviceFlowEntries,
                    FLOW_TABLE_BACKUP,
                    SERIALIZER::encode,
                    SERIALIZER::decode,
                    nodeId)
                    .whenComplete((backedupDevices, error) -> {
                        Set<DeviceId> devicesNotBackedup = error != null ?
                                deviceFlowEntries.keySet() :
                                Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
                        if (devicesNotBackedup.size() > 0) {
                            log.warn("Failed to backup devices: {}. Reason: {}",
                                    devicesNotBackedup, error != null ? error.getMessage() : "unknown");
                        }
                        if (backedupDevices != null) {
                            backedupDevices.forEach(id -> {
                                lastBackupTimes.put(id, System.currentTimeMillis());
                                lastBackupNodes.put(id, nodeId);
                            });
                        }
                    });
        }
        /**
         * Returns the flow table for the specified device.
         *
         * @param deviceId identifier of the device
         * @return map representing the flow table of the given device
         */
        private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
            return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
        }

        private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
            return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
        }
        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
            Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
            return flowEntries.stream()
                    .filter(entry -> Objects.equal(entry, rule))
                    .findAny()
                    .orElse(null);
        }

        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
            Set<FlowEntry> result = Sets.newHashSet();
            getFlowTable(deviceId).values().forEach(result::addAll);
            return result;
        }

        public StoredFlowEntry getFlowEntry(FlowRule rule) {
            return getFlowEntryInternal(rule);
        }

        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
            return getFlowEntriesInternal(deviceId);
        }
        public void add(FlowEntry rule) {
            getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
            lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
        }

        public boolean remove(DeviceId deviceId, FlowEntry rule) {
            try {
                return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
            } finally {
                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
            }
        }

        private NodeId getBackupNode(DeviceId deviceId) {
            List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
            // pick the standby which is most likely to become the next master
            return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
        }
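        /**
         * Periodic backup task: selects the locally mastered devices whose flow
         * tables changed since the last backup (or whose backup node changed),
         * groups them by backup node and sends the backups.
         */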
        private void backup() {
            if (!backupEnabled) {
                return;
            }
            try {
                // determine the set of devices that we need to backup during this run.
                Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
                        .stream()
                        .filter(deviceId -> {
                            Long lastBackupTime = lastBackupTimes.get(deviceId);
                            Long lastUpdateTime = lastUpdateTimes.get(deviceId);
                            NodeId lastBackupNode = lastBackupNodes.get(deviceId);
                            NodeId newBackupNode = getBackupNode(deviceId);
                            return lastBackupTime == null
                                    || !Objects.equal(lastBackupNode, newBackupNode)
                                    || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
                        })
                        .collect(Collectors.toSet());

                // compute a mapping from node to the set of devices whose flow entries it should backup
                Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
                devicesToBackup.forEach(deviceId -> {
                    NodeId backupLocation = getBackupNode(deviceId);
                    if (backupLocation != null) {
                        devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
                                .add(deviceId);
                    }
                });
                // send the device flow entries to their respective backup nodes
                devicesToBackupByNode.forEach(this::sendBackups);
            } catch (Exception e) {
                log.error("Backup failed.", e);
            }
        }
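        /**
         * Handles an incoming backup by replacing the local standby copy of each
         * device flow table this node does not master; returns the set of devices
         * that were successfully backed up.
         */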
        private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
            log.debug("Received flowEntries for {} to backup", flowTables.keySet());
            Set<DeviceId> backedupDevices = Sets.newHashSet();
            try {
                flowTables.forEach((deviceId, deviceFlowTable) -> {
                    // Only process those devices that are not managed by the local node.
                    if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
                        Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
                        backupFlowTable.clear();
                        backupFlowTable.putAll(deviceFlowTable);
                        backedupDevices.add(deviceId);
                    }
                });
            } catch (Exception e) {
                log.warn("Failure processing backup request", e);
            }
            return backedupDevices;
        }
    }
}