Source code for morpheus_core.tests.test_parallel_helper

# MIT License
# Copyright 2020 Ryan Hausen
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ==============================================================================

import os
import time
from astropy.io.fits.convenience import writeto
from astropy.io.fits.hdu.image import PrimaryHDU

import numpy as np
from numpy.lib.stride_tricks import as_strided
import pytest
from astropy.io import fits

import morpheus_core.helpers.parallel_helper as ph
import morpheus_core.morpheus_core as morpheus_core
import morpheus_core.tests.helpers as helper
from morpheus_core.tests.helpers import TMP_DIR


@pytest.mark.unit
def test_get_split_length():
    """Tests morpheus_core.helpers.parallel_helper.get_split_length.

    A (1200, 1200) array shared between 4 workers with a (40, 40) window is
    expected to produce a split length of 330.
    """
    # Positional order: array shape, number of workers, window shape.
    actual = ph.get_split_length((1200, 1200), 4, (40, 40))

    expected = 330
    assert expected == actual
@pytest.mark.unit
def test_get_split_slice_generator():
    """Tests morpheus_core.helpers.parallel_helper.get_split_slice_generator.

    Exhausts the generator for a (1200, 1200) array, a (40, 40) window,
    4 workers and a split length of 330, and compares the produced slices
    with the expected ones.
    """
    shape = (1200, 1200)
    window_shape = (40, 40)
    num_workers = 4
    split_length = 330

    # Consecutive slices overlap by 39 rows (e.g. 291..330) and the final
    # slice stops at the array height of 1200.
    expected_slices = [
        slice(0, 330),
        slice(291, 621),
        slice(582, 912),
        slice(873, 1200),
    ]

    actual_slices = list(
        ph.get_split_slice_generator(shape, window_shape, num_workers, split_length)
    )

    # On failure pytest already reports both lists, so no debug print is needed.
    assert expected_slices == actual_slices
@pytest.mark.unit
def test_make_runnable_file():
    """Tests morpheus_core.helpers.parallel_helper.make_runnable_file.

    Asks the helper to write ``main.py`` into the temporary test directory,
    then compares the file line by line (trailing whitespace stripped) with
    the expected script text.
    """
    helper.setup()
    try:
        path = helper.TMP_DIR
        input_file_names = [helper.make_sample_file()]
        n_classes = 2
        batch_size = 10
        dilation = 1
        window_size = (10, 10)
        stride = (1, 1)
        aggregate_method = morpheus_core.AGGREGATION_METHODS.MEAN_VAR

        # Two levels up from this test module; the expected script appends
        # this directory to sys.path.
        local = os.path.dirname(os.path.dirname(__file__))

        # NOTE(review): the leading whitespace inside these literals is kept
        # exactly as found; confirm it matches the indentation that
        # make_runnable_file actually emits.
        expected_text = [
            "import sys",
            f"sys.path.append('{local}')",
            "import os",
            "import dill",
            "import numpy as np",
            "from tqdm import tqdm",
            "from morpheus_core import morpheus_core",
            "def main():",
            " output_dir = './output'",
            " if 'output' not in os.listdir():",
            " os.mkdir('./output')",
            "",
            " with open('model.pkl', 'rb') as f:",
            " model = dill.load(f)",
            "",
            " model_inputs = [",
            " " + ",".join(["'" + i + "'" for i in input_file_names]),
            " ]",
            "",
            " update_map = np.load('update_map.npy', allow_pickle=True)",
            "",
            " morpheus_core.predict(",
            " model,",
            " model_inputs,",
            f" {n_classes},",
            f" {batch_size},",
            f" {window_size},",
            f" {dilation},",
            f" stride={stride},",
            " update_map=update_map,",
            f" aggregate_method='{aggregate_method}',",
            " out_dir=output_dir,",
            " )",
            " sys.exit(0)",
            "if __name__=='__main__':",
            " main()",
        ]

        ph.make_runnable_file(
            path,
            input_file_names,
            n_classes,
            batch_size,
            window_size,
            dilation,
            stride,
            aggregate_method,
        )

        with open(os.path.join(path, "main.py"), "r") as f:
            actual_text = [line.rstrip() for line in f]

        assert expected_text == actual_text
    finally:
        # Clean up even when the comparison fails so later tests do not see
        # leftovers from this one.
        helper.tear_down()
def pickle_rick(x):
    """Module-level (hence pickleable) stand-in for a model; ignores ``x``.

    Always returns ``None``.
    """
    return
@pytest.mark.integration
def test_build_parallel_classification_structure():
    """Tests morpheus_core.helpers.parallel_helper.build_parallel_classification_structure.

    Builds the per-worker directory layout for two workers inside the
    temporary test directory and checks the names found in the parent
    directory and in each worker directory.
    """
    helper.setup()
    try:
        model = pickle_rick
        arr_fnames = [helper.make_mock_input()]
        arrs = [fits.getdata(arr_fnames[0])]
        n_classes = 2
        batch_size = 10
        window_size = (10, 10)
        dilation = 1
        stride = (1, 1)
        update_map = np.ones(window_size)
        aggregate_method = morpheus_core.AGGREGATION_METHODS.MEAN_VAR
        out_dir = helper.TMP_DIR
        workers = [0, 1]

        ph.build_parallel_classification_structure(
            model,
            arrs,
            arr_fnames,
            n_classes,
            batch_size,
            window_size,
            dilation,
            stride,
            update_map,
            aggregate_method,
            out_dir,
            workers,
        )

        # The parent directory holds the mock input plus one directory per
        # worker; each worker directory holds the files listed below.
        expected_parent_tree = {"input.fits", "0", "1"}
        expected_sub_tree = {"input.fits", "main.py", "model.pkl", "update_map.npy"}

        assert expected_parent_tree == set(os.listdir(out_dir))
        assert expected_sub_tree == set(os.listdir(os.path.join(out_dir, "0")))
        assert expected_sub_tree == set(os.listdir(os.path.join(out_dir, "1")))
    finally:
        # Always remove the temporary directory, even if an assertion fails.
        helper.tear_down()
@pytest.mark.unit
def test_worker_to_cmd_gpu():
    """Tests morpheus_core.helpers.parallel_helper.worker_to_cmd with is_gpu=True.

    The worker id is expected to show up as the CUDA_VISIBLE_DEVICES value.
    """
    gpu_worker = 1

    command = ph.worker_to_cmd(True, gpu_worker)

    assert f"CUDA_VISIBLE_DEVICES={gpu_worker} python main.py" == command
@pytest.mark.unit
def test_worker_to_cmd_cpu():
    """Tests morpheus_core.helpers.parallel_helper.worker_to_cmd with is_gpu=False.

    For a cpu worker the command is expected to set CUDA_VISIBLE_DEVICES to
    -1 regardless of the worker id.
    """
    cpu_worker = 1

    command = ph.worker_to_cmd(False, cpu_worker)

    assert "CUDA_VISIBLE_DEVICES=-1 python main.py" == command
@pytest.mark.unit
def test_check_procs():
    """Tests morpheus_core.helpers.parallel_helper.check_procs.

    Two mocks return 1 from ``poll()`` and one returns ``None``; summing the
    result of ``check_procs`` is expected to give 1.
    """

    class MockProcess:
        """Process stand-in whose ``poll()`` always yields a fixed value."""

        def __init__(self, poll_value):
            self.poll_value = poll_value

        def poll(self):
            return self.poll_value

    poll_results = {1: 1, 2: 1, 3: None}
    procs = {key: MockProcess(value) for key, value in poll_results.items()}

    running_count = sum(ph.check_procs(procs))

    assert 1 == running_count
@pytest.mark.integration
def test_monitor_procs():
    """Tests morpheus_core.helpers.parallel_helper.monitor_procs.

    Uses mock processes that only report completion after being polled a
    fixed number of times and checks that the call takes at least the
    expected amount of wall-clock time.
    """

    class MockProcess:
        """``poll()`` yields None until it has been called enough times, then 0."""

        def __init__(self, counter_to_complete):
            self.counter_to_complete = counter_to_complete

        def poll(self):
            self.counter_to_complete -= 1
            if self.counter_to_complete > 0:
                return None
            return 0

    check_interval = 0.5
    procs = {1: MockProcess(2), 2: MockProcess(3)}
    minimum_runtime = len(procs) * check_interval

    began = time.time()
    ph.monitor_procs(procs, check_interval)
    elapsed = time.time() - began

    # Because of how the mock processes are instantiated, the process
    # dictionary is expected to be queried at least twice before everything
    # reports completion. We verify that the code slept at least that long
    # by looking at how much time has passed.
    assert elapsed >= minimum_runtime
# TODO: This test feels hacky and will likely have issues depending on the CPU;
# write a test that detects the running subprocesses instead.
@pytest.mark.flaky(reruns=5)
@pytest.mark.integration
def test_run_parallel_jobs():
    """Tests morpheus_core.helpers.parallel_helper.run_parallel_jobs.

    Creates two worker directories, each with a ``main.py`` that sleeps for
    0.15 seconds, runs them through ``run_parallel_jobs`` and checks that the
    whole call finishes in under 0.6 seconds.
    """
    helper.setup()
    try:
        workers = ["0", "1"]

        # Every worker gets the same script, so build the text once.
        script = "\n".join(
            [
                "import time",
                "def main():",
                " time.sleep(0.15)",
                "",
                "if __name__=='__main__':",
                " main()",
            ]
        )

        for w in workers:
            worker_dir = os.path.join(helper.TMP_DIR, w)
            os.mkdir(worker_dir)
            with open(os.path.join(worker_dir, "main.py"), "w") as f:
                f.write(script)

        is_gpu = False
        out_dir = helper.TMP_DIR
        parallel_check_interval = 0.2

        start_time = time.time()
        ph.run_parallel_jobs(workers, is_gpu, out_dir, parallel_check_interval)
        run_time = time.time() - start_time

        # If the processes run in parallel, both should be done around the
        # first check interval, well inside this budget.
        expected_max_time = 0.6
        assert run_time < expected_max_time
    finally:
        # This test is rerun on failure; cleaning up unconditionally keeps
        # each rerun from starting with leftovers of the previous attempt.
        helper.tear_down()
@pytest.mark.unit
def test_get_merge_function_mean_var():
    """Tests morpheus_core.helpers.parallel_helper.get_merge_function for MEAN_VAR.

    The MEAN_VAR aggregation method is expected to map to
    ``merge_parallel_mean_var``.
    """
    merge_fn = ph.get_merge_function(morpheus_core.AGGREGATION_METHODS.MEAN_VAR)

    assert merge_fn == ph.merge_parallel_mean_var
@pytest.mark.unit
def test_get_merge_function_rank_vote():
    """Tests morpheus_core.helpers.parallel_helper.get_merge_function for RANK_VOTE.

    The RANK_VOTE aggregation method is expected to map to
    ``merge_parallel_rank_vote``.
    """
    merge_fn = ph.get_merge_function(morpheus_core.AGGREGATION_METHODS.RANK_VOTE)

    assert merge_fn == ph.merge_parallel_rank_vote
@pytest.mark.unit
@pytest.mark.filterwarnings("ignore::UserWarning")  # Ignore astropy warning
def test_get_empty_output_array_rank_vote():
    """Tests morpheus_core.helpers.parallel_helper.get_empty_output_array for rank_vote.

    For an input shape of (100, 100, 5) the output array is expected to have
    shape (100, 100, 5) and the n array shape (100, 100).
    """
    helper.setup()
    try:
        out_dir = helper.TMP_DIR
        method = morpheus_core.AGGREGATION_METHODS.RANK_VOTE
        in_shape = (100, 100, 5)

        expected_output_shape = (100, 100, 5)
        expected_n_shape = (100, 100)

        out_hdul, n_hdul, actual_output, actual_n = ph.get_empty_output_array(
            out_dir, *in_shape, method
        )
        try:
            assert actual_output.shape == expected_output_shape
            assert actual_n.shape == expected_n_shape
        finally:
            # Release the file handles even if a shape assertion fails.
            out_hdul.close()
            n_hdul.close()
    finally:
        helper.tear_down()
@pytest.mark.unit
@pytest.mark.filterwarnings("ignore::UserWarning")  # Ignore astropy warning
def test_get_empty_output_array_mean_var():
    """Tests morpheus_core.helpers.parallel_helper.get_empty_output_array for mean_var.

    For an input shape of (100, 100, 5) the output array is expected to have
    shape (100, 100, 5, 2) and the n array shape (100, 100).
    """
    helper.setup()
    try:
        out_dir = helper.TMP_DIR
        method = morpheus_core.AGGREGATION_METHODS.MEAN_VAR
        in_shape = (100, 100, 5)

        expected_output_shape = (100, 100, 5, 2)
        expected_n_shape = (100, 100)

        out_hdul, n_hdul, actual_output, actual_n = ph.get_empty_output_array(
            out_dir, *in_shape, method
        )
        try:
            assert actual_output.shape == expected_output_shape
            assert actual_n.shape == expected_n_shape
        finally:
            # Release the file handles even if a shape assertion fails.
            out_hdul.close()
            n_hdul.close()
    finally:
        helper.tear_down()
@pytest.mark.unit
def test_get_start_y_idxs():
    """Tests morpheus_core.helpers.parallel_helper.get_start_y_idxs.

    Three heights of 100 with a window height of 10 are expected to give the
    indices 0, 91, 182 and 273.
    """
    heights = [100] * 3
    win_height = 10

    computed = ph.get_start_y_idxs(heights, win_height)

    np.testing.assert_array_equal(computed, np.array([0, 91, 182, 273]))
@pytest.mark.unit
def test_get_data_from_worker():
    """Tests morpheus_core.helpers.parallel_helper.get_data_from_worker.

    Writes an all-zero (100, 100) float32 array as both ``output.fits`` and
    ``n.fits`` in the ``output`` directory of worker "1" and checks that both
    arrays are read back unchanged.
    """
    helper.setup()
    try:
        out_dir = helper.TMP_DIR
        worker = "1"

        # Creates <out_dir>/<worker>/output in one step.
        worker_output_dir = os.path.join(out_dir, worker, "output")
        os.makedirs(worker_output_dir)

        output_path = os.path.join(worker_output_dir, "output.fits")
        n_path = os.path.join(worker_output_dir, "n.fits")

        mock_data = np.zeros([100, 100], dtype=np.float32)
        fits.PrimaryHDU(data=mock_data).writeto(output_path)
        fits.PrimaryHDU(data=mock_data).writeto(n_path)

        actual_output, actual_n = ph.get_data_from_worker(out_dir, worker)

        np.testing.assert_array_equal(mock_data, actual_output)
        np.testing.assert_array_equal(mock_data, actual_n)
    finally:
        # Always remove the temporary directory, even if an assertion fails.
        helper.tear_down()
@pytest.mark.unit
def test_merge_parallel_mean_var():
    """Tests morpheus_core.helpers.parallel_helper.merge_parallel_mean_var.

    Merges a five-row block into a ten-row combined array starting at row 4,
    so exactly one row (row 4) overlaps with already populated data.
    """
    full_shape = [10, 10, 1, 2]

    # Combined state before the merge: rows 0-4 hold 1 in channel 0 and have
    # a count of 1; everything else is zero.
    # NOTE(review): the last axis presumably holds (mean, variance) - confirm.
    combined_out = np.zeros(full_shape)
    combined_out[:5, ..., 0] = 1
    combined_n = np.zeros([10, 10])
    combined_n[:5, :] = 1

    # Incoming block: 2 in channel 0, 0 in channel 1, count of 1 everywhere.
    output = np.zeros([5, 10, 1, 2])
    output[..., 0] = 2
    n = np.ones([5, 10])

    # Expected result: untouched rows keep 1, the overlapping row becomes
    # 1.5 / 0.25, rows 5-8 take the incoming 2 and the last row stays zero.
    expected_out = np.zeros(full_shape)
    expected_out[:4, :, 0, 0] = 1
    expected_out[4, :, 0, 0] = 1.5
    expected_out[5:-1, :, 0, 0] = 2
    expected_out[4, :, 0, 1] = 0.25

    start_y = 4
    ph.merge_parallel_mean_var(combined_out, combined_n, output, n, start_y)

    np.testing.assert_array_equal(expected_out, combined_out)
@pytest.mark.unit
def test_mege_parallel_rank_vote():
    """Tests morpheus_core.helpers.parallel_helper.merge_parallel_rank_vote.

    Merges a five-row block into a ten-row combined array starting at row 4,
    so exactly one row (row 4) overlaps with already populated data.
    """
    full_shape = [10, 10, 2]

    # Combined state before the merge: rows 0-4 hold (0.3, 0.7) with a count
    # of 1; the remaining rows are zero with a count of 0.
    combined_out = np.zeros(full_shape)
    combined_out[:5, :, 0] = 0.3
    combined_out[:5, :, 1] = 0.7
    combined_n = np.zeros([10, 10])
    combined_n[:5, :] = 1

    # Incoming block: (0.5, 0.5) everywhere with a count of 1.
    output = np.zeros([5, 10, 2])
    output[..., 0] = 0.5
    output[..., 1] = 0.5
    n = np.ones([5, 10])

    # Expected result: untouched rows keep (0.3, 0.7), the overlapping row
    # becomes (0.4, 0.6), rows 5-8 take the incoming values and the last row
    # stays zero.
    expected_out = np.zeros(full_shape)
    expected_out[:4, :, 0] = 0.3
    expected_out[:4, :, 1] = 0.7
    expected_out[4, :, 0] = 0.4
    expected_out[4, :, 1] = 0.6
    expected_out[5:-1, :, 0] = 0.5
    expected_out[5:-1, :, 1] = 0.5

    start_y = 4
    ph.merge_parallel_rank_vote(combined_out, combined_n, output, n, start_y)

    np.testing.assert_array_equal(expected_out, combined_out)
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore::UserWarning")  # Ignore astropy warning
def test_stitch_parallel_classifications():
    """Tests morpheus_core.helpers.parallel_helper.stitch_parallel_classifications.

    Writes ``output.fits`` and ``n.fits`` for two workers (ten rows each),
    stitches them with a (5, 5) window using MEAN_VAR aggregation and checks
    the combined output and n arrays, which are expected to have 16 rows with
    rows 6-9 covered by both workers.
    """
    helper.setup()
    try:
        out_dir = helper.TMP_DIR
        test_shape = [10, 10, 1, 2]
        window_shape = [5, 5]
        workers = [0, 1]
        aggregation_method = morpheus_core.AGGREGATION_METHODS.MEAN_VAR

        for w in workers:
            worker_output_dir = os.path.join(out_dir, str(w), "output")
            os.mkdir(os.path.join(out_dir, str(w)))
            os.mkdir(worker_output_dir)

            # Worker 0 contributes 1 and worker 1 contributes 2 in the first
            # channel; the second channel stays zero.
            test_data = np.zeros(test_shape)
            test_data[:, :, 0, 0] = 1 + w

            fits.PrimaryHDU(data=test_data).writeto(
                os.path.join(worker_output_dir, "output.fits")
            )
            fits.PrimaryHDU(data=np.ones(test_shape[:2])).writeto(
                os.path.join(worker_output_dir, "n.fits")
            )

        hduls, outputs = ph.stitch_parallel_classifications(
            workers, out_dir, aggregation_method, window_shape
        )
        try:
            actual_output, actual_n = outputs

            expected_n = np.ones([16, 10])
            expected_n[6:10, :] = 2

            expected_output = np.zeros([16, 10, 1, 2])
            expected_output[:6, :, 0, 0] = 1
            expected_output[6:10, :, 0, 0] = 1.5
            expected_output[10:, :, 0, 0] = 2
            expected_output[:6, :, 0, 1] = 0
            expected_output[6:10, :, 0, 1] = 0.25
            expected_output[10:, :, 0, 1] = 0

            np.testing.assert_array_equal(actual_n, expected_n)
            np.testing.assert_array_equal(actual_output, expected_output)
        finally:
            # Close every returned handle even if a comparison fails.
            for hdul in hduls:
                hdul.close()
    finally:
        helper.tear_down()