/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org.onosproject.store.app;
18 import com.google.common.base.Charsets;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Multimap;
22 import com.google.common.collect.Sets;
23 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate;
26 import org.apache.felix.scr.annotations.Reference;
27 import org.apache.felix.scr.annotations.ReferenceCardinality;
28 import org.apache.felix.scr.annotations.Service;
29 import org.onlab.util.KryoNamespace;
30 import org.onosproject.app.ApplicationDescription;
31 import org.onosproject.app.ApplicationEvent;
32 import org.onosproject.app.ApplicationException;
33 import org.onosproject.app.ApplicationState;
34 import org.onosproject.app.ApplicationStore;
35 import org.onosproject.app.ApplicationStoreDelegate;
36 import org.onosproject.cluster.ClusterService;
37 import org.onosproject.cluster.ControllerNode;
38 import org.onosproject.common.app.ApplicationArchive;
39 import org.onosproject.core.Application;
40 import org.onosproject.core.ApplicationId;
41 import org.onosproject.core.ApplicationIdStore;
42 import org.onosproject.core.CoreService;
43 import org.onosproject.core.DefaultApplication;
44 import org.onosproject.security.Permission;
45 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
46 import org.onosproject.store.cluster.messaging.MessageSubject;
47 import org.onosproject.store.serializers.KryoNamespaces;
48 import org.onosproject.store.service.EventuallyConsistentMap;
49 import org.onosproject.store.service.EventuallyConsistentMapEvent;
50 import org.onosproject.store.service.EventuallyConsistentMapListener;
51 import org.onosproject.store.service.LogicalClockService;
52 import org.onosproject.store.service.MultiValuedTimestamp;
53 import org.onosproject.store.service.StorageException;
54 import org.onosproject.store.service.StorageService;
55 import org.slf4j.Logger;
57 import java.io.ByteArrayInputStream;
58 import java.io.IOException;
59 import java.io.InputStream;
61 import java.util.concurrent.CountDownLatch;
62 import java.util.concurrent.ExecutorService;
63 import java.util.concurrent.Executors;
64 import java.util.concurrent.ScheduledExecutorService;
65 import java.util.function.Function;
67 import static com.google.common.collect.Multimaps.newSetMultimap;
68 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
69 import static com.google.common.io.ByteStreams.toByteArray;
70 import static java.util.concurrent.TimeUnit.MILLISECONDS;
71 import static org.onlab.util.Tools.groupedThreads;
72 import static org.onlab.util.Tools.randomDelay;
73 import static org.onosproject.app.ApplicationEvent.Type.*;
74 import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
75 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
77 import static org.slf4j.LoggerFactory.getLogger;
/**
 * Manages inventory of applications in a distributed data store that uses
 * optimistic replication and gossip-based anti-entropy techniques.
 */
83 @Component(immediate = true)
85 public class GossipApplicationStore extends ApplicationArchive
86 implements ApplicationStore {
88 private final Logger log = getLogger(getClass());
90 private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
92 private static final int MAX_LOAD_RETRIES = 5;
93 private static final int RETRY_DELAY_MS = 2_000;
95 private static final int FETCH_TIMEOUT_MS = 10_000;
/**
 * Application state as recorded in the replicated application state map.
 */
public enum InternalState {
    INSTALLED, ACTIVATED, DEACTIVATED
}
101 private ScheduledExecutorService executor;
102 private ExecutorService messageHandlingExecutor;
104 private EventuallyConsistentMap<ApplicationId, Application> apps;
105 private EventuallyConsistentMap<Application, InternalState> states;
106 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterCommunicationService clusterCommunicator;
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected ClusterService clusterService;
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected StorageService storageService;
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected LogicalClockService clockService;
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected ApplicationIdStore idStore;
123 // Multimap to track which apps are required by others apps
124 // app -> { required-by, ... }
125 // Apps explicitly activated will be required by the CORE app
126 private final Multimap<ApplicationId, ApplicationId> requiredBy =
127 synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
129 private ApplicationId coreAppId;
132 public void activate() {
133 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
134 .register(KryoNamespaces.API)
135 .register(MultiValuedTimestamp.class)
136 .register(InternalState.class);
138 executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
140 messageHandlingExecutor = Executors.newSingleThreadExecutor(
141 groupedThreads("onos/store/app", "message-handler"));
143 clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
144 bytes -> new String(bytes, Charsets.UTF_8),
147 return toByteArray(getApplicationInputStream(name));
148 } catch (IOException e) {
149 throw new StorageException(e);
153 messageHandlingExecutor);
155 // FIXME: Consider consolidating into a single map.
157 apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
159 .withSerializer(serializer)
160 .withTimestampProvider((k, v) -> clockService.getTimestamp())
163 states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
164 .withName("app-states")
165 .withSerializer(serializer)
166 .withTimestampProvider((k, v) -> clockService.getTimestamp())
169 states.addListener(new InternalAppStatesListener());
171 permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
172 .withName("app-permissions")
173 .withSerializer(serializer)
174 .withTimestampProvider((k, v) -> clockService.getTimestamp())
177 coreAppId = getId(CoreService.CORE_APP_NAME);
182 * Loads the application inventory from the disk and activates apps if
183 * they are marked to be active.
185 private void loadFromDisk() {
186 for (String name : getApplicationNames()) {
187 for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
189 Application app = create(getApplicationDescription(name), false);
190 if (app != null && isActive(app.id().name())) {
191 requiredBy.put(app.id(), coreAppId);
192 activate(app.id(), false);
193 // load app permissions
195 } catch (Exception e) {
196 log.warn("Unable to load application {} from disk; retrying", name);
197 randomDelay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
204 public void deactivate() {
205 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
206 messageHandlingExecutor.shutdown();
210 permissions.destroy();
215 public void setDelegate(ApplicationStoreDelegate delegate) {
216 super.setDelegate(delegate);
221 public Set<Application> getApplications() {
222 return ImmutableSet.copyOf(apps.values());
226 public ApplicationId getId(String name) {
227 return idStore.getAppId(name);
231 public Application getApplication(ApplicationId appId) {
232 return apps.get(appId);
236 public ApplicationState getState(ApplicationId appId) {
237 Application app = apps.get(appId);
238 InternalState s = app == null ? null : states.get(app);
239 return s == null ? null : s == ACTIVATED ?
240 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
244 public Application create(InputStream appDescStream) {
245 ApplicationDescription appDesc = saveApplication(appDescStream);
246 if (hasPrerequisites(appDesc)) {
247 return create(appDesc, true);
249 throw new ApplicationException("Missing dependencies for app " + appDesc.name());
252 private boolean hasPrerequisites(ApplicationDescription app) {
253 return !app.requiredApps().stream().map(n -> getId(n))
254 .anyMatch(id -> id == null || getApplication(id) == null);
257 private Application create(ApplicationDescription appDesc, boolean updateTime) {
258 Application app = registerApp(appDesc);
260 updateTime(app.id().name());
262 apps.put(app.id(), app);
263 states.put(app, INSTALLED);
268 public void remove(ApplicationId appId) {
269 Application app = apps.get(appId);
271 uninstallDependentApps(app);
274 permissions.remove(app);
278 // Uninstalls all apps that depend on the given app.
279 private void uninstallDependentApps(Application app) {
280 getApplications().stream()
281 .filter(a -> a.requiredApps().contains(app.id().name()))
282 .forEach(a -> remove(a.id()));
286 public void activate(ApplicationId appId) {
287 activate(appId, coreAppId);
290 private void activate(ApplicationId appId, ApplicationId forAppId) {
291 requiredBy.put(appId, forAppId);
292 activate(appId, true);
296 private void activate(ApplicationId appId, boolean updateTime) {
297 Application app = apps.get(appId);
300 updateTime(appId.name());
302 activateRequiredApps(app);
303 states.put(app, ACTIVATED);
307 // Activates all apps required by this application.
308 private void activateRequiredApps(Application app) {
309 app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
313 public void deactivate(ApplicationId appId) {
314 deactivateDependentApps(getApplication(appId));
315 deactivate(appId, coreAppId);
318 private void deactivate(ApplicationId appId, ApplicationId forAppId) {
319 requiredBy.remove(appId, forAppId);
320 if (requiredBy.get(appId).isEmpty()) {
321 Application app = apps.get(appId);
323 updateTime(appId.name());
324 states.put(app, DEACTIVATED);
325 deactivateRequiredApps(app);
330 // Deactivates all apps that require this application.
331 private void deactivateDependentApps(Application app) {
332 getApplications().stream()
333 .filter(a -> states.get(a) == ACTIVATED)
334 .filter(a -> a.requiredApps().contains(app.id().name()))
335 .forEach(a -> deactivate(a.id()));
338 // Deactivates all apps required by this application.
339 private void deactivateRequiredApps(Application app) {
340 app.requiredApps().stream().map(this::getId).map(this::getApplication)
341 .filter(a -> states.get(a) == ACTIVATED)
342 .forEach(a -> deactivate(a.id(), app.id()));
346 public Set<Permission> getPermissions(ApplicationId appId) {
347 Application app = apps.get(appId);
348 return app != null ? permissions.get(app) : null;
352 public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
353 Application app = getApplication(appId);
355 this.permissions.put(app, permissions);
356 delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
361 * Listener to application state distributed map changes.
363 private final class InternalAppStatesListener
364 implements EventuallyConsistentMapListener<Application, InternalState> {
366 public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
367 // If we do not have a delegate, refuse to process any events entirely.
368 // This is to allow the anti-entropy to kick in and process the events
369 // perhaps a bit later, but with opportunity to notify delegate.
370 if (delegate == null) {
374 Application app = event.key();
375 InternalState state = event.value();
377 if (event.type() == PUT) {
378 if (state == INSTALLED) {
379 fetchBitsIfNeeded(app);
380 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
382 } else if (state == ACTIVATED) {
383 installAppIfNeeded(app);
384 setActive(app.id().name());
385 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
387 } else if (state == DEACTIVATED) {
388 clearActive(app.id().name());
389 delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
391 } else if (event.type() == REMOVE) {
392 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
393 purgeApplication(app.id().name());
399 * Determines if the application bits are available locally.
401 private boolean appBitsAvailable(Application app) {
403 ApplicationDescription appDesc = getApplicationDescription(app.id().name());
404 return appDesc.version().equals(app.version());
405 } catch (ApplicationException e) {
411 * Fetches the bits from the cluster peers if necessary.
413 private void fetchBitsIfNeeded(Application app) {
414 if (!appBitsAvailable(app)) {
420 * Installs the application if necessary from the application peers.
422 private void installAppIfNeeded(Application app) {
423 if (!appBitsAvailable(app)) {
425 delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
430 * Fetches the bits from the cluster peers.
432 private void fetchBits(Application app) {
433 ControllerNode localNode = clusterService.getLocalNode();
434 CountDownLatch latch = new CountDownLatch(1);
436 // FIXME: send message with name & version to make sure we don't get served old bits
438 log.info("Downloading bits for application {}", app.id().name());
439 for (ControllerNode node : clusterService.getNodes()) {
440 if (latch.getCount() == 0) {
443 if (node.equals(localNode)) {
446 clusterCommunicator.sendAndReceive(app.id().name(),
448 s -> s.getBytes(Charsets.UTF_8),
451 .whenCompleteAsync((bits, error) -> {
452 if (error == null && latch.getCount() > 0) {
453 saveApplication(new ByteArrayInputStream(bits));
454 log.info("Downloaded bits for application {} from node {}",
455 app.id().name(), node.id());
457 } else if (error != null) {
458 log.warn("Unable to fetch bits for application {} from node {}",
459 app.id().name(), node.id());
465 if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
466 log.warn("Unable to fetch bits for application {}", app.id().name());
468 } catch (InterruptedException e) {
469 log.warn("Interrupted while fetching bits for application {}", app.id().name());
474 * Prunes applications which are not in the map, but are on disk.
476 private void pruneUninstalledApps() {
477 for (String name : getApplicationNames()) {
478 if (getApplication(getId(name)) == null) {
479 Application app = registerApp(getApplicationDescription(name));
480 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
481 purgeApplication(app.id().name());
487 * Produces a registered application from the supplied description.
489 private Application registerApp(ApplicationDescription appDesc) {
490 ApplicationId appId = idStore.registerApplication(appDesc.name());
491 return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
492 appDesc.origin(), appDesc.role(), appDesc.permissions(),
493 appDesc.featuresRepo(), appDesc.features(),
494 appDesc.requiredApps());