da4e3cc41c253ed6108f01f59d24d4a1778fe64a
[onosfw.git] /
1 /*
2  * Copyright 2014 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.topology.impl;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static org.onlab.util.Tools.isNullOrEmpty;
20 import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
21 import static org.slf4j.LoggerFactory.getLogger;
22
23 import java.util.Collections;
24 import java.util.Map;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.stream.Collectors;
28
29 import org.apache.felix.scr.annotations.Activate;
30 import org.apache.felix.scr.annotations.Component;
31 import org.apache.felix.scr.annotations.Deactivate;
32 import org.apache.felix.scr.annotations.Reference;
33 import org.apache.felix.scr.annotations.ReferenceCardinality;
34 import org.apache.felix.scr.annotations.Service;
35 import org.onlab.util.KryoNamespace;
36 import org.onosproject.common.DefaultTopology;
37 import org.onosproject.event.Event;
38 import org.onosproject.mastership.MastershipService;
39 import org.onosproject.net.ConnectPoint;
40 import org.onosproject.net.Device;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.Link;
43 import org.onosproject.net.Path;
44 import org.onosproject.net.DisjointPath;
45 import org.onosproject.net.provider.ProviderId;
46 import org.onosproject.net.topology.ClusterId;
47 import org.onosproject.net.topology.DefaultGraphDescription;
48 import org.onosproject.net.topology.GraphDescription;
49 import org.onosproject.net.topology.LinkWeight;
50 import org.onosproject.net.topology.Topology;
51 import org.onosproject.net.topology.TopologyCluster;
52 import org.onosproject.net.topology.TopologyEvent;
53 import org.onosproject.net.topology.TopologyGraph;
54 import org.onosproject.net.topology.TopologyStore;
55 import org.onosproject.net.topology.TopologyStoreDelegate;
56 import org.onosproject.store.AbstractStore;
57 import org.onosproject.store.serializers.KryoNamespaces;
58 import org.onosproject.store.service.EventuallyConsistentMap;
59 import org.onosproject.store.service.EventuallyConsistentMapEvent;
60 import org.onosproject.store.service.EventuallyConsistentMapListener;
61 import org.onosproject.store.service.LogicalClockService;
62 import org.onosproject.store.service.StorageService;
63 import org.slf4j.Logger;
64
65 /**
66  * Manages inventory of topology snapshots using trivial in-memory
67  * structures implementation.
68  * <p>
69  * Note: This component is not distributed per-se. It runs on every
70  * instance and feeds off of other distributed stores.
71  */
72 @Component(immediate = true)
73 @Service
74 public class DistributedTopologyStore
75         extends AbstractStore<TopologyEvent, TopologyStoreDelegate>
76         implements TopologyStore {
77
78     private final Logger log = getLogger(getClass());
79     private volatile DefaultTopology current =
80             new DefaultTopology(ProviderId.NONE,
81                                 new DefaultGraphDescription(0L, System.currentTimeMillis(),
82                                                             Collections.<Device>emptyList(),
83                                                             Collections.<Link>emptyList()));
84
85     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86     protected StorageService storageService;
87
88     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89     protected LogicalClockService clockService;
90
91     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92     protected MastershipService mastershipService;
93
94     // Cluster root to broadcast points bindings to allow convergence to
95     // a shared broadcast tree; node that is the master of the cluster root
96     // is the primary.
97     private EventuallyConsistentMap<DeviceId, Set<ConnectPoint>> broadcastPoints;
98
99     private EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> listener =
100             new InternalBroadcastPointListener();
101
102     @Activate
103     public void activate() {
104         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
105                 .register(KryoNamespaces.API);
106
107         broadcastPoints = storageService.<DeviceId, Set<ConnectPoint>>eventuallyConsistentMapBuilder()
108                 .withName("onos-broadcast-trees")
109                 .withSerializer(hostSerializer)
110                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
111                 .build();
112         broadcastPoints.addListener(listener);
113         log.info("Started");
114     }
115
116     @Deactivate
117     public void deactivate() {
118         broadcastPoints.removeListener(listener);
119         broadcastPoints.destroy();
120         log.info("Stopped");
121     }
122
123     @Override
124     public Topology currentTopology() {
125         return current;
126     }
127
128     @Override
129     public boolean isLatest(Topology topology) {
130         // Topology is current only if it is the same as our current topology
131         return topology == current;
132     }
133
134     @Override
135     public TopologyGraph getGraph(Topology topology) {
136         return defaultTopology(topology).getGraph();
137     }
138
139     @Override
140     public Set<TopologyCluster> getClusters(Topology topology) {
141         return defaultTopology(topology).getClusters();
142     }
143
144     @Override
145     public TopologyCluster getCluster(Topology topology, ClusterId clusterId) {
146         return defaultTopology(topology).getCluster(clusterId);
147     }
148
149     @Override
150     public Set<DeviceId> getClusterDevices(Topology topology, TopologyCluster cluster) {
151         return defaultTopology(topology).getClusterDevices(cluster);
152     }
153
154     @Override
155     public Set<Link> getClusterLinks(Topology topology, TopologyCluster cluster) {
156         return defaultTopology(topology).getClusterLinks(cluster);
157     }
158
159     @Override
160     public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst) {
161         return defaultTopology(topology).getPaths(src, dst);
162     }
163
164     @Override
165     public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst,
166                               LinkWeight weight) {
167         return defaultTopology(topology).getPaths(src, dst, weight);
168     }
169
170     @Override
171     public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst) {
172         return defaultTopology(topology).getDisjointPaths(src, dst);
173     }
174
175     @Override
176     public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst,
177                                               LinkWeight weight) {
178         return defaultTopology(topology).getDisjointPaths(src, dst, weight);
179     }
180
181     @Override
182     public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst,
183                                               Map<Link, Object> riskProfile) {
184         return defaultTopology(topology).getDisjointPaths(src, dst, riskProfile);
185     }
186
187     @Override
188     public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst,
189                                               LinkWeight weight, Map<Link, Object> riskProfile) {
190         return defaultTopology(topology).getDisjointPaths(src, dst, weight, riskProfile);
191     }
192
193     @Override
194     public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
195         return defaultTopology(topology).isInfrastructure(connectPoint);
196     }
197
198     @Override
199     public boolean isBroadcastPoint(Topology topology, ConnectPoint connectPoint) {
200         return defaultTopology(topology).isBroadcastPoint(connectPoint);
201     }
202
203     private boolean isBroadcastPoint(ConnectPoint connectPoint) {
204         // Any non-infrastructure, i.e. edge points are assumed to be OK.
205         if (!current.isInfrastructure(connectPoint)) {
206             return true;
207         }
208
209         // Find the cluster to which the device belongs.
210         TopologyCluster cluster = current.getCluster(connectPoint.deviceId());
211         checkArgument(cluster != null, "No cluster found for device %s", connectPoint.deviceId());
212
213         // If the broadcast set is null or empty, or if the point explicitly
214         // belongs to it, return true;
215         Set<ConnectPoint> points = broadcastPoints.get(cluster.root().deviceId());
216         return isNullOrEmpty(points) || points.contains(connectPoint);
217     }
218
219     @Override
220     public TopologyEvent updateTopology(ProviderId providerId,
221                                         GraphDescription graphDescription,
222                                         List<Event> reasons) {
223         // First off, make sure that what we're given is indeed newer than
224         // what we already have.
225         if (current != null && graphDescription.timestamp() < current.time()) {
226             return null;
227         }
228
229         // Have the default topology construct self from the description data.
230         DefaultTopology newTopology =
231                 new DefaultTopology(providerId, graphDescription, this::isBroadcastPoint);
232         updateBroadcastPoints(newTopology);
233
234         // Promote the new topology to current and return a ready-to-send event.
235         synchronized (this) {
236             current = newTopology;
237             return new TopologyEvent(TOPOLOGY_CHANGED, current, reasons);
238         }
239     }
240
241     private void updateBroadcastPoints(DefaultTopology topology) {
242         // Remove any broadcast trees rooted by devices for which we are master.
243         Set<DeviceId> toRemove = broadcastPoints.keySet().stream()
244                 .filter(mastershipService::isLocalMaster)
245                 .collect(Collectors.toSet());
246
247         // Update the broadcast trees rooted by devices for which we are master.
248         topology.getClusters().forEach(c -> {
249             toRemove.remove(c.root().deviceId());
250             if (mastershipService.isLocalMaster(c.root().deviceId())) {
251                 broadcastPoints.put(c.root().deviceId(),
252                                     topology.broadcastPoints(c.id()));
253             }
254         });
255
256         toRemove.forEach(broadcastPoints::remove);
257     }
258
259     // Validates the specified topology and returns it as a default
260     private DefaultTopology defaultTopology(Topology topology) {
261         checkArgument(topology instanceof DefaultTopology,
262                       "Topology class %s not supported", topology.getClass());
263         return (DefaultTopology) topology;
264     }
265
266     private class InternalBroadcastPointListener
267             implements EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> {
268         @Override
269         public void event(EventuallyConsistentMapEvent<DeviceId, Set<ConnectPoint>> event) {
270             if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
271                 if (!event.value().isEmpty()) {
272                     log.info("Cluster rooted at {} has {} broadcast-points; #{}",
273                              event.key(), event.value().size(), event.value().hashCode());
274                 }
275             }
276         }
277     }
278 }