d129def724302f169f32a886e3ae7169cefa6fc4
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.incubator.store.resource.impl;
17
18 import static org.onlab.util.Tools.groupedThreads;
19 import static org.slf4j.LoggerFactory.getLogger;
20
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import org.apache.felix.scr.annotations.Activate;
35 import org.apache.felix.scr.annotations.Component;
36 import org.apache.felix.scr.annotations.Deactivate;
37 import org.apache.felix.scr.annotations.Reference;
38 import org.apache.felix.scr.annotations.ReferenceCardinality;
39 import org.apache.felix.scr.annotations.Service;
40 import org.onlab.util.KryoNamespace;
41 import org.onosproject.cluster.ClusterService;
42 import org.onosproject.cluster.NodeId;
43 import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
44 import org.onosproject.incubator.net.resource.label.LabelResource;
45 import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
46 import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
47 import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
48 import org.onosproject.incubator.net.resource.label.LabelResourceId;
49 import org.onosproject.incubator.net.resource.label.LabelResourcePool;
50 import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
51 import org.onosproject.incubator.net.resource.label.LabelResourceStore;
52 import org.onosproject.mastership.MastershipService;
53 import org.onosproject.net.Device;
54 import org.onosproject.net.DeviceId;
55 import org.onosproject.net.device.DeviceService;
56 import org.onosproject.store.AbstractStore;
57 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58 import org.onosproject.store.cluster.messaging.ClusterMessage;
59 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
60 import org.onosproject.store.serializers.KryoNamespaces;
61 import org.onosproject.store.service.ConsistentMap;
62 import org.onosproject.store.service.Serializer;
63 import org.onosproject.store.service.StorageService;
64 import org.onosproject.store.service.Versioned;
65 import org.slf4j.Logger;
66
67 import com.google.common.collect.ImmutableSet;
68 import com.google.common.collect.Multimap;
69
70 /**
71  * Manages label resources using copycat.
72  */
73 @Component(immediate = true, enabled = true)
74 @Service
75 public class DistributedLabelResourceStore
76         extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
77         implements LabelResourceStore {
78     private final Logger log = getLogger(getClass());
79
80     private static final String POOL_MAP_NAME = "labelresourcepool";
81
82     private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
83
84     private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
85
86     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87     protected StorageService storageService;
88
89     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90     protected MastershipService mastershipService;
91
92     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93     protected ClusterCommunicationService clusterCommunicator;
94
95     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96     protected ClusterService clusterService;
97
98     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99     protected DeviceService deviceService;
100
101     private ExecutorService messageHandlingExecutor;
102     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
103     private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
104
105     private static final Serializer SERIALIZER = Serializer
106             .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
107                     .register(LabelResourceEvent.class)
108                     .register(LabelResourcePool.class).register(DeviceId.class)
109                     .register(LabelResourceRequest.class)
110                     .register(LabelResourceRequest.Type.class)
111                     .register(LabelResourceEvent.Type.class)
112                     .register(DefaultLabelResource.class)
113                     .register(LabelResourceId.class)
114                     .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
115
116     @Activate
117     public void activate() {
118
119         resourcePool = storageService
120                 .<DeviceId, LabelResourcePool>consistentMapBuilder()
121                 .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
122                 .withPartitionsDisabled().build();
123         messageHandlingExecutor = Executors
124                 .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
125                                     groupedThreads("onos/store/flow",
126                                                    "message-handlers"));
127         clusterCommunicator
128                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
129                                new ClusterMessageHandler() {
130
131                                    @Override
132                                    public void handle(ClusterMessage message) {
133                                        LabelResourcePool operation = SERIALIZER
134                                                .decode(message.payload());
135                                        log.trace("received get flow entry request for {}",
136                                                  operation);
137                                        boolean b = internalCreate(operation);
138                                        message.respond(SERIALIZER.encode(b));
139                                    }
140                                }, messageHandlingExecutor);
141         clusterCommunicator
142                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
143                                new ClusterMessageHandler() {
144
145                                    @Override
146                                    public void handle(ClusterMessage message) {
147                                        DeviceId deviceId = SERIALIZER
148                                                .decode(message.payload());
149                                        log.trace("received get flow entry request for {}",
150                                                  deviceId);
151                                        boolean b = internalDestroy(deviceId);
152                                        message.respond(SERIALIZER.encode(b));
153                                    }
154                                }, messageHandlingExecutor);
155         clusterCommunicator
156                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
157                                new ClusterMessageHandler() {
158
159                                    @Override
160                                    public void handle(ClusterMessage message) {
161                                        LabelResourceRequest request = SERIALIZER
162                                                .decode(message.payload());
163                                        log.trace("received get flow entry request for {}",
164                                                  request);
165                                        final Collection<LabelResource> resource = internalApply(request);
166                                        message.respond(SERIALIZER
167                                                .encode(resource));
168                                    }
169                                }, messageHandlingExecutor);
170         clusterCommunicator
171                 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
172                                new ClusterMessageHandler() {
173
174                                    @Override
175                                    public void handle(ClusterMessage message) {
176                                        LabelResourceRequest request = SERIALIZER
177                                                .decode(message.payload());
178                                        log.trace("received get flow entry request for {}",
179                                                  request);
180                                        final boolean isSuccess = internalRelease(request);
181                                        message.respond(SERIALIZER
182                                                .encode(isSuccess));
183                                    }
184                                }, messageHandlingExecutor);
185         log.info("Started");
186     }
187
188     @Deactivate
189     public void deactivate() {
190         clusterCommunicator
191                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
192         clusterCommunicator
193                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
194         clusterCommunicator
195                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
196         clusterCommunicator
197                 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
198         messageHandlingExecutor.shutdown();
199         log.info("Stopped");
200     }
201
202     @Override
203     public boolean createDevicePool(DeviceId deviceId,
204                                     LabelResourceId beginLabel,
205                                     LabelResourceId endLabel) {
206         LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
207                                                        beginLabel.labelId(),
208                                                        endLabel.labelId());
209         return this.create(pool);
210     }
211
212     @Override
213     public boolean createGlobalPool(LabelResourceId beginLabel,
214                                     LabelResourceId endLabel) {
215         LabelResourcePool pool = new LabelResourcePool(
216                                                        GLOBAL_RESOURCE_POOL_DEVICE_ID,
217                                                        beginLabel.labelId(),
218                                                        endLabel.labelId());
219         return this.internalCreate(pool);
220     }
221
222     private boolean create(LabelResourcePool pool) {
223         Device device = (Device) deviceService.getDevice(pool.deviceId());
224         if (device == null) {
225             return false;
226         }
227
228         NodeId master = mastershipService.getMasterFor(pool.deviceId());
229
230         if (master == null) {
231             log.warn("Failed to create label resource pool: No master for {}", pool);
232             return false;
233         }
234
235         if (master.equals(clusterService.getLocalNode().id())) {
236             return internalCreate(pool);
237         }
238
239         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
240                   master, pool.deviceId());
241
242         return complete(clusterCommunicator
243                 .sendAndReceive(pool,
244                                 LabelResourceMessageSubjects.LABEL_POOL_CREATED,
245                                 SERIALIZER::encode, SERIALIZER::decode,
246                                 master));
247     }
248
249     private boolean internalCreate(LabelResourcePool pool) {
250         Versioned<LabelResourcePool> poolOld = resourcePool
251                 .get(pool.deviceId());
252         if (poolOld == null) {
253             resourcePool.put(pool.deviceId(), pool);
254             LabelResourceEvent event = new LabelResourceEvent(
255                                                               Type.POOL_CREATED,
256                                                               pool);
257             notifyDelegate(event);
258             return true;
259         }
260         return false;
261     }
262
263     @Override
264     public boolean destroyDevicePool(DeviceId deviceId) {
265         Device device = (Device) deviceService.getDevice(deviceId);
266         if (device == null) {
267             return false;
268         }
269
270         NodeId master = mastershipService.getMasterFor(deviceId);
271
272         if (master == null) {
273             log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
274             return false;
275         }
276
277         if (master.equals(clusterService.getLocalNode().id())) {
278             return internalDestroy(deviceId);
279         }
280
281         log.trace("Forwarding request to {}, which is the primary (master) for device {}",
282                   master, deviceId);
283
284         return complete(clusterCommunicator
285                 .sendAndReceive(deviceId,
286                                 LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
287                                 SERIALIZER::encode, SERIALIZER::decode,
288                                 master));
289     }
290
291     private boolean internalDestroy(DeviceId deviceId) {
292         Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
293         if (poolOld != null) {
294             resourcePool.remove(deviceId);
295             LabelResourceEvent event = new LabelResourceEvent(
296                                                               Type.POOL_CREATED,
297                                                               poolOld.value());
298             notifyDelegate(event);
299         }
300         log.info("success to destroy the label resource pool of device id {}",
301                  deviceId);
302         return true;
303     }
304
305     @Override
306     public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
307                                                          long applyNum) {
308         Device device = (Device) deviceService.getDevice(deviceId);
309         if (device == null) {
310             return Collections.emptyList();
311         }
312         LabelResourceRequest request = new LabelResourceRequest(
313                                                                 deviceId,
314                                                                 LabelResourceRequest.Type.APPLY,
315                                                                 applyNum, null);
316         NodeId master = mastershipService.getMasterFor(deviceId);
317
318         if (master == null) {
319             log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
320             return Collections.emptyList();
321         }
322
323         if (master.equals(clusterService.getLocalNode().id())) {
324             return internalApply(request);
325         }
326
327         log.trace("Forwarding request to {}, which is the primary (master) for device {}",
328                   master, deviceId);
329
330         return complete(clusterCommunicator
331                 .sendAndReceive(request,
332                                 LabelResourceMessageSubjects.LABEL_POOL_APPLY,
333                                 SERIALIZER::encode, SERIALIZER::decode,
334                                 master));
335     }
336
337     private Collection<LabelResource> internalApply(LabelResourceRequest request) {
338         DeviceId deviceId = request.deviceId();
339         long applyNum = request.applyNum();
340         Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
341         LabelResourcePool pool = poolOld.value();
342         Collection<LabelResource> result = new HashSet<LabelResource>();
343         long freeNum = this.getFreeNumOfDevicePool(deviceId);
344         if (applyNum > freeNum) {
345             log.info("the free number of the label resource pool of deviceId {} is not enough.");
346             return Collections.emptyList();
347         }
348         Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
349                                                                       pool.releaseLabelId());
350         long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
351                 .size();
352         LabelResource resource = null;
353         for (int i = 0; i < tmp; i++) {
354             Iterator<LabelResource> it = releaseLabels.iterator();
355             if (it.hasNext()) {
356                 resource = it.next();
357                 releaseLabels.remove(resource);
358             }
359             result.add(resource);
360         }
361         for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
362                 .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
363             resource = new DefaultLabelResource(deviceId,
364                                                 LabelResourceId
365                                                         .labelResourceId(j));
366             result.add(resource);
367         }
368         long beginLabel = pool.beginLabel().labelId();
369         long endLabel = pool.endLabel().labelId();
370         long totalNum = pool.totalNum();
371         long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
372         long usedNum = pool.usedNum() + applyNum;
373         ImmutableSet<LabelResource> freeLabel = ImmutableSet
374                 .copyOf(releaseLabels);
375         LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
376                                                           beginLabel, endLabel,
377                                                           totalNum, usedNum,
378                                                           current, freeLabel);
379         resourcePool.put(deviceId, newPool);
380         log.info("success to apply label resource");
381         return result;
382     }
383
384     @Override
385     public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
386         Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
387         Set<DeviceId> deviceIdSet = maps.keySet();
388         LabelResourceRequest request = null;
389         for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
390             DeviceId deviceId = (DeviceId) it.next();
391             Device device = (Device) deviceService.getDevice(deviceId);
392             if (device == null) {
393                 continue;
394             }
395             ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
396                     .get(deviceId));
397             request = new LabelResourceRequest(
398                                                deviceId,
399                                                LabelResourceRequest.Type.RELEASE,
400                                                0, collection);
401             NodeId master = mastershipService.getMasterFor(deviceId);
402
403             if (master == null) {
404                 log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
405                 return false;
406             }
407
408             if (master.equals(clusterService.getLocalNode().id())) {
409                 return internalRelease(request);
410             }
411
412             log.trace("Forwarding request to {}, which is the primary (master) for device {}",
413                       master, deviceId);
414
415             return complete(clusterCommunicator
416                     .sendAndReceive(request,
417                                     LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
418                                     SERIALIZER::encode, SERIALIZER::decode,
419                                     master));
420         }
421         return false;
422     }
423
424     private boolean internalRelease(LabelResourceRequest request) {
425         DeviceId deviceId = request.deviceId();
426         Collection<LabelResource> release = request.releaseCollection();
427         Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
428         LabelResourcePool pool = poolOld.value();
429         if (pool == null) {
430             log.info("the label resource pool of device id {} does not exist");
431             return false;
432         }
433         Set<LabelResource> storeSet = new HashSet<LabelResource>(
434                                                                  pool.releaseLabelId());
435         LabelResource labelResource = null;
436         long realReleasedNum = 0;
437         for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
438             labelResource = it.next();
439             if (labelResource.labelResourceId().labelId() < pool.beginLabel()
440                     .labelId()
441                     || labelResource.labelResourceId().labelId() > pool
442                             .endLabel().labelId()) {
443                 continue;
444             }
445             if (pool.currentUsedMaxLabelId().labelId() > labelResource
446                     .labelResourceId().labelId()
447                     || !storeSet.contains(labelResource)) {
448                 storeSet.add(labelResource);
449                 realReleasedNum++;
450             }
451         }
452         long beginNum = pool.beginLabel().labelId();
453         long endNum = pool.endLabel().labelId();
454         long totalNum = pool.totalNum();
455         long usedNum = pool.usedNum() - realReleasedNum;
456         long current = pool.currentUsedMaxLabelId().labelId();
457         ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
458         LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
459                                                           beginNum, endNum,
460                                                           totalNum, usedNum,
461                                                           current, s);
462         resourcePool.put(deviceId, newPool);
463         log.info("success to release label resource");
464         return true;
465     }
466
467     @Override
468     public boolean isDevicePoolFull(DeviceId deviceId) {
469         Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
470         if (pool == null) {
471             return true;
472         }
473         return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
474                 && pool.value().releaseLabelId().size() == 0 ? true : false;
475     }
476
477     @Override
478     public long getFreeNumOfDevicePool(DeviceId deviceId) {
479         Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
480         if (pool == null) {
481             return 0;
482         }
483         return pool.value().endLabel().labelId()
484                 - pool.value().currentUsedMaxLabelId().labelId()
485                 + pool.value().releaseLabelId().size();
486     }
487
488     @Override
489     public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
490         Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
491         return pool == null ? null : pool.value();
492     }
493
494     @Override
495     public boolean destroyGlobalPool() {
496         return this.internalDestroy(DeviceId
497                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
498     }
499
500     @Override
501     public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
502         LabelResourceRequest request = new LabelResourceRequest(
503                                                                 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
504                                                                 LabelResourceRequest.Type.APPLY,
505                                                                 applyNum, null);
506         return this.internalApply(request);
507     }
508
509     @Override
510     public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
511         Set<LabelResource> set = new HashSet<LabelResource>();
512         DefaultLabelResource resource = null;
513         for (LabelResourceId labelResource : release) {
514             resource = new DefaultLabelResource(
515                                                 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
516                                                 labelResource);
517             set.add(resource);
518         }
519         LabelResourceRequest request = new LabelResourceRequest(
520                                                                 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
521                                                                 LabelResourceRequest.Type.APPLY,
522                                                                 0,
523                                                                 ImmutableSet
524                                                                         .copyOf(set));
525         return this.internalRelease(request);
526     }
527
528     @Override
529     public boolean isGlobalPoolFull() {
530         return this.isDevicePoolFull(DeviceId
531                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
532     }
533
534     @Override
535     public long getFreeNumOfGlobalPool() {
536         return this.getFreeNumOfDevicePool(DeviceId
537                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
538     }
539
540     @Override
541     public LabelResourcePool getGlobalLabelResourcePool() {
542         return this.getDeviceLabelResourcePool(DeviceId
543                 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
544     }
545
546     private <T> T complete(Future<T> future) {
547         try {
548             return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
549         } catch (InterruptedException e) {
550             Thread.currentThread().interrupt();
551             log.error("Interrupted while waiting for operation to complete.", e);
552             return null;
553         } catch (TimeoutException | ExecutionException e) {
554             log.error("Failed remote operation", e);
555             return null;
556         }
557     }
558 }