ca8eea37af1c05ff6e37ea361a9d4cfde831e190
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.config.impl;
17
18 import com.fasterxml.jackson.databind.JsonNode;
19 import com.fasterxml.jackson.databind.ObjectMapper;
20 import com.fasterxml.jackson.databind.node.ArrayNode;
21 import com.fasterxml.jackson.databind.node.BooleanNode;
22 import com.fasterxml.jackson.databind.node.DoubleNode;
23 import com.fasterxml.jackson.databind.node.IntNode;
24 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
25 import com.fasterxml.jackson.databind.node.LongNode;
26 import com.fasterxml.jackson.databind.node.ObjectNode;
27 import com.fasterxml.jackson.databind.node.ShortNode;
28 import com.fasterxml.jackson.databind.node.TextNode;
29 import com.google.common.collect.ImmutableSet;
30 import com.google.common.collect.Maps;
31
32 import org.apache.felix.scr.annotations.Activate;
33 import org.apache.felix.scr.annotations.Component;
34 import org.apache.felix.scr.annotations.Deactivate;
35 import org.apache.felix.scr.annotations.Reference;
36 import org.apache.felix.scr.annotations.ReferenceCardinality;
37 import org.apache.felix.scr.annotations.Service;
38 import org.onlab.util.KryoNamespace;
39 import org.onlab.util.Tools;
40 import org.onosproject.net.config.Config;
41 import org.onosproject.net.config.ConfigApplyDelegate;
42 import org.onosproject.net.config.ConfigFactory;
43 import org.onosproject.net.config.NetworkConfigEvent;
44 import org.onosproject.net.config.NetworkConfigStore;
45 import org.onosproject.net.config.NetworkConfigStoreDelegate;
46 import org.onosproject.store.AbstractStore;
47 import org.onosproject.store.serializers.KryoNamespaces;
48 import org.onosproject.store.service.ConsistentMap;
49 import org.onosproject.store.service.ConsistentMapException;
50 import org.onosproject.store.service.MapEvent;
51 import org.onosproject.store.service.MapEventListener;
52 import org.onosproject.store.service.Serializer;
53 import org.onosproject.store.service.StorageService;
54 import org.onosproject.store.service.Versioned;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import java.util.LinkedHashMap;
59 import java.util.Map;
60 import java.util.Objects;
61 import java.util.Set;
62
63 import static com.google.common.base.Preconditions.checkArgument;
64 import static org.onosproject.net.config.NetworkConfigEvent.Type.*;
65
66 /**
67  * Implementation of a distributed network configuration store.
68  */
69 @Component(immediate = true)
70 @Service
71 public class DistributedNetworkConfigStore
72         extends AbstractStore<NetworkConfigEvent, NetworkConfigStoreDelegate>
73         implements NetworkConfigStore {
74
75     private final Logger log = LoggerFactory.getLogger(getClass());
76
77     private static final int MAX_BACKOFF = 10;
78     private static final String INVALID_CONFIG_JSON =
79             "JSON node does not contain valid configuration";
80
81     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82     protected StorageService storageService;
83
84     private ConsistentMap<ConfigKey, JsonNode> configs;
85
86     private final Map<String, ConfigFactory> factoriesByConfig = Maps.newConcurrentMap();
87     private final ObjectMapper mapper = new ObjectMapper();
88     private final ConfigApplyDelegate applyDelegate = new InternalApplyDelegate();
89     private final MapEventListener<ConfigKey, JsonNode> listener = new InternalMapListener();
90
91     @Activate
92     public void activate() {
93         KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
94                 .register(KryoNamespaces.API)
95                 .register(ConfigKey.class, ObjectNode.class, ArrayNode.class,
96                           JsonNodeFactory.class, LinkedHashMap.class,
97                           TextNode.class, BooleanNode.class,
98                           LongNode.class, DoubleNode.class, ShortNode.class, IntNode.class);
99
100         configs = storageService.<ConfigKey, JsonNode>consistentMapBuilder()
101                 .withSerializer(Serializer.using(kryoBuilder.build()))
102                 .withName("onos-network-configs")
103                 .withRelaxedReadConsistency()
104                 .build();
105         configs.addListener(listener);
106         log.info("Started");
107     }
108
109     @Deactivate
110     public void deactivate() {
111         configs.removeListener(listener);
112         log.info("Stopped");
113     }
114
115     @Override
116     public void addConfigFactory(ConfigFactory configFactory) {
117         factoriesByConfig.put(configFactory.configClass().getName(), configFactory);
118         notifyDelegate(new NetworkConfigEvent(CONFIG_REGISTERED, configFactory.configKey(),
119                                               configFactory.configClass()));
120     }
121
122     @Override
123     public void removeConfigFactory(ConfigFactory configFactory) {
124         factoriesByConfig.remove(configFactory.configClass().getName());
125         notifyDelegate(new NetworkConfigEvent(CONFIG_UNREGISTERED, configFactory.configKey(),
126                                               configFactory.configClass()));
127     }
128
129     @Override
130     @SuppressWarnings("unchecked")
131     public <S, C extends Config<S>> ConfigFactory<S, C> getConfigFactory(Class<C> configClass) {
132         return (ConfigFactory<S, C>) factoriesByConfig.get(configClass.getName());
133     }
134
135     @Override
136     @SuppressWarnings("unchecked")
137     public <S> Set<S> getSubjects(Class<S> subjectClass) {
138         ImmutableSet.Builder<S> builder = ImmutableSet.builder();
139         configs.keySet().forEach(k -> {
140             if (subjectClass.isInstance(k.subject)) {
141                 builder.add((S) k.subject);
142             }
143         });
144         return builder.build();
145     }
146
147     @Override
148     @SuppressWarnings("unchecked")
149     public <S, C extends Config<S>> Set<S> getSubjects(Class<S> subjectClass, Class<C> configClass) {
150         ImmutableSet.Builder<S> builder = ImmutableSet.builder();
151         String cName = configClass.getName();
152         configs.keySet().forEach(k -> {
153             if (subjectClass.isInstance(k.subject) && cName.equals(k.configClass)) {
154                 builder.add((S) k.subject);
155             }
156         });
157         return builder.build();
158     }
159
160     @Override
161     @SuppressWarnings("unchecked")
162     public <S> Set<Class<? extends Config<S>>> getConfigClasses(S subject) {
163         ImmutableSet.Builder<Class<? extends Config<S>>> builder = ImmutableSet.builder();
164         configs.keySet().forEach(k -> {
165             if (Objects.equals(subject, k.subject) && delegate != null) {
166                 builder.add(factoriesByConfig.get(k.configClass).configClass());
167             }
168         });
169         return builder.build();
170     }
171
172     @Override
173     public <S, T extends Config<S>> T getConfig(S subject, Class<T> configClass) {
174         // TODO: need to identify and address the root cause for timeouts.
175         Versioned<JsonNode> json = Tools.retryable(configs::get, ConsistentMapException.class, 1, MAX_BACKOFF)
176                 .apply(key(subject, configClass));
177         return json != null ? createConfig(subject, configClass, json.value()) : null;
178     }
179
180
181     @Override
182     public <S, C extends Config<S>> C createConfig(S subject, Class<C> configClass) {
183         ConfigFactory<S, C> factory = getConfigFactory(configClass);
184         Versioned<JsonNode> json = configs.computeIfAbsent(key(subject, configClass),
185                                                              k -> factory.isList() ?
186                                                                      mapper.createArrayNode() :
187                                                                      mapper.createObjectNode());
188         return createConfig(subject, configClass, json.value());
189     }
190
191     @Override
192     public <S, C extends Config<S>> C applyConfig(S subject, Class<C> configClass, JsonNode json) {
193         // Create the configuration and validate it.
194         C config = createConfig(subject, configClass, json);
195         checkArgument(config.isValid(), INVALID_CONFIG_JSON);
196
197         // Insert the validated configuration and get it back.
198         Versioned<JsonNode> versioned = configs.putAndGet(key(subject, configClass), json);
199
200         // Re-create the config if for some reason what we attempted to put
201         // was supplanted by someone else already.
202         return versioned.value() == json ? config :
203                 createConfig(subject, configClass, versioned.value());
204     }
205
206     @Override
207     public <S, C extends Config<S>> void clearConfig(S subject, Class<C> configClass) {
208         configs.remove(key(subject, configClass));
209     }
210
211     /**
212      * Produces a config from the specified subject, config class and raw JSON.
213      *
214      * @param subject     config subject
215      * @param configClass config class
216      * @param json        raw JSON data
217      * @return config object or null of no factory found or if the specified
218      * JSON is null
219      */
220     @SuppressWarnings("unchecked")
221     private <S, C extends Config<S>> C createConfig(S subject, Class<C> configClass,
222                                                     JsonNode json) {
223         if (json != null) {
224             ConfigFactory<S, C> factory = factoriesByConfig.get(configClass.getName());
225             if (factory != null) {
226                 C config = factory.createConfig();
227                 config.init(subject, factory.configKey(), json, mapper, applyDelegate);
228                 return config;
229             }
230         }
231         return null;
232     }
233
234
235     // Auxiliary delegate to receive notifications about changes applied to
236     // the network configuration - by the apps.
237     private class InternalApplyDelegate implements ConfigApplyDelegate {
238         @Override
239         public void onApply(Config config) {
240             configs.put(key(config.subject(), config.getClass()), config.node());
241         }
242     }
243
244     // Produces a key for uniquely tracking a subject config.
245     private static ConfigKey key(Object subject, Class<?> configClass) {
246         return new ConfigKey(subject, configClass);
247     }
248
249     // Auxiliary key to track subject configurations.
250     private static final class ConfigKey {
251         final Object subject;
252         final String configClass;
253
254         private ConfigKey(Object subject, Class<?> configClass) {
255             this.subject = subject;
256             this.configClass = configClass.getName();
257         }
258
259         @Override
260         public int hashCode() {
261             return Objects.hash(subject, configClass);
262         }
263
264         @Override
265         public boolean equals(Object obj) {
266             if (this == obj) {
267                 return true;
268             }
269             if (obj instanceof ConfigKey) {
270                 final ConfigKey other = (ConfigKey) obj;
271                 return Objects.equals(this.subject, other.subject)
272                         && Objects.equals(this.configClass, other.configClass);
273             }
274             return false;
275         }
276     }
277
278     private class InternalMapListener implements MapEventListener<ConfigKey, JsonNode> {
279         @Override
280         public void event(MapEvent<ConfigKey, JsonNode> event) {
281             NetworkConfigEvent.Type type;
282             switch (event.type()) {
283                 case INSERT:
284                     type = CONFIG_ADDED;
285                     break;
286                 case UPDATE:
287                     type = CONFIG_UPDATED;
288                     break;
289                 case REMOVE:
290                 default:
291                     type = CONFIG_REMOVED;
292                     break;
293             }
294             ConfigFactory factory = factoriesByConfig.get(event.key().configClass);
295             if (factory != null) {
296                 notifyDelegate(new NetworkConfigEvent(type, event.key().subject,
297                                                       factory.configClass()));
298             }
299         }
300     }
301 }