de7a3ac3128121055fdce11a095513d863fb68e6
[onosfw.git] /
1  /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.flow.impl;
17
18 import com.google.common.base.Objects;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.Iterables;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.Futures;
24
25 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate;
28 import org.apache.felix.scr.annotations.Modified;
29 import org.apache.felix.scr.annotations.Property;
30 import org.apache.felix.scr.annotations.Reference;
31 import org.apache.felix.scr.annotations.ReferenceCardinality;
32 import org.apache.felix.scr.annotations.Service;
33 import org.onlab.util.KryoNamespace;
34 import org.onlab.util.Tools;
35 import org.onosproject.cfg.ComponentConfigService;
36 import org.onosproject.cluster.ClusterService;
37 import org.onosproject.cluster.NodeId;
38 import org.onosproject.core.CoreService;
39 import org.onosproject.core.IdGenerator;
40 import org.onosproject.mastership.MastershipService;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.device.DeviceService;
43 import org.onosproject.net.flow.CompletedBatchOperation;
44 import org.onosproject.net.flow.DefaultFlowEntry;
45 import org.onosproject.net.flow.FlowEntry;
46 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47 import org.onosproject.net.flow.FlowId;
48 import org.onosproject.net.flow.FlowRule;
49 import org.onosproject.net.flow.FlowRuleBatchEntry;
50 import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51 import org.onosproject.net.flow.FlowRuleBatchEvent;
52 import org.onosproject.net.flow.FlowRuleBatchOperation;
53 import org.onosproject.net.flow.FlowRuleBatchRequest;
54 import org.onosproject.net.flow.FlowRuleEvent;
55 import org.onosproject.net.flow.FlowRuleEvent.Type;
56 import org.onosproject.net.flow.FlowRuleService;
57 import org.onosproject.net.flow.FlowRuleStore;
58 import org.onosproject.net.flow.FlowRuleStoreDelegate;
59 import org.onosproject.net.flow.StoredFlowEntry;
60 import org.onosproject.store.AbstractStore;
61 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62 import org.onosproject.store.cluster.messaging.ClusterMessage;
63 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
64 import org.onosproject.store.flow.ReplicaInfoEvent;
65 import org.onosproject.store.flow.ReplicaInfoEventListener;
66 import org.onosproject.store.flow.ReplicaInfoService;
67 import org.onosproject.store.serializers.KryoSerializer;
68 import org.onosproject.store.serializers.StoreSerializer;
69 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
70 import org.osgi.service.component.ComponentContext;
71 import org.slf4j.Logger;
72
73 import java.util.Collections;
74 import java.util.Dictionary;
75 import java.util.HashSet;
76 import java.util.List;
77 import java.util.Map;
78 import java.util.Set;
79 import java.util.concurrent.ExecutorService;
80 import java.util.concurrent.Executors;
81 import java.util.concurrent.ScheduledExecutorService;
82 import java.util.concurrent.ScheduledFuture;
83 import java.util.concurrent.TimeUnit;
84 import java.util.concurrent.atomic.AtomicInteger;
85 import java.util.stream.Collectors;
86
87 import static com.google.common.base.Strings.isNullOrEmpty;
88 import static org.onlab.util.Tools.get;
89 import static org.onlab.util.Tools.groupedThreads;
90 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
91 import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
92 import static org.slf4j.LoggerFactory.getLogger;
93
94 /**
95  * Manages inventory of flow rules using a distributed state management protocol.
96  */
97 @Component(immediate = true, enabled = true)
98 @Service
99 public class NewDistributedFlowRuleStore
100         extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
101         implements FlowRuleStore {
102
103     private final Logger log = getLogger(getClass());
104
105     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
106     private static final boolean DEFAULT_BACKUP_ENABLED = true;
107     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
108     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
109     // number of devices whose flow entries will be backed up in one communication round
110     private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
111
112     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
113             label = "Number of threads in the message handler pool")
114     private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
115
116     @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
117             label = "Indicates whether backups are enabled or not")
118     private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
119
120     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
121             label = "Delay in ms between successive backup runs")
122     private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
123
124     private InternalFlowTable flowTable = new InternalFlowTable();
125
126     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127     protected ReplicaInfoService replicaInfoManager;
128
129     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130     protected ClusterCommunicationService clusterCommunicator;
131
132     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133     protected ClusterService clusterService;
134
135     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136     protected DeviceService deviceService;
137
138     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139     protected CoreService coreService;
140
141     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142     protected ComponentConfigService configService;
143
144     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145     protected MastershipService mastershipService;
146
147     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
148     private ExecutorService messageHandlingExecutor;
149
150     private ScheduledFuture<?> backupTask;
151     private final ScheduledExecutorService backupSenderExecutor =
152             Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
153
154     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
155         @Override
156         protected void setupKryoPool() {
157             serializerPool = KryoNamespace.newBuilder()
158                     .register(DistributedStoreSerializers.STORE_COMMON)
159                     .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
160                     .build();
161         }
162     };
163
164     private IdGenerator idGenerator;
165     private NodeId local;
166
167     @Activate
168     public void activate(ComponentContext context) {
169         configService.registerProperties(getClass());
170
171         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
172
173         local = clusterService.getLocalNode().id();
174
175         messageHandlingExecutor = Executors.newFixedThreadPool(
176                 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
177
178         registerMessageHandlers(messageHandlingExecutor);
179
180         if (backupEnabled) {
181             replicaInfoManager.addListener(flowTable);
182             backupTask = backupSenderExecutor.scheduleWithFixedDelay(
183                     flowTable::backup,
184                     0,
185                     backupPeriod,
186                     TimeUnit.MILLISECONDS);
187         }
188
189         logConfig("Started");
190     }
191
192     @Deactivate
193     public void deactivate(ComponentContext context) {
194         if (backupEnabled) {
195             replicaInfoManager.removeListener(flowTable);
196             backupTask.cancel(true);
197         }
198         configService.unregisterProperties(getClass(), false);
199         unregisterMessageHandlers();
200         messageHandlingExecutor.shutdownNow();
201         backupSenderExecutor.shutdownNow();
202         log.info("Stopped");
203     }
204
205     @SuppressWarnings("rawtypes")
206     @Modified
207     public void modified(ComponentContext context) {
208         if (context == null) {
209             backupEnabled = DEFAULT_BACKUP_ENABLED;
210             logConfig("Default config");
211             return;
212         }
213
214         Dictionary properties = context.getProperties();
215         int newPoolSize;
216         boolean newBackupEnabled;
217         int newBackupPeriod;
218         try {
219             String s = get(properties, "msgHandlerPoolSize");
220             newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
221
222             s = get(properties, "backupEnabled");
223             newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
224
225             s = get(properties, "backupPeriod");
226             newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
227
228         } catch (NumberFormatException | ClassCastException e) {
229             newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
230             newBackupEnabled = DEFAULT_BACKUP_ENABLED;
231             newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
232         }
233
234         boolean restartBackupTask = false;
235         if (newBackupEnabled != backupEnabled) {
236             backupEnabled = newBackupEnabled;
237             if (!backupEnabled) {
238                 replicaInfoManager.removeListener(flowTable);
239                 if (backupTask != null) {
240                     backupTask.cancel(false);
241                     backupTask = null;
242                 }
243             } else {
244                 replicaInfoManager.addListener(flowTable);
245             }
246             restartBackupTask = backupEnabled;
247         }
248         if (newBackupPeriod != backupPeriod) {
249             backupPeriod = newBackupPeriod;
250             restartBackupTask = backupEnabled;
251         }
252         if (restartBackupTask) {
253             if (backupTask != null) {
254                 // cancel previously running task
255                 backupTask.cancel(false);
256             }
257             backupTask = backupSenderExecutor.scheduleWithFixedDelay(
258                     flowTable::backup,
259                     0,
260                     backupPeriod,
261                     TimeUnit.MILLISECONDS);
262         }
263         if (newPoolSize != msgHandlerPoolSize) {
264             msgHandlerPoolSize = newPoolSize;
265             ExecutorService oldMsgHandler = messageHandlingExecutor;
266             messageHandlingExecutor = Executors.newFixedThreadPool(
267                     msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
268
269             // replace previously registered handlers.
270             registerMessageHandlers(messageHandlingExecutor);
271             oldMsgHandler.shutdown();
272         }
273         logConfig("Reconfigured");
274     }
275
276     private void registerMessageHandlers(ExecutorService executor) {
277
278         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
279         clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
280                 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
281         clusterCommunicator.addSubscriber(
282                 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
283         clusterCommunicator.addSubscriber(
284                 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
285         clusterCommunicator.addSubscriber(
286                 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
287         clusterCommunicator.addSubscriber(
288                 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
289         clusterCommunicator.addSubscriber(
290                 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
291     }
292
293     private void unregisterMessageHandlers() {
294         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
295         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
296         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
297         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
298         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
299         clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
300     }
301
302     private void logConfig(String prefix) {
303         log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
304                  prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
305     }
306
307     // This is not a efficient operation on a distributed sharded
308     // flow store. We need to revisit the need for this operation or at least
309     // make it device specific.
310     @Override
311     public int getFlowRuleCount() {
312         AtomicInteger sum = new AtomicInteger(0);
313         deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
314         return sum.get();
315     }
316
317     @Override
318     public FlowEntry getFlowEntry(FlowRule rule) {
319         NodeId master = mastershipService.getMasterFor(rule.deviceId());
320
321         if (master == null) {
322             log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
323             return null;
324         }
325
326         if (Objects.equal(local, master)) {
327             return flowTable.getFlowEntry(rule);
328         }
329
330         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
331                   master, rule.deviceId());
332
333         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
334                                     FlowStoreMessageSubjects.GET_FLOW_ENTRY,
335                                     SERIALIZER::encode,
336                                     SERIALIZER::decode,
337                                     master),
338                                FLOW_RULE_STORE_TIMEOUT_MILLIS,
339                                TimeUnit.MILLISECONDS,
340                                null);
341     }
342
343     @Override
344     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
345         NodeId master = mastershipService.getMasterFor(deviceId);
346
347         if (master == null) {
348             log.debug("Failed to getFlowEntries: No master for {}", deviceId);
349             return Collections.emptyList();
350         }
351
352         if (Objects.equal(local, master)) {
353             return flowTable.getFlowEntries(deviceId);
354         }
355
356         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
357                   master, deviceId);
358
359         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
360                                     FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
361                                     SERIALIZER::encode,
362                                     SERIALIZER::decode,
363                                     master),
364                                FLOW_RULE_STORE_TIMEOUT_MILLIS,
365                                TimeUnit.MILLISECONDS,
366                                Collections.emptyList());
367     }
368
369     @Override
370     public void storeFlowRule(FlowRule rule) {
371         storeBatch(new FlowRuleBatchOperation(
372                 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
373                 rule.deviceId(), idGenerator.getNewId()));
374     }
375
376     @Override
377     public void storeBatch(FlowRuleBatchOperation operation) {
378         if (operation.getOperations().isEmpty()) {
379             notifyDelegate(FlowRuleBatchEvent.completed(
380                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
381                     new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
382             return;
383         }
384
385         DeviceId deviceId = operation.deviceId();
386         NodeId master = mastershipService.getMasterFor(deviceId);
387
388         if (master == null) {
389             log.warn("No master for {} : flows will be marked for removal", deviceId);
390
391             updateStoreInternal(operation);
392
393             notifyDelegate(FlowRuleBatchEvent.completed(
394                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
395                     new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
396             return;
397         }
398
399         if (Objects.equal(local, master)) {
400             storeBatchInternal(operation);
401             return;
402         }
403
404         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
405                   master, deviceId);
406
407         clusterCommunicator.unicast(operation,
408                                     APPLY_BATCH_FLOWS,
409                                     SERIALIZER::encode,
410                                     master)
411                            .whenComplete((result, error) -> {
412                                if (error != null) {
413                                    log.warn("Failed to storeBatch: {} to {}", operation, master, error);
414
415                                    Set<FlowRule> allFailures = operation.getOperations()
416                                            .stream()
417                                            .map(op -> op.target())
418                                            .collect(Collectors.toSet());
419
420                                    notifyDelegate(FlowRuleBatchEvent.completed(
421                                            new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
422                                            new CompletedBatchOperation(false, allFailures, deviceId)));
423                                }
424                            });
425     }
426
427     private void storeBatchInternal(FlowRuleBatchOperation operation) {
428
429         final DeviceId did = operation.deviceId();
430         //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
431         Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
432         if (currentOps.isEmpty()) {
433             batchOperationComplete(FlowRuleBatchEvent.completed(
434                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
435                     new CompletedBatchOperation(true, Collections.emptySet(), did)));
436             return;
437         }
438
439         notifyDelegate(FlowRuleBatchEvent.requested(new
440                            FlowRuleBatchRequest(operation.id(),
441                                                 currentOps), operation.deviceId()));
442     }
443
444     private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
445         return operation.getOperations().stream().map(
446                 op -> {
447                     StoredFlowEntry entry;
448                     switch (op.operator()) {
449                         case ADD:
450                             entry = new DefaultFlowEntry(op.target());
451                             // always add requested FlowRule
452                             // Note: 2 equal FlowEntry may have different treatment
453                             flowTable.remove(entry.deviceId(), entry);
454                             flowTable.add(entry);
455
456                             return op;
457                         case REMOVE:
458                             entry = flowTable.getFlowEntry(op.target());
459                             if (entry != null) {
460                                 entry.setState(FlowEntryState.PENDING_REMOVE);
461                                 return op;
462                             }
463                             break;
464                         case MODIFY:
465                             //TODO: figure this out at some point
466                             break;
467                         default:
468                             log.warn("Unknown flow operation operator: {}", op.operator());
469                     }
470                     return null;
471                 }
472         ).filter(op -> op != null).collect(Collectors.toSet());
473     }
474
475     @Override
476     public void deleteFlowRule(FlowRule rule) {
477         storeBatch(
478                 new FlowRuleBatchOperation(
479                         Collections.singletonList(
480                                 new FlowRuleBatchEntry(
481                                         FlowRuleOperation.REMOVE,
482                                         rule)), rule.deviceId(), idGenerator.getNewId()));
483     }
484
485     @Override
486     public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
487         NodeId master = mastershipService.getMasterFor(rule.deviceId());
488         if (Objects.equal(local, master)) {
489             return addOrUpdateFlowRuleInternal(rule);
490         }
491
492         log.warn("Tried to update FlowRule {} state,"
493                          + " while the Node was not the master.", rule);
494         return null;
495     }
496
497     private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
498         // check if this new rule is an update to an existing entry
499         StoredFlowEntry stored = flowTable.getFlowEntry(rule);
500         if (stored != null) {
501             stored.setBytes(rule.bytes());
502             stored.setLife(rule.life());
503             stored.setPackets(rule.packets());
504             if (stored.state() == FlowEntryState.PENDING_ADD) {
505                 stored.setState(FlowEntryState.ADDED);
506                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
507             }
508             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
509         }
510
511         // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
512         // TODO: also update backup if the behavior is correct.
513         flowTable.add(rule);
514         return null;
515     }
516
517     @Override
518     public FlowRuleEvent removeFlowRule(FlowEntry rule) {
519         final DeviceId deviceId = rule.deviceId();
520         NodeId master = mastershipService.getMasterFor(deviceId);
521
522         if (Objects.equal(local, master)) {
523             // bypass and handle it locally
524             return removeFlowRuleInternal(rule);
525         }
526
527         if (master == null) {
528             log.warn("Failed to removeFlowRule: No master for {}", deviceId);
529             // TODO: revisit if this should be null (="no-op") or Exception
530             return null;
531         }
532
533         log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
534                   master, deviceId);
535
536         return Futures.get(clusterCommunicator.sendAndReceive(
537                                rule,
538                                REMOVE_FLOW_ENTRY,
539                                SERIALIZER::encode,
540                                SERIALIZER::decode,
541                                master),
542                            FLOW_RULE_STORE_TIMEOUT_MILLIS,
543                            TimeUnit.MILLISECONDS,
544                            RuntimeException.class);
545     }
546
547     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
548         final DeviceId deviceId = rule.deviceId();
549         // This is where one could mark a rule as removed and still keep it in the store.
550         final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
551         return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
552     }
553
554     @Override
555     public void batchOperationComplete(FlowRuleBatchEvent event) {
556         //FIXME: need a per device pending response
557         NodeId nodeId = pendingResponses.remove(event.subject().batchId());
558         if (nodeId == null) {
559             notifyDelegate(event);
560         } else {
561             // TODO check unicast return value
562             clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
563             //error log: log.warn("Failed to respond to peer for batch operation result");
564         }
565     }
566
567     private final class OnStoreBatch implements ClusterMessageHandler {
568
569         @Override
570         public void handle(final ClusterMessage message) {
571             FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
572             log.debug("received batch request {}", operation);
573
574             final DeviceId deviceId = operation.deviceId();
575             NodeId master = mastershipService.getMasterFor(deviceId);
576             if (!Objects.equal(local, master)) {
577                 Set<FlowRule> failures = new HashSet<>(operation.size());
578                 for (FlowRuleBatchEntry op : operation.getOperations()) {
579                     failures.add(op.target());
580                 }
581                 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
582                 // This node is no longer the master, respond as all failed.
583                 // TODO: we might want to wrap response in envelope
584                 // to distinguish sw programming failure and hand over
585                 // it make sense in the latter case to retry immediately.
586                 message.respond(SERIALIZER.encode(allFailed));
587                 return;
588             }
589
590             pendingResponses.put(operation.id(), message.sender());
591             storeBatchInternal(operation);
592         }
593     }
594
595     private class InternalFlowTable implements ReplicaInfoEventListener {
596
597         private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
598                 flowEntries = Maps.newConcurrentMap();
599
600         private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
601         private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
602         private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
603
604         @Override
605         public void event(ReplicaInfoEvent event) {
606             if (!backupEnabled) {
607                 return;
608             }
609             if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
610                 DeviceId deviceId = event.subject();
611                 NodeId master = mastershipService.getMasterFor(deviceId);
612                 if (!Objects.equal(local, master)) {
613                  // ignore since this event is for a device this node does not manage.
614                     return;
615                 }
616                 NodeId newBackupNode = getBackupNode(deviceId);
617                 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
618                 if (Objects.equal(newBackupNode, currentBackupNode)) {
619                     // ignore since backup location hasn't changed.
620                     return;
621                 }
622                 if (currentBackupNode != null && newBackupNode == null) {
623                     // Current backup node is most likely down and no alternate backup node
624                     // has been chosen. Clear current backup location so that we can resume
625                     // backups when either current backup comes online or a different backup node
626                     // is chosen.
627                     log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
628                             + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
629                     lastBackupNodes.remove(deviceId);
630                     lastBackupTimes.remove(deviceId);
631                     return;
632                     // TODO: Pick any available node as backup and ensure hand-off occurs when
633                     // a new master is elected.
634                 }
635                 log.debug("Backup location for {} has changed from {} to {}.",
636                         deviceId, currentBackupNode, newBackupNode);
637                 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
638                         0,
639                         TimeUnit.SECONDS);
640             }
641         }
642
643         private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
644             // split up the devices into smaller batches and send them separately.
645             Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
646                      .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
647         }
648
649         private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
650             if (deviceIds.isEmpty()) {
651                 return;
652             }
653             log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
654             Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
655                     Maps.newConcurrentMap();
656             deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
657             clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
658                                         deviceFlowEntries,
659                                         FLOW_TABLE_BACKUP,
660                                         SERIALIZER::encode,
661                                         SERIALIZER::decode,
662                                         nodeId)
663                                .whenComplete((backedupDevices, error) -> {
664                                    Set<DeviceId> devicesNotBackedup = error != null ?
665                                            deviceFlowEntries.keySet() :
666                                            Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
667                                    if (devicesNotBackedup.size() > 0) {
668                                        log.warn("Failed to backup devices: {}. Reason: {}",
669                                                devicesNotBackedup, error.getMessage());
670                                    }
671                                    if (backedupDevices != null) {
672                                        backedupDevices.forEach(id -> {
673                                            lastBackupTimes.put(id, System.currentTimeMillis());
674                                            lastBackupNodes.put(id, nodeId);
675                                        });
676                                    }
677                                });
678         }
679
680         /**
681          * Returns the flow table for specified device.
682          *
683          * @param deviceId identifier of the device
684          * @return Map representing Flow Table of given device.
685          */
686         private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
687             return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
688         }
689
690         private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
691             return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
692         }
693
694         private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
695             Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
696             return flowEntries.stream()
697                               .filter(entry -> Objects.equal(entry, rule))
698                               .findAny()
699                               .orElse(null);
700         }
701
702         private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
703             Set<FlowEntry> result = Sets.newHashSet();
704             getFlowTable(deviceId).values().forEach(result::addAll);
705             return result;
706         }
707
708         public StoredFlowEntry getFlowEntry(FlowRule rule) {
709             return getFlowEntryInternal(rule);
710         }
711
712         public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
713             return getFlowEntriesInternal(deviceId);
714         }
715
716         public void add(FlowEntry rule) {
717             getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
718             lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
719         }
720
721         public boolean remove(DeviceId deviceId, FlowEntry rule) {
722             try {
723                 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
724             } finally {
725                 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
726             }
727         }
728
729         private NodeId getBackupNode(DeviceId deviceId) {
730             List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
731             // pick the standby which is most likely to become next master
732             return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
733         }
734
735         private void backup() {
736             if (!backupEnabled) {
737                 return;
738             }
739             try {
740                 // determine the set of devices that we need to backup during this run.
741                 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
742                             .stream()
743                             .filter(deviceId -> {
744                                 Long lastBackupTime = lastBackupTimes.get(deviceId);
745                                 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
746                                 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
747                                 NodeId newBackupNode = getBackupNode(deviceId);
748                                 return lastBackupTime == null
749                                         ||  !Objects.equal(lastBackupNode, newBackupNode)
750                                         || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
751                             })
752                             .collect(Collectors.toSet());
753
754                 // compute a mapping from node to the set of devices whose flow entries it should backup
755                 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
756                 devicesToBackup.forEach(deviceId -> {
757                     NodeId backupLocation = getBackupNode(deviceId);
758                     if (backupLocation != null) {
759                         devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
760                                              .add(deviceId);
761                     }
762                 });
763                 // send the device flow entries to their respective backup nodes
764                 devicesToBackupByNode.forEach(this::sendBackups);
765             } catch (Exception e) {
766                 log.error("Backup failed.", e);
767             }
768         }
769
770         private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
771             log.debug("Received flowEntries for {} to backup", flowTables.keySet());
772             Set<DeviceId> backedupDevices = Sets.newHashSet();
773             try {
774                 flowTables.forEach((deviceId, deviceFlowTable) -> {
775                     // Only process those devices are that not managed by the local node.
776                     if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
777                         Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
778                         backupFlowTable.clear();
779                         backupFlowTable.putAll(deviceFlowTable);
780                         backedupDevices.add(deviceId);
781                     }
782                 });
783             } catch (Exception e) {
784                 log.warn("Failure processing backup request", e);
785             }
786             return backedupDevices;
787         }
788     }
789 }