关于 AWS Redshift + dbt 的详情可以参考以下内容:
dbt + Redshift Workshop:
https://catalog.workshops.aws/dbt-cli-and-amazon-Redshift/en-US/introduction
dbt + AWS Redshift 集成:
https://aws.amazon.com/blogs/big-data/manage-data-transformations-with-dbt-in-amazon-Redshift/
dbt + AWS 云原生 CICD 集成:
https://catalog.us-east-1.prod.workshops.aws/workshops/a1f4b750-62c4-4d2b-bdbf-9f7120856d60
使用 dbt 进行数据建模,可以显著提升数据可靠性、可溯源性和可审查性,这些都对构建高质量的现代数据分析系统至关重要,总结下来主要有以下几大优势:
清晰的数据血缘。dbt 通过模型依赖关系,可以非常清晰地表达出数据流转的血缘。数据从原始表到中间层再到应用层的转换链路一目了然。
一致的文档。dbt 可以自动生成数据字典,对所有模型进行统一的文档化。文档中包含字段信息、描述、血缘等 Metadata。
通过测试驱动的数据质量。dbt 内置了测试框架,可以在模型级编写数据测试案例,持续验证数据质量。例如唯一性、完整性、精确性等测试。
问题排查更加高效。若出现数据问题,可以通过血缘链快速定位问题出在哪个转换步骤。文档也有助于理解字段含义。
自动化部署和测试 dbt 支持 CI/CD。可以通过自动化流程来确保任何代码变更都经过验证,避免引入问题。
可 audit 的变更。历史 dbt 项目本身是代码,所有的变更都可存储在 Git 等版本控制工具中,可完整地复核和回溯变更历史。
当使用 Airflow 的 DAG 去调度 dbt Core 项目连接 Redshift 时,引入 Cosmos 作为中间层的效果如下:
从图上可以看出来,差别较为明显,简单总结如下:
利用 Airflow 的数据感知调度功能,在上游数据接入后立即运行模型 ,将每个 dbt 模型转变成带重试、告警等功能的任务或任务组,血缘关系上移到 airflow 可以清晰地排查到具体是模型的哪个部分出现问题并再次重试。
使用 Airflow 连接而不是 dbt profile 来运行 dbt 项目,可以更好地利用 Airflow 的连接管理功能,Airflow 本身支持容错和重运行失败任务,使得 dbt 任务执行更加可靠。
原生支持在虚拟环境中安装和运行 dbt, Cosmos 目前支持本地、Kubernetes 和 Docker 三种运行模式,避免与 Airflow 的依赖冲突。
更多细节见
https://www.astronomer.io/Cosmos/
。
本篇主要探讨使用 Cosmos+dbt+Redshift 以及整个DevOps CICD 的数据管道的具体实现,架构图如下:
Continuous Integration 阶段
:每次触发合并请求时,GitLab Webhook 激活 API Gateway,然后调用 Lambda 函数启动 CodeBuild 作业。CodeBuild 将 DAG 中的代码编译为 Docker 镜像并推送到 Amazon Elastic Container Registry。更多信息见
https://github.com/aws-samples/gitlab-codebuild-quickstart
。通过重写 API Gateway 在 Git 更改时调用的 Lambda 函数,并修改 CodeBuild 的编译过程,我们实现了 dbt 部署的 CI/CD 流水线自动化。这样每次 Git 仓库中的 DAG 定义文件发生变更时,都会触发对 dbt 的重新编译和部署到 Redshift,实现分析工作负载中 dbt 模型的持续交付。
Continuous Deployment 阶段
:CodeBuild 作业会将 DAG 文件上传到 S3 桶。然后,它会在 Amazon Elastic Kubernetes Service 上触发 Kubernetes Pod 来运行 dbt 编译。编译后的 dbt 代码随后会部署到 Redshift 执行。此外,一旦上传到 S3,与 AWS Glue 等其他服务相关的 DAG 也可以用类似的方式触发。监控:每个 DAG 运行完成后,会触发回调,通过 SNS 通知将运行结果发送到 Lambda 函数。然后 Lambda 函数会将运行结果发布到用户常用的企业微信或 Microsoft Teams 等企业协作工具。
接下来,开始进入部署阶段。
1. 准备 GitLab 环境
GitLab 的具体安装方式可以根据需求选择容器化部署或者虚拟机部署。本文档的重点是设计 GitLab 的持续集成和持续交付工作流程。关于安装指南可以参考:
https://docs.gitlab.com/ee/install/aws/
。
2. 在 GitLab 中配置 Webhook
GitLab Webhook 允许外部系统在 GitLab 中的某些事件发生时得到通知。它可以集成 CI/CD 流水线,以便在代码推送到 GitLab 时自动触发构建。
2.1 在 GitLab 的左侧导航栏点击
设置
,然后点击辅助菜单中的 Webhooks。
2.2 在 Webhook
配置
页面填写在 4.1.1 中创建的 API Gateway 的端点,填写 Secret token,选择
Merge request events,
然后点击添加 Webhook。
2.3 记录您的 Secret token,它会在下一部分中使用。可以先任意填一个网页 url,之后用 API Gateway 生成的 url 替换它。
注意:如果选择不同的 GitLab 事件需要相应地修改 Lambda 函数代码以处理替代请求。本篇的 Lambda 主要是针对 merge request events 的处理。
3. 在 GitLab 中配置 API 访问权限
GitLab API 访问令牌用于认证 API 请求并授权访问 GitLab API。在本例中,Lambda 可以使用该令牌与 GitLab 服务进行交互。
– 在令牌名称字段中输入一个可识别的
名称
。
– 在选择角色时,
角色级别不能低于
Reporter
。
– 选择
read_api
和
read_repository
选项。
– 点击创建项目访问令牌。
– 妥善保存好项目访问令牌。编写 CodeBuild 脚本时会使用在这里生成的令牌。
4. 用 ARM 架构在 AWS 上实现无服务器部署 API Gateway、Lambda 和 CodeBuild
由于 ARM 架构 CPU 与传统的 x86 CPU 相比,为云中的虚拟化工作负载提供了明显的性价比优势,我们在本示例中选择了 Graviton 类型实例来构建代码。
4.1 在 ECR 中创建存储库
使用 AWS 控制台创建一个新的私有 ECR 存储库来存储容器镜像。
4.2 在 arm 的机器上创建一个 dockerfile 以及 requirment.txt 文件,参考
https://github.com/ziling777/cicd/tree/main/base
。构建镜像并推送到 ECR 的私有镜像仓库并记录镜像的 URI 到记录该镜像的 URI 供后续使用。
sudo systemctl start docker
aws ecr get-login-password --region us-east-2 |sudo docker login --username AWS --password-stdin AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com
sudo docker build -t cicd .
sudo docker tag cicd:latest AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com/cicd:latest
sudo docker tag cicd:latest AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com/cicd:latest
sudo docker push AWS-ACCOUNT.dkr.ecr.us-east-2.amazonaws.com/cicd:latest
4.3 创建 CodeCommit 仓库来保存 build.yaml、dockerfile 和 kubectl.config 文件。前两个文件的内容,可参考 https://github.com/ziling777/cicd/tree/main/codecommit。
Replace AWS_CCOUNT into your own account id in build.yaml file.
Replace AWS_REGION into your own region to be provisioned.
a)build.yaml:这是CodeBuild构建项目的配置文件,定义了构建环境、构建命令等信息。通过这个文件可以指定构建过程中的各种配置,比如构建的运行环境、需要安装的依赖、构建和测试命令等。
b)dockerfile:这是一个Docker镜像构建文件的配置清单,定义了如何将应用程序打包成Docker镜像。dockerfile中会指定基础镜像、需要安装的依赖、应用程序代码的复制、启动命令等步骤。通过dockerfile可以将应用程序及其运行环境打包为一个镜像。
c)kubectl.config 文件生成请参考 https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html#eksctl-kube-config,该文件将在代码构建期间自动复制到 S3,用于 airflow 和 kubernetes 集群之间的交互。
在首次连接 AWS CodeCommit 之前,您必须完成以下初始配置步骤:https://docs.aws.amazon.com/codecommit/latest/userguide/setting-up-https-unixes.html。
git init
git remote add origin git-codecommit.us-east-2.amazonaws.com/v1/repos/pipeline-code-prod
git branch -M main
git add .
git commit -m 'first cicd'
git push -u origin main
4.4 创建 codebuild 并记下 project 的名字
添加环境变量:
– GIT_BRANCH: The Git branch name to pull the code from, such as main or master
– GIT_PROJECT_NAME: The Git project name, such as airflow or dbt-project
– GIT_ACCESS_TOKEN: The Personal Access Token used to access the Git project
– GIT_PROJECT_ID: The ID of the Git project, used to build CI/CD pipelines
– GIT_PROJECT_IID: The IID of the Git merge request id.
– GIT_PROJECT_URL: The HTTP/SSH URL of the Git project
– REPOSITORY_IMAGE_PREI: The ECR image Project name
– REPOSITORY_URI: The ECR URI
– REPOSITORY_TAG: The ECR repository tag
– MWAA_DAGS_DIR: The directory of DAG files in MWAA.
GitLab 用户名:访问 GitLab 的用户名。
GitLab 密码:访问 GitLab 的密码。将存储在 AWS Secrets Manager 中。
Webhook token:Webhook_token 和 git 访问令牌在第一部分中生成,格式如下:
{“webhook_token”:”xxxxx”,”git_access_token”:”xxxxx”}
Lambda 层 S3 桶:下载 https://github.com/ziling777/cicd/blob/main/cloudformation/boto3.zip 到任意部署区域的 S3 桶然后填入桶的名字。
其余选项默认。
4.6 创建 Amazon MWAA 环境,版本为 2.5.1,步骤可参见 https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html。
创建 requirement.txt 文件如下,让 MWAA 可以自动安装相关插件,然后将其上传到 MWAA 的桶中,并在配置 MWAA 时指定它:
#--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
#kubernetes==23.6.0
#apache-airflow[cncf.kubernetes]
airflow-dbt-python[Redshift,s3]
astronomer-Cosmos[dbt-Redshift,kubernetes]==0.7.3
apache-airflow-providers-cncf-kubernetes==7.2.0
"password":"xxxxx",
"schema":"dbt",
"host":"xxxxxx.us-east-2.Redshift-serverless.amazonaws.com",
"port":"5439",
"dbname":"dev"}
给 Amazon MWAA 的 service role 添加 Amazon Redshift, Amazon secret manager, Amazon S3 的权限。
dbt project workshop 参考 https://catalog.workshops.aws/dbt-cli-and-amazon-Redshift/en-US/introduction。Lab3 完成之后最终生成的 project 文件夹目录参考 https://github.com/ziling777/cicd/tree/main/gitlab_dbt。这里需要确保这个 dbt project 的名字和在 gitlab 提交的 project 名字相同。
接下是最重要的部分,通过 Amazon MWAA 的 DAG 在 Amazon Elastic Kubernetes Service 上创建 pod 部署 dbt/Cosmos 生成 SQL 模型,在 Redshift 执行,原始 DAG 文件可以参考 https://github.com/ziling777/cicd/blob/main/gitlab_dbt/dags/eks.py。以下是关于 Cosmos 的参数的解释说明:
import os
import boto3
import json
from datetime import datetime
from pathlib import Path
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from Cosmos.providers.dbt import DbtTaskGroup
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
#from kubernetes.client import V1EnvVar
#from kubernetes.client import V1ResourceRequirements
# Cosmos 使用 Airflow 驱动调度,你可以利用 Airflow 的调度能力来调度你的 dbt 项目。
@dag(
schedule_interval="@daily",
start_date=datetime(2023, 7, 24),
catchup=False,
def basic_eks_Cosmos_task_group() -> None:
# 从 secret manager 读取 Redshift 的连接信息,关于 Redshift 的连接参考 https://astronomer.github.io/astronomer-Cosmos/profiles/RedshiftUserPassword.html。
def get_prod_Redshift_bi_conn_json():
secrets = boto3.client('secretsmanager')
secretValues = secrets.get_secret_value(SecretId="Redshiftsecretname")['SecretString']
dic_secrets_json = json.loads(secretValues)
# prod_Redshift_bi_str = dic_secrets_json['prod_Redshift_bi']
# prod_Redshift_bi_json = json.loads(dic_secrets_json)
return dic_secrets_json
prod_Redshift_bi_conn=get_prod_Redshift_bi_conn_json()
# 这个 operator 不会执行动作,它可以用来在 DAG 中对任务进行分组。任务会被调度程序评估但从不会被执行程序处理。
pre_dbt = EmptyOperator(task_id="pre_dbt")
# 注意:您的 dbt 项目可以放在 Airflow 可以访问的任何位置。默认情况下,Cosmos 会在 /usr/local/airflow/dags/dbt 目录中查找,但是您可以通过在创建 DAG 实例时设置 dbt_project_dir 参数来更改此位置。
_project_dir= "/usr/local/airflow/dags/dbt_projects_dir/"
# 这个 DAG 使用 Cosmos 包中的 DbtTaskGroup 类来从 dbt 项目中的模型创建一个任务组。dbt 模型之间的依赖关系会自动变成 Airflow 任务之间的依赖关系。请确保为 YOUR_NAME、DB_NAME 和 SCHEMA_NAME 添加自己的值。
# Cosmos 可以使用四种不同的方法(称为执行模式)来运行 dbt 命令,目前我们使用的是 kubernetes 模式。这四种模式的区别和优势可以参考 https://astronomer.github.io/astronomer-Cosmos/getting_started/execution-modes.html。
local: 使用本地 dbt 安装运行 dbt 命令(默认)
virtualenv: 从 Cosmos 管理的 Python 虚拟环境中运行 dbt 命令
docker: 从 Cosmos 管理的Docker容器中运行 dbt 命令
kubernetes: 从 Cosmos 管理的 Kubernetes Pod 中运行 dbt 命令
Redshift_dbt_group = DbtTaskGroup(
dbt_root_path=_project_dir,
dbt_project_name="dbtcicd",
execution_mode="kubernetes",
conn_id="bi-poc-Redshift",
# Cosmos 允许你在每个 DbtDag / DbtTaskGroup 中使用 RenderConfig 类中的 select 和 exclude 参数过滤 dbt 项目的一个子集
select={"configs": ["tags:finance"]},
# Cosmos 的 DBTRunkubernetesOperator 和 DbtTestKubernetesOperator 都继承了 Airflow 的 KubernetesPodOperator(请见https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/pod/index.html)。 Cosmos 将组装参数传递给 KubernatesPodOperator,具体传递了哪些参数请见 Cosmos 源码。
operator_args={
# task 之间是否有消息交互
"do_xcom_push": False,
# 在基础镜像的 dockerfile 中 dbt 项目文件会复制到容器的/app 目录下
"project_dir":"/app",
"namespace":"eks namespace",
"image": "ecr image uri",
"get_logs": True,
"is_delete_operator_pod": True,
# "labels": {"app": "bigdata-dw"},
"name": "mwaa-Cosmos-pod-dbt",
"config_file": "/usr/local/airflow/dags/kubeconfig file name",
"in_cluster": False,
# 通过在提供给 operator_args 参数的字典中使用 vars 关键字,可以向 dbt 项目中注入变量。如果 dbt 项目中包含 dbt 测试,它们将在模型完成后直接运行。请注意,最佳实践是为运行 dbt 模型的所有任务将重试次数设置为至少 2。
"vars": '{"my_car": "val1"}',
"env_vars": {"TARGT": "prod_password",
"DB_HOST": prod_Redshift_bi_conn['host'],
"DB_PORT": str(prod_Redshift_bi_conn['port']),
"DB_USER": prod_Redshift_bi_conn['username'],
"DB_PASSWORD": prod_Redshift_bi_conn['password'],
"DB_NAME": prod_Redshift_bi_conn['dbname'],
"DB_SCHEMA": prod_Redshift_bi_conn['schema']
# service_account_name当 IAM 连接 Redshift 时使用
#"service_account_name": "prod-bigdata-bi-Redshift-iam-sa",
#"cluster_context": "aws",
"image_pull_policy": "Always",
# "tolerations": tolerations,
#"node_selector": nodeSelector
#"resources": resources
post_dbt = EmptyOperator(task_id="post_dbt")
pre_dbt >> Redshift_dbt_group >> post_dbt
basic_eks_Cosmos_task_group()
4.7 在 CodeBuild 项目中配置通知
配置要用于通知的 SNS 主题 注意:SNS 主题的名称必须以”codestar-notifications-“开头,否则它可以关联但 CodeBuild 无法连接到配置的主题。
通过结合 GitLab、Amazon API Gateway、Amazon Lambda 和 Amazon CodeBuild,可以实现一个高度可扩展、自动化程度高、Serverless 架构的 CI/CD 工作流,具有快速迭代、高效协作、安全可控、低成本和高可用等优势,能够大幅提升研发交付效率。
使用 Amazon MWAA,Cosmos 插件,dbt 和 Amazon Redshift 可以轻松设置和管理 ELT 数据转换的流程,大大简化了数据分析的流程,快速构建可扩展的分析管道。