Source code for psydac.linalg.utilities

# coding: utf-8

import numpy as np
from math import sqrt

from psydac.linalg.basic   import Vector
from psydac.linalg.stencil import StencilVectorSpace, StencilVector
from psydac.linalg.block   import BlockVector, BlockVectorSpace
from psydac.linalg.topetsc import petsc_local_to_psydac, get_npts_per_block

__all__ = (
    'array_to_psydac',
    'petsc_to_psydac',
    '_sym_ortho'
)

#==============================================================================
[docs] def array_to_psydac(x, V): """ Convert a NumPy array to a Vector of the space V. This function is designed to be the inverse of the method .toarray() of the class Vector. Note: This function works in parallel but it is very costly and should be avoided if performance is a priority. Parameters ---------- x : numpy.ndarray Array to be converted. It only contains the true data, the ghost regions must not be included. V : psydac.linalg.stencil.StencilVectorSpace or psydac.linalg.block.BlockVectorSpace Space of the final Psydac Vector. Returns ------- u : psydac.linalg.stencil.StencilVector or psydac.linalg.block.BlockVector Element of space V, the coefficients of which (excluding ghost regions) are the entries of x. The ghost regions of u are up to date. """ assert x.ndim == 1, 'Array must be 1D.' if x.dtype==complex: assert V.dtype==complex, 'Complex array cannot be converted to a real StencilVector' assert x.size == V.dimension, 'Array must have the same global size as the space.' u = V.zeros() _array_to_psydac_recursive(x, u) u.update_ghost_regions() return u
def _array_to_psydac_recursive(x, u): """ Recursive function filling in the coefficients of each block of u. """ assert isinstance(u, Vector) V = u.space assert x.ndim == 1, 'Array must be 1D.' if x.dtype==complex: assert V.dtype==complex, 'Complex array cannot be converted to a real StencilVector' assert x.size == V.dimension, 'Array must have the same global size as the space.' if isinstance(V, BlockVectorSpace): for i, V_i in enumerate(V.spaces): x_i = x[:V_i.dimension] x = x[V_i.dimension:] u_i = u[i] _array_to_psydac_recursive(x_i, u_i) elif isinstance(V, StencilVectorSpace): index_global = tuple(slice(s, e+1) for s, e in zip(V.starts, V.ends)) u[index_global] = x.reshape(V.npts)[index_global] else: raise NotImplementedError(f'Can only handle StencilVector or BlockVector spaces, got {type(V)} instead') #==============================================================================
[docs] def petsc_to_psydac(x, Xh): """ Convert a PETSc.Vec object to a StencilVector or BlockVector. It assumes that PETSc was installed with the configuration for complex numbers. Uses the index conversion functions in psydac.linalg.topetsc.py. Parameters ---------- x : PETSc.Vec PETSc vector Returns ------- u : psydac.linalg.stencil.StencilVector | psydac.linalg.block.BlockVector Psydac vector. In the case of a BlockVector, the blocks must be StencilVector. The general case is not yet implemented. """ if isinstance(Xh, BlockVectorSpace): if any([isinstance(Xh.spaces[b], BlockVectorSpace) for b in range(len(Xh.spaces))]): raise NotImplementedError('Block of blocks not implemented.') u = BlockVector(Xh) comm = x.comm dtype = Xh._dtype localsize, globalsize = x.getSizes() assert globalsize == u.shape[0], 'Sizes of global vectors do not match' # Find shift for process k: # ..get number of points for each block, each process and each dimension: npts_local_per_block_per_process = np.array(get_npts_per_block(Xh)) #indexed [b,k,d] for block b and process k and dimension d # ..get local sizes for each block and each process: local_sizes_per_block_per_process = np.prod(npts_local_per_block_per_process, axis=-1) #indexed [b,k] for block b and process k # ..sum the sizes over all the blocks and the previous processes: index_shift = 0 + np.sum(local_sizes_per_block_per_process[:,:comm.Get_rank()], dtype=int) #global variable for local_petsc_index in range(localsize): block_index, psydac_index = petsc_local_to_psydac(Xh, local_petsc_index) # Get value of local PETSc vector passing the global PETSc index value = x.getValue(local_petsc_index + index_shift) if value != 0: u[block_index[0]]._data[psydac_index] = value if dtype is complex else value.real # PETSc always handles dtype specified in the installation configuration elif isinstance(Xh, StencilVectorSpace): u = StencilVector(Xh) comm = x.comm dtype = Xh.dtype localsize, globalsize = x.getSizes() assert globalsize == u.shape[0], 'Sizes of global vectors do not match' # Find shift for process k: # ..get number of points for each process and each dimension: npts_local_per_block_per_process = np.array(get_npts_per_block(Xh))[0] #indexed [k,d] for process k and dimension d # ..get local sizes for each process: local_sizes_per_block_per_process = np.prod(npts_local_per_block_per_process, axis=-1) #indexed [k] for process k # ..sum the sizes over all the previous processes: index_shift = 0 + np.sum(local_sizes_per_block_per_process[:comm.Get_rank()], dtype=int) #global variable for local_petsc_index in range(localsize): block_index, psydac_index = petsc_local_to_psydac(Xh, local_petsc_index) # Get value of local PETSc vector passing the global PETSc index value = x.getValue(local_petsc_index + index_shift) if value != 0: u._data[psydac_index] = value if dtype is complex else value.real # PETSc always handles dtype specified in the installation configuration else: raise ValueError('Xh must be a StencilVectorSpace or a BlockVectorSpace') u.update_ghost_regions() return u
#============================================================================== def _sym_ortho(a, b): """ Stable implementation of Givens rotation. This function was taken from the scipy repository https://github.com/scipy/scipy/blob/master/scipy/sparse/linalg/isolve/lsqr.py Notes ----- The routine 'SymOrtho' was added for numerical stability. This is recommended by S.-C. Choi in [1]_. It removes the unpleasant potential of ``1/eps`` in some important places (see, for example text following "Compute the next plane rotation Qk" in minres.py). References ---------- .. [1] S.-C. Choi, "Iterative Methods for Singular Linear Equations and Least-Squares Problems", Dissertation, http://www.stanford.edu/group/SOL/dissertations/sou-cheng-choi-thesis.pdf """ if b == 0: return np.sign(a), 0, abs(a) elif a == 0: return 0, np.sign(b), abs(b) elif abs(b) > abs(a): tau = a / b s = np.sign(b) / sqrt(1 + tau * tau) c = s * tau r = b / s else: tau = b / a c = np.sign(a) / sqrt(1+tau*tau) s = c * tau r = a / c return c, s, r