2.5.2中queueMaxAppsDefault的一个bug

升级hadoop 2.5.2过程中碰到的一个问题。
很难说是个bug,更像是功能上的一些变化,社区也没有相关jira。
看看代码研究下。
调度器的基本概念见我以前的文章

症状

在我们的fair-scheduler.xml中一直有这样一个属性:

1
<queueMaxAppsDefault>2</queueMaxAppsDefault>

本意是将默认值设小一点,防止一些意外情况。在2.2.0中时,一直正常使用。
但升级到2.5.2后,发现这个设置变成了全局的,变成了整个集群只能并发执行2个任务。
删掉这个属性后就正常了。

我们的队列配置类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<allocations>
<queue name="a">
<minResources>10240 mb, 10 vcores</minResources>
<maxResources>409600 mb, 200 vcores</maxResources>
<maxRunningApps>20</maxRunningApps>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>a</aclSubmitApps>
<aclAdministerApps>hadoop,yarn,a</aclAdministerApps>
</queue>
<queue name="b">
<minResources>10240 mb, 10 vcores</minResources>
<maxResources>409600 mb, 200 vcores</maxResources>
<maxRunningApps>20</maxRunningApps>
<weight>1.5</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>b</aclSubmitApps>
<aclAdministerApps>b,hadoop,yarn</aclAdministerApps>
</queue>
</allocations>

队列结构只有2层。一个root队列,其他都是LeafQueue。并且为每个LeafQueue单独配置了maxRunningApps属性。

2.2.0的代码

2.2.0中,队列相关配置由QueueManager类管理。包括加载fair-scheduler.xml、更新配置等等。

FairScheduler中有一个线程UpdateThread,默认每0.5秒调用一次update方法:

FairScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected synchronized void update() {
// 检查是否要重新加载fair-scheduler.xml
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
// 关键的一个方法,计算每个队列是否超出了资源限制、app数量限制
// 如果已经超出限制,所有的app的runnable属性会被设为false,不会再分配container
updateRunnability(); // Set job runnability based on user/queue limits
// 接下来的是抢占式调度的一些逻辑,不是本文重点
updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
// Recursively update demands for all queues
rootQueue.updateDemand();
rootQueue.setFairShare(clusterCapacity);
// Recursively compute fair shares for all queues
// and update metrics
rootQueue.recomputeShares();
}

继续看updateRunnability方法:

FairScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
// 注意这里,这里只处理了所有leafQueue,对于parentQueue,没有考虑maxApp限制
for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
}
// Create a list of sorted jobs in order of start time and priority
Collections.sort(apps, new FifoAppComparator());
// Mark jobs as runnable in order of start time and priority, until
// user or queue limits have been reached.
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
int queueCount = queueApps.containsKey(queue) ? queueApps
.get(queue) : 0;
// 这里获得的queueMaxApps就是我们在xml文件里为每个队列配置的maxRunningApps属性
// 如果某个队列没有配置,就返回queueMaxAppsDefault
if (userCount < queueMgr.getUserMaxApps(user)
&& queueCount < queueMgr.getQueueMaxApps(queue)) {
userApps.put(user, userCount + 1);
queueApps.put(queue, queueCount + 1);
app.setRunnable(true);
}
}
}

只有当NM发来心跳时才能分配container,分配过程是一次DFS。
当一个leafQueue下所有app的runnable都是false时,不会分配任何container。
具体代码不列了。见FairScheduler.nodeUpdate和FSParentQueue.assignContainer。

可见,对于root队列,FairScheduler没有检查任何限制条件。

2.5.2的代码

2.5.2的FairScheduler代码变化非常大。我目前看到的几个:

  1. 处理fair-scheduler.xml的逻辑从QueueMananger类剥离,独立作为一个service:AllocationFileLoaderService。
  2. FairScheduler handle的事件增加了很多APP_ATTEMPT相关的,见handle方法。相应的app提交过程也变化了。
  3. maxApp相关限制的判断独立为一个类:MaxRunningAppsEnforcer。

还有很多变化是为了配合RM HA的。

提交app后会触发addApplicationAttempt方法(相关的事件链不列了。这个方法也是2.5.2新增的)

FairScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean shouldNotifyAttemptAdded) {
// 省略
// 关键在这里,提交app时会判断这个app是否runnable
// runnable=false的app会一直保持在ACCEPTED状态,不会分配任何container(包括AM Container)
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
// 将app加到队列中,会更改app的状态
queue.addApp(attempt, runnable);
// 下面是一些统计信息
if (runnable) {
maxRunningEnforcer.trackRunnableApp(attempt);
} else {
maxRunningEnforcer.trackNonRunnableApp(attempt);
}
// 省略
}

继续看canAppBeRunnable方法:

MaxRunningAppsEnforcer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean canAppBeRunnable(FSQueue queue, String user) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
// 获得当前用户RUNNING的app数量
Integer userNumRunnable = usersNumRunnableApps.get(user);
if (userNumRunnable == null) {
userNumRunnable = 0;
}
if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
return false;
}
// Check queue and all parent queues
// 关键在这里,会一直回溯到root队列,判断appMax
// 而我们对root队列没有单独配置maxRunningApps属性,就会返回queueMaxAppsDefault
while (queue != null) {
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
if (queue.getNumRunnableApps() >= queueMaxApps) {
return false;
}
queue = queue.getParent();
}
return true;
}

可见在2.5.2中,maxApp的限制对所有队列都生效,不区分ParentQueue和LeafQueue。
所以造成了整个集群只能运行2个任务的情况。

总结

很难说那种方式更好。
2.5.2里的FairScheduler,有点像CapacityScheduler了,父队列会限制子队列。
而2.2.0里的FairScheduler,父队列和子队列基本没关系(除了ACL)。
之前测试的时候,没有测过并发执行多个任务,直到线上升级才发现这个问题,也是考虑不周。