4d9776ee4a697fa52b92f130ee03c8cb6365fead
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onosproject.store.consistent.impl;
18
19 import net.kuujo.copycat.state.StateMachine;
20 import net.kuujo.copycat.resource.internal.AbstractResource;
21 import net.kuujo.copycat.resource.internal.ResourceManager;
22 import net.kuujo.copycat.state.internal.DefaultStateMachine;
23 import net.kuujo.copycat.util.concurrent.Futures;
24 import net.kuujo.copycat.util.function.TriConsumer;
25
26 import java.util.Collection;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.function.Consumer;
31 import java.util.function.Supplier;
32
33 import org.onosproject.store.service.Transaction;
34 import org.onosproject.store.service.Versioned;
35
36 import com.google.common.collect.Sets;
37
38 /**
39  * Default database.
40  */
41 public class DefaultDatabase extends AbstractResource<Database> implements Database {
42     private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
43     private DatabaseProxy<String, byte[]> proxy;
44     private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
45     private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
46
47     @SuppressWarnings({ "unchecked", "rawtypes" })
48     public DefaultDatabase(ResourceManager context) {
49         super(context);
50         this.stateMachine = new DefaultStateMachine(context,
51                 DatabaseState.class,
52                 DefaultDatabaseState.class,
53                 DefaultDatabase.class.getClassLoader());
54         this.stateMachine.addStartupTask(() -> {
55             stateMachine.registerWatcher(watcher);
56             return CompletableFuture.completedFuture(null);
57         });
58         this.stateMachine.addShutdownTask(() -> {
59             stateMachine.unregisterWatcher(watcher);
60             return CompletableFuture.completedFuture(null);
61         });
62     }
63
64     /**
65      * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
66      * return the completed future result.
67      *
68      * @param supplier The supplier to call if the database is open.
69      * @param <T> The future result type.
70      * @return A completable future that if this database is closed is immediately failed.
71      */
72     protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
73         if (proxy == null) {
74             return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
75         }
76         return supplier.get();
77     }
78
79     @Override
80     public CompletableFuture<Set<String>> maps() {
81         return checkOpen(() -> proxy.maps());
82     }
83
84     @Override
85     public CompletableFuture<Map<String, Long>> counters() {
86         return checkOpen(() -> proxy.counters());
87     }
88
89     @Override
90     public CompletableFuture<Integer> mapSize(String mapName) {
91         return checkOpen(() -> proxy.mapSize(mapName));
92     }
93
94     @Override
95     public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
96         return checkOpen(() -> proxy.mapIsEmpty(mapName));
97     }
98
99     @Override
100     public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
101         return checkOpen(() -> proxy.mapContainsKey(mapName, key));
102     }
103
104     @Override
105     public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
106         return checkOpen(() -> proxy.mapContainsValue(mapName, value));
107     }
108
109     @Override
110     public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
111         return checkOpen(() -> proxy.mapGet(mapName, key));
112     }
113
114     @Override
115     public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
116             String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
117         return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
118     }
119
120     @Override
121     public CompletableFuture<Result<Void>> mapClear(String mapName) {
122         return checkOpen(() -> proxy.mapClear(mapName));
123     }
124
125     @Override
126     public CompletableFuture<Set<String>> mapKeySet(String mapName) {
127         return checkOpen(() -> proxy.mapKeySet(mapName));
128     }
129
130     @Override
131     public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
132         return checkOpen(() -> proxy.mapValues(mapName));
133     }
134
135     @Override
136     public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
137         return checkOpen(() -> proxy.mapEntrySet(mapName));
138     }
139
140     @Override
141     public CompletableFuture<Long> counterGet(String counterName) {
142         return checkOpen(() -> proxy.counterGet(counterName));
143     }
144
145     @Override
146     public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
147         return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
148     }
149
150     @Override
151     public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
152         return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
153     }
154
155     @Override
156     public CompletableFuture<Long> queueSize(String queueName) {
157         return checkOpen(() -> proxy.queueSize(queueName));
158     }
159
160     @Override
161     public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
162         return checkOpen(() -> proxy.queuePush(queueName, entry));
163     }
164
165     @Override
166     public CompletableFuture<byte[]> queuePop(String queueName) {
167         return checkOpen(() -> proxy.queuePop(queueName));
168     }
169
170     @Override
171     public CompletableFuture<byte[]> queuePeek(String queueName) {
172         return checkOpen(() -> proxy.queuePeek(queueName));
173     }
174
175     @Override
176     public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
177         return checkOpen(() -> proxy.prepareAndCommit(transaction));
178     }
179
180     @Override
181     public CompletableFuture<Boolean> prepare(Transaction transaction) {
182         return checkOpen(() -> proxy.prepare(transaction));
183     }
184
185     @Override
186     public CompletableFuture<CommitResponse> commit(Transaction transaction) {
187         return checkOpen(() -> proxy.commit(transaction));
188     }
189
190     @Override
191     public CompletableFuture<Boolean> rollback(Transaction transaction) {
192         return checkOpen(() -> proxy.rollback(transaction));
193     }
194
195     @Override
196     @SuppressWarnings("unchecked")
197     public synchronized CompletableFuture<Database> open() {
198         return runStartupTasks()
199                 .thenCompose(v -> stateMachine.open())
200                 .thenRun(() -> {
201                     this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
202                 })
203                 .thenApply(v -> null);
204     }
205
206     @Override
207     public synchronized CompletableFuture<Void> close() {
208         proxy = null;
209         return stateMachine.close()
210                 .thenCompose(v -> runShutdownTasks());
211     }
212
213     @Override
214     public int hashCode() {
215         return name().hashCode();
216     }
217
218     @Override
219     public boolean equals(Object other) {
220         if (other instanceof Database) {
221             return name().equals(((Database) other).name());
222         }
223         return false;
224     }
225
226     @Override
227     public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
228         consumers.add(consumer);
229     }
230
231     @Override
232     public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
233         consumers.remove(consumer);
234     }
235
236     private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
237         @Override
238         public void accept(String name, Object input, Object output) {
239             StateMachineUpdate update = new StateMachineUpdate(name, input, output);
240             consumers.forEach(consumer -> consumer.accept(update));
241         }
242     }
243 }