2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.trivial;
18 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
19 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
20 import static org.slf4j.LoggerFactory.getLogger;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.Objects;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.atomic.AtomicInteger;
33 import org.apache.felix.scr.annotations.Activate;
34 import org.apache.felix.scr.annotations.Component;
35 import org.apache.felix.scr.annotations.Deactivate;
36 import org.apache.felix.scr.annotations.Reference;
37 import org.apache.felix.scr.annotations.ReferenceCardinality;
38 import org.apache.felix.scr.annotations.Service;
39 import org.joda.time.DateTime;
40 import org.onlab.packet.IpAddress;
41 import org.onosproject.cluster.ClusterEventListener;
42 import org.onosproject.cluster.ClusterService;
43 import org.onosproject.cluster.ControllerNode;
44 import org.onosproject.cluster.ControllerNode.State;
45 import org.onosproject.cluster.DefaultControllerNode;
46 import org.onosproject.cluster.NodeId;
47 import org.onosproject.cluster.RoleInfo;
48 import org.onosproject.mastership.MastershipEvent;
49 import org.onosproject.mastership.MastershipStore;
50 import org.onosproject.mastership.MastershipStoreDelegate;
51 import org.onosproject.mastership.MastershipTerm;
52 import org.onosproject.net.DeviceId;
53 import org.onosproject.net.MastershipRole;
54 import org.onosproject.store.AbstractStore;
55 import org.slf4j.Logger;
57 import com.google.common.collect.ImmutableList;
58 import com.google.common.collect.ImmutableSet;
61 * Manages inventory of controller mastership over devices using
62 * trivial, non-distributed in-memory structures implementation.
64 @Component(immediate = true)
66 public class SimpleMastershipStore
67 extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
68 implements MastershipStore {
// Per-instance logger, named after the runtime class.
70 private final Logger log = getLogger(getClass());
// Term number reported by getTermFor for a device that has no entry in termMap;
// also the value a fresh counter starts from before incrementTerm bumps it.
72 private static final int NOTHING = 0;
// NOTE(review): no use of INIT is visible in this section — confirm it is still needed.
73 private static final int INIT = 1;
// Cluster service reference; activate() installs a local stand-in when this is null.
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected ClusterService clusterService;
78 //devices mapped to their masters, to emulate multiple nodes
79 protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
80 //emulate backups with pile of nodes
81 protected final Map<DeviceId, List<NodeId>> backups = new HashMap<>();
// Per-device term counters, incremented by incrementTerm and read by getTermFor.
83 protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
// Start-up hook. When no ClusterService has been supplied, installs an anonymous
// single-node stand-in built around a node with id "local" at address 127.0.0.1.
// NOTE(review): several statements of the stand-in (most return statements and the
// closing braces) are not visible in this section — confirm against the full file.
86 public void activate() {
87 if (clusterService == null) {
88 // just for ease of unit test
89 final ControllerNode instance =
90 new DefaultControllerNode(new NodeId("local"),
91 IpAddress.valueOf("127.0.0.1"));
// Anonymous ClusterService whose only known node is 'instance'.
93 clusterService = new ClusterService() {
// Timestamp taken when the stand-in is created.
// NOTE(review): presumably what getLastUpdated reports — confirm.
95 private final DateTime creationTime = DateTime.now();
98 public ControllerNode getLocalNode() {
// The node set consists of exactly the one local node.
103 public Set<ControllerNode> getNodes() {
104 return ImmutableSet.of(instance);
108 public ControllerNode getNode(NodeId nodeId) {
109 if (instance.id().equals(nodeId)) {
116 public State getState(NodeId nodeId) {
117 if (instance.id().equals(nodeId)) {
// NOTE(review): presumably the result for any id other than the local node's — confirm.
120 return State.INACTIVE;
125 public DateTime getLastUpdated(NodeId nodeId) {
130 public void addListener(ClusterEventListener listener) {
134 public void removeListener(ClusterEventListener listener) {
// Shutdown hook, the counterpart of activate().
// NOTE(review): the method body is not visible in this section.
142 public void deactivate() {
// Makes nodeId the master of deviceId, depending on the role getRole reports for the pair.
// Visible outcomes: one path returns a completed future holding null without touching
// any map; another records nodeId in masterMap, bumps the device term, drops nodeId
// from the backups and appends the previous master to them, then returns a completed
// future carrying a MASTER_CHANGED event built from getNodes(deviceId).
// NOTE(review): the construct that selects between these paths (presumably a switch
// on role) is not visible in this section — confirm against the full file.
147 public synchronized CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
149 MastershipRole role = getRole(nodeId, deviceId);
153 return CompletableFuture.completedFuture(null);
// put() hands back the previously recorded master (null if none) so it can be demoted below.
156 NodeId prevMaster = masterMap.put(deviceId, nodeId);
157 incrementTerm(deviceId);
158 removeFromBackups(deviceId, nodeId);
// addToBackup silently ignores a null prevMaster.
159 addToBackup(deviceId, prevMaster);
162 log.warn("unknown Mastership Role {}", role);
166 return CompletableFuture.completedFuture(
167 new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
171 public NodeId getMaster(DeviceId deviceId) {
172 return masterMap.get(deviceId);
175 // synchronized for atomic read
177 public synchronized RoleInfo getNodes(DeviceId deviceId) {
178 return new RoleInfo(masterMap.get(deviceId),
179 backups.getOrDefault(deviceId, ImmutableList.of()));
183 public Set<DeviceId> getDevices(NodeId nodeId) {
184 Set<DeviceId> ids = new HashSet<>();
185 for (Map.Entry<DeviceId, NodeId> d : masterMap.entrySet()) {
186 if (Objects.equals(d.getValue(), nodeId)) {
// Requests a role on deviceId for the local cluster node and returns the resulting
// role wrapped in an already-completed future. On the paths where getMaster(deviceId)
// is null the local node is recorded as master and the term is incremented. The
// delegate is notified with MASTER_CHANGED when mastership is taken, and with
// BACKUPS_CHANGED when addToBackup reports that the backup list changed.
// NOTE(review): the construct dispatching on the current role (presumably a switch
// on role) is not visible in this section — confirm against the full file.
194 public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
195 //query+possible reelection
196 NodeId node = clusterService.getLocalNode().id();
197 MastershipRole role = getRole(node, deviceId);
// NOTE(review): presumably the branch for a node that already is MASTER — confirm.
201 return CompletableFuture.completedFuture(MastershipRole.MASTER);
// NOTE(review): presumably the branch for a node currently listed as backup
// (hence the removeFromBackups call below) — confirm.
203 if (getMaster(deviceId) == null) {
204 // no master => become master
205 masterMap.put(deviceId, node);
206 incrementTerm(deviceId);
207 // remove from backup list
208 removeFromBackups(deviceId, node);
209 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
210 getNodes(deviceId)));
211 return CompletableFuture.completedFuture(MastershipRole.MASTER);
213 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
// NOTE(review): presumably the branch for a node that holds no role yet — confirm.
215 if (getMaster(deviceId) == null) {
216 // no master => become master
217 masterMap.put(deviceId, node);
218 incrementTerm(deviceId);
219 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
220 getNodes(deviceId)));
221 return CompletableFuture.completedFuture(MastershipRole.MASTER);
223 // add to backup list
224 if (addToBackup(deviceId, node)) {
225 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
226 getNodes(deviceId)));
228 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
230 log.warn("unknown Mastership Role {}", role);
// Fallback result: the role as originally looked up, unchanged.
232 return CompletableFuture.completedFuture(role);
235 // add to backup if not there already, silently ignores null node
236 private synchronized boolean addToBackup(DeviceId deviceId, NodeId nodeId) {
237 boolean modified = false;
238 List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
239 if (nodeId != null && !stbys.contains(nodeId)) {
243 backups.put(deviceId, stbys);
247 private synchronized boolean removeFromBackups(DeviceId deviceId, NodeId node) {
248 List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
249 boolean modified = stbys.remove(node);
250 backups.put(deviceId, stbys);
254 private synchronized void incrementTerm(DeviceId deviceId) {
255 AtomicInteger term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING));
256 term.incrementAndGet();
257 termMap.put(deviceId, term);
261 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
263 NodeId current = masterMap.get(deviceId);
266 if (current != null && current.equals(nodeId)) {
267 return MastershipRole.MASTER;
270 if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) {
271 role = MastershipRole.STANDBY;
273 role = MastershipRole.NONE;
278 // synchronized for atomic read
280 public synchronized MastershipTerm getTermFor(DeviceId deviceId) {
281 if ((termMap.get(deviceId) == null)) {
282 return MastershipTerm.of(masterMap.get(deviceId), NOTHING);
284 return MastershipTerm.of(
285 masterMap.get(deviceId), termMap.get(deviceId).get());
// Demotes nodeId to standby for deviceId, depending on the role getRole reports.
// Visible outcomes: a replacement master is chosen through reelect — if none exists
// the master entry is removed, otherwise the replacement is recorded, the term is
// incremented and the previous master is appended to the backups; both of these paths
// return a completed future carrying MASTER_CHANGED. Another path adds nodeId to the
// backups and returns a completed future carrying BACKUPS_CHANGED.
// NOTE(review): the construct dispatching on role, the condition guarding the
// BACKUPS_CHANGED return, and the method's final return are not visible in this
// section — confirm against the full file.
289 public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
290 MastershipRole role = getRole(nodeId, deviceId);
// Pick a standby other than nodeId to take over.
293 NodeId backup = reelect(deviceId, nodeId);
294 if (backup == null) {
295 // no master alternative
296 masterMap.remove(deviceId);
297 // TODO: Should there be new event type for no MASTER?
298 return CompletableFuture.completedFuture(
299 new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
// put() hands back the previously recorded master so it can be kept as a backup.
301 NodeId prevMaster = masterMap.put(deviceId, backup);
302 incrementTerm(deviceId);
303 addToBackup(deviceId, prevMaster);
304 return CompletableFuture.completedFuture(
305 new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
310 boolean modified = addToBackup(deviceId, nodeId);
312 return CompletableFuture.completedFuture(
313 new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
318 log.warn("unknown Mastership Role {}", role);
323 //dumbly selects next-available node that's not the current one
324 //emulate leader election
325 private synchronized NodeId reelect(DeviceId did, NodeId nodeId) {
326 List<NodeId> stbys = backups.getOrDefault(did, Collections.emptyList());
327 NodeId backup = null;
328 for (NodeId n : stbys) {
329 if (!n.equals(nodeId)) {
334 stbys.remove(backup);
// Gives up whatever role nodeId holds for deviceId, depending on what getRole reports.
// Visible outcomes: a replacement master is chosen through reelect, recorded in
// masterMap and the term incremented, returning a completed future carrying
// MASTER_CHANGED; or nodeId is removed from the backups, returning a completed future
// carrying BACKUPS_CHANGED when the list changed; otherwise a completed future
// holding null is returned.
// NOTE(review): the construct dispatching on role (presumably a switch on role) is
// not visible in this section — confirm against the full file.
339 public synchronized CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
340 MastershipRole role = getRole(nodeId, deviceId);
343 NodeId backup = reelect(deviceId, nodeId);
// NOTE(review): reelect starts from a null candidate, so backup looks like it can be
// null when no other standby exists; this put would then store a null master, whereas
// setStandby removes the entry in that situation — confirm this is intended.
344 masterMap.put(deviceId, backup);
345 incrementTerm(deviceId);
346 return CompletableFuture.completedFuture(
347 new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
350 if (removeFromBackups(deviceId, nodeId)) {
351 return CompletableFuture.completedFuture(
352 new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
360 log.warn("unknown Mastership Role {}", role);
// Nothing changed: complete with a null event.
362 return CompletableFuture.completedFuture(null);
// Gives up every role nodeId holds: gathers the devices for which nodeId is the
// recorded master or appears in the backup list, calls relinquishRole for each, and
// passes every resulting event to notifyDelegate once its future completes.
// NOTE(review): the end of this method is not visible in this section.
366 public synchronized void relinquishAllRole(NodeId nodeId) {
367 List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>();
368 Set<DeviceId> toRelinquish = new HashSet<>();
// Devices mastered by nodeId.
370 masterMap.entrySet().stream()
371 .filter(entry -> nodeId.equals(entry.getValue()))
372 .forEach(entry -> toRelinquish.add(entry.getKey()));
// Devices that list nodeId as a backup.
374 backups.entrySet().stream()
375 .filter(entry -> entry.getValue().contains(nodeId))
376 .forEach(entry -> toRelinquish.add(entry.getKey()));
// Collected into a set first, so the maps are not iterated while relinquishRole mutates them.
378 toRelinquish.forEach(deviceId -> {
379 eventFutures.add(relinquishRole(nodeId, deviceId));
382 eventFutures.forEach(future -> {
// NOTE(review): relinquishRole can complete with a null event, and 'error' is not
// inspected here — confirm the delegate tolerates a null event.
383 future.whenComplete((event, error) -> {
384 notifyDelegate(event);