6764c2223ad539cf029f7ca4dd40a1928dae6b6f
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.app;
17
18 import com.google.common.base.Charsets;
19 import com.google.common.collect.ImmutableSet;
20
21 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate;
24 import org.apache.felix.scr.annotations.Reference;
25 import org.apache.felix.scr.annotations.ReferenceCardinality;
26 import org.apache.felix.scr.annotations.Service;
27 import org.onlab.util.KryoNamespace;
28 import org.onosproject.app.ApplicationDescription;
29 import org.onosproject.app.ApplicationEvent;
30 import org.onosproject.app.ApplicationException;
31 import org.onosproject.app.ApplicationState;
32 import org.onosproject.app.ApplicationStore;
33 import org.onosproject.app.ApplicationStoreDelegate;
34 import org.onosproject.cluster.ClusterService;
35 import org.onosproject.cluster.ControllerNode;
36 import org.onosproject.common.app.ApplicationArchive;
37 import org.onosproject.core.Application;
38 import org.onosproject.core.ApplicationId;
39 import org.onosproject.core.ApplicationIdStore;
40 import org.onosproject.core.DefaultApplication;
41 import org.onosproject.security.Permission;
42 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43 import org.onosproject.store.cluster.messaging.MessageSubject;
44 import org.onosproject.store.serializers.KryoNamespaces;
45 import org.onosproject.store.service.EventuallyConsistentMap;
46 import org.onosproject.store.service.EventuallyConsistentMapEvent;
47 import org.onosproject.store.service.EventuallyConsistentMapListener;
48 import org.onosproject.store.service.LogicalClockService;
49 import org.onosproject.store.service.MultiValuedTimestamp;
50 import org.onosproject.store.service.StorageException;
51 import org.onosproject.store.service.StorageService;
52 import org.slf4j.Logger;
53
54 import java.io.ByteArrayInputStream;
55 import java.io.IOException;
56 import java.io.InputStream;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.ExecutorService;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.ScheduledExecutorService;
62 import java.util.function.Function;
63
64 import static com.google.common.io.ByteStreams.toByteArray;
65 import static java.util.concurrent.TimeUnit.MILLISECONDS;
66 import static org.onlab.util.Tools.groupedThreads;
67 import static org.onlab.util.Tools.randomDelay;
68 import static org.onosproject.app.ApplicationEvent.Type.*;
69 import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
70 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
71 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
72 import static org.slf4j.LoggerFactory.getLogger;
73
74 /**
75  * Manages inventory of applications in a distributed data store that uses
76  * optimistic replication and gossip based anti-entropy techniques.
77  */
78 @Component(immediate = true)
79 @Service
80 public class GossipApplicationStore extends ApplicationArchive
81         implements ApplicationStore {
82
83     private final Logger log = getLogger(getClass());
84
85     private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
86
87     private static final int MAX_LOAD_RETRIES = 5;
88     private static final int RETRY_DELAY_MS = 2_000;
89
90     private static final int FETCH_TIMEOUT_MS = 10_000;
91
92     public enum InternalState {
93         INSTALLED, ACTIVATED, DEACTIVATED
94     }
95
96     private ScheduledExecutorService executor;
97     private ExecutorService messageHandlingExecutor;
98
99     private EventuallyConsistentMap<ApplicationId, Application> apps;
100     private EventuallyConsistentMap<Application, InternalState> states;
101     private EventuallyConsistentMap<Application, Set<Permission>> permissions;
102
103     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104     protected ClusterCommunicationService clusterCommunicator;
105
106     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107     protected ClusterService clusterService;
108
109     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110     protected StorageService storageService;
111
112     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113     protected LogicalClockService clockService;
114
115     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116     protected ApplicationIdStore idStore;
117
118     @Activate
119     public void activate() {
120         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
121                 .register(KryoNamespaces.API)
122                 .register(MultiValuedTimestamp.class)
123                 .register(InternalState.class);
124
125         executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
126
127         messageHandlingExecutor = Executors.newSingleThreadExecutor(
128                 groupedThreads("onos/store/app", "message-handler"));
129
130         clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
131                 bytes -> new String(bytes, Charsets.UTF_8),
132                 name -> {
133                     try {
134                         return toByteArray(getApplicationInputStream(name));
135                     } catch (IOException e) {
136                         throw new StorageException(e);
137                     }
138                 },
139                 Function.identity(),
140                 messageHandlingExecutor);
141
142         // FIXME: Consider consolidating into a single map.
143
144         apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
145                 .withName("apps")
146                 .withSerializer(serializer)
147                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
148                 .build();
149
150         states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
151                 .withName("app-states")
152                 .withSerializer(serializer)
153                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
154                 .build();
155
156         states.addListener(new InternalAppStatesListener());
157
158         permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
159                 .withName("app-permissions")
160                 .withSerializer(serializer)
161                 .withTimestampProvider((k, v) -> clockService.getTimestamp())
162                 .build();
163
164         log.info("Started");
165     }
166
167     /**
168      * Loads the application inventory from the disk and activates apps if
169      * they are marked to be active.
170      */
171     private void loadFromDisk() {
172         for (String name : getApplicationNames()) {
173             for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
174                 try {
175                     Application app = create(getApplicationDescription(name), false);
176                     if (app != null && isActive(app.id().name())) {
177                         activate(app.id(), false);
178                         // load app permissions
179                     }
180                 } catch (Exception e) {
181                     log.warn("Unable to load application {} from disk; retrying", name);
182                     randomDelay(RETRY_DELAY_MS);  // FIXME: This is a deliberate hack; fix in Drake
183                 }
184             }
185         }
186     }
187
188     @Deactivate
189     public void deactivate() {
190         clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
191         messageHandlingExecutor.shutdown();
192         executor.shutdown();
193         apps.destroy();
194         states.destroy();
195         permissions.destroy();
196         log.info("Stopped");
197     }
198
199     @Override
200     public void setDelegate(ApplicationStoreDelegate delegate) {
201         super.setDelegate(delegate);
202         loadFromDisk();
203 //        executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
204     }
205
206     @Override
207     public Set<Application> getApplications() {
208         return ImmutableSet.copyOf(apps.values());
209     }
210
211     @Override
212     public ApplicationId getId(String name) {
213         return idStore.getAppId(name);
214     }
215
216     @Override
217     public Application getApplication(ApplicationId appId) {
218         return apps.get(appId);
219     }
220
221     @Override
222     public ApplicationState getState(ApplicationId appId) {
223         Application app = apps.get(appId);
224         InternalState s = app == null ? null : states.get(app);
225         return s == null ? null : s == ACTIVATED ?
226                 ApplicationState.ACTIVE : ApplicationState.INSTALLED;
227     }
228
229     @Override
230     public Application create(InputStream appDescStream) {
231         ApplicationDescription appDesc = saveApplication(appDescStream);
232         return create(appDesc, true);
233     }
234
235     private Application create(ApplicationDescription appDesc, boolean updateTime) {
236         Application app = registerApp(appDesc);
237         if (updateTime) {
238             updateTime(app.id().name());
239         }
240         apps.put(app.id(), app);
241         states.put(app, INSTALLED);
242         return app;
243     }
244
245     @Override
246     public void remove(ApplicationId appId) {
247         Application app = apps.get(appId);
248         if (app != null) {
249             apps.remove(appId);
250             states.remove(app);
251             permissions.remove(app);
252         }
253     }
254
255     @Override
256     public void activate(ApplicationId appId) {
257         activate(appId, true);
258     }
259
260     private void activate(ApplicationId appId, boolean updateTime) {
261         Application app = apps.get(appId);
262         if (app != null) {
263             if (updateTime) {
264                 updateTime(appId.name());
265             }
266             states.put(app, ACTIVATED);
267         }
268     }
269
270     @Override
271     public void deactivate(ApplicationId appId) {
272         Application app = apps.get(appId);
273         if (app != null) {
274             updateTime(appId.name());
275             states.put(app, DEACTIVATED);
276         }
277     }
278
279     @Override
280     public Set<Permission> getPermissions(ApplicationId appId) {
281         Application app = apps.get(appId);
282         return app != null ? permissions.get(app) : null;
283     }
284
285     @Override
286     public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
287         Application app = getApplication(appId);
288         if (app != null) {
289             this.permissions.put(app, permissions);
290             delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
291         }
292     }
293
294     /**
295      * Listener to application state distributed map changes.
296      */
297     private final class InternalAppStatesListener
298             implements EventuallyConsistentMapListener<Application, InternalState> {
299         @Override
300         public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
301             // If we do not have a delegate, refuse to process any events entirely.
302             // This is to allow the anti-entropy to kick in and process the events
303             // perhaps a bit later, but with opportunity to notify delegate.
304             if (delegate == null) {
305                 return;
306             }
307
308             Application app = event.key();
309             InternalState state = event.value();
310
311             if (event.type() == PUT) {
312                 if (state == INSTALLED) {
313                     fetchBitsIfNeeded(app);
314                     delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
315
316                 } else if (state == ACTIVATED) {
317                     installAppIfNeeded(app);
318                     setActive(app.id().name());
319                     delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
320
321                 } else if (state == DEACTIVATED) {
322                     clearActive(app.id().name());
323                     delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
324                 }
325             } else if (event.type() == REMOVE) {
326                 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
327                 purgeApplication(app.id().name());
328             }
329         }
330     }
331
332     /**
333      * Determines if the application bits are available locally.
334      */
335     private boolean appBitsAvailable(Application app) {
336         try {
337             ApplicationDescription appDesc = getApplicationDescription(app.id().name());
338             return appDesc.version().equals(app.version());
339         } catch (ApplicationException e) {
340             return false;
341         }
342     }
343
344     /**
345      * Fetches the bits from the cluster peers if necessary.
346      */
347     private void fetchBitsIfNeeded(Application app) {
348         if (!appBitsAvailable(app)) {
349             fetchBits(app);
350         }
351     }
352
353     /**
354      * Installs the application if necessary from the application peers.
355      */
356     private void installAppIfNeeded(Application app) {
357         if (!appBitsAvailable(app)) {
358             fetchBits(app);
359             delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
360         }
361     }
362
363     /**
364      * Fetches the bits from the cluster peers.
365      */
366     private void fetchBits(Application app) {
367         ControllerNode localNode = clusterService.getLocalNode();
368         CountDownLatch latch = new CountDownLatch(1);
369
370         // FIXME: send message with name & version to make sure we don't get served old bits
371
372         log.info("Downloading bits for application {}", app.id().name());
373         for (ControllerNode node : clusterService.getNodes()) {
374             if (latch.getCount() == 0) {
375                 break;
376             }
377             if (node.equals(localNode)) {
378                 continue;
379             }
380             clusterCommunicator.sendAndReceive(app.id().name(),
381                                                APP_BITS_REQUEST,
382                                                s -> s.getBytes(Charsets.UTF_8),
383                                                Function.identity(),
384                                                node.id())
385                     .whenCompleteAsync((bits, error) -> {
386                         if (error == null && latch.getCount() > 0) {
387                             saveApplication(new ByteArrayInputStream(bits));
388                             log.info("Downloaded bits for application {} from node {}",
389                                      app.id().name(), node.id());
390                             latch.countDown();
391                         } else if (error != null) {
392                             log.warn("Unable to fetch bits for application {} from node {}",
393                                      app.id().name(), node.id());
394                         }
395                     }, executor);
396         }
397
398         try {
399             if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
400                 log.warn("Unable to fetch bits for application {}", app.id().name());
401             }
402         } catch (InterruptedException e) {
403             log.warn("Interrupted while fetching bits for application {}", app.id().name());
404         }
405     }
406
407     /**
408      * Prunes applications which are not in the map, but are on disk.
409      */
410     private void pruneUninstalledApps() {
411         for (String name : getApplicationNames()) {
412             if (getApplication(getId(name)) == null) {
413                 Application app = registerApp(getApplicationDescription(name));
414                 delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
415                 purgeApplication(app.id().name());
416             }
417         }
418     }
419
420     /**
421      * Produces a registered application from the supplied description.
422      */
423     private Application registerApp(ApplicationDescription appDesc) {
424         ApplicationId appId = idStore.registerApplication(appDesc.name());
425         return new DefaultApplication(appId, appDesc.version(), appDesc.description(),
426                                       appDesc.origin(), appDesc.role(), appDesc.permissions(),
427                                       appDesc.featuresRepo(), appDesc.features());
428     }
429 }