'''
mesoSPIM Image Writer class, intended to run in the Camera Thread and handle file I/O
'''
import os
import time
import numpy as np
import tifffile
import logging
logger = logging.getLogger(__name__)
import sys
from PyQt5 import QtCore
from distutils.version import StrictVersion
import npy2bdv
from .utils.acquisitions import AcquisitionList, Acquisition
from .utils.utility_functions import write_line, gb_size_of_array_shape, replace_with_underscores, log_cpu_core
[docs]
class mesoSPIM_ImageWriter(QtCore.QObject):
def __init__(self, parent, frame_queue):
    '''Image and metadata writer class. Parent is mesoSPIM_Camera() object.

    Parameters
    ----------
    parent
        Owning camera object. Its ``cfg`` and ``state`` attributes are read here.
    frame_queue
        Container from which ``write_images()`` pops frames (via ``popleft()``).
    '''
    super().__init__()
    self.parent = parent  # a mesoSPIM_Camera() object
    self.cfg = parent.cfg
    self.frame_queue = frame_queue
    self.state = self.parent.state  # a mesoSPIM_StateSingleton() object
    self.running_flag = self.abort_flag = False

    # Binning is configured as a string of the form '2x4'.
    # Split on the separator instead of picking characters 0 and 2,
    # so that multi-digit factors (e.g. '16x16') are parsed correctly.
    self.binning_string = self.cfg.camera_parameters['binning']
    x_bin, y_bin = self.binning_string.lower().split('x')
    self.x_binning = int(x_bin)
    self.y_binning = int(y_bin)

    # Frame size after binning, derived from the configured sensor size.
    self.x_pixels = int(self.cfg.camera_parameters['x_pixels'] / self.x_binning)
    self.y_pixels = int(self.cfg.camera_parameters['y_pixels'] / self.y_binning)

    self.file_extension = ''
    # Writers are created per acquisition in prepare_acquisition().
    self.bdv_writer = self.tiff_writer = self.tiff_mip_writer = self.mip_image = None
    self.tiff_aliases = ('.tif', '.tiff')
    self.bigtiff_aliases = ('.btf', '.tf2', '.tf8')
    self.check_versions()
[docs]
def check_versions(self):
    """Adapt to API differences between library versions and report deprecated config options.

    Selects ``TiffWriter.save`` or ``TiffWriter.write`` depending on the installed
    tifffile version, then installs the selected function as ``TiffWriter.write``
    so the rest of this class can always call ``.write()``.
    """
    installed = StrictVersion(tifffile.__version__)
    threshold = StrictVersion('2020.9.30')
    if installed >= threshold:
        self.tiff_write = tifffile.TiffWriter.write
    else:
        self.tiff_write = tifffile.TiffWriter.save
        print(f"Warning: you are using outdated version of tifffile library {tifffile.__version__}. "
              f"Upgrade to Python 3.7 and pip-install the latest tifffile version.")
    # Rename the entire class method if necessary (no-op when `write` was selected).
    tifffile.TiffWriter.write = self.tiff_write

    if not hasattr(self.cfg, 'buffering'):
        return
    msg = ("Option 'buffering = {...}' in config file is deprecated from v.1.10.0 and will be ignored, "
           "due to improved program performance. You can delete it from the config file.")
    logger.info(msg)
    print(msg)
[docs]
def prepare_acquisition(self, acq, acq_list):
    """Open the output writer(s) for one acquisition and reset the writer state.

    Parameters
    ----------
    acq
        The acquisition about to be recorded. Its 'folder' and 'filename'
        entries determine the output path; the file extension selects the format.
    acq_list
        The list ``acq`` belongs to. For '.h5' output a single BigDataViewer
        writer is created when ``acq`` is the first element and is reused for
        the remaining acquisitions.
    """
    self.folder = acq['folder']
    self.filename = replace_with_underscores(acq['filename'])
    self.path = os.path.realpath(self.folder+'/'+self.filename)
    self.MIP_path = os.path.realpath(self.folder +'/MAX_'+ self.filename + '.tiff')
    self.file_root, self.file_extension = os.path.splitext(self.path)
    logger.info(f'Save path: {self.path}')

    # Binning is a string of the form '2x4'; split on 'x' so multi-digit factors work.
    self.binning_string = self.state['camera_binning']
    x_bin, y_bin = self.binning_string.lower().split('x')
    self.x_binning = int(x_bin)
    self.y_binning = int(y_bin)
    # Always start from the unbinned sensor size. Dividing the previously stored
    # (already binned) values again would shrink the frame size on every call.
    self.x_pixels = int(self.cfg.camera_parameters['x_pixels'] / self.x_binning)
    self.y_pixels = int(self.cfg.camera_parameters['y_pixels'] / self.y_binning)
    self.max_frame = acq.get_image_count()

    if self.file_extension == '.h5':
        has_hdf5_cfg = hasattr(self.cfg, "hdf5")
        if has_hdf5_cfg:
            subsamp = self.cfg.hdf5['subsamp']
            compression = self.cfg.hdf5['compression']
            flip_flags = self.cfg.hdf5['flip_xyz']
        else:
            subsamp = ((1, 1, 1),)
            compression = None
            flip_flags = (False, False, False)
        # create writer object if the view is first in the list
        if acq == acq_list[0]:
            self.bdv_writer = npy2bdv.BdvWriter(self.path,
                                                nilluminations=acq_list.get_n_shutter_configs(),
                                                nchannels=acq_list.get_n_lasers(),
                                                nangles=acq_list.get_n_angles(),
                                                ntiles=acq_list.get_n_tiles(),
                                                blockdim=((1, 256, 256),),
                                                subsamp=subsamp,
                                                compression=compression)
        # x and y need to be exchanged to account for the image rotation
        shape = (self.max_frame, self.x_pixels, self.y_pixels)
        px_size_um = self.cfg.pixelsize[acq['zoom']]
        # Map each flip flag to a sign: False -> +1, True -> -1.
        sign_xyz = (1 - np.array(flip_flags)) * 2 - 1
        transpose_xy = (has_hdf5_cfg
                        and ('transpose_xy' in self.cfg.hdf5.keys())
                        and self.cfg.hdf5['transpose_xy'])
        shift_x = sign_xyz[0] * acq['x_pos'] / px_size_um
        shift_y = sign_xyz[1] * acq['y_pos'] / px_size_um
        shift_z = sign_xyz[2] * acq['z_start'] / acq['z_step']
        if transpose_xy:
            tile_translation = (shift_y, shift_x, shift_z)
        else:
            tile_translation = (shift_x, shift_y, shift_z)
        affine_matrix = np.array(((1.0, 0.0, 0.0, tile_translation[0]),
                                  (0.0, 1.0, 0.0, tile_translation[1]),
                                  (0.0, 0.0, 1.0, tile_translation[2])))
        # stack=None with virtual_stack_dim: planes are appended later, one at a
        # time, by image_to_disk().
        self.bdv_writer.append_view(stack=None, virtual_stack_dim=shape,
                                    illumination=acq_list.find_value_index(acq['shutterconfig'], 'shutterconfig'),
                                    channel=acq_list.find_value_index(acq['laser'], 'laser'),
                                    angle=acq_list.find_value_index(acq['rot'], 'rot'),
                                    tile=acq_list.get_tile_index(acq),
                                    voxel_units='um',
                                    voxel_size_xyz=(px_size_um, px_size_um, acq['z_step']),
                                    calibration=(1.0, 1.0, acq['z_step']/px_size_um),
                                    m_affine=affine_matrix,
                                    name_affine="Translation to Regular Grid"
                                    )
    elif self.file_extension == '.raw':
        # One flat uint16 memory-mapped array holding all frames back to back.
        self.fsize = self.x_pixels*self.y_pixels
        self.xy_stack = np.memmap(self.path, mode="write", dtype=np.uint16, shape=self.fsize * self.max_frame)
    elif self.file_extension in self.tiff_aliases:
        self.tiff_writer = tifffile.TiffWriter(self.path, imagej=True)
    elif self.file_extension in self.bigtiff_aliases:
        self.tiff_writer = tifffile.TiffWriter(self.path, bigtiff=True)
    else:
        # No writer is opened for other extensions, so frames would be dropped silently.
        logger.warning(f"Unsupported file extension '{self.file_extension}': no image writer was opened")

    mip_extensions = ('.raw',) + self.tiff_aliases + self.bigtiff_aliases
    if acq['processing'] == 'MAX' and self.file_extension in mip_extensions:
        self.tiff_mip_writer = tifffile.TiffWriter(self.MIP_path, imagej=True)
        self.mip_image = np.zeros((self.x_pixels, self.y_pixels), 'uint16')

    self.cur_image_counter = 0
    self.abort_flag = False
    self.running_flag = True
    self.acq = acq
    self.acq_list = acq_list
@QtCore.pyqtSlot(Acquisition, AcquisitionList)
def write_images(self, acq, acq_list):
    """Drain the frame queue and write every pending image to disk.

    Since v.1.10.0 the images themselves arrive through `self.frame_queue`
    (filled by the Camera thread) rather than as signal arguments, which avoids
    the signal/slot overhead. Each frame is rotated with ``np.rot90`` before it
    is handed to ``image_to_disk()``. Nothing is written unless the writer is running.
    """
    if not self.running_flag:
        logger.debug('self.running_flag = False, no images written')
        return

    pending = self.frame_queue
    while len(pending) > 0:
        logger.debug(f'image queue length: {len(pending)}')
        frame = pending.popleft()
        self.image_to_disk(acq, acq_list, np.rot90(frame))
[docs]
def image_to_disk(self, acq, acq_list, image):
    """Write a single frame to the output selected by the current file extension.

    The frame is stored at index ``self.cur_image_counter``, which is incremented
    afterwards. When the acquisition requests 'MAX' processing (raw/TIFF/BigTIFF
    output only), the running maximum projection is updated as well.
    """
    logger.debug('image_to_disk() started')
    log_cpu_core(logger, msg='image_to_disk()')

    frame_idx = self.cur_image_counter
    if frame_idx % 5 == 0:  # emit a status message only on every 5th frame
        self.parent.sig_status_message.emit('Writing to disk...')

    inv_pixelsize = 1./self.cfg.pixelsize[acq['zoom']]
    xy_res = (inv_pixelsize, inv_pixelsize)

    ext = self.file_extension
    is_tiff = ext in self.tiff_aliases
    is_bigtiff = ext in self.bigtiff_aliases

    if ext == '.h5':
        self.bdv_writer.append_plane(
            plane=image,
            z=frame_idx,
            illumination=acq_list.find_value_index(acq['shutterconfig'], 'shutterconfig'),
            channel=acq_list.find_value_index(acq['laser'], 'laser'),
            angle=acq_list.find_value_index(acq['rot'], 'rot'),
            tile=acq_list.get_tile_index(acq),
        )
        # flush H5 every 100 frames
        frames_written = frame_idx + 1
        if frames_written % 100 == 0:
            self.bdv_writer._file_object_h5.flush()
            logger.debug(f'flushed at {frames_written} frames to disk')
    elif ext == '.raw':
        start = frame_idx * self.fsize
        self.xy_stack[start:start + self.fsize] = image.flatten()
    elif is_tiff or is_bigtiff:
        # Plain TIFF pages are written contiguously, BigTIFF pages are not.
        self.tiff_writer.write(image[np.newaxis, ...], contiguous=is_tiff, resolution=xy_res,
                               metadata={'spacing': acq['z_step'], 'unit': 'um'})

    if acq['processing'] == 'MAX' and (ext == '.raw' or is_tiff or is_bigtiff):
        self.mip_image[:] = np.maximum(self.mip_image, image)

    self.cur_image_counter = frame_idx + 1
    logger.debug('image_to_disk() ended')
@QtCore.pyqtSlot()
def abort_writing(self):
    """Stop writing and close the open files when the STOP button is pressed.

    Does nothing beyond raising ``abort_flag`` if no acquisition is running.
    Errors while closing are logged, not raised.
    """
    self.abort_flag = True
    if not self.running_flag:
        return

    ext = self.file_extension
    try:
        if ext == '.h5':
            self.bdv_writer.close()
        elif ext == '.raw':
            del self.xy_stack
        elif ext in (self.tiff_aliases + self.bigtiff_aliases):
            self.tiff_writer.close()
        self.metadata_file.close()
    except Exception as err:
        logger.error(f'{err}')

    self.parent.sig_status_message.emit("Writing terminated, files closed")
    self.running_flag = False
    self.abort_flag = False
@QtCore.pyqtSlot(Acquisition, AcquisitionList)
def end_acquisition(self, acq, acq_list):
    """Finalize the output of one acquisition and mark the writer as not running.

    For '.h5' output the XML is written and the file closed only after the last
    acquisition of ``acq_list``; earlier acquisitions just flush the file.
    For raw/TIFF/BigTIFF output the writer is released, and the maximum
    projection is saved if the acquisition requested 'MAX' processing.
    All failures are logged rather than raised, so ``running_flag`` is always reset.
    """
    logger.info("end_acquisition() started")
    ext = self.file_extension
    if ext == '.h5':
        if acq == acq_list[-1]:
            try:
                self.bdv_writer.set_attribute_labels('channel', tuple(acq_list.get_unique_attr_list('laser')))
                self.bdv_writer.set_attribute_labels('illumination', tuple(acq_list.get_unique_attr_list('shutterconfig')))
                self.bdv_writer.set_attribute_labels('angle', tuple(acq_list.get_unique_attr_list('rot')))
                self.bdv_writer.write_xml()
            except Exception:  # not a bare except: let KeyboardInterrupt/SystemExit through
                logger.exception('HDF5 XML could not be written')
            # Close in a separate try so the file is closed even if the XML step failed.
            try:
                self.bdv_writer.close()
            except Exception:
                logger.exception('HDF5 file could not be closed')
        else:
            try:
                self.bdv_writer._file_object_h5.flush()
                logger.info('flushed H5')
            except Exception:
                logger.exception('HDF5 file could not be flushed')
    elif ext == '.raw':
        try:
            del self.xy_stack
        except Exception as e:
            logger.error(f'{e}')
    elif ext in (self.tiff_aliases + self.bigtiff_aliases):
        try:
            self.tiff_writer.close()
        except Exception as e:
            logger.error(f'{e}')

    if acq['processing'] == 'MAX' and ext in (('.raw',) + self.tiff_aliases + self.bigtiff_aliases):
        try:
            self.tiff_mip_writer.write(self.mip_image)
        except Exception as e:
            logger.error(f'{e}')
        finally:
            # Close the projection file even when writing it failed.
            try:
                self.tiff_mip_writer.close()
            except Exception as e:
                logger.error(f'{e}')

    self.running_flag = False
[docs]
def write_snap_image(self, image):
    """Save a single snapshot as a timestamped TIFF in the snap folder, then write its metadata.

    The file is named ``<YYYYmmdd-HHMMSS>.tif`` inside ``self.state['snap_folder']``.
    If that folder does not exist, an error is printed and nothing is written.
    Errors during saving are logged, not raised.
    """
    timestr = time.strftime("%Y%m%d-%H%M%S")
    filename = timestr + '.tif'
    snap_folder = self.state['snap_folder']
    path = snap_folder + '/' + filename

    if not os.path.exists(snap_folder):
        print(f"Error: Snap folder does not exist: {snap_folder}. Choose it from the menu.")
        return

    try:
        # `imsave` is the deprecated name of `imwrite`; prefer the current name
        # and fall back to the old one for outdated tifffile installations.
        imwrite = getattr(tifffile, 'imwrite', None) or tifffile.imsave
        imwrite(path, image, photometric='minisblack')
        self.write_snap_metadata(path)
    except Exception as e:
        logger.error(f"{e}")
@QtCore.pyqtSlot(Acquisition, AcquisitionList)
def write_metadata(self, acq, acq_list):
    """Write the ``<filename>_meta.txt`` file that accompanies an acquisition.

    For '.h5' acquisitions the metadata of all acquisitions in ``acq_list`` go
    into one file: it is created for the first acquisition and appended to for
    the others. For every other format the file is (over)written.
    The file is closed even if writing fails part-way.
    """
    logger.debug("write_metadata() started")
    path = acq['folder'] + '/' + acq['filename']
    metadata_path = os.path.dirname(path) + '/' + os.path.basename(path) + '_meta.txt'

    append = acq['filename'].endswith('.h5') and not (acq == acq_list[0])
    mode = 'a' if append else 'w'

    # `with` guarantees the handle is released if a lookup or write below raises.
    with open(metadata_path, mode) as meta:
        # Kept as an attribute because abort_writing() closes self.metadata_file.
        self.metadata_file = meta

        write_line(meta, 'Metadata for file', path)
        write_line(meta)

        write_line(meta, 'CFG')
        write_line(meta, 'Laser', acq['laser'])
        write_line(meta, 'Intensity (%)', acq['intensity'])
        write_line(meta, 'Zoom', acq['zoom'])
        write_line(meta, 'Pixelsize in um', self.state['pixelsize'])
        write_line(meta, 'Filter', acq['filter'])
        write_line(meta, 'Shutter', acq['shutterconfig'])
        write_line(meta)

        write_line(meta, 'POSITION')
        write_line(meta, 'x_pos', acq['x_pos'])
        write_line(meta, 'y_pos', acq['y_pos'])
        write_line(meta, 'f_start', acq['f_start'])
        write_line(meta, 'f_end', acq['f_end'])
        write_line(meta, 'z_start', acq['z_start'])
        write_line(meta, 'z_end', acq['z_end'])
        write_line(meta, 'z_stepsize', acq['z_step'])
        write_line(meta, 'z_planes', acq.get_image_count())
        write_line(meta, 'rot', acq['rot'])
        write_line(meta)

        # Attention: change to true ETL values ASAP
        write_line(meta, 'ETL PARAMETERS')
        write_line(meta, 'ETL CFG File', self.state['ETL_cfg_file'])
        write_line(meta, 'etl_l_offset', self.state['etl_l_offset'])
        write_line(meta, 'etl_l_amplitude', self.state['etl_l_amplitude'])
        write_line(meta, 'etl_r_offset', self.state['etl_r_offset'])
        write_line(meta, 'etl_r_amplitude', self.state['etl_r_amplitude'])
        write_line(meta)

        write_line(meta, 'GALVO PARAMETERS')
        write_line(meta, 'galvo_l_frequency', self.state['galvo_l_frequency'])
        write_line(meta, 'galvo_l_amplitude', self.state['galvo_l_amplitude'])
        write_line(meta, 'galvo_l_offset', self.state['galvo_l_offset'])
        write_line(meta, 'galvo_r_offset', self.state['galvo_r_offset'])
        write_line(meta)

        write_line(meta, 'CAMERA PARAMETERS')
        write_line(meta, 'camera_type', self.cfg.camera)
        write_line(meta, 'camera_exposure', self.state['camera_exposure_time'])
        write_line(meta, 'camera_line_interval', self.state['camera_line_interval'])
        write_line(meta, 'x_pixels', self.cfg.camera_parameters['x_pixels'])
        write_line(meta, 'y_pixels', self.cfg.camera_parameters['y_pixels'])
        write_line(meta)

    logger.debug("write_metadata() ended")