# MIT License
# Copyright 2020 Ryan Hausen
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ==============================================================================
import os
from itertools import product, repeat
import numpy as np
import pytest
from astropy.io import fits
import morpheus_core.helpers.label_helper as lh
from morpheus_core.helpers.label_helper import finalize_rank_vote
import morpheus_core.tests.helpers as helper
@pytest.mark.unit
def test_get_mean_var_array_in_mem():
    """Tests morpheus_core.helpers.label_helper.get_mean_var_array in memory.

    Without a ``write_to`` target the first returned value must be ``None``
    and the returned array must have the requested shape plus a trailing
    axis of length 2.
    """
    dims = [100, 100, 1]

    handle, mean_var = lh.get_mean_var_array(dims)

    assert handle is None
    assert mean_var.shape == (*dims, 2)
@pytest.mark.unit
@pytest.mark.filterwarnings("ignore::UserWarning")  # Ignore astropy warning
def test_get_mean_var_array_on_disk():
    """Tests morpheus_core.helpers.label_helper.get_mean_var_array on disk.

    When ``write_to`` is given, a non-``None`` handle must be returned and
    both the returned array and the data read back from the FITS file must
    have the requested shape plus a trailing axis of length 2.
    """
    helper.setup()
    # try/finally guarantees the temporary directory is removed even when an
    # assertion (or the call under test) fails.
    try:
        shape = [100, 100, 1]
        out_file = os.path.join(helper.TMP_DIR, "test.fits")

        hdul, in_mem_arrays = lh.get_mean_var_array(shape, write_to=out_file)
        on_disk_array = fits.getdata(out_file)

        # validate the returned array
        assert hdul is not None
        assert in_mem_arrays.shape == (100, 100, 1, 2)
        assert on_disk_array.shape == (100, 100, 1, 2)
    finally:
        helper.tear_down()
@pytest.mark.unit
def test_get_rank_vote_array_in_mem():
    """Tests morpheus_core.helpers.label_helper.get_rank_vote_array in memory.

    Without a ``write_to`` target the first returned value must be ``None``
    and the returned array must have exactly the requested shape.
    """
    dims = [100, 100, 1]

    handle, rank_vote = lh.get_rank_vote_array(dims)

    assert handle is None
    assert rank_vote.shape == tuple(dims)
@pytest.mark.unit
@pytest.mark.filterwarnings("ignore::UserWarning")  # Ignore astropy warning
def test_get_rank_vote_array_on_disk():
    """Tests morpheus_core.helpers.label_helper.get_rank_vote_array on disk.

    When ``write_to`` is given, a non-``None`` handle must be returned and
    both the returned array and the data read back from the FITS file must
    have exactly the requested shape.
    """
    helper.setup()
    # try/finally guarantees the temporary directory is removed even when an
    # assertion (or the call under test) fails.
    try:
        shape = [100, 100, 1]
        out_file = os.path.join(helper.TMP_DIR, "test.fits")

        hdul, in_mem_array = lh.get_rank_vote_array(shape, write_to=out_file)
        on_disk_array = fits.getdata(out_file)

        # validate the returned array
        assert hdul is not None
        assert in_mem_array.shape == (100, 100, 1)
        assert on_disk_array.shape == (100, 100, 1)
    finally:
        helper.tear_down()
@pytest.mark.unit
def test_get_n_array_in_mem():
    """Tests morpheus_core.helpers.label_helper.get_n_array in memory.

    Without a ``write_to`` target the first returned value must be ``None``
    and the returned array must have exactly the requested shape.
    """
    dims = [100, 100]

    handle, n_array = lh.get_n_array(dims)

    assert handle is None
    assert n_array.shape == tuple(dims)
@pytest.mark.unit
@pytest.mark.filterwarnings("ignore::UserWarning")  # Ignore astropy warning
def test_get_n_array_on_disk():
    """Tests morpheus_core.helpers.label_helper.get_n_array on disk.

    When ``write_to`` is given, a non-``None`` handle must be returned and
    both the returned array and the data read back from the FITS file must
    have exactly the requested shape.
    """
    helper.setup()
    # try/finally guarantees the temporary directory is removed even when an
    # assertion (or the call under test) fails.
    try:
        shape = [100, 100]
        out_file = os.path.join(helper.TMP_DIR, "test.fits")

        hdul, array = lh.get_n_array(shape, write_to=out_file)
        on_disk_array = fits.getdata(out_file)

        # validate the returned array
        assert hdul is not None
        assert array.shape == (100, 100)
        assert on_disk_array.shape == (100, 100)
    finally:
        helper.tear_down()
@pytest.mark.unit
def test_get_windowed_index_generator_fails():
    """Tests that get_windowed_index_generator rejects a one-element stride.

    The total and window shapes each have two entries while the stride has
    only one; the call is expected to raise ``ValueError``.
    """
    image_wh = (100, 100)
    window_wh = (10, 10)
    short_stride = (1,)

    with pytest.raises(ValueError):
        lh.get_windowed_index_generator(image_wh, window_wh, short_stride)
@pytest.mark.unit
def test_get_windowed_index_generator_stride_one():
    """Tests get_windowed_index_generator with a stride of one in each axis.

    Every (y, x) pair the generator yields is marked in a zero array; the
    region up to (but excluding) ``total - window`` in each axis must then
    be fully marked.
    """
    total_wh = (100, 100)
    window_shape = (10, 10)
    stride = (1, 1)

    visited = np.zeros(total_wh)
    rows, cols = zip(
        *lh.get_windowed_index_generator(total_wh, window_shape, stride)
    )
    visited[rows, cols] = 1

    y_stop = total_wh[0] - window_shape[0]
    x_stop = total_wh[1] - window_shape[1]
    assert visited[:y_stop, :x_stop].all()
@pytest.mark.unit
def test_get_windowed_index_generator_stride_two():
    """Tests get_windowed_index_generator with a stride of two in each axis.

    Every (y, x) pair the generator yields is marked in a zero array. The
    positions on the stride grid must all be marked, and the positions
    offset by one from that grid in both axes must all be unmarked.

    This test has its own name so that it does not shadow the stride-one
    test; with identical names only the later definition is collected.
    """
    total_wh = (100, 100)
    window_shape = (10, 10)
    stride = (2, 2)

    test = np.zeros(total_wh)
    ys, xs = zip(*lh.get_windowed_index_generator(total_wh, window_shape, stride))
    test[ys, xs] = 1

    # assert that all of the desired indices were hit
    assert test[
        : total_wh[0] - window_shape[0] : stride[0],
        : total_wh[1] - window_shape[1] : stride[1],
    ].all()

    # assert that all of the skipped indices were missed
    assert not test[
        1 : total_wh[0] - window_shape[0] + 1 : stride[0],
        1 : total_wh[1] - window_shape[1] + 1 : stride[1],
    ].any()
@pytest.mark.unit
def test_get_final_map_final():
    """Tests get_final_map at the last window position in both axes.

    The returned indices are compared pairwise against every (y, x)
    combination inside the update mask.
    """
    total_shape = (100, 100)
    update_mask_shape = (10, 10)
    stride = (1, 1)
    mask_h, mask_w = update_mask_shape
    output_idx = (total_shape[0] - mask_h, total_shape[1] - mask_w)

    wanted = product(range(mask_h), range(mask_w))
    got = lh.get_final_map(total_shape, update_mask_shape, stride, output_idx)

    # NOTE(review): zip stops at the shorter iterable, so a result that is
    # shorter than expected would not be detected here.
    assert all(want == have for want, have in zip(wanted, got))
@pytest.mark.unit
def test_get_final_map_end_y():
    """Tests get_final_map at the last window position along y only.

    The returned indices are compared pairwise against (y, 0) for every y
    inside the update mask.
    """
    total_shape = (100, 100)
    update_mask_shape = (10, 10)
    stride = (1, 1)
    mask_h, mask_w = update_mask_shape
    output_idx = (total_shape[0] - mask_h, 0)

    wanted = zip(range(mask_h), repeat(0, mask_w))
    got = lh.get_final_map(total_shape, update_mask_shape, stride, output_idx)

    # NOTE(review): zip stops at the shorter iterable, so a result that is
    # shorter than expected would not be detected here.
    assert all(want == have for want, have in zip(wanted, got))
@pytest.mark.unit
def test_get_final_map_end_x():
    """Tests get_final_map at the last window position along x only.

    The returned indices are compared pairwise against (0, x) for every x
    inside the update mask.
    """
    total_shape = (100, 100)
    update_mask_shape = (10, 10)
    stride = (1, 1)
    mask_h, mask_w = update_mask_shape
    output_idx = (0, total_shape[1] - mask_w)

    wanted = zip(repeat(0, mask_h), range(mask_w))
    got = lh.get_final_map(total_shape, update_mask_shape, stride, output_idx)

    # NOTE(review): zip stops at the shorter iterable, so a result that is
    # shorter than expected would not be detected here.
    assert all(want == have for want, have in zip(wanted, got))
@pytest.mark.unit
def test_get_final_map_first():
    """Tests get_final_map at a window position that is not final.

    At the origin, with unit stride, the only index returned must be
    ``(0, 0)``.
    """
    total_shape = (100, 100)
    update_mask_shape = (10, 10)
    stride = (1, 1)
    output_idx = (0, 0)

    result = lh.get_final_map(total_shape, update_mask_shape, stride, output_idx)

    assert list(result) == [(0, 0)]
@pytest.mark.unit
def test_update_n():
    """Tests morpheus_core.helpers.label_helper.update_n.

    Applying an all-ones 10x10 mask at the origin of an all-zero counter
    must leave the top-left 10x10 region non-zero and make the counter's
    total equal to the mask's total.
    """
    mask = np.ones((10, 10))
    counts = np.zeros((100, 100))
    origin = (0, 0)

    updated = lh.update_n(mask, counts, origin)

    assert updated[:10, :10].all()
    assert updated.sum() == mask.sum()
@pytest.mark.unit
def test_iterative_mean():
    """Tests morpheus_core.helpers.label_helper.iterative_mean.

    With a count of 2, a current mean of 1, a new sample of 2 and an
    all-ones update mask, the updated mean must be 1.5 everywhere.
    """
    shape = (10, 10)
    count = np.full(shape, 2.0)
    running_mean = np.ones(shape)
    mask = running_mean.copy()
    sample = running_mean * 2

    result = lh.iterative_mean(count, running_mean, sample, mask)

    np.testing.assert_array_equal(np.full(shape, 1.5), result)
@pytest.mark.unit
def test_iterative_variance():
    """Tests morpheus_core.helpers.label_helper.iterative_variance.

    Nine constant arrays (values 0..8) are fed in one at a time; the
    accumulated value must equal ``np.var`` of all terms multiplied by the
    number of terms.
    """
    shape = (10, 10)
    n_terms = 9
    terms = [np.ones(shape) * value for value in range(n_terms)]
    mask = np.ones(shape)

    running_sn = np.zeros(shape)
    for step, term in enumerate(terms):
        mean_now = np.mean(terms[: step + 1], axis=0)
        # For the very first term there is no earlier mean, so the current
        # mean is reused as the previous one.
        mean_before = (
            np.mean(terms[:step], axis=0) if step > 0 else mean_now.copy()
        )
        running_sn = lh.iterative_variance(
            running_sn, term, mean_before, mean_now, mask
        )

    counts = np.ones(shape) * n_terms
    wanted_sn = np.var(terms, axis=0) * counts

    assert np.equal(wanted_sn, running_sn).all()
@pytest.mark.unit
def test_finalize_variance():
    """Tests morpheus_core.helpers.label_helper.finalize_variance.

    The input is the variance of nine constant arrays multiplied by the
    count; finalizing over every pixel must give back the plain variance.
    """
    shape = (10, 10)
    n_terms = 9
    terms = [np.ones(shape) * value for value in range(n_terms)]

    wanted_var = np.var(terms, axis=0)
    counts = np.ones(shape) * n_terms
    scaled_var = wanted_var * counts
    every_pixel = list(product(range(shape[0]), range(shape[1])))

    result = lh.finalize_variance(counts, every_pixel, scaled_var)

    assert np.equal(wanted_var, result).all()
@pytest.mark.unit
def test_update_single_class_mean_var():
    """Tests morpheus_core.helpers.label_helper.update_single_class_mean_var.

    The running state is seeded from the first eight of nine constant
    planes (mean, and variance multiplied by 8). After updating with the
    ninth plane, the results must equal the mean of all nine planes and
    their variance multiplied by the count.
    """
    shape = (10, 10)
    planes = np.dstack([np.ones(shape) * value for value in range(9)])
    seen, newest = planes[:, :, :-1], planes[:, :, -1]
    mask = np.ones(shape)
    counts = np.ones(shape) * 9

    state = np.zeros((10, 10, 2))
    state[:, :, 0] = np.mean(seen, axis=-1)
    state[:, :, 1] = np.var(seen, axis=-1) * 8

    got_mean, got_var = lh.update_single_class_mean_var(
        mask, counts, state, newest
    )

    np.testing.assert_array_equal(got_mean, np.mean(planes, axis=-1))
    np.testing.assert_array_equal(got_var, np.var(planes, axis=-1) * counts)
@pytest.mark.unit
def test_update_mean_var():
    """Tests morpheus_core.helpers.label_helper.update_mean_var.

    A single all-ones 10x10x1 batch output is applied at the origin of an
    all-zero output array. Afterwards index 0 of the last axis must sum to
    100 and index 1 must sum to 0.
    """
    window_shape = (10, 10)
    total_shape = (100, 100)
    stride = (1, 1)
    origin = (0, 0)

    mask = np.ones(window_shape)
    counts = np.zeros(total_shape)
    accumulated = np.zeros((*total_shape, 1, 2))
    batch = np.ones((*window_shape, 1))

    # The assertions below inspect `accumulated`, not a return value.
    lh.update_mean_var(mask, stride, counts, accumulated, batch, origin)

    assert accumulated[..., 0].sum() == 100
    assert accumulated[..., 1].sum() == 0
@pytest.mark.unit
def test_finalize_rank_vote():
    """Tests morpheus_core.helpers.label_helper.finalize_rank_vote.

    With a count of one everywhere, finalizing the pixel at ``(0, 0)`` must
    leave the total of the output array unchanged.
    """
    window_size = (10, 10)
    n = np.ones(window_size)
    final_map = [(0, 0)]
    output = np.dstack(
        [np.zeros(window_size), np.zeros(window_size), np.ones(window_size)]
    )

    # Capture the total first in case finalize_rank_vote modifies `output`.
    expected_total = output.sum()

    # The comparison must be asserted; returning it would let the test pass
    # regardless of the result.
    assert expected_total == finalize_rank_vote(n, final_map, output).sum()
@pytest.mark.unit
def test_update_rank_vote():
    """Tests morpheus_core.helpers.label_helper.update_rank_vote.

    A 10x10x3 batch output with per-channel values 0, 0.4 and 0.6 is
    applied at the origin of an all-zero output array. Afterwards the
    output's total must equal that of a 10x10x3 array holding ones in the
    last channel only.
    """
    window_shape = (10, 10)
    total_shape = (100, 100)
    stride = (1, 1)
    origin = (0, 0)

    mask = np.ones(window_shape)
    counts = np.zeros(total_shape)
    votes = np.zeros((100, 100, 3))
    batch = np.dstack(
        [np.ones(window_shape) * weight for weight in (0, 0.4, 0.6)]
    )

    # The assertion below inspects `votes`, not a return value.
    lh.update_rank_vote(mask, stride, counts, votes, batch, origin)

    wanted = np.dstack(
        [np.ones(window_shape) * weight for weight in (0, 0, 1)]
    )
    assert votes.sum() == wanted.sum()
# When executed directly as a script, run only the update_mean_var test.
if __name__ == "__main__":
    test_update_mean_var()