Source code for fastoad.models.performances.mission.openmdao.mission_wrapper

"""
Mission wrapper.
"""

#  This file is part of FAST-OAD : A framework for rapid Overall Aircraft Design
#  Copyright (C) 2024 ONERA & ISAE-SUPAERO
#  FAST is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.

from itertools import pairwise
from os import PathLike

import numpy as np
import openmdao.api as om
import pandas as pd
from openmdao.vectors.vector import Vector

from fastoad.constants import FlightPhase
from fastoad.model_base import FlightPoint
from fastoad.model_base.propulsion import IPropulsion

from ..mission_definition.mission_builder import MissionBuilder
from ..mission_definition.mission_builder.constants import NAME_TAG, SEGMENT_TYPE_TAG, TYPE_TAG
from ..mission_definition.schema import (
    CLIMB_PARTS_TAG,
    CRUISE_PART_TAG,
    DESCENT_PARTS_TAG,
    PARTS_TAG,
    PHASE_TAG,
    RESERVE_TAG,
    ROUTE_TAG,
    MissionDefinition,
)

TOFL_FACTOR = 1.15


[docs] class MissionWrapper(MissionBuilder): """ Wrapper around :class:`~fastoad.models.performances.mission.mission_definition.mission_builder.MissionBuilder` for using with OpenMDAO. Unlike its parent class, the `mission_name` argument is mandatory at instantiation, unless there is only one mission in the definition file. """ def __init__( self, mission_definition: str | PathLike | MissionDefinition, *, propulsion: IPropulsion = None, reference_area: float | None = None, mission_name: str | None = None, variable_prefix: str = "data:mission", force_all_block_fuel_usage: bool = False, ): """ :param mission_definition: a file path or MissionDefinition instance :param propulsion: if not provided, the property :attr:`propulsion` must be set before calling :meth:`build` :param reference_area: if not provided, the property :attr:`reference_area` must be set before calling :meth:`build` :param mission_name: name of chosen mission. Can be omitted if definition file contains only one mission. :param variable_prefix: prefix for auto-generated variable names. :param force_all_block_fuel_usage: if True and if `mission_name` is provided, the mission definition will be modified to set the target fuel consumption to variable "~:block_fuel" """ super().__init__( mission_definition, propulsion=propulsion, reference_area=reference_area, mission_name=mission_name, variable_prefix=variable_prefix, ) self.consumed_fuel_before_input_weight = 0.0 if force_all_block_fuel_usage: self.force_all_block_fuel_usage()
[docs] def force_all_block_fuel_usage(self): """Modifies mission definition to set block fuel as target fuel consumption.""" if self._mission_name: self.definition.force_all_block_fuel_usage(self.mission_name) self._update_structure_builders()
[docs] def setup(self, component: om.ExplicitComponent): """ To be used during setup() of provided OpenMDAO component. It adds input and output variables deduced from mission definition file. :param component: the OpenMDAO component where the setup is done. """ input_definition = self.get_input_variables(self.mission_name) output_definition = self._identify_outputs() output_definition = { name: value for name, value in output_definition.items() if name not in input_definition.names() } for variable in input_definition: component.add_input(**variable.get_openmdao_kwargs()) for name, (units, desc) in output_definition.items(): component.add_output(name, 0.0, units=units, desc=desc)
[docs] def compute( self, start_flight_point: FlightPoint, inputs: Vector, outputs: Vector ) -> pd.DataFrame: """ To be used during compute() of an OpenMDAO component. Builds the mission from input file, and computes it. `outputs` vector is filled with duration, burned fuel and covered ground distance for each part of the flight. :param start_flight_point: starting point of mission :param inputs: the input vector of the OpenMDAO component :param outputs: the output vector of the OpenMDAO component :return: a pandas DataFrame where column names match fields of :class:`~fastoad.model_base.flight_point.FlightPoint` """ mission = self.build(inputs, self.mission_name) def _compute_vars(name_root, start: FlightPoint, end: FlightPoint): """Computes duration, burned fuel and covered distance.""" distance = end.ground_distance - start.ground_distance if name_root + ":duration" in outputs: outputs[name_root + ":duration"] = end.time - start.time if name_root + ":fuel" in outputs: outputs[name_root + ":fuel"] = start.mass - end.mass if name_root + ":distance" in outputs: outputs[name_root + ":distance"] = distance if name_root + ":TOFL" in outputs: outputs[name_root + ":TOFL"] = TOFL_FACTOR * distance if name_root + ":initial_altitude" in outputs: outputs[name_root + ":initial_altitude"] = start.altitude if name_root + ":final_altitude" in outputs: outputs[name_root + ":final_altitude"] = end.altitude if name_root + ":altitude" in outputs: # for non-optimal cruise segments outputs[name_root + ":altitude"] = start.altitude flight_points = mission.compute_from(start_flight_point) flight_points.loc[0, "name"] = flight_points.loc[1, "name"] nb_levels = np.max([len(n.split(":")) for n in flight_points["name"]]) for i in range(nb_levels): flight_points["name2"] = [":".join(n.split(":")[: i + 1]) for n in flight_points.name] grouped_points = flight_points.groupby("name2") part_names = pd.unique(flight_points.name2) for part_name1, part_name2 in pairwise(part_names): part1 = grouped_points.get_group(part_name1) part2 = grouped_points.get_group(part_name2) _compute_vars( f"{self.variable_prefix}:{part_name2}", part1.iloc[-1], part2.iloc[-1] ) start_part_name = part_names[0] start_part = grouped_points.get_group(start_part_name) _compute_vars( f"{self.variable_prefix}:{start_part_name}", start_part.iloc[0], start_part.iloc[-1] ) del flight_points["name2"] self.consumed_fuel_before_input_weight = mission.consumed_mass_before_input_weight if mission.reserve_ratio: outputs[self.get_reserve_variable_name()] = mission.get_reserve_fuel() return flight_points
[docs] def get_reserve_variable_name(self) -> str: """ :return: the name of OpenMDAO variable for fuel reserve. This name is among the declared outputs in :meth:`setup`. """ return f"{self.variable_prefix}:{self.mission_name}:reserve:fuel"
def _identify_outputs(self) -> dict[str, tuple[str, str]]: """ Builds names of OpenMDAO outputs from names of mission, route and phases. :return: dictionary with variable name as key and unit, description as value """ output_definition = {} output_definition.update(self._add_vars(self.mission_name)) for part in self._structure_builders[self.mission_name].structure[PARTS_TAG]: if RESERVE_TAG in part: output_definition[self.get_reserve_variable_name()] = ( "kg", f'reserve fuel for mission "{self.mission_name}"', ) elif part[TYPE_TAG] == PHASE_TAG: subpart_name = part[NAME_TAG] output_definition.update(self._add_vars(subpart_name, part)) elif part[TYPE_TAG] == ROUTE_TAG: route_name = part[NAME_TAG] output_definition.update(self._add_vars(route_name)) for subpart in part[CLIMB_PARTS_TAG] + part[DESCENT_PARTS_TAG]: subpart_name = subpart[NAME_TAG] output_definition.update(self._add_vars(subpart_name)) cruise_part = part[CRUISE_PART_TAG] output_definition.update( self._add_vars(route_name + ":" + FlightPhase.CRUISE.value, cruise_part) ) return output_definition def _add_vars(self, part_name, part_structure=None) -> dict: """ Builds names of OpenMDAO outputs for provided mission, route and phase names. :param part_name: part name in the form <mission_name>:<route_name:<phase_name>, route_name and phase_name being independently optional. :param part_structure: optional structure dict containing segment_type and other metadata :return: dictionary with variable name as key and unit, description as value """ output_definition = {} name_root = ":".join(name for name in [f"{self.variable_prefix}", part_name] if name) names = part_name.split(":") mission_name, route_name, phase_name = names + [""] * (3 - len(names)) if not phase_name and route_name not in self.get_route_names(): phase_name = route_name route_name = "" if route_name and phase_name: flight_part_desc = ( f'phase "{phase_name}" of route "{route_name}" in mission "{mission_name}"' ) elif route_name: flight_part_desc = f'route "{route_name}" in mission "{mission_name}"' elif phase_name: flight_part_desc = f'phase "{phase_name}" in mission "{mission_name}"' else: flight_part_desc = f'mission "{mission_name}"' output_definition[name_root + ":duration"] = ("s", f"duration of {flight_part_desc}") output_definition[name_root + ":fuel"] = ("kg", f"burned fuel during {flight_part_desc}") output_definition[name_root + ":distance"] = ( "m", f"covered ground distance during {flight_part_desc}", ) if self._is_takeoff_phase(part_structure): output_definition[name_root + ":TOFL"] = ( "m", f"estimated takeoff field length (CS-25.113(a), {TOFL_FACTOR} x AEO takeoff " f"distance) during {flight_part_desc}", ) # Check if this is an optimal cruise or any cruise-like segment if part_structure: if part_structure.get(SEGMENT_TYPE_TAG) == "optimal_cruise": # NOTE: # For optimal cruise segments, this "initial_altitude" output may not reflect # the actual altitude flown at the beginning of the cruise if there is an # altitude discontinuity with the previous segment, i.e., no optimal altitude climb # segment. In such cases, the optimal cruise segment internally enforces its own # starting altitude, but the value reported here corresponds to the previous segment # boundary. To avoid this inconsistency, insert a climb segment with target # "optimal_altitude" before the optimal cruise. output_definition[name_root + ":initial_altitude"] = ( "m", f"initial cruise altitude during {flight_part_desc}" " (may differ from actual flown altitude if optimal cruise starts with" " a discontinuity)", ) output_definition[name_root + ":final_altitude"] = ( "m", f"final cruise altitude during {flight_part_desc}", ) elif part_structure.get(SEGMENT_TYPE_TAG) in ["cruise", "breguet"]: output_definition[name_root + ":altitude"] = ( "m", f"cruise altitude during {flight_part_desc}", ) return output_definition @staticmethod def _is_takeoff_phase(part_structure: dict | None) -> bool: """Returns True when provided phase structure looks like a takeoff phase.""" if not isinstance(part_structure, dict): return False for part in part_structure.get(PARTS_TAG, []): segment_type = part.get(SEGMENT_TYPE_TAG) if segment_type in {"takeoff", "rotation", "end_of_takeoff"}: return True return False