"""
This module implements the wind generation functionality forward facing API
"""
import pickle
from math import ceil
from os import PathLike
from pathlib import Path
from typing import Optional, Union
import numpy as np
import torch
from ..common import CPU_Unpickler
from ..spectra_fitting import CalibrationProblem
from .covariance_kernels import MannCovariance, VonKarmanCovariance
from .gaussian_random_fields import VectorGaussianRandomField
from .nn_covariance import NNCovariance
[docs]
class GenerateFluctuationField:
r"""
.. _generate-fluctuation-field-reference:
Class for generating a fluctuation field either from a Mann model or a pre-fit DRD model that generates the field spectra.
Turbulent fluctuations can be formally written as a convolution of a covariance kernel with Gaussian noise :math:`\boldsymbol{\xi}` in the physical domain:
.. math::
\mathbf{u}=\mathcal{F}^{-1} \mathcal{G} \widehat{\boldsymbol{\xi}}=\mathcal{F}^{-1} \mathcal{G} \mathcal{F} \boldsymbol{\xi},
where :math:`\mathcal{F}` is the Fourier transform and the operator :math:`\mathcal{G}` is the point-wise multiplication by :math:`G(\boldsymbol{k})`, which is any positive-definite "square-root" of the spectral tensor and satisfies :math:`G(\boldsymbol{k}) G^*(\boldsymbol{k})=\Phi(\boldsymbol{k})`.
This is determined by which :math:`\Phi(\boldsymbol{k}, \tau(\boldsymbol{k}))` is used. The following are provided:
#. Mann, which utilizes the Mann eddy lifetime function
.. math::
\tau^{\mathrm{IEC}}(k)=\frac{T B^{-1}(k L)^{-\frac{2}{3}}}{\sqrt{{ }_2 F_1\left(1 / 3,17 / 6 ; 4 / 3 ;-(k L)^{-2}\right)}}
and the full spectral tensor can be found in the following reference:
J. Mann, “The spatial structure of neutral atmospheric surfacelayer turbulence,” Journal of fluid mechanics 273, 141-168 (1994)
#. DRD model, which utilizes a learned eddy lifetime function and requires a pre-trained DRD model. The eddy lifetime function is given by
.. math::
\tau(\boldsymbol{k})=\frac{T|\boldsymbol{a}|^{\nu-\frac{2}{3}}}{\left(1+|\boldsymbol{a}|^2\right)^{\nu / 2}}, \quad \boldsymbol{a}=\boldsymbol{a}(\boldsymbol{k})
#. Von Karman model,
.. math::
\Phi_{i j}^{\mathrm{VK}}(\boldsymbol{k})=\frac{E(k)}{4 \pi k^2}\left(\delta_{i j}-\frac{k_i k_j}{k^2}\right)
which utilizes the energy spectrum function
.. math::
E(k)=c_0^2 \varepsilon^{2 / 3} k^{-5 / 3}\left(\frac{k L}{\left(1+(k L)^2\right)^{1 / 2}}\right)^{17 / 3},
where :math:`\varepsilon` is the viscous dissipation of the turbulent kinetic energy, :math:`L` is the length scale parameter and :math:`c_0^2 \approx 1.7` is an empirical constant.
"""
def __init__(
self,
friction_velocity: float,
reference_height: float,
grid_dimensions: np.ndarray,
grid_levels: np.ndarray,
model: str,
length_scale: Optional[float] = None,
time_scale: Optional[float] = None,
energy_spectrum_scale: Optional[float] = None,
path_to_parameters: Optional[Union[str, PathLike]] = None,
seed: int = None,
blend_num=10,
):
r"""
Parameters
----------
friction_velocity : float
The reference wind friction velocity :math:`u_*`
reference_height : float
Reference height :math:`L`
grid_dimensions : np.ndarray
Numpy array denoting the grid size; the real dimensions of the domain of interest.
grid_levels : np.ndarray
Numpy array denoting the grid levels; number of discretization points used in each dimension, which evaluates as 2^k for each dimension for FFT-based sampling methods.
model : str
One of ``"DRD"``, ``"VK"``, or ``"Mann"`` denoting
"Neural Network," "Von Karman," and "Mann model".
length_scale : Optional[float]
The length scale :math:`L:`, used only if non-DRD model is used. By default, None.
time_scale : Optional[float]
The time scale :math:`T`, used only if non-DRD model is used. By default, None.
energy_spectrum_scale : Optional[float]
Scaling of energy spectrum, used only if non-DRD model is used. By default, None.
path_to_parameters : Union[str, PathLike]
File path (string or ``Pathlib.Path()``)
seed : int, optional
Pseudo-random number generator seed, by default None. See ``np.random.RandomState``.
blend_num : int, optional
Number of grid points in the y-z plane to use as buffer regions between successive blocks of fluctuation; see figures 7 and 8 of the original DRD paper, by default 10. Note that at the boundary of each block, points are often correlated, so if the resulting field has undesirably high correlation, increasing this number may mitigate some of these effects.
Raises
------
ValueError
If ``model`` doesn't match one of the 3 available models: DRD, VK and Mann.
"""
if model not in ["DRD", "VK", "Mann"]:
raise ValueError(
"Provided model type not supported, must be one of DRD, VK, Mann"
)
if model == "DRD" and path_to_parameters is None:
raise ValueError(
"Please provide the path to saved pre-trained DRD model, or else choose a different model type."
)
if model in ["VK", "Mann"] and any(
[length_scale is None, time_scale is None, energy_spectrum_scale is None]
):
raise ValueError(
"Must provide all physical scalar quantities (length, time, energy spectrum scales) to use current model type."
)
if model == "DRD":
device = "cuda" if torch.cuda.is_available() else "cpu"
# safely load saved parameters with reference to Tensor device(s)
with open(path_to_parameters, "rb") as file:
(nn_params, prob_params, loss_params, phys_params, model_params,) = (
pickle.load(file) if device == "cpu" else CPU_Unpickler(file).load()
)
pb = CalibrationProblem(
nn_params=nn_params,
prob_params=prob_params,
loss_params=loss_params,
phys_params=phys_params,
device=device,
)
pb.parameters = model_params
L, T, M = pb.OPS.exp_scales()
M = (4 * np.pi) * L ** (-5 / 3) * M
print("Scales: ", [L, T, M])
E0 = M * friction_velocity**2 * reference_height ** (-2 / 3)
L = L * reference_height
Gamma = T
else:
E0 = (
energy_spectrum_scale
* friction_velocity**2
* reference_height ** (-2 / 3)
)
L = length_scale
Gamma = time_scale
# define margins and buffer
time_buffer = 3 * Gamma * L
spatial_margin = 1 * L
try:
grid_levels = [grid_levels[i].GetInt() for i in range(3)]
except:
pass
Nx = 2 ** grid_levels[0] + 1
Ny = 2 ** grid_levels[1] + 1
Nz = 2 ** grid_levels[2] + 1
hx = grid_dimensions[0] / Nx
hy = grid_dimensions[1] / Ny
hz = grid_dimensions[2] / Nz
n_buffer = ceil(time_buffer / hx)
n_marginy = ceil(spatial_margin / hy)
n_marginz = ceil(spatial_margin / hz)
wind_shape = [0] + [Ny] + [Nz] + [3]
if blend_num > 0:
noise_shape = (
[Nx + 2 * n_buffer + (blend_num - 1)]
+ [Ny + 2 * n_marginy]
+ [Nz + 2 * n_marginz]
+ [3]
)
else:
noise_shape = (
[Nx + 2 * n_buffer] + [Ny + 2 * n_marginy] + [Nz + 2 * n_marginz] + [3]
)
new_part_shape = [Nx] + [Ny + 2 * n_marginy] + [Nz + 2 * n_marginz] + [3]
central_part = [
slice(None, None),
] * 4
new_part = central_part.copy()
central_part[0] = slice(n_buffer, -n_buffer)
central_part[1] = slice(n_marginy, -n_marginy)
central_part[2] = slice(0, -2 * n_marginz)
if blend_num > 0:
new_part[0] = slice(2 * n_buffer + (blend_num - 1), None)
else:
new_part[0] = slice(2 * n_buffer, None)
self.grid_dimensions = grid_dimensions
self.grid_levels = grid_levels
self.new_part = new_part
self.Nx = Nx
self.blend_num = blend_num
self.central_part = central_part
self.new_part_shape = new_part_shape
self.noise_shape = noise_shape
self.n_buffer = n_buffer
self.n_marginy = n_marginy
self.n_marginz = n_marginz
self.seed = seed
self.noise = None
self.total_fluctuation = np.zeros(wind_shape)
self.log_law = (
lambda z, z0, zref, uref: uref * np.log(z / z0 + 1.0) / np.log(zref / z0)
)
self.power_law = lambda z, zref, Uref, a: Uref * (z / zref) ** a
### Random field object
if model == "VK":
self.Covariance = VonKarmanCovariance(ndim=3, length_scale=L, E0=E0)
self.RF = VectorGaussianRandomField(
ndim=3,
grid_level=grid_levels,
grid_dimensions=grid_dimensions,
sampling_method="vf_fftw",
grid_shape=self.noise_shape[:-1],
Covariance=self.Covariance,
)
elif model == "Mann":
self.Covariance = MannCovariance(ndim=3, length_scale=L, E0=E0, Gamma=Gamma)
self.RF = VectorGaussianRandomField(
ndim=3,
grid_level=grid_levels,
grid_dimensions=grid_dimensions,
sampling_method="vf_fftw",
grid_shape=self.noise_shape[:-1],
Covariance=self.Covariance,
)
elif model == "DRD":
self.Covariance = NNCovariance(
ndim=3,
length_scale=L,
E0=E0,
Gamma=Gamma,
ops=pb.OPS,
h_ref=reference_height,
)
self.RF = VectorGaussianRandomField(
ndim=3,
grid_level=grid_levels,
grid_dimensions=grid_dimensions,
sampling_method="vf_fftw",
grid_shape=self.noise_shape[:-1],
Covariance=self.Covariance,
)
self.RF.reseed(self.seed)
def _generate_block(self) -> np.ndarray:
"""Generates a single block of the fluctuation field.
Returns
-------
np.ndarray
A single block of the fluctuation field, to be concatenated with the total field.
"""
if self.noise is None:
noise = self.RF.sample_noise(self.noise_shape)
else:
noise = np.roll(self.noise, -self.Nx, axis=0)
noise[tuple(self.new_part)] = self.RF.sample_noise(self.new_part_shape)
self.noise = noise
wind_block = self.RF.sample(noise)
wind = wind_block[tuple(self.central_part)]
if self.blend_num > 0:
self.blend_region = wind[-self.blend_num :, ...].copy()
else:
self.blend_region = None
if self.blend_num > 1:
wind = wind[: -(self.blend_num - 1), ...]
return wind
def _normalize_block(
self,
curr_block: np.ndarray,
zref: float,
uref: float,
z0: float,
windprofiletype: str,
plexp: Optional[float] = None,
) -> np.ndarray:
r"""Normalizes an individual block of wind under the given profile and physical parameters.
Parameters
----------
curr_block : np.ndarray
_description_
zref : float
Reference height.
uref : float
Reference velocity.
z0 : float
Roughness height.
windprofiletype : str
Type of wind profile by which to normalize, either ``"LOG"`` for logarithmic scaling or ``"PL"`` for power law scaling: for ``"LOG"``,
.. math::
\left\langle U_1(z)\right\rangle= U_{\text{ref}} \frac{\ln \left( \frac{z}{z_0} + 1 \right)}{\ln \left( \frac{z_{\text{ref}}}{z_0} \right)}
or for ``"PL"``,
.. math::
\left\langle U_1(z)\right\rangle= u_* \left( \frac{z}{z_{\text{ref}}} \right)^\alpha
where :math:`u_*` is the friction velocity and :math:`z_{\text{ref}}` is the reference height.
plexp : Optional[float], optional
Power law exponent :math:`\alpha`, by default None.
Returns
-------
np.ndarray
Fluctuation field normalized by the logarithmic profile.
Raises
------
ValueError
"No fluctuation field has been generated, call the .generate() method first."
"""
if not np.any(curr_block):
raise ValueError(
"No fluctuation field has been generated, call the .generate() method first."
)
sd = np.sqrt(np.mean(curr_block**2))
curr_block /= sd
z_space = np.linspace(
0.0, self.grid_dimensions[2], 2 ** (self.grid_levels[2]) + 1
)
if windprofiletype == "LOG":
mean_profile_z = self.log_law(z_space, z0, zref, uref)
else:
mean_profile_z = self.power_law(z_space, zref, uref, plexp)
mean_profile = np.zeros_like(curr_block)
mean_profile[..., 0] = np.tile(
mean_profile_z.T, (mean_profile.shape[0], mean_profile.shape[1], 1)
)
return curr_block + mean_profile
[docs]
def generate(
self,
num_blocks: int,
zref: float,
uref: float,
z0: float,
windprofiletype: str,
plexp: Optional[float] = None,
) -> np.ndarray:
r"""Generates the full fluctuation field in blocks. The resulting field is stored as the ``total_fluctuation`` field of this object, allowing for all metadata of the object to be stored safely with the fluctuation field, and also reducing data duplication for post-processing; all operations can be performed on this public variable.
.. warning::
If this method is called twice in the same object, additional fluctuation field blocks will be appended to the field generated from the first call. If this is undesirable behavior, instantiate a new object.
Parameters
----------
num_blocks : int
Number of blocks to use in fluctuation field generation.
zref : float
Reference height.
uref : float
Reference velocity.
z0 : float
Roughness height.
windprofiletype : str
Type of wind profile by which to normalize, either ``"LOG"`` for logarithmic scaling or ``"PL"`` for power law scaling: for ``"LOG"``,
.. math::
\left\langle U_1(z)\right\rangle=\frac{u_*}{\kappa} \ln \left(\frac{z}{z_0}+1\right)
or for ``"PL"``,
.. math::
\left\langle U_1(z)\right\rangle= u_* \left( \frac{z}{z_{\text{ref}}} \right)^\alpha
where :math:`u_*` is the friction velocity and :math:`z_{\text{ref}}` is the reference height.
plexp : Optional[float], optional
Power law exponent :math:`\alpha`, by default None.
Returns
-------
np.ndarray
The full fluctuation field, which is also stored as the ``total_fluctuation`` field.
"""
if np.any(self.total_fluctuation):
import warnings
warnings.warn(
"Fluctuation field has already been generated, additional blocks will be appended to existing field. If this is undesirable behavior, instantiate a new object."
)
for _ in range(num_blocks):
t_block = self._generate_block()
normed_block = self._normalize_block(
curr_block=t_block,
zref=zref,
uref=uref,
z0=z0,
windprofiletype=windprofiletype,
plexp=plexp,
)
self.total_fluctuation = np.concatenate(
(self.total_fluctuation, normed_block), axis=0
)
return self.total_fluctuation
[docs]
def save_to_vtk(self, filepath: Union[str, Path] = "./"):
"""Saves generated fluctuation field in VTK format to specified filepath.
Parameters
----------
filepath : Union[str, Path]
Filepath to which to save generated fluctuation field.
"""
from pyevtk.hl import imageToVTK
spacing = tuple(self.grid_dimensions / (2.0**self.grid_levels + 1))
wind_field_vtk = tuple(
[np.copy(self.total_fluctuation[..., i], order="C") for i in range(3)]
)
cellData = {
"grid": np.zeros_like(self.total_fluctuation[..., 0]),
"wind": wind_field_vtk,
}
imageToVTK(filepath, cellData=cellData, spacing=spacing)
[docs]
def evaluate_divergence(
self, spacing: Union[tuple, np.ndarray], field: Optional[np.ndarray] = None
) -> np.ndarray:
r"""Evaluates the point-wise divergence of a generated fluctuation vector (!) field on a given grid. The underlying method is numpy's ``gradient`` function, which is computed with second-order central difference methods.
.. note::
If the generated field has been normalized with ``.normalize()``, it must be passed into this method as the ``field`` argument. The default evaluation of this method is on the ``total_fluctuation`` attribute of this object.
This method will approximate
.. math::
\operatorname{div} \boldsymbol{F} = \frac{\partial \boldsymbol{F}_x}{\partial x} + \frac{\partial \boldsymbol{F}_y}{\partial y} + \frac{\partial \boldsymbol{F}_z}{\partial z}.
Note that the vector field is assumed to be 3D.
Parameters
----------
spacing : Union[tuple, np.ndarray]
The spacing of the grid on which the fluctuation field has been generated. This is necessary for derivatives to be computed properly.
field : Optional[np.ndarray], optional
The fluctuation field containing all field components, of the shape :math:`(x, y, z, 3)`, by default None, which evaluates the divergence of the non-normalized field stored in ``total_fluctuation``.
Returns
-------
np.ndarray
Point-wise divergence of the vector field, this will be of shape (x, y, z). To gather further information about the divergence, consider using ``.max()``, ``.sum()`` or ``.mean()`` to determine the maximum, total, or average point-wise divergence of the generated field.
Raises
------
ValueError
Spacing must contain 3 scalars determining the spacing of evaluation points of the field for each dimension.
ValueError
Last dimension of vector field must be 3, consider reshaping your vector field.
"""
if len(spacing) != 3:
raise ValueError(
"Spacing must contain 3 scalars determining the spacing of evaluation points of the field for each dimension."
)
if field is None:
field = self.total_fluctuation
if field.shape[-1] != 3:
raise ValueError(
"Last dimension of vector field must be 3, consider reshaping your vector field."
)
return np.ufunc.reduce(
np.add, [np.gradient(field[..., i], spacing[i], axis=i) for i in range(3)]
)