a294681e9fe935602ebbc6ae80f8b4436e67748e
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onosproject.store.consistent.impl;
18
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.function.Consumer;
29 import java.util.stream.Collectors;
30
31 import org.onosproject.store.service.DatabaseUpdate;
32 import org.onosproject.store.service.Transaction;
33 import org.onosproject.store.service.Versioned;
34
35 import com.google.common.collect.ImmutableList;
36 import com.google.common.collect.Lists;
37 import com.google.common.collect.Maps;
38 import com.google.common.collect.Sets;
39
40 import net.kuujo.copycat.Task;
41 import net.kuujo.copycat.cluster.Cluster;
42 import net.kuujo.copycat.resource.ResourceState;
43 import static com.google.common.base.Preconditions.checkState;
44
45 /**
46  * A database that partitions the keys across one or more database partitions.
47  */
48 public class PartitionedDatabase implements Database {
49
50     private final String name;
51     private final Partitioner<String> partitioner;
52     private final List<Database> partitions;
53     private final AtomicBoolean isOpen = new AtomicBoolean(false);
54     private static final String DB_NOT_OPEN = "Partitioned Database is not open";
55     private TransactionManager transactionManager;
56
57     public PartitionedDatabase(
58             String name,
59             Collection<Database> partitions) {
60         this.name = name;
61         this.partitions = partitions
62                 .stream()
63                 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
64                 .collect(Collectors.toList());
65         this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
66     }
67
68     /**
69      * Returns the databases for individual partitions.
70      * @return list of database partitions
71      */
72     public List<Database> getPartitions() {
73         return partitions;
74     }
75
76     /**
77      * Returns true if the database is open.
78      * @return true if open, false otherwise
79      */
80     @Override
81     public boolean isOpen() {
82         return isOpen.get();
83     }
84
85     @Override
86     public CompletableFuture<Set<String>> maps() {
87         checkState(isOpen.get(), DB_NOT_OPEN);
88         Set<String> mapNames = Sets.newConcurrentHashSet();
89         return CompletableFuture.allOf(partitions
90                 .stream()
91                 .map(db -> db.maps().thenApply(mapNames::addAll))
92                 .toArray(CompletableFuture[]::new))
93             .thenApply(v -> mapNames);
94     }
95
96     @Override
97     public CompletableFuture<Map<String, Long>> counters() {
98         checkState(isOpen.get(), DB_NOT_OPEN);
99         Map<String, Long> counters = Maps.newConcurrentMap();
100         return CompletableFuture.allOf(partitions
101                 .stream()
102                 .map(db -> db.counters()
103                              .thenApply(m -> {
104                                  counters.putAll(m);
105                                  return null;
106                              }))
107                 .toArray(CompletableFuture[]::new))
108             .thenApply(v -> counters);
109     }
110
111     @Override
112     public CompletableFuture<Integer> mapSize(String mapName) {
113         checkState(isOpen.get(), DB_NOT_OPEN);
114         AtomicInteger totalSize = new AtomicInteger(0);
115         return CompletableFuture.allOf(partitions
116                     .stream()
117                     .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
118                     .toArray(CompletableFuture[]::new))
119                 .thenApply(v -> totalSize.get());
120     }
121
122     @Override
123     public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
124         checkState(isOpen.get(), DB_NOT_OPEN);
125         return mapSize(mapName).thenApply(size -> size == 0);
126     }
127
128     @Override
129     public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
130         checkState(isOpen.get(), DB_NOT_OPEN);
131         return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
132     }
133
134     @Override
135     public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
136         checkState(isOpen.get(), DB_NOT_OPEN);
137         AtomicBoolean containsValue = new AtomicBoolean(false);
138         return CompletableFuture.allOf(partitions
139                     .stream()
140                     .map(p -> p.mapContainsValue(mapName, value)
141                                .thenApply(v -> containsValue.compareAndSet(false, v)))
142                     .toArray(CompletableFuture[]::new))
143                 .thenApply(v -> containsValue.get());
144     }
145
146     @Override
147     public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
148         checkState(isOpen.get(), DB_NOT_OPEN);
149         return partitioner.getPartition(mapName, key).mapGet(mapName, key);
150     }
151
152     @Override
153     public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
154             String mapName, String key, Match<byte[]> valueMatch,
155             Match<Long> versionMatch, byte[] value) {
156         return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
157
158     }
159
160     @Override
161     public CompletableFuture<Result<Void>> mapClear(String mapName) {
162         AtomicBoolean isLocked = new AtomicBoolean(false);
163         checkState(isOpen.get(), DB_NOT_OPEN);
164         return CompletableFuture.allOf(partitions
165                     .stream()
166                     .map(p -> p.mapClear(mapName)
167                             .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
168                     .toArray(CompletableFuture[]::new))
169                 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
170     }
171
172     @Override
173     public CompletableFuture<Set<String>> mapKeySet(String mapName) {
174         checkState(isOpen.get(), DB_NOT_OPEN);
175         Set<String> keySet = Sets.newConcurrentHashSet();
176         return CompletableFuture.allOf(partitions
177                     .stream()
178                     .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
179                     .toArray(CompletableFuture[]::new))
180                 .thenApply(v -> keySet);
181     }
182
183     @Override
184     public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
185         checkState(isOpen.get(), DB_NOT_OPEN);
186         List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
187         return CompletableFuture.allOf(partitions
188                     .stream()
189                     .map(p -> p.mapValues(mapName).thenApply(values::addAll))
190                     .toArray(CompletableFuture[]::new))
191                 .thenApply(v -> values);
192     }
193
194     @Override
195     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
196         checkState(isOpen.get(), DB_NOT_OPEN);
197         Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
198         return CompletableFuture.allOf(partitions
199                     .stream()
200                     .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
201                     .toArray(CompletableFuture[]::new))
202                 .thenApply(v -> entrySet);
203     }
204
205     @Override
206     public CompletableFuture<Long> counterGet(String counterName) {
207         checkState(isOpen.get(), DB_NOT_OPEN);
208         return partitioner.getPartition(counterName, counterName).counterGet(counterName);
209     }
210
211     @Override
212     public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
213         checkState(isOpen.get(), DB_NOT_OPEN);
214         return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
215     }
216
217     @Override
218     public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
219         checkState(isOpen.get(), DB_NOT_OPEN);
220         return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
221     }
222
223
224     @Override
225     public CompletableFuture<Long> queueSize(String queueName) {
226         checkState(isOpen.get(), DB_NOT_OPEN);
227         return partitioner.getPartition(queueName, queueName).queueSize(queueName);
228     }
229
230     @Override
231     public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
232         checkState(isOpen.get(), DB_NOT_OPEN);
233         return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
234     }
235
236     @Override
237     public CompletableFuture<byte[]> queuePop(String queueName) {
238         checkState(isOpen.get(), DB_NOT_OPEN);
239         return partitioner.getPartition(queueName, queueName).queuePop(queueName);
240     }
241
242     @Override
243     public CompletableFuture<byte[]> queuePeek(String queueName) {
244         checkState(isOpen.get(), DB_NOT_OPEN);
245         return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
246     }
247
248     @Override
249     public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
250         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
251         if (subTransactions.isEmpty()) {
252             return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
253         } else if (subTransactions.size() == 1) {
254             Entry<Database, Transaction> entry =
255                     subTransactions.entrySet().iterator().next();
256             return entry.getKey().prepareAndCommit(entry.getValue());
257         } else {
258             if (transactionManager == null) {
259                 throw new IllegalStateException("TransactionManager is not initialized");
260             }
261             return transactionManager.execute(transaction);
262         }
263     }
264
265     @Override
266     public CompletableFuture<Boolean> prepare(Transaction transaction) {
267         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
268         AtomicBoolean status = new AtomicBoolean(true);
269         return CompletableFuture.allOf(subTransactions.entrySet()
270                 .stream()
271                 .map(entry -> entry
272                         .getKey()
273                         .prepare(entry.getValue())
274                         .thenApply(v -> status.compareAndSet(true, v)))
275                 .toArray(CompletableFuture[]::new))
276             .thenApply(v -> status.get());
277     }
278
279     @Override
280     public CompletableFuture<CommitResponse> commit(Transaction transaction) {
281         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
282         AtomicBoolean success = new AtomicBoolean(true);
283         List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
284         return CompletableFuture.allOf(subTransactions.entrySet()
285                                    .stream()
286                                    .map(entry -> entry.getKey().commit(entry.getValue())
287                                                            .thenAccept(response -> {
288                                                                success.set(success.get() && response.success());
289                                                                if (success.get()) {
290                                                                    allUpdates.addAll(response.updates());
291                                                                }
292                                                            }))
293                                    .toArray(CompletableFuture[]::new))
294                                .thenApply(v -> success.get() ?
295                                        CommitResponse.success(allUpdates) : CommitResponse.failure());
296     }
297
298     @Override
299     public CompletableFuture<Boolean> rollback(Transaction transaction) {
300         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
301         return CompletableFuture.allOf(subTransactions.entrySet()
302                 .stream()
303                 .map(entry -> entry.getKey().rollback(entry.getValue()))
304                 .toArray(CompletableFuture[]::new))
305             .thenApply(v -> true);
306     }
307
308     @Override
309     public CompletableFuture<Database> open() {
310         return CompletableFuture.allOf(partitions
311                     .stream()
312                     .map(Database::open)
313                     .toArray(CompletableFuture[]::new))
314                 .thenApply(v -> {
315                     isOpen.set(true);
316                     return this;
317                 });
318     }
319
320     @Override
321     public CompletableFuture<Void> close() {
322         checkState(isOpen.get(), DB_NOT_OPEN);
323         return CompletableFuture.allOf(partitions
324                 .stream()
325                 .map(database -> database.close())
326                 .toArray(CompletableFuture[]::new));
327     }
328
329     @Override
330     public boolean isClosed() {
331         return !isOpen.get();
332     }
333
334     @Override
335     public String name() {
336         return name;
337     }
338
339     @Override
340     public Cluster cluster() {
341         throw new UnsupportedOperationException();
342     }
343
344     @Override
345     public Database addStartupTask(Task<CompletableFuture<Void>> task) {
346         throw new UnsupportedOperationException();
347     }
348
349     @Override
350     public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
351         throw new UnsupportedOperationException();
352     }
353
354     @Override
355     public ResourceState state() {
356         throw new UnsupportedOperationException();
357     }
358
359     private Map<Database, Transaction> createSubTransactions(
360             Transaction transaction) {
361         Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
362         for (DatabaseUpdate update : transaction.updates()) {
363             Database partition = partitioner.getPartition(update.mapName(), update.key());
364             List<DatabaseUpdate> partitionUpdates =
365                     perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
366             partitionUpdates.add(update);
367         }
368         Map<Database, Transaction> subTransactions = Maps.newHashMap();
369         perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
370         return subTransactions;
371     }
372
373     protected void setTransactionManager(TransactionManager transactionManager) {
374         this.transactionManager = transactionManager;
375     }
376
377     @Override
378     public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
379         partitions.forEach(p -> p.registerConsumer(consumer));
380     }
381
382     @Override
383     public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
384         partitions.forEach(p -> p.unregisterConsumer(consumer));
385     }
386 }