from __future__ import annotations
from typing import Sequence
import os
import time
import subprocess
import tempfile
from copy import copy
import numpy as np
from pytomography.io.SPECT import dicom, simind
from pytomography.metadata import ObjectMeta
from pytomography.metadata.SPECT import SPECTProjMeta
import pydicom
import torch
from pytomography.transforms import Transform
[docs]ENERGY_RESOLUTION_MODELS = ['siemens']
[docs]def save_attenuation_map(
attenuation_map: torch.Tensor,
dx: float,
temp_path: str
):
"""Save attenuation map as binary file to temporary directory for subsequent use by Monte Carlo scatter simulation.
Args:
attenuation_map (torch.Tensor): Attenuation map to save
dx (float): Spacing of attenuation map in cm
temp_path (str): Temporary folder to save to
"""
d = (attenuation_map).cpu().numpy().astype(np.float32) * dx
d_flat = d.swapaxes(0,2).ravel()
d_flat.tofile(os.path.join(temp_path, f'phantom140_atn_av.bin'))
[docs]def save_source_map(
source_map: torch.Tensor,
temp_path: str,
scaling: float,
vmax: float = 1e6
):
"""Save source map as binary file to temporary directory for subsequent use by Monte Carlo scatter simulation.
Args:
source_map (torch.Tensor): Source map to save
temp_path (str): Temporary folder to save to
vmax (float, optional): Maximum value of source map, prevents divergence at early iterations. Defaults to 1e6.
"""
source_map = source_map.clamp(0, vmax)
d = source_map.cpu().numpy().astype(np.float32)
d *= scaling / d.sum()
d_flat = d.swapaxes(0,2).ravel()
d_flat.tofile(os.path.join(temp_path, f'source_act_av.bin'))
[docs]def get_simind_isotope_detector_params(
isotope_name: str,
collimator_type: str,
cover_thickness: float,
backscatter_thickness: float,
crystal_thickness: float,
energy_resolution_140keV: float = 0,
advanced_energy_resolution_model: str | None = None,
advanced_collimator_modeling: bool = False,
random_collimator_movement: bool = False,
) -> dict:
"""Obtain SIMIND parameter dictionary from isotope and detector parameters
Args:
isotope_name (str): Name of isotope used for Monte Carlo scatter simulation
collimator_type (str): Collimator type used for Monte Carlo scatter simulation.
cover_thickness (float): Cover thickness used for simulation. Currently assumes aluminum is used.
backscatter_thickness (float): Equivalent backscatter thickness used for simulation. Currently assumes pyrex is used.
energy_resolution_140keV (float): Energy resolution in percent of the detector at 140keV. Currently uses the relationship that resolution is proportional to sqrt(E) for E in keV.
advanced_collimator_modeling (bool, optional): Whether or not to use advanced collimator modeling that can be used to model septal penetration and scatter. Defaults to False.
random_collimator_movement (bool, optional): Whether or not to include random collimator movement (e.g. holes are not fixed in place). Defaults to False.
Returns:
dict: Dictionary of SIMIND parameters obtainable from isotope and detector parameters
"""
if advanced_energy_resolution_model is not None:
if advanced_energy_resolution_model=='siemens':
energy_resolution_140keV = 0
else:
raise ValueError(f'Advanced energy resolution model {advanced_energy_resolution_model} not recognized.')
index_dict = {
'fi': isotope_name,
'cc': collimator_type,
'22': energy_resolution_140keV, # TODO: add energy resolution arbitrary function as argument
'53': int(advanced_collimator_modeling),
'59': int(random_collimator_movement),
'13': cover_thickness, # TODO: add material as argument, default aluminum
'11': backscatter_thickness, # TODO: add material as argument, default pyrex
'09': crystal_thickness
}
if advanced_energy_resolution_model is not None:
index_dict['Fe'] = advanced_energy_resolution_model
return index_dict
[docs]def get_energy_window_params_dicom(
file_NM: str,
idxs: Sequence[int] | None = None
) -> Sequence[str]:
"""Obtain energy window parameters from a DICOM file: this includes a list of strings which, when written to a file, correspond to a typical "scattwin.win" file used by SIMIND.
Args:
file_NM (str): DICOM projection file name
idxs (Sequence[int]): Indices corresponding to the energy windows to extract. More than one index is provided in cases where multi-photopeak reconstruction is used and scatter needs to be obtained at all windows.
Returns:
Sequence[str]: Lines of the "scattwin.win" file corresponding to the energy windows specified by the indices.
"""
lines = []
if idxs is None:
num_energy_windows = len(pydicom.dcmread(file_NM).EnergyWindowInformationSequence)
idxs = range(num_energy_windows)
for idx in idxs:
lower, upper = dicom.get_energy_window_bounds(file_NM, idx)
lines.append(f'{lower},{upper},0')
return lines
[docs]def get_energy_window_params_simind(headerfiles: Sequence[str]|str)-> Sequence[str]:
"""Obtain energy window parameters from a list of SIMIND header files: this includes a list of strings which, when written to a file, correspond to a typical "scattwin.win" file used by SIMIND.
Args:
headerfiles (Sequence[str]): SIMIND header files
Returns:
Sequence[str]: Lines of the "scattwin.win" file corresponding to the energy windows specified by the header files.
"""
if type(headerfiles) is str:
headerfiles = [headerfiles]
lines = []
for headerfile in headerfiles:
lower, upper = simind.get_energy_window_bounds(headerfile)
lines.append(f'{lower},{upper},0')
return lines
[docs]def create_simind_command(index_dict: dict, parallel_idx: int) -> str:
"""Creates the terminal command to run SIMIND with the specified parameters
Args:
index_dict (dict): Dictionary of SIMIND parameters
parallel_idx (int): Random seed used for simulation, used to differentiate between parallel simulations
Returns:
str: Terminal command to run SIMIND with the specified parameters
"""
simind_command = f'temp_output{parallel_idx}/14:-7/15:-7/CA:1/RR:{parallel_idx}/01:-208/in:x2,6x'
for key, value in index_dict.items():
if isinstance(value, list):
for v in value:
simind_command += f'/{key}:{v}'
else:
simind_command += f'/{key}:{value}'
return simind_command
[docs]def add_together(n_parallel: int, n_windows: int, temp_path: str):
"""Adds together all the parallel SIMIND simulations to obtain the final scatter and total projections
Args:
n_parallel (int): Number of parallel simulations
n_windows (int): Number of energy windows used in the simulation
temp_path (str): Temporary directory where files were saved
"""
xscats = [0] * n_windows
xtots = [0] * n_windows
for i in range(n_parallel):
for j in range(n_windows):
w_scat = np.fromfile(os.path.join(temp_path, f'temp_output{i}_sca_w{j+1}.a00'), dtype=np.float32)
xscats[j] += w_scat
w_tot = np.fromfile(os.path.join(temp_path, f'temp_output{i}_tot_w{j+1}.a00'), dtype=np.float32)
xtots[j] += w_tot
for i in range(n_windows):
# Take mean
xscat = xscats[i] / n_parallel
xtot = xtots[i] / n_parallel
xscat.tofile(os.path.join(temp_path, f'sca_w{i+1}.a00'))
xtot.tofile(os.path.join(temp_path, f'tot_w{i+1}.a00'))
# Create a header file for it
subprocess.run(['mv', f'temp_output0_sca_w{i+1}.h00', f'sca_w{i+1}.h00'], cwd=temp_path)
subprocess.run(['sed', '-i', f's/temp_output0_sca_w{i+1}.a00/sca_w{i+1}.a00/g', f'sca_w{i+1}.h00'], cwd=temp_path)
# REMOVE THESE LATER >
subprocess.run(['mv', f'temp_output0_tot_w{i+1}.h00', f'tot_w{i+1}.h00'], cwd=temp_path)
subprocess.run(['sed', '-i', f's/temp_output0_tot_w{i+1}.a00/tot_w{i+1}.a00/g', f'tot_w{i+1}.h00'], cwd=temp_path)
[docs]def run_scatter_simulation(
source_map: torch.Tensor,
attenuation_map_140keV: torch.Tensor,
object_meta: ObjectMeta,
proj_meta: SPECTProjMeta,
energy_window_params: list,
simind_index_dict: dict,
n_events: int,
n_parallel: int = 1,
return_total: bool = False
) -> torch.Tensor:
"""Runs a Monte Carlo scatter simulation using SIMIND
Args:
source_map (torch.Tensor): Source map used in the simulation
attenuation_map_140keV (torch.Tensor): Attenuation map at 140keV used in the simulation
object_meta (ObjectMeta): SPECT ObjectMeta used in reconstruction
proj_meta (SPECTProjMeta): SPECT projection metadata used in reconstruction
energy_window_params (list): List of strings which constitute a typical "scattwin.win" file used by SIMIND
primary_window_idxs (Sequence[int]): Indices from the energy_window_params list corresponding to indices used as photopeak in reconstruction. For single photopeak reconstruction, this will be a list of length 1, while for multi-photopeak reconstruction, this will be a list of length > 1.
simind_index_dict (dict): Dictionary of SIMIND parameters
n_events (int): Number of events to simulate per projection angle
n_parallel (int, optional): Number of simulations to perform in parallel, should not exceed number of CPU cores. Defaults to 1.
return_total (bool, optional): Whether or not to also return the total projections. Defaults to False.
Returns:
torch.Tensor: Simulated projections
"""
n_batches = 1
temp_dir = tempfile.TemporaryDirectory()
# Create window file
with open(os.path.join(temp_dir.name, 'simind.win'), 'w') as f:
f.write('\n'.join(energy_window_params))
# Radial positions
np.savetxt(os.path.join(temp_dir.name, f'radii_corfile.cor'), proj_meta.radii)
# Save attenuation map and source map to TEMP directory
save_attenuation_map(attenuation_map_140keV, object_meta.dr[0], temp_dir.name)
save_source_map(source_map, temp_dir.name, scaling=n_events/n_parallel/n_batches)
# Move simind.smc and energy_resolution.erf to TEMP directory
module_path = os.path.dirname(os.path.abspath(__file__))
smc_filepath = os.path.join(module_path, "../data/simind.smc")
p = subprocess.Popen(['cp', smc_filepath, f'{temp_dir.name}/simind.smc'])
p.wait() # wait for copy to complete
for energy_res_model in ENERGY_RESOLUTION_MODELS:
e_res_filepath = os.path.join(module_path, f"../data/{energy_res_model}.erf")
p = subprocess.Popen(['cp', e_res_filepath, f'{temp_dir.name}/{energy_res_model}.erf'])
# Create simind commands and run simind in parallel
simind_commands = [create_simind_command(simind_index_dict, i) for i in range(n_parallel*n_batches)]
for batchIdx in range(n_batches):
n0, n1 = batchIdx * n_parallel, (batchIdx + 1) * n_parallel
procs = [subprocess.Popen([f'simind', 'simind', simind_command, 'radii_corfile.cor'], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, cwd=temp_dir.name) for simind_command in simind_commands[n0:n1]]
for p in procs:
p.wait()
if p.returncode != 0: # Check if the process exited with an error
error_output = p.stderr.read().decode('utf-8')
print(f"Error in process {p.args}:\n{error_output}")
# Add together projection data from all seperate processes
add_together(n_parallel*n_batches, len(energy_window_params), temp_dir.name)
proj_simind_scatter = simind.get_projections([f'{temp_dir.name}/sca_w{i+1}.h00' for i in range(len(energy_window_params))])
proj_simind_tot = simind.get_projections([f'{temp_dir.name}/tot_w{i+1}.h00' for i in range(len(energy_window_params))])
# if length of energy window params is 1 then we need to unsqueeze
if len(energy_window_params) == 1:
proj_simind_scatter = proj_simind_scatter.unsqueeze(0)
proj_simind_tot = proj_simind_tot.unsqueeze(0)
# Remove data files from temporary directory
temp_dir.cleanup()
# Return data
if return_total:
return proj_simind_tot
else:
return proj_simind_scatter
def __init__(
self,
object_meta: ObjectMeta,
proj_meta: SPECTProjMeta,
n_events: int,
n_parallel: int,
obj2obj_transforms: Sequence[Transform],
proj2proj_transforms: Sequence[Transform],
attenuation_map_140keV: torch.Tensor,
energy_window_params: Sequence[str],
primary_window_idx: int,
isotope_names: Sequence[str],
isotope_ratios: Sequence[float],
collimator_type: str,
crystal_thickness: float,
cover_thickness: float,
backscatter_thickness: float,
energy_resolution_140keV: float = 0,
advanced_energy_resolution_model: str | None = None,
advanced_collimator_modeling: bool = False,
):
"""Monte Carlo Hybrid SPECT System Matrix class that uses SIMIND to simulate scatter and total projections.
Args:
object_meta (ObjectMeta): SPECT ObjectMeta used in reconstruction
proj_meta (SPECTProjMeta): SPECT projection metadata used in reconstruction
n_events (int): Number of photons to simulate per projection angle
n_parallel (int): Number of simulations to perform in parallel, should not exceed number of CPU cores.
obj2obj_transforms (Sequence[Transform]): List of object to object transforms for back projection
proj2proj_transforms (Sequence[Transform]): List of projection to projection transforms for back projection
attenuation_map_140keV (torch.Tensor): Attenuation map at 140keV (used in MC simulation)
energy_window_params (Sequence[str]): List of strings which constitute a typical "scattwin.win" file used by SIMIND
primary_window_idx (int): Index from the energy_window_params list corresponding to indices used as photopeak in reconstruction. For single photopeak reconstruction, this will be a list of length 1, while for multi-photopeak reconstruction, this will be a list of length > 1.
isotope_names (Sequence[str]): List of isotope names used in the simulation
isotope_ratios (Sequence[float]): Proportion of all isotopes.
collimator_type (str): Collimator type used for Monte Carlo scatter simulation (should use SIMIND name).
crystal_thickness (float): Crystal thickness used for Monte Carlo scatter simulation (currently assumes NaI)
cover_thickness (float): Cover thickness used for simulation. Currently assumes aluminum is used.
backscatter_thickness (float): Equivalent backscatter thickness used for simulation. Currently assumes pyrex is used.
energy_resolution_140keV (float): Energy resolution in percent of the detector at 140keV. Currently uses the relationship that resolution is proportional to sqrt(E) for E in keV.
advanced_energy_resolution_model (str | None, optional): Advanced energy resolution model to use. If provided, then ``energy_resolution_140keV`` is not used. Currently only 'siemens' is supported. Defaults to None.
advanced_collimator_modeling (bool, optional): Whether or not to use advanced collimator modeling that can be used to model septal penetration and scatter. Defaults to False.
"""
super().__init__(
obj2obj_transforms,
proj2proj_transforms,
object_meta,
proj_meta,
object_initial_based_on_camera_path=True,
)
self.attenuation_map_140keV = attenuation_map_140keV
self.energy_window_params = energy_window_params
self.isotope_names = isotope_names
self.isotope_ratios = isotope_ratios
self.collimator_type = collimator_type
self.cover_thickness = cover_thickness
self.crystal_thickness = crystal_thickness
self.backscatter_thickness = backscatter_thickness
self.energy_resolution_140keV = energy_resolution_140keV
self.advanced_energy_resolution_model = advanced_energy_resolution_model
self.advanced_collimator_modeling = advanced_collimator_modeling
self.primary_window_idx = primary_window_idx
self.n_events = n_events
self.n_parallel = n_parallel
def _get_proj_meta_subset(self, subset_idx: int) -> SPECTProjMeta:
"""Creates a new SPECTProjMeta that corresponds to a subset of projections
Args:
subset_idx (int): Index of the subset to use
Returns:
SPECTProjMeta: New SPECTProjMeta that corresponds to the subset of projections
"""
indices_array = self.subset_indices_array[subset_idx]
proj_meta_new = copy(self.proj_meta)
proj_meta_new.angles = proj_meta_new.angles[indices_array]
proj_meta_new.radii = proj_meta_new.radii[indices_array.cpu().numpy()]
proj_meta_new.shape = (len(indices_array), proj_meta_new.shape[1], proj_meta_new.shape[2])
proj_meta_new.padded_shape = (len(indices_array), proj_meta_new.padded_shape[1], proj_meta_new.padded_shape[2])
proj_meta_new.num_projections = len(indices_array)
return proj_meta_new
def forward(self, object: torch.Tensor, subset_idx: int | None = None):
"""Runs the Monte Carlo scatter simulation using SIMIND and returns the simulated projections.
Args:
object (torch.Tensor): Object to simulate
subset_idx (int | None, optional): Index of the subset to use. If None, then all projections are used. Defaults to None.
Returns:
torch.Tensor: Simulated projections
"""
# subsample proj_meta
if subset_idx is not None:
proj_meta = self._get_proj_meta_subset(subset_idx)
else:
proj_meta = self.proj_meta
projections_total = 0
for isotope_name, isotope_ratio in zip(self.isotope_names, self.isotope_ratios):
index_dict = get_simind_params_from_metadata(self.object_meta, proj_meta)
index_dict.update(get_simind_isotope_detector_params(
isotope_name = isotope_name,
collimator_type= self.collimator_type,
crystal_thickness=self.crystal_thickness,
cover_thickness=self.cover_thickness,
backscatter_thickness=self.backscatter_thickness,
energy_resolution_140keV=self.energy_resolution_140keV,
advanced_energy_resolution_model=self.advanced_energy_resolution_model,
advanced_collimator_modeling=self.advanced_collimator_modeling
))
projections = run_scatter_simulation(
object,
self.attenuation_map_140keV,
self.object_meta,
proj_meta,
self.energy_window_params,
index_dict,
self.n_events,
self.n_parallel,
return_total=True,
)[self.primary_window_idx]
projections_total += projections * object.sum() * isotope_ratio
# still apply proj2proj transforms since these are only additive term and cutoff
for transform in self.proj2proj_transforms:
projections_total = transform.forward(projections_total, padded=False)
return projections_total