f0f3eb5e96f86f1125ce21f64ffa29077b249dba
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.packet.impl;
17
18 import com.google.common.collect.ImmutableSet;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate;
24 import org.apache.felix.scr.annotations.Reference;
25 import org.apache.felix.scr.annotations.ReferenceCardinality;
26 import org.apache.felix.scr.annotations.Service;
27 import org.onlab.util.KryoNamespace;
28 import org.onosproject.cluster.ClusterService;
29 import org.onosproject.cluster.NodeId;
30 import org.onosproject.mastership.MastershipService;
31 import org.onosproject.net.flow.TrafficSelector;
32 import org.onosproject.net.packet.OutboundPacket;
33 import org.onosproject.net.packet.PacketEvent;
34 import org.onosproject.net.packet.PacketEvent.Type;
35 import org.onosproject.net.packet.PacketRequest;
36 import org.onosproject.net.packet.PacketStore;
37 import org.onosproject.net.packet.PacketStoreDelegate;
38 import org.onosproject.store.AbstractStore;
39 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40 import org.onosproject.store.cluster.messaging.MessageSubject;
41 import org.onosproject.store.serializers.KryoNamespaces;
42 import org.onosproject.store.serializers.KryoSerializer;
43 import org.onosproject.store.service.ConsistentMap;
44 import org.onosproject.store.service.Serializer;
45 import org.onosproject.store.service.StorageService;
46 import org.slf4j.Logger;
47
48 import java.util.List;
49 import java.util.Set;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54 import static org.onlab.util.Tools.groupedThreads;
55 import static org.slf4j.LoggerFactory.getLogger;
56
57 /**
58  * Distributed packet store implementation allowing packets to be sent to
59  * remote instances.
60  */
61 @Component(immediate = true)
62 @Service
63 public class DistributedPacketStore
64         extends AbstractStore<PacketEvent, PacketStoreDelegate>
65         implements PacketStore {
66
67     private final Logger log = getLogger(getClass());
68
69     // TODO: make this configurable.
70     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
71
72     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73     protected MastershipService mastershipService;
74
75     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76     protected ClusterService clusterService;
77
78     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79     protected ClusterCommunicationService communicationService;
80
81     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82     protected StorageService storageService;
83
84     private PacketRequestTracker tracker;
85
86     private static final MessageSubject PACKET_OUT_SUBJECT =
87             new MessageSubject("packet-out");
88
89     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
90         @Override
91         protected void setupKryoPool() {
92             serializerPool = KryoNamespace.newBuilder()
93                     .register(KryoNamespaces.API)
94                     .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
95                     .build();
96         }
97     };
98
99     private ExecutorService messageHandlingExecutor;
100
101     @Activate
102     public void activate() {
103         messageHandlingExecutor = Executors.newFixedThreadPool(
104                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
105                 groupedThreads("onos/store/packet", "message-handlers"));
106
107         communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
108                 SERIALIZER::decode,
109                 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
110                 messageHandlingExecutor);
111
112         tracker = new PacketRequestTracker();
113
114         log.info("Started");
115     }
116
117     @Deactivate
118     public void deactivate() {
119         communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
120         messageHandlingExecutor.shutdown();
121         tracker = null;
122         log.info("Stopped");
123     }
124
125     @Override
126     public void emit(OutboundPacket packet) {
127         NodeId myId = clusterService.getLocalNode().id();
128         NodeId master = mastershipService.getMasterFor(packet.sendThrough());
129
130         if (master == null) {
131             return;
132         }
133
134         if (myId.equals(master)) {
135             notifyDelegate(new PacketEvent(Type.EMIT, packet));
136             return;
137         }
138
139         communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
140                             .whenComplete((r, error) -> {
141                                 if (error != null) {
142                                     log.warn("Failed to send packet-out to {}", master, error);
143                                 }
144                             });
145     }
146
147     @Override
148     public void requestPackets(PacketRequest request) {
149         tracker.add(request);
150     }
151
152     @Override
153     public void cancelPackets(PacketRequest request) {
154         tracker.remove(request);
155     }
156
157     @Override
158     public List<PacketRequest> existingRequests() {
159         return tracker.requests();
160     }
161
162     private class PacketRequestTracker {
163
164         private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
165
166         public PacketRequestTracker() {
167             requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
168                     .withName("onos-packet-requests")
169                     .withPartitionsDisabled()
170                     .withSerializer(Serializer.using(KryoNamespaces.API))
171                     .build();
172         }
173
174         public void add(PacketRequest request) {
175             AtomicBoolean firstRequest = new AtomicBoolean(false);
176             requests.compute(request.selector(), (s, existingRequests) -> {
177                 if (existingRequests == null) {
178                     firstRequest.set(true);
179                     return ImmutableSet.of(request);
180                 } else if (!existingRequests.contains(request)) {
181                     return ImmutableSet.<PacketRequest>builder()
182                                        .addAll(existingRequests)
183                                        .add(request)
184                                        .build();
185                 } else {
186                     return existingRequests;
187                 }
188             });
189
190             if (firstRequest.get() && delegate != null) {
191                 // The instance that makes the first request will push to all devices
192                 delegate.requestPackets(request);
193             }
194         }
195
196         public void remove(PacketRequest request) {
197             AtomicBoolean removedLast = new AtomicBoolean(false);
198             requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
199                 if (existingRequests.contains(request)) {
200                     Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
201                     newRequests.remove(request);
202                     if (newRequests.size() > 0) {
203                         return ImmutableSet.copyOf(newRequests);
204                     } else {
205                         removedLast.set(true);
206                         return null;
207                     }
208                 } else {
209                     return existingRequests;
210                 }
211             });
212
213             if (removedLast.get() && delegate != null) {
214                 // The instance that removes the last request will remove from all devices
215                 delegate.cancelPackets(request);
216             }
217
218         }
219
220         public List<PacketRequest> requests() {
221             List<PacketRequest> list = Lists.newArrayList();
222             requests.values().forEach(v -> list.addAll(v.value()));
223             list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
224             return list;
225         }
226     }
227 }