# 786
# Aldy source: genotype.py
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
from typing import List, Optional, Any, Set, Dict
import os
import sys
import importlib_resources
import datetime
import time
from . import sam
from . import cn
from . import major
from . import minor
from . import diplotype
from . import solutions
from .common import (
colorize,
log,
script_path,
json,
AldyException,
SOLUTION_PRECISION,
)
from .profile import Profile
from .gene import Gene, GRange
from .diplotype import OUTPUT_COLS
from .lpinterface import model as lp_model
[docs]
def genotype(
gene_db: str,
sam_path: str,
profile_name: Optional[str],
output_file: Optional[Any] = sys.stdout,
cn_region: Optional[GRange] = None,
cn_solution: Optional[List[str]] = None,
solver: str = "any",
reference: Optional[str] = None,
debug: Optional[str] = None,
multiple_warn_level: int = 1,
report: bool = False,
genome=None,
is_simple: bool = False,
**params,
) -> Dict[str, List[solutions.MinorSolution]]:
"""Genotype a sample.
:param gene_db: Gene name (if it is shipped with Aldy)
or the location of the gene database in YAML format.
:param sam_path: Location of SAM/BAM/CRAM file that is to be analyzed.
:param profile_name: Coverage profile (e.g. WGS).
`None` if `cn_solution` is provided.
:param output_file: Location of the output file. Use `None` for no output.
Default: `sys.stdout`.
:param cn_region: Copy-number neutral region.
Default: None (uses the provided CYP2D8 region).
:param cn_solution: List of the copy number configurations.
Copy-number detection will not run if this parameter is set.
Default: `None`.
:param solver: ILP solver (see :py:mod:`aldy.lpinterface` for supported solvers).
:param reference: Reference genome (for reading CRAM files).
Default: `None`.
:param debug: Prefix for debug and core dump files.
Default: `None` (no debug information).
:param multiple_warn_level: Warning level (1 for optimal solutions, 2 for major
solutions and 3 for CN solutions).
Default: 1 (warn only on multiple optimal solutions).
:param report: If set, write the solution summary to the stderr.
Default: `False`.
:param genome: Reference genome (e.g., hg19 or hg38).
Default: `None` (auto-detect).
:param is_simple: Use simple output format.
Default: `False`.
:param params: Model parameters. See :py:mod:`aldy.profile` for details.
"""
t1 = time.time()
log.debug("[genotype] gene={}; start={}", gene_db, datetime.datetime.now())
# Test the existence of LP solver
_ = lp_model("init", solver)
with open(sam_path): # Check if file exists
pass
kind, g = sam.detect_genome(sam_path)
if genome is None:
genome = g
if not genome:
log.warn("WARNING: Cannot detect genome, defaulting to hg19.")
genome = "hg19"
log.debug(f"[genotype] reference= {genome}")
if genome not in ["hg19", "hg38"]:
raise AldyException(f"Unknown genome {genome}")
avail_genes = []
if gene_db == "all":
avail_genes = importlib_resources.files("aldy.resources.genes").iterdir()
avail_genes = [
i.name[:-4]
for i in avail_genes
if len(i.name) > 4 and i.name.endswith(".yml") and not i.name.startswith("pharma-")
]
avail_genes = sorted(avail_genes)
else:
avail_genes = gene_db.lower().split(",")
if len(avail_genes) != 1:
res: Dict = {}
for a in avail_genes:
log.warn("=" * 50)
log.warn("Gene {}", a.upper())
try:
res = {
**res,
**genotype(
a,
sam_path,
profile_name,
output_file,
cn_region,
cn_solution,
solver,
reference,
debug,
multiple_warn_level,
report,
genome,
is_simple,
**params,
),
}
except AldyException as ex:
log.error(f"Failed gene {a.upper()}")
log.error(f"Message: {str(ex)}")
log.warn("")
return res
else:
db_file = script_path(
"aldy.resources.genes/{}.yml".format(avail_genes[0].lower())
)
# Load the gene specification
if os.path.exists(db_file):
gene_db = db_file
with open(gene_db): # Check if file exists
pass
gene = Gene(gene_db, genome=genome)
if profile_name in ["exome", "wxs", "wes"]:
gene.do_copy_number = False
profile_name = "illumina"
params["min_coverage"] = 5.0
elif profile_name == "wgs":
profile_name = "illumina"
elif profile_name == "pgrnseq-v1":
profile_name = "pgx1"
elif profile_name == "pgrnseq-v2":
profile_name = "pgx2"
elif profile_name == "pgrnseq-v3":
profile_name = "pgx3"
if kind in ["vcf", "pscan"]:
log.warn("WARNING: Using VCF file. Copy-number calling is not available.")
profile = Profile("user_provided", cn_solution=["1", "1"], **params)
sample = sam.Sample(gene, profile, sam_path, debug=debug)
else:
if cn_solution:
profile = Profile("user_provided", cn_solution=cn_solution, **params)
elif kind != "dump":
if not profile_name:
raise AldyException("Profile not provided")
profile = Profile.load(gene, profile_name, cn_region, **params)
else:
profile = None
sample = sam.Sample(gene, profile, sam_path, reference, debug)
profile = sample.profile # if loaded for a dump
assert profile, "Profile not set"
if kind == "dump":
profile.update(params)
json[gene.name].update({"sample": sample.name})
is_vcf = output_file and output_file.name.endswith(".vcf")
is_simple = is_simple or (
output_file is not None and output_file.name.endswith(".simple")
)
is_aldy = output_file and not (is_vcf or is_simple)
if is_simple:
print(
sample.name,
gene.name,
sep="\t",
end="\t",
file=output_file,
)
if kind not in ["vcf", "pscan"]:
avg_cov = sample.coverage.average_coverage()
if profile.cn_region and avg_cov < profile.min_avg_coverage:
if is_simple:
print(file=output_file)
raise AldyException(
f"Average coverage of {avg_cov:.2f} for gene {gene.name} is too low; "
+ f"skipping gene {gene.name}. "
+ f"Please ensure that {gene.name} is present in the input SAM/BAM."
)
elif profile.cn_region and avg_cov < 20:
log.warn(
f"Average sample coverage is {avg_cov}. "
+ "We recommend at least 20x coverage for the best results."
)
# Get copy-number solutions
cn_sols = cn.estimate_cn(
gene,
profile,
sample.coverage,
solver=solver,
debug=debug,
)
# Add SLACK to each score to avoid division by zero or other numerical issues
# when the optimum is close to zero.
# HACK: Value of 1 (1 gene copy) seems to be reasonable, but is not
# theoretically guaranteed to be better than any other value.
SLACK = 1
# Get major solutions and pick the best one
if multiple_warn_level >= 3 and len(cn_sols) > 1:
log.warn("WARNING: multiple gene structures found!")
log.info(f"Potential {gene.name} gene structures for {sample.name}:")
major_sols: list = []
cn_sols = sorted(cn_sols, key=lambda m: (int(1000 * m.score), m._solution_nice()))
if len(cn_sols) == 0:
if is_simple:
print(file=output_file)
raise AldyException("No solutions found!")
min_cn_score = min(cn_sols, key=lambda m: m.score).score
for i, cn_sol in enumerate(cn_sols):
conf = 100 * (min_cn_score + SLACK) / (cn_sol.score + SLACK)
log.info(f" {i + 1:2}: {cn_sol._solution_nice()} (confidence: {conf:.0f}%)")
log.debug("*" * 80)
# return []
for i, cn_sol in enumerate(cn_sols):
sols = major.estimate_major(
gene,
sample.coverage,
cn_sol,
solver=solver,
identifier=i,
debug=debug,
)
for s in sols:
s.score += cn_sol.score - min_cn_score
major_sols += sols
if len(major_sols) == 0:
if is_simple:
print(file=output_file)
raise AldyException("No major solutions found!")
major_sols = [
solutions.MajorSolution(
m.score, # * ((m.cn_solution.score + SLACK) / (min_cn_score + SLACK)),
m.solution,
m.cn_solution,
m.added,
)
for m in major_sols
]
min_major_score = min(major_sols, key=lambda m: m.score).score
major_sols = sorted(
[
m
for m in major_sols
if m.score - min_major_score - profile.gap < SOLUTION_PRECISION
],
key=lambda m: (int(1000 * m.score), m._solution_nice()),
)
if multiple_warn_level >= 2 and len(major_sols) > 1:
log.warn("WARNING: multiple major solutions found!")
log.info(f"Potential major {gene.name} star-alleles for {sample.name}:")
for i, major_sol in enumerate(major_sols):
conf = 100 * (min_major_score + SLACK) / (major_sol.score + SLACK)
log.info(f" {i + 1:2}: {major_sol._solution_nice()} (confidence: {conf:.0f}%)")
log.debug("*" * 80)
minor_sols = []
for m in minor.estimate_minor(
gene,
sample.coverage,
major_sols,
solver,
max_solutions=profile.max_minor_solutions,
):
n = solutions.MinorSolution(
m.score
* ((m.major_solution.cn_solution.score + SLACK) / (min_cn_score + SLACK)),
m.solution,
m.major_solution,
profile=profile,
)
n.set_diplotype(m.get_diplotype())
minor_sols.append(n)
if len(minor_sols) == 0:
if is_simple:
print(file=output_file)
raise AldyException(
"Aldy could not phase any major solution.\n"
+ "Possible solutions:\n"
+ " - Check the coverage. Extremely low coverage prevents Aldy from "
+ "calling star-alleles.\n"
+ " - Run with --debug parameter and notify the authors of Aldy."
)
min_minor_score = min(minor_sols, key=lambda m: m.score).score
minor_sols = sorted(
[
m
for m in minor_sols
if m.score - min_minor_score - profile.gap < SOLUTION_PRECISION
],
key=lambda m: (int(1000 * m.score), m._solution_nice()),
)
log.debug("*" * 80)
if multiple_warn_level >= 1 and len(minor_sols) > 1:
log.warn("WARNING: multiple optimal solutions found!")
log.info(
f"{{}} {gene.name} star-alleles for {sample.name}:",
"Best" if len(minor_sols) == 1 else "Potential",
)
if is_aldy:
print("#" + "\t".join(OUTPUT_COLS), file=output_file)
simple = [sample.name, gene.name]
for i, minor_sol in enumerate(minor_sols):
conf = 100 * (min_minor_score + SLACK) / (minor_sol.score + SLACK)
log.info(
f" {i + 1:2}: {minor_sol.get_major_diplotype()}"
+ f" (confidence={conf:.0f}%)"
)
log.info(f" Minor alleles: {minor_sol._solution_nice()}")
simple += [
minor_sol.get_major_diplotype().replace(" ", ""),
minor_sol.get_minor_diplotype(legacy=True).replace(" ", ""),
]
if is_aldy:
cpic = f"; gene={gene.name}-{gene.version}"
if gene.cpic:
cpic_score, cpic_fun = diplotype.estimate_cpic(gene, minor_sol)
cpic += f"; cpic={cpic_fun}"
if gene.cpic_scores and cpic_fun != "indeterminate":
cpic += f"; cpic_score={cpic_score}"
print(
f"#Solution {i + 1}: {minor_sol._solution_nice()}{cpic}",
file=output_file,
)
diplotype.write_decomposition(
sample.name, gene, sample.coverage, i + 1, minor_sol, output_file
)
elif is_simple:
print(simple[-2], simple[-1], sep="\t", end="\t", file=output_file)
if is_vcf:
diplotype.write_vcf(sample.name, gene, sample.coverage, minor_sols, output_file)
elif is_simple:
print(file=output_file)
log.debug("[simple]\t{}", "\t".join(simple))
if report:
log.info(colorize(f"{gene.name} results:"))
reported: Set[str] = set()
for r in minor_sols:
st = f" - {r.get_major_diplotype()}"
if st not in reported:
log.info(colorize(st))
log.info(f" Minor: {r.get_minor_diplotype()}")
log.info(f" Legacy notation: {r.get_minor_diplotype(legacy=True)}")
reported.add(st)
if gene.cpic:
cpic_score, cpic_fun = diplotype.estimate_cpic(gene, r)
s = f" CPIC functionality: {cpic_fun}"
if gene.cpic_scores and cpic_fun != "indeterminate":
s += f" (activity score = {cpic_score})"
log.info(s)
# Output the activity
for m in r.solution:
mn = [
ma.minors[m.minor]
for ma in gene.alleles.values()
if m.minor in ma.minors
]
msg = f"Estimated activity for {m.major_repr()}"
if len(mn) == 1 and mn[0].activity:
msg = f"{msg}: {mn[0].activity}"
if gene.cpic_scores and mn[0].activity in gene.cpic_scores:
msg = f"{msg} (score: {gene.cpic_scores[mn[0].activity]})"
if mn[0].evidence:
msg = f"{msg}; evidence: {mn[0].evidence}"
if mn[0].pharmvar:
msg = f"{msg} (see {mn[0].pharmvar} for details)"
else:
msg = f"{msg}: unknown"
log.info(f" {msg}")
# Calculate coverage of each mutation in a solution
if multiple_warn_level >= 2:
for m, mc, sc in r.get_mutation_coverages(sample.coverage):
if abs(mc - sc) > 0.5:
log.warn(
" Warning: Coverage of {} is not in line with "
"the prediction (predicted: {:.1f}, observed: {:.1f})",
gene.get_rsid(m),
mc,
sc,
)
if (
any(len(m.major_solution.added) > 0 for m in minor_sols)
and not sample.is_long_read
):
novels = {
gene.get_rsid(mm) for m in minor_sols for mm in m.major_solution.added
}
log.warn(
"WARNING: mutations {} suggest presence of a novel major star-allele.\n"
"However, exact allele contents cannot be determined without the "
"long-read phasing data. The reported assignments of novel mutations "
"are for that reason currently randomly assigned.",
", ".join(sorted(novels)),
)
log.debug("[genotype] gene={}; took={}", gene_db, time.time() - t1)
return {gene_db: minor_sols}