Skip to content

Commit

Permalink
feat(api): automatically abort running tasks when failFast enabled
Browse files Browse the repository at this point in the history
closes #76
  • Loading branch information
mathpaquette committed May 8, 2023
1 parent ff3a7c5 commit 9ff340f
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 33 deletions.
51 changes: 43 additions & 8 deletions apps/api/src/app.e2e.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,30 +153,61 @@ describe('Runs', () => {
});
});

describe('abort run', () => {
let run: Run;

beforeAll(async () => {
describe('fail task', () => {
it('should abort running tasks when failFast enabled', async () => {
// arrange
const createRunDto = TestDtoUtils.createRunDto();
run = (await createRun(app, createRunDto)).body;
const run: Run = (await createRun(app, createRunDto)).body;
await createTasks(app, run.id, TestDtoUtils.createTasksDto(5));
const startedTask1: StartTaskResponseDto = (await startTask(app, run.id, startTaskDto)).body;
const startedTask2: StartTaskResponseDto = (await startTask(app, run.id, startTaskDto)).body;
// act
const failedTask: Task = (await failTask(app, startedTask1.task.id).expect(200)).body;
const tasks: Task[] = (await getTasks(app, run.id).expect(200)).body;
const abortedTask: Task = tasks.find((x) => x.id === startedTask2.task.id);
// assert
expect(failedTask.status).toBe(TaskStatus.Failed);
expect(abortedTask.status).toBe(TaskStatus.Aborted);
expect(failedTask.run.status).toBe(RunStatus.Failed);
});
});

describe('abort run', () => {
it('should abort run when not ended', async () => {
// arrange
const createRunDto = TestDtoUtils.createRunDto();
const run: Run = (await createRun(app, createRunDto)).body;
// act
run = (await abortRun(app, run.id).expect(200)).body;
const abortedRun: Run = (await abortRun(app, run.id).expect(200)).body;
// assert
expect(run.endedAt).toBeTruthy();
expect(run.status).toBe(RunStatus.Aborted);
expect(abortedRun.endedAt).toBeTruthy();
expect(abortedRun.status).toBe(RunStatus.Aborted);
});

it('should not abort run when already ended', async () => {
// arrange
const createRunDto = TestDtoUtils.createRunDto();
const run: Run = (await createRun(app, createRunDto)).body;
const abortedRun: Run = (await abortRun(app, run.id).expect(200)).body;
// act
const exception = (await abortRun(app, run.id)).body;
// assert
expect(exception.statusCode).toBe(500);
expect(exception.message).toBe("Can't abort already ended run.");
});

it('should abort running tasks', async () => {
// arrange
const createRunDto = TestDtoUtils.createRunDto();
const run: Run = (await createRun(app, createRunDto)).body;
await createTasks(app, run.id, createTasksDto);
const startTaskResponseDto: StartTaskResponseDto = (await startTask(app, run.id, startTaskDto)).body;
// act
const abortedRun: Run = (await abortRun(app, run.id).expect(200)).body;
// assert
const abortedTask = abortedRun.tasks.find((x) => x.id === startTaskResponseDto.task.id);
expect(abortedTask.status).toBe(TaskStatus.Aborted);
});
});

describe('fail run', () => {
Expand Down Expand Up @@ -319,6 +350,10 @@ function getRun(app: INestApplication, runId: number): request.Test {
return request(app.getHttpServer()).get(ApiUrl.createNoPrefix().getRunUrl(runId));
}

function getTasks(app: INestApplication, runId: number): request.Test {
return request(app.getHttpServer()).get(ApiUrl.createNoPrefix().getTasksUrl(runId));
}

function abortRun(app: INestApplication, runId: number): request.Test {
return request(app.getHttpServer()).put(ApiUrl.createNoPrefix().abortRunUrl(runId));
}
Expand Down
7 changes: 6 additions & 1 deletion apps/api/src/runs/run.entity.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Column, CreateDateColumn, Entity, OneToMany, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
import { RunStatus, TaskPriority, Run, DateUtil, RunParameters, RunInfo } from '@tskmgr/common';
import { DateUtil, Run, RunInfo, RunParameters, RunStatus, TaskPriority, TaskStatus } from '@tskmgr/common';
import { FileEntity } from '../files/file.entity';
import { TaskEntity } from '../tasks/task.entity';

Expand Down Expand Up @@ -90,6 +90,11 @@ export class RunEntity implements Run {
throw new Error(`Can't abort already ended run.`);
}

const runningTasks = this.tasks.filter((x) => x.status === TaskStatus.Running);
for (const runningTask of runningTasks) {
runningTask.abort();
}

const endedAt = new Date();
this.status = RunStatus.Aborted;
this.duration = DateUtil.getDurationInSeconds(this.createdAt, endedAt);
Expand Down
5 changes: 4 additions & 1 deletion apps/api/src/runs/runs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ export class RunsService {
}

async abort(id: number): Promise<RunEntity> {
const run = await this.runsRepository.findOneBy({ id: id });
const run = await this.runsRepository.findOne({
where: { id: id },
relations: { tasks: true },
});
run.abort();
return this.runsRepository.save(run);
}
Expand Down
11 changes: 11 additions & 0 deletions apps/api/src/tasks/task.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,15 @@ export class TaskEntity implements Task {
this.status = TaskStatus.Failed;
this.duration = DateUtil.getDurationInSeconds(this.startedAt, endedAt);
}

public abort(): void {
if (this.status !== TaskStatus.Running) {
throw new Error(`Task with ${this.status} status can't change to ${TaskStatus.Aborted}`);
}

const endedAt = new Date();
this.endedAt = endedAt;
this.status = TaskStatus.Aborted;
this.duration = DateUtil.getDurationInSeconds(this.startedAt, endedAt);
}
}
71 changes: 48 additions & 23 deletions apps/api/src/tasks/tasks.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { DataSource, Repository } from 'typeorm';
import { TaskEntity } from './task.entity';
import {
CompleteTaskDto,
Expand All @@ -19,7 +19,8 @@ export class TasksService {
public constructor(
@InjectRepository(TaskEntity) private readonly tasksRepository: Repository<TaskEntity>,
@InjectRepository(RunEntity) private readonly runsRepository: Repository<RunEntity>,
@InjectRepository(FileEntity) private readonly filesRepository: Repository<FileEntity>
@InjectRepository(FileEntity) private readonly filesRepository: Repository<FileEntity>,
private dataSource: DataSource
) {}
/**
* Create new tasks in bulk
Expand Down Expand Up @@ -89,19 +90,44 @@ export class TasksService {
return task;
}

/**
* If failFast enabled, set all running tasks to abort status.
* @param taskId
*/
async failTask(taskId: number): Promise<TaskEntity> {
const task = await this.tasksRepository.findOne({
where: { id: taskId },
relations: { run: true },
});
const task = await this.dataSource.transaction(async (manager) => {
const task = await manager.findOne(TaskEntity, {
where: { id: taskId },
relations: { run: true },
lock: {
mode: 'pessimistic_write',
tables: ['task'],
},
});

if (!task) {
throw new Error(`Task id: ${taskId} can't be found.`);
}
if (!task) {
throw new Error(`Task id: ${taskId} can't be found.`);
}

task.fail();
await this.tasksRepository.save(task);
// if failFast enabled, abort all tasks from the run
if (task.run.failFast) {
const runningTasks = await manager.findBy(TaskEntity, {
run: { id: task.run.id },
status: TaskStatus.Running,
});

for (const runningTask of runningTasks) {
runningTask.abort();
}

await manager.save(runningTasks);
}

task.fail();
await manager.save(task);

return task;
});
await this.updateRunStatus(task.run);
return task;
}
Expand Down Expand Up @@ -129,27 +155,26 @@ export class TasksService {
return this.filesRepository.save(fileEntity);
}

private async updateRunStatus(run: RunEntity): Promise<void> {
const allTasks = await this.tasksRepository.find({
where: { run: { id: run.id } },
});

private async updateRunStatus(run: RunEntity): Promise<RunEntity> {
const allTasks = await this.tasksRepository.find({ where: { run: { id: run.id } } });
const completedTasks = allTasks.filter((x) => x.status === TaskStatus.Completed);
const endedTasks = allTasks.filter((x) => x.endedAt);
const failedTasks = allTasks.filter((x) => x.status === TaskStatus.Failed);
const uncompletedTasks = endedTasks.filter((x) => x.status !== TaskStatus.Completed);

// when fail fast enabled, we fail the run as soon we get one failed task
if (run.failFast && failedTasks.length > 0) {
// when fail fast enabled, we fail run as soon as we get one uncompleted task
if (run.failFast && uncompletedTasks.length > 0) {
run.fail();
return this.runsRepository.save(run);
}

if (run.closed && allTasks.length === endedTasks.length) {
if (failedTasks.length > 0) {
run.fail();
} else {
if (completedTasks.length === allTasks.length) {
run.complete();
} else {
run.fail();
}
return this.runsRepository.save(run);
}
await this.runsRepository.save(run);
}

private async getAverageTaskDuration(createTaskDto: CreateTaskDto): Promise<number> {
Expand Down
1 change: 1 addition & 0 deletions libs/common/src/lib/tasks/task-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export enum TaskStatus {
Pending = 'PENDING',
Running = 'RUNNING',
Completed = 'COMPLETED',
Aborted = 'ABORTED',
Failed = 'FAILED',
}
23 changes: 23 additions & 0 deletions libs/db/src/migrations/1683512593831-tskmgr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const { MigrationInterface, QueryRunner } = require("typeorm");

module.exports = class tskmgr1683512593831 {
name = 'tskmgr1683512593831'

async up(queryRunner) {
await queryRunner.query(`ALTER TYPE "public"."task_status_enum" RENAME TO "task_status_enum_old"`);
await queryRunner.query(`CREATE TYPE "public"."task_status_enum" AS ENUM('PENDING', 'RUNNING', 'COMPLETED', 'ABORTED', 'FAILED')`);
await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" DROP DEFAULT`);
await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" TYPE "public"."task_status_enum" USING "status"::"text"::"public"."task_status_enum"`);
await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" SET DEFAULT 'PENDING'`);
await queryRunner.query(`DROP TYPE "public"."task_status_enum_old"`);
}

async down(queryRunner) {
await queryRunner.query(`CREATE TYPE "public"."task_status_enum_old" AS ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED')`);
await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" DROP DEFAULT`);
await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" TYPE "public"."task_status_enum_old" USING "status"::"text"::"public"."task_status_enum_old"`);
await queryRunner.query(`ALTER TABLE "task" ALTER COLUMN "status" SET DEFAULT 'PENDING'`);
await queryRunner.query(`DROP TYPE "public"."task_status_enum"`);
await queryRunner.query(`ALTER TYPE "public"."task_status_enum_old" RENAME TO "task_status_enum"`);
}
}

0 comments on commit 9ff340f

Please sign in to comment.