Source code for drdmannturb.fluctuation_generation.sampling_methods

"""
This module implements and exposes the various sampling methods required
"""

import os
from math import *

import numpy as np
import scipy.fftpack as fft

METHOD_DST = "dst"
METHOD_DCT = "dct"
METHOD_FFT = "fft"
METHOD_FFTW = "fftw"
METHOD_VF_FFTW = "vf_fftw"


[docs] class Sampling_method_base: """Meta class for different sampling methods. Each of these requires a ``RandomField`` object, which is a subclass of :py:class:``GaussianRandomField``.""" def __init__(self, RandomField): """ Parameters ---------- RandomField : GaussianRandomField The random field from which to sample from. This object also determines all of the physical quantities and domain partitioning. """ self.L, self.Nd, self.ndim = ( RandomField.L, RandomField.ext_grid_shape, RandomField.ndim, ) self.DomainSlice = RandomField.DomainSlice
[docs] class Sampling_method_freq(Sampling_method_base): """Sampling method specifically in the frequency domain. This metaclass involves a single precomputation of the covariance spectrum of the underlying ``GaussianRandomField``. Refer to specific subclasses for details on what each of these entails, but generally, the approximate square-root of each associated spectral tensor is computed and transformed into the frequency domain. The norm of the transform is defined as the square-root of the length-scale. """ def __init__(self, RandomField): super().__init__(RandomField) L, Nd, d = self.L, self.Nd, self.ndim self.Frequencies = [ (2 * pi / L[j]) * (Nd[j] * fft.fftfreq(Nd[j])) for j in range(d) ] self.TransformNorm = np.sqrt(L.prod()) self.Spectrum = RandomField.Covariance.precompute_Spectrum(self.Frequencies)
####################################################################################################### # Fourier Transform (FFTW) ####################################################################################################### ### - Only stationary covariance ### - Uses the Fastest Fourier Transform in the West
[docs] class Sampling_FFTW(Sampling_method_freq): """Sampling with FFTW. Two stencils for the forward and inverse FFTs are generated using the following FFTW flags: ``"FFTW_MEASURE", "FFTW_DESTROY_INPUT", "FFTW_UNALIGNED"``. Due to properties of the FFT, only stationary covariances are admissible. """ def __init__(self, RandomField): super().__init__(RandomField) import pyfftw shpR = RandomField.ext_grid_shape shpC = shpR.copy() shpC[-1] = int(shpC[-1] // 2) + 1 axes = np.arange(self.ndim) flags = ("FFTW_MEASURE", "FFTW_DESTROY_INPUT", "FFTW_UNALIGNED") self.fft_x = pyfftw.empty_aligned(shpR, dtype="float64") self.fft_y = pyfftw.empty_aligned(shpC, dtype="complex128") self.fft_plan = pyfftw.FFTW( self.fft_x, self.fft_y, axes=axes, direction="FFTW_FORWARD", flags=flags ) self.ifft_plan = pyfftw.FFTW( self.fft_y, self.fft_x, axes=axes, direction="FFTW_BACKWARD", flags=flags ) self.Spectrum_half = self.Spectrum[..., : shpC[-1]] * np.sqrt(self.Nd.prod()) def __call__(self, noise): self.fft_x[:] = noise self.fft_plan() self.fft_y[:] *= self.Spectrum_half self.ifft_plan() return self.fft_x[self.DomainSlice] / self.TransformNorm
####################################################################################################### # Vector Field Fourier Transform (VF_FFTW) ####################################################################################################### ### - Random vector fields ### - Only stationary covariance ### - Uses the Fastest Fourier Transform in the West
[docs] class Sampling_VF_FFTW(Sampling_method_freq): """FFTW applied to a vector field. This should be used in conjunction with :py:class:`VectorGaussianRandomField`. This sampling method is also multi-threaded across 4 threads, or else the maximum allowed by the environment. As in :py:class:`Sampling_FFTW`, the following FFTW flags are used: ``"FFTW_MEASURE", "FFTW_DESTROY_INPUT", "FFTW_UNALIGNED"``.""" def __init__(self, RandomField): super().__init__(RandomField) import pyfftw n_cpu = 4 try: n_cpu = int(os.environ["OMP_NUM_THREADS"]) except: pass shpR = RandomField.ext_grid_shape shpC = shpR.copy() shpC[-1] = int(shpC[-1] // 2) + 1 axes = np.arange(self.ndim) flags = ("FFTW_MEASURE", "FFTW_DESTROY_INPUT", "FFTW_UNALIGNED") self.fft_x = pyfftw.empty_aligned(shpR, dtype="float32") self.fft_y = pyfftw.empty_aligned(shpC, dtype="complex64") self.fft_plan = pyfftw.FFTW( self.fft_x, self.fft_y, axes=axes, direction="FFTW_FORWARD", flags=flags, threads=n_cpu, ) self.ifft_plan = pyfftw.FFTW( self.fft_y, self.fft_x, axes=axes, direction="FFTW_BACKWARD", flags=flags, threads=n_cpu, ) self.Spectrum_half = self.Spectrum[..., : shpC[-1]] * np.sqrt(self.Nd.prod()) self.hat_noise = np.stack( [np.zeros(shpC, dtype="complex64") for _ in range(3)], axis=-1 ) self.shpC = shpC def __call__(self, noise): tmp = np.zeros(noise.shape) for i in range(noise.shape[-1]): self.fft_x[:] = noise[..., i] self.fft_plan() self.hat_noise[..., i] = self.fft_y[:] self.hat_noise = np.einsum( "kl...,...l->...k", self.Spectrum_half, self.hat_noise ) for i in range(noise.shape[-1]): self.fft_y[:] = self.hat_noise[..., i] self.ifft_plan() tmp[..., i] = self.fft_x[:] return tmp[self.DomainSlice] / self.TransformNorm
####################################################################################################### # Fourier Transform ####################################################################################################### ### - Only stationary covariance ### - Uses sscipy.fftpack (slower solution)
[docs] class Sampling_FFT(Sampling_method_freq): """Sampling using ``scipy.fftpack``, which is considerably slower than with FFTW but is a simpler interface.""" def __init__(self, RandomField): super().__init__(RandomField) def __call__(self, noise): noise_hat = fft.ifftn(noise) y = self.Spectrum * noise_hat y = fft.fftn(y) return y.real[self.DomainSlice] / self.TransformNorm
####################################################################################################### # Sine Transform ####################################################################################################### ### - Only stationary covariance ### - Uses sscipy.fftpack (non the fastest solution)
[docs] class Sampling_DST(Sampling_method_freq): """Sampling using the discrete sine transform from ``scipy.fftpack``, with all other operations being identical as other sampling methods.""" def __init__(self, RandomField): super().__init__(RandomField) def __call__(self, noise): y = self.Spectrum * noise for j in range(self.ndim): y = fft.dst(y, axis=j, type=1) return y[self.DomainSlice] / self.TransformNorm
####################################################################################################### # Cosine Transform ####################################################################################################### ### - Only stationary covariance ### - Uses sscipy.fftpack (non the fastest solution)
[docs] class Sampling_DCT(Sampling_method_freq): """Sampling using the discrete cosine transform from ``scipy.fftpack``, with all other operations being identical as other sampling methods.""" def __init__(self, RandomField): super().__init__(RandomField) def __call__(self, noise): y = self.Spectrum * noise for j in range(self.ndim): y = fft.dct(y, axis=j, type=2) return y[self.DomainSlice] / self.TransformNorm