2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.incubator.store.resource.impl;
18 import static org.onlab.util.Tools.groupedThreads;
19 import static org.slf4j.LoggerFactory.getLogger;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashSet;
24 import java.util.Iterator;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
34 import org.apache.felix.scr.annotations.Activate;
35 import org.apache.felix.scr.annotations.Component;
36 import org.apache.felix.scr.annotations.Deactivate;
37 import org.apache.felix.scr.annotations.Reference;
38 import org.apache.felix.scr.annotations.ReferenceCardinality;
39 import org.apache.felix.scr.annotations.Service;
40 import org.onlab.util.KryoNamespace;
41 import org.onosproject.cluster.ClusterService;
42 import org.onosproject.cluster.NodeId;
43 import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
44 import org.onosproject.incubator.net.resource.label.LabelResource;
45 import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
46 import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
47 import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
48 import org.onosproject.incubator.net.resource.label.LabelResourceId;
49 import org.onosproject.incubator.net.resource.label.LabelResourcePool;
50 import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
51 import org.onosproject.incubator.net.resource.label.LabelResourceStore;
52 import org.onosproject.mastership.MastershipService;
53 import org.onosproject.net.Device;
54 import org.onosproject.net.DeviceId;
55 import org.onosproject.net.device.DeviceService;
56 import org.onosproject.store.AbstractStore;
57 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58 import org.onosproject.store.cluster.messaging.ClusterMessage;
59 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
60 import org.onosproject.store.serializers.KryoNamespaces;
61 import org.onosproject.store.service.ConsistentMap;
62 import org.onosproject.store.service.Serializer;
63 import org.onosproject.store.service.StorageService;
64 import org.onosproject.store.service.Versioned;
65 import org.slf4j.Logger;
67 import com.google.common.collect.ImmutableSet;
68 import com.google.common.collect.Multimap;
71 * Manages label resources using copycat.
73 @Component(immediate = true, enabled = true)
75 public class DistributedLabelResourceStore
76 extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
77 implements LabelResourceStore {
78 private final Logger log = getLogger(getClass());
80 private static final String POOL_MAP_NAME = "labelresourcepool";
82 private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
84 private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected StorageService storageService;
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected MastershipService mastershipService;
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected ClusterCommunicationService clusterCommunicator;
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected ClusterService clusterService;
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected DeviceService deviceService;
101 private ExecutorService messageHandlingExecutor;
102 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
103 private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
105 private static final Serializer SERIALIZER = Serializer
106 .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
107 .register(LabelResourceEvent.class)
108 .register(LabelResourcePool.class).register(DeviceId.class)
109 .register(LabelResourceRequest.class)
110 .register(LabelResourceRequest.Type.class)
111 .register(LabelResourceEvent.Type.class)
112 .register(DefaultLabelResource.class)
113 .register(LabelResourceId.class)
114 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
117 public void activate() {
119 resourcePool = storageService
120 .<DeviceId, LabelResourcePool>consistentMapBuilder()
121 .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
122 .withPartitionsDisabled().build();
123 messageHandlingExecutor = Executors
124 .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
125 groupedThreads("onos/store/flow",
126 "message-handlers"));
128 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
129 new ClusterMessageHandler() {
132 public void handle(ClusterMessage message) {
133 LabelResourcePool operation = SERIALIZER
134 .decode(message.payload());
135 log.trace("received get flow entry request for {}",
137 boolean b = internalCreate(operation);
138 message.respond(SERIALIZER.encode(b));
140 }, messageHandlingExecutor);
142 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
143 new ClusterMessageHandler() {
146 public void handle(ClusterMessage message) {
147 DeviceId deviceId = SERIALIZER
148 .decode(message.payload());
149 log.trace("received get flow entry request for {}",
151 boolean b = internalDestroy(deviceId);
152 message.respond(SERIALIZER.encode(b));
154 }, messageHandlingExecutor);
156 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
157 new ClusterMessageHandler() {
160 public void handle(ClusterMessage message) {
161 LabelResourceRequest request = SERIALIZER
162 .decode(message.payload());
163 log.trace("received get flow entry request for {}",
165 final Collection<LabelResource> resource = internalApply(request);
166 message.respond(SERIALIZER
169 }, messageHandlingExecutor);
171 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
172 new ClusterMessageHandler() {
175 public void handle(ClusterMessage message) {
176 LabelResourceRequest request = SERIALIZER
177 .decode(message.payload());
178 log.trace("received get flow entry request for {}",
180 final boolean isSuccess = internalRelease(request);
181 message.respond(SERIALIZER
184 }, messageHandlingExecutor);
189 public void deactivate() {
191 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
193 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
195 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
197 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
198 messageHandlingExecutor.shutdown();
203 public boolean createDevicePool(DeviceId deviceId,
204 LabelResourceId beginLabel,
205 LabelResourceId endLabel) {
206 LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
207 beginLabel.labelId(),
209 return this.create(pool);
213 public boolean createGlobalPool(LabelResourceId beginLabel,
214 LabelResourceId endLabel) {
215 LabelResourcePool pool = new LabelResourcePool(
216 GLOBAL_RESOURCE_POOL_DEVICE_ID,
217 beginLabel.labelId(),
219 return this.internalCreate(pool);
222 private boolean create(LabelResourcePool pool) {
223 Device device = (Device) deviceService.getDevice(pool.deviceId());
224 if (device == null) {
228 NodeId master = mastershipService.getMasterFor(pool.deviceId());
230 if (master == null) {
231 log.warn("Failed to create label resource pool: No master for {}", pool);
235 if (master.equals(clusterService.getLocalNode().id())) {
236 return internalCreate(pool);
239 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
240 master, pool.deviceId());
242 return complete(clusterCommunicator
243 .sendAndReceive(pool,
244 LabelResourceMessageSubjects.LABEL_POOL_CREATED,
245 SERIALIZER::encode, SERIALIZER::decode,
249 private boolean internalCreate(LabelResourcePool pool) {
250 Versioned<LabelResourcePool> poolOld = resourcePool
251 .get(pool.deviceId());
252 if (poolOld == null) {
253 resourcePool.put(pool.deviceId(), pool);
254 LabelResourceEvent event = new LabelResourceEvent(
257 notifyDelegate(event);
264 public boolean destroyDevicePool(DeviceId deviceId) {
265 Device device = (Device) deviceService.getDevice(deviceId);
266 if (device == null) {
270 NodeId master = mastershipService.getMasterFor(deviceId);
272 if (master == null) {
273 log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
277 if (master.equals(clusterService.getLocalNode().id())) {
278 return internalDestroy(deviceId);
281 log.trace("Forwarding request to {}, which is the primary (master) for device {}",
284 return complete(clusterCommunicator
285 .sendAndReceive(deviceId,
286 LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
287 SERIALIZER::encode, SERIALIZER::decode,
291 private boolean internalDestroy(DeviceId deviceId) {
292 Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
293 if (poolOld != null) {
294 resourcePool.remove(deviceId);
295 LabelResourceEvent event = new LabelResourceEvent(
298 notifyDelegate(event);
300 log.info("success to destroy the label resource pool of device id {}",
306 public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
308 Device device = (Device) deviceService.getDevice(deviceId);
309 if (device == null) {
310 return Collections.emptyList();
312 LabelResourceRequest request = new LabelResourceRequest(
314 LabelResourceRequest.Type.APPLY,
316 NodeId master = mastershipService.getMasterFor(deviceId);
318 if (master == null) {
319 log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
320 return Collections.emptyList();
323 if (master.equals(clusterService.getLocalNode().id())) {
324 return internalApply(request);
327 log.trace("Forwarding request to {}, which is the primary (master) for device {}",
330 return complete(clusterCommunicator
331 .sendAndReceive(request,
332 LabelResourceMessageSubjects.LABEL_POOL_APPLY,
333 SERIALIZER::encode, SERIALIZER::decode,
337 private Collection<LabelResource> internalApply(LabelResourceRequest request) {
338 DeviceId deviceId = request.deviceId();
339 long applyNum = request.applyNum();
340 Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
341 LabelResourcePool pool = poolOld.value();
342 Collection<LabelResource> result = new HashSet<LabelResource>();
343 long freeNum = this.getFreeNumOfDevicePool(deviceId);
344 if (applyNum > freeNum) {
345 log.info("the free number of the label resource pool of deviceId {} is not enough.");
346 return Collections.emptyList();
348 Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
349 pool.releaseLabelId());
350 long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
352 LabelResource resource = null;
353 for (int i = 0; i < tmp; i++) {
354 Iterator<LabelResource> it = releaseLabels.iterator();
356 resource = it.next();
357 releaseLabels.remove(resource);
359 result.add(resource);
361 for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
362 .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
363 resource = new DefaultLabelResource(deviceId,
365 .labelResourceId(j));
366 result.add(resource);
368 long beginLabel = pool.beginLabel().labelId();
369 long endLabel = pool.endLabel().labelId();
370 long totalNum = pool.totalNum();
371 long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
372 long usedNum = pool.usedNum() + applyNum;
373 ImmutableSet<LabelResource> freeLabel = ImmutableSet
374 .copyOf(releaseLabels);
375 LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
376 beginLabel, endLabel,
379 resourcePool.put(deviceId, newPool);
380 log.info("success to apply label resource");
385 public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
386 Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
387 Set<DeviceId> deviceIdSet = maps.keySet();
388 LabelResourceRequest request = null;
389 for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
390 DeviceId deviceId = (DeviceId) it.next();
391 Device device = (Device) deviceService.getDevice(deviceId);
392 if (device == null) {
395 ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
397 request = new LabelResourceRequest(
399 LabelResourceRequest.Type.RELEASE,
401 NodeId master = mastershipService.getMasterFor(deviceId);
403 if (master == null) {
404 log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
408 if (master.equals(clusterService.getLocalNode().id())) {
409 return internalRelease(request);
412 log.trace("Forwarding request to {}, which is the primary (master) for device {}",
415 return complete(clusterCommunicator
416 .sendAndReceive(request,
417 LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
418 SERIALIZER::encode, SERIALIZER::decode,
424 private boolean internalRelease(LabelResourceRequest request) {
425 DeviceId deviceId = request.deviceId();
426 Collection<LabelResource> release = request.releaseCollection();
427 Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
428 LabelResourcePool pool = poolOld.value();
430 log.info("the label resource pool of device id {} does not exist");
433 Set<LabelResource> storeSet = new HashSet<LabelResource>(
434 pool.releaseLabelId());
435 LabelResource labelResource = null;
436 long realReleasedNum = 0;
437 for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
438 labelResource = it.next();
439 if (labelResource.labelResourceId().labelId() < pool.beginLabel()
441 || labelResource.labelResourceId().labelId() > pool
442 .endLabel().labelId()) {
445 if (pool.currentUsedMaxLabelId().labelId() > labelResource
446 .labelResourceId().labelId()
447 || !storeSet.contains(labelResource)) {
448 storeSet.add(labelResource);
452 long beginNum = pool.beginLabel().labelId();
453 long endNum = pool.endLabel().labelId();
454 long totalNum = pool.totalNum();
455 long usedNum = pool.usedNum() - realReleasedNum;
456 long current = pool.currentUsedMaxLabelId().labelId();
457 ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
458 LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
462 resourcePool.put(deviceId, newPool);
463 log.info("success to release label resource");
468 public boolean isDevicePoolFull(DeviceId deviceId) {
469 Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
473 return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
474 && pool.value().releaseLabelId().size() == 0 ? true : false;
478 public long getFreeNumOfDevicePool(DeviceId deviceId) {
479 Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
483 return pool.value().endLabel().labelId()
484 - pool.value().currentUsedMaxLabelId().labelId()
485 + pool.value().releaseLabelId().size();
489 public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
490 Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
491 return pool == null ? null : pool.value();
495 public boolean destroyGlobalPool() {
496 return this.internalDestroy(DeviceId
497 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
501 public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
502 LabelResourceRequest request = new LabelResourceRequest(
503 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
504 LabelResourceRequest.Type.APPLY,
506 return this.internalApply(request);
510 public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
511 Set<LabelResource> set = new HashSet<LabelResource>();
512 DefaultLabelResource resource = null;
513 for (LabelResourceId labelResource : release) {
514 resource = new DefaultLabelResource(
515 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
519 LabelResourceRequest request = new LabelResourceRequest(
520 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
521 LabelResourceRequest.Type.APPLY,
525 return this.internalRelease(request);
529 public boolean isGlobalPoolFull() {
530 return this.isDevicePoolFull(DeviceId
531 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
535 public long getFreeNumOfGlobalPool() {
536 return this.getFreeNumOfDevicePool(DeviceId
537 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
541 public LabelResourcePool getGlobalLabelResourcePool() {
542 return this.getDeviceLabelResourcePool(DeviceId
543 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
546 private <T> T complete(Future<T> future) {
548 return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
549 } catch (InterruptedException e) {
550 Thread.currentThread().interrupt();
551 log.error("Interrupted while waiting for operation to complete.", e);
553 } catch (TimeoutException | ExecutionException e) {
554 log.error("Failed remote operation", e);