Notes on the RM HA Mechanism

I've been digging into Hadoop 2.5.2 for a while; here are my notes on the ResourceManager (RM) HA mechanism, tidied up a bit.

Differences from NN HA

In NN HA, the DataNodes (DNs) send heartbeats to both the active NN and the standby NN.
In RM HA, the NodeManagers (NMs) only send heartbeats to the active RM. Many services on the standby RM are not even started; see the code (in the ResourceManager class):

synchronized void transitionToActive() throws Exception {
  if (rmContext.getHAServiceState() ==
      HAServiceProtocol.HAServiceState.ACTIVE) {
    LOG.info("Already in active state");
    return;
  }
  LOG.info("Transitioning to active state");
  // use rmLoginUGI to startActiveServices.
  // in non-secure model, rmLoginUGI will be current UGI
  // in secure model, rmLoginUGI will be LoginUser UGI
  this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      // Starts the services that only run in the active state:
      // the scheduler, the web UI, NM liveliness monitoring, etc.
      startActiveServices();
      return null;
    }
  });
  rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
  LOG.info("Transitioned to active state");
}

synchronized void transitionToStandby(boolean initialize)
    throws Exception {
  if (rmContext.getHAServiceState() ==
      HAServiceProtocol.HAServiceState.STANDBY) {
    LOG.info("Already in standby state");
    return;
  }
  LOG.info("Transitioning to standby state");
  if (rmContext.getHAServiceState() ==
      HAServiceProtocol.HAServiceState.ACTIVE) {
    // On entering standby, the active-only services are stopped
    stopActiveServices();
    if (initialize) {
      resetDispatcher();
      createAndInitActiveServices();
    }
  }
  rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
  LOG.info("Transitioned to standby state");
}

This design makes an RM failover relatively slow: the newly active RM has to start all of the active-only services first, so the switch is nowhere near as fast as an NN failover.

Another difference in RM HA is that leader election is built into the RM itself (the EmbeddedElectorService class), whereas NN HA runs the election in a separate zkfc process; the zkfc and NN processes talk over an RPC protocol (ZKFCProtocol).
The zkfc approach isolates failures better: if the NN process dies unexpectedly, the zkfc notices quickly and triggers a new election.
With an embedded elector there can be gaps: if the RM process dies unexpectedly (say, a plain kill -9), no new election can happen until the ZooKeeper session times out, so failover takes noticeably longer.
See my other article for details on this timeout mechanism.
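
That session timeout is configurable; tightening it shortens the failover window at the cost of more spurious elections. A minimal sketch, assuming the property constant in Hadoop 2.5's YarnConfiguration:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: the ZooKeeper session timeout bounds how long a crashed
// active RM keeps leadership before the standby can win an election.
YarnConfiguration conf = new YarnConfiguration();
// yarn.resourcemanager.zk-timeout-ms (default 10000 ms)
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 10000);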

Clients

A client (an NM can be viewed as a client too) determines which RM is active in much the same way as in NN HA (see my other article): it is a purely client-side operation. See the RMProxy class:

protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  YarnConfiguration conf = (configuration instanceof YarnConfiguration)
      ? (YarnConfiguration) configuration
      : new YarnConfiguration(configuration);
  RetryPolicy retryPolicy = createRetryPolicy(conf);
  // The client decides which RM is the active one.
  // Checks whether yarn.resourcemanager.ha.enabled is true
  if (HAUtil.isHAEnabled(conf)) {
    RMFailoverProxyProvider<T> provider =
        instance.createRMFailoverProxyProvider(conf, protocol);
    return (T) RetryProxy.create(protocol, provider, retryPolicy);
  } else {
    // Reads the yarn.resourcemanager.resource-tracker.address property;
    // the default is ${yarn.resourcemanager.hostname}:8031
    InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
  }
}
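
For completeness, here is a minimal sketch of what the client side looks like with HA enabled (the rm-ids and hostnames below are placeholders, not values from any real cluster); the failover proxy provider returned by createRMFailoverProxyProvider() simply tries each configured RM in turn until one answers as active:

import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: an HA-aware YARN client. Hostnames are placeholders.
public class RmHaClientSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
    conf.set("yarn.resourcemanager.hostname.rm1", "rm1-host");
    conf.set("yarn.resourcemanager.hostname.rm2", "rm2-host");

    // This goes through createRMProxy() above: the retry proxy keeps
    // cycling through rm1 and rm2 until it reaches the active RM.
    YarnClient client = YarnClient.createYarnClient();
    client.init(conf);
    client.start();
  }
}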

A recovery failure can leave the RM unable to start

This is a very serious bug, worth covering on its own; YARN HA still has plenty of pitfalls.
The symptom is that the RM fails to start and keeps throwing this exception:

org.apache.hadoop.ha.ServiceFailedException: RM could not transition to Active
at org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService.becomeActive(EmbeddedElectorService.java:122)
at org.apache.hadoop.ha.ActiveStandbyElector.becomeActive(ActiveStandbyElector.java:805)
at org.apache.hadoop.ha.ActiveStandbyElector.processResult(ActiveStandbyElector.java:416)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:599)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
Caused by: org.apache.hadoop.ha.ServiceFailedException: Error when transitioning to Active mode
at org.apache.hadoop.yarn.server.resourcemanager.AdminService.transitionToActive(AdminService.java:284)
at org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService.becomeActive(EmbeddedElectorService.java:120)
... 4 more
Caused by: org.apache.hadoop.service.ServiceStateException: RMActiveServices cannot enter state STARTED from state STOPPED
at org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
at org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)

Related JIRAs:
https://issues.apache.org/jira/browse/YARN-2010
https://issues.apache.org/jira/browse/YARN-2019
https://issues.apache.org/jira/browse/YARN-2588

The root cause: when an HA RM starts up, it reads its previous state back from zk (the recovery process). If the data in zk is bad, recovery throws an exception; that exception is not handled properly and propagates straight to the top, putting the RM into the STOPPED state.

Once a service is STOPPED it can never move to any other state (see the ServiceStateModel class for the transition rules). The relevant code:

RMActiveServices.java
@Override
protected void serviceStart() throws Exception {
  RMStateStore rmStore = rmContext.getStateStore();
  // The state store needs to start irrespective of recoveryEnabled as apps
  // need events to move to further states.
  rmStore.start();
  if (recoveryEnabled) {
    try {
      rmStore.checkVersion();
      if (rmContext.isWorkPreservingRecoveryEnabled()) {
        rmContext.setEpoch(rmStore.getAndIncrementEpoch());
      }
      RMState state = rmStore.loadState();
      recover(state);
    } catch (Exception e) {
      // the Exception from loadState() needs to be handled for
      // HA and we need to give up master status if we got fenced
      LOG.error("Failed to load/recover state", e);
      throw e;
    }
  }
  super.serviceStart();
}
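
For reference, the rule that makes STOPPED a dead end is just a transition matrix in ServiceStateModel. Reproduced from memory (treat it as approximate and check the 2.5 source for the exact form), it looks like this:

// Approximate transition matrix from ServiceStateModel:
// statemap[currentState][proposedState] == true means the transition
// is allowed. Note the last row: once a service is STOPPED, the only
// permitted "transition" is to STOPPED itself.
private static final boolean[][] statemap = {
    //               uninited inited started stopped
    /* uninited  */ {false,   true,  false,  true},
    /* inited    */ {false,   true,  true,   true},
    /* started   */ {false,   false, true,   true},
    /* stopped   */ {false,   false, false,  true},
};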

The exception propagates along this chain: RMActiveServices.serviceStart() -> RMActiveServices.start() -> ResourceManager.startActiveServices() -> ResourceManager.transitionToActive() -> ResourceManager.serviceStart() -> AbstractService.start()

The relevant code in AbstractService:

@Override
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        // This call moves the service into the STOPPED state
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

The patches on those JIRAs work by adding another layer of try/catch: exceptions from the recovery process are handled on the spot instead of being thrown upward.
A recovery failure should not break the RM's normal functioning.
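
A minimal sketch of the idea, not the actual upstream patch (see the JIRAs for the real fixes): catch the recovery failure inside serviceStart() so it never reaches AbstractService.start():

// Sketch only: keep a recovery failure from propagating into
// AbstractService.start(), which would call stopQuietly() and
// strand the RM in STOPPED.
if (recoveryEnabled) {
  try {
    RMState state = rmStore.loadState();
    recover(state);
  } catch (Exception e) {
    LOG.error("Failed to load/recover state", e);
    // In HA mode a saner reaction is to give up active status
    // (fall back to standby) rather than rethrow and die here.
  }
}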

Recovery can fail for many reasons; the case I hit was an expired token.
On startup the RM tries to read the app information stored in zk:

2015-03-27 15:02:44,897 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: Recovering app: application_1425461709120_0005 with 1 attempts and final state = FINISHED
2015-03-27 15:02:44,897 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Recovering attempt: appattempt_1425461709120_0005_000001 with final state: FINISHED
2015-03-27 15:02:44,897 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1425461709120_0005_000001 State change from NEW to FINISHED
2015-03-27 15:02:44,897 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: application_1425461709120_0005 State change from NEW to FINISHED
2015-03-27 15:02:44,897 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=censor OPERATION=Application Finished - Succeeded TARGET=RMAppManager RESULT=SUCCESS APPID=application_1425461709120_0005
2015-03-27 15:02:44,898 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary: appId=application_1425461709120_0005,name=Iteration 1 of 50\, input path: /user/censor/yixin/modeltmp/model-0,user=censor,queue=censor\,default,state=FINISHED,trackingUrl=http://inspur116.photo.163.org:8088/proxy/application_1425461709120_0005/jobhistory/job/job_1425461709120_0005,appMasterHost=N/A,startTime=1425463503225,finishTime=1425463538077,finalStatus=SUCCEEDED

Then the exception shows up (partially elided):

org.apache.hadoop.security.token.SecretManager$InvalidToken: yarn tried to renew an expired token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:366)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6726)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:504)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:939)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): yarn tried to renew an expired token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:366)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6726)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:504)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:939)

If an app's previous state was FINISHED (whether SUCCESS or FAILED), there is no problem. But if its previous state was UNDEFINED, the RM tries to rerun the program: the token the app used is still stored in zk, and the RM attempts to renew that token before rerunning it.

In my case I had upgraded from 2.2.0 to 2.5.2, so the old tokens had long since expired (the max lifetime is 7 days), hence the exception.
Moreover, the NNs had been restarted as well, so I suspect that even an unexpired token would have triggered some other exception. The relevant code:

public synchronized long renewToken(Token<TokenIdent> token,
    String renewer) throws InvalidToken, IOException {
  ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
  DataInputStream in = new DataInputStream(buf);
  TokenIdent id = createIdentifier();
  id.readFields(in);
  LOG.info("Token renewal for identifier: " + id + "; total currentTokens "
      + currentTokens.size());
  // The token has expired
  long now = Time.now();
  if (id.getMaxDate() < now) {
    throw new InvalidToken(renewer + " tried to renew an expired token");
  }
  if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
    throw new AccessControlException(renewer +
        " tried to renew a token without a renewer");
  }
  if (!id.getRenewer().toString().equals(renewer)) {
    throw new AccessControlException(renewer +
        " tries to renew a token with renewer " + id.getRenewer());
  }
  // If the token's master key is missing (e.g., after an NN restart),
  // this presumably throws as well
  DelegationKey key = allKeys.get(id.getMasterKeyId());
  if (key == null) {
    throw new InvalidToken("Unable to find master key for keyId="
        + id.getMasterKeyId()
        + " from cache. Failed to renew an unexpired token"
        + " with sequenceNumber=" + id.getSequenceNumber());
  }
  // ... (rest of the method elided)
}

Workaround: short of patching the code, the only option is to manually wipe the data in zk. You can delete the /rmstore path (from the zkCli.sh shell, for example):

setAcl /rmstore/ZKRMStateRoot world:anyone:cdrwa
rmr /rmstore

Alternatively, set the yarn.resourcemanager.zk-state-store.parent-path property in yarn-site.xml to something like /rmstore2, so the state is kept under a different path and the old data is never read.
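
For example, the yarn-site.xml entry would look roughly like this (/rmstore2 is an arbitrary fresh path):

<!-- Point the RM state store at a fresh zk path so the old,
     corrupted state under /rmstore is simply never read. -->
<property>
  <name>yarn.resourcemanager.zk-state-store.parent-path</name>
  <value>/rmstore2</value>
</property>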

A few other bugs

An RM failover can cause job failures

It depends on when the failover happens: MAPREDUCE-5718.
Once all maps and reduces have finished, the AM enters the commit phase (which really just writes a file in the staging dir). If the failover lands exactly in that window, the job is resubmitted but is guaranteed to fail.
The odds of hitting this should be low; HA is a fallback mechanism in the first place and shouldn't be leaned on too hard.

Manual failover is not supported

With RM HA enabled, you cannot trigger a failover manually; see the mailing list. It throws:

hadoop@inspur116:~/hadoop-current/bin$ ./yarn rmadmin -failover rm1 rm2
Exception in thread "main" java.lang.UnsupportedOperationException: RMHAServiceTarget doesn't have a corresponding ZKFC address
at org.apache.hadoop.yarn.client.RMHAServiceTarget.getZKFCAddress(RMHAServiceTarget.java:51)
at org.apache.hadoop.ha.HAServiceTarget.getZKFCProxy(HAServiceTarget.java:94)
at org.apache.hadoop.ha.HAAdmin.gracefulFailoverThroughZKFCs(HAAdmin.java:315)
at org.apache.hadoop.ha.HAAdmin.failover(HAAdmin.java:286)
at org.apache.hadoop.ha.HAAdmin.runCmd(HAAdmin.java:453)
at org.apache.hadoop.ha.HAAdmin.run(HAAdmin.java:382)
at org.apache.hadoop.yarn.client.cli.RMAdminCLI.run(RMAdminCLI.java:318)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.apache.hadoop.yarn.client.cli.RMAdminCLI.main(RMAdminCLI.java:434)

Related JIRA: https://issues.apache.org/jira/browse/YARN-1177

Network problems can make an RM failover fail

https://issues.apache.org/jira/browse/YARN-2578
See my other article for the detailed analysis.