Source code for morpheus_core.helpers.parallel_helper

# MIT License
# Copyright 2020 Ryan Hausen
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ==============================================================================

from functools import partial
import os
from os.path import split
import pickle
import shutil
import time
from itertools import repeat, starmap
from subprocess import Popen
from typing import Callable, Dict, Iterable, List, Tuple, Union

import dill
import numpy as np
from astropy.io import fits
from tqdm import tqdm

import morpheus_core.helpers.misc_helper as mh
import morpheus_core.helpers.fits_helper as fh
from morpheus_core import morpheus_core


def get_split_length(
    shape: List[int], num_workers: int, window_shape: Tuple[int]
) -> int:
    """Calculate the size of the sub images for classification.

    Neighbouring splits overlap by ``window_shape[0] - 1`` rows (see
    ``get_start_y_idxs``). The value returned here,
    ``(H + (n - 1) * w) // n``, is algebraically equal to
    ``ceil((H + (n - 1) * (w - 1)) / n)``. That is the smallest equal split
    length whose overlapping strips cover all ``H`` rows.

    Args:
        shape (List[int]): the shape of the array to be split
        num_workers (int): the number of splits to make
        window_shape (Tuple[int]): The (height, width) tuple describing the
                                   size of the sliding window.

    Returns:
        The length of each split along axis 0

    TODO: Implement splits along other axes
    """
    height = shape[0]
    padded_height = height + (num_workers - 1) * window_shape[0]
    return padded_height // num_workers
def get_split_slice_generator(
    shape: Tuple[int], window_shape: Tuple[int], num_workers: int, split_length: int
) -> Iterable[slice]:
    """Creates a generator that yields `slice` objects to split imgs.

    The start rows come from ``get_start_y_idxs``, the same function used
    when stitching worker output back together. Neighbouring slices
    therefore overlap by ``window_shape[0] - 1`` rows. Every slice is
    ``split_length`` rows long except the last, which is clamped to end at
    ``shape[0]``.

    Args:
        shape (Tuple[int]): The shape of the array to be split
        window_shape (Tuple[int]): The (height, width) tuple describing the
                                   size of the sliding window.
        num_workers (int): The number of splits to make
        split_length (int): The length each slice should be

    Returns:
        An iterator yielding one ``slice`` per worker along axis 0

    TODO: Implement splits along other axes
    """
    window_height = window_shape[0]
    leading_lengths = [split_length] * (num_workers - 1)

    starts = get_start_y_idxs(leading_lengths, window_height=window_height)
    stops = starts + split_length
    # the final slice always runs to the bottom edge of the array
    stops[-1] = shape[0]

    return starmap(slice, zip(starts, stops))
def make_runnable_file(
    path: str,
    input_fnames: List[str],
    n_classes: int,
    batch_size: int,
    window_size: Union[Tuple[int], List[int]],
    dilation: int,
    stride: Union[Tuple[int], List[int]],
    aggregate_method: str,
) -> None:
    """Creates a file at `path` that classifies local FITS files.

    The generated ``main.py`` loads ``model.pkl`` and ``update_map.npy`` from
    its working directory. It then calls ``morpheus_core.predict`` on
    ``input_fnames`` and writes results to ``./output``.

    Args:
        path (str): The dir to save the file in
        input_fnames (List[str]): The list of file names that contain the
                                  arrays to convert into batches and serve
                                  to the model. They are embedded verbatim,
                                  so they are resolved relative to the
                                  directory the script is run from.
        n_classes (int): The number of classes that the model predicts for
        batch_size (int): The batch size for the model to use when
                          classifying the input
        window_size (Union[Tuple[int], List[int]]): The (h, w) of each example
                                                    in a batch
        dilation (int): Passed through unchanged to ``morpheus_core.predict``
        stride (Union[Tuple[int], List[int]]): The stride size of the sliding
                                               window
        aggregate_method (str): how to process the output from the model. If
                                AGGREGATION_METHODS.MEAN_VAR record output
                                using mean and variance, If
                                AGGREGATION_METHODS.RANK_VOTE record output
                                as the normalized vote count.

    Returns:
        None
    """
    # we need `local` so that we can import morpheus_core just in case the pip env
    # doesn't carry over to the new process
    local = os.path.dirname(os.path.dirname(__file__))

    # String values are emitted with repr() so that quotes or backslashes in
    # paths (e.g. Windows paths) still produce a valid Python literal.
    quoted_inputs = ",".join(repr(str(fname)) for fname in input_fnames)

    text = [
        "import sys",
        f"sys.path.append({local!r})",
        "import os",
        "import dill",
        "import numpy as np",
        "from tqdm import tqdm",
        "from morpheus_core import morpheus_core",
        "def main():",
        "    output_dir = './output'",
        "    if 'output' not in os.listdir():",
        "        os.mkdir('./output')",
        "",
        "    with open('model.pkl', 'rb') as f:",
        "        model = dill.load(f)",
        "",
        "    model_inputs = [",
        "        " + quoted_inputs,
        "    ]",
        "",
        "    update_map = np.load('update_map.npy', allow_pickle=True)",
        "",
        "    morpheus_core.predict(",
        "        model,",
        "        model_inputs,",
        f"        {n_classes},",
        f"        {batch_size},",
        f"        {window_size},",
        f"        {dilation},",
        f"        stride={stride},",
        "        update_map=update_map,",
        f"        aggregate_method={str(aggregate_method)!r},",
        "        out_dir=output_dir,",
        "    )",
        "    sys.exit(0)",
        "if __name__=='__main__':",
        "    main()",
    ]

    with open(os.path.join(path, "main.py"), "w") as f:
        f.write("\n".join(text))
def build_parallel_classification_structure(
    model: Callable,
    arrs: List[np.ndarray],
    arr_fnames: List[str],
    n_classes: int,
    batch_size: int,
    window_shape: Tuple[int],
    dilation: int,
    stride: Union[Tuple[int], List[int]],
    update_map: np.ndarray,
    aggregate_method: str,
    out_dir: str,
    workers: List[int],
) -> None:
    """Sets up the subdirs and files to run the parallel classification.

    For each worker (in sorted id order) this creates ``out_dir/<worker>``
    and fills it with:

    - a row-slice of every input array, saved as a FITS file under the
      basename of its original file name;
    - the dill-pickled model (``model.pkl``);
    - the update map (``update_map.npy``);
    - a runnable ``main.py`` (see ``make_runnable_file``).

    Args:
        model (Callable): The model to serialize with dill for each worker
        arrs (List[np.ndarray]): List of arrays to split up, in the same
                                 order as ``arr_fnames`` (HJVZ)
        arr_fnames (List[str]): The file names that hold the input arrays
                                `arrs`; only their basenames are used
        n_classes (int): The number of classes the model predicts
        batch_size (int): The batch size for Morpheus to use when classifying
                          the input.
        window_shape (Tuple[int]): The (height, width) tuple describing the
                                   size of the sliding window.
        dilation (int): Passed through to the generated worker script
        stride (Union[Tuple[int], List[int]]): The sliding window stride
        update_map (np.ndarray): Passed to each worker; if None, an all-ones
                                 array of shape ``window_shape`` is used
        aggregate_method (str): The morpheus_core.AGGREGATION_METHODS value
        out_dir (str): the location to place the subdirs in
        workers (List[int]): A list of worker ID's that can either be CUDA GPU
                             ID's or a list dummy numbers for cpu workers

    Returns:
        None

    TODO: Refactor to a more functional implementation
    """
    shape = arrs[0].shape
    num_workers = len(workers)
    split_slices = get_split_slice_generator(
        shape,
        window_shape,
        num_workers,
        get_split_length(shape, num_workers, window_shape),
    )

    # Inputs are copied into each worker dir under their basename, so the
    # generated main.py (run with cwd=<worker dir>) must reference that same
    # basename rather than the original, possibly directory-qualified path.
    local_fnames = [os.path.split(name)[1] for name in arr_fnames]

    if update_map is None:
        update_map = np.ones(window_shape)

    for worker, split_slice in tqdm(
        zip(sorted(workers), split_slices), total=num_workers
    ):
        sub_output_dir = os.path.join(out_dir, str(worker))
        os.mkdir(sub_output_dir)

        # put sliced input files into subdir
        for local_name, data in zip(local_fnames, arrs):
            tmp_location = os.path.join(sub_output_dir, local_name)
            fits.PrimaryHDU(data=data[split_slice, ...]).writeto(tmp_location)

        # put model into subdir
        with open(os.path.join(sub_output_dir, "model.pkl"), "wb") as f:
            dill.dump(model, f)

        # put update_map into subdir
        np.save(os.path.join(sub_output_dir, "update_map.npy"), update_map)

        make_runnable_file(
            sub_output_dir,
            local_fnames,
            n_classes,
            batch_size,
            window_shape,
            dilation,
            stride,
            aggregate_method,
        )
def worker_to_cmd(is_gpu: bool, worker: int) -> str:
    """Build the shell command that launches one worker's ``main.py``.

    GPU workers are pinned to their own device through
    ``CUDA_VISIBLE_DEVICES``. CPU workers get ``-1`` so that no GPU is
    visible to them.

    Args:
        is_gpu (bool): True if worker is a gpu worker false if cpu worker
        worker (int): The worker id, this is the GPU id for gpu workers

    Returns:
        A string containing the bash command to run a worker job.
    """
    visible_devices = worker if is_gpu else -1
    return f"CUDA_VISIBLE_DEVICES={visible_devices} python main.py"
def check_procs(procs: Dict[int, Popen]) -> List[bool]:
    """Checks on the status of running jobs.

    Args:
        procs (Dict[int, Popen]): A dictionary where the keys are the worker
                                  ids and the values are the process objects

    Returns:
        A list of booleans, one per process in ``procs`` iteration order.
        An entry is True while that process is still RUNNING and False once
        it has exited.
    """
    # Popen.poll() returns None while the child is still running and its
    # return code once it has terminated.
    return [proc.poll() is None for proc in procs.values()]
def monitor_procs(procs: Dict[int, Popen], parallel_check_interval: float) -> None:
    """Blocks until every process in ``procs`` has exited.

    The process statuses are checked immediately. While any process is still
    running, this sleeps for ``parallel_check_interval`` and checks again.
    Return codes are not inspected.

    Args:
        procs (Dict[int, Popen]): A dictionary where the keys are the worker
                                  ids and the values are the process objects
        parallel_check_interval (float): Number of seconds to sleep between
                                         status checks (passed to
                                         ``time.sleep``)

    Returns:
        None
    """
    while any(check_procs(procs)):
        time.sleep(parallel_check_interval)
def run_parallel_jobs(
    workers: List[int], is_gpu: bool, out_dir: str, parallel_check_interval: float
) -> None:
    """Starts and tracks parallel job runs.

    One shell subprocess is launched per worker, with its working directory
    set to ``out_dir/<worker>``. The call then blocks until all of them have
    exited.

    WARNING: This will not finish running until all subprocesses are complete

    Args:
        workers (List[int]): A list of worker ID's to assign to a portion of an
                             image.
        is_gpu (bool): if True the worker ID's belong to NVIDIA GPUs and will
                       be used as an argument in CUDA_VISIBLE_DEVICES. If False,
                       then the ID's are associated with CPU workers
        out_dir (str): the location with the partitioned data
        parallel_check_interval (float): The number of SECONDS to wait between
                                         polling the subprocesses for
                                         completion (the value is passed to
                                         ``time.sleep``).

    Returns:
        None
    """
    # NOTE(review): subprocess return codes are not checked here; a crashed
    # worker only surfaces later when its output files are missing.
    processes = {
        worker: Popen(
            worker_to_cmd(is_gpu, worker),
            # shell=True is needed for the `VAR=value cmd` prefix in the command
            shell=True,
            cwd=os.path.join(out_dir, str(worker)),
        )
        for worker in workers
    }

    monitor_procs(processes, parallel_check_interval)
def merge_parallel_mean_var(
    combined_out: np.ndarray,
    combined_n: np.ndarray,
    output: np.ndarray,
    n: np.ndarray,
    start_y: int,
) -> None:
    """Merge the output from a worker into the total output for mean/var.

    Derived from:
    https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html

    For two groups with counts n1, n2, means x1, x2 and variances s1, s2::

        xc = (n1 * x1 + n2 * x2) / (n1 + n2)
        sc = (n1 * (s1 + (x1 - xc)**2) + n2 * (s2 + (x2 - xc)**2)) / (n1 + n2)

    Pixels where ``n1 + n2 == 0`` are set to 0.

    Args:
        combined_out (np.ndarray): The total output array, indexed
                                   [y, x, class, k] where k=0 is the mean
                                   and k=1 the variance
        combined_n (np.ndarray): The total n array, indexed [y, x]
        output (np.ndarray): The output to merge into the total output, same
                             layout as ``combined_out``
        n (np.ndarray): The n to merge into the total n, indexed [y, x]
        start_y (int): The y index to merge into output into combined_out

    Returns:
        None, the operation is performed inplace on combined_out and combined_n
    """
    ys = slice(start_y, start_y + output.shape[0])

    # copy the running totals out before they are overwritten below
    x1, x2 = combined_out[ys, ..., 0].copy(), output[..., 0]
    s1, s2 = combined_out[ys, ..., 1].copy(), output[..., 1]
    n1, n2 = combined_n[ys, :, np.newaxis].copy(), n[..., np.newaxis]

    denominator = n1 + n2
    safe_divide_mask = denominator > 0

    xc_numerator = (n1 * x1) + (n2 * x2)
    # np.where evaluates the quotient everywhere, so silence the 0/0
    # warnings for pixels whose result is replaced by 0 anyway.
    with np.errstate(divide="ignore", invalid="ignore"):
        xc_quotient = xc_numerator / denominator
    xc = np.where(safe_divide_mask, xc_quotient, 0)

    sc_numerator = (n1 * (s1 + (x1 - xc) ** 2)) + (n2 * (s2 + (x2 - xc) ** 2))
    with np.errstate(divide="ignore", invalid="ignore"):
        sc_quotient = sc_numerator / denominator
    sc = np.where(safe_divide_mask, sc_quotient, 0)

    combined_out[ys, ..., 0] = xc
    combined_out[ys, ..., 1] = sc
    combined_n[ys, ...] = denominator[..., 0]
def merge_parallel_rank_vote(
    combined_out: np.ndarray,
    combined_n: np.ndarray,
    output: np.ndarray,
    n: np.ndarray,
    start_y: int,
) -> None:
    """Merge the output from a worker into the total output for rank vote.

    The merged value is the count-weighted mean
    ``(n1 * x1 + n2 * x2) / (n1 + n2)``. Pixels where ``n1 + n2 == 0`` are
    set to 0.

    Args:
        combined_out (np.ndarray): The total output array, indexed
                                   [y, x, class]
        combined_n (np.ndarray): The total n array, indexed [y, x]
        output (np.ndarray): The output to merge into the total output
        n (np.ndarray): The n to merge into the total n, indexed [y, x]
        start_y (int): The y index to merge into output into combined_out

    Returns:
        None, the operation is performed inplace on combined_out and combined_n
    """
    ys = slice(start_y, start_y + output.shape[0])

    # copy the running totals out before they are overwritten below
    x1, x2 = combined_out[ys, ...].copy(), output
    n1, n2 = combined_n[ys, :, np.newaxis].copy(), n[..., np.newaxis]

    numerator = (n1 * x1) + (n2 * x2)
    denominator = n1 + n2

    # np.where evaluates the quotient everywhere, so silence the 0/0
    # warnings for pixels whose result is replaced by 0 anyway.
    with np.errstate(divide="ignore", invalid="ignore"):
        quotient = numerator / denominator
    mean = np.where(denominator > 0, quotient, 0)

    combined_out[ys, :] = mean
    combined_n[ys, :] = denominator[..., 0]
def get_merge_function(aggreation_method: str) -> Callable:
    """Returns the method for merging arrays based on the aggregation method.

    Any value other than ``AGGREGATION_METHODS.MEAN_VAR`` selects the
    rank-vote merge. (The parameter name's spelling is kept as-is for
    callers that pass it by keyword.)

    Args:
        aggreation_method (str): The aggregation method used one of
                                 morpheus_core.AGGREGATION_METHODS.MEAN_VAR or
                                 morpheus_core.AGGREGATION_METHODS.RANK_VOTE

    Returns:
        A function to use for merging output arrays
    """
    wants_mean_var = aggreation_method == morpheus_core.AGGREGATION_METHODS.MEAN_VAR
    return merge_parallel_mean_var if wants_mean_var else merge_parallel_rank_vote
def get_data_from_worker(out_dir: str, worker: int) -> Tuple[np.ndarray, np.ndarray]:
    """Returns the output classifications and the n array for a given worker.

    Both are read from ``<out_dir>/<worker>/output/``: ``output.fits`` and
    ``n.fits`` respectively.

    Args:
        out_dir (str): The directory where the workers are storing their output
        worker (int): The worker id to get the data for

    Returns:
        A 2-Tuple where the first element is the worker output array and the
        second element is n array.
    """
    worker_output_dir = os.path.join(out_dir, str(worker), "output")
    output_path = os.path.join(worker_output_dir, "output.fits")
    n_path = os.path.join(worker_output_dir, "n.fits")
    return (fits.getdata(output_path), fits.getdata(n_path))
def get_empty_output_array(
    out_dir: str, height: int, width: int, n_classes: int, aggregation_method: str
) -> Tuple["fits.HDUList", "fits.HDUList", np.ndarray, np.ndarray]:
    """Creates the output and n files in the output dir and opens them for update.

    ``output.fits`` has shape (height, width, n_classes, 2) for
    AGGREGATION_METHODS.MEAN_VAR (mean and variance per class). Otherwise it
    has shape (height, width, n_classes). ``n.fits`` has shape
    (height, width). Both are created as float32 via ``fh.create_file`` and
    reopened with ``fh.open_file(..., mode="update")``. Presumably the
    returned arrays are memory-mapped; confirm in fits_helper.

    Args:
        out_dir (str): The output directory to store the array in
        height (int): The output image height
        width (int): The output image width
        n_classes (int): The number classes the model predicts
        aggregation_method (str): The method to use for merging outputs
                                  one of
                                  morpheus_core.AGGREGATION_METHODS.MEAN_VAR or
                                  morpheus_core.AGGREGATION_METHODS.RANK_VOTE

    Returns:
        A 4-Tuple where the first element is the HDUL for output array, the
        second element is the HDUL for the n array, the third element is the
        output array, the fourth element is the n array.
    """
    if aggregation_method == morpheus_core.AGGREGATION_METHODS.MEAN_VAR:
        shape = [height, width, n_classes, 2]
    else:
        shape = [height, width, n_classes]

    out_file = os.path.join(out_dir, "output.fits")
    n_file = os.path.join(out_dir, "n.fits")

    fh.create_file(out_file, shape, dtype=np.float32)
    fh.create_file(n_file, shape[:2], dtype=np.float32)

    out_hdul, out_array = fh.open_file(out_file, mode="update")
    n_hdul, n_array = fh.open_file(n_file, mode="update")

    return (out_hdul, n_hdul, out_array, n_array)
def get_start_y_idxs(n_heights: Iterable[int], window_height: int) -> np.ndarray:
    """Gets the y indexes to crop and merge arrays with.

    Consecutive arrays are assumed to overlap by ``window_height - 1`` rows,
    so each start index is the previous start plus the previous height minus
    that overlap. The result always has ``len(n_heights) + 1`` entries. The
    first is 0, and the last is where an array following the given ones
    would begin.

    Args:
        n_heights (Iterable[int]): The heights of the cropped arrays
        window_height (int): The height of a single input/output from the
                             model

    Returns:
        A numpy integer array of the y index values to use for merging the
        arrays (an ndarray, so callers can use vectorized arithmetic on it).
    """
    offset = window_height - 1
    return np.cumsum([0] + [height - offset for height in n_heights])
def stitch_parallel_classifications(
    workers: List[int], out_dir: str, aggregation_method: str, window_shape: Tuple[int]
) -> Tuple[List[fits.HDUList], List[np.ndarray]]:
    """Merges all of the output from the workers into a single classification image.

    Worker outputs are read in sorted worker-id order. That is the order in
    which build_parallel_classification_structure assigns image strips to
    workers. Each output is merged into a new ``output.fits``/``n.fits`` pair
    in ``out_dir``. Afterwards the per-worker subdirectories are deleted.

    Args:
        workers (List[int]): List of integer ids associated with workers
        out_dir (str): The output directory that the worker classifications
                       are stored in.
        aggregation_method (str): The morpheus_core.AGGREGATION_METHODS value
                                  to use to merge the output arrays
        window_shape (Tuple[int, int]): The (height, width) of the input output
                                        image data.

    Returns:
        A 2-Tuple, where the first element is a list of HDULs for the merged
        data and the second element is the merged arrays.
    """
    # Strips were handed out to sorted(workers) when the structure was built,
    # so they must be read back in that same order to land at the right rows.
    ordered_workers = sorted(workers)

    data_f = partial(get_data_from_worker, out_dir)
    outs, ns = list(zip(*map(data_f, ordered_workers)))

    heights = [arr.shape[0] for arr in ns]
    total_y = sum(heights)
    # adjacent strips overlap by window_height - 1 rows
    offset_y = (window_shape[0] - 1) * (len(ns) - 1)
    new_y = total_y - offset_y
    new_x = outs[0].shape[1]
    n_classes = outs[0].shape[2]

    out_hdul, n_hdul, combined_out, combined_n = get_empty_output_array(
        out_dir, new_y, new_x, n_classes, aggregation_method
    )

    merge_f = partial(get_merge_function(aggregation_method), combined_out, combined_n)
    start_ys = get_start_y_idxs(heights, window_shape[0])
    mh.apply(merge_f, zip(outs, ns, start_ys))

    mh.apply(lambda h: h.close(), [out_hdul, n_hdul])

    clean_up = lambda w: shutil.rmtree(os.path.join(out_dir, str(w)))
    mh.apply(clean_up, workers)

    return ([out_hdul, n_hdul], [combined_out, combined_n])