1 package org.onosproject.store.cluster.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Set;
import java.util.stream.Collectors;

import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;

34 * Implementation of ClusterDefinitionService.
36 @Component(immediate = true)
38 public class ClusterDefinitionManager implements ClusterDefinitionService {
40 public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
41 private static final String ONOS_NIC = "ONOS_NIC";
42 private static final Logger log = getLogger(ClusterDefinitionManager.class);
43 private ControllerNode localNode;
44 private Set<ControllerNode> seedNodes;
47 public void activate() {
48 File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
49 ClusterDefinitionStore clusterDefinitionStore =
50 new ClusterDefinitionStore(clusterDefinitionFile.getPath());
52 if (!clusterDefinitionFile.exists()) {
53 createDefaultClusterDefinition(clusterDefinitionStore);
57 ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
58 establishSelfIdentity(clusterDefinition);
59 seedNodes = ImmutableSet
60 .copyOf(clusterDefinition.getNodes())
62 .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
63 .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
64 IpAddress.valueOf(n.getIp()),
66 .collect(Collectors.toSet());
67 } catch (IOException e) {
68 throw new IllegalStateException("Failed to read cluster definition.", e);
75 public void deactivate() {
80 public ControllerNode localNode() {
85 public Set<ControllerNode> seedNodes() {
90 public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
92 Set<NodeInfo> infos = Sets.newHashSet();
93 nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
97 ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
98 new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
100 DatabaseDefinition ddef = DatabaseDefinition.from(infos);
101 new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
102 } catch (IOException e) {
103 log.error("Unable to form cluster", e);
107 private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
108 Enumeration<NetworkInterface> interfaces =
109 NetworkInterface.getNetworkInterfaces();
110 while (interfaces.hasMoreElements()) {
111 NetworkInterface iface = interfaces.nextElement();
112 Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
113 while (inetAddresses.hasMoreElements()) {
114 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
115 if (clusterDefinition.getNodes().stream()
116 .map(NodeInfo::getIp)
117 .map(IpAddress::valueOf)
118 .anyMatch(nodeIp -> ip.equals(nodeIp))) {
123 throw new IllegalStateException("Unable to determine local ip");
126 private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
128 IpAddress ip = findLocalIp(clusterDefinition);
129 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
130 } catch (SocketException e) {
131 throw new IllegalStateException("Cannot determine local IP", e);
135 private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
136 // Assumes IPv4 is returned.
137 String ip = getSiteLocalAddress();
138 String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
139 NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
141 store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
142 } catch (IOException e) {
143 log.warn("Unable to write default cluster definition", e);
148 * Returns the address that matches the IP prefix given in ONOS_NIC
149 * environment variable if one was specified, or the first site local
150 * address if one can be found or the loopback address otherwise.
152 * @return site-local address in string form
154 public static String getSiteLocalAddress() {
156 String ipPrefix = System.getenv(ONOS_NIC);
157 for (NetworkInterface nif : list(getNetworkInterfaces())) {
158 for (InetAddress address : list(nif.getInetAddresses())) {
159 IpAddress ip = IpAddress.valueOf(address);
160 if (ipPrefix == null && address.isSiteLocalAddress() ||
161 ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
162 return ip.toString();
166 } catch (SocketException e) {
167 log.error("Unable to get network interfaces", e);
170 return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
173 // Indicates whether the specified interface address matches the given prefix.
174 // FIXME: Add a facility to IpPrefix to make this more robust
175 private static boolean matchInterface(String ip, String ipPrefix) {
176 String s = ipPrefix.replaceAll("\\.\\*", "");
177 return ip.startsWith(s);