3e89635a6fa2505358e6b4605b9ccada0f8695ca
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onosproject.store.consistent.impl;
18
19 import com.google.common.collect.ArrayListMultimap;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Multimap;
24 import com.google.common.collect.Multimaps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Futures;
27
28 import net.kuujo.copycat.CopycatConfig;
29 import net.kuujo.copycat.cluster.ClusterConfig;
30 import net.kuujo.copycat.cluster.Member;
31 import net.kuujo.copycat.cluster.Member.Type;
32 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
33 import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
34 import net.kuujo.copycat.log.BufferedLog;
35 import net.kuujo.copycat.log.FileLog;
36 import net.kuujo.copycat.log.Log;
37 import net.kuujo.copycat.protocol.Consistency;
38 import net.kuujo.copycat.protocol.Protocol;
39 import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
40
41 import org.apache.commons.lang.math.RandomUtils;
42 import org.apache.felix.scr.annotations.Activate;
43 import org.apache.felix.scr.annotations.Component;
44 import org.apache.felix.scr.annotations.Deactivate;
45 import org.apache.felix.scr.annotations.Reference;
46 import org.apache.felix.scr.annotations.ReferenceCardinality;
47 import org.apache.felix.scr.annotations.ReferencePolicy;
48 import org.apache.felix.scr.annotations.Service;
49 import org.onosproject.app.ApplicationEvent;
50 import org.onosproject.app.ApplicationListener;
51 import org.onosproject.app.ApplicationService;
52 import org.onosproject.cluster.ClusterMetadataService;
53 import org.onosproject.cluster.ClusterService;
54 import org.onosproject.cluster.ControllerNode;
55 import org.onosproject.cluster.NodeId;
56 import org.onosproject.core.ApplicationId;
57 import org.onosproject.core.IdGenerator;
58 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
59 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
60 import org.onosproject.store.service.AtomicCounterBuilder;
61 import org.onosproject.store.service.AtomicValueBuilder;
62 import org.onosproject.store.service.ConsistentMapBuilder;
63 import org.onosproject.store.service.ConsistentMapException;
64 import org.onosproject.store.service.DistributedQueueBuilder;
65 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
66 import org.onosproject.store.service.MapInfo;
67 import org.onosproject.store.service.PartitionInfo;
68 import org.onosproject.store.service.DistributedSetBuilder;
69 import org.onosproject.store.service.StorageAdminService;
70 import org.onosproject.store.service.StorageService;
71 import org.onosproject.store.service.Transaction;
72 import org.onosproject.store.service.TransactionContextBuilder;
73 import org.slf4j.Logger;
74
75 import java.util.Collection;
76 import java.util.List;
77 import java.util.Map;
78 import java.util.Set;
79 import java.util.concurrent.CompletableFuture;
80 import java.util.concurrent.ExecutionException;
81 import java.util.concurrent.Executors;
82 import java.util.concurrent.TimeUnit;
83 import java.util.concurrent.TimeoutException;
84 import java.util.stream.Collectors;
85
86 import static org.slf4j.LoggerFactory.getLogger;
87 import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
88 import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
89
90 /**
91  * Database manager.
92  */
93 @Component(immediate = true, enabled = true)
94 @Service
95 public class DatabaseManager implements StorageService, StorageAdminService {
96
97     private final Logger log = getLogger(getClass());
98
99     public static final String BASE_PARTITION_NAME = "p0";
100
101     private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
102     private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
103
104     private ClusterCoordinator coordinator;
105     protected PartitionedDatabase partitionedDatabase;
106     protected Database inMemoryDatabase;
107     protected NodeId localNodeId;
108
109     private TransactionManager transactionManager;
110     private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
111
112     private ApplicationListener appListener = new InternalApplicationListener();
113
114     private final Multimap<String, DefaultAsyncConsistentMap> maps =
115             Multimaps.synchronizedMultimap(ArrayListMultimap.create());
116     private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication =
117             Multimaps.synchronizedMultimap(ArrayListMultimap.create());
118
119     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120     protected ClusterMetadataService clusterMetadataService;
121
122     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123     protected ClusterService clusterService;
124
125     @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
126     protected ApplicationService applicationService;
127
128     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129     protected ClusterCommunicationService clusterCommunicator;
130
131     protected String nodeIdToUri(NodeId nodeId) {
132         ControllerNode node = clusterService.getNode(nodeId);
133         return String.format("onos://%s:%d", node.ip(), node.tcpPort());
134     }
135
136     protected void bindApplicationService(ApplicationService service) {
137         applicationService = service;
138         applicationService.addListener(appListener);
139     }
140
141     protected void unbindApplicationService(ApplicationService service) {
142         applicationService.removeListener(appListener);
143         this.applicationService = null;
144     }
145
146     @Activate
147     public void activate() {
148         localNodeId = clusterService.getLocalNode().id();
149
150         Map<String, Set<NodeId>> partitionMap = Maps.newHashMap();
151         clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> {
152             partitionMap.put(p.getName(), Sets.newHashSet(p.getMembers()));
153         });
154
155
156         String[] activeNodeUris = partitionMap.values()
157                     .stream()
158                     .reduce((s1, s2) -> Sets.union(s1, s2))
159                     .get()
160                     .stream()
161                     .map(this::nodeIdToUri)
162                     .toArray(String[]::new);
163
164         String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id());
165         Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
166
167         ClusterConfig clusterConfig = new ClusterConfig()
168             .withProtocol(protocol)
169             .withElectionTimeout(electionTimeoutMillis(activeNodeUris))
170             .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
171             .withMembers(activeNodeUris)
172             .withLocalMember(localNodeUri);
173
174         CopycatConfig copycatConfig = new CopycatConfig()
175             .withName("onos")
176             .withClusterConfig(clusterConfig)
177             .withDefaultSerializer(new DatabaseSerializer())
178             .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
179
180         coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
181
182         DatabaseConfig inMemoryDatabaseConfig =
183                 newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris);
184         inMemoryDatabase = coordinator
185                 .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig)
186                 .withSerializer(copycatConfig.getDefaultSerializer())
187                 .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
188
189         List<Database> partitions = partitionMap.entrySet()
190             .stream()
191             .map(entry -> {
192                 String[] replicas = entry.getValue().stream().map(this::nodeIdToUri).toArray(String[]::new);
193                 return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
194                 })
195             .map(config -> {
196                 Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig)
197                         .withSerializer(copycatConfig.getDefaultSerializer())
198                         .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
199                 return db;
200             })
201             .collect(Collectors.toList());
202
203         partitionedDatabase = new PartitionedDatabase("onos-store", partitions);
204
205         CompletableFuture<Void> status = coordinator.open()
206             .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
207             .whenComplete((db, error) -> {
208                 if (error != null) {
209                     log.error("Failed to initialize database.", error);
210                 } else {
211                     log.info("Successfully initialized database.");
212                 }
213             }));
214
215         Futures.getUnchecked(status);
216
217         transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
218         partitionedDatabase.setTransactionManager(transactionManager);
219
220         log.info("Started");
221     }
222
223     @Deactivate
224     public void deactivate() {
225         CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
226             .thenCompose(v -> coordinator.close())
227             .whenComplete((result, error) -> {
228                 if (error != null) {
229                     log.warn("Failed to cleanly close databases.", error);
230                 } else {
231                     log.info("Successfully closed databases.");
232                 }
233             });
234         ImmutableList.copyOf(maps.values()).forEach(this::unregisterMap);
235         if (applicationService != null) {
236             applicationService.removeListener(appListener);
237         }
238         log.info("Stopped");
239     }
240
241     @Override
242     public TransactionContextBuilder transactionContextBuilder() {
243         return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
244     }
245
246     @Override
247     public List<PartitionInfo> getPartitionInfo() {
248         return Lists.asList(
249                     inMemoryDatabase,
250                     partitionedDatabase.getPartitions().toArray(new Database[]{}))
251                 .stream()
252                 .map(DatabaseManager::toPartitionInfo)
253                 .collect(Collectors.toList());
254     }
255
256     private Log newPersistentLog() {
257         String logDir = System.getProperty("karaf.data", "./data");
258         return new FileLog()
259             .withDirectory(logDir)
260             .withSegmentSize(1073741824) // 1GB
261             .withFlushOnWrite(true)
262             .withSegmentInterval(Long.MAX_VALUE);
263     }
264
265     private Log newInMemoryLog() {
266         return new BufferedLog()
267             .withFlushOnWrite(false)
268             .withFlushInterval(Long.MAX_VALUE)
269             .withSegmentSize(10485760) // 10MB
270             .withSegmentInterval(Long.MAX_VALUE);
271     }
272
273     private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) {
274         return new DatabaseConfig()
275             .withName(name)
276             .withElectionTimeout(electionTimeoutMillis(replicas))
277             .withHeartbeatInterval(heartbeatTimeoutMillis(replicas))
278             .withConsistency(Consistency.DEFAULT)
279             .withLog(log)
280             .withDefaultSerializer(new DatabaseSerializer())
281             .withReplicas(replicas);
282     }
283
284     private long electionTimeoutMillis(String[] replicas) {
285         return replicas.length == 1 ? 10L : RAFT_ELECTION_TIMEOUT_MILLIS;
286     }
287
288     private long heartbeatTimeoutMillis(String[] replicas) {
289         return electionTimeoutMillis(replicas) / 2;
290     }
291
292     /**
293      * Maps a Raft Database object to a PartitionInfo object.
294      *
295      * @param database database containing input data
296      * @return PartitionInfo object
297      */
298     private static PartitionInfo toPartitionInfo(Database database) {
299         return new PartitionInfo(database.name(),
300                           database.cluster().term(),
301                           database.cluster().members()
302                                   .stream()
303                                   .filter(member -> Type.ACTIVE.equals(member.type()))
304                                   .map(Member::uri)
305                                   .sorted()
306                                   .collect(Collectors.toList()),
307                           database.cluster().leader() != null ?
308                                   database.cluster().leader().uri() : null);
309     }
310
311
312     @Override
313     public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
314         return new EventuallyConsistentMapBuilderImpl<>(clusterService,
315                                                         clusterCommunicator);
316     }
317
318     @Override
319     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
320         return new DefaultConsistentMapBuilder<>(this);
321     }
322
323     @Override
324     public <E> DistributedSetBuilder<E> setBuilder() {
325         return new DefaultDistributedSetBuilder<>(this);
326     }
327
328
329     @Override
330     public <E> DistributedQueueBuilder<E> queueBuilder() {
331         return new DefaultDistributedQueueBuilder<>(this);
332     }
333
334     @Override
335     public AtomicCounterBuilder atomicCounterBuilder() {
336         return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
337     }
338
339     @Override
340     public <V> AtomicValueBuilder<V> atomicValueBuilder() {
341         return new DefaultAtomicValueBuilder<>(this);
342     }
343
344     @Override
345     public List<MapInfo> getMapInfo() {
346         List<MapInfo> maps = Lists.newArrayList();
347         maps.addAll(getMapInfo(inMemoryDatabase));
348         maps.addAll(getMapInfo(partitionedDatabase));
349         return maps;
350     }
351
352     private List<MapInfo> getMapInfo(Database database) {
353         return complete(database.maps())
354             .stream()
355             .map(name -> new MapInfo(name, complete(database.mapSize(name))))
356             .filter(info -> info.size() > 0)
357             .collect(Collectors.toList());
358     }
359
360
361     @Override
362     public Map<String, Long> getCounters() {
363         Map<String, Long> counters = Maps.newHashMap();
364         counters.putAll(complete(inMemoryDatabase.counters()));
365         counters.putAll(complete(partitionedDatabase.counters()));
366         return counters;
367     }
368
369     @Override
370     public Map<String, Long> getPartitionedDatabaseCounters() {
371         Map<String, Long> counters = Maps.newHashMap();
372         counters.putAll(complete(partitionedDatabase.counters()));
373         return counters;
374     }
375
376     @Override
377     public Map<String, Long> getInMemoryDatabaseCounters() {
378         Map<String, Long> counters = Maps.newHashMap();
379         counters.putAll(complete(inMemoryDatabase.counters()));
380         return counters;
381     }
382
383     @Override
384     public Collection<Transaction> getTransactions() {
385         return complete(transactionManager.getTransactions());
386     }
387
388     private static <T> T complete(CompletableFuture<T> future) {
389         try {
390             return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
391         } catch (InterruptedException e) {
392             Thread.currentThread().interrupt();
393             throw new ConsistentMapException.Interrupted();
394         } catch (TimeoutException e) {
395             throw new ConsistentMapException.Timeout();
396         } catch (ExecutionException e) {
397             throw new ConsistentMapException(e.getCause());
398         }
399     }
400
401     @Override
402     public void redriveTransactions() {
403         getTransactions().stream().forEach(transactionManager::execute);
404     }
405
406     protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) {
407         maps.put(map.name(), map);
408         if (map.applicationId() != null) {
409             mapsByApplication.put(map.applicationId(), map);
410         }
411         return map;
412     }
413
414     protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
415         maps.remove(map.name(), map);
416         if (map.applicationId() != null) {
417             mapsByApplication.remove(map.applicationId(), map);
418         }
419     }
420
421     private class InternalApplicationListener implements ApplicationListener {
422         @Override
423         public void event(ApplicationEvent event) {
424             if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
425                 ApplicationId appId = event.subject().id();
426                 List<DefaultAsyncConsistentMap> mapsToRemove;
427                 synchronized (mapsByApplication) {
428                     mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
429                 }
430                 mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
431                 if (event.type() == APP_UNINSTALLED) {
432                     mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
433                 }
434             }
435         }
436     }
437 }