Source code for minionpy.parallel

from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
import threading
import sys

# Module-level slot holding this process's worker object. It is assigned by
# `_init_process` and read by `_worker_function`; it stays None until
# `_init_process` has run in the current process.
_instance = None

def _init_process(class_name, args, kwargs):
    """
    Build the worker object for the current process and publish it globally.

    Intended to be used as the ``initializer`` of a process pool: it
    constructs ``class_name(*args, **kwargs)`` and stores the resulting object
    in the module-level ``_instance`` variable, where ``_worker_function``
    looks it up.

    Parameters
    ----------
    class_name : class
        Callable (normally a class) used to construct the worker object.
    args : tuple
        Positional arguments forwarded to ``class_name``.
    kwargs : dict
        Keyword arguments forwarded to ``class_name``.
    """
    global _instance
    # Construct first, then publish: if the constructor raises, the previous
    # value of `_instance` is left untouched.
    worker_obj = class_name(*args, **kwargs)
    _instance = worker_obj

def _worker_function(x, index):
    """
    Evaluate the objective function for one input and tag it with its index.

    Calls ``objective_function(x)`` on the module-level ``_instance`` object
    (set up by ``_init_process``) and pairs the outcome with ``index`` so the
    caller can place the value at the right position regardless of the order
    in which tasks finish.

    Parameters
    ----------
    x : list or numpy.ndarray
        Input passed unchanged to ``objective_function``.
    index : int
        Position of ``x`` in the caller's input sequence.

    Returns
    -------
    tuple
        ``(index, value)`` where ``value`` is whatever
        ``_instance.objective_function(x)`` returned.
    """
    # Read-only access to the module global; no `global` declaration needed.
    value = _instance.objective_function(x)
    return index, value


class Process_Parallel:
    """
    Parallelizes the evaluation of an objective function using multiple processes.

    This class uses a process pool executor to distribute evaluations of an
    objective function over several worker processes. The pool is created with
    `_init_process` as its initializer, so every worker process constructs its
    own instance of `class_name`.

    Working Principle:

    - When the `objective` method is called, a list of inputs (`X`) is passed
      and one task per input is submitted to the process pool.
    - Each worker process runs `_init_process` when it starts, which creates
      that process's own `class_name(*args, **kwargs)` instance.
    - `_worker_function` evaluates `objective_function(x)` on the
      process-local instance and returns the value together with the input
      index.
    - Results are collected as tasks complete and written to the slot of the
      corresponding input; the list is returned once every task has finished.

    Inputs and results travel between processes, so they (and, depending on
    the multiprocessing start method, `class_name` and its constructor
    arguments) need to be picklable.

    Parameters
    ----------
    Nprocesses : int
        The number of processes to use for parallel processing.
    class_name : class
        The class containing the 'objective_function' method.
    args : tuple
        Positional arguments passed to the class constructor.
    kwargs : dict
        Keyword arguments passed to the class constructor.

    Raises
    ------
    ValueError
        If the provided class does not have an 'objective_function' method,
        which is required for the parallel execution.

    Attributes
    ----------
    Nprocesses : int
        The number of processes to use.
    class_name : class
        The class that provides the 'objective_function' method.
    args : tuple
        Positional arguments passed to `class_name`'s constructor.
    kwargs : dict
        Keyword arguments passed to `class_name`'s constructor.
    executor : ProcessPoolExecutor
        Executor used to manage the parallel processes.
    """

    def __init__(self, Nprocesses, class_name, *args, **kwargs):
        """
        Initializes the Parallel object with a specified number of processes.

        Parameters
        ----------
        Nprocesses : int
            The number of processes to use for parallel execution.
        class_name : class
            The class that contains the 'objective_function' method.
        args : tuple
            Positional arguments passed to the class constructor.
        kwargs : dict
            Keyword arguments passed to the class constructor.

        Raises
        ------
        ValueError
            If `class_name` has no 'objective_function' attribute.
        """
        if not hasattr(class_name, "objective_function"):
            raise ValueError(
                f"Class {class_name.__name__} does not have the required "
                "'objective_function' method."
            )

        self.Nprocesses = Nprocesses
        self.class_name = class_name
        self.args = args
        self.kwargs = kwargs

        # Each worker process builds its own instance through the initializer.
        self.executor = ProcessPoolExecutor(
            max_workers=self.Nprocesses,
            initializer=_init_process,
            initargs=(self.class_name, self.args, self.kwargs),
        )

    def __del__(self):
        """Shuts down the executor and cleans up resources."""
        self.cleanUp()

    def objective(self, X):
        """
        Evaluates the objective function in parallel using multiple processes.

        An evaluation that raises is reported on stderr and its slot keeps the
        placeholder value `sys.float_info.max`; the remaining evaluations are
        unaffected.

        Parameters
        ----------
        X : list of list of float
            A list of input arrays to evaluate the objective function.

        Returns
        -------
        list of float
            A list containing the results of the objective function
            evaluations, in the same order as `X`.
        """
        # Pre-fill with the largest float so failed evaluations keep that value.
        results = [sys.float_info.max] * len(X)

        # Map each future to its input index so failures can be attributed.
        futures = {
            self.executor.submit(_worker_function, x, i): i
            for i, x in enumerate(X)
        }

        for future in as_completed(futures):
            idx = futures[future]
            try:
                _, value = future.result()
            except Exception as e:
                # Best effort: report the failure and keep the placeholder.
                print(f"Error evaluating function at index {idx}: {e}", file=sys.stderr)
            else:
                results[idx] = value

        return results

    def cleanUp(self):
        """Shuts down the executor and waits for all processes to finish."""
        # `executor` is missing when __init__ raised before creating it; the
        # finalizer still runs in that case, so tolerate its absence.
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=True)

    def __call__(self, X):
        """
        Allows the object to be called directly.

        Parameters
        ----------
        X : list[list[float]]
            A list of input arrays to evaluate the objective function.

        Returns
        -------
        list of float
            A list containing the results of the objective function evaluations.
        """
        return self.objective(X)
class Thread_Parallel:
    """
    Parallelizes the evaluation of an objective function using multiple threads.

    This class uses a thread pool executor to distribute evaluations of an
    objective function over several threads. Each worker thread gets its own
    instance of `class_name`, kept in thread-local storage, so evaluations on
    different threads never share the same object.

    Working Principle:

    - When the `objective` method is called, a list of inputs (`X`) is passed
      and one task per input is submitted to the thread pool.
    - Each task calls `__get_instance`, which creates the calling thread's
      instance of `class_name` on first use and reuses it afterwards.
    - `__calc_func` evaluates `objective_function(x)` on that thread-local
      instance.
    - Results are collected as tasks complete and written to the slot of the
      corresponding input; the list is returned once every task has finished.

    Parameters
    ----------
    Nthreads : int
        The number of threads to use for parallel processing.
    class_name : class
        The class containing the 'objective_function' method.
    args : tuple
        Positional arguments passed to the class constructor.
    kwargs : dict
        Keyword arguments passed to the class constructor.

    Raises
    ------
    ValueError
        If the provided class does not have an 'objective_function' method.
    """

    def __init__(self, Nthreads, class_name, *args, **kwargs):
        """
        Initializes the Thread_Parallel object with the specified number of threads.

        Parameters
        ----------
        Nthreads : int
            The number of threads to use for parallel execution.
        class_name : class
            The class that contains the 'objective_function' method.
        args : tuple
            Positional arguments passed to the class constructor.
        kwargs : dict
            Keyword arguments passed to the class constructor.

        Raises
        ------
        ValueError
            If `class_name` has no 'objective_function' attribute.
        """
        if not hasattr(class_name, "objective_function"):
            raise ValueError(
                f"Class {class_name.__name__} does not have the required "
                "'objective_function' method."
            )

        self.Nthreads = Nthreads
        self.class_name = class_name
        self.args = args
        self.kwargs = kwargs

        # Initialize the thread pool executor
        self.executor = ThreadPoolExecutor(max_workers=self.Nthreads)

        # Thread-local storage: one `class_name` instance per worker thread
        self.thread_local = threading.local()

    def __del__(self):
        """Shuts down the executor and cleans up resources."""
        self.cleanUp()

    def __get_instance(self):
        """
        Returns a thread-local instance of the class.

        The instance is created the first time the calling thread asks for it
        and is reused on later calls from the same thread.

        Returns
        -------
        object
            An instance of the class.
        """
        if not hasattr(self.thread_local, "obj"):
            # Create and store the instance in thread-local storage
            self.thread_local.obj = self.class_name(*self.args, **self.kwargs)
        return self.thread_local.obj

    def __calc_func(self, x):
        """
        Evaluates the objective function on the calling thread's instance.

        Parameters
        ----------
        x : list
            Input to the objective function.

        Returns
        -------
        float
            The value of the objective function at the input `x`.
        """
        obj = self.__get_instance()
        return obj.objective_function(x)

    def objective(self, X):
        """
        Evaluates the objective function in parallel for multiple inputs.

        An evaluation that raises is reported on stderr and its slot keeps the
        placeholder value `sys.float_info.max`; the remaining evaluations are
        unaffected.

        Parameters
        ----------
        X : list[list[float]]
            A list of input arrays to evaluate the objective function.

        Returns
        -------
        list of float
            A list containing the results of the objective function
            evaluations, in the same order as `X`.
        """
        # Pre-fill with the largest float so failed evaluations keep that value.
        results = [sys.float_info.max] * len(X)

        # Map each future to its input index so results land in input order.
        futures = {
            self.executor.submit(self.__calc_func, x): i
            for i, x in enumerate(X)
        }

        for future in as_completed(futures):
            idx = futures[future]
            try:
                value = future.result()
            except Exception as e:
                # Best effort: report the failure and keep the placeholder.
                print(f"Error evaluating function at index {idx}: {e}", file=sys.stderr)
            else:
                results[idx] = value

        return results

    def cleanUp(self):
        """Shuts down the executor and waits for all threads to finish."""
        # `executor` is missing when __init__ raised before creating it; the
        # finalizer still runs in that case, so tolerate its absence.
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=True)

    def __call__(self, X):
        """
        Allows the object to be called directly.

        Parameters
        ----------
        X : list of list of float
            A list of input arrays to evaluate the objective function.

        Returns
        -------
        list of float
            A list containing the results of the objective function evaluations.
        """
        return self.objective(X)