How a Client Finds the Active NN in an HA Cluster

A question occurred to me by chance: in an HA cluster, how does the client learn the address of the Active NameNode?

I used to assume it was looked up from ZooKeeper, the way HBase does it: configure just the ZooKeeper quorum address and you can reach the cluster.

Turns out that's not the case.
Hadoop's mechanism is simple, brute-force retry. It actually makes sense: there are at most two NameNodes, so at most one retry is needed to find out which one is Active. Unlike HBase, which can have any number of masters, where trying them one by one would be inefficient.
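As a minimal sketch of what that looks like from the client side (the path /tmp is just an example, and hp1 is the nameservice from the configuration shown below), the client code only ever mentions the logical URI, never a concrete NameNode host:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientDemo {
    public static void main(String[] args) throws Exception {
        // core-site.xml / hdfs-site.xml on the classpath carry the HA settings;
        // the URI only names the logical nameservice.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://hp1"), conf);

        // Any call may hit the standby first; the retry layer described below
        // then fails over to the other NameNode transparently.
        FileStatus status = fs.getFileStatus(new Path("/tmp"));
        System.out.println(status.getPath());
    }
}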

A new question

But while testing I noticed something odd. Our configuration:

<property>
  <name>dfs.ha.namenodes.hp1</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hp1.nn1</name>
  <value>inspur129.photo.163.org:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hp1.nn2</name>
  <value>inspur120.photo.163.org:8020</value>
</property>

I found that whether dfs.ha.namenodes.hp1 is written as nn1,nn2 or nn2,nn1, the client always tries to connect to nn1 first. But nn1 is actually the standby, and with DEBUG logging enabled you can see the client hit an exception:

15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur129.photo.163.org/10.160.128.197:8020 from mine/[email protected] got value #0
15/02/02 15:48:05 DEBUG retry.RetryInvocationHandler: Exception while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB. Trying to fail over immediately.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1496)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1030)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3275)

Only then does it try nn2 and succeed:

15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/[email protected]: starting, having connections 2
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/[email protected] sending #0
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/[email protected] got value #0
15/02/02 15:48:05 DEBUG ipc.ProtobufRpcEngine: Call: getFileInfo took 29ms

No matter how many times I repeat it, the result is the same. Why? There's no obvious reason for the list to be sorted. Curiosity got the better of me, so I dug into it.

Looking at the code

Tracing the call chain: FileSystem.get(conf) -> FileSystem.createFileSystem -> FileSystem.initialize -> DistributedFileSystem.initialize

DistributedFileSystem.java
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: "+ uri);
  }

  // The key part: DFSClient is what actually talks to the NameNode over RPC.
  // We normally don't use it directly; FileSystem is a wrapper on top of it.
  // The uri passed in here is the value configured in core-site.xml,
  // which on our cluster is hdfs://hp1
  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
  this.workingDir = getHomeDirectory();
}

Next, the DFSClient constructors:

DFSClient.java
public DFSClient(URI nameNodeUri, Configuration conf,
                 FileSystem.Statistics stats)
    throws IOException {
  this(nameNodeUri, null, conf, stats);
}

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats)
    throws IOException {
  // a lot of irrelevant code omitted
  ......
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  } else if (rpcNamenode != null) {
    // This case is used for testing.
    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null,
        "null URI");
    // DFSClient itself doesn't know which machine it is talking to;
    // it goes through a proxy object.
    proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
        ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  }
  ......
}

Continuing into NameNodeProxies.createProxy:

public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
    URI nameNodeUri, Class<T> xface) throws IOException {
  // This class comes from dfs.client.failover.proxy.provider.$nameservice in hdfs-site.xml.
  // On our cluster that is dfs.client.failover.proxy.provider.hp1, normally set to
  // org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
  Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
      getFailoverProxyProviderClass(conf, nameNodeUri, xface);

  // If dfs.client.failover.proxy.provider.$nameservice is configured,
  // the cluster is treated as HA-enabled
  if (failoverProxyProviderClass == null) {
    // Non-HA case
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
        UserGroupInformation.getCurrentUser(), true);
  } else {
    // HA case
    FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
        .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
            nameNodeUri);
    Conf config = new Conf(conf);
    // This part is a bit involved: Java's dynamic proxy mechanism is used to return a
    // ClientProtocol object whose every method call carries retry logic;
    // see RetryInvocationHandler.invoke for details
    T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
        .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
            config.maxFailoverAttempts, config.failoverSleepBaseMillis,
            config.failoverSleepMaxMillis));
    Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
    return new ProxyAndInfo<T>(proxy, dtService);
  }
}

I won't list the actual code of RetryProxy.create. Roughly, it uses a dynamic proxy to return an object A that implements the ClientProtocol interface (this object is generated by Java at runtime and has no corresponding class definition). A wraps a real ClientProtocol object B, and every operation on A is internally translated into an operation on B, with plenty of exception handling and retry logic layered on top. Some familiarity with Java's dynamic proxy mechanism helps here.
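To make the proxy idea concrete, here is a toy sketch of the same pattern (not the real RetryInvocationHandler; the Echo interface and both targets are made up): every call on the proxy object goes through an InvocationHandler, which can catch a failure, switch to another target, and invoke the method again.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// A toy version of the retry/failover proxy pattern; all names are made up.
public class RetryProxyDemo {
    public interface Echo {
        String echo(String msg);
    }

    public static void main(String[] args) {
        // Two "targets": the first always fails, like a standby NN rejecting reads.
        Echo broken = msg -> { throw new IllegalStateException("standby"); };
        Echo healthy = msg -> "active says: " + msg;
        Echo[] targets = {broken, healthy};

        InvocationHandler handler = new InvocationHandler() {
            private int current = 0;  // which target we are currently bound to

            @Override
            public Object invoke(Object proxy, Method method, Object[] a) throws Throwable {
                while (true) {
                    try {
                        return method.invoke(targets[current], a);
                    } catch (Exception e) {
                        // "Fail over": move to the next target and retry.
                        // (A real retry policy also caps the number of attempts.)
                        current = (current + 1) % targets.length;
                    }
                }
            }
        };

        Echo echo = (Echo) Proxy.newProxyInstance(
            RetryProxyDemo.class.getClassLoader(), new Class<?>[]{Echo.class}, handler);
        System.out.println(echo.echo("hello"));  // prints "active says: hello"
    }
}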

So where does the real object B come from? From FailoverProxyProvider.getProxy. Let's look at the concrete implementation, ConfiguredFailoverProxyProvider:

private final List<AddressRpcProxyPair<T>> proxies =
    new ArrayList<AddressRpcProxyPair<T>>();

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
    Class<T> xface) {
  ......
  // This just reads dfs.namenode.rpc-address.hp1.nn1 etc. from conf
  Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
      conf);
  Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());

  if (addressesInNN == null || addressesInNN.size() == 0) {
    throw new RuntimeException("Could not find any configured addresses " +
        "for URI " + uri);
  }

  Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
  // Append the values to the ArrayList
  for (InetSocketAddress address : addressesOfNns) {
    proxies.add(new AddressRpcProxyPair<T>(address));
  }
  ......
  } catch (IOException e) {  // the matching try is part of the omitted code above
    throw new RuntimeException(e);
  }
}

public synchronized T getProxy() {
  // So the order in which NNs are tried matches the order they were added to the ArrayList
  AddressRpcProxyPair current = proxies.get(currentProxyIndex);
  if (current.namenode == null) {
    try {
      current.namenode = NameNodeProxies.createNonHAProxy(conf,
          current.address, xface, ugi, false).getProxy();
    } catch (IOException e) {
      LOG.error("Failed to create RPC proxy to NameNode", e);
      throw new RuntimeException(e);
    }
  }
  return (T)current.namenode;
}

Continuing into DFSUtil:

private static Map<String, Map<String, InetSocketAddress>>
    getAddresses(Configuration conf,
      String defaultAddress, String... keys) {
  // On our cluster there is only one nameservice: hp1
  Collection<String> nameserviceIds = getNameServiceIds(conf);

  // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
  // across all of the configured nameservices and namenodes.
  Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
  for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
    Map<String, InetSocketAddress> isas =
        getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
    if (!isas.isEmpty()) {
      ret.put(nsId, isas);
    }
  }
  return ret;
}

private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
    Configuration conf, String nsId, String defaultValue,
    String[] keys) {
  // This reads the dfs.ha.namenodes.hp1 property and turns the comma-separated
  // string into a Collection, which is actually an ArrayList,
  // so iterating over it is ordered, in the same order as in the string
  Collection<String> nnIds = getNameNodeIds(conf, nsId);
  // Note: the result is stored in a HashMap, which in principle is unordered
  Map<String, InetSocketAddress> ret = Maps.newHashMap();
  // Note: this loop is ordered; if the config says nn2,nn1, nn2 is visited before nn1
  for (String nnId : emptyAsSingletonNull(nnIds)) {
    String suffix = concatSuffixes(nsId, nnId);
    String address = getConfValue(defaultValue, suffix, conf, keys);
    if (address != null) {
      InetSocketAddress isa = NetUtils.createSocketAddr(address);
      // Put it into the HashMap
      ret.put(nnId, isa);
    }
  }
  return ret;
}

So the question reduces to: why does the collection returned by HashMap.values() come back in sorted order...

It can be verified with the following snippet:

HashMap<String, String> map = new HashMap<String, String>();
map.put("nn4", "value4");
map.put("nn3", "value3");
map.put("nn1", "value1");
map.put("nn2", "value2");
map.put("nn5", "value5");
for (String x : map.values()) {
  System.out.println(x); // the output is always in order: value1,value2,value3,value4,value5
}

It's actually a coincidence... HashMap really does not guarantee any ordering.

Internally, HashMap stores its entries in an Entry[] array with a default size of 16, i.e. 16 buckets, and decides which bucket an entry goes into based on the key's hashCode. The hash codes of the five strings nn1~nn5 happen to differ by exactly 1, and after HashMap's hash transformation the five keys end up in buckets 9, 10, 11, 12 and 13 (I'll spare you the exact calculation). The iterator of the collection returned by values() walks the Entry[] array sequentially, starting from Entry[0], so the result looks as if the elements were sorted...
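The lucky part is easy to see: the hash codes of these five keys are consecutive integers. A quick check (I only print the raw hashCode values here, since the final bucket index also depends on HashMap's internal hash spreading, which differs between JDK versions):

public class HashCodeDemo {
    public static void main(String[] args) {
        // The five keys from the verification snippet above have consecutive
        // hash codes, which is why they happen to land in consecutive buckets.
        for (String key : new String[]{"nn1", "nn2", "nn3", "nn4", "nn5"}) {
            System.out.println(key + " -> " + key.hashCode());
        }
    }
}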

For the details, go read the HashMap source.

After all that digging, it turned out to be just a coincidence... Still, I learned something. If our cluster had been configured with names other than nn1,nn2, the result would presumably have been different.

P.S.1

On DFSClient's retry mechanism

DFSClient retries when a connection fails; for example, when a connection to a DataNode fails, it adds that node to the dead-node list and switches to another one. The retry mechanism is implemented via the RetryProxy.create method described above.
There are quite a few tunable retry parameters; see the DFSClient.Conf class.
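Those knobs surface in createProxy above as config.maxFailoverAttempts and the two sleep settings. If I recall hdfs-default.xml (2.x line) correctly, they correspond to the properties below, though the names are worth double-checking against your Hadoop version; a client can override them programmatically:

import org.apache.hadoop.conf.Configuration;

public class FailoverTuningSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Property names as I recall them from hdfs-default.xml in the 2.x line --
        // verify against your Hadoop version before relying on them.
        conf.setInt("dfs.client.failover.max.attempts", 15);
        conf.setInt("dfs.client.failover.sleep.base.millis", 500);
        conf.setInt("dfs.client.failover.sleep.max.millis", 15000);
        // A FileSystem created from this conf picks up the tuned values.
    }
}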

P.S.2

On the HA-related parameters

The names hp1/nn1/nn2 in the configuration above are entirely client-side. They can be arbitrary strings, as long as they are consistent with each other, and they don't even have to match the server side.
That's because names like hp1 are purely logical concepts: there is no actual machine called hp1, and the client API automatically resolves them to real addresses based on the configuration.
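As a sketch of that point (the nameservice "mycluster" and the NameNode IDs "a"/"b" below are deliberately arbitrary, client-side-only names; the host:port values are the real ones from our configuration), a client can carry its own naming and still reach the same cluster:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LogicalNameDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Purely client-side names: "mycluster", "a" and "b" are arbitrary;
        // only the host:port values must point at the actual NameNodes.
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "a,b");
        conf.set("dfs.namenode.rpc-address.mycluster.a", "inspur129.photo.163.org:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.b", "inspur120.photo.163.org:8020");
        conf.set("dfs.client.failover.proxy.provider.mycluster",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // The URI names the logical nameservice, which exists only in this conf.
        FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
        System.out.println(fs.getUri());
    }
}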