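By chance I ran into a question: in an HA cluster, how does the client learn the address of the Active NN?
I used to assume it was looked up from ZooKeeper, the way HBase works, where configuring just the address of a ZooKeeper ensemble is enough to access the cluster.
It turns out that is not the case.
A new problem
While testing, though, I noticed something odd. Our configuration: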
<property>
    <name>dfs.ha.namenodes.hp1</name>
    <value>nn1,nn2</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.hp1.nn1</name>
    <value>inspur129.photo.163.org:8020</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.hp1.nn2</name>
    <value>inspur120.photo.163.org:8020</value>
</property>
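Whether dfs.ha.namenodes.hp1 is written as nn1,nn2 or as nn2,nn1, the client always tries nn1 first. In our cluster nn1 is actually the standby, so with DEBUG logging enabled you can see the client hit an exception: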
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur129.photo.163.org/10.160.128.197:8020 from mine/admin@HADOOP.HZ.NETEASE.COM got value
15/02/02 15:48:05 DEBUG retry.RetryInvocationHandler: Exception while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB. Trying to fail over immediately.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
    at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
    at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1496)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1030)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3275)
Only then does it try nn2, which succeeds:
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/admin@HADOOP.HZ.NETEASE.COM: starting, having connections 2
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/admin@HADOOP.HZ.NETEASE.COM sending
15/02/02 15:48:05 DEBUG ipc.Client: IPC Client (2136875125) connection to inspur120.photo.163.org/10.160.128.192:8020 from mine/admin@HADOOP.HZ.NETEASE.COM got value
15/02/02 15:48:05 DEBUG ipc.ProtobufRpcEngine: Call: getFileInfo took 29ms
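This happens no matter how many times I repeat it. Why? There is no obvious reason for the NameNode IDs to be sorted, so out of curiosity I dug into it.
Looking at the code
Tracing the call chain: FileSystem.get(conf) -> FileSystem.createFileSystem -> FileSystem.initialize -> DistributedFileSystem.initialize: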
DistributedFileSystem.java
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: " + uri);
  }

  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  this.workingDir = getHomeDirectory();
}
Next, the DFSClient constructors:
DFSClient.java
public DFSClient(URI nameNodeUri, Configuration conf,
                 FileSystem.Statistics stats)
  throws IOException {
  this(nameNodeUri, null, conf, stats);
}

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats)
  throws IOException {

  ......
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  } else if (rpcNamenode != null) {

    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null,
        "null URI");

    proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
        ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  }
  ......
}
Next, NameNodeProxies.createProxy:
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
    URI nameNodeUri, Class<T> xface) throws IOException {

  Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
      getFailoverProxyProviderClass(conf, nameNodeUri, xface);

  if (failoverProxyProviderClass == null) {
    // No failover proxy provider is configured for this URI: non-HA case
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
        UserGroupInformation.getCurrentUser(), true);
  } else {
    // HA case: wrap the failover proxy provider in a retrying proxy
    FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
        .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
            nameNodeUri);
    Conf config = new Conf(conf);

    T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
        .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
            config.maxFailoverAttempts, config.failoverSleepBaseMillis,
            config.failoverSleepMaxMillis));

    Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
    return new ProxyAndInfo<T>(proxy, dtService);
  }
}
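I won't list the code of RetryProxy.create. Roughly, it uses a proxy to return an object A that implements the ClientProtocol interface (A's class is generated dynamically by Java at runtime, so there is no corresponding class definition in the source). A wraps a real ClientProtocol object B, and every call on A is translated internally into a call on B, with a lot of exception handling and retry logic added around it. Understanding Java's dynamic proxy mechanism first helps here.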
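The A-wraps-B idea can be sketched with the JDK's own java.lang.reflect.Proxy. This is only a minimal illustration, not Hadoop's RetryInvocationHandler: the NameService interface, the use of IllegalStateException as a stand-in for StandbyException, and the "move to the next target" policy are all assumptions made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

class FailoverProxyDemo {
    // Hypothetical stand-in for ClientProtocol; not a Hadoop type.
    interface NameService {
        String getFileInfo(String path);
    }

    // Counts how often a call had to move on to another target (illustration only).
    static int failovers = 0;

    // Builds object "A": a dynamic proxy that forwards each call to a real target "B".
    static NameService wrap(final List<NameService> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("no targets");
        }
        InvocationHandler handler = new InvocationHandler() {
            private int current = 0; // index of the target currently in use

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Throwable last = null;
                // Try each target at most once, starting with the current one.
                for (int attempt = 0; attempt < targets.size(); attempt++) {
                    try {
                        return method.invoke(targets.get(current), args);
                    } catch (InvocationTargetException e) {
                        last = e.getCause();
                        // Assumption: IllegalStateException plays the role of StandbyException.
                        if (!(last instanceof IllegalStateException)) {
                            throw last;
                        }
                        current = (current + 1) % targets.size();
                        failovers++;
                    }
                }
                throw last;
            }
        };
        return (NameService) Proxy.newProxyInstance(
                NameService.class.getClassLoader(),
                new Class<?>[] {NameService.class},
                handler);
    }

    public static void main(String[] args) {
        NameService standby = path -> {
            throw new IllegalStateException("not supported in state standby");
        };
        NameService active = path -> "status of " + path;
        NameService client = wrap(Arrays.asList(standby, active));
        System.out.println(client.getFileInfo("/tmp"));
        System.out.println("failovers: " + failovers);
    }
}
```

The caller only ever sees the proxy; the first call pays for one failed attempt against the standby, and later calls go straight to the target that worked, which matches the pattern in the DEBUG log above.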
So how is the real object B obtained? It comes from FailoverProxyProvider.getProxy; look at the concrete implementation, ConfiguredFailoverProxyProvider:
private final List<AddressRpcProxyPair<T>> proxies =
    new ArrayList<AddressRpcProxyPair<T>>();

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
    Class<T> xface) {
    ......

    // RPC addresses of all HA NameNodes, keyed by nameservice ID, then by NameNode ID
    Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
        conf);
    Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());

    if (addressesInNN == null || addressesInNN.size() == 0) {
      throw new RuntimeException("Could not find any configured addresses " +
          "for URI " + uri);
    }

    // The order of proxies is whatever order values() iterates in
    Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();

    for (InetSocketAddress address : addressesOfNns) {
      proxies.add(new AddressRpcProxyPair<T>(address));
    }
    ......
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

public synchronized T getProxy() {
  // Lazily create the RPC proxy for the currently selected NameNode
  AddressRpcProxyPair current = proxies.get(currentProxyIndex);
  if (current.namenode == null) {
    try {
      current.namenode = NameNodeProxies.createNonHAProxy(conf,
          current.address, xface, ugi, false).getProxy();
    } catch (IOException e) {
      LOG.error("Failed to create RPC proxy to NameNode", e);
      throw new RuntimeException(e);
    }
  }
  return (T)current.namenode;
}
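To make the role of the proxies list and currentProxyIndex concrete, here is a simplified model of such a provider. It is a sketch, not the Hadoop class: SimpleFailoverProvider, its factory parameter, and the created counter are hypothetical, and the performFailover method (not shown in the excerpt above) is assumed to simply advance to the next entry and wrap around.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of a configured failover provider.
class SimpleFailoverProvider<T> {
    // One slot per configured address; the proxy is built on first use.
    private static final class Slot<P> {
        final String address;
        P proxy;
        Slot(String address) { this.address = address; }
    }

    private final List<Slot<T>> slots = new ArrayList<>();
    private final Function<String, T> factory; // hypothetical: builds a proxy for an address
    private int currentIndex = 0;              // always starts at the first address
    int created = 0;                           // how many proxies were actually built

    SimpleFailoverProvider(Collection<String> addresses, Function<String, T> factory) {
        // The slot order is exactly the iteration order of the collection passed in.
        for (String address : addresses) {
            slots.add(new Slot<T>(address));
        }
        this.factory = factory;
    }

    synchronized T getProxy() {
        Slot<T> current = slots.get(currentIndex);
        if (current.proxy == null) {
            current.proxy = factory.apply(current.address);
            created++;
        }
        return current.proxy;
    }

    // Assumption: failing over means moving to the next slot, wrapping around.
    synchronized void performFailover() {
        currentIndex = (currentIndex + 1) % slots.size();
    }

    public static void main(String[] args) {
        SimpleFailoverProvider<String> provider = new SimpleFailoverProvider<>(
                Arrays.asList("nn1-host:8020", "nn2-host:8020"),
                address -> "proxy->" + address);
        System.out.println(provider.getProxy());
        provider.performFailover();
        System.out.println(provider.getProxy());
    }
}
```

Under these assumptions, which NameNode gets tried first depends only on the order in which the addresses were handed to the constructor, which is why the next step is to look at where that collection comes from.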
Next, the code in DFSUtil:
private static Map<String, Map<String, InetSocketAddress>>
  getAddresses(Configuration conf,
    String defaultAddress, String... keys) {

  Collection<String> nameserviceIds = getNameServiceIds(conf);

  // The outer map (per nameservice) is a LinkedHashMap, so it keeps insertion order
  Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
  for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
    Map<String, InetSocketAddress> isas =
      getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
    if (!isas.isEmpty()) {
      ret.put(nsId, isas);
    }
  }
  return ret;
}

private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
    Configuration conf, String nsId, String defaultValue,
    String[] keys) {

  Collection<String> nnIds = getNameNodeIds(conf, nsId);

  // The inner map (per NameNode ID) is a plain HashMap: no ordering guarantee
  Map<String, InetSocketAddress> ret = Maps.newHashMap();

  for (String nnId : emptyAsSingletonNull(nnIds)) {
    String suffix = concatSuffixes(nsId, nnId);
    String address = getConfValue(defaultValue, suffix, conf, keys);
    if (address != null) {
      InetSocketAddress isa = NetUtils.createSocketAddr(address);

      ret.put(nnId, isa);
    }
  }
  return ret;
}
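So the question degenerates into why the collection returned by HashMap.values() appears to be ordered...
This can be checked with the following snippet: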
HashMap<String, String> map = new HashMap<String, String>();
map.put("nn4", "value4");
map.put("nn3", "value3");
map.put("nn1", "value1");
map.put("nn2", "value2");
map.put("nn5", "value5");
for (String x : map.values()) {
    System.out.println(x);
}
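It is in fact a coincidence. HashMap really does not guarantee any order.
Internally, HashMap stores its elements in an Entry[] whose default capacity is 16, that is, 16 buckets, and it decides which bucket an object goes into based on the object's hashCode method. The hashCodes of the five strings nn1 through nn5 happen to be consecutive integers, and after HashMap's transformation these five keys end up in buckets 9, 10, 11, 12 and 13 respectively (I won't list the exact calculation). The iterator of the collection returned by values() walks the Entry[] in index order, starting from Entry[0], so the result looks as if the elements were sorted.
For the details, read the HashMap source.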
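The bucket arithmetic can be reproduced in a few lines. The sketch below assumes the supplemental hash function as I recall it from the JDK 6/7 era HashMap source, and for comparison the spreading function used since JDK 8; the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

class BucketIndexDemo {
    // Assumption: supplemental hash as found in the JDK 6/7 era HashMap source.
    static int legacySpread(int h) {
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

    // Spreading function used by HashMap since JDK 8.
    static int jdk8Spread(int h) {
        return h ^ (h >>> 16);
    }

    // Bucket index of each key in a table whose capacity is a power of two.
    static List<Integer> buckets(boolean legacy, int capacity, String... keys) {
        List<Integer> out = new ArrayList<>();
        for (String key : keys) {
            int h = key.hashCode();
            int spread = legacy ? legacySpread(h) : jdk8Spread(h);
            out.add(spread & (capacity - 1));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"nn1", "nn2", "nn3", "nn4", "nn5"};
        for (String key : keys) {
            System.out.println(key + " hashCode=" + key.hashCode());
        }
        System.out.println("legacy buckets: " + buckets(true, 16, keys));
        System.out.println("JDK 8+ buckets: " + buckets(false, 16, keys));
    }
}
```

With the legacy function the five keys map to buckets 9 through 13, matching the description above. With the JDK 8 function they map to buckets 0, 3, 2, 5 and 4, so on a newer JVM the five-key snippet would no longer print in sorted order, although nn1 would still come before nn2.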
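After all that digging, it turned out to be nothing more than a coincidence, though I did learn a few things along the way. If our cluster were configured with strings other than nn1,nn2, the result would probably be different.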
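For contrast, a map that does keep the configured order looks like this. It is only an illustration of LinkedHashMap's insertion-order guarantee, not something the Hadoop code shown above does for the per-NameNode map; the class name and the "addr-of-" values are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InsertionOrderDemo {
    // Returns the values in the iteration order of an insertion-ordered map.
    static List<String> orderedValues(String... ids) {
        Map<String, String> map = new LinkedHashMap<>();
        for (String id : ids) {
            map.put(id, "addr-of-" + id);
        }
        return new ArrayList<>(map.values());
    }

    public static void main(String[] args) {
        // With LinkedHashMap, writing nn2 before nn1 is reflected in values().
        System.out.println(orderedValues("nn2", "nn1"));
        System.out.println(orderedValues("nn1", "nn2"));
    }
}
```

With a plain HashMap the same two calls would iterate in the same bucket-determined order regardless of how the IDs were listed, which is the behaviour observed in the test.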
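P.S.1
On DFSClient's retry mechanism
DFSClient retries when a connection fails; for example, when connecting to a DN fails, it adds that node to deadNodes and switches to another node. For RPC calls to the NameNode, the retry mechanism is implemented through the RetryProxy.create method described above.
P.S.2
On the HA-related parameters
Names such as hp1/nn1/nn2 in the configuration above exist purely on the client side. They can be arbitrary strings as long as they match each other consistently, and they may differ from the names used on the server side.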
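The reason the names only have to be self-consistent is that they are just suffixes used to build configuration keys. The sketch below mimics that lookup with java.util.Properties instead of Hadoop's Configuration; the class and method names are hypothetical, and only the two key patterns that appear in the configuration above are used.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class HaKeyLookupDemo {
    // Resolves NameNode ID -> RPC address for one nameservice by composing keys.
    static Map<String, String> rpcAddresses(Properties conf, String nsId) {
        Map<String, String> result = new LinkedHashMap<>();
        String ids = conf.getProperty("dfs.ha.namenodes." + nsId, "");
        for (String rawId : ids.split(",")) {
            String nnId = rawId.trim();
            if (nnId.isEmpty()) {
                continue;
            }
            // The key is the base name plus ".<nameservice>.<namenode id>".
            String key = "dfs.namenode.rpc-address." + nsId + "." + nnId;
            String address = conf.getProperty(key);
            if (address != null) {
                result.put(nnId, address);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        // Arbitrary client-side names: "mycluster", "left", "right".
        conf.setProperty("dfs.ha.namenodes.mycluster", "left,right");
        conf.setProperty("dfs.namenode.rpc-address.mycluster.left", "inspur129.photo.163.org:8020");
        conf.setProperty("dfs.namenode.rpc-address.mycluster.right", "inspur120.photo.163.org:8020");
        System.out.println(rpcAddresses(conf, "mycluster"));
    }
}
```

Renaming hp1 to mycluster and nn1/nn2 to left/right changes nothing as long as every key uses the same names; only the host:port values have to point at the real NameNodes.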