2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.flow.impl;
18 import com.google.common.base.Objects;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Iterables;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.Futures;
26 import org.apache.felix.scr.annotations.Activate;
27 import org.apache.felix.scr.annotations.Component;
28 import org.apache.felix.scr.annotations.Deactivate;
29 import org.apache.felix.scr.annotations.Modified;
30 import org.apache.felix.scr.annotations.Property;
31 import org.apache.felix.scr.annotations.Reference;
32 import org.apache.felix.scr.annotations.ReferenceCardinality;
33 import org.apache.felix.scr.annotations.Service;
34 import org.onlab.util.KryoNamespace;
35 import org.onlab.util.Tools;
36 import org.onosproject.cfg.ComponentConfigService;
37 import org.onosproject.cluster.ClusterService;
38 import org.onosproject.cluster.NodeId;
39 import org.onosproject.core.CoreService;
40 import org.onosproject.core.IdGenerator;
41 import org.onosproject.mastership.MastershipService;
42 import org.onosproject.net.DeviceId;
43 import org.onosproject.net.device.DeviceService;
44 import org.onosproject.net.flow.CompletedBatchOperation;
45 import org.onosproject.net.flow.DefaultFlowEntry;
46 import org.onosproject.net.flow.FlowEntry;
47 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
48 import org.onosproject.net.flow.FlowId;
49 import org.onosproject.net.flow.FlowRule;
50 import org.onosproject.net.flow.FlowRuleBatchEntry;
51 import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
52 import org.onosproject.net.flow.FlowRuleBatchEvent;
53 import org.onosproject.net.flow.FlowRuleBatchOperation;
54 import org.onosproject.net.flow.FlowRuleBatchRequest;
55 import org.onosproject.net.flow.FlowRuleEvent;
56 import org.onosproject.net.flow.FlowRuleEvent.Type;
57 import org.onosproject.net.flow.FlowRuleService;
58 import org.onosproject.net.flow.FlowRuleStore;
59 import org.onosproject.net.flow.FlowRuleStoreDelegate;
60 import org.onosproject.net.flow.StoredFlowEntry;
61 import org.onosproject.net.flow.TableStatisticsEntry;
62 import org.onosproject.store.AbstractStore;
63 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64 import org.onosproject.store.cluster.messaging.ClusterMessage;
65 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
66 import org.onosproject.store.flow.ReplicaInfoEvent;
67 import org.onosproject.store.flow.ReplicaInfoEventListener;
68 import org.onosproject.store.flow.ReplicaInfoService;
69 import org.onosproject.store.impl.MastershipBasedTimestamp;
70 import org.onosproject.store.serializers.KryoNamespaces;
71 import org.onosproject.store.serializers.KryoSerializer;
72 import org.onosproject.store.serializers.StoreSerializer;
73 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
74 import org.onosproject.store.service.EventuallyConsistentMap;
75 import org.onosproject.store.service.EventuallyConsistentMapEvent;
76 import org.onosproject.store.service.EventuallyConsistentMapListener;
77 import org.onosproject.store.service.StorageService;
78 import org.onosproject.store.service.WallClockTimestamp;
79 import org.osgi.service.component.ComponentContext;
80 import org.slf4j.Logger;
82 import java.util.Collections;
83 import java.util.Dictionary;
84 import java.util.HashSet;
85 import java.util.List;
88 import java.util.concurrent.ExecutorService;
89 import java.util.concurrent.Executors;
90 import java.util.concurrent.ScheduledExecutorService;
91 import java.util.concurrent.ScheduledFuture;
92 import java.util.concurrent.TimeUnit;
93 import java.util.concurrent.atomic.AtomicInteger;
94 import java.util.stream.Collectors;
96 import static com.google.common.base.Strings.isNullOrEmpty;
97 import static org.onlab.util.Tools.get;
98 import static org.onlab.util.Tools.groupedThreads;
99 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
100 import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
101 import static org.slf4j.LoggerFactory.getLogger;
104 * Manages inventory of flow rules using a distributed state management protocol.
106 @Component(immediate = true, enabled = true)
108 public class NewDistributedFlowRuleStore
109 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
110 implements FlowRuleStore {
// Logger obtained for the runtime class of this instance.
112 private final Logger log = getLogger(getClass());
// Default value of the msgHandlerPoolSize property.
114 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
// Default value of the backupEnabled property.
115 private static final boolean DEFAULT_BACKUP_ENABLED = true;
// Default value of the backupPeriod property, in milliseconds.
116 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
// Timeout (used with TimeUnit.MILLISECONDS) applied to the blocking remote calls
// made by getFlowEntry, getFlowEntries and removeFlowRule.
117 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
118 // number of devices whose flow entries will be backed up in one communication round
119 private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
// Size used when creating the fixed thread pool that runs the cluster message handlers.
121 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
122 label = "Number of threads in the message handler pool")
123 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
// Checked by the backup task and the replica-info listener before doing any work.
125 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
126 label = "Indicates whether backups are enabled or not")
127 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
// Reconfigurable through modified(); a change restarts the backup task when backups are enabled.
129 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
130 label = "Delay in ms between successive backup runs")
131 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
// Flow entries held by this node; the same object is registered with
// replicaInfoManager as the replica-info listener.
133 private InternalFlowTable flowTable = new InternalFlowTable();
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected ReplicaInfoService replicaInfoManager;
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected ClusterCommunicationService clusterCommunicator;
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected ClusterService clusterService;
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected DeviceService deviceService;
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected CoreService coreService;
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected ComponentConfigService configService;
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected MastershipService mastershipService;
// Batch id -> node that sent the batch request. Entries are added by OnStoreBatch
// and removed by batchOperationComplete, which uses the node as the reply target.
156 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
// Executor passed to the cluster message subscriptions; created in activate()
// and replaced in modified() when the pool size changes.
157 private ExecutorService messageHandlingExecutor;
// Handle of the currently scheduled backup run, kept so that it can be cancelled.
159 private ScheduledFuture<?> backupTask;
// Single-threaded scheduler on which backups are scheduled.
160 private final ScheduledExecutorService backupSenderExecutor =
161 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
// Per-device table statistics, stored in an eventually consistent map built in activate().
163 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
164 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
165 new InternalTableStatsListener();
167 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168 protected StorageService storageService;
// Serializer used to encode and decode the payloads of the cluster messages
// exchanged by this store (see registerMessageHandlers).
170 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
172 protected void setupKryoPool() {
173 serializerPool = KryoNamespace.newBuilder()
174 .register(DistributedStoreSerializers.STORE_COMMON)
175 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
// Namespace builder handed to the table-statistics map as its serializer in activate().
180 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
181 .register(KryoNamespaces.API)
182 .register(MastershipBasedTimestamp.class);
// Supplies the ids of the batch operations created by storeFlowRule and deleteFlowRule;
// obtained in activate() for FlowRuleService.FLOW_OP_TOPIC.
185 private IdGenerator idGenerator;
// Id of the local cluster node, compared against a device's master to decide
// whether a request is handled here or forwarded.
186 private NodeId local;
/**
 * Component activation: registers the configurable properties, resolves the
 * id generator and the local node id, creates the message handler pool and
 * subscribes the message handlers, registers the flow table as replica-info
 * listener, schedules the backup task, and builds the table-statistics map.
 *
 * @param context component context
 */
189 public void activate(ComponentContext context) {
190 configService.registerProperties(getClass());
192 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
194 local = clusterService.getLocalNode().id();
196 messageHandlingExecutor = Executors.newFixedThreadPool(
197 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
199 registerMessageHandlers(messageHandlingExecutor);
// NOTE(review): presumably guarded by backupEnabled, as in modified() — confirm.
202 replicaInfoManager.addListener(flowTable);
// Keep the handle so that deactivate() and modified() can cancel the task.
203 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
207 TimeUnit.MILLISECONDS);
// Statistics entries are timestamped with the wall clock and tombstones are disabled.
210 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
211 .withName("onos-flow-table-stats")
212 .withSerializer(SERIALIZER_BUILDER)
213 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
214 .withTimestampProvider((k, v) -> new WallClockTimestamp())
215 .withTombstonesDisabled()
217 deviceTableStats.addListener(tableStatsListener);
219 logConfig("Started");
/**
 * Component deactivation: undoes what activate() set up — removes the
 * replica-info listener, cancels the backup task, unregisters the
 * properties and message handlers, destroys the table-statistics map and
 * stops both executors.
 *
 * @param context component context
 */
223 public void deactivate(ComponentContext context) {
225 replicaInfoManager.removeListener(flowTable);
// cancel(true): a backup run that is in progress may be interrupted.
226 backupTask.cancel(true);
228 configService.unregisterProperties(getClass(), false);
229 unregisterMessageHandlers();
230 deviceTableStats.removeListener(tableStatsListener);
231 deviceTableStats.destroy();
232 messageHandlingExecutor.shutdownNow();
233 backupSenderExecutor.shutdownNow();
/**
 * Applies a configuration change to msgHandlerPoolSize, backupEnabled and
 * backupPeriod.
 *
 * A property that is absent or empty keeps its current value. If parsing
 * fails, all three values fall back to their compile-time defaults.
 *
 * @param context component context carrying the new properties; when null,
 *                backupEnabled is reset to its default
 */
237 @SuppressWarnings("rawtypes")
239 public void modified(ComponentContext context) {
240 if (context == null) {
241 backupEnabled = DEFAULT_BACKUP_ENABLED;
242 logConfig("Default config");
246 Dictionary properties = context.getProperties();
248 boolean newBackupEnabled;
251 String s = get(properties, "msgHandlerPoolSize");
252 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
254 s = get(properties, "backupEnabled");
255 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
257 s = get(properties, "backupPeriod");
258 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
// A malformed value discards every parsed value, not only the offending one.
260 } catch (NumberFormatException | ClassCastException e) {
261 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
262 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
263 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
// The backup task is (re)scheduled only if backups end up enabled and either
// the enabled flag or the period changed.
266 boolean restartBackupTask = false;
267 if (newBackupEnabled != backupEnabled) {
268 backupEnabled = newBackupEnabled;
269 if (!backupEnabled) {
270 replicaInfoManager.removeListener(flowTable);
271 if (backupTask != null) {
272 backupTask.cancel(false);
276 replicaInfoManager.addListener(flowTable);
278 restartBackupTask = backupEnabled;
280 if (newBackupPeriod != backupPeriod) {
281 backupPeriod = newBackupPeriod;
282 restartBackupTask = backupEnabled;
284 if (restartBackupTask) {
285 if (backupTask != null) {
286 // cancel previously running task
287 backupTask.cancel(false);
289 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
293 TimeUnit.MILLISECONDS);
// Pool resize: create the new pool, re-subscribe the handlers on it, then let
// the old pool finish its queued work (shutdown(), not shutdownNow()).
295 if (newPoolSize != msgHandlerPoolSize) {
296 msgHandlerPoolSize = newPoolSize;
297 ExecutorService oldMsgHandler = messageHandlingExecutor;
298 messageHandlingExecutor = Executors.newFixedThreadPool(
299 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
301 // replace previously registered handlers.
302 registerMessageHandlers(messageHandlingExecutor);
303 oldMsgHandler.shutdown();
305 logConfig("Reconfigured");
308 private void registerMessageHandlers(ExecutorService executor) {
310 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
311 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
312 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
313 clusterCommunicator.addSubscriber(
314 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
315 clusterCommunicator.addSubscriber(
316 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
317 clusterCommunicator.addSubscriber(
318 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
319 clusterCommunicator.addSubscriber(
320 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
321 clusterCommunicator.addSubscriber(
322 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
325 private void unregisterMessageHandlers() {
326 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
327 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
328 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
329 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
330 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
331 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
334 private void logConfig(String prefix) {
335 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
336 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
339 // This is not a efficient operation on a distributed sharded
340 // flow store. We need to revisit the need for this operation or at least
341 // make it device specific.
343 public int getFlowRuleCount() {
344 AtomicInteger sum = new AtomicInteger(0);
345 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
/**
 * Looks up the stored entry matching the given rule.
 *
 * The lookup is served from the local flow table when this node is the
 * master of the rule's device; otherwise it is forwarded to the master and
 * the call waits up to FLOW_RULE_STORE_TIMEOUT_MILLIS for the answer.
 *
 * @param rule rule to look up
 * @return the matching flow entry
 */
350 public FlowEntry getFlowEntry(FlowRule rule) {
351 NodeId master = mastershipService.getMasterFor(rule.deviceId());
// Without a master there is no node that can answer the query.
353 if (master == null) {
354 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
358 if (Objects.equal(local, master)) {
359 return flowTable.getFlowEntry(rule);
362 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
363 master, rule.deviceId());
// Blocking remote call bounded by the store timeout.
365 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
366 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
370 FLOW_RULE_STORE_TIMEOUT_MILLIS,
371 TimeUnit.MILLISECONDS,
/**
 * Returns the flow entries of a device.
 *
 * The entries come from the local flow table when this node is the master
 * of the device; otherwise the request is forwarded to the master and the
 * call waits up to FLOW_RULE_STORE_TIMEOUT_MILLIS for the answer.
 *
 * @param deviceId device whose entries are requested
 * @return the device's flow entries; an empty list when the device has no
 *         master or when the remote call falls back to its default value
 */
376 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
377 NodeId master = mastershipService.getMasterFor(deviceId);
379 if (master == null) {
380 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
381 return Collections.emptyList();
384 if (Objects.equal(local, master)) {
385 return flowTable.getFlowEntries(deviceId);
388 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
// Blocking remote call; Collections.emptyList() is the fallback result.
391 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
392 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
396 FLOW_RULE_STORE_TIMEOUT_MILLIS,
397 TimeUnit.MILLISECONDS,
398 Collections.emptyList());
402 public void storeFlowRule(FlowRule rule) {
403 storeBatch(new FlowRuleBatchOperation(
404 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
405 rule.deviceId(), idGenerator.getNewId()));
/**
 * Submits a batch of flow rule operations for one device.
 *
 * An empty batch is reported to the delegate as completed successfully.
 * When the device has no master, the store is updated locally and the batch
 * is likewise reported as completed successfully. When this node is the
 * master the batch is applied through storeBatchInternal. Otherwise the
 * batch is sent to the master, and a failure to send it is reported to the
 * delegate as a failed batch listing every rule of the operation.
 *
 * @param operation batch to store
 */
409 public void storeBatch(FlowRuleBatchOperation operation) {
410 if (operation.getOperations().isEmpty()) {
411 notifyDelegate(FlowRuleBatchEvent.completed(
412 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
413 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
417 DeviceId deviceId = operation.deviceId();
418 NodeId master = mastershipService.getMasterFor(deviceId);
420 if (master == null) {
421 log.warn("No master for {} : flows will be marked for removal", deviceId);
// The return value (the set of operations to push to the device) is ignored here.
423 updateStoreInternal(operation);
425 notifyDelegate(FlowRuleBatchEvent.completed(
426 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
427 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
431 if (Objects.equal(local, master)) {
432 storeBatchInternal(operation);
436 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
439 clusterCommunicator.unicast(operation,
443 .whenComplete((result, error) -> {
// NOTE(review): the code below looks like the send-failure path only
// (presumably guarded by a null check on error) — confirm.
445 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
447 Set<FlowRule> allFailures = operation.getOperations()
449 .map(op -> op.target())
450 .collect(Collectors.toSet());
452 notifyDelegate(FlowRuleBatchEvent.completed(
453 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
454 new CompletedBatchOperation(false, allFailures, deviceId)));
459 private void storeBatchInternal(FlowRuleBatchOperation operation) {
461 final DeviceId did = operation.deviceId();
462 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
463 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
464 if (currentOps.isEmpty()) {
465 batchOperationComplete(FlowRuleBatchEvent.completed(
466 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
467 new CompletedBatchOperation(true, Collections.emptySet(), did)));
471 notifyDelegate(FlowRuleBatchEvent.requested(new
472 FlowRuleBatchRequest(operation.id(),
473 currentOps), operation.deviceId()));
/**
 * Applies each operation of the batch to the local flow table and collects
 * the operations that remain after null results are filtered out.
 *
 * @param operation batch whose operations are applied
 * @return set of the batch entries that were not filtered out
 */
476 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
477 return operation.getOperations().stream().map(
479 StoredFlowEntry entry;
480 switch (op.operator()) {
// Any stored entry equal to the new one is replaced by the new one.
482 entry = new DefaultFlowEntry(op.target());
483 // always add requested FlowRule
484 // Note: 2 equal FlowEntry may have different treatment
485 flowTable.remove(entry.deviceId(), entry);
486 flowTable.add(entry);
// The stored entry is only marked PENDING_REMOVE here; it is not deleted from the table.
490 entry = flowTable.getFlowEntry(op.target());
492 entry.setState(FlowEntryState.PENDING_REMOVE);
497 //TODO: figure this out at some point
500 log.warn("Unknown flow operation operator: {}", op.operator());
// Operations that mapped to null are dropped from the result.
504 ).filter(op -> op != null).collect(Collectors.toSet());
/**
 * Requests removal of a single rule by building a one-element REMOVE batch
 * for the rule's device with a freshly generated batch id.
 * NOTE(review): presumably the batch is handed to storeBatch, mirroring
 * storeFlowRule — confirm.
 *
 * @param rule rule to remove
 */
508 public void deleteFlowRule(FlowRule rule) {
510 new FlowRuleBatchOperation(
511 Collections.singletonList(
512 new FlowRuleBatchEntry(
513 FlowRuleOperation.REMOVE,
514 rule)), rule.deviceId(), idGenerator.getNewId()));
518 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
519 NodeId master = mastershipService.getMasterFor(rule.deviceId());
520 if (Objects.equal(local, master)) {
521 return addOrUpdateFlowRuleInternal(rule);
524 log.warn("Tried to update FlowRule {} state,"
525 + " while the Node was not the master.", rule);
/**
 * Updates the stored counterpart of the given entry, if there is one.
 *
 * The byte count, life and packet count are copied onto the stored entry.
 * A stored entry in state PENDING_ADD is moved to ADDED and reported as
 * RULE_ADDED; the visible code reports RULE_UPDATED otherwise.
 *
 * @param rule entry carrying the new statistics
 * @return the resulting event when a stored entry exists
 */
529 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
530 // check if this new rule is an update to an existing entry
531 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
532 if (stored != null) {
533 stored.setBytes(rule.bytes());
534 stored.setLife(rule.life());
535 stored.setPackets(rule.packets());
536 if (stored.state() == FlowEntryState.PENDING_ADD) {
537 stored.setState(FlowEntryState.ADDED);
538 return new FlowRuleEvent(Type.RULE_ADDED, rule);
540 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
// Reached when no stored entry matches the rule.
543 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
544 // TODO: also update backup if the behavior is correct.
/**
 * Removes a flow entry from the store.
 *
 * The removal is done locally when this node is the master of the entry's
 * device; otherwise it is forwarded to the master and the call waits up to
 * FLOW_RULE_STORE_TIMEOUT_MILLIS for the answer. A failure of the remote
 * call surfaces as a RuntimeException.
 *
 * @param rule entry to remove
 * @return the event describing the removal
 */
550 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
551 final DeviceId deviceId = rule.deviceId();
552 NodeId master = mastershipService.getMasterFor(deviceId);
554 if (Objects.equal(local, master)) {
555 // bypass and handle it locally
556 return removeFlowRuleInternal(rule);
// No master: there is no node to forward the removal to.
559 if (master == null) {
560 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
561 // TODO: revisit if this should be null (="no-op") or Exception
565 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
568 return Futures.get(clusterCommunicator.sendAndReceive(
574 FLOW_RULE_STORE_TIMEOUT_MILLIS,
575 TimeUnit.MILLISECONDS,
576 RuntimeException.class);
579 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
580 final DeviceId deviceId = rule.deviceId();
581 // This is where one could mark a rule as removed and still keep it in the store.
582 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
583 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
587 public void batchOperationComplete(FlowRuleBatchEvent event) {
588 //FIXME: need a per device pending response
589 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
590 if (nodeId == null) {
591 notifyDelegate(event);
593 // TODO check unicast return value
594 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
595 //error log: log.warn("Failed to respond to peer for batch operation result");
599 private final class OnStoreBatch implements ClusterMessageHandler {
602 public void handle(final ClusterMessage message) {
603 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
604 log.debug("received batch request {}", operation);
606 final DeviceId deviceId = operation.deviceId();
607 NodeId master = mastershipService.getMasterFor(deviceId);
608 if (!Objects.equal(local, master)) {
609 Set<FlowRule> failures = new HashSet<>(operation.size());
610 for (FlowRuleBatchEntry op : operation.getOperations()) {
611 failures.add(op.target());
613 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
614 // This node is no longer the master, respond as all failed.
615 // TODO: we might want to wrap response in envelope
616 // to distinguish sw programming failure and hand over
617 // it make sense in the latter case to retry immediately.
618 message.respond(SERIALIZER.encode(allFailed));
622 pendingResponses.put(operation.id(), message.sender());
623 storeBatchInternal(operation);
// In-memory flow table of this node; also listens for replica-info events so
// that a change of backup location can trigger a backup.
627 private class InternalFlowTable implements ReplicaInfoEventListener {
// device id -> flow id -> set of stored entries sharing that flow id
629 private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
630 flowEntries = Maps.newConcurrentMap();
// device id -> System.currentTimeMillis() recorded when a backup of the device was acknowledged
632 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
// device id -> System.currentTimeMillis() recorded by the last add() or remove() on the device
633 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
// device id -> node that acknowledged the last backup of the device
634 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
/**
 * Reacts to replica-info events of type BACKUPS_CHANGED for devices this
 * node is master of.
 *
 * If the backup location is lost without a replacement, the recorded
 * backup node and backup time of the device are cleared. If a different
 * backup node is now in place, a backup of the device to that node is
 * scheduled on backupSenderExecutor.
 *
 * @param event replica-info event
 */
637 public void event(ReplicaInfoEvent event) {
638 if (!backupEnabled) {
641 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
642 DeviceId deviceId = event.subject();
643 NodeId master = mastershipService.getMasterFor(deviceId);
644 if (!Objects.equal(local, master)) {
645 // ignore since this event is for a device this node does not manage.
648 NodeId newBackupNode = getBackupNode(deviceId);
649 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
650 if (Objects.equal(newBackupNode, currentBackupNode)) {
651 // ignore since backup location hasn't changed.
654 if (currentBackupNode != null && newBackupNode == null) {
655 // Current backup node is most likely down and no alternate backup node
656 // has been chosen. Clear current backup location so that we can resume
657 // backups when either current backup comes online or a different backup node
659 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
660 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
661 lastBackupNodes.remove(deviceId);
662 lastBackupTimes.remove(deviceId);
664 // TODO: Pick any available node as backup and ensure hand-off occurs when
665 // a new master is elected.
667 log.debug("Backup location for {} has changed from {} to {}.",
668 deviceId, currentBackupNode, newBackupNode);
// The backup itself runs on the backup sender thread, not on the event thread.
669 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
675 private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
676 // split up the devices into smaller batches and send them separately.
677 Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
678 .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
681 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
682 if (deviceIds.isEmpty()) {
685 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
686 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
687 Maps.newConcurrentMap();
688 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
689 clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
695 .whenComplete((backedupDevices, error) -> {
696 Set<DeviceId> devicesNotBackedup = error != null ?
697 deviceFlowEntries.keySet() :
698 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
699 if (devicesNotBackedup.size() > 0) {
700 log.warn("Failed to backup devices: {}. Reason: {}",
701 devicesNotBackedup, error.getMessage());
703 if (backedupDevices != null) {
704 backedupDevices.forEach(id -> {
705 lastBackupTimes.put(id, System.currentTimeMillis());
706 lastBackupNodes.put(id, nodeId);
713 * Returns the flow table for specified device.
715 * @param deviceId identifier of the device
716 * @return Map representing Flow Table of given device.
718 private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
719 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
722 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
723 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
/**
 * Searches the entries stored under the rule's device and flow id for one
 * that equals the rule.
 *
 * @param rule rule to look up
 * @return the stored entry selected from those equal to the rule
 */
726 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
// Note: this local shadows the flowEntries field of the enclosing table.
727 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
728 return flowEntries.stream()
729 .filter(entry -> Objects.equal(entry, rule))
734 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
735 Set<FlowEntry> result = Sets.newHashSet();
736 getFlowTable(deviceId).values().forEach(result::addAll);
740 public StoredFlowEntry getFlowEntry(FlowRule rule) {
741 return getFlowEntryInternal(rule);
744 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
745 return getFlowEntriesInternal(deviceId);
748 public void add(FlowEntry rule) {
749 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
750 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
753 public boolean remove(DeviceId deviceId, FlowEntry rule) {
755 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
757 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
761 private NodeId getBackupNode(DeviceId deviceId) {
762 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
763 // pick the standby which is most likely to become next master
764 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
/**
 * One backup run: selects the devices mastered by this node that need a
 * backup, groups them by backup node and sends each group through
 * sendBackups. Any exception raised during the run is logged and not
 * rethrown.
 */
767 private void backup() {
768 if (!backupEnabled) {
772 // determine the set of devices that we need to backup during this run.
773 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
775 .filter(deviceId -> {
776 Long lastBackupTime = lastBackupTimes.get(deviceId);
777 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
778 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
779 NodeId newBackupNode = getBackupNode(deviceId);
// A device needs a backup if it was never backed up, if its backup node
// changed, or if it was updated after the last backup.
780 return lastBackupTime == null
781 || !Objects.equal(lastBackupNode, newBackupNode)
782 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
784 .collect(Collectors.toSet());
786 // compute a mapping from node to the set of devices whose flow entries it should backup
787 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
788 devicesToBackup.forEach(deviceId -> {
789 NodeId backupLocation = getBackupNode(deviceId);
// Devices without a backup node are left out of this run.
790 if (backupLocation != null) {
791 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
795 // send the device flow entries to their respective backup nodes
796 devicesToBackupByNode.forEach(this::sendBackups);
797 } catch (Exception e) {
798 log.error("Backup failed.", e);
802 private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
803 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
804 Set<DeviceId> backedupDevices = Sets.newHashSet();
806 flowTables.forEach((deviceId, deviceFlowTable) -> {
807 // Only process those devices are that not managed by the local node.
808 if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
809 Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
810 backupFlowTable.clear();
811 backupFlowTable.putAll(deviceFlowTable);
812 backedupDevices.add(deviceId);
815 } catch (Exception e) {
816 log.warn("Failure processing backup request", e);
818 return backedupDevices;
823 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
824 List<TableStatisticsEntry> tableStats) {
825 deviceTableStats.put(deviceId, tableStats);
830 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
831 NodeId master = mastershipService.getMasterFor(deviceId);
833 if (master == null) {
834 log.debug("Failed to getTableStats: No master for {}", deviceId);
835 return Collections.emptyList();
838 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
839 if (tableStats == null) {
840 return Collections.emptyList();
842 return ImmutableList.copyOf(tableStats);
845 private class InternalTableStatsListener
846 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
848 public void event(EventuallyConsistentMapEvent<DeviceId,
849 List<TableStatisticsEntry>> event) {
850 //TODO: Generate an event to listeners (do we need?)