r"""`Op` classes for working with ``numpy.ndarrays`` symbolically.
This module primarily defines `Op`\s for the creation, conversion, and
manipulation of tensors.
import builtins
import warnings
from collections.abc import Sequence
from functools import partial
from numbers import Number
from typing import TYPE_CHECKING
from typing import cast as type_cast
import numpy as np
from numpy.core.multiarray import normalize_axis_index
from numpy.core.numeric import normalize_axis_tuple
import pytensor
import pytensor.scalar.sharedvar
from pytensor import compile, config, printing
from pytensor import scalar as ps
from pytensor.compile.builders import OpFromGraph
from pytensor.gradient import DisconnectedType, grad_undefined
from pytensor.graph import RewriteDatabaseQuery
from pytensor.graph.basic import Apply, Constant, Variable, equal_computations
from pytensor.graph.fg import FunctionGraph, Output
from pytensor.graph.op import Op
from pytensor.graph.replace import _vectorize_node
from pytensor.graph.rewriting.db import EquilibriumDB
from pytensor.graph.type import HasShape, Type
from pytensor.link.c.op import COp
from pytensor.link.c.params_type import ParamsType
from pytensor.misc.safe_asarray import _asarray
from pytensor.printing import Printer, min_informative_str, pprint, set_precedence
from pytensor.raise_op import CheckAndRaise, assert_op
from pytensor.scalar import int32
from pytensor.scalar.basic import ScalarConstant, ScalarVariable
from pytensor.tensor import (
_as_tensor_variable,
_get_vector_length,
as_tensor_variable,
get_vector_length,
from pytensor.tensor.blockwise import Blockwise, vectorize_node_fallback
from pytensor.tensor.elemwise import (
DimShuffle,
Elemwise,
get_normalized_batch_axes,
scalar_elemwise,
from pytensor.tensor.exceptions import NotScalarConstantError
from pytensor.tensor.shape import (
Shape,
Shape_i,
Unbroadcast,
shape,
shape_padaxis,
shape_padleft,
shape_padright,
shape_tuple,
specify_broadcastable,
from pytensor.tensor.type import (
TensorType,
discrete_dtypes,
float_dtypes,
int_dtypes,
integer_dtypes,
tensor,
uint_dtypes,
values_eq_approx_always_true,
from pytensor.tensor.variable import (
TensorConstant,
TensorVariable,
get_unique_constant_value,
if TYPE_CHECKING:
from pytensor.tensor import TensorLike
def __oplist_tag(thing, tag):
tags = getattr(thing, "__oplist_tags", [])
tags.append(tag)
thing.__oplist_tags = tags
@_as_tensor_variable.register(Apply)
def _as_tensor_Apply(x, name, ndim, **kwargs):
# use Apply's default output mechanism
if (x.op.default_output is None) and (len(x.outputs) != 1):
raise TypeError(
"Multi-output Op without default_output encountered. "
"Retry using only one of the outputs directly."
x = x.default_output()
return as_tensor_variable(x, name=name, ndim=ndim, **kwargs)
@_as_tensor_variable.register(ScalarVariable)
@_as_tensor_variable.register(ScalarConstant)
def _as_tensor_Scalar(x, name, ndim, **kwargs):
return as_tensor_variable(tensor_from_scalar(x), name=name, ndim=ndim, **kwargs)
@_as_tensor_variable.register(Variable)
def _as_tensor_Variable(x, name, ndim, **kwargs):
if not isinstance(x.type, TensorType):
raise TypeError(
f"Tensor type field must be a TensorType; found {type(x.type)}."
if ndim is None:
return x
if x.type.ndim > ndim:
# Strip off leading broadcastable dimensions
non_broadcastables = tuple(
idx for idx in range(x.type.ndim) if x.type.shape[idx] != 1
if non_broadcastables:
x = x.dimshuffle(list(range(x.type.ndim))[non_broadcastables[0] :])
else:
x = x.dimshuffle()
if x.ndim > ndim:
raise ValueError(
f"Tensor of type {x.type} could not be cast to have {ndim} dimensions"
return x
elif x.type.ndim < ndim:
return shape_padleft(x, n_ones=(ndim - x.type.ndim))
else:
return x
@_as_tensor_variable.register(list)
@_as_tensor_variable.register(tuple)
def _as_tensor_Sequence(x, name, ndim, dtype=None, **kwargs):
if len(x) == 0:
return constant(x, name=name, ndim=ndim, dtype=dtype)
# If a sequence has `Variable`s in it, then we want
# to customize the conversion to a tensor type.
def extract_constants(i):
if isinstance(i, Variable):
if isinstance(i, Constant):
return i.data
else:
raise TypeError
else:
return i
try:
x = type(x)(extract_constants(i) for i in x)
except TypeError:
if builtins.all(getattr(i, "ndim", None) == 0 for i in x) and (
ndim is None or ndim == 1
# In this instance, we have a sequence of constants with which we
# want to construct a vector, so we can use `MakeVector` directly.
if dtype is None:
dtype = ps.upcast(*[i.dtype for i in x if hasattr(i, "dtype")])
return MakeVector(dtype)(*x)
# In this case, we have at least one non-`Constant` term, so we
# couldn't get an underlying non-symbolic sequence of objects and we to
# symbolically join terms.
return stack(x)
return constant(x, name=name, ndim=ndim, dtype=dtype)
@_as_tensor_variable.register(np.bool_)
@_as_tensor_variable.register(np.number)
@_as_tensor_variable.register(Number)
@_as_tensor_variable.register(np.ndarray)
def _as_tensor_numbers(x, name, ndim, dtype=None, **kwargs):
return constant(x, name=name, ndim=ndim, dtype=dtype)
@_as_tensor_variable.register(bool)
def _as_tensor_bool(x, name, ndim, **kwargs):
raise TypeError(
"Cannot cast True or False as a tensor variable. Please use "
"np.array(True) or np.array(False) if you need these constants. "
"This error might be caused by using the == operator on "
"Variables. v == w does not do what you think it does, "
"use pytensor.tensor.eq(v, w) instead."
as_tensor = as_tensor_variable
[docs]
def constant(x, name=None, ndim=None, dtype=None) -> TensorConstant:
"""Return a `TensorConstant` with value `x`.
Raises
------
TypeError
`x` could not be converted to a numpy.ndarray.
ValueError
`x` could not be expanded to have ndim dimensions.
if isinstance(x, TensorConstant):
if (
(name is None or x.name == name)
and (ndim is None or x.ndim == ndim)
and (dtype is None or x.dtype == dtype)
return x
else:
x = x.data
x_ = ps.convert(x, dtype=dtype)
if ndim is not None:
if x_.ndim < ndim:
x_ = np.expand_dims(x_, axis=tuple(range(ndim - x_.ndim)))
elif x_.ndim > ndim:
try:
x_ = np.squeeze(x_, axis=tuple(range(x_.ndim - ndim)))
except np.AxisError:
raise ValueError(
f"ndarray could not be cast to constant with {int(ndim)} dimensions"
assert x_.ndim == ndim
ttype = TensorType(dtype=x_.dtype, shape=x_.shape)
return TensorConstant(ttype, x_, name=name)