2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onosproject.store.consistent.impl;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.LinkedList;
23 import java.util.Map.Entry;
24 import java.util.Queue;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.stream.Collectors;
29 import org.onosproject.store.service.DatabaseUpdate;
30 import org.onosproject.store.service.Transaction;
31 import org.onosproject.store.service.Versioned;
32 import com.google.common.base.Objects;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableSet;
35 import com.google.common.collect.Lists;
36 import com.google.common.collect.Maps;
38 import net.kuujo.copycat.state.Initializer;
39 import net.kuujo.copycat.state.StateContext;
42 * Default database state.
44 public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
45 private Long nextVersion;
46 private Map<String, AtomicLong> counters;
47 private Map<String, Map<String, Versioned<byte[]>>> maps;
48 private Map<String, Queue<byte[]>> queues;
51 * This locks map has a structure similar to the "tables" map above and
52 * holds all the provisional updates made during a transaction's prepare phase.
53 * The entry value is represented as the tuple: (transactionId, newValue)
54 * If newValue == null that signifies this update is attempting to
55 * delete the existing value.
56 * This map also serves as a lock on the entries that are being updated.
57 * The presence of a entry in this map indicates that element is
58 * participating in a transaction and is currently locked for updates.
60 private Map<String, Map<String, Update>> locks;
64 public void init(StateContext<DatabaseState<String, byte[]>> context) {
65 counters = context.get("counters");
66 if (counters == null) {
67 counters = Maps.newConcurrentMap();
68 context.put("counters", counters);
70 maps = context.get("maps");
72 maps = Maps.newConcurrentMap();
73 context.put("maps", maps);
75 locks = context.get("locks");
77 locks = Maps.newConcurrentMap();
78 context.put("locks", locks);
80 queues = context.get("queues");
82 queues = Maps.newConcurrentMap();
83 context.put("queues", queues);
85 nextVersion = context.get("nextVersion");
86 if (nextVersion == null) {
88 context.put("nextVersion", nextVersion);
93 public Set<String> maps() {
94 return ImmutableSet.copyOf(maps.keySet());
98 public Map<String, Long> counters() {
99 Map<String, Long> counterMap = Maps.newHashMap();
100 counters.forEach((k, v) -> counterMap.put(k, v.get()));
105 public int mapSize(String mapName) {
106 return getMap(mapName).size();
110 public boolean mapIsEmpty(String mapName) {
111 return getMap(mapName).isEmpty();
115 public boolean mapContainsKey(String mapName, String key) {
116 return getMap(mapName).containsKey(key);
120 public boolean mapContainsValue(String mapName, byte[] value) {
121 return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
125 public Versioned<byte[]> mapGet(String mapName, String key) {
126 return getMap(mapName).get(key);
131 public Result<UpdateResult<String, byte[]>> mapUpdate(
134 Match<byte[]> valueMatch,
135 Match<Long> versionMatch,
137 if (isLockedForUpdates(mapName, key)) {
138 return Result.locked();
140 Versioned<byte[]> currentValue = getMap(mapName).get(key);
141 if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
142 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
143 return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
146 if (currentValue == null) {
147 return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
149 getMap(mapName).remove(key);
150 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
153 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
154 getMap(mapName).put(key, newValue);
155 return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
160 public Result<Void> mapClear(String mapName) {
161 if (areTransactionsInProgress(mapName)) {
162 return Result.locked();
164 getMap(mapName).clear();
165 return Result.ok(null);
169 public Set<String> mapKeySet(String mapName) {
170 return ImmutableSet.copyOf(getMap(mapName).keySet());
174 public Collection<Versioned<byte[]>> mapValues(String mapName) {
175 return ImmutableList.copyOf(getMap(mapName).values());
179 public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
180 return ImmutableSet.copyOf(getMap(mapName)
183 .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
184 .collect(Collectors.toSet()));
188 public Long counterAddAndGet(String counterName, long delta) {
189 return getCounter(counterName).addAndGet(delta);
193 public Long counterGetAndAdd(String counterName, long delta) {
194 return getCounter(counterName).getAndAdd(delta);
198 public Long counterGet(String counterName) {
199 return getCounter(counterName).get();
203 public Long queueSize(String queueName) {
204 return Long.valueOf(getQueue(queueName).size());
208 public byte[] queuePeek(String queueName) {
209 return getQueue(queueName).peek();
213 public byte[] queuePop(String queueName) {
214 return getQueue(queueName).poll();
218 public void queuePush(String queueName, byte[] entry) {
219 getQueue(queueName).offer(entry);
223 public CommitResponse prepareAndCommit(Transaction transaction) {
224 if (prepare(transaction)) {
225 return commit(transaction);
227 return CommitResponse.failure();
231 public boolean prepare(Transaction transaction) {
232 if (transaction.updates().stream().anyMatch(update ->
233 isLockedByAnotherTransaction(update.mapName(),
235 transaction.id()))) {
239 if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
240 transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
247 public CommitResponse commit(Transaction transaction) {
248 return CommitResponse.success(Lists.transform(transaction.updates(),
249 update -> commitProvisionalUpdate(update, transaction.id())));
253 public boolean rollback(Transaction transaction) {
254 transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
258 private Map<String, Versioned<byte[]>> getMap(String mapName) {
259 return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
262 private Map<String, Update> getLockMap(String mapName) {
263 return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
266 private AtomicLong getCounter(String counterName) {
267 return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
270 private Queue<byte[]> getQueue(String queueName) {
271 return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
274 private boolean isUpdatePossible(DatabaseUpdate update) {
275 Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
276 switch (update.type()) {
281 return existingEntry == null;
282 case PUT_IF_VERSION_MATCH:
283 return existingEntry != null && existingEntry.version() == update.currentVersion();
284 case PUT_IF_VALUE_MATCH:
285 return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
286 case REMOVE_IF_VERSION_MATCH:
287 return existingEntry == null || existingEntry.version() == update.currentVersion();
288 case REMOVE_IF_VALUE_MATCH:
289 return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
291 throw new IllegalStateException("Unsupported type: " + update.type());
295 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
296 Map<String, Update> lockMap = getLockMap(update.mapName());
297 switch (update.type()) {
300 case PUT_IF_VERSION_MATCH:
301 case PUT_IF_VALUE_MATCH:
302 lockMap.put(update.key(), new Update(transactionId, update.value()));
305 case REMOVE_IF_VERSION_MATCH:
306 case REMOVE_IF_VALUE_MATCH:
307 lockMap.put(update.key(), new Update(transactionId, null));
310 throw new IllegalStateException("Unsupported type: " + update.type());
314 private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
315 String mapName = update.mapName();
316 String key = update.key();
317 Update provisionalUpdate = getLockMap(mapName).get(key);
318 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
319 getLockMap(mapName).remove(key);
321 throw new IllegalStateException("Invalid transaction Id");
323 return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
326 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
327 String mapName = update.mapName();
328 String key = update.key();
329 Update provisionalUpdate = getLockMap(mapName).get(key);
330 if (provisionalUpdate == null) {
333 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
334 getLockMap(mapName).remove(key);
338 private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
339 Update update = getLockMap(mapName).get(key);
340 return update != null && !Objects.equal(transactionId, update.transactionId());
343 private boolean isLockedForUpdates(String mapName, String key) {
344 return getLockMap(mapName).containsKey(key);
347 private boolean areTransactionsInProgress(String mapName) {
348 return !getLockMap(mapName).isEmpty();
351 private class Update {
352 private final long transactionId;
353 private final byte[] value;
355 public Update(long txId, byte[] value) {
356 this.transactionId = txId;
360 public long transactionId() {
361 return this.transactionId;
364 public byte[] value() {