From 9ff340fd702f026956cce9e2c045e09c294531f2 Mon Sep 17 00:00:00 2001 From: Math Paquette Date: Fri, 5 May 2023 07:24:05 -0400 Subject: [PATCH] feat(api): automatically abort running tasks when failFast enabled closes #76 --- apps/api/src/app.e2e.spec.ts | 51 ++++++++++--- apps/api/src/runs/run.entity.ts | 7 +- apps/api/src/runs/runs.service.ts | 5 +- apps/api/src/tasks/task.entity.ts | 11 +++ apps/api/src/tasks/tasks.service.ts | 71 +++++++++++++------ libs/common/src/lib/tasks/task-status.ts | 1 + .../db/src/migrations/1683512593831-tskmgr.js | 23 ++++++ 7 files changed, 136 insertions(+), 33 deletions(-) create mode 100644 libs/db/src/migrations/1683512593831-tskmgr.js diff --git a/apps/api/src/app.e2e.spec.ts b/apps/api/src/app.e2e.spec.ts index b53d23a..d783ade 100644 --- a/apps/api/src/app.e2e.spec.ts +++ b/apps/api/src/app.e2e.spec.ts @@ -153,30 +153,61 @@ describe('Runs', () => { }); }); - describe('abort run', () => { - let run: Run; - - beforeAll(async () => { + describe('fail task', () => { + it('should abort running tasks when failFast enabled', async () => { // arrange const createRunDto = TestDtoUtils.createRunDto(); - run = (await createRun(app, createRunDto)).body; + const run: Run = (await createRun(app, createRunDto)).body; + await createTasks(app, run.id, TestDtoUtils.createTasksDto(5)); + const startedTask1: StartTaskResponseDto = (await startTask(app, run.id, startTaskDto)).body; + const startedTask2: StartTaskResponseDto = (await startTask(app, run.id, startTaskDto)).body; + // act + const failedTask: Task = (await failTask(app, startedTask1.task.id).expect(200)).body; + const tasks: Task[] = (await getTasks(app, run.id).expect(200)).body; + const abortedTask: Task = tasks.find((x) => x.id === startedTask2.task.id); + // assert + expect(failedTask.status).toBe(TaskStatus.Failed); + expect(abortedTask.status).toBe(TaskStatus.Aborted); + expect(failedTask.run.status).toBe(RunStatus.Failed); }); + }); + describe('abort run', () => { it('should abort run when not ended', async () => { + // arrange + const createRunDto = TestDtoUtils.createRunDto(); + const run: Run = (await createRun(app, createRunDto)).body; // act - run = (await abortRun(app, run.id).expect(200)).body; + const abortedRun: Run = (await abortRun(app, run.id).expect(200)).body; // assert - expect(run.endedAt).toBeTruthy(); - expect(run.status).toBe(RunStatus.Aborted); + expect(abortedRun.endedAt).toBeTruthy(); + expect(abortedRun.status).toBe(RunStatus.Aborted); }); it('should not abort run when already ended', async () => { + // arrange + const createRunDto = TestDtoUtils.createRunDto(); + const run: Run = (await createRun(app, createRunDto)).body; + const abortedRun: Run = (await abortRun(app, run.id).expect(200)).body; // act const exception = (await abortRun(app, run.id)).body; // assert expect(exception.statusCode).toBe(500); expect(exception.message).toBe("Can't abort already ended run."); }); + + it('should abort running tasks', async () => { + // arrange + const createRunDto = TestDtoUtils.createRunDto(); + const run: Run = (await createRun(app, createRunDto)).body; + await createTasks(app, run.id, createTasksDto); + const startTaskResponseDto: StartTaskResponseDto = (await startTask(app, run.id, startTaskDto)).body; + // act + const abortedRun: Run = (await abortRun(app, run.id).expect(200)).body; + // assert + const abortedTask = abortedRun.tasks.find((x) => x.id === startTaskResponseDto.task.id); + expect(abortedTask.status).toBe(TaskStatus.Aborted); + }); }); describe('fail run', () => { @@ -319,6 +350,10 @@ function getRun(app: INestApplication, runId: number): request.Test { return request(app.getHttpServer()).get(ApiUrl.createNoPrefix().getRunUrl(runId)); } +function getTasks(app: INestApplication, runId: number): request.Test { + return request(app.getHttpServer()).get(ApiUrl.createNoPrefix().getTasksUrl(runId)); +} + function abortRun(app: INestApplication, runId: number): request.Test { return request(app.getHttpServer()).put(ApiUrl.createNoPrefix().abortRunUrl(runId)); } diff --git a/apps/api/src/runs/run.entity.ts b/apps/api/src/runs/run.entity.ts index 8904060..6e0d4b7 100644 --- a/apps/api/src/runs/run.entity.ts +++ b/apps/api/src/runs/run.entity.ts @@ -1,5 +1,5 @@ import { Column, CreateDateColumn, Entity, OneToMany, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'; -import { RunStatus, TaskPriority, Run, DateUtil, RunParameters, RunInfo } from '@tskmgr/common'; +import { DateUtil, Run, RunInfo, RunParameters, RunStatus, TaskPriority, TaskStatus } from '@tskmgr/common'; import { FileEntity } from '../files/file.entity'; import { TaskEntity } from '../tasks/task.entity'; @@ -90,6 +90,11 @@ export class RunEntity implements Run { throw new Error(`Can't abort already ended run.`); } + const runningTasks = this.tasks.filter((x) => x.status === TaskStatus.Running); + for (const runningTask of runningTasks) { + runningTask.abort(); + } + const endedAt = new Date(); this.status = RunStatus.Aborted; this.duration = DateUtil.getDurationInSeconds(this.createdAt, endedAt); diff --git a/apps/api/src/runs/runs.service.ts b/apps/api/src/runs/runs.service.ts index 847945e..b25e428 100644 --- a/apps/api/src/runs/runs.service.ts +++ b/apps/api/src/runs/runs.service.ts @@ -63,7 +63,10 @@ export class RunsService { } async abort(id: number): Promise { - const run = await this.runsRepository.findOneBy({ id: id }); + const run = await this.runsRepository.findOne({ + where: { id: id }, + relations: { tasks: true }, + }); run.abort(); return this.runsRepository.save(run); } diff --git a/apps/api/src/tasks/task.entity.ts b/apps/api/src/tasks/task.entity.ts index 0d34771..c489279 100644 --- a/apps/api/src/tasks/task.entity.ts +++ b/apps/api/src/tasks/task.entity.ts @@ -111,4 +111,15 @@ export class TaskEntity implements Task { this.status = TaskStatus.Failed; this.duration = DateUtil.getDurationInSeconds(this.startedAt, endedAt); } + + public abort(): void { + if (this.status !== TaskStatus.Running) { + throw new Error(`Task with ${this.status} status can't change to ${TaskStatus.Aborted}`); + } + + const endedAt = new Date(); + this.endedAt = endedAt; + this.status = TaskStatus.Aborted; + this.duration = DateUtil.getDurationInSeconds(this.startedAt, endedAt); + } } diff --git a/apps/api/src/tasks/tasks.service.ts b/apps/api/src/tasks/tasks.service.ts index 2b9df3d..9ecf952 100644 --- a/apps/api/src/tasks/tasks.service.ts +++ b/apps/api/src/tasks/tasks.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Repository } from 'typeorm'; +import { DataSource, Repository } from 'typeorm'; import { TaskEntity } from './task.entity'; import { CompleteTaskDto, @@ -19,7 +19,8 @@ export class TasksService { public constructor( @InjectRepository(TaskEntity) private readonly tasksRepository: Repository, @InjectRepository(RunEntity) private readonly runsRepository: Repository, - @InjectRepository(FileEntity) private readonly filesRepository: Repository + @InjectRepository(FileEntity) private readonly filesRepository: Repository, + private dataSource: DataSource ) {} /** * Create new tasks in bulk @@ -89,19 +90,44 @@ export class TasksService { return task; } + /** + * If failFast enabled, set all running tasks to abort status. + * @param taskId + */ async failTask(taskId: number): Promise { - const task = await this.tasksRepository.findOne({ - where: { id: taskId }, - relations: { run: true }, - }); + const task = await this.dataSource.transaction(async (manager) => { + const task = await manager.findOne(TaskEntity, { + where: { id: taskId }, + relations: { run: true }, + lock: { + mode: 'pessimistic_write', + tables: ['task'], + }, + }); - if (!task) { - throw new Error(`Task id: ${taskId} can't be found.`); - } + if (!task) { + throw new Error(`Task id: ${taskId} can't be found.`); + } - task.fail(); - await this.tasksRepository.save(task); + // if failFast enabled, abort all tasks from the run + if (task.run.failFast) { + const runningTasks = await manager.findBy(TaskEntity, { + run: { id: task.run.id }, + status: TaskStatus.Running, + }); + + for (const runningTask of runningTasks) { + runningTask.abort(); + } + + await manager.save(runningTasks); + } + + task.fail(); + await manager.save(task); + return task; + }); await this.updateRunStatus(task.run); return task; } @@ -129,27 +155,26 @@ export class TasksService { return this.filesRepository.save(fileEntity); } - private async updateRunStatus(run: RunEntity): Promise { - const allTasks = await this.tasksRepository.find({ - where: { run: { id: run.id } }, - }); - + private async updateRunStatus(run: RunEntity): Promise { + const allTasks = await this.tasksRepository.find({ where: { run: { id: run.id } } }); + const completedTasks = allTasks.filter((x) => x.status === TaskStatus.Completed); const endedTasks = allTasks.filter((x) => x.endedAt); - const failedTasks = allTasks.filter((x) => x.status === TaskStatus.Failed); + const uncompletedTasks = endedTasks.filter((x) => x.status !== TaskStatus.Completed); - // when fail fast enabled, we fail the run as soon we get one failed task - if (run.failFast && failedTasks.length > 0) { + // when fail fast enabled, we fail run as soon as we get one uncompleted task + if (run.failFast && uncompletedTasks.length > 0) { run.fail(); + return this.runsRepository.save(run); } if (run.closed && allTasks.length === endedTasks.length) { - if (failedTasks.length > 0) { - run.fail(); - } else { + if (completedTasks.length === allTasks.length) { run.complete(); + } else { + run.fail(); } + return this.runsRepository.save(run); } - await this.runsRepository.save(run); } private async getAverageTaskDuration(createTaskDto: CreateTaskDto): Promise { diff --git a/libs/common/src/lib/tasks/task-status.ts b/libs/common/src/lib/tasks/task-status.ts index ef9a0d2..83c0d70 100644 --- a/libs/common/src/lib/tasks/task-status.ts +++ b/libs/common/src/lib/tasks/task-status.ts @@ -2,5 +2,6 @@ export enum TaskStatus { Pending = 'PENDING', Running = 'RUNNING', Completed = 'COMPLETED', + Aborted = 'ABORTED', Failed = 'FAILED', } diff --git a/libs/db/src/migrations/1683512593831-tskmgr.js b/libs/db/src/migrations/1683512593831-tskmgr.js new file mode 100644 index 0000000..ea8e697 --- /dev/null +++ b/libs/db/src/migrations/1683512593831-tskmgr.js @@ -0,0 +1,23 @@ +const { MigrationInterface, QueryRunner } = require("typeorm"); + +module.exports = class tskmgr1683512593831 { + name = 'tskmgr1683512593831' + + async up(queryRunner) { + await queryRunner.query(`ALTER TYPE "public"."task_status_enum" RENAME TO "task_status_enum_old"`); + await queryRunner.query(`CREATE TYPE "public"."task_status_enum" AS ENUM('PENDING', 'RUNNING', 'COMPLETED', 'ABORTED', 'FAILED')`); + await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" TYPE "public"."task_status_enum" USING "status"::"text"::"public"."task_status_enum"`); + await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" SET DEFAULT 'PENDING'`); + await queryRunner.query(`DROP TYPE "public"."task_status_enum_old"`); + } + + async down(queryRunner) { + await queryRunner.query(`CREATE TYPE "public"."task_status_enum_old" AS ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED')`); + await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" TYPE "public"."task_status_enum_old" USING "status"::"text"::"public"."task_status_enum_old"`); + await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" SET DEFAULT 'PENDING'`); + await queryRunner.query(`DROP TYPE "public"."task_status_enum"`); + await queryRunner.query(`ALTER TYPE "public"."task_status_enum_old" RENAME TO "task_status_enum"`); + } +}