Source code for psydac.feec.multipatch.operators
# coding: utf-8
# Conga operators on piecewise (broken) de Rham sequences
from sympy import Tuple
from mpi4py import MPI
import os
import numpy as np
from scipy.sparse import save_npz, load_npz
from scipy.sparse import kron, block_diag
from scipy.sparse.linalg import inv
from sympde.topology import Boundary, Interface, Union
from sympde.topology import element_of, elements_of
from sympde.topology.space import ScalarFunction
from sympde.calculus import grad, dot, inner, rot, div
from sympde.calculus import laplace, bracket, convect
from sympde.calculus import jump, avg, Dn, minus, plus
from sympde.expr.expr import LinearForm, BilinearForm
from sympde.expr.expr import integral
from psydac.core.bsplines import collocation_matrix, histopolation_matrix
from psydac.api.discretization import discretize
from psydac.api.essential_bc import apply_essential_bc_stencil
from psydac.api.settings import PSYDAC_BACKENDS
from psydac.linalg.block import BlockVectorSpace, BlockVector, BlockLinearOperator
from psydac.linalg.stencil import StencilVector, StencilMatrix, StencilInterfaceMatrix
from psydac.linalg.solvers import inverse
from psydac.fem.basic import FemField
from psydac.feec.global_projectors import Projector_H1, Projector_Hcurl, Projector_L2
from psydac.feec.derivatives import Gradient_2D, ScalarCurl_2D
from psydac.feec.multipatch.fem_linear_operators import FemLinearOperator
[docs]
def get_patch_index_from_face(domain, face):
""" Return the patch index of subdomain/boundary
Parameters
----------
domain : <Sympde.topology.Domain>
The Symbolic domain
face : <Sympde.topology.BasicDomain>
A patch or a boundary of a patch
Returns
-------
i : <int>
The index of a subdomain/boundary in the multipatch domain
"""
if domain.mapping:
domain = domain.logical_domain
if face.mapping:
face = face.logical_domain
domains = domain.interior.args
if isinstance(face, Interface):
raise NotImplementedError(
"This face is an interface, it has several indices -- I am a machine, I cannot choose. Help.")
elif isinstance(face, Boundary):
i = domains.index(face.domain)
else:
i = domains.index(face)
return i
[docs]
def get_interface_from_corners(corner1, corner2, domain):
""" Return the interface between two corners from two different patches that correspond to a single (physical) vertex.
Parameters
----------
corner1 : <Sympde.topology.Corner>
The first corner of the 2D interface
corner2 : <Sympde.topology.Corner>
The second corner of the 2D interface
domain : <Sympde.topology.Domain>
The Symbolic domain
Returns
-------
interface: <Sympde.topology.Interface|None>
The interface between two vertices
"""
interface = []
interfaces = domain.interfaces
if not isinstance(interfaces, Union):
interfaces = (interfaces,)
for i in interfaces:
if i.plus.domain in [corner1.domain, corner2.domain]:
if i.minus.domain in [corner1.domain, corner2.domain]:
interface.append(i)
bd1 = corner1.boundaries
bd2 = corner2.boundaries
new_interface = []
for i in interface:
if i.minus in bd1 + bd2:
if i.plus in bd2 + bd1:
new_interface.append(i)
if len(new_interface) == 1:
return new_interface[0]
if len(new_interface) > 1:
raise ValueError(
'found more than one interface for the corners {} and {}'.format(
corner1, corner2))
return None
[docs]
def get_row_col_index(corner1, corner2, interface, axis, V1, V2):
""" Return the row and column index of a corner in the StencilInterfaceMatrix
for dofs of H1 type spaces
Parameters
----------
corner1 : <Sympde.topology.Corner>
The first corner of the 2D interface
corner2 : <Sympde.topology.Corner>
The second corner of the 2D interface
interface : <Sympde.topology.Interface|None>
The interface between the two corners
axis : <int|None>
Axis of the interface
V1 : <FemSpace>
Test Space
V2 : <FemSpace>
Trial Space
Returns
-------
index: <list>
The StencilInterfaceMatrix index of the corner, it has the form (i1, i2, k1, k2) in 2D,
where (i1, i2) identifies the row and (k1, k2) the diagonal.
"""
start = V1.vector_space.starts
end = V1.vector_space.ends
degree = V2.degree
start_end = (start, end)
row = [None] * len(start)
col = [0] * len(start)
assert corner1.boundaries[0].axis == corner2.boundaries[0].axis
for bd in corner1.boundaries:
row[bd.axis] = start_end[(bd.ext + 1) // 2][bd.axis]
if interface is None and corner1.domain != corner2.domain:
bd = [i for i in corner1.boundaries if i.axis == axis][0]
if bd.ext == 1:
row[bd.axis] = degree[bd.axis]
if interface is None:
return row + col
axis = interface.axis
if interface.minus.domain == corner1.domain:
if interface.minus.ext == -1:
row[axis] = 0
else:
row[axis] = degree[axis]
else:
if interface.plus.ext == -1:
row[axis] = 0
else:
row[axis] = degree[axis]
if interface.minus.ext == interface.plus.ext:
pass
elif interface.minus.domain == corner1.domain:
if interface.minus.ext == -1:
col[axis] = degree[axis]
else:
col[axis] = -degree[axis]
else:
if interface.plus.ext == -1:
col[axis] = degree[axis]
else:
col[axis] = -degree[axis]
return row + col
# ===============================================================================
[docs]
def allocate_interface_matrix(corners, test_space, trial_space):
""" Allocate the interface matrix for a vertex shared by two patches
Parameters
----------
corners: <list>
The patch corners corresponding to the common shared vertex
test_space: <FemSpace>
The test space
trial_space: <FemSpace>
The trial space
Returns
-------
mat: <StencilInterfaceMatrix>
The interface matrix shared by two patches
"""
bi, bj = list(zip(*corners))
permutation = np.arange(bi[0].domain.dim)
flips = []
k = 0
while k < len(bi):
c1 = np.array(bi[k].coordinates)
c2 = np.array(bj[k].coordinates)[permutation]
flips.append(
np.array([-1 if d1 != d2 else 1 for d1, d2 in zip(c1, c2)]))
if np.sum(abs(flips[0] - flips[-1])) != 0:
prod = [f1 * f2 for f1, f2 in zip(flips[0], flips[-1])]
while -1 in prod:
i1 = prod.index(-1)
if -1 in prod[i1 + 1:]:
i2 = i1 + 1 + prod[i1 + 1:].index(-1)
prod = prod[i2 + 1:]
permutation[i1], permutation[i2] = permutation[i2], permutation[i1]
k = -1
flips = []
else:
break
k += 1
assert all(abs(flips[0] - i).sum() == 0 for i in flips)
cs = list(zip(*[i.coordinates for i in bi]))
axis = [all(i[0] == j for j in i) for i in cs].index(True)
ext = 1 if cs[axis][0] == 1 else -1
s = test_space.get_assembly_grids(
)[axis].spans[-1 if ext == 1 else 0] - test_space.degree[axis]
mat = StencilInterfaceMatrix(
trial_space.vector_space,
test_space.vector_space,
s,
s,
axis,
flip=flips[0],
permutation=list(permutation))
return mat
# ===============================================================================
# The following operators are not compatible with the changes in the Stencil format
# and their datatype does not allow for non-matching interfaces, but they might be
# useful for future implementations
# ===============================================================================
[docs]
class ConformingProjection_V0(FemLinearOperator):
"""
Conforming projection from global broken V0 space to conforming global V0 space
Defined by averaging of interface dofs
Parameters
----------
V0h: <FemSpace>
The discrete space
domain_h: <Geometry>
The discrete domain of the projector
hom_bc : <bool>
Apply homogenous boundary conditions if True
backend_language: <str>
The backend used to accelerate the code
storage_fn:
filename to store/load the operator sparse matrix
"""
# todo (MCP, 16.03.2021):
# - avoid discretizing a bilinear form
# - allow case without interfaces (single or multipatch)
def __init__(
self,
V0h,
domain_h,
hom_bc=False,
backend_language='python',
storage_fn=None):
FemLinearOperator.__init__(self, fem_domain=V0h)
V0 = V0h.symbolic_space
domain = V0.domain
self.symbolic_domain = domain
if storage_fn and os.path.exists(storage_fn):
print(
"[ConformingProjection_V0] loading operator sparse matrix from " +
storage_fn)
self._sparse_matrix = load_npz(storage_fn)
else:
# assemble the operator matrix
u, v = elements_of(V0, names='u, v')
expr = u * v # dot(u,v)
Interfaces = domain.interfaces # note: interfaces does not include the boundary
# this penalization is for an H1-conforming space
expr_I = (plus(u) - minus(u)) * (plus(v) - minus(v))
a = BilinearForm((u, v), integral(domain, expr) +
integral(Interfaces, expr_I))
# print('[[ forcing python backend for ConformingProjection_V0]] ')
# backend_language = 'python'
ah = discretize(
a, domain_h, [
V0h, V0h], backend=PSYDAC_BACKENDS[backend_language])
# self._A = ah.assemble()
self._A = ah.forms[0]._matrix
spaces = self._A.domain.spaces
if isinstance(Interfaces, Interface):
Interfaces = (Interfaces, )
for b1 in self._A.blocks:
for A in b1:
if A is None:
continue
A[:, :, :, :] = 0
indices = [slice(None, None)] * domain.dim + [0] * domain.dim
for i in range(len(self._A.blocks)):
self._A[i, i][tuple(indices)] = 1
for I in Interfaces:
axis = I.axis
i_minus = get_patch_index_from_face(domain, I.minus)
i_plus = get_patch_index_from_face(domain, I.plus)
sp_minus = spaces[i_minus]
sp_plus = spaces[i_plus]
s_minus = sp_minus.starts[axis]
e_minus = sp_minus.ends[axis]
s_plus = sp_plus.starts[axis]
e_plus = sp_plus.ends[axis]
d_minus = V0h.spaces[i_minus].degree[axis]
d_plus = V0h.spaces[i_plus].degree[axis]
indices = [slice(None, None)] * domain.dim + [0] * domain.dim
minus_ext = I.minus.ext
plus_ext = I.plus.ext
if minus_ext == 1:
indices[axis] = e_minus
else:
indices[axis] = s_minus
self._A[i_minus, i_minus][tuple(indices)] = 1 / 2
if plus_ext == 1:
indices[axis] = e_plus
else:
indices[axis] = s_plus
self._A[i_plus, i_plus][tuple(indices)] = 1 / 2
if plus_ext == minus_ext:
if minus_ext == 1:
indices[axis] = d_minus
else:
indices[axis] = s_minus
self._A[i_minus, i_plus][tuple(indices)] = 1 / 2
if plus_ext == 1:
indices[axis] = d_plus
else:
indices[axis] = s_plus
self._A[i_plus, i_minus][tuple(indices)] = 1 / 2
else:
if minus_ext == 1:
indices[axis] = d_minus
else:
indices[axis] = s_minus
if plus_ext == 1:
indices[domain.dim + axis] = d_plus
else:
indices[domain.dim + axis] = -d_plus
self._A[i_minus, i_plus][tuple(indices)] = 1 / 2
if plus_ext == 1:
indices[axis] = d_plus
else:
indices[axis] = s_plus
if minus_ext == 1:
indices[domain.dim + axis] = d_minus
else:
indices[domain.dim + axis] = -d_minus
self._A[i_plus, i_minus][tuple(indices)] = 1 / 2
domain = domain.logical_domain
corner_blocks = {}
for c in domain.corners:
for b1 in c.corners:
i = get_patch_index_from_face(domain, b1.domain)
for b2 in c.corners:
j = get_patch_index_from_face(domain, b2.domain)
if (i, j) in corner_blocks:
corner_blocks[i, j] += [(b1, b2)]
else:
corner_blocks[i, j] = [(b1, b2)]
for c in domain.corners:
if len(c) == 2:
continue
for b1 in c.corners:
i = get_patch_index_from_face(domain, b1.domain)
for b2 in c.corners:
j = get_patch_index_from_face(domain, b2.domain)
interface = get_interface_from_corners(b1, b2, domain)
axis = None
if self._A[i, j] is None:
self._A[i, j] = allocate_interface_matrix(
corner_blocks[i, j], V0h.spaces[i], V0h.spaces[j])
if i != j and self._A[i, j]:
axis = self._A[i, j]._dim
index = get_row_col_index(
b1, b2, interface, axis, V0h.spaces[i], V0h.spaces[j])
self._A[i, j][tuple(index)] = 1 / len(c)
if hom_bc:
for bn in domain.boundary:
self.set_homogenous_bc(bn)
self._matrix = self._A
self._sparse_matrix = self._matrix.tosparse() # self._sparse_matrix
if storage_fn:
print(
"[ConformingProjection_V0] storing operator sparse matrix in " +
storage_fn)
save_npz(storage_fn, self._sparse_matrix)
[docs]
def set_homogenous_bc(self, boundary, rhs=None):
domain = self.symbolic_domain
Vh = self.fem_domain
if domain.mapping:
domain = domain.logical_domain
if boundary.mapping:
boundary = boundary.logical_domain
corners = domain.corners
i = get_patch_index_from_face(domain, boundary)
if rhs:
apply_essential_bc_stencil(
rhs[i], axis=boundary.axis, ext=boundary.ext, order=0)
for j in range(len(domain)):
if self._A[i, j] is None:
continue
apply_essential_bc_stencil(
self._A[i, j], axis=boundary.axis, ext=boundary.ext, order=0)
for c in corners:
faces = [f for b in c.corners for f in b.boundaries]
if len(c) == 2:
continue
if boundary in faces:
for b1 in c.corners:
i = get_patch_index_from_face(domain, b1.domain)
for b2 in c.corners:
j = get_patch_index_from_face(domain, b2.domain)
interface = get_interface_from_corners(b1, b2, domain)
axis = None
if i != j:
axis = self._A[i, j].dim
index = get_row_col_index(
b1, b2, interface, axis, Vh.spaces[i], Vh.spaces[j])
self._A[i, j][tuple(index)] = 0.
if i == j and rhs:
rhs[i][tuple(index[:2])] = 0.
# ===============================================================================
[docs]
class ConformingProjection_V1(FemLinearOperator):
"""
Conforming projection from global broken V1 space to conforming V1 global space
proj.dot(v) returns the conforming projection of v, computed by solving linear system
Parameters
----------
V1h: <FemSpace>
The discrete space
domain_h: <Geometry>
The discrete domain of the projector
hom_bc : <bool>
Apply homogenous boundary conditions if True
backend_language: <str>
The backend used to accelerate the code
storage_fn:
filename to store/load the operator sparse matrix
"""
# todo (MCP, 16.03.2021):
# - avoid discretizing a bilinear form
# - allow case without interfaces (single or multipatch)
def __init__(
self,
V1h,
domain_h,
hom_bc=False,
backend_language='python',
storage_fn=None):
FemLinearOperator.__init__(self, fem_domain=V1h)
V1 = V1h.symbolic_space
domain = V1.domain
self.symbolic_domain = domain
if storage_fn and os.path.exists(storage_fn):
print(
"[ConformingProjection_V1] loading operator sparse matrix from " +
storage_fn)
self._sparse_matrix = load_npz(storage_fn)
else:
# assemble the operator matrix
u, v = elements_of(V1, names='u, v')
expr = dot(u, v)
#
Interfaces = domain.interfaces # note: interfaces does not include the boundary
# this penalization is for an H1-conforming space
expr_I = dot(plus(u) - minus(u), plus(v) - minus(v))
a = BilinearForm((u, v), integral(domain, expr) +
integral(Interfaces, expr_I))
# print('[[ forcing python backend for ConformingProjection_V1]] ')
# backend_language = 'python'
ah = discretize(
a, domain_h, [
V1h, V1h], backend=PSYDAC_BACKENDS[backend_language])
#
# # self._A = ah.assemble()
self._A = ah.forms[0]._matrix
# C1 = V1h.vector_space
# self._A = BlockLinearOperator(C1, C1)
for b1 in self._A.blocks:
for b2 in b1:
if b2 is None:
continue
for b3 in b2.blocks:
for A in b3:
if A is None:
continue
A[:, :, :, :] = 0
spaces = self._A.domain.spaces
if isinstance(Interfaces, Interface):
Interfaces = (Interfaces, )
indices = [slice(None, None)] * domain.dim + [0] * domain.dim
for i in range(len(self._A.blocks)):
self._A[i, i][0, 0][tuple(indices)] = 1
self._A[i, i][1, 1][tuple(indices)] = 1
# empty list if no interfaces ?
if Interfaces is not None:
for I in Interfaces:
i_minus = get_patch_index_from_face(domain, I.minus)
i_plus = get_patch_index_from_face(domain, I.plus)
indices = [slice(None, None)] * \
domain.dim + [0] * domain.dim
sp1 = spaces[i_minus]
sp2 = spaces[i_plus]
s11 = sp1.spaces[0].starts[I.axis]
e11 = sp1.spaces[0].ends[I.axis]
s12 = sp1.spaces[1].starts[I.axis]
e12 = sp1.spaces[1].ends[I.axis]
s21 = sp2.spaces[0].starts[I.axis]
e21 = sp2.spaces[0].ends[I.axis]
s22 = sp2.spaces[1].starts[I.axis]
e22 = sp2.spaces[1].ends[I.axis]
d11 = V1h.spaces[i_minus].spaces[0].degree[I.axis]
d12 = V1h.spaces[i_minus].spaces[1].degree[I.axis]
d21 = V1h.spaces[i_plus].spaces[0].degree[I.axis]
d22 = V1h.spaces[i_plus].spaces[1].degree[I.axis]
s_minus = [s11, s12]
e_minus = [e11, e12]
s_plus = [s21, s22]
e_plus = [e21, e22]
d_minus = [d11, d12]
d_plus = [d21, d22]
minus_ext = I.minus.ext
plus_ext = I.plus.ext
axis = I.axis
for k in range(domain.dim):
if k == I.axis:
continue
if minus_ext == 1:
indices[axis] = e_minus[k]
else:
indices[axis] = s_minus[k]
self._A[i_minus, i_minus][k, k][tuple(indices)] = 1 / 2
if plus_ext == 1:
indices[axis] = e_plus[k]
else:
indices[axis] = s_plus[k]
self._A[i_plus, i_plus][k, k][tuple(indices)] = 1 / 2
if plus_ext == minus_ext:
if minus_ext == 1:
indices[axis] = d_minus[k]
else:
indices[axis] = s_minus[k]
self._A[i_minus, i_plus][k, k][tuple(
indices)] = 1 / 2 * I.direction
if plus_ext == 1:
indices[axis] = d_plus[k]
else:
indices[axis] = s_plus[k]
self._A[i_plus, i_minus][k, k][tuple(
indices)] = 1 / 2 * I.direction
else:
if minus_ext == 1:
indices[axis] = d_minus[k]
else:
indices[axis] = s_minus[k]
if plus_ext == 1:
indices[domain.dim + axis] = d_plus[k]
else:
indices[domain.dim + axis] = -d_plus[k]
self._A[i_minus, i_plus][k, k][tuple(
indices)] = 1 / 2 * I.direction
if plus_ext == 1:
indices[axis] = d_plus[k]
else:
indices[axis] = s_plus[k]
if minus_ext == 1:
indices[domain.dim + axis] = d_minus[k]
else:
indices[domain.dim + axis] = -d_minus[k]
self._A[i_plus, i_minus][k, k][tuple(
indices)] = 1 / 2 * I.direction
if hom_bc:
for bn in domain.boundary:
self.set_homogenous_bc(bn)
self._matrix = self._A
self._sparse_matrix = self._matrix.tosparse()
if storage_fn:
print(
"[ConformingProjection_V1] storing operator sparse matrix in " +
storage_fn)
save_npz(storage_fn, self._sparse_matrix)
[docs]
def set_homogenous_bc(self, boundary):
domain = self.symbolic_domain
Vh = self.fem_domain
i = get_patch_index_from_face(domain, boundary)
axis = boundary.axis
ext = boundary.ext
for j in range(len(domain)):
if self._A[i, j] is None:
continue
apply_essential_bc_stencil(
self._A[i, j][1 - axis, 1 - axis], axis=axis, ext=ext, order=0)
# ===============================================================================
[docs]
def get_K0_and_K0_inv(V0h, uniform_patches=False):
"""
Compute the change of basis matrices K0 and K0^{-1} in V0h.
With
K0_ij = sigma^0_i(B_j) = B_jx(n_ix) * B_jy(n_iy)
where sigma_i is the geometric (interpolation) dof
and B_j is the tensor-product B-spline
"""
if uniform_patches:
print(' [[WARNING -- hack in get_K0_and_K0_inv: using copies of 1st-patch matrices in every patch ]] ')
V0 = V0h.symbolic_space # VOh is ProductFemSpace
domain = V0.domain
K0_blocks = []
K0_inv_blocks = []
for k, D in enumerate(domain.interior):
if uniform_patches and k > 0:
K0_k = K0_blocks[0].copy()
K0_inv_k = K0_inv_blocks[0].copy()
else:
V0_k = V0h.spaces[k] # fem space on patch k: (TensorFemSpace)
K0_k_factors = [None, None]
for d in [0, 1]:
# 1d fem space alond dim d (SplineSpace)
V0_kd = V0_k.spaces[d]
K0_k_factors[d] = collocation_matrix(
knots=V0_kd.knots,
degree=V0_kd.degree,
periodic=V0_kd.periodic,
normalization=V0_kd.basis,
xgrid=V0_kd.greville
)
K0_k = kron(*K0_k_factors)
K0_k.eliminate_zeros()
K0_inv_k = inv(K0_k.tocsc())
K0_inv_k.eliminate_zeros()
K0_blocks.append(K0_k)
K0_inv_blocks.append(K0_inv_k)
K0 = block_diag(K0_blocks)
K0_inv = block_diag(K0_inv_blocks)
return K0, K0_inv
# ===============================================================================
[docs]
def get_K1_and_K1_inv(V1h, uniform_patches=False):
"""
Compute the change of basis matrices K1 and K1^{-1} in Hcurl space V1h.
With
K1_ij = sigma^1_i(B_j) = int_{e_ix}(M_jx) * B_jy(n_iy)
if i = horizontal edge [e_ix, n_iy] and j = (M_jx o B_jy) x-oriented MoB spline
or
= B_jx(n_ix) * int_{e_iy}(M_jy)
if i = vertical edge [n_ix, e_iy] and j = (B_jx o M_jy) y-oriented BoM spline
(above, 'o' denotes tensor-product for functions)
"""
if uniform_patches:
print(' [[WARNING -- hack in get_K1_and_K1_inv: using copies of 1st-patch matrices in every patch ]] ')
V1 = V1h.symbolic_space # V1h is ProductFemSpace
domain = V1.domain
K1_blocks = []
K1_inv_blocks = []
for k, D in enumerate(domain.interior):
if uniform_patches and k > 0:
K1_k = K1_blocks[0].copy()
K1_inv_k = K1_inv_blocks[0].copy()
else:
# fem space on patch k: (ProductFemSpace (of TensorFemSpace (s))
V1_k = V1h.spaces[k]
K1_k_blocks = []
for c in [0, 1]: # dim of component
# fem space for comp. dc (TensorFemSpace)
V1_kc = V1_k.spaces[c]
K1_kc_factors = [None, None]
for d in [0, 1]: # dim of variable
# 1d fem space for comp c alond dim d (SplineSpace)
V1_kcd = V1_kc.spaces[d]
if c == d:
K1_kc_factors[d] = histopolation_matrix(
knots=V1_kcd.knots,
degree=V1_kcd.degree,
periodic=V1_kcd.periodic,
normalization=V1_kcd.basis,
xgrid=V1_kcd.ext_greville
)
else:
K1_kc_factors[d] = collocation_matrix(
knots=V1_kcd.knots,
degree=V1_kcd.degree,
periodic=V1_kcd.periodic,
normalization=V1_kcd.basis,
xgrid=V1_kcd.greville
)
K1_kc = kron(*K1_kc_factors)
K1_kc.eliminate_zeros()
K1_k_blocks.append(K1_kc)
K1_k = block_diag(K1_k_blocks)
K1_k.eliminate_zeros()
K1_inv_k = inv(K1_k.tocsc())
K1_inv_k.eliminate_zeros()
K1_blocks.append(K1_k)
K1_inv_blocks.append(K1_inv_k)
K1 = block_diag(K1_blocks)
K1_inv = block_diag(K1_inv_blocks)
return K1, K1_inv
# #===============================================================================
# def get_M_and_M_inv(Vh, subdomains_h, is_scalar, backend_language='python'):
# """
# compute the mass matrix M and M^{-1} in multipatch space Vh
# DOES NOT WORK -- SHOULD WE HAVE THE POSSIBILITY OF DOING THAT ?
# """
# from pprint import pprint
#
# V = Vh.symbolic_space # VOh is ProductFemSpace
# domain = V.domain
# M_blocks = []
# M_inv_blocks = []
#
# # print('type(domain_h) = ', type(domain_h))
# #
# # print('type(domain_h._patches) = ', type(domain_h._patches))
# # print('len(domain_h._patches) = ', len(domain_h._patches))
# #
# # mappings = domain_h.mappings
# # print('type(mappings) = ', type(mappings))
# # print('len(mappings) = ', len(mappings))
# #
# # mappings_list = list(mappings.values())
# # print('len(mappings_list) = ', len(mappings_list))
# #
# # print('type(mappings_list[0]) = ', type(mappings_list[0]))
#
# for k, Dh_k in enumerate(subdomains_h):
#
# print('k = ', k)
# print('type(Dh_k) = ', type(Dh_k))
# # print('Dh = ', Dh)
# D_k = domain.interior[k]
#
# # exit()
#
# # for k, D in enumerate(domain.interior):
#
# V_k = V.spaces[k]
# Vh_k = Vh.spaces[k]
#
# # print(type(domain_h))
# #
# # pprint(dir(domain_h))
# #
# #
# # print(len(domain_h._patches))
# # exit()
# # Dh_k = domain_h.spaces[k] # fem space on patch k: (TensorFemSpace)
# u, v = elements_of(V_k, names='u, v')
# if is_scalar:
# expr = u*v
# else:
# expr = dot(u,v)
# a_k = BilinearForm((u,v), integral(D_k, expr))
# a_kh = discretize(a_k, Dh_k, [Vh_k, Vh_k], backend=PSYDAC_BACKENDS[backend_language]) # 'pyccel-gcc'])
#
# M_k = a_kh.assemble().toarray()
# M_k.eliminate_zeros()
# M_inv_k = inv(M_k.tocsc())
# M_inv_k.eliminate_zeros()
#
# M_blocks.append(M_k)
# M_inv_blocks.append(M_inv_k)
# M = block_diag(M_blocks)
# M_inv = block_diag(M_inv_blocks)
# return M, M_inv
# ===============================================================================
[docs]
class HodgeOperator(FemLinearOperator):
"""
Change of basis operator: dual basis -> primal basis
self._matrix: matrix of the primal Hodge = this is the mass matrix !
self.dual_Hodge_matrix: this is the INVERSE mass matrix
Parameters
----------
Vh: <FemSpace>
The discrete space
domain_h: <Geometry>
The discrete domain of the projector
metric : <str>
the metric of the de Rham complex
backend_language: <str>
The backend used to accelerate the code
load_dir: <str>
storage files for the primal and dual Hodge sparse matrice
load_space_index: <str>
the space index in the derham sequence
Notes
-----
Either we use a storage, or these matrices are only computed on demand
# todo: we compute the sparse matrix when to_sparse_matrix is called -- but never the stencil matrix (should be fixed...)
We only support the identity metric, this implies that the dual Hodge is the inverse of the primal one.
# todo: allow for non-identity metrics
"""
def __init__(
self,
Vh,
domain_h,
metric='identity',
backend_language='python',
load_dir=None,
load_space_index=''):
FemLinearOperator.__init__(self, fem_domain=Vh)
self._domain_h = domain_h
self._backend_language = backend_language
self._dual_Hodge_sparse_matrix = None
assert metric == 'identity'
self._metric = metric
if load_dir and isinstance(load_dir, str):
if not os.path.exists(load_dir):
os.makedirs(load_dir)
assert str(load_space_index) in ['0', '1', '2', '3']
primal_Hodge_storage_fn = load_dir + \
'/H{}_m.npz'.format(load_space_index)
dual_Hodge_storage_fn = load_dir + \
'/dH{}_m.npz'.format(load_space_index)
primal_Hodge_is_stored = os.path.exists(primal_Hodge_storage_fn)
dual_Hodge_is_stored = os.path.exists(dual_Hodge_storage_fn)
if dual_Hodge_is_stored:
assert primal_Hodge_is_stored
print(
" ... loading dual Hodge sparse matrix from " +
dual_Hodge_storage_fn)
self._dual_Hodge_sparse_matrix = load_npz(
dual_Hodge_storage_fn)
print(
"[HodgeOperator] loading primal Hodge sparse matrix from " +
primal_Hodge_storage_fn)
self._sparse_matrix = load_npz(primal_Hodge_storage_fn)
else:
assert not primal_Hodge_is_stored
print(
"[HodgeOperator] assembling both sparse matrices for storage...")
self.assemble_primal_Hodge_matrix()
print(
"[HodgeOperator] storing primal Hodge sparse matrix in " +
primal_Hodge_storage_fn)
save_npz(primal_Hodge_storage_fn, self._sparse_matrix)
self.assemble_dual_Hodge_matrix()
print(
"[HodgeOperator] storing dual Hodge sparse matrix in " +
dual_Hodge_storage_fn)
save_npz(dual_Hodge_storage_fn, self._dual_Hodge_sparse_matrix)
else:
# matrices are not stored, we will probably compute them later
pass
[docs]
def to_sparse_matrix(self):
"""
the Hodge matrix is the patch-wise multi-patch mass matrix
it is not stored by default but assembled on demand
"""
if (self._sparse_matrix is not None) or (self._matrix is not None):
return FemLinearOperator.to_sparse_matrix(self)
self.assemble_primal_Hodge_matrix()
return self._sparse_matrix
[docs]
def assemble_primal_Hodge_matrix(self):
"""
the Hodge matrix is the patch-wise multi-patch mass matrix
it is not stored by default but assembled on demand
"""
if self._matrix is None:
Vh = self.fem_domain
assert Vh == self.fem_codomain
V = Vh.symbolic_space
domain = V.domain
# domain_h = V0h.domain: would be nice...
u, v = elements_of(V, names='u, v')
if isinstance(u, ScalarFunction):
expr = u * v
else:
expr = dot(u, v)
a = BilinearForm((u, v), integral(domain, expr))
ah = discretize(a, self._domain_h, [
Vh, Vh], backend=PSYDAC_BACKENDS[self._backend_language])
self._matrix = ah.assemble() # Mass matrix in stencil format
self._sparse_matrix = self._matrix.tosparse()
[docs]
def get_dual_Hodge_sparse_matrix(self):
if self._dual_Hodge_sparse_matrix is None:
self.assemble_dual_Hodge_matrix()
return self._dual_Hodge_sparse_matrix
[docs]
def assemble_dual_Hodge_matrix(self):
"""
the dual Hodge matrix is the patch-wise inverse of the multi-patch mass matrix
it is not stored by default but computed on demand, by local (patch-wise) inversion of the mass matrix
"""
if self._dual_Hodge_sparse_matrix is None:
if not self._matrix:
self.assemble_primal_Hodge_matrix()
M = self._matrix # mass matrix of the (primal) basis
nrows = M.n_block_rows
ncols = M.n_block_cols
inv_M_blocks = []
for i in range(nrows):
Mii = M[i, i].tosparse()
inv_Mii = inv(Mii.tocsc())
inv_Mii.eliminate_zeros()
inv_M_blocks.append(inv_Mii)
inv_M = block_diag(inv_M_blocks)
self._dual_Hodge_sparse_matrix = inv_M
# ==============================================================================
[docs]
class BrokenGradient_2D(FemLinearOperator):
def __init__(self, V0h, V1h):
FemLinearOperator.__init__(self, fem_domain=V0h, fem_codomain=V1h)
D0s = [Gradient_2D(V0, V1) for V0, V1 in zip(V0h.spaces, V1h.spaces)]
self._matrix = BlockLinearOperator(self.domain, self.codomain, blocks={
(i, i): D0i._matrix for i, D0i in enumerate(D0s)})
[docs]
def transpose(self, conjugate=False):
# todo (MCP): define as the dual differential operator
return BrokenTransposedGradient_2D(self.fem_domain, self.fem_codomain)
# ==============================================================================
[docs]
class BrokenTransposedGradient_2D(FemLinearOperator):
def __init__(self, V0h, V1h):
FemLinearOperator.__init__(self, fem_domain=V1h, fem_codomain=V0h)
D0s = [Gradient_2D(V0, V1) for V0, V1 in zip(V0h.spaces, V1h.spaces)]
self._matrix = BlockLinearOperator(self.domain, self.codomain, blocks={
(i, i): D0i._matrix.T for i, D0i in enumerate(D0s)})
[docs]
def transpose(self, conjugate=False):
# todo (MCP): discard
return BrokenGradient_2D(self.fem_codomain, self.fem_domain)
# ==============================================================================
[docs]
class BrokenScalarCurl_2D(FemLinearOperator):
def __init__(self, V1h, V2h):
FemLinearOperator.__init__(self, fem_domain=V1h, fem_codomain=V2h)
D1s = [ScalarCurl_2D(V1, V2) for V1, V2 in zip(V1h.spaces, V2h.spaces)]
self._matrix = BlockLinearOperator(self.domain, self.codomain, blocks={
(i, i): D1i._matrix for i, D1i in enumerate(D1s)})
[docs]
def transpose(self, conjugate=False):
return BrokenTransposedScalarCurl_2D(
V1h=self.fem_domain, V2h=self.fem_codomain)
# ==============================================================================
[docs]
class BrokenTransposedScalarCurl_2D(FemLinearOperator):
def __init__(self, V1h, V2h):
FemLinearOperator.__init__(self, fem_domain=V2h, fem_codomain=V1h)
D1s = [ScalarCurl_2D(V1, V2) for V1, V2 in zip(V1h.spaces, V2h.spaces)]
self._matrix = BlockLinearOperator(self.domain, self.codomain, blocks={
(i, i): D1i._matrix.T for i, D1i in enumerate(D1s)})
[docs]
def transpose(self, conjugate=False):
return BrokenScalarCurl_2D(V1h=self.fem_codomain, V2h=self.fem_domain)
# ==============================================================================
# def multipatch_Moments_Hcurl(f, V1h, domain_h):
[docs]
def ortho_proj_Hcurl(EE, V1h, domain_h, M1, backend_language='python'):
"""
return orthogonal projection of E on V1h, given M1 the mass matrix
"""
assert isinstance(EE, Tuple)
V1 = V1h.symbolic_space
v = element_of(V1, name='v')
l = LinearForm(v, integral(V1.domain, dot(v, EE)))
lh = discretize(
l,
domain_h,
V1h,
backend=PSYDAC_BACKENDS[backend_language])
b = lh.assemble()
M1_inv = inverse(M1.mat(), 'pcg', pc='jacobi', tol=1e-10)
sol_coeffs = M1_inv @ b
return FemField(V1h, coeffs=sol_coeffs)
# ==============================================================================
[docs]
class Multipatch_Projector_H1:
"""
to apply the H1 projection (2D) on every patch
"""
def __init__(self, V0h):
self._P0s = [Projector_H1(V) for V in V0h.spaces]
self._V0h = V0h # multipatch Fem Space
def __call__(self, funs_log):
"""
project a list of functions given in the logical domain
"""
u0s = [P(fun) for P, fun, in zip(self._P0s, funs_log)]
u0_coeffs = BlockVector(self._V0h.vector_space,
blocks=[u0j.coeffs for u0j in u0s])
return FemField(self._V0h, coeffs=u0_coeffs)
# ==============================================================================
[docs]
class Multipatch_Projector_Hcurl:
"""
to apply the Hcurl projection (2D) on every patch
"""
def __init__(self, V1h, nquads=None):
self._P1s = [Projector_Hcurl(V, nquads=nquads) for V in V1h.spaces]
self._V1h = V1h # multipatch Fem Space
def __call__(self, funs_log):
"""
project a list of functions given in the logical domain
"""
E1s = [P(fun) for P, fun, in zip(self._P1s, funs_log)]
E1_coeffs = BlockVector(self._V1h.vector_space,
blocks=[E1j.coeffs for E1j in E1s])
return FemField(self._V1h, coeffs=E1_coeffs)
# ==============================================================================
[docs]
class Multipatch_Projector_L2:
"""
to apply the L2 projection (2D) on every patch
"""
def __init__(self, V2h, nquads=None):
self._P2s = [Projector_L2(V, nquads=nquads) for V in V2h.spaces]
self._V2h = V2h # multipatch Fem Space
def __call__(self, funs_log):
"""
project a list of functions given in the logical domain
"""
B2s = [P(fun) for P, fun, in zip(self._P2s, funs_log)]
B2_coeffs = BlockVector(self._V2h.vector_space,
blocks=[B2j.coeffs for B2j in B2s])
return FemField(self._V2h, coeffs=B2_coeffs)