morpheus_framework.helpers package

Submodules

morpheus_framework.helpers.fits_helper module

morpheus_core.helpers.fits_helper.create_file(file_name: str, shape: Tuple[int], dtype: numpy.dtype) → None[source]

Creates a FITS file without loading it into memory.

This is a helper method to create large FITS files without loading an array into memory. The method follows the direction given at: http://docs.astropy.org/en/stable/generated/examples/io/skip_create-large-fits.html

Parameters:
  • file_name (str) – the complete path to the file to be created.
  • shape (tuple) – a tuple describing the shape of the file to be created; the shape should be one of the following: (n, w, h) or (n, w, h, 2)
  • dtype (numpy.dtype) – the numpy datatype used in the array
Returns:

None

TODO: Figure out why this throws warning about size occasionally
when files that are created by it are opened
morpheus_core.helpers.fits_helper.dtype_to_bytes_per_value(dtype: numpy.dtype) → int[source]

Gets the number of bytes as an int for each numpy datatype.

Parameters:

dtype (np.dtype) – the numpy datatype to get the bytes for

Returns:

The number of bytes, as an int, for the given numpy datatype

Raises:
  • ValueError for a value that is not one of np.uint8, np.int16, np.int32, np.float32, np.float64
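As a rough illustration of the lookup described above, the sketch below accepts only the five listed datatypes and returns their byte widths. The name `bytes_per_value` and the use of `np.dtype(...).itemsize` are assumptions made for this example and may not match how the module does it.

```python
import numpy as np

# The datatypes listed in the Raises section above.
SUPPORTED_DTYPES = (np.uint8, np.int16, np.int32, np.float32, np.float64)


def bytes_per_value(dtype) -> int:
    """Hypothetical stand-in: return the byte width of a supported dtype."""
    dt = np.dtype(dtype)
    if dt not in {np.dtype(t) for t in SUPPORTED_DTYPES}:
        raise ValueError(f"Unsupported dtype: {dt}")
    # itemsize is the number of bytes used by one element of this dtype.
    return dt.itemsize
```

A byte count like this can be multiplied by the number of elements in a shape to estimate the size of the data section of a file before it is created.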
morpheus_core.helpers.fits_helper.open_file(file_name: str, mode: str = 'readonly') → Tuple[astropy.io.fits.hdu.hdulist.HDUList, numpy.ndarray][source]

Gets the HDUL and data handle for the file in file_name.

This is a convenience function for opening a single FITS file using memmap.

Args:
file_name (str): filename to open
mode (str): the mode to pass to fits.open
Returns:
Tuple containing the HDUL and the corresponding numpy array
morpheus_core.helpers.fits_helper.open_files(file_names: List[str], mode: str = 'readonly') → Tuple[List[astropy.io.fits.hdu.hdulist.HDUList], List[numpy.ndarray]][source]

Gets the HDULs and data handles for all the files in file_names.

This is a convenience function for opening multiple FITS files using memmap.

Args:
file_names (List[str]): a list of file names, including paths, to FITS files

mode (str): the mode to pass to fits.open

Returns:
Tuple of a list of HDULs and a list of numpy arrays that are the memory-mapped data handles for each of the FITS files

morpheus_framework.helpers.label_helper module

morpheus_core.helpers.label_helper.finalize_rank_vote(n: numpy.ndarray, final_map: List[Tuple[int, int]], output: numpy.ndarray) → numpy.ndarray[source]

Performs the final calculation on completely classified pixels.

Parameters:
  • n (np.ndarray) – an array containing the total number of times each pixel has been classified
  • final_map (np.ndarray) – a boolean array indicating which pixels are finished being classified
  • output (np.ndarray) – an array containing the current running classifications
Returns:

An array with the same shape as output with updated values according to the final_map parameter.

morpheus_core.helpers.label_helper.finalize_variance(n: numpy.ndarray, final_map: List[Tuple[int, int]], curr_sn: numpy.ndarray) → numpy.ndarray[source]

The second of two methods used to calculate the variance online.

This method calculates the final variance value using equation 25 from http://people.ds.cam.ac.uk/fanf2/hermes/doc/antiforgery/stats.pdf but without performing the square root.

Parameters:
  • n (np.ndarray) – the current number of values included in the calculation
  • final_map (List[Tuple[int, int]]) – a list of indices to calculate the final variance for
  • curr_sn (np.ndarray) – the current $S_n$ values
Returns:

A np.ndarray with the current $S_n$ values and variance values for all indices in final_map

morpheus_core.helpers.label_helper.get_final_map(total_shape: Tuple[int, int], update_mask_shape: Tuple[int, int], stride: Tuple[int, int], output_idx: Tuple[int, int]) → List[Tuple[int, int]][source]

Creates a list of the pixel coordinates that have completed classification.

Parameters:
  • total_shape (Tuple[int, int]) – The (height, width) of the total image
  • update_mask_shape (Tuple[int, int]) – The (height, width) of the update mask
  • stride (Tuple[int, int]) – The distance, in pixels, to move along the (height, width) of the image.
  • output_idx (Tuple[int, int]) – the (y, x) values that indicate where in the image the update is happening
Returns:

A list of tuples that contain the (y,x) coordinates that are done being classified.

morpheus_core.helpers.label_helper.get_mean_var_array(shape: Union[List[int], Tuple[int]], write_to: str = None) → Tuple[Optional[astropy.io.fits.hdu.hdulist.HDUList], numpy.ndarray][source]

Make label arrays for storing the model output.

Parameters:
  • shape (Union[List[int], Tuple[int]]) – The shape of the array to create
  • write_to (str) – If supplied, the location to write the array to. Otherwise the array is created in memory
Returns:

A 2-tuple where the first item is an HDUL if write_to is supplied, otherwise None, and the second item is a numpy array

morpheus_core.helpers.label_helper.get_n_array(shape: Union[List[int], Tuple[int]], write_to: str = None) → Tuple[Optional[astropy.io.fits.hdu.hdulist.HDUList], numpy.ndarray][source]

Make label arrays for storing the model output.

Parameters:
  • shape (Union[List[int], Tuple[int]]) – The shape of the array to create
  • write_to (str) – If supplied, the location to write the array to. Otherwise the array is created in memory
Returns:

A 2-tuple where the first item is an HDUL if write_to is supplied, otherwise None, and the second item is a numpy array

morpheus_core.helpers.label_helper.get_rank_vote_array(shape: Union[List[int], Tuple[int]], write_to: str = None) → Tuple[Optional[astropy.io.fits.hdu.hdulist.HDUList], numpy.ndarray][source]

Make label arrays for storing the model output.

Parameters:
  • shape (Union[List[int], Tuple[int]]) – The shape of the array to create
  • write_to (str) – If supplied, the location to write the array to. Otherwise the array is created in memory
Returns:

A 2-tuple where the first item is an HDUL if write_to is supplied, otherwise None, and the second item is a numpy array

morpheus_core.helpers.label_helper.get_windowed_index_generator(img_wh: Tuple[int, int], window_shape: Tuple[int, int], stride: Tuple[int, int] = (1, 1)) → Iterable[Tuple[int, int]][source]

Creates a generator that returns window limited indices over a 2d array.

Parameters:
  • img_wh (Tuple[int, int]) – The (height, width) of the total image size
  • window_shape (Tuple[int, int]) – The (height, width) of the input/output to the classifier
  • stride (Tuple[int, int]) – The distance, in pixels, to move along the (height, width) of the image.
Returns:

An iterable containing tuples of ints that are the indexes to use to extract samples from the large image.
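The sliding-window iteration described above can be sketched as follows. This is a minimal illustration, assuming each yielded pair is the (y, x) top-left corner of a window and that windows which would run past the image edge are skipped; the name `windowed_indices` is hypothetical.

```python
from typing import Iterator, Tuple


def windowed_indices(
    img_hw: Tuple[int, int],
    window_shape: Tuple[int, int],
    stride: Tuple[int, int] = (1, 1),
) -> Iterator[Tuple[int, int]]:
    """Yield the (y, x) top-left corner of every window that fits in the image."""
    img_h, img_w = img_hw
    win_h, win_w = window_shape
    stride_y, stride_x = stride
    # The last valid corner along each axis is (image size - window size).
    for y in range(0, img_h - win_h + 1, stride_y):
        for x in range(0, img_w - win_w + 1, stride_x):
            yield (y, x)
```

Each pair can then be used to slice a sample from the large image, for example `img[y:y + win_h, x:x + win_w]`.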

morpheus_core.helpers.label_helper.iterative_mean(n: numpy.ndarray, curr_mean: numpy.ndarray, x_n: numpy.ndarray, update_mask: numpy.ndarray) → numpy.ndarray[source]

Calculates the mean of a collection in an online fashion. The values are calculated using equation 4 from http://people.ds.cam.ac.uk/fanf2/hermes/doc/antiforgery/stats.pdf

Parameters:
  • n (np.ndarray) – a 2d array containing the number of terms used in the mean
  • curr_mean (np.ndarray) – the current calculated mean
  • x_n (np.ndarray) – the new values to add to the mean
  • update_mask (np.ndarray) – a 2d boolean array indicating which indices in the array should be updated
Returns:

An array with the same shape as the curr_mean with the updated mean values
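A minimal sketch of a masked online mean, assuming the referenced equation is the standard incremental update mu_n = mu_{n-1} + (x_n - mu_{n-1}) / n and that `n` already counts the incoming sample. The function name and the zero-count guard are assumptions for illustration only.

```python
import numpy as np


def iterative_mean_sketch(n, curr_mean, x_n, update_mask):
    """Incremental mean: mu_n = mu_{n-1} + (x_n - mu_{n-1}) / n, where masked."""
    n = np.asarray(n, dtype=np.float64)
    # Guard against division by zero where nothing has been counted yet.
    safe_n = np.where(n > 0, n, 1.0)
    updated = curr_mean + (x_n - curr_mean) / safe_n
    # Pixels outside the mask keep their previous mean.
    return np.where(update_mask, updated, curr_mean)
```

Updating this way needs only the running mean and the count per pixel, so no history of earlier classifications has to be stored.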

morpheus_core.helpers.label_helper.iterative_variance(prev_sn: numpy.ndarray, x_n: numpy.ndarray, curr_mean: numpy.ndarray, next_mean: numpy.ndarray, update_mask: numpy.ndarray) → numpy.ndarray[source]

The first of two methods used to calculate the variance online.

This method specifically calculates the $S_n$ value as indicated in equation 24 from:

http://people.ds.cam.ac.uk/fanf2/hermes/doc/antiforgery/stats.pdf

Parameters:
  • prev_sn (np.ndarray) – the $S_n$ value from the previous step
  • x_n (np.ndarray) – the current incoming values
  • curr_mean (np.ndarray) – the mean that was previously calculated
  • next_mean (np.ndarray) – the mean, including the current values
  • update_mask (np.ndarray) – a boolean mask indicating which values to update
Returns:

An np.ndarray containing the current value for $S_n$
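The two-step online variance can be sketched as below, assuming the referenced equations are the usual Welford-style update S_n = S_{n-1} + (x_n - mu_{n-1})(x_n - mu_n) followed by S_n / n with the square root omitted. The function names, the argument name `prev_mean`, and the handling of zero counts are assumptions for this example.

```python
import numpy as np


def update_sn(prev_sn, x_n, prev_mean, next_mean, update_mask):
    """S_n = S_{n-1} + (x_n - mu_{n-1}) * (x_n - mu_n), applied where masked."""
    updated = prev_sn + (x_n - prev_mean) * (x_n - next_mean)
    return np.where(update_mask, updated, prev_sn)


def variance_from_sn(n, sn):
    """Variance (no square root): S_n / n, left at zero where n == 0."""
    n = np.asarray(n, dtype=np.float64)
    out = np.zeros_like(sn, dtype=np.float64)
    return np.divide(sn, n, out=out, where=n > 0)
```

Keeping $S_n$ rather than the variance itself during the run means the division by n happens only once, when a pixel is finished.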

morpheus_core.helpers.label_helper.update_mean_var(update_mask: numpy.ndarray, stride: Tuple[int, int], n: numpy.ndarray, output: numpy.ndarray, single_out: numpy.ndarray, output_idx: Tuple[int, int]) → None[source]

Updates the mean and variance with the recently classified values.

Parameters:
  • update_mask (np.ndarray) – a 2d boolean array indicating which indices in the array should be updated
  • stride (Tuple[int, int]) – How many (rows, columns) to move through the image at each iteration.
  • n (np.ndarray) – a 2d array containing the number of terms used in the mean
  • output (np.ndarray) – The current running output array containing the overall mean and variance
  • single_out (np.ndarray) – The new output values to update the mean and variance with
  • output_idx (Tuple[int, int]) – the (y, x) values that indicate where in the image the updates should happen
Returns:

None

morpheus_core.helpers.label_helper.update_n(update_mask: numpy.ndarray, n: numpy.ndarray, output_idx: Tuple[int, int]) → numpy.ndarray[source]

Updates the counts that are stored in the ‘n’ array.

Parameters:
  • update_mask (np.ndarray) – a 2d boolean array indicating which indices in the array should be updated
  • n (np.ndarray) – a 2d array containing the number of terms used in the mean
  • output_idx (Tuple[int, int]) – the (y, x) values that indicate where in the image the updates should happen
Returns:

The n array with updated values
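One way such a count update could look is sketched below. It assumes `output_idx` is the top-left corner of the window being updated and that the mask has the same shape as that window; the function name is hypothetical and the actual implementation may differ.

```python
import numpy as np


def update_n_sketch(update_mask, n, output_idx):
    """Add one count to every masked pixel of the window at (y, x)."""
    y, x = output_idx
    h, w = update_mask.shape
    # True becomes 1 and False becomes 0, so only masked pixels are incremented.
    n[y:y + h, x:x + w] += update_mask.astype(n.dtype)
    return n
```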

morpheus_core.helpers.label_helper.update_rank_vote(update_mask: numpy.ndarray, stride: Tuple[int, int], n: numpy.ndarray, output: numpy.ndarray, single_output: numpy.ndarray, output_idx: Tuple[int, int]) → None[source]

Updates the rank vote values with the recently classified output.

Parameters:
  • update_mask (np.ndarray) – a 2d boolean array indicating which indices in the array should be updated
  • stride (Tuple[int, int]) – How many (rows, columns) to move through the image at each iteration.
  • n (np.ndarray) – an array containing the total number of times each pixel has been classified
  • output (np.ndarray) – an array containing the current running classifications
  • final_map (np.ndarray) – a boolean array indicating which pixels are finished being classified
  • single_output (np.ndarray) – The new output values to update the rank vote with
  • output_idx (Tuple[int, int]) – the (y, x) values that indicate where in the image the updates should happen
Returns:

None

morpheus_core.helpers.label_helper.update_single_class_mean_var(update_mask: numpy.ndarray, n: numpy.ndarray, mean_var: numpy.ndarray, x_n: numpy.ndarray) → Tuple[numpy.ndarray, numpy.ndarray][source]

Updates the mean and variance for a single class.

Parameters:
  • update_mask (np.ndarray) – a 2d boolean array indicating which indices in the array should be updated
  • n (np.ndarray) – a 2d array containing the number of terms used in the mean
  • mean_var (np.ndarray) – the current calculated mean and variance
  • x_n (np.ndarray) – the new values to add to update the mean and variance
Returns:

A tuple containing two numpy arrays that contain the updated mean and variance respectively

morpheus_framework.helpers.misc_helper module

morpheus_core.helpers.misc_helper.apply(f: Callable, args: Iterable, kwargs: Iterable[dict] = None) → None[source]

Applies the function f to the args and kwargs.

Parameters:
  • f (Callable) – function to apply
  • args (Iterable) – iterable to apply f to
  • kwargs (Iterable[dict]) – iterable of a dict of kwargs to apply with each element in args
Returns:

None

morpheus_core.helpers.misc_helper.arrays_not_same_size(inputs: List[numpy.ndarray]) → bool[source]

Validates that all input arrays are the same size.

Parameters:

inputs (List[np.ndarray]) – Input arrays to validate

Returns:

true if the arrays are the same size and false if they are not
morpheus_core.helpers.misc_helper.vaidate_input_types_is_str(inputs: List[Union[str, numpy.ndarray]]) → bool[source]

Validates that the inputs are all the same type and one of str or np.ndarray.

Parameters:

inputs (List[Union[str, np.ndarray]]) – List of inputs to validate

Returns:

true if the inputs are str and false if the inputs are np.ndarray

Raises:
  • ValueError if all inputs are not the same type
  • ValueError if the types are other than np.ndarray or str
morpheus_core.helpers.misc_helper.validate_parallel_params(gpus: List[int] = None, cpus: int = None, out_dir: str = None) → Tuple[List[int], bool][source]

Validates the parallel params.

Parameters:
  • gpus (List[int]) – list of GPU ids to use for parallel classification
  • cpus (int) – number of cpus to use for parallel classification
Returns:

A tuple where the first element is a List of integer id values for each worker. The second element is true if the ids are gpu ids and false if they are cpu ids

Raises:
  • ValueError if both gpus and cpus are given
  • ValueError if cpus or gpus are given, but out_dir is not given
  • ValueError if len(gpus)==1
  • ValueError if cpus<2
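The documented checks can be sketched as follows. The four error conditions come from the Raises list above; the error messages, the use of `range(cpus)` as CPU worker ids, and the single-worker fallback when neither argument is given are assumptions made for this example.

```python
from typing import List, Optional, Tuple


def validate_parallel_params_sketch(
    gpus: Optional[List[int]] = None,
    cpus: Optional[int] = None,
    out_dir: Optional[str] = None,
) -> Tuple[List[int], bool]:
    """Mirror the documented checks; the serial fallback is an assumption."""
    if gpus is not None and cpus is not None:
        raise ValueError("Specify either gpus or cpus, not both.")
    if (gpus is not None or cpus is not None) and out_dir is None:
        raise ValueError("out_dir is required for parallel classification.")
    if gpus is not None:
        if len(gpus) == 1:
            raise ValueError("Parallel GPU classification needs more than one GPU.")
        return list(gpus), True
    if cpus is not None:
        if cpus < 2:
            raise ValueError("Parallel CPU classification needs at least two cpus.")
        # Assumption: CPU workers are identified by consecutive dummy ids.
        return list(range(cpus)), False
    # Assumption: with neither argument given, fall back to a single worker.
    return [0], False
```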

morpheus_framework.helpers.parallel_helper module

morpheus_core.helpers.parallel_helper.build_parallel_classification_structure(model: Callable, arrs: List[numpy.ndarray], arr_fnames: List[str], n_classes: int, batch_size: int, window_shape: Tuple[int], dilation: int, stride: Union[Tuple[int], List[int]], update_map: numpy.ndarray, aggregate_method: str, out_dir: str, workers: List[int]) → None[source]

Sets up the subdirs and files to run the parallel classification.

Parameters:
  • arrs (List[np.ndarray]) – List of arrays to split up in the order HJVZ
  • arr_fnames (List[str]) – The file names that hold the input arrays arrs
  • workers (List[int]) – A list of worker IDs that can either be CUDA GPU IDs or a list of dummy numbers for CPU workers
  • batch_size (int) – The batch size for Morpheus to use when classifying the input.
  • window_shape (Tuple[int]) – The (height, width) tuple describing the size of the sliding window.
  • out_dir (str) – the location to place the subdirs in
Returns:

None

TODO: Refactor to a more functional implementation

morpheus_core.helpers.parallel_helper.check_procs(procs: Dict[int, subprocess.Popen]) → List[bool][source]

Checks on the status of running jobs.

Parameters:

procs (Dict[int, Popen]) – A dictionary where the keys are the worker ids and the values are the process objects

Returns:

A list of booleans indicating if the processes are finished.
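A status check of this kind is commonly written with `Popen.poll()`, which returns None while a process is still running. The sketch below assumes that approach and returns results in dictionary order; the function name is hypothetical.

```python
import subprocess
from typing import Dict, List


def check_procs_sketch(procs: Dict[int, subprocess.Popen]) -> List[bool]:
    """Return True for each process that has exited, in dict order."""
    # Popen.poll() returns None while the process is still running.
    return [proc.poll() is not None for proc in procs.values()]
```

A monitoring loop could call this repeatedly, sleeping between calls, until `all(...)` of the returned list is true.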
morpheus_core.helpers.parallel_helper.get_data_from_worker(out_dir: str, worker: int) → Tuple[numpy.ndarray, numpy.ndarray][source]

Returns the n array and the output classifications for a given worker

Parameters:
  • out_dir (str) – The directory where the workers are storing their output
  • worker (int) – The worker id to get the data for
Returns:

A 2-Tuple where the first element is the worker output array and the second element is the n array.

morpheus_core.helpers.parallel_helper.get_empty_output_array(out_dir: str, height: int, width: int, n_classes: int, aggregation_method: str) → numpy.ndarray[source]

Creates an empty array in the output dir and returns a memmapped array for it

Parameters:
  • out_dir (str) – The output directory to store the array in
  • height (int) – The output image height
  • width (int) – The output image width
  • n_classes (int) – The number of classes the model predicts
  • aggregation_method (str) – The method to use for merging outputs, one of morpheus_core.AGGREGATION_METHODS.MEAN_VAR or morpheus_core.AGGREGATION_METHODS.RANK_VOTE
Returns:

A 4-Tuple where the first element is the HDUL for output array, the second element is the HDUL for the n array, the third element is the output array, the fourth element is the n array.

morpheus_core.helpers.parallel_helper.get_merge_function(aggreation_method: str) → Callable[source]

Returns the method for merging arrays based on the aggregation method.

Parameters:

aggregation_method (str) – The aggregation method used, one of morpheus_core.AGGREGATION_METHODS.MEAN_VAR or morpheus_core.AGGREGATION_METHODS.RANK_VOTE

Returns:

A function to use for merging output arrays
morpheus_core.helpers.parallel_helper.get_split_length(shape: List[int], num_workers: int, window_shape: Tuple[int]) → int[source]

Calculate the size of the sub images for classification.

Parameters:
  • shape (List[int]) – the shape of the array to be split
  • num_workers (int) – the number of splits to make
  • window_shape (Tuple[int]) – The (height, width) tuple describing the size of the sliding window.
Returns:

The length of each split along axis 0

TODO: Implement splits along other axes

morpheus_core.helpers.parallel_helper.get_split_slice_generator(shape: Tuple[int], window_shape: Tuple[int], num_workers: int, split_length: int) → Iterable[slice][source]

Creates a generator that yields slice objects to split imgs.

Parameters:
  • shape (Tuple[int]) – The shape of the array to be split
  • window_shape (Tuple[int]) – The (height, width) tuple describing the size of the sliding window.
  • num_workers (int) – The number of splits to make
  • split_length (int) – The length each slice should be
Returns:

A generator that yields slice objects

TODO: Implement splits along other axes
TODO: Refactor to a more functional implementation

morpheus_core.helpers.parallel_helper.get_start_y_idxs(n_heights: List[int], window_height: int) → List[int][source]

Gets the y indexes to crop and merge arrays with.

Parameters:
  • n_heights (List[int]) – The heights of the cropped arrays
  • window_height (int) – The height of a single input/output from the model
Returns:

The y index values to use for merging the arrays.

morpheus_core.helpers.parallel_helper.make_runnable_file(path: str, input_fnames: List[str], n_classes: int, batch_size: int, window_size: Union[Tuple[int], List[int]], dilation: int, stride: Union[Tuple[int], List[int]], aggregate_method: str) → None[source]

Creates a file at path that classifies local FITS files.

Parameters:
  • path (str) – The dir to save the file in
  • input_fnames (List[str]) – The list of file names that contain the arrays to convert into batches and serve to the model
  • n_classes (int) – The number of classes that the model predicts
  • batch_size (int) – The batch size for the model to use when classifying the input
  • window_size (Union[Tuple[int], List[int]]) – The (h, w) of each example in a batch
  • stride (Union[Tuple[int], List[int]]) – The stride size of the sliding window
  • aggregate_method (str) – how to process the output from the model. If AGGREGATION_METHODS.MEAN_VAR, record output using mean and variance; if AGGREGATION_METHODS.RANK_VOTE, record output as the normalized vote count.
Returns:

None

morpheus_core.helpers.parallel_helper.merge_parallel_mean_var(combined_out: numpy.ndarray, combined_n: numpy.ndarray, output: numpy.ndarray, n: numpy.ndarray, start_y: int) → None[source]

Merge the output from a worker into the total output for mean/var.

Derived from: https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html

Parameters:
  • combined_out (np.ndarray) – The total output array
  • combined_n (np.ndarray) – The total n array
  • output (np.ndarray) – The output to merge into the total output
  • n (np.ndarray) – The n to merge into the total n
  • start_y (int) – The y index at which to merge output into combined_out
Returns:

None, the operation is performed in place on combined_out and combined_n
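The combined-variance idea behind this merge can be sketched as below. It uses the standard formula for pooling two groups from their counts, means, and population variances, and assumes the combined count is nonzero. The function name and the per-group signature are for illustration only; the documented function instead works in place on combined_out and combined_n starting at start_y.

```python
import numpy as np


def combine_mean_var(n_a, mean_a, var_a, n_b, mean_b, var_b):
    """Combine two groups' counts, means, and population variances."""
    n_ab = n_a + n_b
    mean_ab = (n_a * mean_a + n_b * mean_b) / n_ab
    # Each group's variance is shifted by its squared distance to the new mean.
    var_ab = (
        n_a * (var_a + (mean_a - mean_ab) ** 2)
        + n_b * (var_b + (mean_b - mean_ab) ** 2)
    ) / n_ab
    return n_ab, mean_ab, var_ab
```

Because the inputs can be numpy arrays as well as scalars, the same expression applies per pixel to the overlapping rows of two workers' outputs.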

morpheus_core.helpers.parallel_helper.merge_parallel_rank_vote(combined_out: numpy.ndarray, combined_n: numpy.ndarray, output: numpy.ndarray, n: numpy.ndarray, start_y: int) → None[source]

Merge the output from a worker into the total output for rank vote.

Parameters:
  • combined_out (np.ndarray) – The total output array
  • combined_n (np.ndarray) – The total n array
  • output (np.ndarray) – The output to merge into the total output
  • n (np.ndarray) – The n to merge into the total n
  • start_y (int) – The y index at which to merge output into combined_out
Returns:

None, the operation is performed in place on combined_out and combined_n

morpheus_core.helpers.parallel_helper.monitor_procs(procs: Dict[int, subprocess.Popen], parallel_check_interval: int) → None[source]

Monitors the progress of running subprocesses.

Parameters:
  • procs (Dict[int, Popen]) – A dictionary where the keys are the worker ids and the values are the process objects
  • parallel_check_interval (int) – An integer
morpheus_core.helpers.parallel_helper.run_parallel_jobs(workers: List[int], is_gpu: bool, out_dir: str, parallel_check_interval: float) → None[source]

Starts and tracks parallel job runs.

WARNING: This will not finish running until all subprocesses are complete

Parameters:
  • workers (List[int]) – A list of worker IDs to assign to a portion of an image.
  • is_gpu (bool) – if True, the worker IDs belong to NVIDIA GPUs and will be used as an argument in CUDA_VISIBLE_DEVICES. If False, the IDs are associated with CPU workers
  • out_dir (str) – the location with the partitioned data
  • parallel_check_interval (float) – If gpus are given, then this is the number of minutes to wait between polling each subprocess for completion.
Returns:

None

morpheus_core.helpers.parallel_helper.stitch_parallel_classifications(workers: List[int], out_dir: str, aggregation_method: str, window_shape: Tuple[int]) → Tuple[List[astropy.io.fits.hdu.hdulist.HDUList], List[numpy.ndarray]][source]

Merges all of the output from the workers into a single classification image.

Parameters:
  • workers (List[int]) – List of integer ids associated with workers
  • out_dir (str) – The output directory that the worker classifications are stored in.
  • aggregation_method (str) – The morpheus_core.AGGREGATION_METHODS value to use to merge the output arrays
  • window_shape (Tuple[int, int]) – The (width, height) of the input/output image data.
Returns:

A 2-Tuple, where the first element is a list of HDULs for the merged data and the second element is the merged arrays.

morpheus_core.helpers.parallel_helper.worker_to_cmd(is_gpu: bool, worker: int) → str[source]

Returns the bash command to run a worker job.

Parameters:
  • is_gpu (bool) – True if the worker is a GPU worker, False if it is a CPU worker
  • worker (int) – The worker id, this is the GPU id for gpu workers
Returns:

A string containing the bash command to run a worker job.
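A command builder along these lines might look like the sketch below. It assumes GPU workers are pinned through the CUDA_VISIBLE_DEVICES environment variable, as the run_parallel_jobs description suggests; the script name `main.py`, the `script` parameter, and the CPU behavior are hypothetical.

```python
def worker_to_cmd_sketch(is_gpu: bool, worker: int, script: str = "main.py") -> str:
    """Build a shell command; `script` is a hypothetical file name."""
    cmd = f"python {script}"
    if is_gpu:
        # Restrict the subprocess to a single GPU via the CUDA environment variable.
        cmd = f"CUDA_VISIBLE_DEVICES={worker} {cmd}"
    return cmd
```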

Module contents