2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onosproject.store.consistent.impl;
19 import java.util.Collection;
20 import java.util.List;
22 import java.util.Map.Entry;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.function.Consumer;
29 import java.util.stream.Collectors;
31 import org.onosproject.store.service.DatabaseUpdate;
32 import org.onosproject.store.service.Transaction;
33 import org.onosproject.store.service.Versioned;
35 import com.google.common.collect.ImmutableList;
36 import com.google.common.collect.Lists;
37 import com.google.common.collect.Maps;
38 import com.google.common.collect.Sets;
40 import net.kuujo.copycat.Task;
41 import net.kuujo.copycat.cluster.Cluster;
42 import net.kuujo.copycat.resource.ResourceState;
43 import static com.google.common.base.Preconditions.checkState;
46 * A database that partitions the keys across one or more database partitions.
48 public class PartitionedDatabase implements Database {
50 private final String name;
51 private final Partitioner<String> partitioner;
52 private final List<Database> partitions;
53 private final AtomicBoolean isOpen = new AtomicBoolean(false);
54 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
55 private TransactionManager transactionManager;
57 public PartitionedDatabase(
59 Collection<Database> partitions) {
61 this.partitions = partitions
63 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
64 .collect(Collectors.toList());
65 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
69 * Returns the databases for individual partitions.
70 * @return list of database partitions
72 public List<Database> getPartitions() {
77 * Returns true if the database is open.
78 * @return true if open, false otherwise
81 public boolean isOpen() {
86 public CompletableFuture<Set<String>> maps() {
87 checkState(isOpen.get(), DB_NOT_OPEN);
88 Set<String> mapNames = Sets.newConcurrentHashSet();
89 return CompletableFuture.allOf(partitions
91 .map(db -> db.maps().thenApply(mapNames::addAll))
92 .toArray(CompletableFuture[]::new))
93 .thenApply(v -> mapNames);
97 public CompletableFuture<Map<String, Long>> counters() {
98 checkState(isOpen.get(), DB_NOT_OPEN);
99 Map<String, Long> counters = Maps.newConcurrentMap();
100 return CompletableFuture.allOf(partitions
102 .map(db -> db.counters()
107 .toArray(CompletableFuture[]::new))
108 .thenApply(v -> counters);
112 public CompletableFuture<Integer> mapSize(String mapName) {
113 checkState(isOpen.get(), DB_NOT_OPEN);
114 AtomicInteger totalSize = new AtomicInteger(0);
115 return CompletableFuture.allOf(partitions
117 .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
118 .toArray(CompletableFuture[]::new))
119 .thenApply(v -> totalSize.get());
123 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
124 checkState(isOpen.get(), DB_NOT_OPEN);
125 return mapSize(mapName).thenApply(size -> size == 0);
129 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
130 checkState(isOpen.get(), DB_NOT_OPEN);
131 return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
135 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
136 checkState(isOpen.get(), DB_NOT_OPEN);
137 AtomicBoolean containsValue = new AtomicBoolean(false);
138 return CompletableFuture.allOf(partitions
140 .map(p -> p.mapContainsValue(mapName, value)
141 .thenApply(v -> containsValue.compareAndSet(false, v)))
142 .toArray(CompletableFuture[]::new))
143 .thenApply(v -> containsValue.get());
147 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
148 checkState(isOpen.get(), DB_NOT_OPEN);
149 return partitioner.getPartition(mapName, key).mapGet(mapName, key);
153 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
154 String mapName, String key, Match<byte[]> valueMatch,
155 Match<Long> versionMatch, byte[] value) {
156 return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
161 public CompletableFuture<Result<Void>> mapClear(String mapName) {
162 AtomicBoolean isLocked = new AtomicBoolean(false);
163 checkState(isOpen.get(), DB_NOT_OPEN);
164 return CompletableFuture.allOf(partitions
166 .map(p -> p.mapClear(mapName)
167 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
168 .toArray(CompletableFuture[]::new))
169 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
173 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
174 checkState(isOpen.get(), DB_NOT_OPEN);
175 Set<String> keySet = Sets.newConcurrentHashSet();
176 return CompletableFuture.allOf(partitions
178 .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
179 .toArray(CompletableFuture[]::new))
180 .thenApply(v -> keySet);
184 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
185 checkState(isOpen.get(), DB_NOT_OPEN);
186 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
187 return CompletableFuture.allOf(partitions
189 .map(p -> p.mapValues(mapName).thenApply(values::addAll))
190 .toArray(CompletableFuture[]::new))
191 .thenApply(v -> values);
195 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
196 checkState(isOpen.get(), DB_NOT_OPEN);
197 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
198 return CompletableFuture.allOf(partitions
200 .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
201 .toArray(CompletableFuture[]::new))
202 .thenApply(v -> entrySet);
206 public CompletableFuture<Long> counterGet(String counterName) {
207 checkState(isOpen.get(), DB_NOT_OPEN);
208 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
212 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
213 checkState(isOpen.get(), DB_NOT_OPEN);
214 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
218 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
219 checkState(isOpen.get(), DB_NOT_OPEN);
220 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
225 public CompletableFuture<Long> queueSize(String queueName) {
226 checkState(isOpen.get(), DB_NOT_OPEN);
227 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
231 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
232 checkState(isOpen.get(), DB_NOT_OPEN);
233 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
237 public CompletableFuture<byte[]> queuePop(String queueName) {
238 checkState(isOpen.get(), DB_NOT_OPEN);
239 return partitioner.getPartition(queueName, queueName).queuePop(queueName);
243 public CompletableFuture<byte[]> queuePeek(String queueName) {
244 checkState(isOpen.get(), DB_NOT_OPEN);
245 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
249 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
250 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
251 if (subTransactions.isEmpty()) {
252 return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
253 } else if (subTransactions.size() == 1) {
254 Entry<Database, Transaction> entry =
255 subTransactions.entrySet().iterator().next();
256 return entry.getKey().prepareAndCommit(entry.getValue());
258 if (transactionManager == null) {
259 throw new IllegalStateException("TransactionManager is not initialized");
261 return transactionManager.execute(transaction);
266 public CompletableFuture<Boolean> prepare(Transaction transaction) {
267 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
268 AtomicBoolean status = new AtomicBoolean(true);
269 return CompletableFuture.allOf(subTransactions.entrySet()
273 .prepare(entry.getValue())
274 .thenApply(v -> status.compareAndSet(true, v)))
275 .toArray(CompletableFuture[]::new))
276 .thenApply(v -> status.get());
280 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
281 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
282 AtomicBoolean success = new AtomicBoolean(true);
283 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
284 return CompletableFuture.allOf(subTransactions.entrySet()
286 .map(entry -> entry.getKey().commit(entry.getValue())
287 .thenAccept(response -> {
288 success.set(success.get() && response.success());
290 allUpdates.addAll(response.updates());
293 .toArray(CompletableFuture[]::new))
294 .thenApply(v -> success.get() ?
295 CommitResponse.success(allUpdates) : CommitResponse.failure());
299 public CompletableFuture<Boolean> rollback(Transaction transaction) {
300 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
301 return CompletableFuture.allOf(subTransactions.entrySet()
303 .map(entry -> entry.getKey().rollback(entry.getValue()))
304 .toArray(CompletableFuture[]::new))
305 .thenApply(v -> true);
309 public CompletableFuture<Database> open() {
310 return CompletableFuture.allOf(partitions
313 .toArray(CompletableFuture[]::new))
321 public CompletableFuture<Void> close() {
322 checkState(isOpen.get(), DB_NOT_OPEN);
323 return CompletableFuture.allOf(partitions
325 .map(database -> database.close())
326 .toArray(CompletableFuture[]::new));
330 public boolean isClosed() {
331 return !isOpen.get();
335 public String name() {
340 public Cluster cluster() {
341 throw new UnsupportedOperationException();
345 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
346 throw new UnsupportedOperationException();
350 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
351 throw new UnsupportedOperationException();
355 public ResourceState state() {
356 throw new UnsupportedOperationException();
359 private Map<Database, Transaction> createSubTransactions(
360 Transaction transaction) {
361 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
362 for (DatabaseUpdate update : transaction.updates()) {
363 Database partition = partitioner.getPartition(update.mapName(), update.key());
364 List<DatabaseUpdate> partitionUpdates =
365 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
366 partitionUpdates.add(update);
368 Map<Database, Transaction> subTransactions = Maps.newHashMap();
369 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
370 return subTransactions;
373 protected void setTransactionManager(TransactionManager transactionManager) {
374 this.transactionManager = transactionManager;
378 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
379 partitions.forEach(p -> p.registerConsumer(consumer));
383 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
384 partitions.forEach(p -> p.unregisterConsumer(consumer));