Source code for aldy.major

# 786
# Aldy source: major.py
#   This file is subject to the terms and conditions defined in
#   file 'LICENSE', which is part of this source code package.


from typing import List, Dict, Tuple, Any, Optional
from natsort import natsorted
import collections
import copy

from . import lpinterface
from .common import log, json, sorted_tuple
from .gene import MajorAllele, Mutation, Gene
from .coverage import Coverage
from .solutions import CNSolution, MajorSolution, SolvedAllele


[docs] def estimate_major( gene: Gene, coverage: Coverage, cn_solution: CNSolution, solver: str, identifier: int = 0, debug: Optional[str] = None, ) -> List[MajorSolution]: """ Estimate optimal major star-alleles. :param gene: Gene instance. :param profile: Profile instance. :param coverage: Read coverage instance. :param cn_solution: Copy-number solution for major star-allele calling. :param solver: ILP solver (see :py:mod:`aldy.lpinterface` for supported solvers). :param identifier: Unique solution identifier. Used for debug purposes. Default: 0. :param debug: When set, create a `{debug}.major{identifier}.lp` model description file for debug purposes. Default: `None` (no debug dumps). """ log.debug("*" * 80) log.debug("[major] struct= {}", cn_solution._solution_nice()) alleles, coverage = _filter_alleles(gene, coverage, cn_solution) coverage.dump(log.trace) # Check if some CN solution has no matching allele if set(cn_solution.solution) - set(a.cn_config for a in alleles.values()): results: List[MajorSolution] = [] else: results = solve_major_model( gene, coverage, cn_solution, alleles, solver, identifier, debug ) # TODO: Check for novel functional mutations and do something with them return results
[docs] def solve_major_model( gene: Gene, coverage: Coverage, cn_solution: CNSolution, allele_dict: Dict[str, MajorAllele], solver: str, identifier: int = 0, debug: Optional[str] = None, ) -> List[MajorSolution]: """ Solves the major star-allele detection problem via integer linear programming. :param gene: Gene instance. :param coverage: Read coverage instance. :param cn_solution: Copy-number solution for major star-allele calling. :param allele_dict: Candidate major star-alleles. :param solver: ILP solver (see :py:mod:`aldy.lpinterface` for supported solvers). :param identifier: Unique solution identifier. Used for debug purposes. Default: 0. :param debug: When set, create a `{debug}.major{identifier}.lp` model description file for debug purposes. Default: `None` (no debug dumps). .. note:: Please see `Aldy paper <https://www.nature.com/articles/s41467-018-03273-1>`_ (section Methods/Major star-allele identification) for the model explanation. """ model = lpinterface.model("AldyMajor", solver) debug_info = json[gene.name]["major"][len(json[gene.name]["major"])] # Get the list of _all_ functional mutations present in the sample # and the database (intersection) func_muts = { Mutation(*m) for m in gene.mutations if gene.is_functional(m) and coverage[Mutation(*m)] > 0 } func_muts |= { m for m in gene.random_mutations if gene.is_functional((m.pos, m.op)) and coverage[m] > 0 } _print_candidates(gene, allele_dict, coverage, cn_solution, func_muts) a: Any = 0 # Create a binary variable for every possible allele copy alleles = {(a, int(0)): allele_dict[a] for a in allele_dict} for (an, _), a in list(alleles.items()): max_cn = cn_solution.solution[a.cn_config] for i in range(1, max_cn): alleles[an, i] = alleles[an, 0] VA = {a: model.addVar(vtype="B", name=f"A_{a[0]}_{a[1]}") for a in alleles} # Make sure that VA[i+1] <= VA[i] (to avoid equivalent solutions) for a, ai in alleles.keys(): if ai > 0: model.addConstr(VA[a, ai] <= VA[a, ai - 1], name=f"CORD_{a}_{ai}") # Add an error variable for every mutation VERR = { m: model.addVar(lb=-model.INF, ub=model.INF, name=f"E_{m.pos}_{m.op}") for m in func_muts } constraints = {e: 0 for e in VERR} # Add a binary variable for each mutation copy # For each present functional mutation, add a binary variable VNEW s.t. # VNEW[m] = 0 <=> sum(VA[a] if m in a) >= 1 VNEW = {m: model.addVar(vtype="B", name=f"N_{m}") for m in func_muts} # Populate the constraints for a in alleles: for m in alleles[a].func_muts: constraints[m] += VA[a] for m, v in VNEW.items(): constraints[m] += v # Populate constraints of non-variations (i.e. matches with the reference genome) for pos in set(m.pos for m in constraints): ref_m = Mutation(pos, "_") # type: ignore constraints[ref_m] = 0 VERR[ref_m] = model.addVar(lb=-model.INF, ub=model.INF, name=f"E_{pos}_REF") for a in alleles: if not gene.has_coverage(a[0], pos): continue # An insertion still contributes to the total coverage at this loci if any(ma[0] == pos and ma[1][:3] != "ins" for ma in alleles[a].func_muts): continue constraints[ref_m] += VA[a] # We ensure that only one additional mutation can be selected here (HACK) z = model.quicksum( v for m, v in VNEW.items() if m[0] == pos and m[1][:3] != "ins" ) model.addConstr(z <= 1, name=f"CONE_{pos}") # Each allele must express all of its functional mutations debug_info["id"] = identifier debug_info["cn"] = str(dict(cn_solution.solution)) debug_info["data"] = [] for m, expr in sorted(constraints.items()): if coverage.single_copy(m.pos, cn_solution) == 0: cov = 0.0 else: cov = coverage[m] / coverage.single_copy(m, cn_solution) model.addConstr(expr + VERR[m] <= cov, name=f"CFUNC_{m.pos}_{m.op}") model.addConstr(expr + VERR[m] >= cov, name=f"CFUNC_{m.pos}_{m.op}") debug_info["data"].append((m[0], m[1], cov)) # Each CN configuration must be satisfied by corresponding alleles for cnf, cnt in cn_solution.solution.items(): expr = sum(VA[a] for a in VA if alleles[a].cn_config == cnf) model.addConstr(expr <= cnt, name=f"CSAT_{cnf}") model.addConstr(expr >= cnt, name=f"CSAT_{cnf}") # Each functional mutation must be either chosen by some allele or marked as novel: # 1 == (m chosen by allele) XOR (m novel) == OR(VA[a] if m in a) XOR VNEW[m] for m in func_muts: VOR = model.addVar(vtype="B", name=f"OR_{m}") m_all = {a for a in alleles if m in alleles[a].func_muts} model.addConstr(VOR <= model.quicksum(VA[a] for a in m_all), name="COR") for a in m_all: model.addConstr(VOR >= VA[a], name="COR") VXOR = model.addVar(vtype="B", name=f"XOR_{m}") model.addConstr(VXOR <= VNEW[m] + VOR, name="CXOR") model.addConstr(VXOR <= 2 - VNEW[m] - VOR, name="CXOR") model.addConstr(VXOR >= VNEW[m] - VOR, name="CXOR") model.addConstr(VXOR >= VOR - VNEW[m], name="CXOR") model.addConstr(VXOR >= 1, name="CXOR") # Objective: minimize the absolute sum of errors and the number of novel mutations objective = 0 objective += model.abssum(e for e in VERR.values()) z = model.addVar(vtype="B", name="NOVEL") for m in VNEW: model.addConstr(z >= VNEW[m], name=f"NOVEL_UB_{VNEW[m]}") model.addConstr(z <= model.quicksum(VNEW[m] for m in VNEW), name="NOVEL_LB") objective += coverage.profile.major_novel * z objective += 0.1 * model.quicksum(VNEW[m] for m in VNEW) model.setObjective(objective) if debug: model.dump(f"{debug}.{gene.name}.major{identifier}.lp") # Solve the model lookup = { **{model.varName(v): a for a, v in VA.items()}, **{model.varName(v): m for m, v in VNEW.items()}, } result: Dict[Any, MajorSolution] = {} debug_info["sol"] = [] for status, opt, sol in model.solutions(coverage.profile.gap): solved_alleles = sorted_tuple( [lookup[s][0] for s in sol if s in lookup and s.startswith("A_")] ) novel_muts = sorted_tuple( [lookup[s] for s in sol if s in lookup and s.startswith("N_")] ) if (solved_alleles, novel_muts) not in result: solution = collections.Counter( SolvedAllele(gene, major=a) for a in solved_alleles ) debug_info["sol"].append( dict(collections.Counter(a for a in solved_alleles)) ) sol = MajorSolution( score=opt, solution=solution, cn_solution=cn_solution, added=list(novel_muts), ) log.debug( f"[major] status= {status}; opt= {opt:.2f}; " + f"solution= {sol._solution_nice()}" ) result[solved_alleles, novel_muts] = sol if not result: log.debug("[major] solution= []") return list(result.values())
def _filter_alleles( gene: Gene, coverage: Coverage, cn_solution: CNSolution ) -> Tuple[Dict[str, MajorAllele], Coverage]: """ Filter out low-quality mutations and alleles that are not expressed. :returns: Expressed alleles and the coverage of high-quality variants. """ def filter_fns(cov, mut): cond = cov.basic_filter(mut, cn=coverage.profile.cn_max) if mut.op != "_": cond = cond and cov.basic_filter( mut, cn=cn_solution.position_cn(mut.pos) + 0.5 ) return cond if coverage.profile.debug_probe: # Check for probe mutations and show their pileup probes = coverage.profile.debug_probe.split(";") d = collections.defaultdict(set) for (pos, op), (fn, rs, _, _, _) in gene.mutations.items(): d[fn].add((pos, op)) d[rs].add((pos, op)) d[str(pos + 1)].add((pos, op)) def pileup(pos): return "".join( (a if a == "_" else a[2:]) * len(c) for a, c in coverage._coverage[pos].items() ) for p in probes: for pos, op in d.get(p, set()): log.warn("{} -> {}: {}", p, pos + 1, op) for i in range(pos - 5, pos + 5): log.info(" {} {} {}", i + 1, "->" if i == pos else " ", pileup(i)) cov = coverage.filtered(Coverage.quality_filter) cov = cov.filtered(filter_fns) alleles = copy.deepcopy(gene.alleles) for an, a in natsorted(gene.alleles.items()): if a.cn_config not in cn_solution.solution: del alleles[an] elif any(cov[m] <= 0 for m in a.func_muts): s = (f"{gene.get_rsid(m)}" for m in a.func_muts if cov[m] <= 0) log.trace("[major] removing {} because of {}", an, " and ".join(s)) del alleles[an] return alleles, cov def _print_candidates( gene, alleles: Dict[str, Any], coverage: Coverage, cn_solution: CNSolution, muts: set, ): """Pretty-prints the list of allele candidates and their functional mutations.""" def print_mut(m): copies = ( coverage[m] / (coverage.total(m) / cn_solution.position_cn(m.pos)) if cn_solution.position_cn(m.pos) and coverage.total(m) else 0 ) g = gene.region_at(m.pos) return ( f" {gene.get_rsid(m):12} {str(m):15} " + f"{gene.get_refseq(m, from_atg=True):10} " + f"(cov={coverage[m]:4}, cn= {copies:3.1f}; " + f"region={g[1] if g else '?'}; " + f"impact={gene.get_functional(m)}" + ")" ) log.debug("[major] candidate mutations=") muts = set(m for a in alleles for m in alleles[a].func_muts) | set(muts) for m in sorted(muts): log.debug(print_mut(m)) als = natsorted( f"*{a:8}" for a, al in gene.alleles.items() if m in al.func_muts and "#" not in a ) if als: log.debug( " {}", "\n ".join(" ".join(als[i : i + 6]) for i in range(0, len(als), 6)), ) else: log.debug(" <NOVEL>") log.debug("[major] candidate alleles=") muts = muts.copy() for a in natsorted(alleles): log.debug(" *{} (struct= *{})", a, alleles[a].cn_config) for m in sorted(alleles[a].func_muts): if m in muts: muts.remove(m) log.debug(" {}", print_mut(m)) if len(muts) > 0: log.debug(" Other mutations:") for m in sorted(muts): a = (f"*{a}" for a, b in gene.alleles.items() if m in b.func_muts) log.debug(" {}, alleles={})", print_mut(m)[:-1], ", ".join(a))