skip to content
logo

Search

Monorepo 中的任务调度机制

14 min read

Monorepo 中对每个项目进行的具体操作称为任务 task,Monorepo 管理工具应当有能力调度这些任务。

前言

Monorepo 中的每个项目称为 project,对 project 进行的具体操作称为任务 task,比如 buildtest,(可以狭义地理解为 npm scripts 中注册的操作)Monorepo 管理工具应当有能力调度这些任务。

举个栗子🌰,如图所示,一个依赖关系较为复杂的 Monorepo 工程,此时需要执行某个任务,例如 build,如何同时保证任务执行顺序以及任务执行效率(假设最大任务并行数为 N)?

task

接下来就是枯燥乏味的做题过程,咱们先把这张项目依赖图抽象成代码。

问题

interface Project {
  name: string;
  actions: { name: string; fn: () => Promise<void> }[];
  dependencyProjects: Project[];
}
// 辅助函数,用于在执行构建操作时模拟延迟。它返回一个 Promise,在指定的时间后完成。
const sleep = (s: number): Promise<void> =>
  new Promise((r) => setTimeout(r, s));
 
// Monorepo 中注册的所有项目
const projects: Project[] = [
  "@monorepo/a",
  "@monorepo/b",
  "@monorepo/c",
  "@monorepo/d",
  "@monorepo/x",
  "@monorepo/y",
  "@monorepo/z",
].map((name) => ({
  name,
  actions: [{ name: "build", fn: () => sleep(Math.random() * 1000) }],
  dependencyProjects: [],
}));
 
const [A, B, C, D, X, Y, Z] = projects;
 
A.dependencyProjects = [B];
B.dependencyProjects = [D];
C.dependencyProjects = [D, X, Y];
X.dependencyProjects = [Y, Z];
 
/**
 * 实现本方法,使得 build 行为按照正确的顺序执行,且保证执行效率
 * @param projects 需要执行任务的 project 集合
 * @param actionName 具体操作名称
 * @param limit 任务最大并行数
 */
function run(projects: Project[], actionName: string, limit: number) {
  // todo
}
 
run(projects, "build", 12);
 

解题

很明显,project 之间存在依赖关系,那么任务 task 之间也存在依赖关系,那么可以得到以下结论:

  1. 当前任务作为上游任务时,只有当前任务的下游任务都被完成(清空)时,当前任务才可以开始执行
  2. 当前任务作为下游任务时,当前任务完成后,需要更新其上游任务的依赖任务,从其内移除当前任务

于是 task 定义如下:

interface Task {
  // 任务名 `${projectName}:{actionName}`
  name: string;
  // 当前任务依赖的任务,即当前任务的下游任务,当该 dependenciesSet 被清空,说明当前任务可以被执行
  dependencies: Set<Task>;
  // 依赖当前任务的任务,即当前任务的上游任务,当前任务完成后,需要更新其上游任务的 dependenciesSet(从其内移除当前任务)
  dependents: Set<Task>;
  // 具体任务执行函数
  fn: () => Promise<void>;
}

初始化任务

根据 projects 参数,构造出项目对应的任务。

function run(projects: Project[], actionName: string, limit: number) {
  // 任务名与任务的映射
  const tasks = new Map<string, Task>();
  projects.forEach((project) =>
   // 使用 getTaskName 生成任务的唯一标识符
    tasks.set(getTaskName(project, actionName), {
      name: getTaskName(project, actionName),
      dependencies: new Set(),
      dependents: new Set(),
      // 设置任务执行函数
      fn: project.actions.find((a) => a.name === actionName)?.fn ?? noop,
    })
  );
}
 
// 辅助方法:获取任务名
const getTaskName = (project: Project, actionName: string) =>
 `${project.name}:${actionName}`;
// 辅助方法:用于在异步代码中占位,或者在需要传递异步函数但实际上不需要执行异步操作的情况下使用
const noop = ():Promise<void> => new Promise((r) => r());

补充 dependencies 与 dependents

假设存在 project1,对其进行以下操作:

  1. 取到当前项目对应的任务 task1
  2. 获取当前任务对应的下游任务名 dependencyTaskNames(基于 project1.dependencyProjects)
  3. 遍历下游任务名 dependencyTaskName
  4. 取到下游任务(由任务列表 tasks 初始化而来) dependencyTask
  5. 补充 task1 的 dependencies
  6. 补充 dependencyTask 的 dependents
function run(projects: Project[], actionName: string, limit: number) {
  // ...
  // project 与 project 对应 task 的下游任务名称
  function getDependencyTaskNames(project: Project): Set<string> {
    const dependencyTaskNames: Set<string> = new Set();
    // 遍历下游项目
    for (const dep of project.dependencyProjects) {
      // 搜集下游任务名
      dependencyTaskNames.add(getTaskName(dep, actionName));
    }
 
    return dependencyTaskNames;
  }
 
  projects.forEach((project) => {
    // 1. 获取当前项目对应的任务 (! 是非空断言操作符 告诉ts编译器,当前表达式的值不会为 null/undefined,可以安全访问)
    const task = tasks.get(getTaskName(project, actionName))!;
    // 2. 获取当前任务对应的下游任务名
    const dependencyTaskNames = getDependencyTaskNames(project);
    // 3. 遍历下游任务名
    for (const dependencyName of dependencyTaskNames) {
      // 4. 取到下游任务(由任务列表 tasks 初始化而来)
      const dependency: Task = tasks.get(dependencyName)!;
      // 5. 补充当前任务的 dependencies (下游)
      task.dependencies.add(dependency);
      // 6. 补充当前取到的下游任务的 dependents (上游)
      dependency.dependents.add(task);
    }
  });
}
task-dependencies

以 X 项目的 build 任务为例,此时的数据结构如下:

{
  name: "@monorepo/x:build",
  dependencies: Set([@monorepo/y:build, @monorepo/z:build]),  // 依赖 Y、Z
  dependents: Set([@monorepo/c:build]),  // 被 C 依赖
  fn: () => Promise<void>
}

并行执行任务

并行执行任务的核心逻辑如下:

  1. 任务队列初始化:将所有任务放入队列 taskQueue 中等待执行。

  2. 任务调度机制

    • 使用 currentActiveTasks 追踪当前正在执行的任务数量
    • 通过 limit 参数控制最大并行任务数
    • getNextTask 函数负责获取下一个可执行的任务(依赖为空的任务)
  3. 任务执行流程

    • _run 函数执行具体任务
    • 任务完成后,更新其上游任务的依赖状态(从 dependencies 中移除当前任务)
    • 自动触发下一轮任务调度
  4. 并发控制

    • start 函数实现并发控制逻辑
    • 使用 Promise.all 等待当前批次的任务完成
    • 动态维护任务队列,确保任务按照依赖顺序执行

这种设计既保证了任务的正确执行顺序,又通过并行执行提高了整体执行效率。

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    taskQueue.push(task);
  }
  runTasks(taskQueue, limit);
}
 
async function runTasks(taskQueue: Task[], limit: number) {
	// 跟踪当前激活的任务数
  let currentActiveTasks = 0;
	// 在任务队列中查找下一个准备执行的任务
  function getNextTask() {
    for (let i = 0; i < taskQueue.length; i++) {
      const task: Task = taskQueue[i];
			// 如果任务的依赖为空,表示任务准备好执行
      if (task.dependencies.size === 0) {
				// 从队列中移除并返回准备好执行的任务
        return taskQueue.splice(i, 1)[0];
      }
    }
    return null;
  }
 
	// 执行任务,处理任务完成后的清理工作,并继续调度下一个任务
  function _run(task: Task): Promise<void> {
    return task.fn().then(() => {
      console.log("任务成功执行", task.name);
      currentActiveTasks--;
      // 当前任务执行完成,从其上游任务的 dependencies 中移除当前任务
      task.dependents.forEach((dependent: Task) => {
        dependent.dependencies.delete(task);
      });
      // 自动触发下一轮任务调度
      start();
    });
  }
 
  /**
   * 启动任务调度的核心方法
   * 该方法负责并发执行任务,同时确保不超过最大并行限制
   */
  async function start() {
    // 用于临时存储从队列中获取的下一个待执行任务
    let ctask: Task | null = null;
    // 存储当前批次所有执行中任务的 Promise 数组
    const taskPromises: Promise<void>[] = [];
    // 持续获取和执行任务,直到达到并发限制或没有可执行的任务
    while (
      // 检查是否达到最大并发数
      currentActiveTasks < limit && 
      // 尝试获取下一个可执行的任务(依赖为空的任务)
      // 同时将获取到的任务赋值给 ctask
      (ctask = getNextTask())
    ) {
      // 增加当前活跃任务计数
      currentActiveTasks++;
      // 将临时任务变量赋值给具名变量(提高代码可读性)
      const task: Task = ctask;
      // 将任务执行 Promise 添加到 Promise 数组中
      // _run 会执行任务并在完成后自动触发新一轮调度
      taskPromises.push(_run(task));
    }
    // 等待当前批次所有任务完成
    // 注意:这里的 await 主要用于确保所有任务都被正确启动
    // 实际任务的完成和后续调度是由 _run 中的 then 处理的
    await Promise.all(taskPromises);
  }
	// 启动任务调度
  start();
}
 

执行 run(projects, "build", 12),可以按照正确顺序输出结果。

任务成功执行 @monorepo/z:build
任务成功执行 @monorepo/y:build
任务成功执行 @monorepo/x:build
任务成功执行 @monorepo/d:build
任务成功执行 @monorepo/b:build
任务成功执行 @monorepo/a:build
任务成功执行 @monorepo/c:build
 

关键路径长度

上文中的实现使得任务可以按照正确的顺序执行,但是在实际任务执行过程中,最长的任务链限制了整个任务树的执行速度,效率不能得到保证。(实际业务开发中,一般不需要构建 Monorepo 内全部的项目)

关键路径长度:任务距离最远的根节点的距离。以这里的依赖图为例:

  • C 依赖 D、X、Y
  • X 依赖 Y、Z

并行限制:这形成了一条最长的依赖链:C -> X -> Y/Z。即使我们有足够的并行能力(比如 limit=12),某些任务也必须等待其依赖任务完成才能开始。

CriticalPaths
interface Task {
  name: string;
  dependencies: Set<Task>;
  dependents: Set<Task>;
  // 关联路径长度
  criticalPathLength?: number;
  fn: () => Promise<void>;
}
 
function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    // 计算关键路径长度
    task.criticalPathLength = calculateCriticalPaths(task);
    taskQueue.push(task);
  }
  // 基于关键路径长度对任务进行降序排序
  taskQueue.sort((a, b) => b.criticalPathLength! - a.criticalPathLength!);
  runTasks(taskQueue, limit);
}
 
// 计算关键路径长度
function calculateCriticalPaths(task: Task): number {
  // 重复走到某一个任务了 直接返回值
  if (task.criticalPathLength !== undefined) {
    return task.criticalPathLength;
  }
 
  // 如果没有 dependents, 说明我们是 "root",即 app 此类不被依赖的 project
  if (task.dependents.size === 0) {
    task.criticalPathLength = 0;
    return task.criticalPathLength;
  }
 
  // 递归向上取最大值 每次 +1
  const depsLengths: number[] = [];
  task.dependents.forEach((dep) =>
    depsLengths.push(calculateCriticalPaths(dep))
  );
  task.criticalPathLength = Math.max(...depsLengths) + 1;
  return task.criticalPathLength;
}
 

速度瓶颈:

  • 无论并行度多高,这种串行依赖关系都会导致执行时间至少是关键路径上所有任务执行时间的总和
  • 比如:即使其他任务都可以并行,C 任务也必须等待 X->Y->Z 这条链上的所有任务依次执行完成

实际影响:在大型 Monorepo 项目中,如果依赖链很长,即使底层依赖的任务执行很快,整体构建时间仍会受限于这种串行依赖关系。这就是为什么上面提到”最长的任务链限制了整个任务树的执行速度,效率不能得到保证”的原因。

因此,在实际项目中,我们需要:

  1. 尽量避免过长的依赖链(深度)
  2. 合理设计项目结构,减少不必要的依赖关系 (组织方式:结构)
  3. 考虑使用增量构建、缓存等优化手段

思考

  1. 循环依赖的场景如何解决?在实际应用中,随着项目复杂度增加,难免会出现循环依赖的问题,这个时候,依赖关系图,很难找出从谁开始。(强制要求开发者解决)
  2. 实际应用中,依赖层级比较深的情况很常见,如果是最底层的依赖产生了变更,那么必须要将所有的依赖链都构建一遍,如果这个时候使用顺序执行,效率依然很低怎么办?而且,此时依赖链中的每个环节的缓存是不是应该都失效才对?(远程缓存、更精确的缓存失效、优化并行执行、增量构建…)