Source code for ce_detector.detector

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""class for detecting junction reads

"""
import re
import time

import numpy as np
import pysam as ps
from statsmodels.stats.multitest import fdrcorrection

from .utils import timethis


[docs]class Read: """build a read class for storing information of every junction read :param chrom: chromosome of genome :type chrom: str :param start: start position of junction read :type start: int :param end: end position of junction read :type end: int :param idn: index of junction read :type idn: int :param score: support of junction read :type score: int :param strand: direction of junction read (-|+) :type strand: str :param anchor: anchor of junction read :type anchor: str :param acceptor: acceptor of junction read :type acceptor: str """ __slots__ = ( "chrom", "start", "end", "idn", "score", "strand", "anchor", "acceptor", "information", "pvalue", ) def __init__(self, chrom, start, end, idn, score, strand, anchor, acceptor, pvalue): self.chrom, self.start, self.end = chrom, start, end self.idn, self.score, self.strand = idn, score, strand self.anchor, self.acceptor = anchor, acceptor self.information = [] self.pvalue = pvalue @property def identifiers(self): return f"{self.chrom}_{self.start}_{self.end}" def __repr__(self): return ( fr"Read({self.chrom}, {self.start}, {self.end}, {self.idn}, " fr"{self.score}, {self.strand}, {self.anchor}, {self.acceptor})" ) def __str__(self): return "\t".join( map( str, [ self.chrom, self.start, self.end, self.idn, self.score, self.strand, f"{self.anchor}-{self.acceptor}", ], ), )
[docs]class JunctionMap: """build a class to store information of all junction reads""" def __init__(self, chrom): self.chrom = chrom self._junctionList = [] self.result = None @classmethod def build(cls, chrom, data): new_instance = cls(chrom) new_instance.add_data(data) return new_instance def add_data(self, data): self._junctionList = data @property def data(self): return self._junctionList def is_empty(self): if len(self._junctionList): return False else: return True def size(self): return len(self._junctionList)
[docs] def add_read(self, read): """add read to junctionlist :param read: instance from Read :type read: instance """ self._junctionList.append(read)
def __contains__(self, read): """check if read is in junctionlist according to identifiers :type read: object :return: whether Read is in JunctionMap :rtype: bool """ return read in self._junctionList def __iter__(self): """iterate every read in junctionlist""" for item in self._junctionList: yield item def __repr__(self): return f"ce_detector.detector.JunctionMap(chrom = {self.chrom})"
[docs] def write2file(self, output, header=None): """write all reads in junctionlist to file :param output: file name of output :type output: str or TextIo :param header: header of output :type header: str """ try: output.write(f"{header}\n") except AttributeError: output = open(output, "w") output.write(f"{header}\n") finally: for read in self: output.write(f"{read}\n") output.close()
[docs]class JunctionDetector: """class for detecting junction reads and record position :param bam_file: bam file :type bam_file: str :param output: filename of output :type output: str :param reference: filename of genome reference :type reference: str :param quality: quality for filtering junction reads :type quality: int """ SPLICE_SITE = dict( zip([("GT", "AG"), ("AT", "AC"), ("GC", "AG")], "+++") ) # positive site SPLICE_SITE.update( dict(zip([("CT", "AC"), ("GT", "AT"), ("CT", "GC")], "---")) ) # negative site PATTERN = re.compile(r"\d*?S*(\d+)M(\d+)N(\d+)M") def __init__(self, bam_file, reference, quality, output=None): self.bam, self.reference = ps.AlignmentFile(bam_file), ps.FastaFile(reference) self.output, self.quality = output, quality
[docs] @staticmethod def check_strand(anchor, acceptor): """check type of strand :param anchor: anchor of read :type anchor: str :param acceptor: acceptor of read :type acceptor: str :return: type of strand (-|+) :rtype: str """ strand = JunctionDetector.SPLICE_SITE.get((anchor, acceptor), "N") return strand
@staticmethod def fdr_correction(junctionmaps): chroms = junctionmaps.keys() junctionmaps_len = [jmap.size() for jmap in junctionmaps.values()] split_ind = np.cumsum(junctionmaps_len)[:-1] all_pvalues = np.array( [read.pvalue for jmap in junctionmaps.values() for read in jmap] ) cond, fdr = fdrcorrection(all_pvalues) cond_pvalue = np.split(cond, split_ind) for ind, chrom in enumerate(chroms): data = np.array(junctionmaps[chrom].data)[cond_pvalue[ind]] junctionmaps[chrom] = JunctionMap.build(chrom, data.tolist()) return junctionmaps def get_pvalue(self, chrom, start, end): jun_reads = self.bam.fetch(contig=chrom, start=start, end=end) max_ratio = float("-inf") max_info = tuple() for read in jun_reads: if "N" in read.cigarstring and read.mapping_quality > self.quality: block1, gap, block2 = map( int, self.PATTERN.search(read.cigarstring).groups() ) block_ratio = min(block1, block2) / max(block1, block2) if block_ratio > max_ratio: max_ratio = block_ratio max_info = (block1, gap, block2) R = min(max_info[0], max_info[-1]) L = max_info[1] p_value = 1 - (1 - (1 / 4) ** R) ** (L - R + 1) return max_info, p_value
[docs] def worker(self, bam_file, reference, chrom, ann_chrom, quality, idn, junctionmap): """find junction reads and annotate slice site :param ann_chrom: :type ann_chrom: :param bam_file: handle of bam_file :type bam_file: instance :param reference: handle of reference :type reference: instance :param chrom: chromosome :type chrom: str :param quality: quality for filtering reads :type quality: int :param idn: identifier of reads :type idn: int :param junctionmap: instance from junctionmap :type junctionmap: instance :return: instance from junctionmap :rtype: instance """ # detect junction reads junction_regions = bam_file.find_introns( [ r for r in bam_file.fetch(contig=chrom) # chrom if r.mapping_quality > quality ], ) # annotate slice sites for ((start, end), score) in junction_regions.items(): junction_bases = reference.fetch( reference=ann_chrom, start=start, end=end, # change chrom ) anchor, acceptor = junction_bases[:2].upper(), junction_bases[-2:].upper() strand = self.check_strand(anchor, acceptor) _, p_value = self.get_pvalue(chrom, start, end) read = Read( chrom, start, end, idn + 1, score, strand, anchor, acceptor, p_value ) junctionmap.add_read(read) idn += 1 return junctionmap
[docs] @timethis(name="Junction detector", message=" ") def run(self, chrom, ann_chrom, logger, verbose=False): """detect junction reads and annotate slice site, write results to file :return: instance from junctionmap :rtype: instance """ idn = 0 junctionmap = JunctionMap(chrom) # write junction reads information if verbose: logger.info(f"Chrom {chrom} Beginning") start = time.time() junctionmap = self.worker( self.bam, self.reference, chrom, ann_chrom, self.quality, idn, junctionmap ) if verbose: logger.info(f"Chrom {chrom} Finished {time.time() - start:.2f}s") return junctionmap