2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.packet.impl;
18 import com.google.common.collect.Lists;
19 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate;
22 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.apache.felix.scr.annotations.Service;
25 import org.onlab.util.KryoNamespace;
26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.mastership.MastershipService;
29 import org.onosproject.net.flow.TrafficSelector;
30 import org.onosproject.net.packet.OutboundPacket;
31 import org.onosproject.net.packet.PacketEvent;
32 import org.onosproject.net.packet.PacketEvent.Type;
33 import org.onosproject.net.packet.PacketRequest;
34 import org.onosproject.net.packet.PacketStore;
35 import org.onosproject.net.packet.PacketStoreDelegate;
36 import org.onosproject.store.AbstractStore;
37 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38 import org.onosproject.store.cluster.messaging.MessageSubject;
39 import org.onosproject.store.serializers.KryoNamespaces;
40 import org.onosproject.store.serializers.KryoSerializer;
41 import org.onosproject.store.service.ConsistentMap;
42 import org.onosproject.store.service.Serializer;
43 import org.onosproject.store.service.StorageService;
44 import org.onosproject.store.service.Versioned;
45 import org.slf4j.Logger;
47 import java.util.HashSet;
48 import java.util.List;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors;
53 import static org.onlab.util.Tools.groupedThreads;
54 import static org.slf4j.LoggerFactory.getLogger;
57 * Distributed packet store implementation allowing packets to be sent to
60 @Component(immediate = true)
62 public class DistributedPacketStore
63 extends AbstractStore<PacketEvent, PacketStoreDelegate>
64 implements PacketStore {
// Per-instance logger, obtained through getLogger(getClass()).
66 private final Logger log = getLogger(getClass());
68 // TODO: make this configurable.
// Size of the fixed thread pool that activate() creates for handling
// messages received on PACKET_OUT_SUBJECT.
69 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
// Mandatory (MANDATORY_UNARY) reference; emit() uses it to look up the master
// node of the device an outbound packet is sent through.
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected MastershipService mastershipService;
// Mandatory reference; emit() uses it to obtain the id of the local node.
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
// Mandatory reference; used to subscribe to / unsubscribe from
// PACKET_OUT_SUBJECT and to unicast packets to the master node.
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService communicationService;
// Mandatory reference; PacketRequestTracker uses it to build its consistent map.
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected StorageService storageService;
// Assigned in activate(); requestPackets(), cancelPackets() and
// existingRequests() delegate to it, so they must not be called before activation.
83 private PacketRequestTracker tracker;
// Message subject shared by the subscriber registered in activate() and the
// unicast performed in emit().
85 private static final MessageSubject PACKET_OUT_SUBJECT =
86 new MessageSubject("packet-out");
// Serializer whose encode method emit() passes to unicast (SERIALIZER::encode).
// setupKryoPool() assigns serializerPool from a KryoNamespace builder that
// registers KryoNamespaces.API and then calls
// nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).
// NOTE(review): the end of the builder chain and the closing braces of this
// anonymous class are not visible in this excerpt - confirm against the full file.
88 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
90 protected void setupKryoPool() {
91 serializerPool = KryoNamespace.newBuilder()
92 .register(KryoNamespaces.API)
93 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// Thread pool created in activate() and shut down in deactivate(); it is the
// executor argument of the PACKET_OUT_SUBJECT subscription.
98 private ExecutorService messageHandlingExecutor;
/**
 * Starts the store: creates the message-handling thread pool, subscribes to
 * PACKET_OUT_SUBJECT and creates the packet request tracker.
 *
 * NOTE(review): the tail of this method (anything after the tracker
 * assignment, and the closing brace) is not visible in this excerpt.
 */
101 public void activate() {
// Fixed-size pool of MESSAGE_HANDLER_THREAD_POOL_SIZE threads; the thread
// factory comes from groupedThreads("onos/store/packet", "message-handlers").
102 messageHandlingExecutor = Executors.newFixedThreadPool(
103 MESSAGE_HANDLER_THREAD_POOL_SIZE,
104 groupedThreads("onos/store/packet", "message-handlers"));
// Every OutboundPacket received on PACKET_OUT_SUBJECT is wrapped in an EMIT
// PacketEvent and passed to notifyDelegate; messageHandlingExecutor is supplied
// as the last argument (presumably the executor the handler runs on - confirm).
// NOTE(review): an argument line between the subject and the handler lambda
// appears to be missing from this excerpt (presumably the message decoder,
// mirroring SERIALIZER::encode in emit()) - confirm against the full file.
106 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
108 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
109 messageHandlingExecutor);
// The tracker constructor builds the consistent map through storageService.
111 tracker = new PacketRequestTracker();
/**
 * Stops the store: removes the PACKET_OUT_SUBJECT subscriber first, then
 * shuts down the message-handling executor.
 *
 * NOTE(review): the tail of this method is not visible in this excerpt.
 */
117 public void deactivate() {
118 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
// ExecutorService.shutdown() stops accepting new tasks but does not wait for
// already-submitted ones to finish.
119 messageHandlingExecutor.shutdown();
/**
 * Emits an outbound packet through the cluster node that is master for the
 * device the packet is sent through (packet.sendThrough()).
 *
 * When the local node is that master, the delegate is notified directly with
 * an EMIT PacketEvent. The packet is also unicast to the master on
 * PACKET_OUT_SUBJECT, encoded with SERIALIZER::encode, and a warning is
 * logged from the completion callback.
 *
 * NOTE(review): the bodies/closings of the two guards below and the condition
 * around the log call are not visible in this excerpt. Presumably the guards
 * return early (no master: nothing is sent; local master: no unicast) and the
 * warning is logged only when error is non-null - confirm against the full file.
 *
 * @param packet outbound packet to emit
 */
124 public void emit(OutboundPacket packet) {
125 NodeId myId = clusterService.getLocalNode().id();
126 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
// No master is known for the device (guard body not visible here).
128 if (master == null) {
// This node is the master: hand the packet straight to the delegate.
132 if (myId.equals(master)) {
133 notifyDelegate(new PacketEvent(Type.EMIT, packet));
// Forward the packet to the master node; the result is only observed through
// the completion callback.
137 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
138 .whenComplete((r, error) -> {
140 log.warn("Failed to send packet-out to {}", master, error);
/**
 * Registers a packet request by delegating to the tracker.
 *
 * @param request packet request to add
 * @return the result of tracker.add(request)
 */
146 public boolean requestPackets(PacketRequest request) {
147 return tracker.add(request);
/**
 * Cancels a packet request by delegating to the tracker.
 *
 * @param request packet request to remove
 * @return the result of tracker.remove(request)
 */
151 public boolean cancelPackets(PacketRequest request) {
152 return tracker.remove(request);
/**
 * Returns the packet requests currently held by the tracker.
 *
 * @return the list produced by tracker.requests()
 */
156 public List<PacketRequest> existingRequests() {
157 return tracker.requests();
160 private class PacketRequestTracker {
// Consistent map keyed by traffic selector; each value is the set of packet
// requests stored under that selector (add() and remove() key it by
// request.selector()).
162 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
/**
 * Creates the tracker and configures its backing map through
 * storageService.consistentMapBuilder(): map name "onos-packet-requests",
 * partitions disabled, serializer based on KryoNamespaces.API.
 *
 * NOTE(review): the end of the builder chain (the call that actually produces
 * the map) and the closing brace are not visible in this excerpt.
 */
164 public PacketRequestTracker() {
165 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
166 .withName("onos-packet-requests")
167 .withPartitionsDisabled()
168 .withSerializer(Serializer.using(KryoNamespaces.API))
/**
 * Adds a packet request to the set stored under the request's selector.
 *
 * The current versioned set is read first; the update is then written either
 * with putIfAbsent or with a replace that passes the version that was read.
 * No retry is attempted here (see the FIXME below).
 *
 * NOTE(review): several lines of this method are missing from this excerpt:
 * the body of the "already present" guard, and the statements between the
 * creation of newSet and the putIfAbsent call. Presumably the guard returns
 * false, the request is added to newSet, and putIfAbsent is used only when no
 * set existed (old == null) - confirm against the full file.
 *
 * @param request packet request to add
 * @return the outcome of the map update (putIfAbsent returned null, or the
 *         result of the versioned replace)
 */
172 public boolean add(PacketRequest request) {
173 Versioned<Set<PacketRequest>> old = requests.get(request.selector());
// The request is already stored under its selector (guard body not visible).
174 if (old != null && old.value().contains(request)) {
177 // FIXME: add retry logic using a random delay
178 Set<PacketRequest> newSet = new HashSet<>();
// Succeeds only if putIfAbsent reports no previous value for the selector.
181 return requests.putIfAbsent(request.selector(), newSet) == null;
// Carry the previously stored requests over into the new set.
183 newSet.addAll(old.value());
// Replace conditioned on the version read above; presumably returns false if
// the entry was changed in the meantime - confirm against ConsistentMap docs.
184 return requests.replace(request.selector(), old.version(), newSet);
/**
 * Removes a packet request from the set stored under the request's selector.
 *
 * A copy of the stored set without the request is computed. If the copy is
 * empty the map entry is removed, otherwise the entry is replaced with the
 * copy. Both operations pass the version that was read at the start.
 * No retry is attempted here (see the FIXME below).
 *
 * NOTE(review): the body of the first guard is not visible in this excerpt;
 * presumably it returns false when there is nothing to remove - confirm.
 *
 * @param request packet request to remove
 * @return the result of the versioned remove or replace
 */
187 public boolean remove(PacketRequest request) {
188 Versioned<Set<PacketRequest>> old = requests.get(request.selector());
// Nothing stored for the selector, or the request is not in the stored set.
189 if (old == null || !old.value().contains(request)) {
192 // FIXME: add retry logic using a random delay
// Work on a copy so the set held by the Versioned value is not mutated.
193 Set<PacketRequest> newSet = new HashSet<>(old.value());
194 newSet.remove(request);
// Last request for this selector: drop the whole entry instead of storing
// an empty set.
195 if (newSet.isEmpty()) {
196 return requests.remove(request.selector(), old.version());
198 return requests.replace(request.selector(), old.version(), newSet);
201 public List<PacketRequest> requests() {
202 List<PacketRequest> list = Lists.newArrayList();
203 requests.values().forEach(v -> list.addAll(v.value()));
204 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());