Files
cnpmcore/app/repository/TaskRepository.ts
MK (fengmk2) 89f1250927 refactor: use all in one egg v4 (#855)
required https://github.com/eggjs/egg/pull/5654

---------

Signed-off-by: MK (fengmk2) <fengmk2@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-28 22:17:01 +08:00

183 lines
5.3 KiB
TypeScript

import assert from 'node:assert/strict';
import { uniq } from 'lodash-es';
import { AccessLevel, Inject, SingletonProto } from 'egg';
import { ModelConvertor } from './util/ModelConvertor.ts';
import { isDuplicateKeyError } from './util/ErrorUtil.ts';
import type { Task as TaskModel } from './model/Task.ts';
import type { HistoryTask as HistoryTaskModel } from './model/HistoryTask.ts';
import { AbstractRepository } from './AbstractRepository.ts';
import type { TaskState, TaskType } from '../../app/common/enum/Task.ts';
import {
Task as TaskEntity,
type TaskUpdateCondition,
} from '../core/entity/Task.ts';
@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
})
export class TaskRepository extends AbstractRepository {
@Inject()
private readonly Task: typeof TaskModel;
@Inject()
private readonly HistoryTask: typeof HistoryTaskModel;
async saveTask(task: TaskEntity): Promise<void> {
if (task.id) {
const model = await this.Task.findOne({ id: task.id });
if (!model) return;
await ModelConvertor.saveEntityToModel(task, model);
} else {
try {
await ModelConvertor.convertEntityToModel(task, this.Task);
} catch (e) {
e.message = `[TaskRepository] insert Task failed: ${e.message}`;
if (isDuplicateKeyError(e)) {
this.logger.warn(e);
const taskModel = await this.Task.findOne({ bizId: task.bizId });
// 覆盖 bizId 相同的 id 和 taskId
if (taskModel) {
task.id = taskModel.id;
task.taskId = taskModel.taskId;
return;
}
// taskModel 可能不存在,遇到数据错误
// 重新将错误抛出。
throw e;
}
throw e;
}
}
}
async idempotentSaveTask(
task: TaskEntity,
condition: TaskUpdateCondition
): Promise<boolean> {
assert.ok(task.id, 'task have no save');
const changes = ModelConvertor.convertEntityToChanges(task, this.Task);
const updateRows = await this.Task.update(
{
taskId: condition.taskId,
attempts: condition.attempts,
},
changes
);
return updateRows === 1;
}
async saveTaskToHistory(task: TaskEntity): Promise<void> {
const model = await this.Task.findOne({ id: task.id });
if (!model) return;
const history = await this.HistoryTask.findOne({ taskId: task.taskId });
if (history) {
await ModelConvertor.saveEntityToModel(task, history);
} else {
await ModelConvertor.convertEntityToModel(task, this.HistoryTask);
}
await model.remove();
}
async updateSpecificVersionsOfWaitingTask(
task: TaskEntity,
specificVersions?: string[]
): Promise<void> {
const model = await this.Task.findOne({ id: task.id });
if (!model || !model.data.specificVersions) return;
const data = model.data;
if (specificVersions) {
const combinedVersions = uniq<string>(
data.specificVersions.concat(specificVersions)
);
data.specificVersions = combinedVersions;
} else {
Reflect.deleteProperty(data, 'specificVersions');
}
await model.update({ data });
}
async findTask(taskId: string) {
const task = await this.Task.findOne({ taskId });
if (task) {
return ModelConvertor.convertModelToEntity(task, TaskEntity);
}
// try to read from history
const history = await this.HistoryTask.findOne({ taskId });
if (history) {
return ModelConvertor.convertModelToEntity(history, TaskEntity);
}
return null;
}
async findTaskByBizId(bizId: string) {
const task = await this.Task.findOne({ bizId });
if (task) {
return ModelConvertor.convertModelToEntity(task, TaskEntity);
}
return null;
}
async findTasks(taskIds: string[]): Promise<TaskEntity[]> {
const tasks = await this.HistoryTask.find({ taskId: { $in: taskIds } });
return tasks.map(task =>
ModelConvertor.convertModelToEntity(task, TaskEntity)
);
}
async findTasksByCondition(where: {
targetName?: string;
state?: TaskState;
type: TaskType;
}): Promise<TaskEntity[]> {
const tasks = await this.Task.find(where);
return tasks.map(task =>
ModelConvertor.convertModelToEntity(task, TaskEntity)
);
}
async findTaskByTargetName(
targetName: string,
type: TaskType,
state?: TaskState
) {
const where: { targetName: string; type: TaskType; state?: TaskState } = {
targetName,
type,
};
if (state) {
where.state = state;
}
const task = await this.Task.findOne(where);
if (task) {
return ModelConvertor.convertModelToEntity(task, TaskEntity);
}
return null;
}
async findTimeoutTasks(taskState: TaskState, timeout: number) {
const timeoutDate = new Date();
timeoutDate.setTime(timeoutDate.getTime() - timeout);
const models = await this.Task.find({
state: taskState,
updatedAt: {
$lt: timeoutDate,
},
}).limit(1000);
return models.map(model =>
ModelConvertor.convertModelToEntity(model, TaskEntity)
);
}
async findTaskByAuthorIpAndType(authorIp: string, type: TaskType) {
const models = await this.Task.find({
type,
authorIp,
}).limit(1000);
return models.map(model =>
ModelConvertor.convertModelToEntity(model, TaskEntity)
);
}
}