from __future__ import annotations
from types import ModuleType
import abc
import numpy as np
[docs]class LinearOperator(abc.ABC):
"""abstract base class for linear operators"""
[docs] def __init__(self, xp: ModuleType):
"""init method
Parameters
----------
xp : ModuleType
numpy or cupy module
"""
self._scale = 1
self._xp = xp
@property
@abc.abstractmethod
def in_shape(self) -> tuple[int, ...]:
"""shape of the input array"""
raise NotImplementedError
@property
@abc.abstractmethod
def out_shape(self) -> tuple[int, ...]:
"""shape of the output array"""
raise NotImplementedError
@property
def scale(self) -> int | float | complex:
"""scalar factor applied to the linear operator"""
return self._scale
@scale.setter
def scale(self, value: int | float | complex):
if not np.isscalar(value):
raise ValueError
self._scale = value
@property
def xp(self) -> ModuleType:
"""module type (numpy or cupy) of the operator"""
return self._xp
[docs] @abc.abstractmethod
def _apply(self, x):
"""forward step y = Ax"""
raise NotImplementedError
[docs] @abc.abstractmethod
def _adjoint(self, y):
"""adjoint step x = A^H y"""
raise NotImplementedError
[docs] def apply(self, x):
"""forward step y = scale * Ax
Parameters
----------
x : numpy or cupy array
Returns
-------
numpy or cupy array
"""
if self._scale == 1:
return self._apply(x)
else:
return self._scale * self._apply(x)
[docs] def __call__(self, x):
return self.apply(x)
[docs] def adjoint(self, y):
"""adjoint step x = conj(scale) * A^H y
Parameters
----------
y : numpy or cupy array
Returns
-------
numpy or cupy array
"""
if self._scale == 1:
return self._adjoint(y)
else:
return np.conj(self._scale) * self._adjoint(y)
[docs] def adjointness_test(self, verbose=False, iscomplex=False, **kwargs):
"""test whether the adjoint is correctly implemented
Parameters
----------
verbose : bool, optional
verbose output
iscomplex : bool, optional
use complex arrays
"""
x = self.xp.random.rand(*self.in_shape)
y = self.xp.random.rand(*self.out_shape)
if iscomplex:
x = x + 1j * self.xp.random.rand(*self.in_shape)
if iscomplex:
y = y + 1j * self.xp.random.rand(*self.out_shape)
x_fwd = self.apply(x)
y_adj = self.adjoint(y)
ip1 = (x_fwd.conj() * y).sum()
ip2 = (x.conj() * y_adj).sum()
if verbose:
print(ip1, ip2)
assert (self.xp.isclose(ip1, ip2, **kwargs))
[docs] def norm(self, num_iter=30, iscomplex=False, verbose=False) -> float:
"""estimate norm of the linear operator using power iterations
Parameters
----------
num_iter : int, optional
number of power iterations
iscomplex : bool, optional
use complex arrays
verbose : bool, optional
verbose output
Returns
-------
float
the norm of the linear operator
"""
x = self.xp.random.rand(*self.in_shape)
if iscomplex:
x = x + 1j * self.xp.random.rand(*self.in_shape)
for i in range(num_iter):
x = self.adjoint(self.apply(x))
norm_squared = self.xp.linalg.norm(x)
x /= norm_squared
if verbose:
print(f'{(i+1):03} {self.xp.sqrt(norm_squared):.2E}')
return self.xp.sqrt(norm_squared)
[docs]class MatrixOperator(LinearOperator):
"""Linear Operator defined by dense matrix multiplication"""
[docs] def __init__(self, A, xp):
"""init method
Parameters
----------
A : 2D numpy or cupy real of complex array
xp: ModuleType
numpy or cupy module
"""
super().__init__(xp)
self._A = A
if self._A.dtype.kind == 'c':
self._dtype_kind = 'complex'
else:
self._dtype_kind = 'float'
@property
def in_shape(self):
return (self._A.shape[1], )
@property
def out_shape(self):
return (self._A.shape[0], )
@property
def A(self):
return self._A
[docs] def _apply(self, x):
"""forward step y = Ax"""
return self._A @ x
[docs] def _adjoint(self, y):
"""adjoint step x = A^H y"""
return self._A.conj().T @ y
[docs]class CompositeLinearOperator(LinearOperator):
"""Composite Linear Operator defined by a sequence of Linear Operators
Given a tuple of operators (A_0, ..., A_{n-1}) the composite operator is defined as
A(x) = A0( A1( ... ( A_{n-1}(x) ) ) )
"""
[docs] def __init__(self, operators: tuple[LinearOperator, ...]):
"""init method
Parameters
----------
operators : tuple[LinearOperator, ...]
tuple of linear operators
"""
super().__init__(operators[0].xp)
self._operators = operators
@property
def in_shape(self):
return self._operators[-1].in_shape
@property
def out_shape(self):
return self._operators[0].out_shape
@property
def operators(self) -> tuple[LinearOperator, ...]:
"""tuple of linear operators"""
return self._operators
[docs] def _apply(self, x):
"""forward step y = Ax"""
y = x
for op in self._operators[::-1]:
y = op(y)
return y
[docs] def _adjoint(self, y):
"""adjoint step x = A^H y"""
x = y
for op in self._operators:
x = op.adjoint(x)
return x
[docs]class ElementwiseMultiplicationOperator(LinearOperator):
"""Element-wise multiplication operator (multiplication with a diagonal matrix)"""
[docs] def __init__(self, values, xp):
"""init method
Parameters
----------
values : numpy or cupy array
values of the diagonal matrix
xp: ModuleType
numpy or cupy module
"""
super().__init__(xp)
self._values = values
@property
def in_shape(self):
return self._values.shape
@property
def out_shape(self):
return self._values.shape
[docs] def _apply(self, x):
"""forward step y = Ax"""
return self._values * x
[docs] def _adjoint(self, y):
"""adjoint step x = A^H y"""
return self._values.conj() * y
[docs]class GaussianFilterOperator(LinearOperator):
"""Gaussian filter operator"""
[docs] def __init__(self, in_shape, ndimage_module, xp, **kwargs):
"""init method
Parameters
----------
in_shape : tuple[int, ...]
shape of the input array
ndimage_module : scipy or cupyx.scipy module
xp: ModuleType
numpy or cupy module
**kwargs : sometype
passed to the ndimage gaussian_filter function
"""
super().__init__(xp)
self._in_shape = in_shape
self._ndimage_module = ndimage_module
self._kwargs = kwargs
@property
def in_shape(self):
return self._in_shape
@property
def out_shape(self):
return self._in_shape
[docs] def _apply(self, x):
"""forward step y = Ax"""
return self._ndimage_module.gaussian_filter(x, **self._kwargs)
[docs] def _adjoint(self, y):
"""adjoint step x = A^H y"""
return self._apply(y)
[docs]class VstackOperator(LinearOperator):
"""Stacking operator for stacking multiple linear operators vertically"""
[docs] def __init__(self, operators: tuple[LinearOperator, ...]) -> None:
"""init method
Parameters
----------
operators : tuple[LinearOperator, ...]
tuple of linear operators
"""
super().__init__(operators[0].xp)
self._operators = operators
self._in_shape = self._operators[0].in_shape
self._out_shapes = tuple([x.out_shape for x in operators])
self._raveled_out_shapes = tuple([np.prod(x) for x in self._out_shapes])
self._out_shape = (sum(self._raveled_out_shapes), )
# setup the slices for slicing the raveled output array
self._slices = []
start = 0
for length in self._raveled_out_shapes:
end = start + length
self._slices.append(slice(start, end, None))
start = end
self._slices = tuple(self._slices)
@property
def in_shape(self):
return self._in_shape
@property
def out_shape(self):
return self._out_shape
[docs] def _apply(self, x):
y = self.xp.zeros(self._out_shape, dtype=x.dtype)
for i, op in enumerate(self._operators):
y[self._slices[i]] = op(x).ravel()
return y
[docs] def _adjoint(self, y):
x = self.xp.zeros(self._in_shape, dtype=y.dtype)
for i, op in enumerate(self._operators):
x += op.adjoint(y[self._slices[i]].reshape(self._out_shapes[i]))
return x
[docs]class SubsetOperator:
"""Operator split into subsets"""
[docs] def __init__(self, operators: tuple[LinearOperator, ...]) -> None:
"""init method
Parameters
----------
operators : tuple[LinearOperator, ...]
tuple of linear operators
"""
self._operators = operators
self._in_shape = self._operators[0].in_shape
self._out_shapes = tuple([x.out_shape for x in operators])
self._num_subsets = len(operators)
@property
def in_shape(self):
return self._in_shape
@property
def out_shapes(self):
return self._out_shapes
@property
def operators(self) -> tuple[LinearOperator, ...]:
return self._operators
@property
def num_subsets(self):
return self._num_subsets
[docs] def apply(self, x):
"""A_i x for all subsets i"""
y = [op(x) for op in self._operators]
return y
[docs] def __call__(self, x):
return self.apply(x)
[docs] def adjoint(self, y):
"""A_i^H x for all subsets i"""
x = []
for i, op in enumerate(self._operators):
x.append(op.adjoint(y[i]))
return x
[docs] def apply_subset(self, x, i):
"""A_i x for a given subset i"""
return self._operators[i](x)
[docs] def adjoint_subset(self, x, i):
"""A_i^H x for a given subset i"""
return self._operators[i].adjoint(x)
[docs] def norms(self):
"""norm(A_i) for all subsets i"""
return [op.norm() for op in self._operators]