OPEN_MR 任务用于在 MaxCompute 的 MapReduce 编程接口(Java API)基础上实现的数据处理程序的周期运行,使用示例请参见
创建 OPEN_MR 任务
。
MaxCompute 提供了 MapReduce 编程接口,您可以使用 MapReduce 提供的接口(Java API)编写 MapReduce 程序处理 MaxCompute 中的数据,并打包成为 JAR 等类型的资源文件上传到 DataWorks 中,然后配置 OPEN_MR 节点任务。
MaxCompute 提供 MapReduce 编程接口,您可以使用 MapReduce 提供的接口(Java API)编写 MapReduce 程序处理 MaxCompute 中的数据,您可以通过创建 ODPS_MR 类型节点的方式在任务调度中使用,使用示例请参见
ODPS_MR 任务
。
ODPS_SQL 任务支持您直接在 Web 端编辑和维护 SQL 代码,并可方便地调试运行和协作开发。DataWorks 还支持代码内容的版本管理和上下游依赖自动解析等功能,使用示例请参见
新建任务
。
DataWorks 默认使用 MaxCompute 的 project 作为开发生产空间,因此 ODPS_SQL 节点的代码内容遵循 MaxCompute SQL 的语法。MaxCompute SQL 采用的是类似于 Hive 的语法,可以看作是标准 SQL 的子集,但不能因此简单地把 MaxCompute SQL 等价成一个数据库,它在很多方面并不具备数据库的特征,如事务、主键约束、索引等。
具体的 MaxCompute SQL 语法请参见
SQL 概要
。
数据同步节点任务是阿里云数加平台对外提供的稳定高效、弹性伸缩的数据同步云服务。您通过数据同步节点可以轻松地将业务系统数据同步到 MaxCompute 上来。详情请参见
创建同步任务
。
机器学习节点用来调用机器学习平台中构建的任务,并按照节点配置进行调度生产。详情请参见
机器学习任务
。
注意:
只有在机器学习平台创建并保存的实验,在 DataWorks 中的机器学习节点中才能选择该实验。
Shell 节点支持标准的 Shell 语法,不支持交互式语法,详情请参见
Shell 任务
。
虚拟节点属于控制类型节点,它不产生任何数据的空跑节点,常用于工作流统筹节点的根节点,虚节点任务详情请参见
虚节点任务
。
注意:
工作流里最终输出表有多个分支输入表,且这些输入表没有依赖关系时便经常用到虚拟节点。
示例如下:
输出表由 3 个数据同步任务导入的源表经过 ODPS_SQL 任务加工产出,这 3 个数据同步任务没有依赖关系,ODPS_SQL 任务需要依赖 3 个同步任务,则工作流如下图所示:
用一个虚拟节点作为工作流起始根节点,3 个数据同步任务依赖虚拟节点,ODPS_SQL 加工任务依赖 3 个同步任务 。
二、工作流任务
一个节点任务,可以完成一件事;一个工作流任务,可以完成一个流程。工作流任务是节点任务的集合,一个工作流任务中,最多可以创建
30个
节点任务。请根据您的业务需求,合理选择节点类型,组合完成一个工作流任务。
1)进入
数据开发
页面,单击
新建
,选择
新建任务
。如下图所示:
2)填写新建任务弹出框中的信息。如下图所示:
选择任务类型为
工作流任务
,调度类型为
周期调度
。
3)单击
创建
,即跳转到工作流设计器页面。
您可在工作流设计器中根据自身的需求,创建对应的任务,任务类型请参见
任务类型
。
工作流中的节点任务是会依次运行的。
如上:test_01运行完以后才会运行test_02,如果test_01失败了以后,test_02就不会运行了。
三、虚节点任务
虚拟节点属于控制类型节点,它是不产生任何数据的空跑节点,常用于工作流统筹节点的根节点。
注意:
工作流中最终输出表有多个分支输入表,且这些输入表没有依赖关系时便经常用到虚拟节点。
示例如下:
输出表由 3 个数据同步任务导入的源表经过 ODPS_SQL 任务加工产出,这 3 个数据同步任务没有依赖关系,ODPS_SQL 任务需要依赖 3 个同步任务,则工作流如下图所示:
用一个虚拟节点作为工作流起始根节点,3 个数据同步任务依赖虚拟节点,ODPS_SQL 加工任务依赖 3 个同步任务。
1)进入
数据开发
页面,单击
新建
,选择
新建任务
。
2)填写新建任务弹出框中的信息。如下图所示:
选择任务类型为
工作流任务
,调度类型为
周期调度
。
3)单击
创建
,即可跳转到工作流设计器页面。
4)双击
节点组件
中的
虚节点
。
5)输入节点名后,单击
创建
,得到如下虚节点。
上一节创建了工作流 dataworks1 ,工作流中只有一个虚节点任务。
1)单击
测试运行
。
2)单击
周期任务运行提醒
弹出框中的
确定
。
3)单击
测试运行
弹出框中的
运行
。
1)单击
工作流任务测试运行
弹出框中的
前往运维中心
。
2)双击工作流名称,进入到工作流内。
进入工作流后,可以看到工作流内节点的运行情况。
3)选中 start 任务,右键单击
查看节点运行日志
。
任务日志提示:
当前实例,没有产生日志信息
。
出现此情况的原因:虚节点任务不会真正的执行,等到虚节点运行的时候,便会直接被置为成功,所以虚节点没有日志信息。
四、数据同步任务
目前数据同步任务支持的数据源类型包括:MaxCompute、RDS(MySQL、SQL Server、PostgreSQL)、Oracle、FTP、AnalyticDB、OSS、DRDS,更多支持的数据源请参见
支持数据源类型
。
本文以 RDS 数据同步至 MaxCompute 为例,详细说明如何进行数据同步任务。
创建 MaxCompute 表的详细操作请参见
创建表
。
注意:
新建数据源需项目管理员角色才能够创建。
当 RDS 数据源测试连通性不通时,需要到自己的 RDS 上添加数据同步机器 IP 白名单:
11.192.97.82,11.192.98.76,10.152.69.0/24,10.153.136.0/24,10.143.32.0/24,120.27.160.26,10.46.67.156,120.27.160.81,10.46.64.81,121.43.110.160,10.117.39.238,121.43.112.137,10.117.28.203,118.178.84.74,10.27.63.41,118.178.56.228,10.27.63.60,118.178.59.233,10.27.63.38,118.178.142.154,10.27.63.15,100.64.0.0/8
注意:
若使用自定义资源组调度 RDS 的数据同步任务,必须把自定义资源组的机器 IP 也加到 RDS 的白名单中。
① 以开发者身份进入
阿里云数加平台
>
DataWorks(数据工场)
>
管理控制台
页面,单击项目操作栏中的
进
入工作区
。
② 单击顶部菜单栏中的
数据集成
,导航至
数据源
页面。
③ 单击右上角的
新增数据源
。
④ 在新增数据源弹出框中填写相关配置项。
上图中的配置项具体说明如下:
RDS 实例购买者 ID
该 MySQL 数据源的 RDS 实例购买者 ID。
若选择 JDBC 形式来配置数据源,其 JDBC 连接信息,格式为:jdbc:mysql://IP:Port/database。
该数据源对应的数据库名。
用户名/密码
数据库对应的用户名和密码。
⑤ 单击
测试连通性
。
⑥ 若测试连通性成功,单击
保存
即可。
若测试连通性失败,请根据自身情况参见:
ECS 上自建的数据库测试连通性失败
或
RDS 数据源测试连通性不通
。
关于其他类型(MaxCompute、RDS、Oracle、FTP、AnalyticDB、OSS、DRDS)数据源的配置,详见
数据源配置
。
① 单击
数据开发
页面工具栏中的
新建任务
。
② 填写新建任务弹出框中的各配置项。
此处以节点任务为例,若节点需要每日自动调度运行,调度类型选择
周期调度
,然后在节点属性中配置调度周期。
③ 单击
创建
。
同步任务节点包括
选择来源
、
选择目标
、
字段映射
、
通道控制
四大配置项。
① 选择来源选择
数据源
和
数据表
。
数据过滤:可参考相应的 SQL 语法填写 where 过滤语句(不需要填写 where 关键字),该过滤条件将作为增量同步的条件。
where 条件即针对源头数据筛选条件,根据指定的 column、table、where 条件拼接 SQL 进行数据抽取。利用 where 条件可进行全量同步和增量同步,具体说明如下:
i)全量同步:
第一次做数据导入时通常为全量导入,可不用设置 where 条件。
ii)增量同步:
增量导入在实际业务场景中,往往会选择当天的数据进行同步,通常需要编写 where 条件语句,请先确认表中描述增量字段(时间戳)为哪一个。如 tableA 描述增量的字段为 create_time,那么在 where 条件中编写 create_time>${yesterday},在参数配置中为其参数赋值即可。其中更多内置参数的使用方法,请参见
系统调度参数
。
② 若数据同步任务是 RDS/Oracle/MaxCompute,在该页面中会有切分键配置。
切分键:
只支持类型为整型的字段
。
读取数据时,根据配置的字段进行数据分片,实现并发读取,可提升数据同步效率。只有同步任务是 RDS/Oracle 数据导入至 MaxCompute 时,才显示切分键配置项。
注意:
若源头为 Mysql 数据源,则数据同步任务还支持分库分表模式的数据导入(前提是无论数据存储在同一数据库还是不同数据库,表结构必须是一致的)。
③ 分库分表可支持如下场景:
i)同库多表:单击搜索表,添加需要同步的多张表即可。
ii)分库多表:首先单击添加选择源库,再单击搜索表来添加表。
④ 选择目标
单击
快速建表
可将源头表的建表语句转化为符合 MaxCompute SQL 语法规范的 DDL 语句新建目标表。选择后单击
下一步
。
i)分区信息:分区是为了便于查询部分数据引入的特殊列,指定分区便于快速定位到需要的数据。此处的自定义参数是将昨天的日期做为这个分区的值,分区值支持常量和变量,更多自定义参数请参见
参数配置
。
ii)清理规则:
a. 写入前清理已有数据:导数据之前,清空表或者分区的所有数据,相当于 insert overwrite。
b. 写入前保留已有数据:导数据之前不清理任何数据,每次运行数据都是追加进去的,相当于 insert into。
⑤ 在参数配置中为参数赋值,如下图所示:
⑥ 映射字段
需对字段映射关系进行配置,左侧
源头表字段
和右侧
目标表字段
为一一对应的关系。
增加/删除:鼠标 Hover 上每一行,单击删除图标可以删除当前字段。单击
添加一行
可单个增加字段,当数据库类型是 MaxCompute 时,可以将分区列的列名,作为添加一行的值,这样可以在同步的时候,将分区列也同步过去。
自定义变量和常量的写入方法:
如果需要把常量或者变量导入 MaxCompute 中表的某个字段,只需要单击插入按钮,然后输入常量或者变量的值,并且用英文单引号包起来即可。如变量 ${yesterday},在参数配置组件配置给变量赋值如 yesterday=$[yyyymmdd]。具体时间参数请参见
系统调度参数
。
⑦ 通道控制
通道控制
用来配置作业速率上限和脏数据检查规则,如下图所示:
a. 作业速率上限:即配置当前数据同步任务速率,支持最大为 20MB/s(通道流量度量值是数据同步任务本身的度量值,不代表实际网卡流量)。
b. 作业并发数:作业并发数必须配置了切分建以后才有效。作业并发数的上限是作业速率的上限,比如说作业速率上限是 10M,作业并发数最大可以选择 10。
c. 当错误纪录数:写入 RDS、Oracle 时可用,即脏数据数量,超过所配置的个数时,该数据同步任务结束。
注意:
我们不建议作业并发数配置过大,作业并发数越大,所消耗的资源也越多,很有可能会导致您别的任务会产生等待资源的情况,影响其他任务运行。
⑧ 预览保存
完成以上配置后,单击
下一步
即可预览,如若无误,单击
保
存
。如下图所示:
① 单击顶部菜单栏中的
提交
。
② 提交成功后单击
测试运行
。
因为本示例中源表里 cratetime 有时间为 2017-01-04 ,而配置中用到调度时间参数 $[yyyy-mm-dd-1] 和 ${bdp.system.bizdate},为了能在测试的时候将 cratetime 赋值为 2017-01-04,目标表的分区值为 20170104,测试的时候业务时间要选择 2017-01-04。如下图所示:
③ 测试任务触发成功后,单击
前往运维中心
即可查看任务进度。
④ 查看同步数据。
五、OPEN MR
注意:
OPEN_MR 不支持引用资源表,不支持多个 Reduce 等。
ODPS MR 比 OPEN MR 的功能更加强大
,ODPS MR 支持添加更多的Jar包资源、表资源等;更好的结合ODPS MR的原生语法,无需在界面上进行过多配置,可直接使用Maxcompute MR的语法调用,只需在配置任务时将Jar包资源先引用即可,详细的ODPS MR节点配置,可参考
ODPS_MR
,
建议优先使用ODPS MR
。
本示例将以经典的 WordCount 示例来介绍如何在阿里云大数据平台使用 MaxCompute MapReduce。 WordCount 示例的详细内容请参见
WordCount 示例
。
本示例涉及的数据表说明如下:
1)输入数据表:wc_in 用于存储 word 列表。
2)输出数据表:wc_out 用于存放通过 MR 程序处理后的结果集。
根据
创建表
中的操作新建表 wc_in、wc_out。
为感知 OPEN MR 程序在大数据平台上运行的结果,需向输入表(wc_in 的分区 pt=20170101)中插入示例数据。
① 进入
数据开发
页面,导航至
新建
>
新建脚本文件
。
② 填写
新建脚本文件
弹出框中的各配置项,单击
提交
。
③ 在 MaxCompute 代码编辑器中编写 MaxCompute SQL 并运行代码。更多 SQL 语法请参见
SQL 概要
。MaxCompute SQL 脚本如下所示:
drop table if exists dual;
create table dual(id bigint);
insert overwrite table dual select count(*)from dual;
insert overwrite table wc_in partition(pt=20170101) select * from (
select 'project','val_pro' from dual
union all
select 'problem','val_pro' from dual
union all
select 'package','val_a' from dual
union all
select 'pad','val_a' from dual
) b;
您在使用 OPEN_MR 节点前,需在本地基于 MaxCompute MapReduce 编程框架的 WordCount 示例代码,根据自身需求进行编写,然后打成 Jar 包,以资源的方式添加到大数据平台。MR 开发的相关内容请参见
大数据计算服务 MaxCompute 帮助文档
。本示例代码详情请参见
WordCount.java
附件。
无论是在 MaxCompute console 还是阿里云大数据平台中运行,都需要执行 Jar 命令运行。因此,先打包生成
WordCount.jar
(可以通过 Eclipse 的 Export 功能打包,也可以通过 ant 或其他工具生成),再上传至 MaxCompute 资源。
① 进入
数据开发
页面的
资源管理
模块,右键单击目录选择
上传资源
。
② 填写
资源上传
弹出框的各配置项,注意勾选
上传为 ODPS 资源
。
③ 单击
提交
。
新建的 MaxCompute MapReduce 程序以资源方式上传至 MaxCompute,现需新建 OPEN_MR 节点来调用执行。
① 进入
数据开发
页面,导航至
新建
>
新建脚本文件
。
② 填写
新建任务
弹出框的各配置项。
配置项说明:
i)任务名称:wordcount 示例。
ii)描述:wordcount 示例。
③ 单击
创建
。
④ 在 OPEN_MR 配置页面进行配置。
配置项说明:
i)MRJar 包:必选项,即本节点需要运行的主 jar 资源包。
ii)资源:必填项,本节点需要运行的主 jar 资源以及调用到的其他资源列表。
iii)输入/输出表:本示例中用到的是本项目的分区表,且分区值为每日自动调度的业务日期,因此分区用变量(系统调度参数)表示 。
⑤ 参数配置,由于本示例分区用系统参数表示,没有用自定义变量,所以此处无需而外配置:
注意:
更多参数变量使用请参见
系统调度参数
。
⑥ 单击
保存
、
提交
,切换到工作流的流程面板中,单击
测试运行
。
注意:
测试运行时由于示例表只有分区 pt=20170101 有数据,所以业务时间选择 2017-01-01,这样系统参数才会把输入/输出表的分区替换成 20170101。
六、SHELL任务
SHELL 任务支持标准 SHELL 语法,不支持交互性语法。SHELL 任务可以在默认资源组上运行,若需要访问 IP/域名,请在
项目管理-项目配置
下将 IP/域名添加到白名单中。
① 进入数据开发页面,单击
新建
,选择
新建任务
。
② 填写
新建任务
弹出框中各配置项。此示例中任务类型选择
节点任务
,类型选择
SHELL
,调度类型选择
周期调度
。如下图所示:
③ 填写完成后,单击
创建
。
若想在 SHELL 中调用
系统调度参数
,如下所示:
SHELL 语句如下:
echo "$1 $2 $3"
注意:
参数1 参数2 …..多个参数之间用空格分隔。更多系统调度参数的使用,请参见
调度属性配置 - 系统参数
。
3. 测试 SHELL 任务
测试 SHELL 任务有两种方式:页面直接
运
行
和
测试运行
。
两种运行方式的区别,如下所示:
i)
运行:
在页面直接单击
运行
时,任务是执行在默认资源组上的;若任务运行时,需要准备运行环境,那么建议使用测试运行。
比如:通过 SHELL 调用 pyodps 时,您就需要将 pyodps 需要的依赖给准备在您的机器上,将任务指定在您的机器上运行。
ii)
测试运行:
测试运行的任务会生成实例,支持在指定的资源组上运行;若任务运行时,需要准备运行环境,那么建议使用测试运行。
注意:
如何将任务运行在指定的资源组上呢?可以进入
运维中心-任务管理
页面,选中任务单击
修改资源组
,这样任务便会运行在指定的机器上。
① 在页面单击
运行
后,会出现下图所示的提示:
② 单击
去设置
,跳转到如下页面,单击
添加
:
注意:
在添加白名单时,可以输入域名/IP 和端口,添加完毕的 IP/域名,默认资源组可直接访问。
③ 切换回
数据开发
页面,单击
运行
,查看结果。
注意:系统调度参数只有在调度系统中才会被替换,因为页面单击
运
行
时,没有提交到调度系统中,所以 $3 被替换成了 0。
2)测试运行
① 在页面单击
测试运行
后,会出现下图所示的提示:
② 单击
确定
后,系统会检验您任务是否保存,且是否提交到调度系统;若未保存/提交,会弹出提示框,引导您保存/提交任务。
③ 单击
确定提交
, 出现测试运行弹出框,选择业务日期:
④ 单击
运行
,会提示您测试运行已经触发成功,可以前往运维中心查看进度。
⑤ 单击
前往运维中心
,进入运维中心页面后,右键单击任务名,选择
查看节点运行日志
。
日志信息如下:
从日志中可以看出,系统调度参数已经被替换。
注意:
为什么选择的业务日期是 20170806 号,替换出的结果是 20170807 呢?其遵循的转换规则为:实际时间=业务日期+1。
1)若数据库是在阿里云上搭建的,且区域是华东2,那么需要将数据库对如下白名单开放,即可连接数据库。
10.152.69.0/24,10.153.136.0/24,10.143.32.0/24,120.27.160.26,10.46.67.156,120.27.160.81,10.46.64.81,121.43.110.160,10.117.39.238,121.43.112.137,10.117.28.203,118.178.84.74,10.27.63.41,118.178.56.228,10.27.63.60,118.178.59.233,10.27.63.38,118.178.142.154,10.27.63.15,100.64.0.0/8
注意:
如果是在阿里云上搭建的数据库,但区域不是华东2,则建议使用外网或购买与数据库同区域的 ECS 做为调度资源,将该 SHELL 任务运行在自定义资源组上。
2)若数据库是自己在本地搭建的,那么建议使用外网连接,且将数据库对上述白名单 IP 开放。
注意:若使用自定义资源组运行 SHELL 任务,必须把自定义资源组的机器 IP 也加到上述白名单中。
关于此文档暂时还没有FAQ