ef92ded2fae6407ceed465ffbc758cd17df2572c
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.trivial;
17
18 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
19 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
20 import static org.slf4j.LoggerFactory.getLogger;
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Objects;
29 import java.util.Set;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.felix.scr.annotations.Activate;
34 import org.apache.felix.scr.annotations.Component;
35 import org.apache.felix.scr.annotations.Deactivate;
36 import org.apache.felix.scr.annotations.Reference;
37 import org.apache.felix.scr.annotations.ReferenceCardinality;
38 import org.apache.felix.scr.annotations.Service;
39 import org.joda.time.DateTime;
40 import org.onlab.packet.IpAddress;
41 import org.onosproject.cluster.ClusterEventListener;
42 import org.onosproject.cluster.ClusterService;
43 import org.onosproject.cluster.ControllerNode;
44 import org.onosproject.cluster.ControllerNode.State;
45 import org.onosproject.cluster.DefaultControllerNode;
46 import org.onosproject.cluster.NodeId;
47 import org.onosproject.cluster.RoleInfo;
48 import org.onosproject.mastership.MastershipEvent;
49 import org.onosproject.mastership.MastershipStore;
50 import org.onosproject.mastership.MastershipStoreDelegate;
51 import org.onosproject.mastership.MastershipTerm;
52 import org.onosproject.net.DeviceId;
53 import org.onosproject.net.MastershipRole;
54 import org.onosproject.store.AbstractStore;
55 import org.slf4j.Logger;
56
57 import com.google.common.collect.ImmutableList;
58 import com.google.common.collect.ImmutableSet;
59
60 /**
61  * Manages inventory of controller mastership over devices using
62  * trivial, non-distributed in-memory structures implementation.
63  */
64 @Component(immediate = true)
65 @Service
66 public class SimpleMastershipStore
67         extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
68         implements MastershipStore {
69
70     private final Logger log = getLogger(getClass());
71
72     private static final int NOTHING = 0;
73     private static final int INIT = 1;
74
75     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76     protected ClusterService clusterService;
77
78     //devices mapped to their masters, to emulate multiple nodes
79     protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
80     //emulate backups with pile of nodes
81     protected final Map<DeviceId, List<NodeId>> backups = new HashMap<>();
82     //terms
83     protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
84
85     @Activate
86     public void activate() {
87         if (clusterService == null) {
88           // just for ease of unit test
89           final ControllerNode instance =
90                   new DefaultControllerNode(new NodeId("local"),
91                                             IpAddress.valueOf("127.0.0.1"));
92
93             clusterService = new ClusterService() {
94
95                 private final DateTime creationTime = DateTime.now();
96
97                 @Override
98                 public ControllerNode getLocalNode() {
99                     return instance;
100                 }
101
102                 @Override
103                 public Set<ControllerNode> getNodes() {
104                     return ImmutableSet.of(instance);
105                 }
106
107                 @Override
108                 public ControllerNode getNode(NodeId nodeId) {
109                     if (instance.id().equals(nodeId)) {
110                         return instance;
111                     }
112                     return null;
113                 }
114
115                 @Override
116                 public State getState(NodeId nodeId) {
117                     if (instance.id().equals(nodeId)) {
118                         return State.ACTIVE;
119                     } else {
120                         return State.INACTIVE;
121                     }
122                 }
123
124                 @Override
125                 public DateTime getLastUpdated(NodeId nodeId) {
126                     return creationTime;
127                 }
128
129                 @Override
130                 public void addListener(ClusterEventListener listener) {
131                 }
132
133                 @Override
134                 public void removeListener(ClusterEventListener listener) {
135                 }
136             };
137         }
138         log.info("Started");
139     }
140
141     @Deactivate
142     public void deactivate() {
143         log.info("Stopped");
144     }
145
146     @Override
147     public synchronized CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
148
149         MastershipRole role = getRole(nodeId, deviceId);
150         switch (role) {
151         case MASTER:
152             // no-op
153             return CompletableFuture.completedFuture(null);
154         case STANDBY:
155         case NONE:
156             NodeId prevMaster = masterMap.put(deviceId, nodeId);
157             incrementTerm(deviceId);
158             removeFromBackups(deviceId, nodeId);
159             addToBackup(deviceId, prevMaster);
160             break;
161         default:
162             log.warn("unknown Mastership Role {}", role);
163             return null;
164         }
165
166         return CompletableFuture.completedFuture(
167                 new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
168     }
169
170     @Override
171     public NodeId getMaster(DeviceId deviceId) {
172         return masterMap.get(deviceId);
173     }
174
175     // synchronized for atomic read
176     @Override
177     public synchronized RoleInfo getNodes(DeviceId deviceId) {
178         return new RoleInfo(masterMap.get(deviceId),
179                             backups.getOrDefault(deviceId, ImmutableList.of()));
180     }
181
182     @Override
183     public Set<DeviceId> getDevices(NodeId nodeId) {
184         Set<DeviceId> ids = new HashSet<>();
185         for (Map.Entry<DeviceId, NodeId> d : masterMap.entrySet()) {
186             if (Objects.equals(d.getValue(), nodeId)) {
187                 ids.add(d.getKey());
188             }
189         }
190         return ids;
191     }
192
193     @Override
194     public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
195         //query+possible reelection
196         NodeId node = clusterService.getLocalNode().id();
197         MastershipRole role = getRole(node, deviceId);
198
199         switch (role) {
200             case MASTER:
201                 return CompletableFuture.completedFuture(MastershipRole.MASTER);
202             case STANDBY:
203                 if (getMaster(deviceId) == null) {
204                     // no master => become master
205                     masterMap.put(deviceId, node);
206                     incrementTerm(deviceId);
207                     // remove from backup list
208                     removeFromBackups(deviceId, node);
209                     notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
210                                                        getNodes(deviceId)));
211                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
212                 }
213                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
214             case NONE:
215                 if (getMaster(deviceId) == null) {
216                     // no master => become master
217                     masterMap.put(deviceId, node);
218                     incrementTerm(deviceId);
219                     notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
220                                                        getNodes(deviceId)));
221                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
222                 }
223                 // add to backup list
224                 if (addToBackup(deviceId, node)) {
225                     notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
226                                                        getNodes(deviceId)));
227                 }
228                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
229             default:
230                 log.warn("unknown Mastership Role {}", role);
231         }
232         return CompletableFuture.completedFuture(role);
233     }
234
235     // add to backup if not there already, silently ignores null node
236     private synchronized boolean addToBackup(DeviceId deviceId, NodeId nodeId) {
237         boolean modified = false;
238         List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
239         if (nodeId != null && !stbys.contains(nodeId)) {
240             stbys.add(nodeId);
241             modified = true;
242         }
243         backups.put(deviceId, stbys);
244         return modified;
245     }
246
247     private synchronized boolean removeFromBackups(DeviceId deviceId, NodeId node) {
248         List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
249         boolean modified = stbys.remove(node);
250         backups.put(deviceId, stbys);
251         return modified;
252     }
253
254     private synchronized void incrementTerm(DeviceId deviceId) {
255         AtomicInteger term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING));
256         term.incrementAndGet();
257         termMap.put(deviceId, term);
258     }
259
260     @Override
261     public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
262         //just query
263         NodeId current = masterMap.get(deviceId);
264         MastershipRole role;
265
266         if (current != null && current.equals(nodeId)) {
267             return MastershipRole.MASTER;
268         }
269
270         if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) {
271             role = MastershipRole.STANDBY;
272         } else {
273             role = MastershipRole.NONE;
274         }
275         return role;
276     }
277
278     // synchronized for atomic read
279     @Override
280     public synchronized MastershipTerm getTermFor(DeviceId deviceId) {
281         if ((termMap.get(deviceId) == null)) {
282             return MastershipTerm.of(masterMap.get(deviceId), NOTHING);
283         }
284         return MastershipTerm.of(
285                 masterMap.get(deviceId), termMap.get(deviceId).get());
286     }
287
288     @Override
289     public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
290         MastershipRole role = getRole(nodeId, deviceId);
291         switch (role) {
292         case MASTER:
293             NodeId backup = reelect(deviceId, nodeId);
294             if (backup == null) {
295                 // no master alternative
296                 masterMap.remove(deviceId);
297                 // TODO: Should there be new event type for no MASTER?
298                 return CompletableFuture.completedFuture(
299                         new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
300             } else {
301                 NodeId prevMaster = masterMap.put(deviceId, backup);
302                 incrementTerm(deviceId);
303                 addToBackup(deviceId, prevMaster);
304                 return CompletableFuture.completedFuture(
305                         new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
306             }
307
308         case STANDBY:
309         case NONE:
310             boolean modified = addToBackup(deviceId, nodeId);
311             if (modified) {
312                 return CompletableFuture.completedFuture(
313                         new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
314             }
315             break;
316
317         default:
318             log.warn("unknown Mastership Role {}", role);
319         }
320         return null;
321     }
322
323     //dumbly selects next-available node that's not the current one
324     //emulate leader election
325     private synchronized NodeId reelect(DeviceId did, NodeId nodeId) {
326         List<NodeId> stbys = backups.getOrDefault(did, Collections.emptyList());
327         NodeId backup = null;
328         for (NodeId n : stbys) {
329             if (!n.equals(nodeId)) {
330                 backup = n;
331                 break;
332             }
333         }
334         stbys.remove(backup);
335         return backup;
336     }
337
338     @Override
339     public synchronized CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
340         MastershipRole role = getRole(nodeId, deviceId);
341         switch (role) {
342         case MASTER:
343             NodeId backup = reelect(deviceId, nodeId);
344             masterMap.put(deviceId, backup);
345             incrementTerm(deviceId);
346             return CompletableFuture.completedFuture(
347                     new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
348
349         case STANDBY:
350             if (removeFromBackups(deviceId, nodeId)) {
351                 return CompletableFuture.completedFuture(
352                     new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
353             }
354             break;
355
356         case NONE:
357             break;
358
359         default:
360             log.warn("unknown Mastership Role {}", role);
361         }
362         return CompletableFuture.completedFuture(null);
363     }
364
365     @Override
366     public synchronized void relinquishAllRole(NodeId nodeId) {
367         List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>();
368         Set<DeviceId> toRelinquish = new HashSet<>();
369
370         masterMap.entrySet().stream()
371             .filter(entry -> nodeId.equals(entry.getValue()))
372             .forEach(entry -> toRelinquish.add(entry.getKey()));
373
374         backups.entrySet().stream()
375             .filter(entry -> entry.getValue().contains(nodeId))
376             .forEach(entry -> toRelinquish.add(entry.getKey()));
377
378         toRelinquish.forEach(deviceId -> {
379             eventFutures.add(relinquishRole(nodeId, deviceId));
380         });
381
382         eventFutures.forEach(future -> {
383             future.whenComplete((event, error) -> {
384                 notifyDelegate(event);
385             });
386         });
387     }
388 }