Source code for ce_detector.scanner

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" class for scanning cryptic exons based on previous _result ::
Junction detector and junction annotator
"""
from typing import Iterable

import numpy as np
import pandas as pd

from .utils import timethis


[docs]def check(axis) -> np.array: """check if cryptic exon has children Which means that check if the cryptic exon is split by others junction reads in terms of start and end position :param axis: an array, a junction read, included start and end :type axis: numpy.array :return: whether junction read can split cryptic exon :rtype: bool """ return (axis[0] < 0) & (axis[1] > 0)
[docs]def assign_value(df_ce, ces, ns, ce_id, ns_id) -> None: """assign value of `child column` for every cryptic exons that contains junction reads with N type Given the start and end of cryptic exons and junction reads, Note: types of junction reads contains N, D, A, DA, NDA. For details: https://regtools.readthedocs.io/en/latest/commands/junctions-annotate/ :param df_ce: pandas.DataFrame of cryptic exons :type df_ce: pandas.DataFrame :param ces: cryptic exons' pandas.DataFrame, which has gene ids that both cryptic exon and junction read own. It only contain gene id, start and end :type ces: pandas.DataFrame :param ns: pandas.DataFrame of junction reads whose type is N, as well as has same gene id as cryptic exons :type ns: pandas.DataFrame :param ce_id: index of gene id of cryptic exons which has junction reads as children :type ce_id: numpy.array :param ns_id: index of gene id of junction reads with N type, which have junction reads as children :type ns_id: numpy.array """ children_info = f"{ns.iloc[ns_id, 0]}-{ns.iloc[ns_id, 1]}," # start-end gene_id = ces.index[ce_id] length = ces.iloc[ce_id, 1] - ces.iloc[ce_id, 0] df_ce.loc[(gene_id, length), "children"] += children_info
[docs]def split_ce(df_ce, df_n) -> Iterable: """Iterator: check whether detected cryptic exons are split by other junction reads :param df_ce: pandas.DataFrame of cryptic exons return from :func:`find_ce` :type df_ce: pandas.DataFrame :param df_n: pandas.DataFrame of junction reads with N type :type df_n: pandas.DataFrame :return: param:`df_ce` with new column `children` :rtype: iterator """ # set gene as index df_ce.set_index("gene", inplace=True) df_n.set_index("gene", inplace=True) # get genes contained in both cryptic exons and junction reads with N type gene_ind = set(df_ce.index) & set(df_n.index) ces = df_ce.loc[gene_ind, ["start", "end"]] ns = df_n.loc[gene_ind, ["start", "end"]] ces_values = ces.values ns_values = ns.values # set another index in case that one gene has more than one cryptic exons df_ce.set_index("length", append=True, inplace=True) if ces_values.size > 1: x = np.apply_along_axis( lambda row: row - ns_values, 1, ces_values, ) # iter for row bool_re = np.apply_along_axis(check, 2, x) position = np.argwhere( bool_re, ) # return row id, n_df id[[ce_id,n_id],[ce_id,n_id]] if position.size >= 1: for ce_id, ns_id in position: assign_value(df_ce, ces, ns, ce_id, ns_id) else: x = ces_values - ns_values bool_x = (x[:, 0] < 0) & (x[:, 1] > 0) position = np.argwhere(bool_x) # return [[n_id],[n_id]] if position.size >= 1: for ns_id in position.flatten(): assign_value(df_ce, ces, ns, ce_id=0, ns_id=ns_id) return df_ce
[docs]def find_ce(groups) -> Iterable: """parse _result getting from annotations in order to detect cryptic exons :param groups: annotated junction reads are grouped by `strand` and `type` :type groups: Groupby object return from ``Dataframe.groupby`` :return: pd.DataFrame of two strands :rtype: Iterable """ # old column and new column pick_col = ( "chrom", "end_D", "start_A", "start", "end", "strand", "score_DA", "score_D", "score", "gene", ) rename_col = ( "chrom", "start", "end", "start_DA", "end_DA", "strand", "score_DA", "score_D", "score_A", "gene", ) result = [] for strand in ("+", "-"): try: da = groups.get_group((strand, "DA")) d = groups.get_group((strand, "D")) a = groups.get_group((strand, "A")) n = groups.get_group((strand, "N")) except KeyError as exc: raise exc else: temp = ( da.merge( d, left_on=["chrom", "start", "gene"], right_on=["chrom", "start", "gene"], suffixes=("_DA", "_D"), ) .merge( a, left_on=["chrom", "end_DA", "gene"], right_on=["chrom", "end", "gene"], suffixes=("", "_A"), )[lambda df: df["end_D"] < df["start_A"]] .pipe(lambda df: df.loc[:, pick_col]) .rename(columns=dict(zip(pick_col, rename_col))) .assign(length=lambda df: df.end - df.start) .assign(children="") ) result.append(split_ce(temp, n)) return pd.concat(result).reset_index()
[docs]class Scanner: """class for scanning cryptic exons based on annotated junction reads :param cutoff: cutoff used to filter junction reads with relatively low score or depth :type cutoff: int :param output: filename of _result :type output: str """ def __init__(self, cutoff, output=None): """Constructor for Scanner""" self.cutoff = cutoff self.output = output self._result = None def __repr__(self): return f"Scanner({self.cutoff!r})"
[docs] @timethis(name="File Writer for Scanner", message="FINISHED") def write2file(self, logger, verbose=False): """ start iterator and write _result to file""" if verbose: logger.info("Beginning Writing") pd.concat(self._result).to_csv(self.output, sep="\t", encoding="utf8")
[docs] @timethis(name="Cryptic Exon Scanner", message="FINISHED") def run(self, junctionmap, logger, verbose=False) -> Iterable: """run program to scan cryptic exons :param verbose: :type verbose: :param logger: :type logger: :param junctionmap: instance from :class:`ce_detector.detector.JunctionMap` :type junctionmap: instance :return: temporary result used to store cryptic exons :rtype: Iterable """ if verbose: logger.info(f"Chrom {junctionmap.chrom} Scanner Beginning ") groups = pd.DataFrame( [ [read.chrom, read.start, read.end, read.strand, read.score, *info] for read in junctionmap for info in read.information if read.score >= self.cutoff ], columns=[ "chrom", "start", "end", "strand", "score", "type", "dk", "ak", "gene", ], ).pipe(lambda df: df.groupby(["strand", "type"])) try: junctionmap.result = find_ce(groups) except KeyError as exc: logger.warn(f"Chrom {junctionmap.chrom} KeyError {exc.args[0]}") if verbose: logger.info(f"Chrom {junctionmap.chrom} Scanner Finished") return junctionmap