2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onosproject.store.consistent.impl;
19 import net.kuujo.copycat.state.StateMachine;
20 import net.kuujo.copycat.resource.internal.AbstractResource;
21 import net.kuujo.copycat.resource.internal.ResourceManager;
22 import net.kuujo.copycat.state.internal.DefaultStateMachine;
23 import net.kuujo.copycat.util.concurrent.Futures;
24 import net.kuujo.copycat.util.function.TriConsumer;
26 import java.util.Collection;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.function.Consumer;
31 import java.util.function.Supplier;
33 import org.onosproject.store.service.Transaction;
34 import org.onosproject.store.service.Versioned;
36 import com.google.common.collect.Sets;
41 public class DefaultDatabase extends AbstractResource<Database> implements Database {
42 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
43 private DatabaseProxy<String, byte[]> proxy;
44 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
45 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
47 @SuppressWarnings({ "unchecked", "rawtypes" })
48 public DefaultDatabase(ResourceManager context) {
50 this.stateMachine = new DefaultStateMachine(context,
52 DefaultDatabaseState.class,
53 DefaultDatabase.class.getClassLoader());
54 this.stateMachine.addStartupTask(() -> {
55 stateMachine.registerWatcher(watcher);
56 return CompletableFuture.completedFuture(null);
58 this.stateMachine.addShutdownTask(() -> {
59 stateMachine.unregisterWatcher(watcher);
60 return CompletableFuture.completedFuture(null);
65 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
66 * return the completed future result.
68 * @param supplier The supplier to call if the database is open.
69 * @param <T> The future result type.
70 * @return A completable future that if this database is closed is immediately failed.
72 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
74 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
76 return supplier.get();
80 public CompletableFuture<Set<String>> maps() {
81 return checkOpen(() -> proxy.maps());
85 public CompletableFuture<Map<String, Long>> counters() {
86 return checkOpen(() -> proxy.counters());
90 public CompletableFuture<Integer> mapSize(String mapName) {
91 return checkOpen(() -> proxy.mapSize(mapName));
95 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
96 return checkOpen(() -> proxy.mapIsEmpty(mapName));
100 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
101 return checkOpen(() -> proxy.mapContainsKey(mapName, key));
105 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
106 return checkOpen(() -> proxy.mapContainsValue(mapName, value));
110 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
111 return checkOpen(() -> proxy.mapGet(mapName, key));
115 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
116 String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
117 return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
121 public CompletableFuture<Result<Void>> mapClear(String mapName) {
122 return checkOpen(() -> proxy.mapClear(mapName));
126 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
127 return checkOpen(() -> proxy.mapKeySet(mapName));
131 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
132 return checkOpen(() -> proxy.mapValues(mapName));
136 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
137 return checkOpen(() -> proxy.mapEntrySet(mapName));
141 public CompletableFuture<Long> counterGet(String counterName) {
142 return checkOpen(() -> proxy.counterGet(counterName));
146 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
147 return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
151 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
152 return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
156 public CompletableFuture<Long> queueSize(String queueName) {
157 return checkOpen(() -> proxy.queueSize(queueName));
161 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
162 return checkOpen(() -> proxy.queuePush(queueName, entry));
166 public CompletableFuture<byte[]> queuePop(String queueName) {
167 return checkOpen(() -> proxy.queuePop(queueName));
171 public CompletableFuture<byte[]> queuePeek(String queueName) {
172 return checkOpen(() -> proxy.queuePeek(queueName));
176 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
177 return checkOpen(() -> proxy.prepareAndCommit(transaction));
181 public CompletableFuture<Boolean> prepare(Transaction transaction) {
182 return checkOpen(() -> proxy.prepare(transaction));
186 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
187 return checkOpen(() -> proxy.commit(transaction));
191 public CompletableFuture<Boolean> rollback(Transaction transaction) {
192 return checkOpen(() -> proxy.rollback(transaction));
196 @SuppressWarnings("unchecked")
197 public synchronized CompletableFuture<Database> open() {
198 return runStartupTasks()
199 .thenCompose(v -> stateMachine.open())
201 this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
203 .thenApply(v -> null);
207 public synchronized CompletableFuture<Void> close() {
209 return stateMachine.close()
210 .thenCompose(v -> runShutdownTasks());
214 public int hashCode() {
215 return name().hashCode();
219 public boolean equals(Object other) {
220 if (other instanceof Database) {
221 return name().equals(((Database) other).name());
227 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
228 consumers.add(consumer);
232 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
233 consumers.remove(consumer);
236 private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
238 public void accept(String name, Object input, Object output) {
239 StateMachineUpdate update = new StateMachineUpdate(name, input, output);
240 consumers.forEach(consumer -> consumer.accept(update));