9d3505bd22bfbaa61d164ff0f90bf46118367947
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onosproject.store.consistent.impl;
18
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.LinkedList;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.Queue;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.stream.Collectors;
27 import java.util.Set;
28
29 import org.onosproject.store.service.DatabaseUpdate;
30 import org.onosproject.store.service.Transaction;
31 import org.onosproject.store.service.Versioned;
32 import com.google.common.base.Objects;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableSet;
35 import com.google.common.collect.Lists;
36 import com.google.common.collect.Maps;
37
38 import net.kuujo.copycat.state.Initializer;
39 import net.kuujo.copycat.state.StateContext;
40
41 /**
42  * Default database state.
43  */
44 public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
45     private Long nextVersion;
46     private Map<String, AtomicLong> counters;
47     private Map<String, Map<String, Versioned<byte[]>>> maps;
48     private Map<String, Queue<byte[]>> queues;
49
50     /**
51      * This locks map has a structure similar to the "tables" map above and
52      * holds all the provisional updates made during a transaction's prepare phase.
53      * The entry value is represented as the tuple: (transactionId, newValue)
54      * If newValue == null that signifies this update is attempting to
55      * delete the existing value.
56      * This map also serves as a lock on the entries that are being updated.
57      * The presence of a entry in this map indicates that element is
58      * participating in a transaction and is currently locked for updates.
59      */
60     private Map<String, Map<String, Update>> locks;
61
62     @Initializer
63     @Override
64     public void init(StateContext<DatabaseState<String, byte[]>> context) {
65         counters = context.get("counters");
66         if (counters == null) {
67             counters = Maps.newConcurrentMap();
68             context.put("counters", counters);
69         }
70         maps = context.get("maps");
71         if (maps == null) {
72             maps = Maps.newConcurrentMap();
73             context.put("maps", maps);
74         }
75         locks = context.get("locks");
76         if (locks == null) {
77             locks = Maps.newConcurrentMap();
78             context.put("locks", locks);
79         }
80         queues = context.get("queues");
81         if (queues == null) {
82             queues = Maps.newConcurrentMap();
83             context.put("queues", queues);
84         }
85         nextVersion = context.get("nextVersion");
86         if (nextVersion == null) {
87             nextVersion = new Long(0);
88             context.put("nextVersion", nextVersion);
89         }
90     }
91
92     @Override
93     public Set<String> maps() {
94         return ImmutableSet.copyOf(maps.keySet());
95     }
96
97     @Override
98     public Map<String, Long> counters() {
99         Map<String, Long> counterMap = Maps.newHashMap();
100         counters.forEach((k, v) -> counterMap.put(k, v.get()));
101         return counterMap;
102     }
103
104     @Override
105     public int mapSize(String mapName) {
106       return getMap(mapName).size();
107     }
108
109     @Override
110     public boolean mapIsEmpty(String mapName) {
111         return getMap(mapName).isEmpty();
112     }
113
114     @Override
115     public boolean mapContainsKey(String mapName, String key) {
116         return getMap(mapName).containsKey(key);
117     }
118
119     @Override
120     public boolean mapContainsValue(String mapName, byte[] value) {
121         return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
122     }
123
124     @Override
125     public Versioned<byte[]> mapGet(String mapName, String key) {
126         return getMap(mapName).get(key);
127     }
128
129
130     @Override
131     public Result<UpdateResult<String, byte[]>> mapUpdate(
132             String mapName,
133             String key,
134             Match<byte[]> valueMatch,
135             Match<Long> versionMatch,
136             byte[] value) {
137         if (isLockedForUpdates(mapName, key)) {
138             return Result.locked();
139         }
140         Versioned<byte[]> currentValue = getMap(mapName).get(key);
141         if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
142                 !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
143             return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
144         } else {
145             if (value == null) {
146                 if (currentValue == null) {
147                     return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
148                 } else {
149                     getMap(mapName).remove(key);
150                     return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
151                 }
152             }
153             Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
154             getMap(mapName).put(key, newValue);
155             return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
156         }
157     }
158
159     @Override
160     public Result<Void> mapClear(String mapName) {
161         if (areTransactionsInProgress(mapName)) {
162             return Result.locked();
163         }
164         getMap(mapName).clear();
165         return Result.ok(null);
166     }
167
168     @Override
169     public Set<String> mapKeySet(String mapName) {
170         return ImmutableSet.copyOf(getMap(mapName).keySet());
171     }
172
173     @Override
174     public Collection<Versioned<byte[]>> mapValues(String mapName) {
175         return ImmutableList.copyOf(getMap(mapName).values());
176     }
177
178     @Override
179     public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
180         return ImmutableSet.copyOf(getMap(mapName)
181                 .entrySet()
182                 .stream()
183                 .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
184                 .collect(Collectors.toSet()));
185     }
186
187     @Override
188     public Long counterAddAndGet(String counterName, long delta) {
189         return getCounter(counterName).addAndGet(delta);
190     }
191
192     @Override
193     public Long counterGetAndAdd(String counterName, long delta) {
194         return getCounter(counterName).getAndAdd(delta);
195     }
196
197     @Override
198     public Long counterGet(String counterName) {
199         return getCounter(counterName).get();
200     }
201
202     @Override
203     public Long queueSize(String queueName) {
204         return Long.valueOf(getQueue(queueName).size());
205     }
206
207     @Override
208     public byte[] queuePeek(String queueName) {
209         return getQueue(queueName).peek();
210     }
211
212     @Override
213     public byte[] queuePop(String queueName) {
214         return getQueue(queueName).poll();
215     }
216
217     @Override
218     public void queuePush(String queueName, byte[] entry) {
219         getQueue(queueName).offer(entry);
220     }
221
222     @Override
223     public CommitResponse prepareAndCommit(Transaction transaction) {
224         if (prepare(transaction)) {
225             return commit(transaction);
226         }
227         return CommitResponse.failure();
228     }
229
230     @Override
231     public boolean prepare(Transaction transaction) {
232         if (transaction.updates().stream().anyMatch(update ->
233                     isLockedByAnotherTransaction(update.mapName(),
234                                                  update.key(),
235                                                  transaction.id()))) {
236             return false;
237         }
238
239         if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
240             transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
241             return true;
242         }
243         return false;
244     }
245
246     @Override
247     public CommitResponse commit(Transaction transaction) {
248         return CommitResponse.success(Lists.transform(transaction.updates(),
249                                                       update -> commitProvisionalUpdate(update, transaction.id())));
250     }
251
252     @Override
253     public boolean rollback(Transaction transaction) {
254         transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
255         return true;
256     }
257
258     private Map<String, Versioned<byte[]>> getMap(String mapName) {
259         return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
260     }
261
262     private Map<String, Update> getLockMap(String mapName) {
263         return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
264     }
265
266     private AtomicLong getCounter(String counterName) {
267         return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
268     }
269
270     private Queue<byte[]> getQueue(String queueName) {
271         return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
272     }
273
274     private boolean isUpdatePossible(DatabaseUpdate update) {
275         Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
276         switch (update.type()) {
277         case PUT:
278         case REMOVE:
279             return true;
280         case PUT_IF_ABSENT:
281             return existingEntry == null;
282         case PUT_IF_VERSION_MATCH:
283             return existingEntry != null && existingEntry.version() == update.currentVersion();
284         case PUT_IF_VALUE_MATCH:
285             return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
286         case REMOVE_IF_VERSION_MATCH:
287             return existingEntry == null || existingEntry.version() == update.currentVersion();
288         case REMOVE_IF_VALUE_MATCH:
289             return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
290         default:
291             throw new IllegalStateException("Unsupported type: " + update.type());
292         }
293     }
294
295     private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
296         Map<String, Update> lockMap = getLockMap(update.mapName());
297         switch (update.type()) {
298         case PUT:
299         case PUT_IF_ABSENT:
300         case PUT_IF_VERSION_MATCH:
301         case PUT_IF_VALUE_MATCH:
302             lockMap.put(update.key(), new Update(transactionId, update.value()));
303             break;
304         case REMOVE:
305         case REMOVE_IF_VERSION_MATCH:
306         case REMOVE_IF_VALUE_MATCH:
307             lockMap.put(update.key(), new Update(transactionId, null));
308             break;
309         default:
310             throw new IllegalStateException("Unsupported type: " + update.type());
311         }
312     }
313
314     private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
315         String mapName = update.mapName();
316         String key = update.key();
317         Update provisionalUpdate = getLockMap(mapName).get(key);
318         if (Objects.equal(transactionId, provisionalUpdate.transactionId()))  {
319             getLockMap(mapName).remove(key);
320         } else {
321             throw new IllegalStateException("Invalid transaction Id");
322         }
323         return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
324     }
325
326     private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
327         String mapName = update.mapName();
328         String key = update.key();
329         Update provisionalUpdate = getLockMap(mapName).get(key);
330         if (provisionalUpdate == null) {
331             return;
332         }
333         if (Objects.equal(transactionId, provisionalUpdate.transactionId()))  {
334             getLockMap(mapName).remove(key);
335         }
336     }
337
338     private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
339         Update update = getLockMap(mapName).get(key);
340         return update != null && !Objects.equal(transactionId, update.transactionId());
341     }
342
343     private boolean isLockedForUpdates(String mapName, String key) {
344         return getLockMap(mapName).containsKey(key);
345     }
346
347     private boolean areTransactionsInProgress(String mapName) {
348         return !getLockMap(mapName).isEmpty();
349     }
350
351     private class Update {
352         private final long transactionId;
353         private final byte[] value;
354
355         public Update(long txId, byte[] value) {
356             this.transactionId = txId;
357             this.value = value;
358         }
359
360         public long transactionId() {
361             return this.transactionId;
362         }
363
364         public byte[] value() {
365             return this.value;
366         }
367     }
368 }