a014504c5a2eaf9a33337c2e3d27855eabca394b
[onosfw.git] /
1 package org.onosproject.incubator.store.resource.impl;
2
3 import static org.onlab.util.Tools.groupedThreads;
4 import static org.slf4j.LoggerFactory.getLogger;
5
6 import java.util.Collection;
7 import java.util.Collections;
8 import java.util.HashSet;
9 import java.util.Iterator;
10 import java.util.Map;
11 import java.util.Set;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.TimeoutException;
18
19 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate;
22 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.apache.felix.scr.annotations.Service;
25 import org.onlab.util.KryoNamespace;
26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
29 import org.onosproject.incubator.net.resource.label.LabelResource;
30 import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
31 import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
32 import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
33 import org.onosproject.incubator.net.resource.label.LabelResourceId;
34 import org.onosproject.incubator.net.resource.label.LabelResourcePool;
35 import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
36 import org.onosproject.incubator.net.resource.label.LabelResourceStore;
37 import org.onosproject.mastership.MastershipService;
38 import org.onosproject.net.Device;
39 import org.onosproject.net.DeviceId;
40 import org.onosproject.net.device.DeviceService;
41 import org.onosproject.store.AbstractStore;
42 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43 import org.onosproject.store.cluster.messaging.ClusterMessage;
44 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45 import org.onosproject.store.serializers.KryoNamespaces;
46 import org.onosproject.store.service.ConsistentMap;
47 import org.onosproject.store.service.Serializer;
48 import org.onosproject.store.service.StorageService;
49 import org.onosproject.store.service.Versioned;
50 import org.slf4j.Logger;
51
52 import com.google.common.collect.ImmutableSet;
53 import com.google.common.collect.Multimap;
54
55 /**
56  * Manages label resources using copycat.
57  */
58 @Component(immediate = true, enabled = true)
59 @Service
60 public class DistributedLabelResourceStore
61         extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
62         implements LabelResourceStore {
63     private final Logger log = getLogger(getClass());
64
65     private static final String POOL_MAP_NAME = "labelresourcepool";
66
67     private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
68
69     private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
70
71     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72     protected StorageService storageService;
73
74     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75     protected MastershipService mastershipService;
76
77     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78     protected ClusterCommunicationService clusterCommunicator;
79
80     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81     protected ClusterService clusterService;
82
83     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84     protected DeviceService deviceService;
85
86     private ExecutorService messageHandlingExecutor;
87     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
88     private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
89
90     private static final Serializer SERIALIZER = Serializer
91             .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
92                     .register(LabelResourceEvent.class)
93                     .register(LabelResourcePool.class).register(DeviceId.class)
94                     .register(LabelResourceRequest.class)
95                     .register(LabelResourceRequest.Type.class)
96                     .register(LabelResourceEvent.Type.class)
97                     .register(DefaultLabelResource.class)
98                     .register(LabelResourceId.class)
99                     .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
100
101     @Activate
102     public void activate() {
103
104         resourcePool = storageService
105                 .<DeviceId, LabelResourcePool>consistentMapBuilder()
106                 .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
107                 .withPartitionsDisabled().build();
108         messageHandlingExecutor = Executors
109                 .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
110                                     groupedThreads("onos/store/flow",
111                                                    "message-handlers"));
112         clusterCommunicator
113                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
114                                new ClusterMessageHandler() {
115
116                                    @Override
117                                    public void handle(ClusterMessage message) {
118                                        LabelResourcePool operation = SERIALIZER
119                                                .decode(message.payload());
120                                        log.trace("received get flow entry request for {}",
121                                                  operation);
122                                        boolean b = internalCreate(operation);
123                                        message.respond(SERIALIZER.encode(b));
124                                    }
125                                }, messageHandlingExecutor);
126         clusterCommunicator
127                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
128                                new ClusterMessageHandler() {
129
130                                    @Override
131                                    public void handle(ClusterMessage message) {
132                                        DeviceId deviceId = SERIALIZER
133                                                .decode(message.payload());
134                                        log.trace("received get flow entry request for {}",
135                                                  deviceId);
136                                        boolean b = internalDestroy(deviceId);
137                                        message.respond(SERIALIZER.encode(b));
138                                    }
139                                }, messageHandlingExecutor);
140         clusterCommunicator
141                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
142                                new ClusterMessageHandler() {
143
144                                    @Override
145                                    public void handle(ClusterMessage message) {
146                                        LabelResourceRequest request = SERIALIZER
147                                                .decode(message.payload());
148                                        log.trace("received get flow entry request for {}",
149                                                  request);
150                                        final Collection<LabelResource> resource = internalApply(request);
151                                        message.respond(SERIALIZER
152                                                .encode(resource));
153                                    }
154                                }, messageHandlingExecutor);
155         clusterCommunicator
156                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
157                                new ClusterMessageHandler() {
158
159                                    @Override
160                                    public void handle(ClusterMessage message) {
161                                        LabelResourceRequest request = SERIALIZER
162                                                .decode(message.payload());
163                                        log.trace("received get flow entry request for {}",
164                                                  request);
165                                        final boolean isSuccess = internalRelease(request);
166                                        message.respond(SERIALIZER
167                                                .encode(isSuccess));
168                                    }
169                                }, messageHandlingExecutor);
170         log.info("Started");
171     }
172
173     @Deactivate
174     public void deactivate() {
175         clusterCommunicator
176                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
177         clusterCommunicator
178                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
179         clusterCommunicator
180                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
181         clusterCommunicator
182                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
183         messageHandlingExecutor.shutdown();
184         log.info("Stopped");
185     }
186
187     @Override
188     public boolean createDevicePool(DeviceId deviceId,
189                                     LabelResourceId beginLabel,
190                                     LabelResourceId endLabel) {
191         LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
192                                                        beginLabel.labelId(),
193                                                        endLabel.labelId());
194         return this.create(pool);
195     }
196
197     @Override
198     public boolean createGlobalPool(LabelResourceId beginLabel,
199                                     LabelResourceId endLabel) {
200         LabelResourcePool pool = new LabelResourcePool(
201                                                        GLOBAL_RESOURCE_POOL_DEVICE_ID,
202                                                        beginLabel.labelId(),
203                                                        endLabel.labelId());
204         return this.internalCreate(pool);
205     }
206
207     private boolean create(LabelResourcePool pool) {
208         Device device = (Device) deviceService.getDevice(pool.deviceId());
209         if (device == null) {
210             return false;
211         }
212
213         NodeId master = mastershipService.getMasterFor(pool.deviceId());
214
215         if (master == null) {
216             log.warn("Failed to create label resource pool: No master for {}", pool);
217             return false;
218         }
219
220         if (master.equals(clusterService.getLocalNode().id())) {
221             return internalCreate(pool);
222         }
223
224         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
225                   master, pool.deviceId());
226
227         return complete(clusterCommunicator
228                 .sendAndReceive(pool,
229                                 LabelResourceMessageSubjects.LABEL_POOL_CREATED,
230                                 SERIALIZER::encode, SERIALIZER::decode,
231                                 master));
232     }
233
234     private boolean internalCreate(LabelResourcePool pool) {
235         Versioned<LabelResourcePool> poolOld = resourcePool
236                 .get(pool.deviceId());
237         if (poolOld == null) {
238             resourcePool.put(pool.deviceId(), pool);
239             LabelResourceEvent event = new LabelResourceEvent(
240                                                               Type.POOL_CREATED,
241                                                               pool);
242             notifyDelegate(event);
243             return true;
244         }
245         return false;
246     }
247
248     @Override
249     public boolean destroyDevicePool(DeviceId deviceId) {
250         Device device = (Device) deviceService.getDevice(deviceId);
251         if (device == null) {
252             return false;
253         }
254
255         NodeId master = mastershipService.getMasterFor(deviceId);
256
257         if (master == null) {
258             log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
259             return false;
260         }
261
262         if (master.equals(clusterService.getLocalNode().id())) {
263             return internalDestroy(deviceId);
264         }
265
266         log.trace("Forwarding request to {}, which is the primary (master) for device {}",
267                   master, deviceId);
268
269         return complete(clusterCommunicator
270                 .sendAndReceive(deviceId,
271                                 LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
272                                 SERIALIZER::encode, SERIALIZER::decode,
273                                 master));
274     }
275
276     private boolean internalDestroy(DeviceId deviceId) {
277         Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
278         if (poolOld != null) {
279             resourcePool.remove(deviceId);
280             LabelResourceEvent event = new LabelResourceEvent(
281                                                               Type.POOL_CREATED,
282                                                               poolOld.value());
283             notifyDelegate(event);
284         }
285         log.info("success to destroy the label resource pool of device id {}",
286                  deviceId);
287         return true;
288     }
289
290     @Override
291     public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
292                                                          long applyNum) {
293         Device device = (Device) deviceService.getDevice(deviceId);
294         if (device == null) {
295             return Collections.emptyList();
296         }
297         LabelResourceRequest request = new LabelResourceRequest(
298                                                                 deviceId,
299                                                                 LabelResourceRequest.Type.APPLY,
300                                                                 applyNum, null);
301         NodeId master = mastershipService.getMasterFor(deviceId);
302
303         if (master == null) {
304             log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
305             return Collections.emptyList();
306         }
307
308         if (master.equals(clusterService.getLocalNode().id())) {
309             return internalApply(request);
310         }
311
312         log.trace("Forwarding request to {}, which is the primary (master) for device {}",
313                   master, deviceId);
314
315         return complete(clusterCommunicator
316                 .sendAndReceive(request,
317                                 LabelResourceMessageSubjects.LABEL_POOL_APPLY,
318                                 SERIALIZER::encode, SERIALIZER::decode,
319                                 master));
320     }
321
322     private Collection<LabelResource> internalApply(LabelResourceRequest request) {
323         DeviceId deviceId = request.deviceId();
324         long applyNum = request.applyNum();
325         Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
326         LabelResourcePool pool = poolOld.value();
327         Collection<LabelResource> result = new HashSet<LabelResource>();
328         long freeNum = this.getFreeNumOfDevicePool(deviceId);
329         if (applyNum > freeNum) {
330             log.info("the free number of the label resource pool of deviceId {} is not enough.");
331             return Collections.emptyList();
332         }
333         Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
334                                                                       pool.releaseLabelId());
335         long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
336                 .size();
337         LabelResource resource = null;
338         for (int i = 0; i < tmp; i++) {
339             Iterator<LabelResource> it = releaseLabels.iterator();
340             if (it.hasNext()) {
341                 resource = it.next();
342                 releaseLabels.remove(resource);
343             }
344             result.add(resource);
345         }
346         for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
347                 .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
348             resource = new DefaultLabelResource(deviceId,
349                                                 LabelResourceId
350                                                         .labelResourceId(j));
351             result.add(resource);
352         }
353         long beginLabel = pool.beginLabel().labelId();
354         long endLabel = pool.endLabel().labelId();
355         long totalNum = pool.totalNum();
356         long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
357         long usedNum = pool.usedNum() + applyNum;
358         ImmutableSet<LabelResource> freeLabel = ImmutableSet
359                 .copyOf(releaseLabels);
360         LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
361                                                           beginLabel, endLabel,
362                                                           totalNum, usedNum,
363                                                           current, freeLabel);
364         resourcePool.put(deviceId, newPool);
365         log.info("success to apply label resource");
366         return result;
367     }
368
369     @Override
370     public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
371         Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
372         Set<DeviceId> deviceIdSet = maps.keySet();
373         LabelResourceRequest request = null;
374         for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
375             DeviceId deviceId = (DeviceId) it.next();
376             Device device = (Device) deviceService.getDevice(deviceId);
377             if (device == null) {
378                 continue;
379             }
380             ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
381                     .get(deviceId));
382             request = new LabelResourceRequest(
383                                                deviceId,
384                                                LabelResourceRequest.Type.RELEASE,
385                                                0, collection);
386             NodeId master = mastershipService.getMasterFor(deviceId);
387
388             if (master == null) {
389                 log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
390                 return false;
391             }
392
393             if (master.equals(clusterService.getLocalNode().id())) {
394                 return internalRelease(request);
395             }
396
397             log.trace("Forwarding request to {}, which is the primary (master) for device {}",
398                       master, deviceId);
399
400             return complete(clusterCommunicator
401                     .sendAndReceive(request,
402                                     LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
403                                     SERIALIZER::encode, SERIALIZER::decode,
404                                     master));
405         }
406         return false;
407     }
408
409     private boolean internalRelease(LabelResourceRequest request) {
410         DeviceId deviceId = request.deviceId();
411         Collection<LabelResource> release = request.releaseCollection();
412         Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
413         LabelResourcePool pool = poolOld.value();
414         if (pool == null) {
415             log.info("the label resource pool of device id {} does not exist");
416             return false;
417         }
418         Set<LabelResource> storeSet = new HashSet<LabelResource>(
419                                                                  pool.releaseLabelId());
420         LabelResource labelResource = null;
421         long realReleasedNum = 0;
422         for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
423             labelResource = it.next();
424             if (labelResource.labelResourceId().labelId() < pool.beginLabel()
425                     .labelId()
426                     || labelResource.labelResourceId().labelId() > pool
427                             .endLabel().labelId()) {
428                 continue;
429             }
430             if (pool.currentUsedMaxLabelId().labelId() > labelResource
431                     .labelResourceId().labelId()
432                     || !storeSet.contains(labelResource)) {
433                 storeSet.add(labelResource);
434                 realReleasedNum++;
435             }
436         }
437         long beginNum = pool.beginLabel().labelId();
438         long endNum = pool.endLabel().labelId();
439         long totalNum = pool.totalNum();
440         long usedNum = pool.usedNum() - realReleasedNum;
441         long current = pool.currentUsedMaxLabelId().labelId();
442         ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
443         LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
444                                                           beginNum, endNum,
445                                                           totalNum, usedNum,
446                                                           current, s);
447         resourcePool.put(deviceId, newPool);
448         log.info("success to release label resource");
449         return true;
450     }
451
452     @Override
453     public boolean isDevicePoolFull(DeviceId deviceId) {
454         Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
455         if (pool == null) {
456             return true;
457         }
458         return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
459                 && pool.value().releaseLabelId().size() == 0 ? true : false;
460     }
461
462     @Override
463     public long getFreeNumOfDevicePool(DeviceId deviceId) {
464         Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
465         if (pool == null) {
466             return 0;
467         }
468         return pool.value().endLabel().labelId()
469                 - pool.value().currentUsedMaxLabelId().labelId()
470                 + pool.value().releaseLabelId().size();
471     }
472
473     @Override
474     public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
475         Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
476         return pool == null ? null : pool.value();
477     }
478
479     @Override
480     public boolean destroyGlobalPool() {
481         return this.internalDestroy(DeviceId
482                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
483     }
484
485     @Override
486     public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
487         LabelResourceRequest request = new LabelResourceRequest(
488                                                                 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
489                                                                 LabelResourceRequest.Type.APPLY,
490                                                                 applyNum, null);
491         return this.internalApply(request);
492     }
493
494     @Override
495     public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
496         Set<LabelResource> set = new HashSet<LabelResource>();
497         DefaultLabelResource resource = null;
498         for (LabelResourceId labelResource : release) {
499             resource = new DefaultLabelResource(
500                                                 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
501                                                 labelResource);
502             set.add(resource);
503         }
504         LabelResourceRequest request = new LabelResourceRequest(
505                                                                 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
506                                                                 LabelResourceRequest.Type.APPLY,
507                                                                 0,
508                                                                 ImmutableSet
509                                                                         .copyOf(set));
510         return this.internalRelease(request);
511     }
512
513     @Override
514     public boolean isGlobalPoolFull() {
515         return this.isDevicePoolFull(DeviceId
516                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
517     }
518
519     @Override
520     public long getFreeNumOfGlobalPool() {
521         return this.getFreeNumOfDevicePool(DeviceId
522                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
523     }
524
525     @Override
526     public LabelResourcePool getGlobalLabelResourcePool() {
527         return this.getDeviceLabelResourcePool(DeviceId
528                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
529     }
530
531     private <T> T complete(Future<T> future) {
532         try {
533             return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
534         } catch (InterruptedException e) {
535             Thread.currentThread().interrupt();
536             log.error("Interrupted while waiting for operation to complete.", e);
537             return null;
538         } catch (TimeoutException | ExecutionException e) {
539             log.error("Failed remote operation", e);
540             return null;
541         }
542     }
543 }