添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
悲伤的拐杖  ·  重設 OpenWrt ...·  7 月前    · 
贪玩的西瓜  ·  岚图汽车的梦想与迷雾·  2 年前    · 
知识渊博的帽子  ·  Project Converter ...·  2 年前    · 
跳到主要内容

任务调度(Task)

@midwayjs/task 是为了解决任务系列的模块,例如分布式定时任务、延迟任务调度。例如每日定时报表邮件发送、订单2小时后失效等工作。

分布式定时任务依赖 bull,其通过 redis 进行实现,所以配置中,需要配置额外的 Redis,本地定时任务基于 Cron 模块,不需要额外配置。

安装组件

首先安装 Midway 提供的任务组件:

$ npm install @midwayjs/task@2 -S

configuration.ts 中,引入这个组件:

// src/configuration.ts
import { Configuration } from '@midwayjs/decorator';
import * as task from '@midwayjs/task'; // 导入模块
import { join } from 'path';

@Configuration({
imports: [task],
importConfigs: [join(__dirname, 'config')],
})
export class AutoConfiguration {}

配置

config.default.ts 文件中配置对应的模块信息:

export const taskConfig = {
redis: `redis://127.0.0.1:32768`, //任务依赖redis,所以此处需要加一个redis
prefix: 'midway-task', // 这些任务存储的key,都是midway-task开头,以便区分用户原有redis里面的配置。
defaultJobOptions: {
repeat: {
tz: 'Asia/Shanghai', // Task等参数里面设置的比如(0 0 0 * * *)本来是为了0点执行,但是由于时区不对,所以国内用户时区设置一下。
},
},
};

有账号密码情况:

export const taskConfig = {
redis: {
port: 6379,
host: '127.0.0.1',
password: 'foobared',
}, //此处相当于是ioredis的配置 https://www.npmjs.com/package/ioredis
prefix: 'midway-task', // 这些任务存储的key,都是midway-task开头,以便区分用户原有redis里面的配置。
defaultJobOptions: {
repeat: {
tz: 'Asia/Shanghai', // Task等参数里面设置的比如(0 0 0 * * *)本来是为了0点执行,但是由于时区不对,所以国内用户时区设置一下。
},
},
};

业务代码编写方式

分布式定时任务

import { Provide, Inject, Task } from '@midwayjs/decorator';

@Provide()
export class UserService {
@Inject()
helloService: HelloService;

// 例如下面是每分钟执行一次,并且是分布式任务
@Task({
repeat: { cron: '* * * * *' },
})
async test() {
console.log(this.helloService.getName());
}
}

本地定时任务

import { Provide, Inject, TaskLocal } from '@midwayjs/decorator';

@Provide()
export class UserService {
@Inject()
helloService: HelloService;

// 例如下面是每秒钟执行一次
@TaskLocal('* * * * * *')
async test() {
console.log(this.helloService.getName());
}
}

手动触发任务

任务的定义,通过 @Queue 装饰器,定义一个任务类,内必须含有 execute 方法,并且是 async 的。为什么需要是 async 的因为,这个代码,是为了分布式,相当于有个内部的任务调度过程。

import { Provide, Inject, Queue } from '@midwayjs/decorator';

@Queue()
@Provide()
export class HelloTask {
@Inject()
service;

async execute(params) {
console.log(params);
}
}

触发:

import { QueueService } from '@midwayjs/task';
import { Provide, Inject } from '@midwayjs/decorator';

@Provide()
export class UserTask {
@Inject()
service;

@Inject()
queueService: QueueService;

async execute(params) {
// 3秒后触发分布式任务调度。
const xxx = await this.queueService.execute(HelloTask, params, { delay: 3000 });
}
}

这样,就相当于是 3 秒后,触发 HelloTask 这个任务。

设置进度

例如我们在做音视频或者发布这种比较耗时的任务的时候,我们希望能设置进度。

相当于第二个参数,将 bull 的 job 传递给了用户。用户可以通过 job.progress 来设置进度。

然后查询进度:

import { QueueService } from '@midwayjs/task';
import { Provide, Controller, Get } from '@midwayjs/decorator';

@Provide()
@Controller()
export class HelloController {
@Inject()
queueService: QueueService;

@Get('/get-queue')
async getQueue(@Query() id: string) {
return await this.queueService.getClassQueue(TestJob).getJob(id);
}
}

任务的相关内容

let job = await this.queueService.getClassQueue(TestJob).getJob(id);

然后 job 上面有类似停止的方法,或者查看进度的方法。

启动就触发

有朋友由于只有一台机器,希望重启后立马能执行一下对应的定时任务。

import { Context, ILifeCycle, IMidwayBaseApplication, IMidwayContainer } from '@midwayjs/core';
import { Configuration } from '@midwayjs/decorator';
import { Queue } from 'bull';
import { join } from 'path';
import * as task from '@midwayjs/task';
import { QueueService } from '@midwayjs/task';

@Configuration({
imports: [task],
importConfigs: [join(__dirname, './config')],
})
export class ContainerConfiguration implements ILifeCycle {
async onReady(container: IMidwayContainer, app?: IMidwayBaseApplication<Context>): Promise<void> {
// Task这块的启动后立马执行
let result: any = await container.getAsync(QueueService);
let job: Queue = result.getQueueTask(`HelloTask`, 'task'); // 此处第一个是你任务的类名,第二个任务的名字也就是装饰器Task的函数名
job.add({}, { delay: 0 }); // 表示立即执行。
}
}

运维

日志

在 Midway Task Component 上面,增加了两个日志:

  • midway-task.log
  • midway-task-error.log

分别在 task、localTask、queue 触发开始和结束的时候会打印对应的日志。

分布式的 Task 触发日志:

logger.info(`task start.`);

// 异常情况:
logger.error(`${e.stack}`);

logger.info(`task end.`);

非分布式的 LocalTask 触发日志:

logger.info(`local task start.`);

// 异常情况:
// logger.error(`${e.stack}`)

logger.info(`local task end.`);

任务队列的触发日志:

logger.info(`queue process start.`);

// 异常情况:
// logger.error(`${e.stack}`)

logger.info(`queue process end.`);

排查问题链路:

用户可以搜索这个相同的 id,找到同一次请求的日志。 为了方便用户在自己的业务代码中串联对应的日志,我在 ctx 上面挂了 traceId 变量。

例如异常情况: