d4c89c93ec3913f4b53f5e8ce851bea7a56981bc
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.packet.impl;
17
18 import com.google.common.collect.Lists;
19 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate;
22 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.apache.felix.scr.annotations.Service;
25 import org.onlab.util.KryoNamespace;
26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.mastership.MastershipService;
29 import org.onosproject.net.flow.TrafficSelector;
30 import org.onosproject.net.packet.OutboundPacket;
31 import org.onosproject.net.packet.PacketEvent;
32 import org.onosproject.net.packet.PacketEvent.Type;
33 import org.onosproject.net.packet.PacketRequest;
34 import org.onosproject.net.packet.PacketStore;
35 import org.onosproject.net.packet.PacketStoreDelegate;
36 import org.onosproject.store.AbstractStore;
37 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38 import org.onosproject.store.cluster.messaging.MessageSubject;
39 import org.onosproject.store.serializers.KryoNamespaces;
40 import org.onosproject.store.serializers.KryoSerializer;
41 import org.onosproject.store.service.ConsistentMap;
42 import org.onosproject.store.service.Serializer;
43 import org.onosproject.store.service.StorageService;
44 import org.onosproject.store.service.Versioned;
45 import org.slf4j.Logger;
46
47 import java.util.HashSet;
48 import java.util.List;
49 import java.util.Set;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors;
52
53 import static org.onlab.util.Tools.groupedThreads;
54 import static org.slf4j.LoggerFactory.getLogger;
55
56 /**
57  * Distributed packet store implementation allowing packets to be sent to
58  * remote instances.
59  */
60 @Component(immediate = true)
61 @Service
62 public class DistributedPacketStore
63         extends AbstractStore<PacketEvent, PacketStoreDelegate>
64         implements PacketStore {
65
66     private final Logger log = getLogger(getClass());
67
68     // TODO: make this configurable.
69     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
70
71     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72     protected MastershipService mastershipService;
73
74     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75     protected ClusterService clusterService;
76
77     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78     protected ClusterCommunicationService communicationService;
79
80     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81     protected StorageService storageService;
82
83     private PacketRequestTracker tracker;
84
85     private static final MessageSubject PACKET_OUT_SUBJECT =
86             new MessageSubject("packet-out");
87
88     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
89         @Override
90         protected void setupKryoPool() {
91             serializerPool = KryoNamespace.newBuilder()
92                     .register(KryoNamespaces.API)
93                     .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
94                     .build();
95         }
96     };
97
98     private ExecutorService messageHandlingExecutor;
99
100     @Activate
101     public void activate() {
102         messageHandlingExecutor = Executors.newFixedThreadPool(
103                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
104                 groupedThreads("onos/store/packet", "message-handlers"));
105
106         communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
107                 SERIALIZER::decode,
108                 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
109                 messageHandlingExecutor);
110
111         tracker = new PacketRequestTracker();
112
113         log.info("Started");
114     }
115
116     @Deactivate
117     public void deactivate() {
118         communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
119         messageHandlingExecutor.shutdown();
120         log.info("Stopped");
121     }
122
123     @Override
124     public void emit(OutboundPacket packet) {
125         NodeId myId = clusterService.getLocalNode().id();
126         NodeId master = mastershipService.getMasterFor(packet.sendThrough());
127
128         if (master == null) {
129             return;
130         }
131
132         if (myId.equals(master)) {
133             notifyDelegate(new PacketEvent(Type.EMIT, packet));
134             return;
135         }
136
137         communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
138                             .whenComplete((r, error) -> {
139                                 if (error != null) {
140                                     log.warn("Failed to send packet-out to {}", master, error);
141                                 }
142                             });
143     }
144
145     @Override
146     public boolean requestPackets(PacketRequest request) {
147         return tracker.add(request);
148     }
149
150     @Override
151     public boolean cancelPackets(PacketRequest request) {
152         return tracker.remove(request);
153     }
154
155     @Override
156     public List<PacketRequest> existingRequests() {
157         return tracker.requests();
158     }
159
160     private class PacketRequestTracker {
161
162         private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
163
164         public PacketRequestTracker() {
165             requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
166                     .withName("onos-packet-requests")
167                     .withPartitionsDisabled()
168                     .withSerializer(Serializer.using(KryoNamespaces.API))
169                     .build();
170         }
171
172         public boolean add(PacketRequest request) {
173             Versioned<Set<PacketRequest>> old = requests.get(request.selector());
174             if (old != null && old.value().contains(request)) {
175                 return false;
176             }
177             // FIXME: add retry logic using a random delay
178             Set<PacketRequest> newSet = new HashSet<>();
179             newSet.add(request);
180             if (old == null) {
181                 return requests.putIfAbsent(request.selector(), newSet) == null;
182             }
183             newSet.addAll(old.value());
184             return requests.replace(request.selector(), old.version(), newSet);
185         }
186
187         public boolean remove(PacketRequest request) {
188             Versioned<Set<PacketRequest>> old = requests.get(request.selector());
189             if (old == null || !old.value().contains(request)) {
190                 return false;
191             }
192             // FIXME: add retry logic using a random delay
193             Set<PacketRequest> newSet = new HashSet<>(old.value());
194             newSet.remove(request);
195             if (newSet.isEmpty()) {
196                 return requests.remove(request.selector(), old.version());
197             }
198             return requests.replace(request.selector(), old.version(), newSet);
199         }
200
201         public List<PacketRequest> requests() {
202             List<PacketRequest> list = Lists.newArrayList();
203             requests.values().forEach(v -> list.addAll(v.value()));
204             list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
205             return list;
206         }
207
208     }
209 }