487fad9b26da3d59e3d0eb6782c8284d46a3df34
[onosfw.git] /
1 /*
2  * Copyright 2014 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.topology.impl;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static org.onlab.util.Tools.isNullOrEmpty;
20 import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
21 import static org.slf4j.LoggerFactory.getLogger;
22
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.stream.Collectors;
27
28 import org.apache.felix.scr.annotations.Activate;
29 import org.apache.felix.scr.annotations.Component;
30 import org.apache.felix.scr.annotations.Deactivate;
31 import org.apache.felix.scr.annotations.Reference;
32 import org.apache.felix.scr.annotations.ReferenceCardinality;
33 import org.apache.felix.scr.annotations.Service;
34 import org.onlab.util.KryoNamespace;
35 import org.onosproject.common.DefaultTopology;
36 import org.onosproject.event.Event;
37 import org.onosproject.mastership.MastershipService;
38 import org.onosproject.net.ConnectPoint;
39 import org.onosproject.net.Device;
40 import org.onosproject.net.DeviceId;
41 import org.onosproject.net.Link;
42 import org.onosproject.net.Path;
43 import org.onosproject.net.provider.ProviderId;
44 import org.onosproject.net.topology.ClusterId;
45 import org.onosproject.net.topology.DefaultGraphDescription;
46 import org.onosproject.net.topology.GraphDescription;
47 import org.onosproject.net.topology.LinkWeight;
48 import org.onosproject.net.topology.Topology;
49 import org.onosproject.net.topology.TopologyCluster;
50 import org.onosproject.net.topology.TopologyEvent;
51 import org.onosproject.net.topology.TopologyGraph;
52 import org.onosproject.net.topology.TopologyStore;
53 import org.onosproject.net.topology.TopologyStoreDelegate;
54 import org.onosproject.store.AbstractStore;
55 import org.onosproject.store.serializers.KryoNamespaces;
56 import org.onosproject.store.service.EventuallyConsistentMap;
57 import org.onosproject.store.service.EventuallyConsistentMapEvent;
58 import org.onosproject.store.service.EventuallyConsistentMapListener;
59 import org.onosproject.store.service.LogicalClockService;
60 import org.onosproject.store.service.StorageService;
61 import org.slf4j.Logger;
62
63 /**
64  * Manages inventory of topology snapshots using trivial in-memory
65  * structures implementation.
66  * <p>
67  * Note: This component is not distributed per-se. It runs on every
68  * instance and feeds off of other distributed stores.
69  */
70 @Component(immediate = true)
71 @Service
72 public class DistributedTopologyStore
73         extends AbstractStore<TopologyEvent, TopologyStoreDelegate>
74         implements TopologyStore {
75
76     private final Logger log = getLogger(getClass());
77
78     private volatile DefaultTopology current =
79             new DefaultTopology(ProviderId.NONE,
80                                 new DefaultGraphDescription(0L, System.currentTimeMillis(),
81                                                             Collections.<Device>emptyList(),
82                                                             Collections.<Link>emptyList()));
83
84     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85     protected StorageService storageService;
86
87     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88     protected LogicalClockService clockService;
89
90     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91     protected MastershipService mastershipService;
92
93     // Cluster root to broadcast points bindings to allow convergence to
94     // a shared broadcast tree; node that is the master of the cluster root
95     // is the primary.
96     private EventuallyConsistentMap<DeviceId, Set<ConnectPoint>> broadcastPoints;
97
98     private EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> listener =
99             new InternalBroadcastPointListener();
100
101     @Activate
102     public void activate() {
103         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
104                 .register(KryoNamespaces.API);
105
106         broadcastPoints = storageService.<DeviceId, Set<ConnectPoint>>eventuallyConsistentMapBuilder()
107                 .withName("onos-broadcast-trees")
108                 .withSerializer(hostSerializer)
109                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
110                 .build();
111         broadcastPoints.addListener(listener);
112         log.info("Started");
113     }
114
115     @Deactivate
116     public void deactivate() {
117         broadcastPoints.removeListener(listener);
118         broadcastPoints.destroy();
119         log.info("Stopped");
120     }
121
122     @Override
123     public Topology currentTopology() {
124         return current;
125     }
126
127     @Override
128     public boolean isLatest(Topology topology) {
129         // Topology is current only if it is the same as our current topology
130         return topology == current;
131     }
132
133     @Override
134     public TopologyGraph getGraph(Topology topology) {
135         return defaultTopology(topology).getGraph();
136     }
137
138     @Override
139     public Set<TopologyCluster> getClusters(Topology topology) {
140         return defaultTopology(topology).getClusters();
141     }
142
143     @Override
144     public TopologyCluster getCluster(Topology topology, ClusterId clusterId) {
145         return defaultTopology(topology).getCluster(clusterId);
146     }
147
148     @Override
149     public Set<DeviceId> getClusterDevices(Topology topology, TopologyCluster cluster) {
150         return defaultTopology(topology).getClusterDevices(cluster);
151     }
152
153     @Override
154     public Set<Link> getClusterLinks(Topology topology, TopologyCluster cluster) {
155         return defaultTopology(topology).getClusterLinks(cluster);
156     }
157
158     @Override
159     public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst) {
160         return defaultTopology(topology).getPaths(src, dst);
161     }
162
163     @Override
164     public Set<Path> getPaths(Topology topology, DeviceId src, DeviceId dst,
165                               LinkWeight weight) {
166         return defaultTopology(topology).getPaths(src, dst, weight);
167     }
168
169     @Override
170     public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
171         return defaultTopology(topology).isInfrastructure(connectPoint);
172     }
173
174     @Override
175     public boolean isBroadcastPoint(Topology topology, ConnectPoint connectPoint) {
176         return defaultTopology(topology).isBroadcastPoint(connectPoint);
177     }
178
179     private boolean isBroadcastPoint(ConnectPoint connectPoint) {
180         // Any non-infrastructure, i.e. edge points are assumed to be OK.
181         if (!current.isInfrastructure(connectPoint)) {
182             return true;
183         }
184
185         // Find the cluster to which the device belongs.
186         TopologyCluster cluster = current.getCluster(connectPoint.deviceId());
187         checkArgument(cluster != null, "No cluster found for device %s", connectPoint.deviceId());
188
189         // If the broadcast set is null or empty, or if the point explicitly
190         // belongs to it, return true;
191         Set<ConnectPoint> points = broadcastPoints.get(cluster.root().deviceId());
192         return isNullOrEmpty(points) || points.contains(connectPoint);
193     }
194
195     @Override
196     public TopologyEvent updateTopology(ProviderId providerId,
197                                         GraphDescription graphDescription,
198                                         List<Event> reasons) {
199         // First off, make sure that what we're given is indeed newer than
200         // what we already have.
201         if (current != null && graphDescription.timestamp() < current.time()) {
202             return null;
203         }
204
205         // Have the default topology construct self from the description data.
206         DefaultTopology newTopology =
207                 new DefaultTopology(providerId, graphDescription, this::isBroadcastPoint);
208         updateBroadcastPoints(newTopology);
209
210         // Promote the new topology to current and return a ready-to-send event.
211         synchronized (this) {
212             current = newTopology;
213             return new TopologyEvent(TOPOLOGY_CHANGED, current, reasons);
214         }
215     }
216
217     private void updateBroadcastPoints(DefaultTopology topology) {
218         // Remove any broadcast trees rooted by devices for which we are master.
219         Set<DeviceId> toRemove = broadcastPoints.keySet().stream()
220                 .filter(mastershipService::isLocalMaster)
221                 .collect(Collectors.toSet());
222
223         // Update the broadcast trees rooted by devices for which we are master.
224         topology.getClusters().forEach(c -> {
225             toRemove.remove(c.root().deviceId());
226             if (mastershipService.isLocalMaster(c.root().deviceId())) {
227                 broadcastPoints.put(c.root().deviceId(),
228                                     topology.broadcastPoints(c.id()));
229             }
230         });
231
232         toRemove.forEach(broadcastPoints::remove);
233     }
234
235     // Validates the specified topology and returns it as a default
236     private DefaultTopology defaultTopology(Topology topology) {
237         checkArgument(topology instanceof DefaultTopology,
238                       "Topology class %s not supported", topology.getClass());
239         return (DefaultTopology) topology;
240     }
241
242     private class InternalBroadcastPointListener
243             implements EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> {
244         @Override
245         public void event(EventuallyConsistentMapEvent<DeviceId, Set<ConnectPoint>> event) {
246             if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
247                 if (!event.value().isEmpty()) {
248                     log.info("Cluster rooted at {} has {} broadcast-points; #{}",
249                              event.key(), event.value().size(), event.value().hashCode());
250                 }
251             }
252         }
253     }
254 }