4e28e3c24fc79439baa62dd039d74a05c459476c
[onosfw.git] /
1 package org.onosproject.store.cluster.impl;
2
3 import com.google.common.collect.ImmutableSet;
4 import com.google.common.collect.Sets;
5 import org.apache.felix.scr.annotations.Activate;
6 import org.apache.felix.scr.annotations.Component;
7 import org.apache.felix.scr.annotations.Deactivate;
8 import org.apache.felix.scr.annotations.Service;
9 import org.onlab.packet.IpAddress;
10 import org.onosproject.cluster.ClusterDefinitionService;
11 import org.onosproject.cluster.ControllerNode;
12 import org.onosproject.cluster.DefaultControllerNode;
13 import org.onosproject.cluster.NodeId;
14 import org.onosproject.store.consistent.impl.DatabaseDefinition;
15 import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
16 import org.slf4j.Logger;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.net.InetAddress;
21 import java.net.NetworkInterface;
22 import java.net.SocketException;
23 import java.util.Enumeration;
24 import java.util.Set;
25 import java.util.stream.Collectors;
26
27 import static java.net.NetworkInterface.getNetworkInterfaces;
28 import static java.util.Collections.list;
29 import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
30 import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
31 import static org.slf4j.LoggerFactory.getLogger;
32
33 /**
34  * Implementation of ClusterDefinitionService.
35  */
36 @Component(immediate = true)
37 @Service
38 public class ClusterDefinitionManager implements ClusterDefinitionService {
39
40     public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
41     private static final String ONOS_NIC = "ONOS_NIC";
42     private static final Logger log = getLogger(ClusterDefinitionManager.class);
43     private ControllerNode localNode;
44     private Set<ControllerNode> seedNodes;
45
46     @Activate
47     public void activate() {
48         File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
49         ClusterDefinitionStore clusterDefinitionStore =
50                 new ClusterDefinitionStore(clusterDefinitionFile.getPath());
51
52         if (!clusterDefinitionFile.exists()) {
53             createDefaultClusterDefinition(clusterDefinitionStore);
54         }
55
56         try {
57             ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
58             establishSelfIdentity(clusterDefinition);
59             seedNodes = ImmutableSet
60                     .copyOf(clusterDefinition.getNodes())
61                     .stream()
62                     .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
63                     .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
64                                                         IpAddress.valueOf(n.getIp()),
65                                                         n.getTcpPort()))
66                     .collect(Collectors.toSet());
67         } catch (IOException e) {
68             throw new IllegalStateException("Failed to read cluster definition.", e);
69         }
70
71         log.info("Started");
72     }
73
74     @Deactivate
75     public void deactivate() {
76         log.info("Stopped");
77     }
78
79     @Override
80     public ControllerNode localNode() {
81         return localNode;
82     }
83
84     @Override
85     public Set<ControllerNode> seedNodes() {
86         return seedNodes;
87     }
88
89     @Override
90     public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
91         try {
92             Set<NodeInfo> infos = Sets.newHashSet();
93             nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
94                                                        n.ip().toString(),
95                                                        n.tcpPort())));
96
97             ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
98             new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
99
100             DatabaseDefinition ddef = DatabaseDefinition.from(infos);
101             new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
102         } catch (IOException e) {
103             log.error("Unable to form cluster", e);
104         }
105     }
106
107     private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
108         Enumeration<NetworkInterface> interfaces =
109                 NetworkInterface.getNetworkInterfaces();
110         while (interfaces.hasMoreElements()) {
111             NetworkInterface iface = interfaces.nextElement();
112             Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
113             while (inetAddresses.hasMoreElements()) {
114                 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
115                 if (clusterDefinition.getNodes().stream()
116                         .map(NodeInfo::getIp)
117                         .map(IpAddress::valueOf)
118                         .anyMatch(nodeIp -> ip.equals(nodeIp))) {
119                     return ip;
120                 }
121             }
122         }
123         throw new IllegalStateException("Unable to determine local ip");
124     }
125
126     private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
127         try {
128             IpAddress ip = findLocalIp(clusterDefinition);
129             localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
130         } catch (SocketException e) {
131             throw new IllegalStateException("Cannot determine local IP", e);
132         }
133     }
134
135     private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
136         // Assumes IPv4 is returned.
137         String ip = getSiteLocalAddress();
138         String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
139         NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
140         try {
141             store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
142         } catch (IOException e) {
143             log.warn("Unable to write default cluster definition", e);
144         }
145     }
146
147     /**
148      * Returns the address that matches the IP prefix given in ONOS_NIC
149      * environment variable if one was specified, or the first site local
150      * address if one can be found or the loopback address otherwise.
151      *
152      * @return site-local address in string form
153      */
154     public static String getSiteLocalAddress() {
155         try {
156             String ipPrefix = System.getenv(ONOS_NIC);
157             for (NetworkInterface nif : list(getNetworkInterfaces())) {
158                 for (InetAddress address : list(nif.getInetAddresses())) {
159                     IpAddress ip = IpAddress.valueOf(address);
160                     if (ipPrefix == null && address.isSiteLocalAddress() ||
161                             ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
162                         return ip.toString();
163                     }
164                 }
165             }
166         } catch (SocketException e) {
167             log.error("Unable to get network interfaces", e);
168         }
169
170         return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
171     }
172
173     // Indicates whether the specified interface address matches the given prefix.
174     // FIXME: Add a facility to IpPrefix to make this more robust
175     private static boolean matchInterface(String ip, String ipPrefix) {
176         String s = ipPrefix.replaceAll("\\.\\*", "");
177         return ip.startsWith(s);
178     }
179 }