"""Functions for analyzing MOSAIC data.
In addition to the dependencies for waltlabtools, waltlabtools.mosaic
also requires pandas 0.25 or greater and scikit-learn 0.21 or greater.
The public functions in waltlabtools.mosaic can be accessed via, e.g.,
.. code-block:: python
import waltlabtools as wlt # waltlabtools main functionality
import waltlabtools.mosaic # for analyzing MOSAIC assays
subset_data = wlt.mosaic.plate_subsets() # analyze data from a plate
if also using other functionality from the waltlabtools package, or
.. code-block:: python
from waltlabtools import mosaic # for analyzing MOSAIC assays
subset_data = mosaic.plate_subsets() # analyze data from a plate
if using only the waltlabtools.mosaic module.
-----
"""
import os
import re
import warnings
from tkinter import filedialog
import pandas as pd
from numpy.random import default_rng
from sklearn.mixture import GaussianMixture
from ._backend import jit, np
from .cal_curve import CalCurve, limit_of_detection
from .core import _DDOFS, c4, flatten
__all__ = [
"PlateFileCollection",
"mixture_orientation",
"weighted_aeb",
"log_transform",
"mixture_aeb",
"well_to_aeb",
"extended_coefs",
"plate_subsets",
]
# CONSTANTS
TOL = 1e-4
REG_COVAR = 1e-6
N_INIT = 5
MAX_ITER = 100
BACKGROUND_LEVEL = 2000
coefs_index = ["a", "b", "c", "d", "LOD", "LLOQ", "blank mean", "blank std", "blank cv"]
# CLASSES
[docs]
class PlateFileCollection:
"""Collection of RCA product fluorescence intensity files.
A PlateFileCollection object is a container for MOSAIC files. It is
used to keep all wells from a given day, calibration curve, or assay
together.
Parameters
----------
dir_path : str, optional
The directory containing the MOSAIC plate files. If not provided
a file dialog will be opened to select the folder.
Attributes
----------
name : str
The name of the folder containing the MOSAIC plate files.
wells : list
The wells contained in the MOSAIC plate files, e.g., "A1".
file_map : dict
A dictionary mapping the well position to the file path for
the corresponding flow cytometry output file.
conc_map : dict
A dictionary mapping the well position to the known
concentration of the calibrator.
dir_path
The path to the folder containing the MOSAIC plate files.
"""
def __init__(self, dir_path=None):
if dir_path is None:
dir_path_ = filedialog.askdirectory(
title="Choose a Folder With Data for One Calibration Curve"
)
else:
dir_path_ = dir_path
layout_file = None
well_files = []
for file_entry in os.scandir(dir_path_):
if file_entry.name.endswith(".xlsx") and ("layout" in file_entry.name):
layout_file = file_entry.path
elif file_entry.name.endswith(".csv"):
well_files.append(file_entry)
if layout_file is None:
raise OSError("Could not find layout file in folder " + dir_path_ + ".")
name = re.split(r"[/\\\\]", dir_path_)[-1].replace(" FINAL", "")
file_map = {}
conc_map = (
pd.read_excel(layout_file, header=None, index_col=0).squeeze().to_dict()
)
for file_entry in well_files:
split_filename = re.split(r"[-–—_ .()\[\]|,*]", file_entry.name)
for well in conc_map:
if well in split_filename:
if file_entry.path not in file_map.values():
file_map[well] = file_entry.path
else:
file_map[file_entry.name] = file_entry.path
if conc_map.keys() != file_map.keys():
missing_files = set(conc_map.keys()) - set(file_map.keys())
if missing_files:
warnings.warn(
"In the folder '"
+ name
+ "', for the following wells, no CSV data files were found: '"
+ "', '".join(missing_files)
+ "'."
)
for key in missing_files:
del conc_map[key]
missing_concs = set(file_map.keys()) - set(conc_map.keys())
if missing_concs:
warnings.warn(
"In the folder '"
+ name
+ "', the following wells were not found in the layout file: '"
+ "', '".join(missing_concs)
+ "'."
)
for key in missing_concs:
del file_map[key]
self.name = name
self.wells = sorted(conc_map.keys())
self.file_map = file_map
self.conc_map = conc_map
self.dir_path = dir_path_
[docs]
@jit
def mixture_orientation(
means_: np.ndarray, covariances_: np.ndarray, threshold_sds=5
) -> tuple:
"""Determines which peak is 'off.'
Parameters
----------
means_ : ndarray of length 2
The two means of the Gaussian mixture model.
covariances_ : ndarray of length 2
The two covariances of the Gaussian mixture model.
threshold_sds : float, default 5
The number of standard deviations above the mean to use as a
threshold for distinguishing on-beads in the case where there
are very few on-beads.
Returns
-------
off_label, sds_threshold, ordered_means
Tuple of length 3. Its elements:
- off_label : 1 or 2
The label of the Gaussian mixture model with the lower mean.
- sds_threshold : float
The number of standard deviations above the mean to use as a
threshold for distinguishing on-beads in the case where
there are very few on-beads.
- ordered_means : ndarray of length 2
The two means of the Gaussian mixture model, ordered from
lowest to highest (off to on).
"""
off_label = np.argmin(means_)
off_on = np.array([off_label, 1 - off_label])
low_sd = np.sqrt(covariances_[off_label])
sds_threshold = means_[off_label] + threshold_sds * low_sd
ordered_means = means_[off_on]
return off_label, sds_threshold, ordered_means
[docs]
@jit
def weighted_aeb(onfrac_gmm, onfrac_sds):
"""Weighted average of the two measures of fraction on.
Calculates the AEB using a weighted average of the Gaussian
mixture model fraction on and the standard deviation-based fraction
on.
Parameters
----------
onfrac_gmm : float
The on-fraction of the Gaussian mixture model.
onfrac_sds : float
The on-fraction based on the standard deviation threshold.
Returns
-------
aeb : float
The average number of enzymes per bead.
See Also
--------
aeb : calculate the AEB from a single f_on value
"""
gmm_weight = (0.5 - 0.5 * np.cos(np.pi * onfrac_gmm)) ** 2
onfrac = (1 - gmm_weight) * onfrac_sds + gmm_weight * onfrac_gmm
return -np.log(1 - onfrac)
[docs]
def mixture_aeb(
flat_data: np.ndarray,
means_init=None,
flat_len=None,
threshold_sds=5,
reg_covar=REG_COVAR,
) -> float:
"""Calculates AEB based on a 2-Gaussian mixture model.
Parameters
----------
flat_data : 1D ndarray
Data used for fitting the mixture model. It is assumed that if
the data should be log-transformed, they have already been
transformed, e.g., with `log_transform`.
means_init : array-like of length 2, optional
The user-provided initial means, If not provided, means are
initialized as the maximum and minimum of the data.
flat_len : int, optional
Length of flat_data. If not provided, it is calculated.
threshold_sds : numeric, default 5
The number of standard deviations above the mean to use as a
threshold for distinguishing on-beads in the case where there
are very few on-beads.
reg_covar : numeric, default REG_COVAR
Non-negative regularization added to the diagonal of covariance.
Allows to assure that the covariance matrices are all positive.
Returns
-------
float
AEB, based on weighted average of two measures of calculating
on-fraction.
"""
mi = [np.amin(flat_data), np.amax(flat_data)] if means_init is None else means_init
means_init_ = np.array(mi).reshape(-1, 1)
mixture = GaussianMixture(
n_components=2,
means_init=means_init_,
tol=TOL,
reg_covar=reg_covar,
n_init=N_INIT,
max_iter=MAX_ITER,
covariance_type="spherical",
)
try:
gmm_labels = mixture.fit_predict(flat_data.reshape(-1, 1))
off_label, sds_threshold, ordered_means = mixture_orientation(
np.ravel(mixture.means_), np.ravel(mixture.covariances_), threshold_sds
)
flat_labels = np.ravel(1 - gmm_labels) if off_label else np.ravel(gmm_labels)
corrected_flat_labels = (flat_labels & (flat_data > ordered_means[0])) | (
flat_data > ordered_means[1]
)
flat_len_ = flat_len if flat_len is not None else len(flat_data)
onfrac_gmm = np.sum(corrected_flat_labels) / flat_len_
onfrac_sds = np.sum(flat_data > sds_threshold) / flat_len_
return weighted_aeb(onfrac_gmm, onfrac_sds)
except Exception as exc:
if reg_covar < 100 * REG_COVAR:
return mixture_aeb(
flat_data=flat_data,
flat_len=flat_len_,
means_init=means_init,
threshold_sds=threshold_sds,
reg_covar=reg_covar * 2,
)
else:
raise exc
[docs]
def well_to_aeb(well_entry=None, log: bool = True, threshold_sds=5):
"""Calculates AEB of a well based on a 2-Gaussian mixture model.
Parameters
----------
well_entry : str, iterable, or os.DirEntry, optional
The well to be analyzed. If not provided, a filedialog will
open to select the output file for a well.
log : bool, default True
Should the data be log-transformed before fitting the Gaussian
mixture model?
threshold_sds : numeric, default 5
The number of standard deviations above the mean to use as a
threshold for distinguishing on-beads in the case where there
are very few on-beads.
Returns
-------
float or dict
AEB, based on weighted average of two measures of calculating
on-fraction. If multiple well_entry values are provided, a
dictionary is returned with the AEB values for each well.
"""
if isinstance(well_entry, str):
data = pd.read_csv(well_entry)
elif hasattr(well_entry, "__iter__"):
return {
entry: well_to_aeb(entry, log=log, threshold_sds=threshold_sds)
for entry in well_entry # type: ignore
}
elif isinstance(well_entry, os.DirEntry):
data = pd.read_csv(well_entry.path)
else:
well_path = filedialog.askopenfilenames(
title="Choose a File with the Flow Cytometry Data for One Well",
filetypes=[("Comma-Separated Values", "csv")],
)
return well_to_aeb(well_path)
flat_data = log_transform(flatten(data)) if log else flatten(data)
return mixture_aeb(flat_data=flat_data, threshold_sds=threshold_sds)
[docs]
def extended_coefs(concs, aebs, corr="c4", cal_curve=None) -> dict:
"""Calculates the coefficients for a 4PL model.
Parameters
----------
concs : array-like
The concentrations for the calibrators.
aebs : array-like
The AEBs for the calibrators.
corr : {"n", "n-1", "n-1.5", "c4"} or numeric, default "c4"
The sample standard deviation under-estimates the population
standard deviation for a normally distributed variable.
Specifies how this should be addressed. Options:
- "n" : Divide by the number of samples to yield the
uncorrected sample standard deviation.
- "n-1" : Divide by the number of samples minus one to
yield the square root of the unbiased sample variance.
- "n-1.5" : Divide by the number of samples minus 1.5 to
yield the approximate unbiased sample standard deviation.
- "c4" : Divide by the correction factor to yield the
exact unbiased sample standard deviation.
- If numeric, gives the delta degrees of freedom.
cal_curve : CalCurve, optional
A CalCurve object to use. If not provided, a new CalCurve will
be calculated based on the concs and aebs provided.
Returns
-------
dict
A dictionary of coefficients and properties of the calibration
curve. Its elements are:
"a", "b", "c", "d" : coefficients of the 4PL fit
"LOD" : limit of detection
"LLOQ" : lower limit of quantification (10 standard
deviations above background)
"blank mean" : mean AEB at 0 concentration
"blank std" : standard deviation of AEB at 0 concentration
"blank cv" : coefficient of variation of AEB at 0
concentration
"""
blank_array = flatten(aebs[concs == 0])
if isinstance(cal_curve, CalCurve):
new_cal_curve = cal_curve
calibratible = True
else:
new_cal_curve = CalCurve()
try:
new_cal_curve.fit(concs, aebs)
except Exception:
calibratible = False
else:
calibratible = True
if calibratible:
coefs = dict(zip("ABCDEFG", new_cal_curve.coef_.items()))
coefs["LOD"] = new_cal_curve.lod_
coefs["LLOQ"] = limit_of_detection(
blank_array, new_cal_curve, sds=10, corr=corr
)
else:
coefs = {
"a": np.nan,
"b": np.nan,
"c": np.nan,
"d": np.nan,
"LOD": np.inf,
"LLOQ": np.inf,
}
coefs["blank mean"] = np.mean(blank_array)
try:
ddof = _DDOFS[corr]
except KeyError:
ddof = float(corr)
corr_factor = c4(len(blank_array)) if (corr == "c4") else 1
coefs["blank std"] = np.std(blank_array, ddof=ddof) / corr_factor
coefs["blank cv"] = coefs["blank std"] / coefs["blank mean"]
return coefs
[docs]
def plate_subsets(
dir_path=None,
save_aebs_to=None,
save_coefs_to=None,
log: bool = True,
model="4PL",
lod_sds=3,
subsets: int = 10,
sizes=(),
corr="c4",
threshold_sds=5,
) -> pd.DataFrame:
"""Calculates AEBs and coefficients for a plate.
Parameters
----------
dir_path : str, optional
The directory containing the MOSAIC plate files. If not provided
a file dialog will be opened to select the folder.
save_aebs_to : str, optional
The path to save the AEBs to.
save_coefs_to : str, optional
The path to save the coefficients to.
log : bool, default True
Should the data be log-transformed before fitting the Gaussian
mixture model?
model : str or Model, default "4PL"
The model to fit.
lod_sds : numeric, default 3
The number of standard deviations above the mean to use as a
limit of detection.
subsets : int, default 10
The number of subsets to create.
sizes : iterable
The number of beads in each subset. Technically optional, but
the default argument is "()" which does not conduct any
subsetting.
corr : {"n", "n-1", "n-1.5", "c4"} or numeric, default "c4"
The sample standard deviation under-estimates the population
standard deviation for a normally distributed variable.
Specifies how this should be addressed. Options:
- "n" : Divide by the number of samples to yield the
uncorrected sample standard deviation.
- "n-1" : Divide by the number of samples minus one to
yield the square root of the unbiased sample variance.
- "n-1.5" : Divide by the number of samples minus 1.5 to
yield the approximate unbiased sample standard deviation.
- "c4" : Divide by the correction factor to yield the
exact unbiased sample standard deviation.
- If numeric, gives the delta degrees of freedom.
Returns
-------
pd.DataFrame
A dataframe of coefficients and extended parameters for each
subset.
See Also
--------
extended_coefs : for the coefficients and parameters returned
"""
plate_files = PlateFileCollection(dir_path)
concs = []
datasets = []
dlens = []
aebs = []
for well in plate_files.wells:
concs.append(plate_files.conc_map[well])
data = pd.read_csv(plate_files.file_map[well])
flat_data = log_transform(flatten(data.values)) if log else flatten(data.values)
dlens.append(len(flat_data))
datasets.append(flat_data)
aebs.append(mixture_aeb(flat_data=flat_data, threshold_sds=threshold_sds))
concs_flat = flatten(concs)
aebs_flat = flatten(aebs)
cal_curve = CalCurve(model=model, lod_sds=lod_sds).fit(concs_flat, aebs_flat)
coefs = extended_coefs(concs_flat, aebs_flat, corr, cal_curve)
coef_table = pd.DataFrame.from_dict(coefs, orient="index")
aeb_table = pd.DataFrame.from_dict(
{
"Well": plate_files.wells,
"Concentration": concs_flat,
"AEB": aebs_flat,
"Number of Beads": dlens,
}
)
rng = default_rng()
subset_aebs = {}
for size in sizes:
subset_aebs[size] = pd.DataFrame(
columns=range(subsets), index=plate_files.wells, dtype=float
)
for w in range(len(concs)):
if dlens[w] > size:
for subset in range(subsets):
subset_data = rng.choice(datasets[w], size, replace=False)
subset_aebs[size].loc[plate_files.wells[w], subset] = mixture_aeb(
flat_data=subset_data, threshold_sds=threshold_sds
)
else:
for subset in range(subsets):
subset_aebs[size].loc[plate_files.wells[w], subset] = aebs_flat[w]
subset_aebs[size]["mean"] = subset_aebs[size][range(subsets)].mean(
axis=1, skipna=False
)
subset_aebs[size]["std"] = subset_aebs[size][range(subsets)].std(
axis=1, skipna=False, ddof=1.5
)
subset_aebs[size]["median"] = subset_aebs[size][range(subsets)].median(
axis=1, skipna=False
)
subset_aebs[size]["IQR 25%"] = subset_aebs[size][range(subsets)].quantile(
q=0.25, axis=1
)
subset_aebs[size]["IQR 75%"] = subset_aebs[size][range(subsets)].quantile(
q=0.75, axis=1
)
if save_aebs_to is None:
save_aebs_to_ = filedialog.asksaveasfilename(
initialfile=plate_files.name + " subset AEBs.xlsx",
defaultextension=".xlsx",
filetypes=[("Excel Workbook", "xlsx")],
)
else:
save_aebs_to_ = save_aebs_to
if save_aebs_to_:
with pd.ExcelWriter(save_aebs_to_) as writer:
aeb_table.to_excel(writer, sheet_name="Original", index=False)
for size in sizes:
subset_aebs[size].to_excel(writer, sheet_name=str(size))
subset_coefs = {}
for size in sizes:
subset_coefs[size] = pd.DataFrame(
columns=range(subsets), index=coefs_index, dtype=float
)
for subset in range(subsets):
one_subset_coefs = extended_coefs(
concs_flat, subset_aebs[size][subset], corr
)
for key, value in one_subset_coefs.items():
subset_coefs[size].loc[key, subset] = value
subset_coefs[size]["mean"] = (
subset_coefs[size][range(subsets)].fillna(np.inf).mean(axis=1, skipna=False)
)
subset_coefs[size]["std"] = (
subset_coefs[size][range(subsets)]
.fillna(np.inf)
.std(axis=1, skipna=False, ddof=1.5)
)
subset_coefs[size]["median"] = (
subset_coefs[size][range(subsets)]
.fillna(np.inf)
.median(axis=1, skipna=False)
)
subset_coefs[size]["IQR 25%"] = (
subset_coefs[size][range(subsets)].fillna(np.inf).quantile(q=0.25, axis=1)
)
subset_coefs[size]["IQR 75%"] = (
subset_coefs[size][range(subsets)].fillna(np.inf).quantile(q=0.75, axis=1)
)
if save_coefs_to is None:
save_coefs_to_ = filedialog.asksaveasfilename(
initialfile=plate_files.name + " subset coefficients.xlsx",
defaultextension=".xlsx",
filetypes=[("Excel Workbook", "xlsx")],
)
else:
save_coefs_to_ = save_coefs_to
if save_coefs_to_:
with pd.ExcelWriter(save_coefs_to_) as writer:
coef_table.to_excel(writer, sheet_name="Original", header=False)
for size in sizes:
subset_coefs[size].to_excel(writer, sheet_name=str(size))
return coef_table