dda820ae045608b38e384490bc62f210fb781221
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.app;
17
18 import com.google.common.base.Charsets;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Multimap;
22 import com.google.common.collect.Sets;
23 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate;
26 import org.apache.felix.scr.annotations.Reference;
27 import org.apache.felix.scr.annotations.ReferenceCardinality;
28 import org.apache.felix.scr.annotations.Service;
29 import org.onlab.util.KryoNamespace;
30 import org.onosproject.app.ApplicationDescription;
31 import org.onosproject.app.ApplicationEvent;
32 import org.onosproject.app.ApplicationException;
33 import org.onosproject.app.ApplicationState;
34 import org.onosproject.app.ApplicationStore;
35 import org.onosproject.app.ApplicationStoreDelegate;
36 import org.onosproject.cluster.ClusterService;
37 import org.onosproject.cluster.ControllerNode;
38 import org.onosproject.common.app.ApplicationArchive;
39 import org.onosproject.core.Application;
40 import org.onosproject.core.ApplicationId;
41 import org.onosproject.core.ApplicationIdStore;
42 import org.onosproject.core.CoreService;
43 import org.onosproject.core.DefaultApplication;
44 import org.onosproject.security.Permission;
45 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
46 import org.onosproject.store.cluster.messaging.MessageSubject;
47 import org.onosproject.store.serializers.KryoNamespaces;
48 import org.onosproject.store.service.EventuallyConsistentMap;
49 import org.onosproject.store.service.EventuallyConsistentMapEvent;
50 import org.onosproject.store.service.EventuallyConsistentMapListener;
51 import org.onosproject.store.service.LogicalClockService;
52 import org.onosproject.store.service.MultiValuedTimestamp;
53 import org.onosproject.store.service.StorageException;
54 import org.onosproject.store.service.StorageService;
55 import org.slf4j.Logger;
56
57 import java.io.ByteArrayInputStream;
58 import java.io.IOException;
59 import java.io.InputStream;
60 import java.util.Set;
61 import java.util.concurrent.CountDownLatch;
62 import java.util.concurrent.ExecutorService;
63 import java.util.concurrent.Executors;
64 import java.util.concurrent.ScheduledExecutorService;
65 import java.util.function.Function;
66
67 import static com.google.common.collect.Multimaps.newSetMultimap;
68 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
69 import static com.google.common.io.ByteStreams.toByteArray;
70 import static java.util.concurrent.TimeUnit.MILLISECONDS;
71 import static org.onlab.util.Tools.groupedThreads;
72 import static org.onlab.util.Tools.randomDelay;
73 import static org.onosproject.app.ApplicationEvent.Type.*;
74 import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
75 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
76 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
77 import static org.slf4j.LoggerFactory.getLogger;
78
79 /**
80  * Manages inventory of applications in a distributed data store that uses
81  * optimistic replication and gossip based anti-entropy techniques.
82  */
83 @Component(immediate = true)
84 @Service
85 public class GossipApplicationStore extends ApplicationArchive
86         implements ApplicationStore {
87
88     private final Logger log = getLogger(getClass());
89
90     private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
91
92     private static final int MAX_LOAD_RETRIES = 5;
93     private static final int RETRY_DELAY_MS = 2_000;
94
95     private static final int FETCH_TIMEOUT_MS = 10_000;
96
97     public enum InternalState {
98         INSTALLED, ACTIVATED, DEACTIVATED
99     }
100
101     private ScheduledExecutorService executor;
102     private ExecutorService messageHandlingExecutor;
103
104     private EventuallyConsistentMap<ApplicationId, Application> apps;
105     private EventuallyConsistentMap<Application, InternalState> states;
106     private EventuallyConsistentMap<Application, Set<Permission>> permissions;
107
108     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109     protected ClusterCommunicationService clusterCommunicator;
110
111     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112     protected ClusterService clusterService;
113
114     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115     protected StorageService storageService;
116
117     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118     protected LogicalClockService clockService;
119
120     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121     protected ApplicationIdStore idStore;
122
123     // Multimap to track which apps are required by others apps
124     // app -> { required-by, ... }
125     // Apps explicitly activated will be required by the CORE app
126     private final Multimap<ApplicationId, ApplicationId> requiredBy =
127             synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
128
129     private ApplicationId coreAppId;
130
131     @Activate
132     public void activate() {
133         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
134                 .register(KryoNamespaces.API)
135                 .register(MultiValuedTimestamp.class)
136                 .register(InternalState.class);
137
138         executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
139
140         messageHandlingExecutor = Executors.newSingleThreadExecutor(
141                 groupedThreads("onos/store/app", "message-handler"));
142
143         clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
144                                                           bytes -> new String(bytes, Charsets.UTF_8),
145                                                           name -> {
146                                                               try {
147                                                                   return toByteArray(getApplicationInputStream(name));
148                                                               } catch (IOException e) {
149                                                                   throw new StorageException(e);
150                                                               }
151                                                           },
152                                                           Function.identity(),
153                                                           messageHandlingExecutor);
154
155         // FIXME: Consider consolidating into a single map.
156
157         apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
158                 .withName("apps")
159                 .withSerializer(serializer)
160                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
161                 .build();
162
163         states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
164                 .withName("app-states")
165                 .withSerializer(serializer)
166                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
167                 .build();
168
169         states.addListener(new InternalAppStatesListener());
170
171         permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
172                 .withName("app-permissions")
173                 .withSerializer(serializer)
174                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
175                 .build();
176
177         coreAppId = getId(CoreService.CORE_APP_NAME);
178         log.info("Started");
179     }
180
181     /**
182      * Loads the application inventory from the disk and activates apps if
183      * they are marked to be active.
184      */
185     private void loadFromDisk() {
186         for (String name : getApplicationNames()) {
187             for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
188                 try {
189                     Application app = create(getApplicationDescription(name), false);
190                     if (app != null && isActive(app.id().name())) {
191                         requiredBy.put(app.id(), coreAppId);
192                         activate(app.id(), false);
193                         // load app permissions
194                     }
195                 } catch (Exception e) {
196                     log.warn("Unable to load application {} from disk; retrying", name);
197                     randomDelay(RETRY_DELAY_MS);  // FIXME: This is a deliberate hack; fix in Drake
198                 }
199             }
200         }
201     }
202
203     @Deactivate
204     public void deactivate() {
205         clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
206         messageHandlingExecutor.shutdown();
207         executor.shutdown();
208         apps.destroy();
209         states.destroy();
210         permissions.destroy();
211         log.info("Stopped");
212     }
213
214     @Override
215     public void setDelegate(ApplicationStoreDelegate delegate) {
216         super.setDelegate(delegate);
217         loadFromDisk();
218     }
219
220     @Override
221     public Set<Application> getApplications() {
222         return ImmutableSet.copyOf(apps.values());
223     }
224
225     @Override
226     public ApplicationId getId(String name) {
227         return idStore.getAppId(name);
228     }
229
230     @Override
231     public Application getApplication(ApplicationId appId) {
232         return apps.get(appId);
233     }
234
235     @Override
236     public ApplicationState getState(ApplicationId appId) {
237         Application app = apps.get(appId);
238         InternalState s = app == null ? null : states.get(app);
239         return s == null ? null : s == ACTIVATED ?
240                 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
241     }
242
243     @Override
244     public Application create(InputStream appDescStream) {
245         ApplicationDescription appDesc = saveApplication(appDescStream);
246         if (hasPrerequisites(appDesc)) {
247             return create(appDesc, true);
248         }
249         throw new ApplicationException("Missing dependencies for app " + appDesc.name());
250     }
251
252     private boolean hasPrerequisites(ApplicationDescription app) {
253         return !app.requiredApps().stream().map(n -> getId(n))
254                 .anyMatch(id -> id == null || getApplication(id) == null);
255     }
256
257     private Application create(ApplicationDescription appDesc, boolean updateTime) {
258         Application app = registerApp(appDesc);
259         if (updateTime) {
260             updateTime(app.id().name());
261         }
262         apps.put(app.id(), app);
263         states.put(app, INSTALLED);
264         return app;
265     }
266
267     @Override
268     public void remove(ApplicationId appId) {
269         Application app = apps.get(appId);
270         if (app != null) {
271             uninstallDependentApps(app);
272             apps.remove(appId);
273             states.remove(app);
274             permissions.remove(app);
275         }
276     }
277
278     // Uninstalls all apps that depend on the given app.
279     private void uninstallDependentApps(Application app) {
280         getApplications().stream()
281                 .filter(a -> a.requiredApps().contains(app.id().name()))
282                 .forEach(a -> remove(a.id()));
283     }
284
285     @Override
286     public void activate(ApplicationId appId) {
287         activate(appId, coreAppId);
288     }
289
290     private void activate(ApplicationId appId, ApplicationId forAppId) {
291         requiredBy.put(appId, forAppId);
292         activate(appId, true);
293     }
294
295
296     private void activate(ApplicationId appId, boolean updateTime) {
297         Application app = apps.get(appId);
298         if (app != null) {
299             if (updateTime) {
300                 updateTime(appId.name());
301             }
302             activateRequiredApps(app);
303             states.put(app, ACTIVATED);
304         }
305     }
306
307     // Activates all apps required by this application.
308     private void activateRequiredApps(Application app) {
309         app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
310     }
311
312     @Override
313     public void deactivate(ApplicationId appId) {
314         deactivateDependentApps(getApplication(appId));
315         deactivate(appId, coreAppId);
316     }
317
318     private void deactivate(ApplicationId appId, ApplicationId forAppId) {
319         requiredBy.remove(appId, forAppId);
320         if (requiredBy.get(appId).isEmpty()) {
321             Application app = apps.get(appId);
322             if (app != null) {
323                 updateTime(appId.name());
324                 states.put(app, DEACTIVATED);
325                 deactivateRequiredApps(app);
326             }
327         }
328     }
329
330     // Deactivates all apps that require this application.
331     private void deactivateDependentApps(Application app) {
332         getApplications().stream()
333                 .filter(a -> states.get(a) == ACTIVATED)
334                 .filter(a -> a.requiredApps().contains(app.id().name()))
335                 .forEach(a -> deactivate(a.id()));
336     }
337
338     // Deactivates all apps required by this application.
339     private void deactivateRequiredApps(Application app) {
340         app.requiredApps().stream().map(this::getId).map(this::getApplication)
341                 .filter(a -> states.get(a) == ACTIVATED)
342                 .forEach(a -> deactivate(a.id(), app.id()));
343     }
344
345     @Override
346     public Set<Permission> getPermissions(ApplicationId appId) {
347         Application app = apps.get(appId);
348         return app != null ? permissions.get(app) : null;
349     }
350
351     @Override
352     public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
353         Application app = getApplication(appId);
354         if (app != null) {
355             this.permissions.put(app, permissions);
356             delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
357         }
358     }
359
360     /**
361      * Listener to application state distributed map changes.
362      */
363     private final class InternalAppStatesListener
364             implements EventuallyConsistentMapListener<Application, InternalState> {
365         @Override
366         public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
367             // If we do not have a delegate, refuse to process any events entirely.
368             // This is to allow the anti-entropy to kick in and process the events
369             // perhaps a bit later, but with opportunity to notify delegate.
370             if (delegate == null) {
371                 return;
372             }
373
374             Application app = event.key();
375             InternalState state = event.value();
376
377             if (event.type() == PUT) {
378                 if (state == INSTALLED) {
379                     fetchBitsIfNeeded(app);
380                     delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
381
382                 } else if (state == ACTIVATED) {
383                     installAppIfNeeded(app);
384                     setActive(app.id().name());
385                     delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
386
387                 } else if (state == DEACTIVATED) {
388                     clearActive(app.id().name());
389                     delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
390                 }
391             } else if (event.type() == REMOVE) {
392                 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
393                 purgeApplication(app.id().name());
394             }
395         }
396     }
397
398     /**
399      * Determines if the application bits are available locally.
400      */
401     private boolean appBitsAvailable(Application app) {
402         try {
403             ApplicationDescription appDesc = getApplicationDescription(app.id().name());
404             return appDesc.version().equals(app.version());
405         } catch (ApplicationException e) {
406             return false;
407         }
408     }
409
410     /**
411      * Fetches the bits from the cluster peers if necessary.
412      */
413     private void fetchBitsIfNeeded(Application app) {
414         if (!appBitsAvailable(app)) {
415             fetchBits(app);
416         }
417     }
418
419     /**
420      * Installs the application if necessary from the application peers.
421      */
422     private void installAppIfNeeded(Application app) {
423         if (!appBitsAvailable(app)) {
424             fetchBits(app);
425             delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
426         }
427     }
428
429     /**
430      * Fetches the bits from the cluster peers.
431      */
432     private void fetchBits(Application app) {
433         ControllerNode localNode = clusterService.getLocalNode();
434         CountDownLatch latch = new CountDownLatch(1);
435
436         // FIXME: send message with name & version to make sure we don't get served old bits
437
438         log.info("Downloading bits for application {}", app.id().name());
439         for (ControllerNode node : clusterService.getNodes()) {
440             if (latch.getCount() == 0) {
441                 break;
442             }
443             if (node.equals(localNode)) {
444                 continue;
445             }
446             clusterCommunicator.sendAndReceive(app.id().name(),
447                                                APP_BITS_REQUEST,
448                                                s -> s.getBytes(Charsets.UTF_8),
449                                                Function.identity(),
450                                                node.id())
451                     .whenCompleteAsync((bits, error) -> {
452                         if (error == null && latch.getCount() > 0) {
453                             saveApplication(new ByteArrayInputStream(bits));
454                             log.info("Downloaded bits for application {} from node {}",
455                                      app.id().name(), node.id());
456                             latch.countDown();
457                         } else if (error != null) {
458                             log.warn("Unable to fetch bits for application {} from node {}",
459                                      app.id().name(), node.id());
460                         }
461                     }, executor);
462         }
463
464         try {
465             if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
466                 log.warn("Unable to fetch bits for application {}", app.id().name());
467             }
468         } catch (InterruptedException e) {
469             log.warn("Interrupted while fetching bits for application {}", app.id().name());
470         }
471     }
472
473     /**
474      * Prunes applications which are not in the map, but are on disk.
475      */
476     private void pruneUninstalledApps() {
477         for (String name : getApplicationNames()) {
478             if (getApplication(getId(name)) == null) {
479                 Application app = registerApp(getApplicationDescription(name));
480                 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
481                 purgeApplication(app.id().name());
482             }
483         }
484     }
485
486     /**
487      * Produces a registered application from the supplied description.
488      */
489     private Application registerApp(ApplicationDescription appDesc) {
490         ApplicationId appId = idStore.registerApplication(appDesc.name());
491         return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
492                                       appDesc.origin(), appDesc.role(), appDesc.permissions(),
493                                       appDesc.featuresRepo(), appDesc.features(),
494                                       appDesc.requiredApps());
495     }
496 }