import datetime
from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Mapping, Optional, Sequence
import dagster._check as check
from dagster._annotations import experimental, public
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetSubsetWithMetadata,
AutomationConditionCursor,
AutomationConditionEvaluation,
AutomationConditionNodeCursor,
AutomationConditionNodeSnapshot,
AutomationConditionSnapshot,
get_serializable_candidate_subset,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._record import copy
from dagster._serdes.serdes import is_whitelisted_for_serdes_object
from dagster._time import get_current_timestamp
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import disable_dagster_warnings
if TYPE_CHECKING:
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_context import (
AutomationContext,
)
from dagster._core.definitions.declarative_automation.operands import (
CodeVersionChangedCondition,
CronTickPassedCondition,
FailedAutomationCondition,
InLatestTimeWindowCondition,
InProgressAutomationCondition,
MissingAutomationCondition,
NewlyRequestedCondition,
NewlyUpdatedCondition,
WillBeRequestedCondition,
)
from dagster._core.definitions.declarative_automation.operators import (
AllDepsCondition,
AndAutomationCondition,
AnyDepsCondition,
AnyDownstreamConditionsCondition,
NewlyTrueCondition,
NotAutomationCondition,
OrAutomationCondition,
SinceCondition,
)
class AutomationCondition(ABC):
    """An AutomationCondition represents a condition of an asset that impacts whether it should be
    automatically executed. For example, you can have a condition which becomes true whenever the
    code version of the asset is changed, or whenever an upstream dependency is updated.

    .. code-block:: python

        from dagster import AutomationCondition, asset

        @asset(automation_condition=AutomationCondition.on_cron("0 0 * * *"))
        def my_asset(): ...

    AutomationConditions may be combined together into expressions using a variety of operators.

    .. code-block:: python

        from dagster import AssetSelection, AutomationCondition, asset

        # any dependencies from the "important" group are missing
        any_important_deps_missing = AutomationCondition.any_deps_match(
            AutomationCondition.missing(),
        ).allow(AssetSelection.groups("important"))

        # there is a new code version for this asset since the last time it was requested
        new_code_version = AutomationCondition.code_version_changed().since(
            AutomationCondition.newly_requested()
        )

        # there is a new code version and no important dependencies are missing
        my_condition = new_code_version & ~any_important_deps_missing

        @asset(automation_condition=my_condition)
        def my_asset(): ...
    """

    @property
    def requires_cursor(self) -> bool:
        """Whether a node cursor should be persisted for evaluations of this condition.

        Returns True by default; subclasses may override.
        """
        return True

    @property
    def children(self) -> Sequence["AutomationCondition"]:
        """Direct child conditions of this node in the condition tree (empty for leaf nodes)."""
        return []

    @property
    def description(self) -> str:
        """Human-readable description of when this condition is true."""
        return ""

    @property
    def label(self) -> Optional[str]:
        """User-provided label subjectively describing the purpose of this condition in the broader evaluation tree."""
        return None

    @property
    def name(self) -> str:
        """Formal name of this specific condition, generally aligning with its static constructor."""
        return self.__class__.__name__

    def get_node_snapshot(self, unique_id: str) -> AutomationConditionNodeSnapshot:
        """Returns a snapshot of this condition that can be used for serialization."""
        return AutomationConditionNodeSnapshot(
            class_name=self.__class__.__name__,
            description=self.description,
            unique_id=unique_id,
            label=self.label,
            name=self.name,
        )

    def get_snapshot(
        self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
    ) -> AutomationConditionSnapshot:
        """Returns a serializable snapshot of the entire AutomationCondition tree.

        Args:
            parent_unique_id: Unique id of the parent node, or None for the root.
            index: Position of this node among its parent's children, or None for the root.
        """
        unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
        node_snapshot = self.get_node_snapshot(unique_id)
        children = [
            child.get_snapshot(parent_unique_id=unique_id, index=i)
            for (i, child) in enumerate(self.children)
        ]
        return AutomationConditionSnapshot(node_snapshot=node_snapshot, children=children)

    def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str:
        """Returns a unique identifier for this condition within the broader condition tree.

        The id is an md5 digest of the parent's id, this node's index under the parent, the class
        name and the description.
        """
        parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description]
        # NOTE(review): parts are concatenated without a separator. Changing that would alter the
        # ids of already-persisted node cursors, so it is intentionally left as-is.
        return non_secure_md5_hash_str("".join(parts).encode())

    def get_hash(
        self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
    ) -> int:
        """Generates a hash based off of the unique ids of this node and all of its descendants.

        Uses the builtin ``hash`` on strings. Its value varies between interpreter processes
        unless PYTHONHASHSEED is fixed, so the result is only meaningful within one process.
        """
        unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
        hashes = [hash(unique_id)]
        for i, child in enumerate(self.children):
            hashes.append(child.get_hash(parent_unique_id=unique_id, index=i))
        return hash(tuple(hashes))

    def __hash__(self) -> int:
        return self.get_hash()

    @property
    def has_rule_condition(self) -> bool:
        """True if this condition, or any condition beneath it, is a legacy RuleCondition."""
        from dagster._core.definitions.declarative_automation.legacy import RuleCondition

        if isinstance(self, RuleCondition):
            return True
        return any(child.has_rule_condition for child in self.children)

    @property
    def is_serializable(self) -> bool:
        """True if this condition and every condition beneath it are whitelisted for serdes."""
        if not is_whitelisted_for_serdes_object(self):
            return False
        return all(child.is_serializable for child in self.children)

    def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
        """Returns an AutoMaterializePolicy which contains this condition."""
        from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

        return AutoMaterializePolicy.from_automation_condition(self)

    @abstractmethod
    def evaluate(self, context: "AutomationContext") -> "AutomationResult":
        """Evaluates this condition within the given context and returns the result."""
        raise NotImplementedError()

    def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition":
        """Returns a condition that is true where both this condition and ``other`` are true.

        Chains of unlabeled ANDs are flattened into a single AndAutomationCondition. A labeled
        AndAutomationCondition is kept as a single operand so that its label is preserved.
        """
        from dagster._core.definitions.declarative_automation.operators import (
            AndAutomationCondition,
        )

        # Group unlabeled AndAutomationConditions together. Flattening a labeled one would
        # silently discard its label (e.g. `AutomationCondition.eager() & other`).
        if isinstance(self, AndAutomationCondition) and self.label is None:
            return AndAutomationCondition(operands=[*self.operands, other])
        return AndAutomationCondition(operands=[self, other])

    def __or__(self, other: "AutomationCondition") -> "OrAutomationCondition":
        """Returns a condition that is true where either this condition or ``other`` is true.

        Chains of unlabeled ORs are flattened into a single OrAutomationCondition. A labeled
        OrAutomationCondition is kept as a single operand so that its label is preserved.
        """
        from dagster._core.definitions.declarative_automation.operators import OrAutomationCondition

        # Group unlabeled OrAutomationConditions together. Flattening a labeled one would
        # silently discard its label.
        if isinstance(self, OrAutomationCondition) and self.label is None:
            return OrAutomationCondition(operands=[*self.operands, other])
        return OrAutomationCondition(operands=[self, other])

    def __invert__(self) -> "NotAutomationCondition":
        """Returns a condition that is true wherever this condition is false."""
        from dagster._core.definitions.declarative_automation.operators import (
            NotAutomationCondition,
        )

        return NotAutomationCondition(operand=self)

    @public
    def with_label(self, label: Optional[str]) -> "AutomationCondition":
        """Returns a copy of this AutomationCondition with a human-readable label."""
        return copy(self, label=label)

    def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":
        """Returns an AutomationCondition that is true if this condition has become true since the
        last time the reference condition became true.
        """
        from dagster._core.definitions.declarative_automation.operators import SinceCondition

        return SinceCondition(trigger_condition=self, reset_condition=reset_condition)

    def newly_true(self) -> "NewlyTrueCondition":
        """Returns an AutomationCondition that is true only on the tick that this condition goes
        from false to true for a given asset partition.
        """
        from dagster._core.definitions.declarative_automation.operators import NewlyTrueCondition

        return NewlyTrueCondition(operand=self)

    @public
    @experimental
    @staticmethod
    def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
        """Returns an AutomationCondition that is true for an asset partition if at least one partition
        of any of its dependencies evaluate to True for the given condition.

        Args:
            condition (AutomationCondition): The AutomationCondition that will be evaluated against
                this asset's dependencies.
        """
        from dagster._core.definitions.declarative_automation.operators import AnyDepsCondition

        return AnyDepsCondition(operand=condition)

    @public
    @experimental
    @staticmethod
    def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
        """Returns an AutomationCondition that is true for an asset partition if, for each of its
        dependencies, at least one partition of that dependency evaluates to True for the given
        condition.

        Args:
            condition (AutomationCondition): The AutomationCondition that will be evaluated against
                this asset's dependencies.
        """
        from dagster._core.definitions.declarative_automation.operators import AllDepsCondition

        return AllDepsCondition(operand=condition)

    @public
    @experimental
    @staticmethod
    def missing() -> "MissingAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if it has never been
        materialized or observed.
        """
        from dagster._core.definitions.declarative_automation.operands import (
            MissingAutomationCondition,
        )

        return MissingAutomationCondition()

    @public
    @experimental
    @staticmethod
    def in_progress() -> "InProgressAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if it is part of an in-progress run."""
        from dagster._core.definitions.declarative_automation.operands import (
            InProgressAutomationCondition,
        )

        return InProgressAutomationCondition()

    @public
    @experimental
    @staticmethod
    def failed() -> "FailedAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if its latest run failed."""
        from dagster._core.definitions.declarative_automation.operands import (
            FailedAutomationCondition,
        )

        return FailedAutomationCondition()

    @public
    @experimental
    @staticmethod
    def in_latest_time_window(
        lookback_delta: Optional[datetime.timedelta] = None,
    ) -> "InLatestTimeWindowCondition":
        """Returns an AutomationCondition that is true for an asset partition when it is within the latest
        time window.

        Args:
            lookback_delta (Optional[datetime.timedelta]): If provided, the condition will
                return all partitions within the provided delta of the end of the latest time window.
                For example, if this is used on a daily-partitioned asset with a lookback_delta of
                48 hours, this will return the latest two partitions.
        """
        from dagster._core.definitions.declarative_automation.operands import (
            InLatestTimeWindowCondition,
        )

        return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

    @public
    @experimental
    @staticmethod
    def will_be_requested() -> "WillBeRequestedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it will be requested this tick."""
        from dagster._core.definitions.declarative_automation.operands import (
            WillBeRequestedCondition,
        )

        return WillBeRequestedCondition()

    @public
    @experimental
    @staticmethod
    def newly_updated() -> "NewlyUpdatedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it has been updated since the previous tick."""
        from dagster._core.definitions.declarative_automation.operands import NewlyUpdatedCondition

        return NewlyUpdatedCondition()

    @public
    @experimental
    @staticmethod
    def newly_requested() -> "NewlyRequestedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it was requested on the previous tick."""
        from dagster._core.definitions.declarative_automation.operands import (
            NewlyRequestedCondition,
        )

        return NewlyRequestedCondition()

    @public
    @experimental
    @staticmethod
    def code_version_changed() -> "CodeVersionChangedCondition":
        """Returns an AutomationCondition that is true for an asset partition if its asset's code
        version has been changed since the previous tick.
        """
        from dagster._core.definitions.declarative_automation.operands import (
            CodeVersionChangedCondition,
        )

        return CodeVersionChangedCondition()

    @public
    @experimental
    @staticmethod
    def cron_tick_passed(
        cron_schedule: str, cron_timezone: str = "UTC"
    ) -> "CronTickPassedCondition":
        """Returns an AutomationCondition that is true for all asset partitions whenever a cron tick of the provided schedule is passed."""
        from dagster._core.definitions.declarative_automation.operands import (
            CronTickPassedCondition,
        )

        return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone)

    @public
    @experimental
    @staticmethod
    def eager() -> "AutomationCondition":
        """Returns a condition which will "eagerly" fill in missing partitions as they are created,
        and ensures unpartitioned assets are updated whenever their dependencies are updated (either
        via scheduled execution or ad-hoc runs).

        Specifically, this is a composite AutomationCondition which is true for an asset partition
        if all of the following are true:

        - The asset partition is within the latest time window
        - At least one of its parents has been updated more recently than it has been requested, or
          the asset partition has never been materialized
        - None of its parent partitions are missing
        - None of its parent partitions are currently part of an in-progress run
        """
        # the constructors used below are @experimental; suppress their warnings for this builtin
        with disable_dagster_warnings():
            became_missing_or_any_parents_updated = (
                AutomationCondition.missing().newly_true().with_label("became missing")
                | AutomationCondition.any_deps_match(
                    AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
                ).with_label("any parents updated")
            )

            any_parents_missing = AutomationCondition.any_deps_match(
                AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
            ).with_label("any parents missing")

            any_parents_in_progress = AutomationCondition.any_deps_match(
                AutomationCondition.in_progress()
            ).with_label("any parents in progress")

            return (
                AutomationCondition.in_latest_time_window()
                & became_missing_or_any_parents_updated.since(
                    AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
                )
                & ~any_parents_missing
                & ~any_parents_in_progress
                & ~AutomationCondition.in_progress()
            ).with_label("eager")

    @public
    @experimental
    @staticmethod
    def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
        """Returns a condition which will materialize asset partitions within the latest time window
        on a given cron schedule, after their parents have been updated. For example, if the
        cron_schedule is set to "`0 0 * * *`" (every day at midnight), then this rule will not become
        true on a given day until all of its parents have been updated during that same day.

        Specifically, this is a composite AutomationCondition which is true for an asset partition
        if all of the following are true:

        - The asset partition is within the latest time window
        - All parent asset partitions have been updated since the latest tick of the provided cron
          schedule, or will be requested this tick
        - The asset partition has not been requested since the latest tick of the provided cron schedule
        """
        # the constructors used below are @experimental; suppress their warnings for this builtin
        with disable_dagster_warnings():
            cron_label = f"'{cron_schedule}' ({cron_timezone})"
            cron_tick_passed = AutomationCondition.cron_tick_passed(
                cron_schedule, cron_timezone
            ).with_label(f"tick of {cron_label} passed")
            all_deps_updated_since_cron = AutomationCondition.all_deps_match(
                AutomationCondition.newly_updated().since(cron_tick_passed)
                | AutomationCondition.will_be_requested()
            ).with_label(f"all parents updated since {cron_label}")
            return (
                AutomationCondition.in_latest_time_window()
                & cron_tick_passed.since(AutomationCondition.newly_requested())
                & all_deps_updated_since_cron
            ).with_label(f"on cron {cron_label}")

    @public
    @experimental
    @staticmethod
    def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
        """Returns a condition which will represent the union of all distinct downstream conditions."""
        from dagster._core.definitions.declarative_automation.operators import (
            AnyDownstreamConditionsCondition,
        )

        return AnyDownstreamConditionsCondition()
class AutomationResult:
    """Holds the outcome of evaluating one AutomationCondition node, together with the results of
    evaluating that node's children.
    """

    def __init__(
        self,
        context: "AutomationContext",
        true_slice: AssetSlice,
        cursor: Optional[str] = None,
        child_results: Optional[Sequence["AutomationResult"]] = None,
        **kwargs,
    ):
        """Args:
        context: The AutomationContext the condition was evaluated in.
        true_slice: The slice of the asset for which the condition evaluated to true.
        cursor: Optional string state to persist for this node.
        child_results: Results of evaluating this node's child conditions.
        **kwargs: Hidden params; only ``subsets_with_metadata`` and ``structured_cursor`` are
            accepted, anything else fails a param invariant check.
        """
        from dagster._core.definitions.declarative_automation.automation_context import (
            AutomationContext,
        )

        self._context = check.inst_param(context, "context", AutomationContext)
        self._true_slice = check.inst_param(true_slice, "true_slice", AssetSlice)
        self._child_results = check.opt_sequence_param(
            child_results, "child_results", of_type=AutomationResult
        )

        # evaluation window: from the context's creation time until this result is constructed
        self._start_timestamp = context.create_time.timestamp()
        self._end_timestamp = get_current_timestamp()

        # Hidden params: `subsets_with_metadata` should only be set by legacy RuleConditions, and
        # `structured_cursor` only by builtin conditions which require high performance in their
        # serdes layer.
        known_hidden_params = {"subsets_with_metadata", "structured_cursor"}
        self._subsets_with_metadata = check.opt_sequence_param(
            kwargs.get("subsets_with_metadata"), "subsets_with_metadata", AssetSubsetWithMetadata
        )
        structured_cursor = kwargs.get("structured_cursor")

        unexpected_params = set(kwargs) - known_hidden_params
        check.param_invariant(
            not unexpected_params, "kwargs", f"Invalid hidden params: {unexpected_params}"
        )
        check.param_invariant(
            not (cursor and structured_cursor),
            "structured_cursor",
            "Cannot provide both cursor and structured_cursor.",
        )
        # a truthy string cursor wins; otherwise fall back to the structured cursor (may be None)
        self._extra_state = check.opt_str_param(cursor, "cursor") or structured_cursor

        # lets the evaluator replace the serializable evaluation in some edge cases
        self._serializable_evaluation_override: Optional[AutomationConditionEvaluation] = None

    def _valid_candidate_subset(self) -> AssetSubset:
        """Converts the context's candidate slice into a valid AssetSubset (recomputed per call)."""
        return self._context.candidate_slice.convert_to_valid_asset_subset()

    @property
    def asset_key(self) -> AssetKey:
        """Key of the asset whose slice this result describes."""
        return self._true_slice.asset_key

    @property
    def true_slice(self) -> AssetSlice:
        """Slice of the asset for which the condition evaluated to true."""
        return self._true_slice

    @property
    def true_subset(self) -> AssetSubset:
        """``true_slice`` converted to a valid AssetSubset."""
        return self.true_slice.convert_to_valid_asset_subset()

    @property
    def start_timestamp(self) -> float:
        return self._start_timestamp

    @property
    def end_timestamp(self) -> float:
        return self._end_timestamp

    @property
    def child_results(self) -> Sequence["AutomationResult"]:
        return self._child_results

    @property
    def condition(self) -> AutomationCondition:
        """The condition that was evaluated, as recorded on the context."""
        return self._context.condition

    @property
    def condition_unique_id(self) -> str:
        return self._context.condition_unique_id

    @cached_property
    def value_hash(self) -> str:
        """Content identifier for this result: equal values produce equal hashes, which lets
        callers skip storing redundant information.
        """
        pieces = [self.condition_unique_id, self.condition.description]
        pieces.append(_compute_subset_value_str(self.true_subset))
        pieces.append(_compute_subset_value_str(self._valid_candidate_subset()))
        pieces.extend(
            _compute_subset_with_metadata_value_str(swm) for swm in self._subsets_with_metadata
        )
        pieces.extend(child.value_hash for child in self._child_results)
        return non_secure_md5_hash_str("".join(pieces).encode("utf-8"))

    @cached_property
    def node_cursor(self) -> Optional[AutomationConditionNodeCursor]:
        """Per-node cursor for this evaluation, or None if the condition does not need one."""
        if self.condition.requires_cursor:
            return AutomationConditionNodeCursor(
                true_subset=self.true_subset,
                candidate_subset=get_serializable_candidate_subset(
                    self._valid_candidate_subset()
                ),
                subsets_with_metadata=self._subsets_with_metadata,
                extra_state=self._extra_state,
            )
        return None

    @cached_property
    def _serializable_evaluation(self) -> AutomationConditionEvaluation:
        """Evaluation built from this result and, recursively, its children."""
        return AutomationConditionEvaluation(
            condition_snapshot=self.condition.get_node_snapshot(self.condition_unique_id),
            true_subset=self.true_subset,
            candidate_subset=get_serializable_candidate_subset(self._valid_candidate_subset()),
            subsets_with_metadata=self._subsets_with_metadata,
            start_timestamp=self._start_timestamp,
            end_timestamp=self._end_timestamp,
            child_evaluations=[child.serializable_evaluation for child in self._child_results],
        )

    @property
    def serializable_evaluation(self) -> AutomationConditionEvaluation:
        """Serializable representation of this evaluation, honoring any override that was set."""
        override = self._serializable_evaluation_override
        return override if override else self._serializable_evaluation

    def set_internal_serializable_evaluation_override(
        self, override: AutomationConditionEvaluation
    ) -> None:
        """Internal hook for edge cases where the serializable evaluation has to be replaced after
        evaluation has completed.
        """
        self._serializable_evaluation_override = override

    def get_child_node_cursors(self) -> Mapping[str, AutomationConditionNodeCursor]:
        """Collects node cursors for this node and every descendant, keyed by condition unique id."""
        own_cursor = self.node_cursor
        cursors_by_id = {}
        if own_cursor:
            cursors_by_id[self.condition_unique_id] = own_cursor
        for child in self._child_results:
            cursors_by_id.update(child.get_child_node_cursors())
        return cursors_by_id

    def get_new_cursor(self) -> AutomationConditionCursor:
        """Builds the top-level cursor to persist after this evaluation."""
        requested_subset = self.serializable_evaluation.true_subset
        evaluated_at = self._context.evaluation_time.timestamp()
        storage_id = self._context.max_storage_id
        node_cursors = self.get_child_node_cursors()
        return AutomationConditionCursor(
            previous_requested_subset=requested_subset,
            effective_timestamp=evaluated_at,
            last_event_id=storage_id,
            node_cursors_by_unique_id=node_cursors,
            result_value_hash=self.value_hash,
        )
def _compute_subset_value_str(subset: AssetSubset) -> str:
    """Computes a string representing the contents of a given AssetSubset.

    The string is equal for equivalent AssetSubsets, so it can be used to compare or hash
    subset contents.
    """
    value = subset.value
    if isinstance(value, bool):
        return str(value)
    if isinstance(value, AllPartitionsSubset):
        return AllPartitionsSubset.__name__
    if isinstance(value, BaseTimeWindowPartitionsSubset):
        # represent time-window subsets by their sorted (start, end) timestamps
        return str(
            [(tw.start.timestamp(), tw.end.timestamp()) for tw in sorted(value.included_time_windows)]
        )
    # otherwise, fall back to the explicit sorted list of asset partitions;
    # `sorted` already returns a list, so no extra `list(...)` wrapper is needed
    return str(sorted(subset.asset_partitions))
def _compute_subset_with_metadata_value_str(subset_with_metadata: AssetSubsetWithMetadata) -> str:
    """Computes a string representing the contents of an AssetSubsetWithMetadata.

    The result is the subset's value string followed by the string form of its sorted
    ``frozen_metadata`` entries. Sorting makes the output independent of the metadata's
    iteration order.
    """
    subset_str = _compute_subset_value_str(subset_with_metadata.subset)
    metadata_str = str(sorted(subset_with_metadata.frozen_metadata))
    return subset_str + metadata_str