# 786
# Aldy source: sam.py
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
from typing import Tuple, Dict, List, Optional
from collections import defaultdict, Counter
from statistics import mean
import pysam
import os
import os.path
import gzip
import tarfile
import pickle
import tempfile
from .common import log, GRange, AldyException, script_path, Timing, chr_prefix
from .gene import Gene, CNConfigType, Mutation
from .coverage import Coverage
from .profile import Profile
[docs]
class Sample:
"""Parse read alignments in a SAM/BAM/CRAM/VCF/dump format"""
def __init__(
self,
gene: Gene,
profile: Optional[Profile],
path: str,
reference: Optional[str] = None,
debug: Optional[str] = None,
store_reads: bool = False,
):
"""
:param gene: Gene instance.
:param profile: Profile instance. `None` if loading a dump file.
:param path: Path to a SAM/BAM/CRAM/VCF/dump file.
:param reference: Reference genome path for reading CRAM files.
Default: None.
:param debug: When set, create a `{debug}.dump` file for debug purposes.
Default: None.
:raise: :py:class:`aldy.common.AldyException` if the sample is invalid
(e.g., the coverage of the copy-number neutral region is too low).
"""
self.name = os.path.basename(path).split(".")[0]
"""Sample name."""
self.gene = gene
"""Gene instance."""
self.profile = profile
"""Profile instance."""
self.path = path
"""File path."""
self._dump_cn = defaultdict(int)
"""dict[int, int]: Coverage of the copy-number neutral region"""
self._dump_reads: List[Tuple[Tuple, List]] = []
"""list[tuple[tuple, list]]: Read information."""
self._indel_sites = {
(pos, op): [0, 0] for pos, op in gene.mutations if op[:3] in ["ins", "del"]
}
self._indel_sites_eqs = {}
"""indel equivalents (used for long reads because indelpost is too slow)"""
self._multi_sites = {
m.pos: m.op
for _, a in gene.alleles.items()
for m in a.func_muts
if ">" in m.op and len(m.op) > 3
}
"""Multi-substitutions (e.g., `A.C>T.G`)."""
self.phaseable = {
pos: i for i, pos in enumerate(sorted({pos for pos, _ in gene.mutations}))
}
"""Locations that should be phased."""
self.phases: Dict[str, Dict[int, str]] = {}
"""Phasing information."""
self._fusion_counter: Dict = {}
"""Fusion read coverage (for long reads)."""
self.is_long_read = False
"""Set if long-read data is used."""
self.reads = [] if store_reads else None
with Timing("[sam] Read SAM"):
self.kind, _ = detect_genome(path)
self.genome = gene.genome
if self.kind == "vcf":
try:
norm, muts = self._load_vcf(
path, profile.vcf_sample_idx if profile else 0
)
except ValueError:
raise AldyException(f"VCF {path} is not indexed")
elif self.kind == "dump":
norm, muts = self._load_dump(path)
elif self.kind == "pscan":
norm, muts = self._load_pscan(path)
else:
if self.profile and self.profile.sam_long_reads:
self.is_long_read = True
norm, muts = self._load_long_sam(path, reference, debug)
else:
norm, muts = self._load_sam(path, reference, debug)
if self.profile and self.profile.cn_region:
self._dump_cn = self._load_cn_region(
path, reference, self.profile.cn_region
)
self._make_coverage(norm, muts)
if self.kind == "sam" and debug:
self._dump_alignments(f"{debug}.{gene.name}", norm, muts)
assert self.profile, "profile not set"
if self.profile.cn_region:
self.coverage._normalize_coverage()
log.debug("[sam] avg_coverage= {:.1f}x", self.coverage.average_coverage())
if self.profile.cn_region and self.coverage.diploid_avg_coverage() < 2:
raise AldyException(
"The average coverage of the sample is too low ({:.1f}).".format(
self.coverage.diploid_avg_coverage()
)
)
if self.profile.debug_novel:
self._load_novel_mutations()
def _load_sam(self, sam_path: str, reference=None, debug=None):
"""
Load the read, mutation and coverage data from a SAM/BAM/CRAM file.
:raise: :py:class:`aldy.common.AldyException` if the file lacks an index.
"""
log.debug("[sam] path= {}", os.path.abspath(sam_path))
if reference:
log.debug("[sam] reference= {}", os.path.abspath(reference))
assert self.profile, "profile not set"
norm: dict = defaultdict(list)
muts: dict = defaultdict(list)
with pysam.AlignmentFile( # type: ignore
sam_path, reference_filename=reference
) as sam:
# Check do we have proper index to speed up the queries
try:
has_index = sam.check_index()
except AttributeError:
# SAM files do not have an index. BAMs might also lack it
has_index = False
except ValueError:
raise AldyException(f"Cannot check index of {sam_path}")
self._prefix = chr_prefix(
self.gene.chr, [x["SN"] for x in sam.header["SQ"]]
)
with tempfile.TemporaryDirectory() as tmp:
self._realign_indels(tmp, sam, reference)
if has_index:
iter = sam.fetch(
region=self.gene.get_wide_region().samtools(prefix=self._prefix)
)
else:
log.warn("SAM/BAM index not found. Reading will be slow.")
iter = sam.fetch()
# Fetch the reads
for read in iter:
if not read.cigartuples: # only valid alignments
continue
if read.is_supplementary: # avoid supplementary alignments
continue
if "H" in read.cigarstring: # avoid hard-clipped reads
continue
if not read.query_sequence:
continue
# ensure that it is a proper gene read
if not _in_region(self.gene.get_wide_region(), read, self._prefix):
continue
# Handle 10X Genomics tags
if read.has_tag("BX"):
fragment = read.get_tag("BX")
if read.has_tag("XC"):
fragment = f"{fragment}:{read.get_tag('XC')}"
elif read.has_tag("MI"):
fragment = f"{fragment}:{read.get_tag('MI')}"
self.is_long_read = True
else:
fragment = read.query_name
r = self._parse_read(
fragment,
read.reference_start,
read.cigartuples,
read.query_sequence,
norm,
muts,
read.mapping_quality,
read.query_qualities,
)
if self.reads is not None:
self.reads.append((read.query_sequence, read.query_name, r))
if r and debug:
self._dump_reads.append(r)
return norm, muts
def _load_vcf(self, vcf_path: str, sample_idx: int = 0):
"""Load the read, mutation and coverage data from a VCF file."""
log.debug("[vcf] path= {}", os.path.abspath(vcf_path))
norm = {
p: [(40, 40)] * 20
for p in range(
self.gene.get_wide_region().start - 500,
self.gene.get_wide_region().end + 1,
)
}
muts: dict = defaultdict(list)
def get_mut(pos, ref, alt):
off = 0
while off < len(ref) and off < len(alt) and ref[off] == alt[off]:
off += 1
if len(ref) - off == 1 and len(alt) - off == 1:
if alt[off] == self.gene[off + pos]:
return off + pos, "_"
return off + pos, f"{self.gene[off + pos]}>{alt[off]}"
elif len(ref) > len(alt) and len(alt) - off == 0:
return off + pos, f"del{self.gene[off + pos : pos + len(ref)]}"
elif len(ref) < len(alt) and len(ref) - off == 0:
return off + pos, f"ins{alt[off:]}"
else:
log.trace(f"[sam] ignoring {pos}: {ref}->{alt}")
return pos, None
with pysam.VariantFile(vcf_path) as vcf: # type: ignore
self._prefix = chr_prefix(self.gene.chr, list(vcf.header.contigs))
samples = list(vcf.header.samples)
if sample_idx >= len(samples):
raise AldyException(
f"Cannot fetch sample no. {sample_idx}; "
f"input VCF has {len(vcf.header.samples)} samples"
)
self.name = sample = samples[sample_idx]
log.info("Using VCF sample {}", sample)
for read in vcf.fetch(
region=self.gene.get_wide_region().samtools(prefix=self._prefix)
):
g = sorted(y for y in read.samples[sample]["GT"] if y is not None)
if len(g) != 2 or self.gene[read.pos - 1] == "N":
continue # ignore polyploid and incomplete cases
dump_arr = {}
if len(read.ref) == 1 and read.ref != self.gene[read.pos - 1]:
hgvs = [(read.pos - 1, f"{self.gene[read.pos - 1]}>{read.ref}")]
else:
hgvs = [(read.pos - 1, "_")]
hgvs += [get_mut(read.pos - 1, read.ref, a) for a in read.alleles[1:]]
for gt in g:
pos, op = hgvs[gt]
if op == "_":
continue
muts[pos, op] += [(40, 40)] * 10
norm[pos] = norm[pos][:-10]
dump_arr[pos] = op
# Handle multi-SNPs
for pos, op in self._multi_sites.items():
if pos not in dump_arr:
continue
l, r = op.split(">")
if all(
dump_arr.get(pos + p, "-") == f"{l[p]}>{r[p]}"
for p in range(len(l))
if l[p] != "."
):
for p in range(len(l)):
if l[p] != ".":
np = pos + p, f"{l[p]}>{r[p]}"
muts[np] = muts[np][:-10]
if p:
norm[pos + p] += [(40, 40)] * 10
muts[pos, op] += [(40, 40)] * 10
return norm, muts
def _load_dump(self, dump_path: str):
"""Load Aldy dump data."""
log.debug("[dump] path= {}", os.path.abspath(dump_path))
if dump_path.endswith(".tar.gz"):
tar = tarfile.open(dump_path, "r:gz")
f = [i for i in tar.getnames() if i.endswith(f".{self.gene.name}.dump")]
if not f:
raise AldyException("Invalid dump file")
log.debug("Found {} in the archive", f[0])
data = tar.extractfile(f[0])
assert data, "Malformed dump file"
fd = gzip.open(data)
else:
fd = gzip.open(dump_path, "rb")
log.warn("Loading debug dump from {}", dump_path)
(
self.name,
self.profile,
self._dump_cn,
norm,
muts,
phases,
self._fusion_counter,
self._indel_sites,
) = pickle.load(
fd
) # type: ignore
self.profile.display_format = False
self.profile.debug_probe = ""
self.profile.debug_novel = False
self.profile.min_avg_coverage = 2.0
self.phases = {f"r{i}": v for i, v in enumerate(phases)}
norm = {p: [q for q, n in c.items() for _ in range(n)] for p, c in norm.items()}
muts = {p: [q for q, n in c.items() for _ in range(n)] for p, c in muts.items()}
return norm, muts
def _load_pscan(self, path: str):
"""Load Pharmacoscan probe data."""
log.debug("[pscan] path= {}", os.path.abspath(path))
norm = {
p: [(40, 40)] * 20
for p in range(
self.gene.get_wide_region().start - 500,
self.gene.get_wide_region().end + 1,
)
}
muts: dict = defaultdict(list)
def parse(start, ref, alt):
while ref and alt and ref[0] == alt[0]:
ref, alt, start = ref[1:], alt[1:], start + 1
return start, ref, alt
with open(path) as f:
self._prefix = "" # no chr prefix assumed
self.name = os.path.splitext(os.path.basename(path))[0]
for line in f:
if line.startswith("#") or line.startswith("probeset_id\t"):
continue
line = line.strip().split("\t")
(
_,
genotype,
_,
chrom,
start,
stop,
_,
rsid,
_,
_,
_,
_,
_,
_,
_,
_,
_,
ref,
alt,
*_,
) = line
start, stop = int(start) - 1, int(stop)
if chrom != self.gene.chr:
continue
if start not in self.gene:
continue
genotype = genotype.split("/")
if len(genotype) != 2:
continue
if ref != "-":
if self.gene[start:stop] != ref and self.gene[start:stop] == alt:
ref, alt = alt, ref
if self.gene[start:stop] != ref:
log.debug(
f"Issue: REF {ref} (ALT {alt}) "
f"does not match {self.gene[start:stop]}"
)
ref = self.gene[start:stop] # only for PScan/TPMT
alt = alt.split("//")
if len(alt) > 1:
for g in genotype:
if g in alt:
alt = g
break
else:
alt = alt[0]
start, ref, alt = parse(start, ref, alt)
r, m = 20, 0
mc = f"{ref}>{alt}"
if alt == "-":
mc = f"del{ref}"
if ref == "-":
mc = f"ins{alt}"
mut = (start, mc)
for g in genotype:
if g in alt:
m += 10
r -= 10
if m:
muts[mut] += [(40, 40)] * m
norm[mut[0]] = norm[mut[0]][:r]
return norm, muts
def _dump_alignments(self, debug: str, norm, muts):
"""Pickle alignment data for debug purposes."""
with open(f"{debug}.genome", "w") as fd:
print(self.gene.genome, file=fd)
with gzip.open(f"{debug}.dump", "wb") as fd: # type: ignore
pickle.dump(
(
self.name,
self.profile,
self._dump_cn,
{p: Counter(q) for p, q in norm.items()},
{p: Counter(q) for p, q in muts.items()},
[v for v in self.phases.values() if len(v) > 1],
self._fusion_counter,
self._indel_sites, # TODO: remove
),
fd, # type: ignore
)
def _realign_indels(self, tmp, sam, reference, long_reads=False):
"""Realign reads around database indels via indelpost module."""
assert self.profile, "profile not loaded"
from .indelpost import Variant, VariantAlignment
rname = f"{self._prefix}{self.gene.chr}"
if not reference:
reference = f"{tmp}/ref.fa"
with open(reference, "w") as f:
print(f">{rname}", file=f)
print("N" * self.gene._lookup_range[0], end="", file=f)
print(self.gene._lookup_seq, end="", file=f)
sz = sam.get_reference_length(rname)
print("N" * (sz - self.gene._lookup_range[1]), file=f)
with open(f"{reference}.fai", "w") as f:
print(rname, sz, len(rname) + 2, sz, sz + 1, sep="\t", file=f)
ref = pysam.FastaFile(reference) # type: ignore
prev_indel = None
for pos, op in sorted(self._indel_sites, key=lambda x: (x[0], -len(x[1]))):
p = pos
if op.startswith("del"):
if "ins" in op:
pd, pi = op[3:].split("ins")
o1, o2 = pd, pi
else:
p -= 1
o = self.gene[p]
o1, o2 = o + op[3:], o
else:
o = self.gene[p]
o1, o2 = o, o + op[3:]
v = Variant(rname, p + 1, o1, o2, ref) # type: ignore
if long_reads or not self.profile.indelpost:
# Speed-up: just generate equivalent indels, no need for the realignment
for ev in v.generate_equivalents():
np, no = ev.pos - 1, ""
if len(ev.ref) < len(ev.alt) and ev.alt.startswith(ev.ref):
np += len(ev.ref)
no = "ins" + ev.alt[len(ev.ref) :]
elif len(ev.ref) > len(ev.alt) and ev.ref.startswith(ev.alt):
np += len(ev.alt)
no = "del" + ev.ref[len(ev.alt) :]
if no:
self._indel_sites_eqs[np, no] = (pos, op)
continue
try:
exact_match_for_shiftable = True
valn = VariantAlignment( # type: ignore
v,
sam,
mapping_quality_threshold=self.profile.min_mapq,
base_quality_threshold=self.profile.min_quality,
# needed to account for indel and database errors
exact_match_for_shiftable=exact_match_for_shiftable,
)
phased = valn.phase()
if len(phased.ref) - len(phased.alt) != len(v.ref) - len(v.alt):
continue # HACK: this indicates a subsumed indel
self._indel_sites[pos, op] = list(valn.count_alleles())
on_target = {r.query_name for r in valn.fetch_reads("target")}
if (
prev_indel
and pos == prev_indel[0]
and op.startswith("ins")
and prev_indel[1].startswith(op) # handle indel repeats
):
# do not consider previously used reads
off, on = self._indel_sites[pos, op]
x = len(on_target & prev_indel[2])
self._indel_sites[pos, op] = [off + x, on - x]
if self._indel_sites[pos, op][1]:
log.debug(
"[indel] {}:{} -> {}", pos + 1, op, self._indel_sites[pos, op]
)
prev_indel = (pos, op, on_target)
except (IndexError, ValueError):
log.error(
"Indel realignment error. If using long reads, "
+ "try running with --param sam_long_reads=true. "
+ "Otherwise, try --param indelpost=false"
)
# assert False
def _load_cn_region(self, path, reference, cn_region):
"""
Load the copy-number neutral coverage data from a SAM/BAM/CRAM file.
:param cn_region: Copy-number neutral region to be used for coverage rescaling.
"""
self._dump_cn = defaultdict(int)
with pysam.AlignmentFile( # type: ignore
path, reference_filename=reference
) as sam:
# Check do we have proper index to speed up the queries
try:
has_index = sam.check_index()
except AttributeError:
# SAM files do not have an index. BAMs might also lack it
has_index = False
except ValueError:
raise AldyException(f"Cannot check index of {self.path}")
self._prefix = chr_prefix(
self.gene.chr, [x["SN"] for x in sam.header["SQ"]]
)
# Set it to _fetched_ if a CN-neutral region is user-provided.
# Then read the CN-neutral region.
if has_index:
iter = sam.fetch(region=cn_region.samtools(prefix=self._prefix))
else:
log.warn("SAM/BAM index not found. Reading will be slow.")
iter = sam.fetch()
for read in iter:
if _in_region(cn_region, read, self._prefix):
start = read.reference_start
if read.cigartuples is None:
continue
if read.is_supplementary:
continue
for op, size in read.cigartuples:
if op in [0, 7, 8, 2]:
for i in range(size):
self._dump_cn[start + i] += 1
start += size
return self._dump_cn
def _make_coverage(self, norm, muts):
"""Populate coverage data."""
coverage: Dict[int, Dict[str, List]] = dict()
for pos, cov in norm.items():
if len(cov) == 0:
continue
coverage.setdefault(pos, {})["_"] = cov
bounds = min(self.gene.chr_to_ref), max(self.gene.chr_to_ref)
for (pos, mut), cov in muts.items():
if pos not in coverage:
coverage[pos] = {}
if not bounds[0] <= pos <= bounds[1] and mut[:3] != "ins":
mut = "_" # ignore mutations outside of the region of interest
coverage.setdefault(pos, {}).setdefault(mut, []).extend(cov)
for pos, op in self._multi_sites.items():
if pos in coverage and op in coverage[pos]:
log.debug(f"[sam] multi-SNP {pos}{op}: {len(coverage[pos][op])} reads")
assert self.profile, "profile not set"
self.coverage = Coverage(
self.gene,
self.profile,
self,
{p: {m: v for m, v in coverage[p].items() if len(v) > 0} for p in coverage},
self._indel_sites,
self._dump_cn,
)
"""Sample coverage data."""
return self.coverage
def _parse_read(
self,
fragment,
ref_start,
cigar,
seq,
norm,
muts,
mq=None,
qual=None,
):
"""
Parse a read.
:param gene: Gene instance.
:param fragment: Read name.
:param ref_start: Start position in the reference genome.
:param cigar: List of CIGAR operations in tuple format (operation, size).
:param seq: Read sequence.
:param norm: Positions within the read that have not been mutated.
:param muts: Positions within the read that have been mutated.
:param mq: Mapping quality.
:param seq: Read qualities (normalized).
.. note:: `norm` and `muts` are modified.
.. note:: Qualities will be binned.
"""
def bin_quality(q):
"""
Quality score binning.
See https://www.illumina.com/content/dam/illumina-marketing/docum ents/products/technotes/technote_understanding_quality_scores.pdf # noqa
"""
if q < 2:
return int(q)
if q < 10:
return 6
if q < 20:
return 15
if q < 29:
return 25
if q < 39:
return 35
return 40
phase = self.phases.setdefault(fragment, {})
dump_arr = []
start, s_start = ref_start, 0
prev_q = 10
for op, size in cigar:
if op == 2: # Deletion
mut = (start, "del" + self.gene[start : start + size])
for i in range(size):
muts[start + i, "-"].append((bin_quality(mq), bin_quality(prev_q)))
dump_arr.append(mut)
if start in self.phaseable:
phase[start] = mut[1]
if self._indel_sites_eqs and mut in self._indel_sites_eqs:
self._indel_sites[self._indel_sites_eqs[mut]][1] += 1
start += size
elif op == 1: # Insertion
mut = (start, "ins" + seq[s_start : s_start + size])
q = mean(qual[s_start : s_start + size]) if qual else prev_q
muts[mut].append((bin_quality(mq), bin_quality(q)))
prev_q = q
dump_arr.append(mut)
if start in self.phaseable:
phase[start] = mut[1]
if self._indel_sites_eqs and mut in self._indel_sites_eqs:
self._indel_sites[self._indel_sites_eqs[mut]][1] += 1
s_start += size
elif op == 4: # Soft-clip
s_start += size
elif op in [0, 7, 8]: # M, X and =
for i in range(size):
q = qual[s_start + i] if qual else prev_q
if (
start + i in self.gene
and self.gene[start + i] != seq[s_start + i]
):
mut = (start + i, f"{self.gene[start + i]}>{seq[s_start + i]}")
dump_arr.append(mut)
muts[mut].append((bin_quality(mq), bin_quality(q)))
if start + i in self.phaseable:
phase[start + i] = mut[1]
else: # We ignore all mutations outside the RefSeq region
norm[start + i].append((bin_quality(mq), bin_quality(q)))
if start + i in self.phaseable:
phase[start + i] = "_"
prev_q = q
start += size
s_start += size
dump_arr_pos = {p for p, _ in dump_arr}
for pos, op in self._multi_sites.items():
if pos not in dump_arr_pos:
continue
l, r = op.split(">")
if all(
(pos + p, f"{l[p]}>{r[p]}") in dump_arr
for p in range(len(l))
if l[p] != "."
):
items = []
for p in range(len(l)):
if l[p] != ".":
items.append(muts[pos + p, f"{l[p]}>{r[p]}"].pop())
if p: # no idea why...
norm[pos + p].append(items[-1])
# TODO: use sth else instead of mean?
muts[pos, op].append(
(mean(mq for mq, _ in items), mean(q for _, q in items))
)
if self._indel_sites_eqs: # long-read hack
for pos, op in self._indel_sites:
if ref_start <= pos < start:
self._indel_sites[pos, op][0] += 1
read_pos = (ref_start, start, len(seq))
return read_pos, dump_arr
def _load_long_sam(self, sam_path: str, reference=None, debug=None):
"""
Load the read, mutation and coverage data from a long-read SAM/BAM/CRAM file.
This function remaps long reads for selected genes.
Tested on PacBio HiFi data only.
.. note:: Currently only supports CYP2D6.
"""
log.debug("[sam] pacbio_path= {}", os.path.abspath(sam_path))
assert self.genome, "Genome not provided"
assert self.genome == "hg38", "Only hg38 supported at this moment"
assert self.profile, "profile not set"
norm: dict = defaultdict(list)
muts: dict = defaultdict(list)
self._index = None
ref = script_path(f"aldy.resources.genes/{self.gene.name.lower()}.fa.gz")
if os.path.exists(ref):
import mappy
self._index = mappy.Aligner(ref, preset=self.profile.sam_mappy_preset)
seq_name = self._index.seq_names[0]
self._seq_range = list(map(int, seq_name.split(":")[1].split("-")))
self._index_gene = []
for _, g in enumerate(self.gene.regions):
st = min(r.start for r in g.values())
ed = max(r.end for r in g.values())
idx = mappy.Aligner(
seq=self._index.seq(seq_name)[
st - self._seq_range[0] : ed - self._seq_range[0]
],
preset=self.profile.sam_mappy_preset,
)
self._index_gene.append((idx, st))
self._fusion_signatures = {}
self._fusion_counter = {}
for n, cn in self.gene.cn_configs.items():
if cn.kind == CNConfigType.LEFT_FUSION:
s, pos = 0, next(i for i, j in cn.cn[0].items() if j == 1)
elif cn.kind == CNConfigType.RIGHT_FUSION:
s, pos = 1, next(i for i, j in cn.cn[0].items() if j == 0)
else:
continue
sig = (
(1 - s, list(cn.cn[0])[list(cn.cn[0]).index(pos) - 1]),
(s, pos),
)
self._fusion_counter[n] = [0, 0]
if (
sig[0][1] in self.gene.unique_regions
or sig[1][1] in self.gene.unique_regions
):
self._fusion_signatures[sig] = n
log.debug("[sam] PacBio remapping enabled")
with pysam.AlignmentFile( # type: ignore
sam_path, reference_filename=reference
) as sam, Timing("[sam] Remap"):
# Assumes SAM index exists
self._prefix = chr_prefix(
self.gene.chr, [x["SN"] for x in sam.header["SQ"]]
)
with tempfile.TemporaryDirectory() as tmp:
self._realign_indels(tmp, sam, reference, True)
for po, (off, on) in self._indel_sites.items():
self._indel_sites[po] = [off - on, on]
wide = self.gene.get_wide_region()
end = wide.end
if self.gene.name == "CYP2D6":
end = {"hg19": 42_551_000, "hg38": 42_155_000}[self.genome]
rng = GRange(wide.chr, wide.start, end)
counter = 0
iter = sam.fetch(region=rng.samtools(prefix=self._prefix))
for read in iter:
if not read.cigartuples or "H" in read.cigarstring:
continue
if read.reference_start >= rng.end or read.reference_end < rng.start:
continue
seq = read.query_sequence
if not seq:
continue
qual = read.query_qualities if read.query_qualities else [40] * len(seq)
if self._index:
# Split reads
pieces = []
s_start = 0
gene_bnd = self.gene.regions[0]["up"].end - self._seq_range[0]
pseudo_bnd = self.gene.regions[1]["up"].end - self._seq_range[0]
while s_start < len(seq):
s = seq[s_start:]
q = qual[s_start:]
h = self._map(self._index, s)
if (
h
and h.r_st <= gene_bnd < h.r_en
and gene_bnd - h.r_st > 100
):
s = seq[s_start : s_start + (gene_bnd - h.r_st)]
q = qual[s_start : s_start + (gene_bnd - h.r_st)]
h = self._map(self._index, s)
elif (
h
and h.r_st <= pseudo_bnd < h.r_en
and pseudo_bnd - h.r_st > 100
):
s = seq[s_start : s_start + (pseudo_bnd - h.r_st)]
q = qual[s_start : s_start + (pseudo_bnd - h.r_st)]
h = self._map(self._index, s)
if not h:
s_start += 100
else:
pcs, ed = self._split_read(s, q, h)
pieces += pcs
s_start += ed
for ref_start, seq, qual, cigar in pieces:
r = self._parse_read(
counter,
ref_start,
cigar,
seq,
norm,
muts,
read.mapping_quality,
qual,
)
counter += 1
if r and debug:
self._dump_reads.append(r)
else:
r = self._parse_read(
counter,
read.reference_start,
read.cigartuples,
seq,
norm,
muts,
read.mapping_quality,
read.query_qualities,
)
counter += 1
if r and debug:
self._dump_reads.append(r)
return norm, muts
def _map(self, idx, seq):
"""
Map a sequence to a mappy reference index.
:returns: The first primary hit or `None` if no primary hits are found.
"""
for hit in idx.map(seq):
if hit.is_primary:
return hit
return None
def _split_read(self, seq, qual, hit):
"""Split a long read."""
def _rev_cigar(c):
"""Reverse CIGAR tuples for pysam/mappy compatibility."""
return [(op, size) for (size, op) in c]
regs = self._get_gene_regions(
hit.r_st + self._seq_range[0] - 1, _rev_cigar(hit.cigar)
)
if regs:
splits = []
for fs, fn in self._fusion_signatures.items():
brk = [r for r in regs if r[0][1] == fs[1][1]]
if not brk:
continue
self._fusion_counter[fn][1] += 1
brk = brk[0]
lg, brk, rg = fs[1][0], brk[1] + brk[2], fs[0][0]
lh = self._map(self._index_gene[lg][0], seq[:brk])
rh = self._map(self._index_gene[rg][0], seq[brk:])
if lh and rh:
unmap = len(seq) - (lh.q_en - lh.q_st) - (rh.q_en - rh.q_st)
unmap += lh.NM + rh.NM
splits.append([fn, unmap, brk, lh, rh, lg, rg])
splits.sort(key=lambda x: x[1])
if splits and splits[0][1] <= hit.NM:
fn, _, brk, lh, rh, lg, rg = splits[0]
self._fusion_counter[fn][0] += 1
return (
[
(
lh.r_st + self._index_gene[lg][1] - 1,
seq[lh.q_st : lh.q_en],
qual[lh.q_st : lh.q_en],
_rev_cigar(lh.cigar),
),
(
rh.r_st + self._index_gene[rg][1] - 1,
seq[brk + rh.q_st : brk + rh.q_en],
qual[brk + rh.q_st : brk + rh.q_en],
_rev_cigar(rh.cigar),
),
],
max(lh.q_en, brk + rh.q_en),
)
return [
(
hit.r_st + self._seq_range[0] - 1,
seq[hit.q_st : hit.q_en],
qual[hit.q_st : hit.q_en],
_rev_cigar(hit.cigar),
)
], hit.q_en
def _get_gene_regions(self, r_start, cigar):
"""Get gene regions that cover the given CIGAR range."""
regs = []
start, s_start = r_start, 0
for op, size in cigar:
if op == 1 or op == 4:
s_start += size
elif op == 2 or op == 3:
start += size
elif op == 0 or op == 7 or op == 8:
for i in range(size):
rg = self.gene.region_at(start + i)
if rg:
if not regs or regs[-1][0] != rg:
regs.append([rg, s_start + i, 0])
regs[-1][2] += 1
start += size
s_start += size
return regs
def _load_novel_mutations(self):
ni = 0
for pos, muts in self.coverage._coverage.items():
for op in muts:
if op == "_":
continue
if (pos, op) in self.gene.mutations:
continue
e = self.gene.get_functional((pos, op), infer=True)
if not e:
continue
if (len(muts[op])) > 3:
log.debug(
f"[sam] Found potential novel major mutation "
f"{pos}.{op} ({e}): coverage = {len(muts[op])}"
)
self.gene.mutations.setdefault(
(pos, op),
(e, f"NOVEL:{pos+1}.{op}", pos, pos, op),
)
self.gene.random_mutations.add(Mutation(pos, op))
ni += 1
[docs]
def detect_genome(sam_path: str) -> Tuple[str, Optional[str]]:
"""Detect file type and the reference genome."""
try:
pysam.set_verbosity(0) # type: ignore
with pysam.AlignmentFile(sam_path) as sam: # type: ignore
try:
sam.check_index()
except AttributeError:
pass # SAM files do not have an index. BAMs might also lack it
except ValueError:
raise AldyException(
f"File {sam_path} has no index (it must be indexed)"
)
#: str: Check if we need to append 'chr' or not
_prefix = chr_prefix("1", [x["SN"] for x in sam.header["SQ"]])
# Detect the genome
hg19_lens = {"1": 249250621, "10": 135534747, "22": 51304566}
hg38_lens = {"1": 248956422, "10": 133797422, "22": 50818468}
chrs = {x["SN"]: int(x["LN"]) for x in sam.header["SQ"]}
is_hg19, is_hg38 = True, True
for c in "1 10 22".split():
if _prefix + c in chrs:
is_hg19 &= chrs[_prefix + c] == hg19_lens[c]
is_hg38 &= chrs[_prefix + c] == hg38_lens[c]
if is_hg19:
return "sam", "hg19"
elif is_hg38:
return "sam", "hg38"
else:
return "sam", None
except (ValueError, OSError):
try:
with pysam.VariantFile(sam_path): # type: ignore
return "vcf", None
except (ValueError, OSError):
if sam_path.endswith(".tar.gz"):
tar = tarfile.open(sam_path, "r:gz")
f = [i for i in tar.getnames() if i.endswith(".genome")]
if not f:
raise AldyException("Invalid dump file")
data = tar.extractfile(f[0])
if data:
genome = data.read().decode("utf-8").strip()
return "dump", genome
if sam_path.endswith(".txt") and os.path.exists(sam_path):
with open(sam_path) as f:
if f.readline().startswith("##batch-folder"):
return "pscan", None
return "", None
def _in_region(region: GRange, read, prefix: str) -> bool:
"""Check if a read intersects a given gene region."""
if read.reference_id == -1 or read.reference_name != prefix + region.chr:
return False
if read.reference_end is None:
return False
a = (read.reference_start, read.reference_end)
b = (region.start, region.end)
return a[0] <= b[0] <= a[1] or b[0] <= a[0] <= b[1]