/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
  16 package org.onosproject.store.config.impl;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ShortNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.ConfigApplyDelegate;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigStore;
import org.onosproject.net.config.NetworkConfigStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.onosproject.net.config.NetworkConfigEvent.Type.*;
 
/**
 * Implementation of a distributed network configuration store.
 */
 
  68 @Component(immediate = true)
 
  70 public class DistributedNetworkConfigStore
 
  71         extends AbstractStore<NetworkConfigEvent, NetworkConfigStoreDelegate>
 
  72         implements NetworkConfigStore {
 
  74     private static final int MAX_BACKOFF = 10;
 
  76     private final Logger log = LoggerFactory.getLogger(getClass());
 
  78     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
  79     protected StorageService storageService;
 
  81     private ConsistentMap<ConfigKey, JsonNode> configs;
 
  83     private final Map<String, ConfigFactory> factoriesByConfig = Maps.newConcurrentMap();
 
  84     private final ObjectMapper mapper = new ObjectMapper();
 
  85     private final ConfigApplyDelegate applyDelegate = new InternalApplyDelegate();
 
  86     private final MapEventListener<ConfigKey, JsonNode> listener = new InternalMapListener();
 
  89     public void activate() {
 
  90         KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder()
 
  91                 .register(KryoNamespaces.API)
 
  92                 .register(ConfigKey.class, ObjectNode.class, ArrayNode.class,
 
  93                           JsonNodeFactory.class, LinkedHashMap.class,
 
  94                           TextNode.class, BooleanNode.class,
 
  95                           LongNode.class, DoubleNode.class, ShortNode.class, IntNode.class);
 
  97         configs = storageService.<ConfigKey, JsonNode>consistentMapBuilder()
 
  98                 .withSerializer(Serializer.using(kryoBuilder.build()))
 
  99                 .withName("onos-network-configs")
 
 100                 .withRelaxedReadConsistency()
 
 102         configs.addListener(listener);
 
 107     public void deactivate() {
 
 108         configs.removeListener(listener);
 
 113     public void addConfigFactory(ConfigFactory configFactory) {
 
 114         factoriesByConfig.put(configFactory.configClass().getName(), configFactory);
 
 115         notifyDelegate(new NetworkConfigEvent(CONFIG_REGISTERED, configFactory.configKey(),
 
 116                                               configFactory.configClass()));
 
 120     public void removeConfigFactory(ConfigFactory configFactory) {
 
 121         factoriesByConfig.remove(configFactory.configClass().getName());
 
 122         notifyDelegate(new NetworkConfigEvent(CONFIG_UNREGISTERED, configFactory.configKey(),
 
 123                                               configFactory.configClass()));
 
 127     @SuppressWarnings("unchecked")
 
 128     public <S, C extends Config<S>> ConfigFactory<S, C> getConfigFactory(Class<C> configClass) {
 
 129         return (ConfigFactory<S, C>) factoriesByConfig.get(configClass.getName());
 
 133     @SuppressWarnings("unchecked")
 
 134     public <S> Set<S> getSubjects(Class<S> subjectClass) {
 
 135         ImmutableSet.Builder<S> builder = ImmutableSet.builder();
 
 136         configs.keySet().forEach(k -> {
 
 137             if (subjectClass.isInstance(k.subject)) {
 
 138                 builder.add((S) k.subject);
 
 141         return builder.build();
 
 145     @SuppressWarnings("unchecked")
 
 146     public <S, C extends Config<S>> Set<S> getSubjects(Class<S> subjectClass, Class<C> configClass) {
 
 147         ImmutableSet.Builder<S> builder = ImmutableSet.builder();
 
 148         String cName = configClass.getName();
 
 149         configs.keySet().forEach(k -> {
 
 150             if (subjectClass.isInstance(k.subject) && cName.equals(k.configClass)) {
 
 151                 builder.add((S) k.subject);
 
 154         return builder.build();
 
 158     @SuppressWarnings("unchecked")
 
 159     public <S> Set<Class<? extends Config<S>>> getConfigClasses(S subject) {
 
 160         ImmutableSet.Builder<Class<? extends Config<S>>> builder = ImmutableSet.builder();
 
 161         configs.keySet().forEach(k -> {
 
 162             if (Objects.equals(subject, k.subject) && delegate != null) {
 
 163                 builder.add(factoriesByConfig.get(k.configClass).configClass());
 
 166         return builder.build();
 
 170     public <S, T extends Config<S>> T getConfig(S subject, Class<T> configClass) {
 
 171         // TODO: need to identify and address the root cause for timeouts.
 
 172         Versioned<JsonNode> json = Tools.retryable(configs::get, ConsistentMapException.class, 1, MAX_BACKOFF)
 
 173                 .apply(key(subject, configClass));
 
 174         return json != null ? createConfig(subject, configClass, json.value()) : null;
 
 179     public <S, C extends Config<S>> C createConfig(S subject, Class<C> configClass) {
 
 180         ConfigFactory<S, C> factory = getConfigFactory(configClass);
 
 181         Versioned<JsonNode> json = configs.computeIfAbsent(key(subject, configClass),
 
 182                                                              k -> factory.isList() ?
 
 183                                                                      mapper.createArrayNode() :
 
 184                                                                      mapper.createObjectNode());
 
 185         return createConfig(subject, configClass, json.value());
 
 189     public <S, C extends Config<S>> C applyConfig(S subject, Class<C> configClass, JsonNode json) {
 
 190         return createConfig(subject, configClass,
 
 191                             configs.putAndGet(key(subject, configClass), json).value());
 
 195     public <S, C extends Config<S>> void clearConfig(S subject, Class<C> configClass) {
 
 196         configs.remove(key(subject, configClass));
 
 200      * Produces a config from the specified subject, config class and raw JSON.
 
 202      * @param subject     config subject
 
 203      * @param configClass config class
 
 204      * @param json        raw JSON data
 
 205      * @return config object or null of no factory found or if the specified
 
 208     @SuppressWarnings("unchecked")
 
 209     private <S, C extends Config<S>> C createConfig(S subject, Class<C> configClass,
 
 212             ConfigFactory<S, C> factory = factoriesByConfig.get(configClass.getName());
 
 213             if (factory != null) {
 
 214                 C config = factory.createConfig();
 
 215                 config.init(subject, factory.configKey(), json, mapper, applyDelegate);
 
 223     // Auxiliary delegate to receive notifications about changes applied to
 
 224     // the network configuration - by the apps.
 
 225     private class InternalApplyDelegate implements ConfigApplyDelegate {
 
 227         public void onApply(Config config) {
 
 228             configs.put(key(config.subject(), config.getClass()), config.node());
 
 232     // Produces a key for uniquely tracking a subject config.
 
 233     private static ConfigKey key(Object subject, Class<?> configClass) {
 
 234         return new ConfigKey(subject, configClass);
 
 237     // Auxiliary key to track subject configurations.
 
 238     private static final class ConfigKey {
 
 239         final Object subject;
 
 240         final String configClass;
 
 242         private ConfigKey(Object subject, Class<?> configClass) {
 
 243             this.subject = subject;
 
 244             this.configClass = configClass.getName();
 
 248         public int hashCode() {
 
 249             return Objects.hash(subject, configClass);
 
 253         public boolean equals(Object obj) {
 
 257             if (obj instanceof ConfigKey) {
 
 258                 final ConfigKey other = (ConfigKey) obj;
 
 259                 return Objects.equals(this.subject, other.subject)
 
 260                         && Objects.equals(this.configClass, other.configClass);
 
 266     private class InternalMapListener implements MapEventListener<ConfigKey, JsonNode> {
 
 268         public void event(MapEvent<ConfigKey, JsonNode> event) {
 
 269             NetworkConfigEvent.Type type;
 
 270             switch (event.type()) {
 
 275                     type = CONFIG_UPDATED;
 
 279                     type = CONFIG_REMOVED;
 
 282             ConfigFactory factory = factoriesByConfig.get(event.key().configClass);
 
 283             if (factory != null) {
 
 284                 notifyDelegate(new NetworkConfigEvent(type, event.key().subject,
 
 285                                                       factory.configClass()));