#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Data Classes and Functions
==========================
This file stores the data classes and functions for the MRBLEs Analysis module.
"""
# [Future imports]
from __future__ import (absolute_import, division, print_function)
from builtins import (str, super, object)
# [File header] | Copy and edit for each file in this project!
# title : data.py
# description : MRBLEs - Data Structures
# author : Bjorn Harink
# credits :
# date : 20160623
# [Modules]
# General Python
import os
import warnings
import re
import ast
# Data Structures
from xml.dom import minidom
import numpy as np
import pandas as pd
import xarray as xr
# File import
from skimage.external import tifffile as tff
# Python 2 compatibility
from six import string_types
# Descriptor classes
class TableDataFrame(object):
    """Pandas based dataframe object for table data.

    Parameters
    ----------
    data : Pandas DataFrame, optional
        Table to wrap.
        Defaults to None.
    flag_filt : bool, optional
        When True and the flag column is present, `data` only returns rows
        whose flag value equals False.
        Defaults to True.
    flag_name : str, optional
        Keyword argument with the name of the flag column.
        Defaults to 'flag'.

    Attributes
    ----------
    data : Pandas DataFrame
        Returns (filtered, if the flag column is present) Pandas DataFrame.
    pdata : Pandas DataFrame
        Returns unfiltered Pandas DataFrame
    sets : list
        Returns a list of all set names, if 'set' column is present.

    """

    def __init__(self, data=None, flag_filt=True, **kwargs):
        super(TableDataFrame, self).__init__()
        self.flag_filt = flag_filt
        self.flag_name = kwargs.get('flag_name', 'flag')
        self._dataframe = data

    def __repr__(self):
        """Return dataframe representation."""
        return repr(self.data)

    def __getitem__(self, index):
        """Return the label-based (`.loc`) selection of the filtered data."""
        return self.data.loc[index]

    @property
    def data(self):
        """Return Pandas dataframe object with flagged rows filtered out."""
        return self._check_flag(self._dataframe)

    @property
    def pdata(self):
        """Return unflagged (unfiltered) Pandas dataframe."""
        return self._dataframe

    @property
    def sets(self):
        """Return list of sets, or None if there is no 'set' column."""
        # No table loaded yet: there is nothing to list.
        if self._dataframe is None:
            return None
        if 'set' in self._dataframe.columns:
            return self.get_set_names(self._dataframe)
        return None

    def combine(self, data):
        """Combine data with dataframe.

        The columns of `data` are placed in front of the columns of the
        current (filtered) table; rows are matched by position and the result
        takes over the index of `data`.

        Parameters
        ----------
        data : Pandas DataFrame

        Raises
        ------
        ValueError
            If either the stored table or `data` is not a Pandas DataFrame.

        """
        current = self.data
        if isinstance(current, pd.DataFrame) and \
           isinstance(data, pd.DataFrame):
            index = data.index
            self._dataframe = pd.concat([data.reset_index(drop=True),
                                         current.reset_index(drop=True)],
                                        axis=1)
            self._dataframe.index = index
            if 'index' in self._dataframe.index:
                # NOTE(review): drop() is not in-place and its result is
                # discarded, so this statement has no effect. Kept as-is
                # because it is unclear whether a row labelled 'index'
                # should really be removed - TODO confirm intent.
                self._dataframe.drop(index='index')
        else:
            raise ValueError("Not Pandas DataFrame: %s." % type(data))

    @classmethod
    def _flatten_dict(cls, dict_data, prefix='.'):
        """Flatten nested dictionary, joining nested keys with `prefix`."""
        def items():
            # A closure for recursively extracting dict like values
            for key, value in dict_data.items():
                if isinstance(value, dict):
                    # Pass the prefix down so every nesting level uses the
                    # same separator, not just the outermost one.
                    nested = cls._flatten_dict(value, prefix)
                    for sub_key, sub_value in nested.items():
                        yield key + prefix + sub_key, sub_value
                else:
                    yield key, value
        return dict(items())

    @staticmethod
    def get_set_names(data_set, set_dim='set'):
        """Return list of unique values found in the `set_dim` column."""
        return np.unique(data_set[set_dim]).tolist()

    @staticmethod
    def _add_info(info_data, dataframe, codes=None, prefix='info.'):
        """Add per-code information to the dataframe and return it.

        Parameters
        ----------
        info_data : Pandas DataFrame or indexable
            A DataFrame is looked up by position (`iloc[code]`) and its
            columns are added with `prefix`. Anything else is indexed with
            `info_data[code]` and written to the 'set.sequence' column.
        dataframe : Pandas DataFrame
            Table to add the information to.
        codes : iterable of int, optional
            Codes to process. If None and `dataframe` has a 'code' column,
            the unique non-NaN values of that column are used.
        prefix : str, optional
            Prefix for the added column names.
            Defaults to 'info.'.

        """
        if isinstance(info_data, pd.DataFrame):
            info_data_prefix = info_data.add_prefix(prefix)
            column_names = list(info_data_prefix.columns)
            # Concatenating an empty frame adds the (empty) info columns.
            col_df = pd.DataFrame(columns=column_names)
            dataframe = pd.concat([dataframe, col_df], sort=False)
            if (codes is None) and ('code' in dataframe.columns):
                codes = np.unique(dataframe['code'].dropna()).astype(int)
                for code in codes:
                    # Fill every row carrying this code.
                    dataframe.loc[dataframe.code == code, column_names] = \
                        info_data_prefix.iloc[int(code)].values
            else:
                for code in codes:
                    # Here the code is used as the row label.
                    dataframe.loc[code, column_names] = \
                        info_data_prefix.iloc[int(code)]
        else:
            # NOTE(review): this branch previously referenced `code` without
            # defining it (NameError on every call). Looping over `codes`
            # mirrors the DataFrame branch - TODO confirm intended semantics.
            if codes is None:
                raise ValueError(
                    "Codes are required when info data is not a DataFrame.")
            for code in codes:
                dataframe.loc[code, 'set.sequence'] = info_data[code]
        return dataframe

    def _check_flag(self, data):
        """Return `data` without flagged rows, if filtering applies."""
        # No table loaded yet: nothing to filter.
        if data is None:
            return None
        if self.flag_name in data.columns and self.flag_filt is True:
            # Element-wise comparison on the column; `is False` would not
            # work here.
            return data[data[self.flag_name] == False]  # NOQA: E712
        return data
class ImageDataFrame(object):
    """Xarray based dataframe object for images.

    Parameters
    ----------
    data : Xarray DataArray, dict of Xarray DataArrays, optional
        Image data to wrap.
        Defaults to None.

    Attributes
    ----------
    data : Xarray DataArray
        Returns (cropped, if crop_x and/or crop_y is set) Xarray DataArray.
    xdata : Xarray DataArray
        Returns uncropped Xarray DataArray.
    sets : list
        Returns a list of all set names, if the data is a dict of sets.
    crop_x : slice
        Crop X slice. Set with slice().
    crop_y : slice
        Crop Y slice. Set with slice().

    """

    def __init__(self, data=None):
        super(ImageDataFrame, self).__init__()
        self._dataframe = data
        # Full-range slices: no cropping until crop_x / crop_y are set.
        self._crop_x = slice(None, None, None)
        self._crop_y = slice(None, None, None)
        self._shift = {}

    def __repr__(self):
        """Return dataframe representation."""
        return repr(self.data)

    def __getitem__(self, index=None):
        """Return a selection of the (cropped) image data.

        A string selects a set (dict data) or a label (DataArray data), a
        bare slice returns everything, and a tuple is applied with `.loc`.
        For dict data the first tuple element selects the set; a full slice
        there applies the remaining elements to every set.
        """
        images = self.data
        if isinstance(index, string_types):
            if isinstance(images, dict):
                return images[index]
            return images.loc[index]
        if isinstance(index, slice):
            return images
        if index[0] == slice(None, None, None) and isinstance(images, dict):
            return {key: value.loc[index[1:]]
                    for key, value in images.items()}
        if isinstance(images, xr.DataArray):
            return images.loc[index]
        return images[index[0]].loc[index[1:]]

    @property
    def data(self):
        """Return cropped Xarray dataframe."""
        return self._check_crop(self._dataframe)

    @property
    def xdata(self):
        """Return uncropped Xarray dataframe."""
        return self._dataframe

    @property
    def sets(self):
        """Return list of sets, or None if the data is not a dict."""
        if isinstance(self._dataframe, dict):
            return self.get_set_names(self._dataframe)
        return None

    def combine(self, images):
        """Combine images with dataframe.

        Uses `combine_first`, per set when both sides are dicts.

        Parameters
        ----------
        images : Xarray DataArray, dict of Xarray DataArrays

        Raises
        ------
        ValueError
            If the stored data and `images` are not both dicts or both
            Xarray DataArrays.

        """
        current = self.data
        if isinstance(current, dict) and isinstance(images, dict):
            self._dataframe = {
                key: current[key].combine_first(images[key])
                for key in current
            }
        elif isinstance(current, xr.DataArray) and \
                isinstance(images, xr.DataArray):
            self._dataframe = current.combine_first(images)
        else:
            raise ValueError("Not dict or Xarray DataArray: %s."
                             % type(images))

    def shift_channel(self, channel, x_shift, y_shift):
        """Shift images of channel by x and y pixels.

        WARNING: This will shift the images permanently and sets the pixels
        that are shifted in from outside the image to 0. Reload images to
        reset.

        Parameters
        ----------
        channel : str
            Channel name to shift.
        x_shift : int
            Pixels to shift in X dimension.
        y_shift : int
            Pixels to shift in Y dimension.

        """
        # Nothing loaded: nothing to shift.
        if self._dataframe is None:
            return
        data_shift = self._dataframe.copy()
        if isinstance(data_shift, dict):
            for key, value in data_shift.items():
                data_shift[key].loc[:, channel] = \
                    value.loc[:, channel].shift(x=x_shift, y=y_shift).fillna(0)
        elif isinstance(data_shift, xr.DataArray):
            # Shift X by x_shift (was mistakenly y_shift) and zero-fill the
            # vacated pixels, matching the dict branch above.
            data_shift.loc[:, channel] = \
                data_shift.loc[:, channel].shift(
                    x=x_shift, y=y_shift).fillna(0)
        else:
            data_shift = None
        self._dataframe = data_shift

    # Crop properties
    @property
    def crop_x(self):
        """Crop X slice. Set with slice()."""
        return self._crop_x

    @crop_x.setter
    def crop_x(self, value):
        self._crop_x = self._set_slice(value)

    @property
    def crop_y(self):
        """Crop Y slice. Set with slice()."""
        return self._crop_y

    @crop_y.setter
    def crop_y(self, value):
        self._crop_y = self._set_slice(value)

    # Crop methods
    def _check_crop(self, data):
        """Return `data` cropped to crop_x / crop_y, or None if unsupported."""
        crop = dict(x=self._crop_x, y=self._crop_y)
        if isinstance(data, dict):
            return {key: value.loc[crop] for key, value in data.items()}
        if isinstance(data, xr.DataArray):
            return data.loc[crop]
        return None

    @staticmethod
    def get_set_names(data_set):
        """Return list of sets (the keys of the dict)."""
        return list(data_set.keys())

    @staticmethod
    def get_dim_names(data_set, set_dim='c'):
        """Return list of coordinate values of dimension `set_dim`."""
        return data_set.coords[set_dim].values.tolist()

    @staticmethod
    def _set_slice(values):
        """Convert a [start, stop] list or None to a slice.

        Any other value is returned unchanged.
        """
        if isinstance(values, list):
            return slice(values[0], values[1])
        if values is None:
            return slice(None)
        return values
# Classes
class ImageSetRead(ImageDataFrame):
    """Image set data object that loads image set from file(s).

    Parameters
    ----------
    file_path : string/list [string, string, ...]
        File path as string, e.g. 'C:/folder/file.tif', or as list of file
        paths, e.g. ['C:/folder/file.tif', 'C:/folder/file.tif'].
    series : int, optional
        Sets the series number if file has multiple series.
        To load all series set to series='all'.
        Defaults to 0.
    output : str, optional
        Output method label, stored on the instance as `output`.
        Defaults to 'xr'.

    Attributes
    ----------
    See function descriptions.

    Returns
    -------
    _dataframe : xarray dataframe
        Returned when calling the instance.
    _dataframe[idx] : NumPy ndarray
        Returns the index value or slice values: [start:stop:stride]. Warning:
        when using column names stop values are included.

    Examples
    --------
    >>> image_data_object = ImageSetRead('C:/folder/file.tif')
    >>> image_files = ['C:/folder/file.tif', 'C:/folder/file.tif']
    >>> image_data_object = ImageSetRead(image_files, output='xd')
    >>> image_data_object['BF', 100:400, 100:400]
    (301L, 301L)

    """

    def __init__(self, file_path, series=0, output='xr'):
        """Initialize file load object."""
        super(ImageSetRead, self).__init__()
        self._dataframe, self._metadata, self._files = \
            self.load(file_path, series)
        self.output = output

    def __repr__(self):
        """Return xarray dataframe representation."""
        return repr([self._dataframe])

    # TODO: Fix a dedicated __getitem__; the inherited one is used for now.

    # Main image load function
    @classmethod
    def load(cls, file_path, series=0):
        """Load image files into data structures.

        Class method. Can be used without instantiating.

        Parameters
        ----------
        file_path : string/list [string, string, ...]
            File path as string, e.g. 'C:/folder/file.tif', or as list of file
            paths, e.g. ['C:/folder/file.tif', 'C:/folder/file.tif'].
        series : int, optional
            Sets the series number if file has multiple series (or positions).
            Use series='all' for loading all series.
            Defaults to 0.

        Returns
        -------
        image_data : Xarray DataArray
            Image data with named dimensions.
        image_metadata : dict or None
            Micro-Manager metadata of the first file, None if unavailable.
        files : list
            Files of the loaded sequence.

        Examples
        --------
        >>> ImageSetRead.load('C:/folder/file.tif')
        >>> image_files = ['C:/folder/file.tif', 'C:/folder/file.tif']
        >>> ImageSetRead.load(image_files)

        """
        # string_types (not str) so native Python 2 strings are accepted too,
        # consistent with the rest of this file.
        if isinstance(file_path, string_types):
            file_path = [file_path]
        with tff.TiffSequence(file_path, pattern='XYCZT') as ts, \
                tff.TiffFile(file_path[0]) as tf:
            files = ts.files
            # Metadata is taken from the first file only.
            image_metadata = cls._get_metadata(tf)
            if len(tf.series) > 1 and series == 'all':
                panel_data = np.array(
                    [ts.asarray(series=idx)
                     for idx, _ in enumerate(tf.series)])
            else:
                panel_data = ts.asarray(series=series)
            if len(file_path) == 1 and series != 'all':
                # Single file: collapse the leading file axis.
                panel_data = np.vstack(panel_data)
            image_data = cls._convert_to_xd(
                panel_data, image_metadata, file_path, series)
        return image_data, image_metadata, files

    # Channel properties and methods
    @property
    def c_size(self):
        """Return channel count."""
        return self._dataframe.c.size

    @property
    def c_names(self):
        """Return channel names."""
        return self._dataframe.c.values

    def c_index(self, name):
        """Return channel number.

        Raises
        ------
        ValueError
            If `name` is not one of the channel names.

        """
        # c_names is a NumPy array, which has no get_loc(); look the name up
        # in its list form instead.
        return self.c_names.tolist().index(name)

    # File properties and methods
    @property
    def f_size(self):
        """Return file count."""
        return len(self._files)

    @property
    def f_names(self):
        """Return file names."""
        return self._files

    @property
    def is_multi_file(self):
        """Return if from multiple files."""
        return len(self._files) > 1

    # Series properties and methods
    @property
    def s_size(self):
        """Return series count."""
        return len(self._metadata['series'])

    # Position properties and methods
    @property
    def p_size(self):
        """Return position count (same as the series count)."""
        return len(self._metadata['series'])

    # Z-slice properties and methods
    @property
    def z_size(self):
        """Return Z-slice count."""
        return self._metadata['summary']['Slices']

    # Time properties and methods
    @property
    def t_size(self):
        """Return time-point count."""
        return len(np.unique(self._metadata['index_map']['frame']))

    @property
    def t_interval(self):
        """Return set time interval.

        Default in milliseconds (ms), check object.t_unit for time unit.
        """
        return self._metadata['summary']['Interval_ms']

    @property
    def t_deltas(self):
        """Return time deltas between each image acquisition.

        Default in milliseconds (ms), check object.t_unit for time unit.
        """
        return [float(plane.attributes['DeltaT'].value)
                for plane in self._plane_elements()]

    @property
    def t_unit(self):
        """Return time unit."""
        planes = self._plane_elements()
        return str(planes[0].attributes['DeltaTUnit'].value)

    def _plane_elements(self):
        """Return the 'Plane' XML elements of the first page description."""
        first_page = self._metadata['series'][0].pages[0]
        xml_string = first_page.tags['image_description'].value
        xml_tree = minidom.parseString(xml_string)
        return xml_tree.getElementsByTagName('Plane')

    # Axes properties and methods
    @property
    def axes(self):
        """Return data order.

        Returns
        -------
        data_order : string
            Returns order as string with: P for position; F for file; T for
            timepoint, C for channel; Y for Y-axis; X for X-axis.

        Examples
        --------
        >>> image_data_object = ImageSetRead('C:/folder/file.tif')
        >>> image_data_object.axes
        'TCYX'

        """
        return ''.join(self._dataframe.dims).upper()

    @staticmethod
    def scan_path(path, pattern=".*.tif"):
        """Scan folder recursively for files matching the pattern.

        Parameters
        ----------
        path : string
            Folder path as string, e.g. r'C:/folder'.
        pattern : string
            General file pattern as search string, e.g. '20160728_MOL_*', using
            regular expressions (regex). Matched against the start of each
            file name.
            Defaults to '.*.tif'.

        Returns
        -------
        image_files : list
            Full paths of the matching files; empty if nothing matches.

        """
        matcher = re.compile(pattern)
        image_files = []
        for root, _dirs, files in os.walk(path):
            image_files.extend(os.path.join(root, file_name)
                               for file_name in files
                               if matcher.match(file_name))
        return image_files

    @classmethod
    def scan_paths(cls, paths, pattern=".*.tif"):
        """Scan folders recursively for files matching the pattern.

        Parameters
        ----------
        paths : list
            Folder paths as list, e.g. ['C:/folder1', 'C:/folder2', ...]. A
            single path string is also accepted.
        pattern : string
            General file pattern as search string, e.g. '20160728_MOL_*', using
            regular expressions (regex).

        Returns
        -------
        image_files : list
            List of files for a single path string, list of file lists (one
            per folder) for a list of paths, None if `paths` is empty.

        """
        if isinstance(paths, string_types):
            return cls.scan_path(paths, pattern=pattern)
        # A list holding a single folder is just as valid as several.
        if len(paths) >= 1:
            return [cls.scan_path(path, pattern=pattern) for path in paths]
        print("Can't resolve base path(s).")
        return None

    # Private methods
    @staticmethod
    def _convert_to_xd(data, metadata, file_path, series):
        """Convert data and metadata to xarray DataArray.

        Raises
        ------
        ValueError
            If the channel names in the metadata are neither a list nor a
            string.

        """
        if data.ndim == 2:
            return xr.DataArray(data, dims=['y', 'x'])
        first_series = metadata['series'][0]
        try:
            # Newer Scikit-Image (tifffile) exposes axes as an attribute.
            axes = first_series.axes
        except AttributeError:
            # For backwards compatibility of Scikit-Image versions <0.12.3.
            axes = first_series['axes']
            warnings.warn(
                "Scikit-Image latest update has changed method to retrieve "
                "metadata. Please upgrade to latest Scikit-Image package.")
        dims = [letter.lower() for letter in axes]
        if len(metadata['series']) > 1 and series == 'all':
            dims.insert(0, 'p')
        # NOTE(review): squeeze is assumed to apply unconditionally; the
        # original nesting could not be recovered - TODO confirm.
        data = np.squeeze(data)
        if 'i' in dims:
            dims[dims.index('i')] = 'c'
        if len(file_path) > 1:
            dims.insert(0, 'f')
        md_channels = metadata['summary']['ChNames']
        # Micro-Manager re-stack bug fix: returns string instead of list.
        if isinstance(md_channels, list):
            channels = md_channels
        elif isinstance(md_channels, string_types):
            channels = [ch.strip() for ch in ast.literal_eval(md_channels)]
        else:
            raise ValueError("Channels metadata corrupted.")
        return xr.DataArray(
            data,
            dims=dims,
            coords={'c': channels},
            encoding={'dtype': np.uint16}
        )

    @staticmethod
    def _get_metadata(image_object):
        """Return Micro-Manager metadata with the file's series added.

        Warns and returns None if the file is not a Micro-Manager TIFF.
        """
        if image_object.is_micromanager is True:
            metadata = image_object.micromanager_metadata
            metadata['series'] = image_object.series
            return metadata
        warnings.warn("Not a Micro Manager TIFF file.")
        return None