"""Tools for running multiple computations"""
# This file is part of FAST-OAD : A framework for rapid Overall Aircraft Design
# Copyright (C) 2024 ONERA & ISAE-SUPAERO
# FAST is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import multiprocessing as mp
from contextlib import contextmanager
from dataclasses import dataclass
from math import ceil, log10
from os import PathLike
from pathlib import Path
from typing import List, Optional, Union
from openmdao.utils.mpi import FakeComm
from fastoad._utils.files import as_path, make_parent_dir
from fastoad.io import DataFile
from fastoad.io.configuration import FASTOADProblemConfigurator
from fastoad.openmdao.variables import VariableList
# Import mpi4py once at module level so worker processes do not pay the
# import cost repeatedly; record availability in a flag checked at run time.
try:
    from mpi4py import MPI
    from mpi4py.futures import MPIPoolExecutor
except ImportError:
    # No MPI implementation installed: run_cases() will fall back to multiprocessing.
    HAVE_MPI = False
else:
    HAVE_MPI = True

_LOGGER = logging.getLogger(__name__)  # Logger for this module
@dataclass
class CalcRunner:
    """
    Class for running FAST-OAD computations for a specific configuration.

    It is specifically designed to run several computations concurrently with
    :meth:`run_cases`.

    For each computation, data can be isolated in a specific folder.
    """

    #: Configuration file, common to all computations
    configuration_file_path: Union[str, PathLike]

    #: Input file for the computation (will supersede the input file setting in
    #: configuration file)
    input_file_path: Optional[Union[str, PathLike]] = None

    #: For activating MDO instead of MDA
    optimize: bool = False

    def __post_init__(self):
        # Let's ensure we have absolute paths: computations may be run with
        # another current directory (e.g. an isolated calculation folder).
        self.configuration_file_path = as_path(self.configuration_file_path).resolve()
        if self.input_file_path:
            self.input_file_path = as_path(self.input_file_path).resolve()

    def run(
        self,
        input_values: Optional[VariableList] = None,
        calculation_folder: Optional[Union[str, PathLike]] = None,
    ) -> DataFile:
        """
        Run the computation.

        This method is useful to set input values on-the-fly, and/or isolate the
        computation data in a dedicated folder.

        :param input_values: if provided, these values will supersede the content
                             of input file (specified in configuration file)
        :param calculation_folder: if specified, all data, including configuration file,
                                   will be stored in that folder. The input file in this
                                   folder will contain data from `input_values`
        :return: the written output data
        """
        configuration = FASTOADProblemConfigurator(self.configuration_file_path)
        if self.input_file_path:
            configuration.input_file_path = self.input_file_path

        if calculation_folder:
            make_parent_dir(calculation_folder)
            configuration.make_local(calculation_folder)
            if input_values:
                # Write superseding values into the local input file so the
                # calculation folder is self-contained.
                input_data = DataFile(configuration.input_file_path)
                input_data.update(input_values)
                input_data.save()

        problem = configuration.get_problem(read_inputs=True)
        problem.comm = FakeComm()  # We do not run OpenMDAO in parallel
        problem.setup()

        if input_values and not calculation_folder:
            # No local input file was written: push the values directly into the problem.
            for input_variable in input_values:
                problem.set_val(
                    input_variable.name,
                    val=input_variable.val,
                    units=input_variable.units,
                )

        if self.optimize:
            problem.run_driver()
        else:
            problem.run_model()

        output_data = problem.write_outputs()
        return output_data

    def run_cases(
        self,
        input_list: List[VariableList],
        destination_folder: Union[str, PathLike],
        *,
        max_workers: Optional[int] = None,
        use_MPI_if_available: bool = True,
        overwrite_subfolders: bool = False,
    ):
        """
        Run computations concurrently.

        The data of each computation will be isolated in a dedicated subfolder of
        `destination_folder`.

        :param input_list: a computation will be run for each item of this list
        :param destination_folder: The data of each computation will be isolated in a dedicated
                                   subfolder of this folder.
        :param max_workers: if not specified, all available processors will be used. Set to -1
                            to use all available processors except 1 (useful when running
                            locally while keeping some CPU for working meanwhile).
        :param use_MPI_if_available: If False, or if no MPI implementation is available,
                                     computations will be run concurrently using the
                                     multiprocessing library.
        :param overwrite_subfolders: if False, calculations that match existing subfolders won't
                                     be run (allows batch continuation)
        """
        destination_folder = as_path(destination_folder).resolve()

        use_MPI = use_MPI_if_available and HAVE_MPI
        if use_MPI_if_available and not HAVE_MPI:
            _LOGGER.warning("No MPI environment found. Using multiprocessing instead.")

        # One worker is consumed by the MPIPoolExecutor
        max_proc = (MPI.COMM_WORLD.Get_size() - 1) if use_MPI else mp.cpu_count()
        if max_workers == -1:
            # Keep at least 1 worker, even on a single-processor machine.
            max_workers = max(1, max_proc - 1)
        elif max_workers is not None:
            if max_workers > max_proc:
                _LOGGER.warning(
                    'Asked for "%d" workers, but only "%d" available. '
                    'Setting "max_workers" to "%d".',
                    max_workers,
                    max_proc,
                    max_proc,
                )
            max_workers = max(1, min(max_workers, max_proc))

        pool_cls = _MPIPool if use_MPI else mp.Pool
        with pool_cls(max_workers) as pool:
            pool.starmap(
                CalcRunner.run,
                self._calculation_inputs(input_list, destination_folder, overwrite_subfolders),
                # If a computation crashes, the whole chunk stops.
                # chunksize=1 ensures all computations will be launched.
                chunksize=1,
            )

    def _calculation_inputs(
        self,
        input_list: List[VariableList],
        destination_folder: Path,
        overwrite_subfolders: bool,
    ):
        """Iterator for providing inputs of :meth:`run`."""
        case_count = len(input_list)
        # Width of zero-padded indices so subfolder names sort naturally.
        # len(str(max index)) is safe for 0- and 1-item lists, unlike
        # ceil(log10(case_count)) which raises ValueError on an empty list.
        n_digits = len(str(case_count - 1)) if case_count else 0
        for i, input_vars in enumerate(input_list):
            calculation_folder = destination_folder / f"calc_{i:0{n_digits}d}"
            if overwrite_subfolders or not calculation_folder.is_dir():
                yield self, input_vars, calculation_folder
            else:
                _LOGGER.info('Subfolder "%s" exists. Computation skipped', calculation_folder)
@contextmanager
def _MPIPool(*args, **kwargs):
    """
    Provide an :class:`mpi4py.futures.MPIPoolExecutor` as a context manager.

    Assumes availability of MPI environment. ``main=False`` tells workers not
    to import the ``__main__`` module (see mpi4py.futures documentation).
    The executor is shut down when leaving the context, even on error.
    """
    executor = MPIPoolExecutor(*args, main=False, **kwargs)
    try:
        yield executor
    finally:
        executor.shutdown()