2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.packet.impl;
18 import com.google.common.collect.ImmutableSet;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate;
24 import org.apache.felix.scr.annotations.Reference;
25 import org.apache.felix.scr.annotations.ReferenceCardinality;
26 import org.apache.felix.scr.annotations.Service;
27 import org.onlab.util.KryoNamespace;
28 import org.onosproject.cluster.ClusterService;
29 import org.onosproject.cluster.NodeId;
30 import org.onosproject.mastership.MastershipService;
31 import org.onosproject.net.flow.TrafficSelector;
32 import org.onosproject.net.packet.OutboundPacket;
33 import org.onosproject.net.packet.PacketEvent;
34 import org.onosproject.net.packet.PacketEvent.Type;
35 import org.onosproject.net.packet.PacketRequest;
36 import org.onosproject.net.packet.PacketStore;
37 import org.onosproject.net.packet.PacketStoreDelegate;
38 import org.onosproject.store.AbstractStore;
39 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40 import org.onosproject.store.cluster.messaging.MessageSubject;
41 import org.onosproject.store.serializers.KryoNamespaces;
42 import org.onosproject.store.serializers.KryoSerializer;
43 import org.onosproject.store.service.ConsistentMap;
44 import org.onosproject.store.service.Serializer;
45 import org.onosproject.store.service.StorageService;
46 import org.slf4j.Logger;
48 import java.util.List;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.atomic.AtomicBoolean;
54 import static org.onlab.util.Tools.groupedThreads;
55 import static org.slf4j.LoggerFactory.getLogger;
58 * Distributed packet store implementation allowing packets to be sent to
61 @Component(immediate = true)
63 public class DistributedPacketStore
64 extends AbstractStore<PacketEvent, PacketStoreDelegate>
65 implements PacketStore {
67 private final Logger log = getLogger(getClass());
69 // TODO: make this configurable.
70 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected MastershipService mastershipService;
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected ClusterService clusterService;
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected ClusterCommunicationService communicationService;
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected StorageService storageService;
84 private PacketRequestTracker tracker;
86 private static final MessageSubject PACKET_OUT_SUBJECT =
87 new MessageSubject("packet-out");
89 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
91 protected void setupKryoPool() {
92 serializerPool = KryoNamespace.newBuilder()
93 .register(KryoNamespaces.API)
94 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
99 private ExecutorService messageHandlingExecutor;
102 public void activate() {
103 messageHandlingExecutor = Executors.newFixedThreadPool(
104 MESSAGE_HANDLER_THREAD_POOL_SIZE,
105 groupedThreads("onos/store/packet", "message-handlers"));
107 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
109 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
110 messageHandlingExecutor);
112 tracker = new PacketRequestTracker();
118 public void deactivate() {
119 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
120 messageHandlingExecutor.shutdown();
126 public void emit(OutboundPacket packet) {
127 NodeId myId = clusterService.getLocalNode().id();
128 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
130 if (master == null) {
134 if (myId.equals(master)) {
135 notifyDelegate(new PacketEvent(Type.EMIT, packet));
139 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
140 .whenComplete((r, error) -> {
142 log.warn("Failed to send packet-out to {}", master, error);
148 public void requestPackets(PacketRequest request) {
149 tracker.add(request);
153 public void cancelPackets(PacketRequest request) {
154 tracker.remove(request);
158 public List<PacketRequest> existingRequests() {
159 return tracker.requests();
162 private class PacketRequestTracker {
164 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
166 public PacketRequestTracker() {
167 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
168 .withName("onos-packet-requests")
169 .withPartitionsDisabled()
170 .withSerializer(Serializer.using(KryoNamespaces.API))
174 public void add(PacketRequest request) {
175 AtomicBoolean firstRequest = new AtomicBoolean(false);
176 requests.compute(request.selector(), (s, existingRequests) -> {
177 if (existingRequests == null) {
178 firstRequest.set(true);
179 return ImmutableSet.of(request);
180 } else if (!existingRequests.contains(request)) {
181 return ImmutableSet.<PacketRequest>builder()
182 .addAll(existingRequests)
186 return existingRequests;
190 if (firstRequest.get() && delegate != null) {
191 // The instance that makes the first request will push to all devices
192 delegate.requestPackets(request);
196 public void remove(PacketRequest request) {
197 AtomicBoolean removedLast = new AtomicBoolean(false);
198 requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
199 if (existingRequests.contains(request)) {
200 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
201 newRequests.remove(request);
202 if (newRequests.size() > 0) {
203 return ImmutableSet.copyOf(newRequests);
205 removedLast.set(true);
209 return existingRequests;
213 if (removedLast.get() && delegate != null) {
214 // The instance that removes the last request will remove from all devices
215 delegate.cancelPackets(request);
220 public List<PacketRequest> requests() {
221 List<PacketRequest> list = Lists.newArrayList();
222 requests.values().forEach(v -> list.addAll(v.value()));
223 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());