This reference shows how to use Dagster Pipes with other entities in the Dagster system. For a step-by-step walkthrough, refer to the Dagster Pipes tutorial.
When launching the subprocess, you may want to make environment variables or additional parameters available in the external process. Extras are arbitrary, user-defined parameters made available on the context object in the external process.
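Because extras are passed from the orchestration process to the external process, in practice they should be JSON-serializable. A minimal sketch (the `batch_size` key is hypothetical, not part of the examples below):

```python
import json

# hypothetical extras payload; values should survive a JSON round trip
extras = {"foo": "bar", "batch_size": 100}
assert json.loads(json.dumps(extras)) == extras
```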
In the external code, you can access extras via the PipesContext object:
import os

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # get all extras provided by the Dagster asset
    print(context.extras)
    # get the value of a single extra
    print(context.get_extra("foo"))
    # get an env var set by the orchestration process
    print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
The run method of the PipesSubprocessClient resource also accepts env and extras, which allow you to specify environment variables and extra arguments when executing the subprocess:

Note: We're using os.environ in this example, but Dagster's recommendation is to use EnvVar in production.
import shutil

from dagster import (
    AssetExecutionContext,
    Definitions,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        extras={"foo": "bar"},
        env={
            "MY_ENV_VAR_IN_SUBPROCESS": "my_value",
        },
    ).get_materialize_result()


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
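Following the note above, the hard-coded value could instead be resolved from the orchestration process's environment with EnvVar. A configuration sketch, not a complete script (the MY_ENV_VAR name and fallback value are hypothetical):

```python
from dagster import EnvVar

# hypothetical: read the value from the orchestration environment at
# runtime rather than hard-coding it in the definition
env = {
    "MY_ENV_VAR_IN_SUBPROCESS": EnvVar("MY_ENV_VAR").get_value("fallback-value"),
}
```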
Sometimes, you may not want to materialize an asset, but instead want to report a data quality check result for an asset that has checks defined in @asset_check.

From the external code, you can report to Dagster that an asset check has been performed via PipesContext.report_asset_check. Note that asset_key in this case is required, and must match the asset key defined in @asset_check:
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # get the Dagster Pipes context
    context = PipesContext.get()
    # report the asset check result back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        passed=orders_df[["item_id"]].notnull().all().bool(),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
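The passed flag is an ordinary boolean, so the null check above can be sketched without pandas (using a plain list as a stand-in for the item_id column):

```python
# stand-in for the item_id column in the example above
item_ids = [432, 878]

# the check passes only when every item_id is present
passed = all(item is not None for item in item_ids)
print(passed)  # True
```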
On Dagster's side, the PipesClientCompletedInvocation object returned from PipesSubprocessClient includes a get_asset_check_result method, which you can use to access the AssetCheckResult event reported by the subprocess:
import shutil

from dagster import (
    AssetCheckExecutionContext,
    AssetCheckResult,
    Definitions,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    asset_check,
    file_relative_path,
)


@asset
def my_asset(): ...


@asset_check(asset="my_asset")
def no_empty_order_check(
    context: AssetCheckExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> AssetCheckResult:
    cmd = [
        shutil.which("python"),
        file_relative_path(__file__, "external_code.py"),
    ]
    return pipes_subprocess_client.run(
        command=cmd, context=context.op_execution_context
    ).get_asset_check_result()


defs = Definitions(
    assets=[my_asset],
    asset_checks=[no_empty_order_check],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
Sometimes, you may invoke a single call to an API that results in multiple tables being updated, or you may have a single script that computes multiple assets. In these cases, you can use Dagster Pipes to report back on multiple assets at once.
Note: When working with multi-assets, PipesContext.report_asset_materialization may only be called once per unique asset key. If called more than once, an error similar to the following will surface:

Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it

Instead, you'll need to set the asset_key parameter for each call to PipesContext.report_asset_materialization:
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame(
        {"order_id": [1, 2, 3], "item_id": [432, 878, 102], "user_id": ["a", "b", "a"]}
    )
    total_orders = len(orders_df)
    total_users = orders_df["user_id"].nunique()

    # get the Dagster Pipes context
    context = PipesContext.get()
    # report a materialization for each asset key
    context.report_asset_materialization(
        asset_key="orders", metadata={"total_orders": total_orders}
    )
    context.report_asset_materialization(
        asset_key="users", metadata={"total_users": total_users}
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
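Here total_users is a distinct count; the same aggregation can be sketched in plain Python (sample values copied from the example above):

```python
# user_id column from the example above
user_ids = ["a", "b", "a"]

# number of distinct users, equivalent to orders_df["user_id"].nunique()
total_users = len(set(user_ids))
print(total_users)  # 2
```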
In the Dagster code, you can use @multi_asset to define a single asset that represents multiple assets. The PipesClientCompletedInvocation object returned from PipesSubprocessClient includes a get_results method, which you can use to access all the events, such as multiple AssetMaterializations and AssetCheckResults, reported by the subprocess:
import shutil

from dagster import (
    AssetExecutionContext,
    AssetSpec,
    Definitions,
    PipesSubprocessClient,
    file_relative_path,
    multi_asset,
)


@multi_asset(specs=[AssetSpec("orders"), AssetSpec("users")])
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [
        shutil.which("python"),
        file_relative_path(__file__, "external_code.py"),
    ]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)