import logging
import numpy as np
import time
import warnings
from mejiro.utils import util
from mejiro.analysis.snr_calculation import get_snr
logger = logging.getLogger(__name__)
[docs]
class Exposure:
def __init__(self,
synthetic_image,
exposure_time,
engine='galsim',
engine_params={}
):
"""
Parameters
----------
synthetic_image : SyntheticImage
Source image in counts/sec.
exposure_time : float
Exposure time in seconds.
engine : str
Detector-effects engine. Determines the units of ``self.data``:
* ``'galsim'``: **Counts** (= DN for Roman, where gain = 1.0 e-/DN).
Computed as ``synthetic_image.data * exposure_time`` with sky, Poisson,
dark, and read noise added, then divided by instrument gain.
* ``'romanisim'``: **DN/s** (calibrated level-2 rate image, romanisim
gain = 2 e-/DN). Divide by exposure time and multiply by 2 to convert
to the galsim-engine electron scale.
engine_params : dict, optional
Engine-specific configuration.
"""
start = time.time()
self.synthetic_image = synthetic_image
self.exposure_time = exposure_time
self.engine = engine
self.noise = None
if engine not in synthetic_image.instrument.engines:
raise ValueError(f"Engine '{engine}' is not supported by {synthetic_image.instrument.name}. Supported engines: {synthetic_image.instrument.engines}")
if engine == 'galsim':
from mejiro.engines.galsim_engine import GalSimEngine
self.noise = GalSimEngine.get_empty_image(self.synthetic_image.num_pix,
self.synthetic_image.pixel_scale)
if self.synthetic_image.instrument_name == 'Roman':
# get exposure
results, self.sky_background, self.poisson_noise, self.reciprocity_failure, self.dark_noise, self.nonlinearity, self.ipc, self.read_noise = GalSimEngine.get_roman_exposure(
synthetic_image, exposure_time, engine_params)
# sum noise
if self.sky_background is not None: self.noise += self.sky_background
if self.poisson_noise is not None: self.noise += self.poisson_noise
if self.reciprocity_failure is not None: self.noise += self.reciprocity_failure
if self.dark_noise is not None: self.noise += self.dark_noise
if self.nonlinearity is not None: self.noise += self.nonlinearity
if self.ipc is not None: self.noise += self.ipc
if self.read_noise is not None: self.noise += self.read_noise
elif self.synthetic_image.instrument_name == 'HWO' or self.synthetic_image.instrument_name == 'JWST' or self.synthetic_image.instrument_name == 'HST':
# get exposure
results, self.sky_background, self.poisson_noise, self.dark_noise, self.read_noise = GalSimEngine.get_exposure(
synthetic_image, exposure_time, engine_params)
# sum noise
if self.sky_background is not None: self.noise += self.sky_background
if self.poisson_noise is not None: self.noise += self.poisson_noise
if self.dark_noise is not None: self.noise += self.dark_noise
if self.read_noise is not None: self.noise += self.read_noise
else:
self.instrument_not_available_error(engine)
# write the noise out to a numpy array
self.noise = self.noise.array # it's confusing for all detector effects to be type galsim.Image and the noise attribute to be an ndarray, but for comparison across engines, the noise should be an array and the detector effects should be Images so they can be passed in as engine params
elif engine == 'lenstronomy':
raise NotImplementedError('Lenstronomy engine not yet implemented')
from mejiro.engines.lenstronomy_engine import LenstronomyEngine
self.noise = np.zeros_like(self.synthetic_image.data)
# get exposure
results, self.noise = LenstronomyEngine.get_exposure(
synthetic_image=synthetic_image,
exposure_time=exposure_time,
engine_params=engine_params)
# TODO conditional for supported instruments
elif engine == 'pandeia':
raise NotImplementedError('Pandeia engine not yet implemented')
from mejiro.engines.pandeia_engine import PandeiaEngine
# get exposure
results, self.noise = PandeiaEngine.get_roman_exposure(synthetic_image, exposure_time, engine_params)
# TODO temporarily set noise to zeros until I can grab the noise that Pandeia generates
self.noise = np.zeros((self.synthetic_image.num_pix, self.synthetic_image.num_pix))
elif engine == 'romanisim':
raise NotImplementedError('romanisim engine not yet implemented')
from mejiro.engines.romanisim_engine import RomanISimEngine
if self.synthetic_image.instrument_name == 'Roman':
results, self.noise = RomanISimEngine.get_roman_exposure(synthetic_image, exposure_time, engine_params)
else:
self.instrument_not_available_error(engine)
else:
raise ValueError(f'Engine "{engine}" not recognized')
# once engine params have been defaulted and validated, set them as an attribute
self.engine_params = engine_params
# set image and expoure attributes
if self.engine == 'galsim':
if self.synthetic_image.pieces:
self.image, self.lens_image, self.source_image = results
self.data, self.lens_data, self.source_data = self.image.array, self.lens_image.array, self.source_image.array
else:
self.image, self.lens_image, self.source_image = results, None, None
self.data, self.lens_data, self.source_data = self.image.array, None, None
else:
if self.synthetic_image.pieces:
self.data, self.lens_data, self.source_data = results
else:
self.data, self.lens_data, self.source_data = results, None, None
# crop off edge effects (e.g., IPC)
# TODO crop_edge_effects returns a new (cropped) array but the result is
# discarded here, so self.data is left at its original size. Fixing this
# requires also cropping self.noise / self.image / self.lens_image /
# self.source_image to keep shapes consistent across the Exposure object.
Exposure.crop_edge_effects(self.data, pad=3)
# check for negative pixels
if np.any(self.data < 0):
warnings.warn(f'Negative pixel values in final image. Setting {np.sum(self.data < 0)} pixels to 0')
self.data[self.data < 0] = 0
if self.synthetic_image.pieces:
# TODO same discarded-return issue as above
Exposure.crop_edge_effects(self.lens_data, pad=3)
Exposure.crop_edge_effects(self.source_data, pad=3)
if np.any(self.lens_data < 0):
warnings.warn(f'Negative pixel values in lens image. Setting {np.sum(self.lens_data < 0)} pixels to 0')
self.lens_data[self.lens_data < 0] = 0
if np.any(self.source_data < 0):
warnings.warn(f'Negative pixel values in source image. Setting {np.sum(self.source_data < 0)} pixels to 0')
self.source_data[self.source_data < 0] = 0
end = time.time()
self.calc_time = end - start
logger.info(f'Exposure calculation time with {self.engine} engine: {util.calculate_execution_time(start, end, unit="s")}')
def __getstate__(self):
state = self.__dict__.copy()
# drop the SyntheticImage reference on pickle: it's already written to disk in step 04,
# so embedding it here fully duplicates that output (~2.77 MB per Exposure).
# step 06 loads the SyntheticImage from step 04's pickle directly.
state['synthetic_image'] = None
return state
[docs]
def get_snr(self, snr_per_pixel_threshold=1):
return get_snr(self, snr_per_pixel_threshold=snr_per_pixel_threshold)[0]
[docs]
def plot(self, show_snr=False, savepath=None):
import matplotlib.pyplot as plt
plt.imshow(np.log10(self.data), origin='lower')
title = f'{self.synthetic_image.strong_lens.name} (' + r'$z_{l}=$' + f'{self.synthetic_image.strong_lens.z_lens:.2f}, ' + r'$z_{s}=$' + f'{self.synthetic_image.strong_lens.z_source:.2f}' + f')\n{self.synthetic_image.instrument_name} {self.synthetic_image.band}, {self.exposure_time} s'
if show_snr:
snr = self.get_snr()
title += f'\nSNR: {snr:.2f}'
plt.title(title)
cbar = plt.colorbar()
cbar.set_label(r'log$_{10}$(Counts)')
plt.xlabel('x [Pixels]')
plt.ylabel('y [Pixels]')
if savepath is not None:
plt.savefig(savepath)
plt.show()
# def detailed_plot(self, savepath=None):
# import matplotlib.pyplot as plt
# fig, axs = plt.subplots(1, 3, figsize=(15, 5))
# axs[0].imshow(np.log10(self.data), cmap='viridis')
# axs[0].set_title(f'Exposure: {self.synthetic_image.instrument_name} {self.synthetic_image.band} band, {self.exposure_time} s')
# axs[0].set_xlabel('x [Pixels]')
# axs[0].set_ylabel('y [Pixels]')
# cbar = fig.colorbar(axs[0].images[0], ax=axs[0])
# cbar.set_label(r'log$_{10}$(Counts)')
# if self.lens_data is not None:
# axs[1].imshow(np.log10(self.lens_data), cmap='viridis')
# axs[1].set_title('Lens Image')
# axs[1].set_xlabel('x [Pixels]')
# axs[1].set_ylabel('y [Pixels]')
# cbar = fig.colorbar(axs[1].images[0], ax=axs[1])
# cbar.set_label(r'log$_{10}$(Counts)')
# if self.source_data is not None:
# axs[2].imshow(np.log10(self.source_data), cmap='viridis')
# axs[2].set_title('Source Image')
# axs[2].set_xlabel('x [Pixels]')
# axs[2].set_ylabel('y [Pixels]')
# cbar = fig.colorbar(axs[2].images[0], ax=axs[2])
# cbar.set_label(r'log$_{10}$(Counts)')
# plt.tight_layout()
# if savepath is not None:
# plt.savefig(savepath)
# plt.show()
[docs]
def instrument_not_available_error(self, engine):
raise ValueError(
f'Instrument "{self.synthetic_image.instrument_name}" not available for engine "{engine}."')
[docs]
@staticmethod
def crop_edge_effects(image, pad):
num_pix = image.shape[0]
assert num_pix % 2 != 0, 'Image has even number of pixels'
output_num_pix = num_pix - 2 * pad
return util.center_crop_image(image, (output_num_pix, output_num_pix))