8cd63e7dd8811152e12a92c2cd42a0a51d805719
[onosfw.git] /
1  /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.flow.impl;
17
18 import com.google.common.base.Objects;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Iterables;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.Futures;
25
26 import org.apache.felix.scr.annotations.Activate;
27 import org.apache.felix.scr.annotations.Component;
28 import org.apache.felix.scr.annotations.Deactivate;
29 import org.apache.felix.scr.annotations.Modified;
30 import org.apache.felix.scr.annotations.Property;
31 import org.apache.felix.scr.annotations.Reference;
32 import org.apache.felix.scr.annotations.ReferenceCardinality;
33 import org.apache.felix.scr.annotations.Service;
34 import org.onlab.util.KryoNamespace;
35 import org.onlab.util.Tools;
36 import org.onosproject.cfg.ComponentConfigService;
37 import org.onosproject.cluster.ClusterService;
38 import org.onosproject.cluster.NodeId;
39 import org.onosproject.core.CoreService;
40 import org.onosproject.core.IdGenerator;
41 import org.onosproject.mastership.MastershipService;
42 import org.onosproject.net.DeviceId;
43 import org.onosproject.net.device.DeviceService;
44 import org.onosproject.net.flow.CompletedBatchOperation;
45 import org.onosproject.net.flow.DefaultFlowEntry;
46 import org.onosproject.net.flow.FlowEntry;
47 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
48 import org.onosproject.net.flow.FlowId;
49 import org.onosproject.net.flow.FlowRule;
50 import org.onosproject.net.flow.FlowRuleBatchEntry;
51 import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
52 import org.onosproject.net.flow.FlowRuleBatchEvent;
53 import org.onosproject.net.flow.FlowRuleBatchOperation;
54 import org.onosproject.net.flow.FlowRuleBatchRequest;
55 import org.onosproject.net.flow.FlowRuleEvent;
56 import org.onosproject.net.flow.FlowRuleEvent.Type;
57 import org.onosproject.net.flow.FlowRuleService;
58 import org.onosproject.net.flow.FlowRuleStore;
59 import org.onosproject.net.flow.FlowRuleStoreDelegate;
60 import org.onosproject.net.flow.StoredFlowEntry;
61 import org.onosproject.net.flow.TableStatisticsEntry;
62 import org.onosproject.store.AbstractStore;
63 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64 import org.onosproject.store.cluster.messaging.ClusterMessage;
65 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
66 import org.onosproject.store.flow.ReplicaInfoEvent;
67 import org.onosproject.store.flow.ReplicaInfoEventListener;
68 import org.onosproject.store.flow.ReplicaInfoService;
69 import org.onosproject.store.impl.MastershipBasedTimestamp;
70 import org.onosproject.store.serializers.KryoNamespaces;
71 import org.onosproject.store.serializers.KryoSerializer;
72 import org.onosproject.store.serializers.StoreSerializer;
73 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
74 import org.onosproject.store.service.EventuallyConsistentMap;
75 import org.onosproject.store.service.EventuallyConsistentMapEvent;
76 import org.onosproject.store.service.EventuallyConsistentMapListener;
77 import org.onosproject.store.service.StorageService;
78 import org.onosproject.store.service.WallClockTimestamp;
79 import org.osgi.service.component.ComponentContext;
80 import org.slf4j.Logger;
81
82 import java.util.Collections;
83 import java.util.Dictionary;
84 import java.util.HashSet;
85 import java.util.List;
86 import java.util.Map;
87 import java.util.Set;
88 import java.util.concurrent.ExecutorService;
89 import java.util.concurrent.Executors;
90 import java.util.concurrent.ScheduledExecutorService;
91 import java.util.concurrent.ScheduledFuture;
92 import java.util.concurrent.TimeUnit;
93 import java.util.concurrent.atomic.AtomicInteger;
94 import java.util.stream.Collectors;
95
96 import static com.google.common.base.Strings.isNullOrEmpty;
97 import static org.onlab.util.Tools.get;
98 import static org.onlab.util.Tools.groupedThreads;
99 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
100 import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
101 import static org.slf4j.LoggerFactory.getLogger;
102
103 /**
104  * Manages inventory of flow rules using a distributed state management protocol.
105  */
106 @Component(immediate = true, enabled = true)
107 @Service
108 public class NewDistributedFlowRuleStore
109         extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
110         implements FlowRuleStore {
111
112     private final Logger log = getLogger(getClass());
113
114     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
115     private static final boolean DEFAULT_BACKUP_ENABLED = true;
116     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
117     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
118     // number of devices whose flow entries will be backed up in one communication round
119     private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
120
121     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
122             label = "Number of threads in the message handler pool")
123     private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
124
125     @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
126             label = "Indicates whether backups are enabled or not")
127     private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
128
129     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
130             label = "Delay in ms between successive backup runs")
131     private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
132
133     private InternalFlowTable flowTable = new InternalFlowTable();
134
135     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136     protected ReplicaInfoService replicaInfoManager;
137
138     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139     protected ClusterCommunicationService clusterCommunicator;
140
141     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142     protected ClusterService clusterService;
143
144     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145     protected DeviceService deviceService;
146
147     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148     protected CoreService coreService;
149
150     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151     protected ComponentConfigService configService;
152
153     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154     protected MastershipService mastershipService;
155
156     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
157     private ExecutorService messageHandlingExecutor;
158
159     private ScheduledFuture<?> backupTask;
160     private final ScheduledExecutorService backupSenderExecutor =
161             Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
162
163     private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
164     private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
165             new InternalTableStatsListener();
166
167     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168     protected StorageService storageService;
169
170     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
171         @Override
172         protected void setupKryoPool() {
173             serializerPool = KryoNamespace.newBuilder()
174                     .register(DistributedStoreSerializers.STORE_COMMON)
175                     .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
176                     .build();
177         }
178     };
179
180     protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
181             .register(KryoNamespaces.API)
182             .register(MastershipBasedTimestamp.class);
183
184
185     private IdGenerator idGenerator;
186     private NodeId local;
187
188     @Activate
189     public void activate(ComponentContext context) {
190         configService.registerProperties(getClass());
191
192         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
193
194         local = clusterService.getLocalNode().id();
195
196         messageHandlingExecutor = Executors.newFixedThreadPool(
197                 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
198
199         registerMessageHandlers(messageHandlingExecutor);
200
201         if (backupEnabled) {
202             replicaInfoManager.addListener(flowTable);
203             backupTask = backupSenderExecutor.scheduleWithFixedDelay(
204                     flowTable::backup,
205                     0,
206                     backupPeriod,
207                     TimeUnit.MILLISECONDS);
208         }
209
210         deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
211                 .withName("onos-flow-table-stats")
212                 .withSerializer(SERIALIZER_BUILDER)
213                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
214                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
215                 .withTombstonesDisabled()
216                 .build();
217         deviceTableStats.addListener(tableStatsListener);
218
219         logConfig("Started");
220     }
221
222     @Deactivate
223     public void deactivate(ComponentContext context) {
224         if (backupEnabled) {
225             replicaInfoManager.removeListener(flowTable);
226             backupTask.cancel(true);
227         }
228         configService.unregisterProperties(getClass(), false);
229         unregisterMessageHandlers();
230         deviceTableStats.removeListener(tableStatsListener);
231         deviceTableStats.destroy();
232         messageHandlingExecutor.shutdownNow();
233         backupSenderExecutor.shutdownNow();
234         log.info("Stopped");
235     }
236
237     @SuppressWarnings("rawtypes")
238     @Modified
239     public void modified(ComponentContext context) {
240         if (context == null) {
241             backupEnabled = DEFAULT_BACKUP_ENABLED;
242             logConfig("Default config");
243             return;
244         }
245
246         Dictionary properties = context.getProperties();
247         int newPoolSize;
248         boolean newBackupEnabled;
249         int newBackupPeriod;
250         try {
251             String s = get(properties, "msgHandlerPoolSize");
252             newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
253
254             s = get(properties, "backupEnabled");
255             newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
256
257             s = get(properties, "backupPeriod");
258             newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
259
260         } catch (NumberFormatException | ClassCastException e) {
261             newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
262             newBackupEnabled = DEFAULT_BACKUP_ENABLED;
263             newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
264         }
265
266         boolean restartBackupTask = false;
267         if (newBackupEnabled != backupEnabled) {
268             backupEnabled = newBackupEnabled;
269             if (!backupEnabled) {
270                 replicaInfoManager.removeListener(flowTable);
271                 if (backupTask != null) {
272                     backupTask.cancel(false);
273                     backupTask = null;
274                 }
275             } else {
276                 replicaInfoManager.addListener(flowTable);
277             }
278             restartBackupTask = backupEnabled;
279         }
280         if (newBackupPeriod != backupPeriod) {
281             backupPeriod = newBackupPeriod;
282             restartBackupTask = backupEnabled;
283         }
284         if (restartBackupTask) {
285             if (backupTask != null) {
286                 // cancel previously running task
287                 backupTask.cancel(false);
288             }
289             backupTask = backupSenderExecutor.scheduleWithFixedDelay(
290                     flowTable::backup,
291                     0,
292                     backupPeriod,
293                     TimeUnit.MILLISECONDS);
294         }
295         if (newPoolSize != msgHandlerPoolSize) {
296             msgHandlerPoolSize = newPoolSize;
297             ExecutorService oldMsgHandler = messageHandlingExecutor;
298             messageHandlingExecutor = Executors.newFixedThreadPool(
299                     msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
300
301             // replace previously registered handlers.
302             registerMessageHandlers(messageHandlingExecutor);
303             oldMsgHandler.shutdown();
304         }
305         logConfig("Reconfigured");
306     }
307
308     private void registerMessageHandlers(ExecutorService executor) {
309
310         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
311         clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
312                 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
313         clusterCommunicator.addSubscriber(
314                 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
315         clusterCommunicator.addSubscriber(
316                 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
317         clusterCommunicator.addSubscriber(
318                 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
319         clusterCommunicator.addSubscriber(
320                 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
321         clusterCommunicator.addSubscriber(
322                 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
323     }
324
325     private void unregisterMessageHandlers() {
326         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
327         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
328         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
329         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
330         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
331         clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
332     }
333
334     private void logConfig(String prefix) {
335         log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
336                  prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
337     }
338
339     // This is not a efficient operation on a distributed sharded
340     // flow store. We need to revisit the need for this operation or at least
341     // make it device specific.
342     @Override
343     public int getFlowRuleCount() {
344         AtomicInteger sum = new AtomicInteger(0);
345         deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
346         return sum.get();
347     }
348
349     @Override
350     public FlowEntry getFlowEntry(FlowRule rule) {
351         NodeId master = mastershipService.getMasterFor(rule.deviceId());
352
353         if (master == null) {
354             log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
355             return null;
356         }
357
358         if (Objects.equal(local, master)) {
359             return flowTable.getFlowEntry(rule);
360         }
361
362         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
363                   master, rule.deviceId());
364
365         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
366                                     FlowStoreMessageSubjects.GET_FLOW_ENTRY,
367                                     SERIALIZER::encode,
368                                     SERIALIZER::decode,
369                                     master),
370                                FLOW_RULE_STORE_TIMEOUT_MILLIS,
371                                TimeUnit.MILLISECONDS,
372                                null);
373     }
374
375     @Override
376     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
377         NodeId master = mastershipService.getMasterFor(deviceId);
378
379         if (master == null) {
380             log.debug("Failed to getFlowEntries: No master for {}", deviceId);
381             return Collections.emptyList();
382         }
383
384         if (Objects.equal(local, master)) {
385             return flowTable.getFlowEntries(deviceId);
386         }
387
388         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
389                   master, deviceId);
390
391         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
392                                     FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
393                                     SERIALIZER::encode,
394                                     SERIALIZER::decode,
395                                     master),
396                                FLOW_RULE_STORE_TIMEOUT_MILLIS,
397                                TimeUnit.MILLISECONDS,
398                                Collections.emptyList());
399     }
400
401     @Override
402     public void storeFlowRule(FlowRule rule) {
403         storeBatch(new FlowRuleBatchOperation(
404                 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
405                 rule.deviceId(), idGenerator.getNewId()));
406     }
407
408     @Override
409     public void storeBatch(FlowRuleBatchOperation operation) {
410         if (operation.getOperations().isEmpty()) {
411             notifyDelegate(FlowRuleBatchEvent.completed(
412                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
413                     new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
414             return;
415         }
416
417         DeviceId deviceId = operation.deviceId();
418         NodeId master = mastershipService.getMasterFor(deviceId);
419
420         if (master == null) {
421             log.warn("No master for {} : flows will be marked for removal", deviceId);
422
423             updateStoreInternal(operation);
424
425             notifyDelegate(FlowRuleBatchEvent.completed(
426                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
427                     new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
428             return;
429         }
430
431         if (Objects.equal(local, master)) {
432             storeBatchInternal(operation);
433             return;
434         }
435
436         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
437                   master, deviceId);
438
439         clusterCommunicator.unicast(operation,
440                                     APPLY_BATCH_FLOWS,
441                                     SERIALIZER::encode,
442                                     master)
443                            .whenComplete((result, error) -> {
444                                if (error != null) {
445                                    log.warn("Failed to storeBatch: {} to {}", operation, master, error);
446
447                                    Set<FlowRule> allFailures = operation.getOperations()
448                                            .stream()
449                                            .map(op -> op.target())
450                                            .collect(Collectors.toSet());
451
452                                    notifyDelegate(FlowRuleBatchEvent.completed(
453                                            new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
454                                            new CompletedBatchOperation(false, allFailures, deviceId)));
455                                }
456                            });
457     }
458
459     private void storeBatchInternal(FlowRuleBatchOperation operation) {
460
461         final DeviceId did = operation.deviceId();
462         //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
463         Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
464         if (currentOps.isEmpty()) {
465             batchOperationComplete(FlowRuleBatchEvent.completed(
466                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
467                     new CompletedBatchOperation(true, Collections.emptySet(), did)));
468             return;
469         }
470
471         notifyDelegate(FlowRuleBatchEvent.requested(new
472                            FlowRuleBatchRequest(operation.id(),
473                                                 currentOps), operation.deviceId()));
474     }
475
476     private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
477         return operation.getOperations().stream().map(
478                 op -> {
479                     StoredFlowEntry entry;
480                     switch (op.operator()) {
481                         case ADD:
482                             entry = new DefaultFlowEntry(op.target());
483                             // always add requested FlowRule
484                             // Note: 2 equal FlowEntry may have different treatment
485                             flowTable.remove(entry.deviceId(), entry);
486                             flowTable.add(entry);
487
488                             return op;
489                         case REMOVE:
490                             entry = flowTable.getFlowEntry(op.target());
491                             if (entry != null) {
492                                 entry.setState(FlowEntryState.PENDING_REMOVE);
493                                 return op;
494                             }
495                             break;
496                         case MODIFY:
497                             //TODO: figure this out at some point
498                             break;
499                         default:
500                             log.warn("Unknown flow operation operator: {}", op.operator());
501                     }
502                     return null;
503                 }
504         ).filter(op -> op != null).collect(Collectors.toSet());
505     }
506
507     @Override
508     public void deleteFlowRule(FlowRule rule) {
509         storeBatch(
510                 new FlowRuleBatchOperation(
511                         Collections.singletonList(
512                                 new FlowRuleBatchEntry(
513                                         FlowRuleOperation.REMOVE,
514                                         rule)), rule.deviceId(), idGenerator.getNewId()));
515     }
516
517     @Override
518     public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
519         NodeId master = mastershipService.getMasterFor(rule.deviceId());
520         if (Objects.equal(local, master)) {
521             return addOrUpdateFlowRuleInternal(rule);
522         }
523
524         log.warn("Tried to update FlowRule {} state,"
525                          + " while the Node was not the master.", rule);
526         return null;
527     }
528
529     private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
530         // check if this new rule is an update to an existing entry
531         StoredFlowEntry stored = flowTable.getFlowEntry(rule);
532         if (stored != null) {
533             stored.setBytes(rule.bytes());
534             stored.setLife(rule.life());
535             stored.setPackets(rule.packets());
536             if (stored.state() == FlowEntryState.PENDING_ADD) {
537                 stored.setState(FlowEntryState.ADDED);
538                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
539             }
540             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
541         }
542
543         // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
544         // TODO: also update backup if the behavior is correct.
545         flowTable.add(rule);
546         return null;
547     }
548
549     @Override
550     public FlowRuleEvent removeFlowRule(FlowEntry rule) {
551         final DeviceId deviceId = rule.deviceId();
552         NodeId master = mastershipService.getMasterFor(deviceId);
553
554         if (Objects.equal(local, master)) {
555             // bypass and handle it locally
556             return removeFlowRuleInternal(rule);
557         }
558
559         if (master == null) {
560             log.warn("Failed to removeFlowRule: No master for {}", deviceId);
561             // TODO: revisit if this should be null (="no-op") or Exception
562             return null;
563         }
564
565         log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
566                   master, deviceId);
567
568         return Futures.get(clusterCommunicator.sendAndReceive(
569                                rule,
570                                REMOVE_FLOW_ENTRY,
571                                SERIALIZER::encode,
572                                SERIALIZER::decode,
573                                master),
574                            FLOW_RULE_STORE_TIMEOUT_MILLIS,
575                            TimeUnit.MILLISECONDS,
576                            RuntimeException.class);
577     }
578
579     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
580         final DeviceId deviceId = rule.deviceId();
581         // This is where one could mark a rule as removed and still keep it in the store.
582         final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
583         return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
584     }
585
586     @Override
587     public void batchOperationComplete(FlowRuleBatchEvent event) {
588         //FIXME: need a per device pending response
589         NodeId nodeId = pendingResponses.remove(event.subject().batchId());
590         if (nodeId == null) {
591             notifyDelegate(event);
592         } else {
593             // TODO check unicast return value
594             clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
595             //error log: log.warn("Failed to respond to peer for batch operation result");
596         }
597     }
598
599     private final class OnStoreBatch implements ClusterMessageHandler {
600
601         @Override
602         public void handle(final ClusterMessage message) {
603             FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
604             log.debug("received batch request {}", operation);
605
606             final DeviceId deviceId = operation.deviceId();
607             NodeId master = mastershipService.getMasterFor(deviceId);
608             if (!Objects.equal(local, master)) {
609                 Set<FlowRule> failures = new HashSet<>(operation.size());
610                 for (FlowRuleBatchEntry op : operation.getOperations()) {
611                     failures.add(op.target());
612                 }
613                 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
614                 // This node is no longer the master, respond as all failed.
615                 // TODO: we might want to wrap response in envelope
616                 // to distinguish sw programming failure and hand over
617                 // it make sense in the latter case to retry immediately.
618                 message.respond(SERIALIZER.encode(allFailed));
619                 return;
620             }
621
622             pendingResponses.put(operation.id(), message.sender());
623             storeBatchInternal(operation);
624         }
625     }
626
627     private class InternalFlowTable implements ReplicaInfoEventListener {
628
629         private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
630                 flowEntries = Maps.newConcurrentMap();
631
632         private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
633         private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
634         private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
635
636         @Override
637         public void event(ReplicaInfoEvent event) {
638             if (!backupEnabled) {
639                 return;
640             }
641             if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
642                 DeviceId deviceId = event.subject();
643                 NodeId master = mastershipService.getMasterFor(deviceId);
644                 if (!Objects.equal(local, master)) {
645                  // ignore since this event is for a device this node does not manage.
646                     return;
647                 }
648                 NodeId newBackupNode = getBackupNode(deviceId);
649                 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
650                 if (Objects.equal(newBackupNode, currentBackupNode)) {
651                     // ignore since backup location hasn't changed.
652                     return;
653                 }
654                 if (currentBackupNode != null && newBackupNode == null) {
655                     // Current backup node is most likely down and no alternate backup node
656                     // has been chosen. Clear current backup location so that we can resume
657                     // backups when either current backup comes online or a different backup node
658                     // is chosen.
659                     log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
660                             + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
661                     lastBackupNodes.remove(deviceId);
662                     lastBackupTimes.remove(deviceId);
663                     return;
664                     // TODO: Pick any available node as backup and ensure hand-off occurs when
665                     // a new master is elected.
666                 }
667                 log.debug("Backup location for {} has changed from {} to {}.",
668                         deviceId, currentBackupNode, newBackupNode);
669                 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
670                         0,
671                         TimeUnit.SECONDS);
672             }
673         }
674
675         private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
676             // split up the devices into smaller batches and send them separately.
677             Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
678                      .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
679         }
680
681         private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
682             if (deviceIds.isEmpty()) {
683                 return;
684             }
685             log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
686             Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
687                     Maps.newConcurrentMap();
688             deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
689             clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
690                                         deviceFlowEntries,
691                                         FLOW_TABLE_BACKUP,
692                                         SERIALIZER::encode,
693                                         SERIALIZER::decode,
694                                         nodeId)
695                                .whenComplete((backedupDevices, error) -> {
696                                    Set<DeviceId> devicesNotBackedup = error != null ?
697                                            deviceFlowEntries.keySet() :
698                                            Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
699                                    if (devicesNotBackedup.size() > 0) {
700                                        log.warn("Failed to backup devices: {}. Reason: {}",
701                                                devicesNotBackedup, error.getMessage());
702                                    }
703                                    if (backedupDevices != null) {
704                                        backedupDevices.forEach(id -> {
705                                            lastBackupTimes.put(id, System.currentTimeMillis());
706                                            lastBackupNodes.put(id, nodeId);
707                                        });
708                                    }
709                                });
710         }
711
        /**
         * Returns the flow table for specified device.
         * <p>
         * If no table is registered for the device yet, an empty concurrent map is
         * created, stored under the device id and returned.
         *
         * @param deviceId identifier of the device
         * @return Map representing Flow Table of given device.
         */
        private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
            return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
        }
721
722         private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
723             return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
724         }
725
726         private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
727             Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
728             return flowEntries.stream()
729                               .filter(entry -> Objects.equal(entry, rule))
730                               .findAny()
731                               .orElse(null);
732         }
733
734         private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
735             Set<FlowEntry> result = Sets.newHashSet();
736             getFlowTable(deviceId).values().forEach(result::addAll);
737             return result;
738         }
739
        /**
         * Returns the stored flow entry that equals the given rule.
         *
         * @param rule flow rule to look up
         * @return the matching stored entry, or null if no such entry is stored
         */
        public StoredFlowEntry getFlowEntry(FlowRule rule) {
            return getFlowEntryInternal(rule);
        }
743
        /**
         * Returns all flow entries stored for the given device.
         *
         * @param deviceId identifier of the device
         * @return newly created set containing the device's flow entries
         */
        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
            return getFlowEntriesInternal(deviceId);
        }
747
748         public void add(FlowEntry rule) {
749             getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
750             lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
751         }
752
753         public boolean remove(DeviceId deviceId, FlowEntry rule) {
754             try {
755                 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
756             } finally {
757                 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
758             }
759         }
760
761         private NodeId getBackupNode(DeviceId deviceId) {
762             List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
763             // pick the standby which is most likely to become next master
764             return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
765         }
766
767         private void backup() {
768             if (!backupEnabled) {
769                 return;
770             }
771             try {
772                 // determine the set of devices that we need to backup during this run.
773                 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
774                             .stream()
775                             .filter(deviceId -> {
776                                 Long lastBackupTime = lastBackupTimes.get(deviceId);
777                                 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
778                                 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
779                                 NodeId newBackupNode = getBackupNode(deviceId);
780                                 return lastBackupTime == null
781                                         ||  !Objects.equal(lastBackupNode, newBackupNode)
782                                         || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
783                             })
784                             .collect(Collectors.toSet());
785
786                 // compute a mapping from node to the set of devices whose flow entries it should backup
787                 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
788                 devicesToBackup.forEach(deviceId -> {
789                     NodeId backupLocation = getBackupNode(deviceId);
790                     if (backupLocation != null) {
791                         devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
792                                              .add(deviceId);
793                     }
794                 });
795                 // send the device flow entries to their respective backup nodes
796                 devicesToBackupByNode.forEach(this::sendBackups);
797             } catch (Exception e) {
798                 log.error("Backup failed.", e);
799             }
800         }
801
802         private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
803             log.debug("Received flowEntries for {} to backup", flowTables.keySet());
804             Set<DeviceId> backedupDevices = Sets.newHashSet();
805             try {
806                 flowTables.forEach((deviceId, deviceFlowTable) -> {
807                     // Only process those devices are that not managed by the local node.
808                     if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
809                         Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
810                         backupFlowTable.clear();
811                         backupFlowTable.putAll(deviceFlowTable);
812                         backedupDevices.add(deviceId);
813                     }
814                 });
815             } catch (Exception e) {
816                 log.warn("Failure processing backup request", e);
817             }
818             return backedupDevices;
819         }
820     }
821
    /**
     * Stores the given table statistics for a device in {@code deviceTableStats},
     * replacing any value previously stored for that device.
     *
     * @param deviceId   identifier of the device
     * @param tableStats table statistics reported for the device
     * @return always null; this implementation produces no event
     */
    @Override
    public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
                                               List<TableStatisticsEntry> tableStats) {
        deviceTableStats.put(deviceId, tableStats);
        return null;
    }
828
829     @Override
830     public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
831         NodeId master = mastershipService.getMasterFor(deviceId);
832
833         if (master == null) {
834             log.debug("Failed to getTableStats: No master for {}", deviceId);
835             return Collections.emptyList();
836         }
837
838         List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
839         if (tableStats == null) {
840             return Collections.emptyList();
841         }
842         return ImmutableList.copyOf(tableStats);
843     }
844
    /**
     * Listener for changes to the device table statistics map.
     * Events are currently ignored; no notification is generated.
     */
    private class InternalTableStatsListener
        implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
        /**
         * Receives a table statistics map event and intentionally does nothing with it.
         *
         * @param event map event carrying the device id and its table statistics
         */
        @Override
        public void event(EventuallyConsistentMapEvent<DeviceId,
                          List<TableStatisticsEntry>> event) {
            //TODO: Generate an event to listeners (do we need?)
        }
    }
853 }