A question occurred to me by chance: in an HA cluster, how does a client find out the address of the active NameNode?
I used to assume it was looked up from ZooKeeper, the way HBase does it, where configuring only the ZooKeeper quorum is enough to reach the cluster.
It turns out that's not the case. Hadoop's mechanism is simple brute-force retry. Which actually makes some sense: there are at most two NameNodes, so at most one retry is needed to find the active one. Unlike HBase, which can have any number of masters, where trying them one by one would be inefficient.
A new question
But during testing I noticed something odd. Our configuration:
<property>
  <name>dfs.ha.namenodes.hp1</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hp1.nn1</name>
  <value>inspur129.photo.163.org:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.hp1.nn2</name>
  <value>inspur120.photo.163.org:8020</value>
</property>
No matter whether dfs.ha.namenodes.hp1 is written as nn1,nn2 or nn2,nn1, the client always tries to connect to nn1 first. But nn1 is actually the standby; with DEBUG logging enabled you can see the client hit an exception:
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur129.photo.163.org/10.160.128.197:8020 from mine/admin@HADOOP.HZ.NETEASE.COM got value
15/02/02 15:48:05 DEBUG retry.RetryInvocationHandler: Exception while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB. Trying to fail over immediately.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
	at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
	at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1496)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1030)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3275)
Only then does it try nn2 and succeed:
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/admin@HADOOP.HZ.NETEASE.COM: starting, having connections 2
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/admin@HADOOP.HZ.NETEASE.COM sending
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/admin@HADOOP.HZ.NETEASE.COM got value
15/02/02 15:48:05 DEBUG ipc.ProtobufRpcEngine: Call: getFileInfo took 29 ms
This happens no matter how many times I repeat it. Why? There should be no reason for the NameNodes to be tried in sorted order. Out of curiosity I dug into it.
Looking at the code
Tracing the call chain: FileSystem.get(conf)
-> FileSystem.createFileSystem
-> FileSystem.initialize
-> DistributedFileSystem.initialize:
DistributedFileSystem.java
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);
  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: " + uri);
  }
  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  this.workingDir = getHomeDirectory();
}
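For context, the entry point being traced is just the ordinary FileSystem API. A minimal usage sketch (the hdfs://hp1 URI and the /tmp path are made-up examples matching the config above; it assumes the HA settings, including dfs.client.failover.proxy.provider.hp1, are on the classpath):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientDemo {
  public static void main(String[] args) throws Exception {
    // "hp1" is the logical nameservice from the config above, not a real machine.
    Configuration conf = new Configuration(); // loads core-site.xml / hdfs-site.xml from the classpath
    FileSystem fs = FileSystem.get(URI.create("hdfs://hp1/"), conf);
    FileStatus st = fs.getFileStatus(new Path("/tmp")); // issues the getFileInfo RPC seen in the DEBUG log above
    System.out.println(st.getPath() + " isDirectory=" + st.isDirectory());
  }
}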
Continuing with DFSClient's constructor:
DFSClient.java
public DFSClient(URI nameNodeUri, Configuration conf,
    FileSystem.Statistics stats) throws IOException {
  this(nameNodeUri, null, conf, stats);
}

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats) throws IOException {
  ......
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  } else if (rpcNamenode != null) {
    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null,
        "null URI");
    proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
        ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  }
  ......
}
Continuing into NameNodeProxies.createProxy:
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
    URI nameNodeUri, Class<T> xface) throws IOException {
  Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
      getFailoverProxyProviderClass(conf, nameNodeUri, xface);
  if (failoverProxyProviderClass == null) {
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
        UserGroupInformation.getCurrentUser(), true);
  } else {
    FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
        .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
            nameNodeUri);
    Conf config = new Conf(conf);
    T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
        .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
            config.maxFailoverAttempts, config.failoverSleepBaseMillis,
            config.failoverSleepMaxMillis));
    Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
    return new ProxyAndInfo<T>(proxy, dtService);
  }
}
I won't list the code of RetryProxy.create. Roughly, it uses a dynamic proxy to return an object A that implements the ClientProtocol interface (A is generated by Java at runtime and has no corresponding class definition). Internally A wraps a real ClientProtocol object B, and every call on A is forwarded to B, with a lot of exception handling and retry logic wrapped around it. You need to understand Java's dynamic proxy mechanism first.
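To make the dynamic-proxy idea concrete, here is a minimal, self-contained sketch of the same pattern. This is not Hadoop's actual RetryProxy/RetryInvocationHandler code; the Protocol and RetryingHandler names are invented for illustration:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Toy stand-in for ClientProtocol.
interface Protocol {
  String getFileInfo(String path) throws Exception;
}

// Simplified analogue of a retrying InvocationHandler: every call on the proxy (object A)
// is forwarded to the current underlying target (object B); on failure, switch to the next target.
class RetryingHandler implements InvocationHandler {
  private final Protocol[] targets; // stand-in for the FailoverProxyProvider's address list
  private int current = 0;

  RetryingHandler(Protocol... targets) {
    this.targets = targets;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    for (int attempt = 0; ; attempt++) {
      try {
        return method.invoke(targets[current], args); // delegate to the real object B
      } catch (InvocationTargetException e) {
        if (attempt >= targets.length - 1) {
          throw e.getCause(); // out of targets: give up
        }
        current = (current + 1) % targets.length; // "fail over" to the next target
      }
    }
  }
}

public class ProxyDemo {
  public static void main(String[] args) throws Exception {
    Protocol standby = path -> { throw new IllegalStateException("standby"); };
    Protocol active = path -> "fileInfo(" + path + ")";
    // Object A: generated at runtime, has no .class file of its own, yet implements Protocol.
    Protocol a = (Protocol) Proxy.newProxyInstance(
        Protocol.class.getClassLoader(), new Class<?>[] { Protocol.class },
        new RetryingHandler(standby, active));
    System.out.println(a.getFileInfo("/tmp")); // the first target fails, the second succeeds
  }
}

Hadoop's RetryInvocationHandler does roughly the same, except that the decision to retry or fail over is delegated to the configured RetryPolicy, and the next target is supplied by the FailoverProxyProvider.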
So where does the real object B come from? From FailoverProxyProvider.getProxy. Look at the concrete implementation, ConfiguredFailoverProxyProvider:
private final List<AddressRpcProxyPair<T>> proxies =
    new ArrayList<AddressRpcProxyPair<T>>();

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
    Class<T> xface) {
  ......
    Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
        conf);
    Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
    if (addressesInNN == null || addressesInNN.size() == 0) {
      throw new RuntimeException("Could not find any configured addresses " +
          "for URI " + uri);
    }
    Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
    for (InetSocketAddress address : addressesOfNns) {
      proxies.add(new AddressRpcProxyPair<T>(address));
    }
  ......
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

public synchronized T getProxy() {
  AddressRpcProxyPair current = proxies.get(currentProxyIndex);
  if (current.namenode == null) {
    try {
      current.namenode = NameNodeProxies.createNonHAProxy(conf,
          current.address, xface, ugi, false).getProxy();
    } catch (IOException e) {
      LOG.error("Failed to create RPC proxy to NameNode", e);
      throw new RuntimeException(e);
    }
  }
  return (T) current.namenode;
}
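The part elided above that actually performs the switch is performFailover: when the retry policy decides to fail over (e.g. after the StandbyException in the log above), the provider simply advances to the next address in its list. Roughly (paraphrased from the same class; details may differ between Hadoop versions):

@Override
public synchronized void performFailover(T currentProxy) {
  // Round-robin: move on to the next configured NameNode address.
  currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
}

Since proxies is filled from addressesInNN.values(), the order in which the client tries the NameNodes is exactly the iteration order of that map, which is what the rest of this post is about.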
Continuing into DFSUtil:
private static Map<String, Map<String, InetSocketAddress>> getAddresses(
    Configuration conf, String defaultAddress, String... keys) {
  Collection<String> nameserviceIds = getNameServiceIds(conf);
  Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
  for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
    Map<String, InetSocketAddress> isas =
        getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
    if (!isas.isEmpty()) {
      ret.put(nsId, isas);
    }
  }
  return ret;
}

private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
    Configuration conf, String nsId, String defaultValue,
    String[] keys) {
  Collection<String> nnIds = getNameNodeIds(conf, nsId);
  Map<String, InetSocketAddress> ret = Maps.newHashMap();
  for (String nnId : emptyAsSingletonNull(nnIds)) {
    String suffix = concatSuffixes(nsId, nnId);
    String address = getConfValue(defaultValue, suffix, conf, keys);
    if (address != null) {
      InetSocketAddress isa = NetUtils.createSocketAddr(address);
      ret.put(nnId, isa);
    }
  }
  return ret;
}
Note that the per-nameservice map is a plain HashMap keyed by the NameNode id, so the question reduces to: why does the collection returned by HashMap.values() come back in order?
This can be verified with the following snippet:
HashMap<String, String> map = new HashMap<String, String>();
map.put("nn4", "value4");
map.put("nn3", "value3");
map.put("nn1", "value1");
map.put("nn2", "value2");
map.put("nn5", "value5");
for (String x : map.values()) {
  System.out.println(x);
}
It turns out to be a coincidence: the snippet prints value1 through value5 in order even though they were inserted out of order, yet HashMap really does not guarantee any ordering.
Internally HashMap stores its entries in an Entry[] table whose default size is 16, i.e. 16 buckets, and it decides which bucket an entry goes into based on the key's hashCode. The hashCodes of the five strings nn1 through nn5 happen to differ by exactly 1, and after HashMap's hash spreading these five keys land in buckets 9, 10, 11, 12 and 13 respectively (I won't show the computation here). The iterator of the collection returned by values() simply walks the Entry[] in index order, starting from Entry[0], so the elements come out looking as if they were sorted.
For the full details, go read HashMap's source.
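For the curious, here is a small sketch that reproduces the bucket calculation. The hash() and indexFor() methods below mirror JDK 6/7's HashMap (assuming the default table size of 16 and no alternative string hashing), so treat it as an approximation rather than the real thing:

public class BucketDemo {
  // Roughly the supplemental hash function used by JDK 6/7 HashMap.
  static int hash(int h) {
    h ^= (h >>> 20) ^ (h >>> 12);
    return h ^ (h >>> 7) ^ (h >>> 4);
  }

  // Bucket index for a table of the given power-of-two length.
  static int indexFor(int h, int length) {
    return h & (length - 1);
  }

  public static void main(String[] args) {
    for (String key : new String[] { "nn1", "nn2", "nn3", "nn4", "nn5" }) {
      // With 16 buckets this prints buckets 9 through 13 for nn1 through nn5.
      System.out.println(key + " -> bucket " + indexFor(hash(key.hashCode()), 16));
    }
  }
}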
After all that digging, the answer turned out to be nothing but a coincidence. Still, I learned something along the way. If our cluster had used NameNode ids other than nn1,nn2, the result would presumably have been different.
P.S.1
On DFSClient's retry mechanism
When DFSClient hits an exception it retries; for example, when a connection to a DataNode fails, that node is added to deadNodes and another one is tried. The retries against the NameNode are implemented via the RetryProxy.create mechanism described above. There are many tunable retry parameters; see the DFSClient.Conf class.
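For illustration, the three values used in createProxy above (maxFailoverAttempts, failoverSleepBaseMillis, failoverSleepMaxMillis) come from client-side configuration keys that can be overridden before the FileSystem is created. A fragment, with example values (roughly the defaults, if memory serves):

// Set on the Configuration before calling FileSystem.get(...).
Configuration conf = new Configuration();
// How many failover attempts before the call finally fails.
conf.setInt("dfs.client.failover.max.attempts", 15);
// Exponential back-off between failover attempts: base and cap, in milliseconds.
conf.setLong("dfs.client.failover.sleep.base.millis", 500);
conf.setLong("dfs.client.failover.sleep.max.millis", 15000);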
P.S.2
On the HA-related parameters
The names hp1/nn1/nn2 in the configuration above are purely client-side. They can be arbitrary strings, as long as they are consistent with each other, and they may even differ from what the server side uses. Names like hp1 are only logical concepts: no machine called hp1 actually exists; the client API resolves them to real addresses based on the configuration.
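To illustrate, here is a sketch of a purely programmatic client configuration that renames everything: myns, a and b are arbitrary strings I made up, the host names are the ones from the config above, and whether the client needs additional settings (Kerberos, etc.) depends on the cluster:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class RenamedNameserviceDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Logical names chosen by the client; they only need to be consistent with each other.
    conf.set("dfs.nameservices", "myns");
    conf.set("dfs.ha.namenodes.myns", "a,b");
    conf.set("dfs.namenode.rpc-address.myns.a", "inspur129.photo.163.org:8020");
    conf.set("dfs.namenode.rpc-address.myns.b", "inspur120.photo.163.org:8020");
    // Tell the client how to resolve the logical name: the provider analyzed above.
    conf.set("dfs.client.failover.proxy.provider.myns",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // The URI host is the logical nameservice, not a real machine.
    FileSystem fs = FileSystem.get(URI.create("hdfs://myns/"), conf);
    System.out.println(fs.getUri());
  }
}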