Source code for pysagas.cfd.oblique_prandtl_meyer

import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.optimize import bisect
from typing import Optional, Tuple
from pysagas.flow import FlowState
from pysagas.geometry import Vector
from scipy.optimize import root_scalar
from pysagas.utilities import add_sens_data
from pysagas.cfd.solver import FlowSolver, FlowResults, SensitivityResults


[docs] class OPM(FlowSolver): """Oblique shock thoery and Prandtl-Meyer expansion theory flow solver. This implementation uses oblique shock theory for flow-facing cell elements, and Prandtl-Meyer expansion theory for rearward-facing elements. Extended Summary ---------------- Data attribute 'method' refers to which method was used for a particular cell, according to: - -1 : invalid / skipped (eg. 90 degree face) - 0 : parallel face, do nothing - 1 : Prandlt-Meyer - 2 : normal shock - 3 : oblique shock """ method = "Oblique/Prandtl-Meyer combination" PM_ANGLE_THRESHOLD = -20 # degrees
[docs] def solve( self, freestream: Optional[FlowState] = None, mach: Optional[float] = None, aoa: Optional[float] = None, cog: Vector = Vector(0, 0, 0), ) -> FlowResults: already_run = super().solve(freestream=freestream, mach=mach, aoa=aoa) if already_run: # Already have a result if self.verbosity > 1: print("Attention: this flowstate has already been solved.") result = self.flow_result else: # Get flow flow = self._last_solve_freestream # Construct progress bar if self.verbosity > 0: print() desc = f"Running OPM solver at AoA = {flow.aoa:.2f} and Mach = {flow.M:.2f}" pbar = tqdm( total=len(self.cells), desc=desc, position=0, leave=True, ) # Iterate over all cells net_force = Vector(0, 0, 0) net_moment = Vector(0, 0, 0) bad = 0 total = 0 for cell in self.cells: # Calculate orientation of cell to flow theta = np.pi / 2 - np.arccos( np.dot(flow.direction.vec, cell.n.vec) / (cell.n.norm * flow.direction.norm) ) # Also calculate vector to COG from cell centroid r = cell.c - cog # Solve flow for this cell if theta < 0: # Rearward facing cell if theta < np.deg2rad(self.PM_ANGLE_THRESHOLD): M2, p2, T2 = (flow.M, 0.0, flow.T) method = -1 bad += 1 else: # Use Prandtl-Meyer expansion theory method = 1 M2, p2, T2 = self._solve_pm( abs(theta), flow.M, flow.P, flow.T, flow.gamma ) elif theta > 0: # Use shock theory beta_max = OPM.beta_max(M=flow.M, gamma=flow.gamma) theta_max = OPM.theta_from_beta( M1=flow.M, beta=beta_max, gamma=flow.gamma ) if theta > theta_max: # Detached shock method = 2 M2, p2, T2 = self._solve_normal( flow.M, flow.P, flow.T, flow.gamma ) else: # Use oblique shock theory method = 3 M2, p2, T2 = self._solve_oblique( theta, flow.M, flow.P, flow.T, flow.gamma ) else: # Cell is parallel to flow, assume no change method = 0 M2, p2, T2 = (flow.M, flow.P, flow.T) total += 1 # Save results for this cell cell.attributes["pressure"] = p2 cell.attributes["Mach"] = M2 cell.attributes["temperature"] = T2 cell.attributes["method"] = method # Add flowstate info to Cell # TODO - calculate flowstate direction cell.flowstate = FlowState( mach=M2, pressure=p2, temperature=T2, direction=None, ) # Calculate force vector F = cell.n * p2 * cell.A net_force += F net_moment += Vector.from_coordinates(np.cross(r.vec, F.vec)) # Update progress bar if self.verbosity > 0: pbar.update(1) if self.verbosity > 0: pbar.close() if bad / total > 0.25: print( f"WARNING: {100*bad/total:.2f}% of cells were not " "solved due to PM threshold." ) # Construct results result = FlowResults( freestream=flow, net_force=net_force, net_moment=net_moment ) # Save self.flow_result = result # Print result if self.verbosity > 1: print(result) return result
[docs] @staticmethod def pm(M: float, gamma: float = 1.4): """Solves the Prandtl-Meyer function and returns the Prandtl-Meyer angle in radians.""" v = ((gamma + 1) / (gamma - 1)) ** 0.5 * np.arctan( ((M**2 - 1) * (gamma - 1) / (gamma + 1)) ** 0.5 ) - np.arctan((M**2 - 1) ** 0.5) return v
[docs] @staticmethod def inv_pm(angle: float, gamma: float = 1.4): """Solves the inverse Prandtl-Meyer function using a bisection algorithm.""" func = lambda M: OPM.pm(M, gamma) - angle pm = bisect(func, 1.0, 42.0) return pm
[docs] @staticmethod def _solve_pm( theta: float, M1: float, p1: float = 1.0, T1: float = 1.0, gamma: float = 1.4 ): """Solves for the Mach number, pressure and temperature after a Prandtl-Meyer expansion fan. Parameters ---------- theta : float The deflection angle, specified in radians. M1 : float The pre-expansion Mach number. p1 : float, optional The pre-expansion pressure (Pa). The default is 1.0. T1 : float, optional The pre-expansion temperature (K). The default is 1.0. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- M2 : float The post-expansion Mach number. p2 : float The post-expansion pressure (Pa). T2 : float The post-expansion temperature (K). """ # Solve for M2 v_M1 = OPM.pm(M=M1, gamma=gamma) v_M2 = theta + v_M1 M2 = OPM.inv_pm(v_M2) a = (gamma - 1) / 2 n = 1 + a * M1**2 d = 1 + a * M2**2 p2 = p1 * (n / d) ** (gamma / (gamma - 1)) T2 = T1 * (n / d) return M2, p2, T2
[docs] @staticmethod def _solve_oblique( theta: float, M1: float, p1: float = 1.0, T1: float = 1.0, gamma: float = 1.4 ): """Solves the flow using oblique shock theory. Parameters ---------- theta : float The flow deflection angle, specified in radians. M1 : float The pre-shock Mach number. p1 : float, optional The pre-shock pressure (Pa). The default is 1.0. T1 : float, optional The pre-expansion temperature (K). The default is 1.0. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- M2 : float The post-shock Mach number. p2 : float The post-shock pressure (Pa). T2 : float The post-shock temperature (K). """ # Calculate angle and ratios beta = OPM.oblique_beta(M1, theta, gamma) p2_p1 = OPM.oblique_p2_p1(M1, beta, gamma) rho2_rho1 = OPM.oblique_rho2_rho1(M1, beta, gamma) T2_T1 = p2_p1 / rho2_rho1 # Calculate properties M2 = OPM.oblique_M2(M1, beta, theta, gamma) p2 = p2_p1 * p1 T2 = T1 * T2_T1 return M2, p2, T2
[docs] @staticmethod def _solve_normal(M1: float, p1: float = 1.0, T1: float = 1.0, gamma: float = 1.4): """Solves the flow using normal shock theory. Parameters ---------- M1 : float The pre-shock Mach number. p1 : float, optional The pre-shock pressure (Pa). The default is 1.0. T1 : float, optional The pre-expansion temperature (K). The default is 1.0. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- M2 : float The post-shock Mach number. p2 : float The post-shock pressure (Pa). T2 : float The post-shock temperature (K). """ rho2_rho1 = (gamma + 1) * M1**2 / (2 + (gamma - 1) * M1**2) p2_p1 = 1 + 2 * gamma * (M1**2 - 1) / (gamma + 1) M2 = ((1 + M1**2 * (gamma - 1) / 2) / (gamma * M1**2 - (gamma - 1) / 2)) ** 0.5 p2 = p1 * p2_p1 T2 = T1 * p2_p1 / rho2_rho1 return M2, p2, T2
[docs] @staticmethod def oblique_p2_p1(M1: float, beta: float, gamma: float = 1.4): """Returns the pressure ratio p2/p1 across an oblique shock. Parameters ----------- M1 : float The pre-shock Mach number. beta : float The shock angle specified in radians. Returns -------- p2/p1 : float The pressure ratio across the shock. References ---------- Peter Jacobs """ M1n = M1 * abs(np.sin(beta)) p2p1 = 1.0 + 2.0 * gamma / (gamma + 1.0) * (M1n**2 - 1.0) return p2p1
[docs] @staticmethod def oblique_beta( M1: float, theta: float, gamma: float = 1.4, tolerance: float = 1.0e-6 ): """Calculates the oblique shock angle using a recursive algorithm. Parameters ---------- M1 : float The pre-shock Mach number. theta : float The flow deflection angle, specified in radians. gamma : float, optional The ratio of specific heats. The default is 1.4. References ---------- Peter Jacobs """ func = lambda beta: OPM.theta_from_beta(M1, beta, gamma) - theta # Initialise sign_beta = np.sign(theta) theta = abs(theta) b1 = np.arcsin(1.0 / M1) b2 = b1 * 1.05 # Check f1 f1 = func(b1) if abs(f1) < tolerance: return sign_beta * b1 # Check f2 f2 = func(b2) if abs(f2) < tolerance: return sign_beta * b2 # Finally, solve with secant method # beta = sign_beta * secant(func, b1, b2, tol=tolerance) root_result = root_scalar(f=func, x0=b1, x1=b2, xtol=tolerance, method="secant") if root_result.converged: beta = sign_beta * root_result.root else: raise Exception( f"Cannot converge beta (theta={np.rad2deg(theta):.2f} degrees)." ) return beta
[docs] @staticmethod def theta_from_beta(M1: float, beta: float, gamma: float = 1.4): """Calculates the flow deflection angle from the oblique shock angle. Parameters ---------- M1 : float The pre-expansion Mach number. beta : float The shock angle specified in radians. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- theta : float The deflection angle, specified in radians. References ---------- Peter Jacobs """ M1n = M1 * abs(np.sin(beta)) t1 = 2.0 / np.tan(beta) * (M1n**2 - 1) t2 = M1**2 * (gamma + np.cos(2 * beta)) + 2 theta = np.arctan(t1 / t2) return theta
[docs] @staticmethod def oblique_M2(M1: float, beta: float, theta: float, gamma: float = 1.4): """Calculates the Mach number following an oblique shock. Parameters ---------- M1 : float The pre-expansion Mach number. beta : float The shock angle specified in radians. theta : float The deflection angle, specified in radians. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- M2 : float The post-shock Mach number. References ---------- Peter Jacobs """ M1n = M1 * abs(np.sin(beta)) a = 1 + (gamma - 1) * 0.5 * M1n**2 b = gamma * M1n**2 - (gamma - 1) * 0.5 M2 = (a / b / (np.sin(beta - theta)) ** 2) ** 0.5 return M2
[docs] @staticmethod def oblique_T2_T1(M1: float, beta: float, gamma: float = 1.4): """Returns the temperature ratio T2/T1 across an oblique shock. Parameters ----------- M1 : float The pre-shock Mach number. beta : float The shock angle specified in radians. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- T2/T1 : float The temperature ratio across the shock. References ---------- Peter Jacobs """ T2_T1 = OPM.oblique_p2_p1(M1, beta, gamma) / OPM.oblique_rho2_rho1( M1, beta, gamma ) return T2_T1
[docs] @staticmethod def oblique_rho2_rho1(M1: float, beta: float, gamma: float = 1.4): """Returns the density ratio rho2/rho1 across an oblique shock. Parameters ----------- M1 : float The pre-shock Mach number. beta : float The shock angle specified in radians. gamma : float, optional The ratio of specific heats. The default is 1.4. Returns -------- T2/T1 : float The temperature ratio across the shock. References ---------- Peter Jacobs """ M1n = M1 * abs(np.sin(beta)) rho2_rho1 = (gamma + 1) * M1n**2 / (2 + (gamma - 1) * M1n**2) return rho2_rho1
[docs] @staticmethod def beta_max(M: float, gamma: float = 1.4): """Returns the maximum shock angle for a given Mach number. """ beta_max = np.arcsin( np.sqrt( (1 / (gamma * M**2)) * ( (gamma + 1) * M**2 / 4 - 1 + np.sqrt( (gamma + 1) * ((gamma + 1) * M**4 / 16 + (gamma - 1) * M**2 / 2 + 1) ) ) ) ) return beta_max
[docs] def solve_sens( self, sensitivity_filepath: Optional[str] = None, parameters: Optional[list[str]] = None, cells_have_sens_data: Optional[bool] = False, freestream: Optional[FlowState] = None, mach: Optional[float] = None, aoa: Optional[float] = None, cog: Vector = Vector(0, 0, 0), perturbation: float = 1e-3, ) -> SensitivityResults: # TODO - add to cell attributes for visualisation # TODO - return Mach and Temp sensitivities already_run = super().solve_sens(freestream=freestream, mach=mach, aoa=aoa) if already_run: # Already have a result if self.verbosity > 1: print("Attention: this flowstate sensitivity has already been solved.") result = self.flow_sensitivity else: # Get flow flow = self._last_sens_freestream # Check that nominal flow condition has been solved already if self._last_solve_freestream: # Solver has run before if self._last_solve_freestream != flow: # Run flow solver first at the nominal flow conditions if self.verbosity > 0: print("Running flow solver to prepare sensitivities.") self.solve(flow) else: # Solver has not run yet at all, run it now self.solve(flow) if cells_have_sens_data and not parameters: # Need to extract parameters _, parameters = self._load_sens_data(sensitivity_filepath) elif not cells_have_sens_data: # Load sensitivity data and parameters sensdata, parameters = self._load_sens_data(sensitivity_filepath) # Add sensitivity data to cells add_sens_data(cells=self.cells, data=sensdata, verbosity=self.verbosity) # Construct progress bar if self.verbosity > 0: print() desc = "Solving cell flow sensitivities." pbar = tqdm( total=len(self.cells), desc=desc, position=0, leave=True, ) # Iterate over cells dFdp = 0 dMdp = 0 for cell in self.cells: # Initialisation all_directions = [Vector(1, 0, 0), Vector(0, 1, 0), Vector(0, 0, 1)] sensitivities = np.empty(shape=(cell.dndp.shape[1], 3)) moment_sensitivities = np.empty(shape=(cell.dndp.shape[1], 3)) # Calculate moment dependencies r = cell.c - cog F = cell.flowstate.P * cell.A * cell.n.vec # Calculate orientation of cell to flow theta = np.pi / 2 - np.arccos( np.dot(flow.direction.vec, cell.n.vec) / (cell.n.norm * flow.direction.norm) ) # Solve flow for this cell if theta < 0: # Rearward facing cell if theta < np.deg2rad(self.PM_ANGLE_THRESHOLD): _, dP_dtheta, _ = (0.0, 0.0, 0.0) else: # Use Prandtl-Meyer expansion theory _, dP_dtheta, _ = self.dp_dtheta_pm( abs(theta), flow.M, cell.flowstate, flow.P, flow.T, flow.gamma, perturbation, ) elif theta > 0: # Use shock theory beta_max = OPM.beta_max(M=flow.M, gamma=flow.gamma) theta_max = OPM.theta_from_beta( M1=flow.M, beta=beta_max, gamma=flow.gamma ) if theta > theta_max: # Detached shock _, dP_dtheta, _ = (0.0, 0.0, 0.0) else: # Use oblique shock theory _, dP_dtheta, _ = self.dp_dtheta_obl( theta, flow.M, cell.flowstate, flow.P, flow.T, flow.gamma, perturbation, ) else: # Cell is parallel to flow, assume no change _, dP_dtheta, _ = (0.0, 0.0, 0.0) # For each parameter for p_i in range(cell.dndp.shape[1]): # Calculate gradient of pressure with respect to parameter dtheta_dp = ( -np.dot(cell.dndp[:, p_i], flow.direction.vec) / (1 - np.dot(cell.n.unit.vec, flow.direction.unit.vec) ** 2) ** 0.5 ) if np.isnan(dtheta_dp): # Normal vector parallel to flow dtheta_dp = 0 dPdp = dP_dtheta * dtheta_dp # Calculate pressure sensitivity for each direction for i, direction in enumerate(all_directions): dF = ( dPdp * cell.A * np.dot(cell.n.vec, direction.vec) + cell.flowstate.P * cell.dAdp[p_i] * np.dot(cell.n.vec, direction.vec) + cell.flowstate.P * cell.A * np.dot(-cell.dndp[:, p_i], direction.vec) ) sensitivities[p_i, i] = dF # Now evaluate moment sensitivities moment_sensitivities[p_i, :] = np.cross( r.vec, sensitivities[p_i, :] ) + np.cross(cell.dcdp[:, p_i], F) # Append to cell cell.sensitivities = sensitivities cell.moment_sensitivities = moment_sensitivities # Update dFdp += sensitivities dMdp += moment_sensitivities # Update progress bar if self.verbosity > 0: pbar.update(1) if self.verbosity > 0: pbar.close() print("Done.") # Construct dataframes to return df_f = pd.DataFrame( dFdp, columns=["dFx/dp", "dFy/dp", "dFz/dp"], index=parameters ) df_m = pd.DataFrame( dMdp, columns=["dMx/dp", "dMy/dp", "dMz/dp"], index=parameters ) # Construct results result = SensitivityResults( f_sens=df_f, m_sens=df_m, freestream=flow, ) # Save self.flow_sensitivity = result if self.verbosity > 0: print(result) return result
[docs] @staticmethod def dp_dtheta_obl( theta: float, M1: float, nominal_flowstate: FlowState, p1: float = 1.0, T1: float = 1.0, gamma: float = 1.4, perturbation: float = 1e-3, ): """Solves for the sensitivities at the given point using oblique shock theory. Parameters ---------- theta : float The deflection angle, specified in radians. M1 : float The pre-expansion Mach number. nominal_flowstate : FlowState The nominal flowstate at this point. p1 : float, optional The pre-expansion pressure (Pa). The default is 1.0. T1 : float, optional The pre-expansion temperature (K). The default is 1.0. gamma : float, optional The ratio of specific heats. The default is 1.4. perturbation : float, optional The perturbation amount. Perturbations are calculated according to a multiple of (1 +/- perturbation). The default is 1e-3. Returns -------- dM_dtheta : float The sensitivity of the post-expansion Mach number with respect to the deflection angle theta. dP_dtheta : float The sensitivity of the post-expansion pressure (Pa) with respect to the deflection angle theta. dT_dtheta : float The sensitivity of the post-expansion temperature (K) with respect to the deflection angle theta. """ # Create function handle func = lambda theta: OPM._solve_oblique(theta, M1, p1, T1, gamma) # Calculate derivitive by finite differencing g = OPM._findiff(func, theta, nominal_flowstate, perturbation) return g
[docs] @staticmethod def dp_dtheta_pm( theta: float, M1: float, nominal_flowstate: FlowState, p1: float = 1.0, T1: float = 1.0, gamma: float = 1.4, perturbation: float = 1e-3, ): """Solves for the sensitivities at the given point using Prandtl-Meyer theory. Parameters ---------- theta : float The deflection angle, specified in radians. M1 : float The pre-expansion Mach number. nominal_flowstate : FlowState The nominal flowstate at this point. p1 : float, optional The pre-expansion pressure (Pa). The default is 1.0. T1 : float, optional The pre-expansion temperature (K). The default is 1.0. gamma : float, optional The ratio of specific heats. The default is 1.4. perturbation : float, optional The perturbation amount. Perturbations are calculated according to a multiple of (1 +/- perturbation). The default is 1e-3. Returns -------- dM_dtheta : float The sensitivity of the post-expansion Mach number with respect to the deflection angle theta. dP_dtheta : float The sensitivity of the post-expansion pressure (Pa) with respect to the deflection angle theta. dT_dtheta : float The sensitivity of the post-expansion temperature (K) with respect to the deflection angle theta. """ # Create function handle func = lambda theta: OPM._solve_pm(theta, M1, p1, T1, gamma) # Calculate derivitive by finite differencing g = OPM._findiff(func, theta, nominal_flowstate, perturbation) return g
@staticmethod def _findiff( func: callable, point: any, nominal_flowstate: FlowState, perturbation: float = 1e-3, ): # Generate flow value at points +/- perturbed from nominal point xvals = [point * (1 - perturbation), point, point * (1 + perturbation)] vals = [ func(p) for p in [point * (1 - perturbation), point * (1 + perturbation)] ] Mvals = [v[0] for v in vals] pvals = [v[1] for v in vals] Tvals = [v[2] for v in vals] # Inject nominal result Mvals = [Mvals[0], nominal_flowstate.M, Mvals[1]] pvals = [pvals[0], nominal_flowstate.P, pvals[1]] Tvals = [Tvals[0], nominal_flowstate.T, Tvals[1]] dM = np.gradient(Mvals, xvals) dp = np.gradient(pvals, xvals) dT = np.gradient(Tvals, xvals) return dM[1], dp[1], dT[1]
[docs] def save(self, name: str): # Initialise attributes dictionary attributes = { "pressure": [], "Mach": [], "temperature": [], "method": [], } # Call super method super().save(name, attributes)
[docs] @staticmethod def _load_sens_data(sensitivity_filepath: str) -> Tuple[pd.DataFrame, list[set]]: """Extracts design parameters from the sensitivity data.""" sensdata = pd.read_csv(sensitivity_filepath) parameters = set() for e in sensdata.columns: e: str if e.startswith("dxd") or e.startswith("dyd") or e.startswith("dzd"): # Sensitivity coluns parameters.add(e[3:]) return sensdata, list(parameters)