Source code for waltlabtools.read

import os
import pathlib
from collections.abc import Callable, Hashable, Iterable
from io import IOBase
from mmap import mmap
from tkinter import Tk, filedialog
from typing import Any, Optional

import pandas as pd

from .cal_curve import CalCurve, _CalCurveSeries
from .core import match_kwargs
from .model import MODELS, Model


def _read_tsv(*args, **kwargs):
    kwargs.setdefault("delimiter", "\t")
    return pd.read_csv(*args, **kwargs)


_EXTENSION_READERS: dict[str, Callable] = {
    ".csv": pd.read_csv,
    ".tsv": _read_tsv,
    ".xls": pd.read_excel,
    ".xlsb": pd.read_excel,
    ".xlsm": pd.read_excel,
    ".xlsx": pd.read_excel,
    "": pd.read_csv,
}

_SAMPLE_RESULTS_COLNAMES: dict[str, str] = {
    "AEB": "Replicate AEB",
    "Concentration": "Replicate Conc.",
    "Mean Concentration": "Mean Conc.",
    "SD Concentration": "SD Conc.",
    "CV Concentration": "CV Conc.",
}

_HDX_EXCEPTIONS: dict[str, Any] = {
    "TooMuchFluorescenceInResorufinChannelException": 30,
}


def _suffix(filepath_or_buffer: os.PathLike | str | IOBase) -> str:
    """Determines the file extension for a given filepath or buffer.

    Parameters
    ----------
    filepath_or_buffer : str, path object, or file-like object
        The filepath or buffer to determine the file extension for.

    Returns
    -------
    str
        The file extension, including the leading dot (e.g., '.txt').

    Raises
    ------
    TypeError
        If the input is not a valid filepath or buffer.
    """
    if hasattr(filepath_or_buffer, "suffix"):
        return filepath_or_buffer.suffix  # type: ignore
    elif isinstance(filepath_or_buffer, str):
        i = filepath_or_buffer.rfind(".")
        return filepath_or_buffer[i:] if (0 < i < len(filepath_or_buffer) - 1) else ""
    elif isinstance(filepath_or_buffer, IOBase):
        return ""
    else:
        raise TypeError(
            "Could not determine file extension for file "
            f"{filepath_or_buffer} of type {type(filepath_or_buffer)}."
        )


def _select_files_or_folders(folder: bool = False, multiple: bool = True) -> list[str]:
    """Displays a dialog box to select files or a folder.

    Parameters
    ----------
    folder : bool, default False
        If False (default), the dialog box will allow selecting files.
        If True, the dialog box will allow selecting a folder.
    multiple : bool, default True
        If True (default), the dialog box will allow selecting multiple
        files. If False, the dialog box will only allow selecting a
        single file. Ignored if folder is True.

    Returns
    -------
    io
        A list of strings representing the paths of the selected files
        or folder.

    Notes
    -----
    This function uses the `Tk` and `filedialog` modules from the
    `tkinter` package to display the dialog box. A hidden `Tk` window is
    created to facilitate the selection, and it is destroyed after the
    selection is made. The dialog box may be hidden behind other
    windows.

    """
    root = Tk()
    root.withdraw()

    if folder:
        io = [filedialog.askdirectory()]
    elif multiple:
        io = list(filedialog.askopenfilenames())
    else:
        io = [filedialog.askopenfilename()]

    root.destroy()
    return io


def _accumulate_list(iterable: Iterable, func: Callable) -> list:
    """Applies a function to iterable items and accumulates a list.

    Applies a given function to each item in an iterable and
    accumulates the results in a single list. The function should
    return an iterable for each item in the input iterable. The
    function is applied to each item, and the returned iterables are
    combined into a single list.

    Parameters
    ----------
    iterable : Iterable
        An iterable containing items that the function `func` will be
        applied to.
    func : Callable
        A function that takes an item from the iterable as input and
        returns an iterable.

    Returns
    -------
    list
        A list containing the combined results of applying the function
        `func` to each item in the input iterable.
    """
    result = []
    for item in iterable:
        result.extend(func(item))
    return result


def _is_leaf(io) -> bool:
    """Check if an object is a single file-like object."""
    return isinstance(io, (str, bytes, mmap, os.PathLike)) or not isinstance(
        io, Iterable
    )


[docs] def crawl(io) -> list: """Traverses directories and iterables to assemble a list of files. If a filepath (e.g., a string or an os.PathLike) or buffer is passed, it will be returned as a list. If a directory path is passed, then a list of all of its constituent files will be returned. Finally, if a collection of filepaths, buffers, or dictionaries is passed, then a list of all of their constituent files will be returned. Parameters ---------- io : Any Any valid string path is acceptable. The string could be a URL. Valid URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is expected. A local file could be: file://localhost/path/to/table.csv. If you want to pass in a path object, crawl accepts any os.PathLike. By file-like object, we refer to objects with a read() method, such as a file handle (e.g. via builtin open function) or StringIO. If the path of a directory is passed, it will be traversed and a list of its constituent files will be returned. If a folder or collection of files is passed, then each element of the collection will be crawled. Returns ------- list A list of files or buffers. If `io` is a directory, then this list contains all files inside of it and its subfolders. If `io` is a collection of files, buffers, and directories, then this list contains all files and buffers in the collection, and all files inside of directories in the collection. """ if hasattr(io, "read") or hasattr(io, "write"): return [io] elif _is_leaf(io): try: io_path = pathlib.Path(io) except TypeError: return [io] if io_path.is_dir(): return _accumulate_list(io_path.iterdir(), crawl) return [io_path] else: return _accumulate_list(io, crawl)
def _get_sample_barcode(row: pd.Series) -> Hashable: """Gets the sample ID from a given row of a pandas DataFrame. If the "Sample Barcode" field is missing, the sample ID is constructed using a tuple of the "Batch Name" and "Location" fields. Otherwise, the "Sample Barcode" value is used as the sample ID. Parameters ---------- row : pd.Series A row from self.raw. Returns ------- str or tuple of str The sample ID. """ if ( ("Sample Barcode" in row) and pd.notna("Sample Barcode") and (row["Sample Barcode"] != "") ): return row["Sample Barcode"] else: return (row["Batch Name"], row["Location"])
[docs] class HDX: """Quanterix HD-X file reader. Reads in data from one or more Quanterix HD-X run histories (preferred) or sample results reports. The data are combined into a pandas.DataFrame called `raw`, and the assays are identified. From there, calibration curves can be fit to the data, and the data can be tidied. Parameters ---------- io : Any, optional Any valid string path is acceptable. The string could be a URL. Valid URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is expected. A local file could be: file://localhost/path/to/table.csv. If you want to pass in a path object, crawl accepts any os.PathLike. By file-like object, we refer to objects with a read() method, such as a file handle (e.g. via builtin open function) or StringIO. If the path of a directory is passed, it will be traversed and a list of its constituent files will be returned. If a list, tuple, set, dictionary, or generator is passed, then each element of the collection will be crawled. If not provided, a dialogue box will open, asking the user to select files. Examples -------- To read in and combine Quanterix run histories and sample results reports chosen from a dialogue box, call without any arguments: >>> q = wlt.HDX() """ def __init__( self, io=None, raw: Optional[pd.DataFrame | Iterable[pd.DataFrame | pd.Series]] = None, cal_curves: Optional[pd.Series] = None, **kwargs, ): if raw is None: # initialize attributes file_extensions = kwargs.get("file_extensions", [".csv", ".xls"]) # read in data files if io is None: io = _select_files_or_folders( folder=kwargs.get("folder", False), multiple=kwargs.get("multiple", True), ) filepaths = crawl(io) # determine which kwargs from __init__ to pass to each reader readers = { key: value for key, value in _EXTENSION_READERS.items() if key in file_extensions } reader_kwargs = { suffix: match_kwargs(reader, kwargs) for suffix, reader in readers.items() } # read in a DataFrame for each file raw_dfs = [] for filepath in filepaths: suffix = _suffix(filepath) if suffix in readers: raw_dfs.append( readers[suffix](filepath, **reader_kwargs[suffix]).rename( columns=_SAMPLE_RESULTS_COLNAMES ) ) # concatenate the DataFrames self.raw = pd.concat(raw_dfs, ignore_index=True).drop_duplicates() elif isinstance(raw, pd.DataFrame): self.raw = raw elif isinstance(raw, Iterable): self.raw = pd.concat(raw, ignore_index=True).drop_duplicates() else: raise ValueError( "Parameter `raw` must be a DataFrame or an iterable of DataFrames if provided." ) self.raw["Sample Barcode"] = self.raw.apply(_get_sample_barcode, axis=1) # type: ignore self._cal_curves = cal_curves self._tidy = None # Find and enumerate assays self.data_cols = kwargs.get("data_cols", ["Replicate AEB"]) self.assay_defining_cols = kwargs.get("assay_defining_cols", ["Assay", "Plex"]) if "assays" in kwargs: self.assays = kwargs["assays"] else: assays = self.raw.value_counts(subset=self.assay_defining_cols).index levels = reversed(assays.names[1:]) for name in levels: dropped = assays.droplevel(name) if (assays.get_level_values(name).value_counts().shape[0] <= 1) or ( (name == assays.names[-1]) and dropped.nunique() == assays.nunique() ): assays = dropped self.assays = assays.unique() # Calibration curve methods: calculate_cal_curves, cal_curves
[docs] def calculate_cal_curves( self, model: str | Model | dict | pd.Series = "4PL", X_name: str = "Replicate Conc.", y_name: str = "Replicate AEB", force: bool = False, include_assays: Optional[Iterable] = None, exclude_assays: Optional[Iterable] = None, **kwargs, ) -> pd.Series: # get the list of assays to generate curves for if isinstance(model, Model) or model in MODELS: if include_assays is not None: if isinstance(include_assays, (str, tuple)): assay_models = {include_assays: model} else: assay_models = {assay: model for assay in include_assays} elif exclude_assays is not None: if isinstance(exclude_assays, (str, tuple)): assay_models = { assay: model for assay in self.assays if assay != exclude_assays } else: assay_models = { assay: model for assay in self.assays if assay not in exclude_assays } else: assay_models = {assay: model for assay in self.assays} elif hasattr(model, "items"): assay_models = model else: raise ValueError("Model not found.") # generate the curves cc_init_kwargs = match_kwargs(CalCurve, kwargs) cc_fit_kwargs = match_kwargs(CalCurve.fit, kwargs) self._cal_curves = _CalCurveSeries(index=self.assays, dtype=object) for assay, model in assay_models.items(): # type: ignore if isinstance(assay, tuple): # MultiIndex indexer = pd.DataFrame( { name: self.raw[name] == level_value for name, level_value in zip(self.assays.names, assay) } ).all(axis=1) else: # 1D Index indexer = self.raw[self.assays.name] == assay indexer = indexer & (self.raw["Sample Type"] == "Calibrator") X = self.raw.loc[indexer, X_name] y = self.raw.loc[indexer, y_name] try: self._cal_curves[assay] = CalCurve(model=model, **cc_init_kwargs).fit( X=X, y=y, **cc_fit_kwargs ) except Exception as e: if force: raise RuntimeError(f"Error on assay {assay}.") from e return self._cal_curves
@property def cal_curves(self) -> pd.Series: if self._cal_curves is None: self._cal_curves = self.calculate_cal_curves() # Use default parameters return self._cal_curves # Make concentration def _calculate_concentrations( self, colname: Hashable = "Replicate AEB", newname: Optional[str] = None, fix_hdx_exceptions: bool = False, **kwargs, ) -> pd.Series: "Create a new column in self.raw and calculate concentrations." if (self._cal_curves is None) or kwargs: self.calculate_cal_curves(**kwargs) if fix_hdx_exceptions: def replicate_aeb(row): # type: ignore if pd.isna(row[colname]) and row["Errors"]: for key in _HDX_EXCEPTIONS: if key in row["Errors"]: return _HDX_EXCEPTIONS[key] return row[colname] else: def replicate_aeb(row): return row[colname] if len(self.assays.names) > 1: def get_assay(row): # type: ignore return tuple(row[self.assays.names]) else: def get_assay(row): return row[self.assays.name] def apply_cal_curve(row: pd.Series) -> float: assay = get_assay(row) return self.cal_curves[assay].estimate(replicate_aeb(row)) if newname is None: newname = f"Concentration Calculated from {colname}" self.raw[newname] = self.raw.apply(apply_cal_curve, axis=1) return self.raw[newname] # Tidy data methods: _make_tidy, tidy
[docs] def calculate_tidy( self, stat: str | Callable = "median", colname: Optional[str] = None, use_curves: bool = False, **kwargs, ): if colname is None: colname = "Replicate AEB" if use_curves else "Replicate Conc." specimens = self.raw if use_curves: colname = self._calculate_concentrations(colname=colname, **kwargs).name # type: ignore tidy_columns = ["Sample Barcode", *list(self.assays.names), colname] specimens = self.raw[self.raw["Sample Type"] == "Specimen"][tidy_columns] self._tidy = ( specimens.groupby(["Sample Barcode", *self.assays.names]) .agg(stat) .reset_index() .pivot_table( index="Sample Barcode", columns=self.assays.names, values=colname ) ) return self._tidy
@property def tidy(self) -> pd.DataFrame: if self._tidy is None: self.calculate_tidy() return self._tidy # type: ignore def __add__(self, other): if not isinstance(other, HDX): raise TypeError("Unsupported operand type for +") return HDX(raw=[self.raw, other.raw]) def __eq__(self, other) -> bool: """Two HDX objects are equal if they have the same raw values.""" if not isinstance(other, HDX): raise TypeError("Unsupported operand type for ==") if len(self.raw) != len(other.raw): return False for column in self.raw.columns: if column in other.raw.columns and not self.raw[column].equals( other.raw[column] ): return False return True