1 package org.onosproject.incubator.store.resource.impl;
3 import static org.onlab.util.Tools.groupedThreads;
4 import static org.slf4j.LoggerFactory.getLogger;
6 import java.util.Collection;
7 import java.util.Collections;
8 import java.util.HashSet;
9 import java.util.Iterator;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.TimeoutException;
19 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate;
22 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.apache.felix.scr.annotations.Service;
25 import org.onlab.util.KryoNamespace;
26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
29 import org.onosproject.incubator.net.resource.label.LabelResource;
30 import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
31 import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
32 import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
33 import org.onosproject.incubator.net.resource.label.LabelResourceId;
34 import org.onosproject.incubator.net.resource.label.LabelResourcePool;
35 import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
36 import org.onosproject.incubator.net.resource.label.LabelResourceStore;
37 import org.onosproject.mastership.MastershipService;
38 import org.onosproject.net.Device;
39 import org.onosproject.net.DeviceId;
40 import org.onosproject.net.device.DeviceService;
41 import org.onosproject.store.AbstractStore;
42 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43 import org.onosproject.store.cluster.messaging.ClusterMessage;
44 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45 import org.onosproject.store.serializers.KryoNamespaces;
46 import org.onosproject.store.service.ConsistentMap;
47 import org.onosproject.store.service.Serializer;
48 import org.onosproject.store.service.StorageService;
49 import org.onosproject.store.service.Versioned;
50 import org.slf4j.Logger;
52 import com.google.common.collect.ImmutableSet;
53 import com.google.common.collect.Multimap;
56 * Manages label resources using copycat.
58 @Component(immediate = true, enabled = true)
60 public class DistributedLabelResourceStore
61 extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
62 implements LabelResourceStore {
63 private final Logger log = getLogger(getClass());
65 private static final String POOL_MAP_NAME = "labelresourcepool";
67 private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
69 private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected StorageService storageService;
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected MastershipService mastershipService;
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService clusterCommunicator;
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ClusterService clusterService;
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected DeviceService deviceService;
86 private ExecutorService messageHandlingExecutor;
87 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
88 private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
90 private static final Serializer SERIALIZER = Serializer
91 .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
92 .register(LabelResourceEvent.class)
93 .register(LabelResourcePool.class).register(DeviceId.class)
94 .register(LabelResourceRequest.class)
95 .register(LabelResourceRequest.Type.class)
96 .register(LabelResourceEvent.Type.class)
97 .register(DefaultLabelResource.class)
98 .register(LabelResourceId.class)
99 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
102 public void activate() {
104 resourcePool = storageService
105 .<DeviceId, LabelResourcePool>consistentMapBuilder()
106 .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
107 .withPartitionsDisabled().build();
108 messageHandlingExecutor = Executors
109 .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
110 groupedThreads("onos/store/flow",
111 "message-handlers"));
113 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
114 new ClusterMessageHandler() {
117 public void handle(ClusterMessage message) {
118 LabelResourcePool operation = SERIALIZER
119 .decode(message.payload());
120 log.trace("received get flow entry request for {}",
122 boolean b = internalCreate(operation);
123 message.respond(SERIALIZER.encode(b));
125 }, messageHandlingExecutor);
127 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
128 new ClusterMessageHandler() {
131 public void handle(ClusterMessage message) {
132 DeviceId deviceId = SERIALIZER
133 .decode(message.payload());
134 log.trace("received get flow entry request for {}",
136 boolean b = internalDestroy(deviceId);
137 message.respond(SERIALIZER.encode(b));
139 }, messageHandlingExecutor);
141 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
142 new ClusterMessageHandler() {
145 public void handle(ClusterMessage message) {
146 LabelResourceRequest request = SERIALIZER
147 .decode(message.payload());
148 log.trace("received get flow entry request for {}",
150 final Collection<LabelResource> resource = internalApply(request);
151 message.respond(SERIALIZER
154 }, messageHandlingExecutor);
156 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
157 new ClusterMessageHandler() {
160 public void handle(ClusterMessage message) {
161 LabelResourceRequest request = SERIALIZER
162 .decode(message.payload());
163 log.trace("received get flow entry request for {}",
165 final boolean isSuccess = internalRelease(request);
166 message.respond(SERIALIZER
169 }, messageHandlingExecutor);
174 public void deactivate() {
176 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
178 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
180 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
182 .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
183 messageHandlingExecutor.shutdown();
188 public boolean createDevicePool(DeviceId deviceId,
189 LabelResourceId beginLabel,
190 LabelResourceId endLabel) {
191 LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
192 beginLabel.labelId(),
194 return this.create(pool);
198 public boolean createGlobalPool(LabelResourceId beginLabel,
199 LabelResourceId endLabel) {
200 LabelResourcePool pool = new LabelResourcePool(
201 GLOBAL_RESOURCE_POOL_DEVICE_ID,
202 beginLabel.labelId(),
204 return this.internalCreate(pool);
207 private boolean create(LabelResourcePool pool) {
208 Device device = (Device) deviceService.getDevice(pool.deviceId());
209 if (device == null) {
213 NodeId master = mastershipService.getMasterFor(pool.deviceId());
215 if (master == null) {
216 log.warn("Failed to create label resource pool: No master for {}", pool);
220 if (master.equals(clusterService.getLocalNode().id())) {
221 return internalCreate(pool);
224 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
225 master, pool.deviceId());
227 return complete(clusterCommunicator
228 .sendAndReceive(pool,
229 LabelResourceMessageSubjects.LABEL_POOL_CREATED,
230 SERIALIZER::encode, SERIALIZER::decode,
234 private boolean internalCreate(LabelResourcePool pool) {
235 Versioned<LabelResourcePool> poolOld = resourcePool
236 .get(pool.deviceId());
237 if (poolOld == null) {
238 resourcePool.put(pool.deviceId(), pool);
239 LabelResourceEvent event = new LabelResourceEvent(
242 notifyDelegate(event);
249 public boolean destroyDevicePool(DeviceId deviceId) {
250 Device device = (Device) deviceService.getDevice(deviceId);
251 if (device == null) {
255 NodeId master = mastershipService.getMasterFor(deviceId);
257 if (master == null) {
258 log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
262 if (master.equals(clusterService.getLocalNode().id())) {
263 return internalDestroy(deviceId);
266 log.trace("Forwarding request to {}, which is the primary (master) for device {}",
269 return complete(clusterCommunicator
270 .sendAndReceive(deviceId,
271 LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
272 SERIALIZER::encode, SERIALIZER::decode,
276 private boolean internalDestroy(DeviceId deviceId) {
277 Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
278 if (poolOld != null) {
279 resourcePool.remove(deviceId);
280 LabelResourceEvent event = new LabelResourceEvent(
283 notifyDelegate(event);
285 log.info("success to destroy the label resource pool of device id {}",
291 public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
293 Device device = (Device) deviceService.getDevice(deviceId);
294 if (device == null) {
295 return Collections.emptyList();
297 LabelResourceRequest request = new LabelResourceRequest(
299 LabelResourceRequest.Type.APPLY,
301 NodeId master = mastershipService.getMasterFor(deviceId);
303 if (master == null) {
304 log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
305 return Collections.emptyList();
308 if (master.equals(clusterService.getLocalNode().id())) {
309 return internalApply(request);
312 log.trace("Forwarding request to {}, which is the primary (master) for device {}",
315 return complete(clusterCommunicator
316 .sendAndReceive(request,
317 LabelResourceMessageSubjects.LABEL_POOL_APPLY,
318 SERIALIZER::encode, SERIALIZER::decode,
322 private Collection<LabelResource> internalApply(LabelResourceRequest request) {
323 DeviceId deviceId = request.deviceId();
324 long applyNum = request.applyNum();
325 Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
326 LabelResourcePool pool = poolOld.value();
327 Collection<LabelResource> result = new HashSet<LabelResource>();
328 long freeNum = this.getFreeNumOfDevicePool(deviceId);
329 if (applyNum > freeNum) {
330 log.info("the free number of the label resource pool of deviceId {} is not enough.");
331 return Collections.emptyList();
333 Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
334 pool.releaseLabelId());
335 long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
337 LabelResource resource = null;
338 for (int i = 0; i < tmp; i++) {
339 Iterator<LabelResource> it = releaseLabels.iterator();
341 resource = it.next();
342 releaseLabels.remove(resource);
344 result.add(resource);
346 for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
347 .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
348 resource = new DefaultLabelResource(deviceId,
350 .labelResourceId(j));
351 result.add(resource);
353 long beginLabel = pool.beginLabel().labelId();
354 long endLabel = pool.endLabel().labelId();
355 long totalNum = pool.totalNum();
356 long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
357 long usedNum = pool.usedNum() + applyNum;
358 ImmutableSet<LabelResource> freeLabel = ImmutableSet
359 .copyOf(releaseLabels);
360 LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
361 beginLabel, endLabel,
364 resourcePool.put(deviceId, newPool);
365 log.info("success to apply label resource");
370 public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
371 Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
372 Set<DeviceId> deviceIdSet = maps.keySet();
373 LabelResourceRequest request = null;
374 for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
375 DeviceId deviceId = (DeviceId) it.next();
376 Device device = (Device) deviceService.getDevice(deviceId);
377 if (device == null) {
380 ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
382 request = new LabelResourceRequest(
384 LabelResourceRequest.Type.RELEASE,
386 NodeId master = mastershipService.getMasterFor(deviceId);
388 if (master == null) {
389 log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
393 if (master.equals(clusterService.getLocalNode().id())) {
394 return internalRelease(request);
397 log.trace("Forwarding request to {}, which is the primary (master) for device {}",
400 return complete(clusterCommunicator
401 .sendAndReceive(request,
402 LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
403 SERIALIZER::encode, SERIALIZER::decode,
409 private boolean internalRelease(LabelResourceRequest request) {
410 DeviceId deviceId = request.deviceId();
411 Collection<LabelResource> release = request.releaseCollection();
412 Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
413 LabelResourcePool pool = poolOld.value();
415 log.info("the label resource pool of device id {} does not exist");
418 Set<LabelResource> storeSet = new HashSet<LabelResource>(
419 pool.releaseLabelId());
420 LabelResource labelResource = null;
421 long realReleasedNum = 0;
422 for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
423 labelResource = it.next();
424 if (labelResource.labelResourceId().labelId() < pool.beginLabel()
426 || labelResource.labelResourceId().labelId() > pool
427 .endLabel().labelId()) {
430 if (pool.currentUsedMaxLabelId().labelId() > labelResource
431 .labelResourceId().labelId()
432 || !storeSet.contains(labelResource)) {
433 storeSet.add(labelResource);
437 long beginNum = pool.beginLabel().labelId();
438 long endNum = pool.endLabel().labelId();
439 long totalNum = pool.totalNum();
440 long usedNum = pool.usedNum() - realReleasedNum;
441 long current = pool.currentUsedMaxLabelId().labelId();
442 ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
443 LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
447 resourcePool.put(deviceId, newPool);
448 log.info("success to release label resource");
453 public boolean isDevicePoolFull(DeviceId deviceId) {
454 Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
458 return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
459 && pool.value().releaseLabelId().size() == 0 ? true : false;
463 public long getFreeNumOfDevicePool(DeviceId deviceId) {
464 Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
468 return pool.value().endLabel().labelId()
469 - pool.value().currentUsedMaxLabelId().labelId()
470 + pool.value().releaseLabelId().size();
474 public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
475 Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
476 return pool == null ? null : pool.value();
480 public boolean destroyGlobalPool() {
481 return this.internalDestroy(DeviceId
482 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
486 public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
487 LabelResourceRequest request = new LabelResourceRequest(
488 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
489 LabelResourceRequest.Type.APPLY,
491 return this.internalApply(request);
495 public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
496 Set<LabelResource> set = new HashSet<LabelResource>();
497 DefaultLabelResource resource = null;
498 for (LabelResourceId labelResource : release) {
499 resource = new DefaultLabelResource(
500 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
504 LabelResourceRequest request = new LabelResourceRequest(
505 DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
506 LabelResourceRequest.Type.APPLY,
510 return this.internalRelease(request);
514 public boolean isGlobalPoolFull() {
515 return this.isDevicePoolFull(DeviceId
516 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
520 public long getFreeNumOfGlobalPool() {
521 return this.getFreeNumOfDevicePool(DeviceId
522 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
526 public LabelResourcePool getGlobalLabelResourcePool() {
527 return this.getDeviceLabelResourcePool(DeviceId
528 .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
531 private <T> T complete(Future<T> future) {
533 return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
534 } catch (InterruptedException e) {
535 Thread.currentThread().interrupt();
536 log.error("Interrupted while waiting for operation to complete.", e);
538 } catch (TimeoutException | ExecutionException e) {
539 log.error("Failed remote operation", e);