# 786
# Aldy source: minor.py
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
from typing import List, Set, Tuple, Dict
from natsort import natsorted
import collections
import os
from . import lpinterface
from .common import log, json, Timing
from .gene import Mutation, Gene
from .coverage import Coverage
from .solutions import MajorSolution, SolvedAllele, MinorSolution
from .diplotype import estimate_diplotype
[docs]
def estimate_minor(
gene: Gene,
coverage: Coverage,
major_sols: List[MajorSolution],
solver: str,
max_solutions: int = 1,
) -> List[MinorSolution]:
"""
Estimate the optimal minor star-allele.
:param gene: Gene instance.
:param coverage: Read coverage instance.
:param major_sol: Major allele solution for minor star-allele calling.
:param solver: ILP solver (see :py:mod:`aldy.lpinterface` for supported solvers).
:param max_solutions: Maximum number of solutions to report.
Default: 1.
"""
# Get the list of potential alleles and mutations
alleles: List[SolvedAllele] = list()
# Consider all major and minor mutations
# *from all available major solutions* together
mutations: Set[Mutation] = set()
for major_sol in major_sols:
for sa in major_sol.solution:
alleles += [
SolvedAllele(gene, sa.major, mi) for mi in gene.alleles[sa.major].minors
]
mutations |= set(gene.alleles[sa.major].func_muts)
for minor in gene.alleles[sa.major].minors.values():
mutations |= set(minor.neutral_muts)
mutations |= set(major_sol.added)
# Filter out low quality mutations
def default_filter_fn(cov, mut):
# TODO: is this necessary?
r = gene.region_at(mut.pos)
if mut.op != "_" and not (
mut in mutations
or (r and r[1][0] == "e")
or (r and r[1] in ["utr3", "utr5", "up"])
):
return False
cond = cov.basic_filter(mut, cn=coverage.profile.cn_max)
if mut.op != "_":
cond = cond and cov.basic_filter(
mut, cn=major_sol.cn_solution.position_cn(mut.pos) + 0.5
)
return cond
cov = coverage.filtered(Coverage.quality_filter)
cov = cov.filtered(default_filter_fn)
mutations |= {m for m in gene.random_mutations if cov[m] > 0}
# Group by CN solutions
minor_sols: List[MinorSolution] = []
cn_sols = {m.cn_solution for m in major_sols}
min_score = min(m.score for m in major_sols)
for c in sorted(cn_sols, key=lambda x: x._solution_nice()):
log.debug("*" * 80)
majors = [m for m in major_sols if m.cn_solution == c]
_print_candidates(gene, alleles, c, cov, mutations)
for major_sol in natsorted(majors, key=lambda s: str(s.solution)):
sols = solve_minor_model(
gene,
cov,
major_sol,
alleles,
mutations,
solver,
max_solutions,
)
for s in sols:
s.score += major_sol.score - min_score
minor_sols += sols
return minor_sols
[docs]
def solve_minor_model(
gene: Gene,
coverage: Coverage,
major_sol: MajorSolution,
alleles_list: List[SolvedAllele],
mutations: Set[Mutation],
solver: str,
max_solutions: int = 1,
) -> List[MinorSolution]:
"""
Solves the minor star-allele detection problem via integer linear programming.
:param gene: Gene instance.
:param coverage: Read coverage instance.
:param major_sol: Major allele solution for minor star-allele calling.
:param alleles_list: Candidate minor star-alleles.
:param mutations: Mutations to consider for model building (all other mutations
are ignored).
:param solver: ILP solver (see :py:mod:`aldy.lpinterface` for supported solvers).
:param max_solutions: Maximum number of solutions to report.
Default: 1.
.. note::
Please see `Aldy paper <https://www.nature.com/articles/s41467-018-03273-1>`_
(section Methods/Genotype refining) for the model explanation.
Currently returns only the first optimal solution.
"""
log.debug("[minor] major= {}", major_sol._solution_nice())
model = lpinterface.model("AldyMinor", solver)
debug_info = json[gene.name]["minor"][len(json[gene.name]["minor"])]
# Establish minor alleles and their mutations
alleles: Dict[Tuple[SolvedAllele, int], Set[Mutation]] = {
(a, 0): set(gene.alleles[a.major].func_muts)
| set(gene.alleles[a.major].minors[a.minor].neutral_muts)
for a in alleles_list
}
for a, _ in list(alleles):
max_cn = major_sol.solution[SolvedAllele(gene, a.major, "", a.added, a.missing)]
for cnt in range(1, max_cn):
alleles[a, cnt] = alleles[a, 0]
VA = {
a: model.addVar(vtype="B", name=f"A_{a[0].major}_{a[0].minor}_{a[1]}")
for a in alleles
}
for a, cnt in alleles:
if cnt > 0:
model.addConstr(VA[a, cnt] <= VA[a, cnt - 1], name=f"CORD_{a.minor}_{cnt}")
# Make sure that the sum of all subaleles matches the count
# of the corresponding major alleles
for sa, cnt in major_sol.solution.items():
expr = model.quicksum(
v
for (vs, _), v in VA.items()
if (vs.major, vs.added, vs.missing) == (sa.major, sa.added, sa.missing)
)
model.addConstr(expr <= cnt, name=f"CCNT_{sa.major}_1")
model.addConstr(expr >= cnt, name=f"CCNT_{sa.major}_2")
# Disable other alleles
model.addConstr(
model.quicksum(VA.values()) <= sum(major_sol.solution.values()),
name="CCNT_OTHER",
)
# Add a binary variable for each allele/mutation pair, where mutation belongs
# to that allele, that will indicate whether such mutation will be kept or not.
# Second variable stands for the corresponding VA * VKEEP binary product
# (binary product transformation).
VKEEP: dict = {
a: {
m: (
model.addVar(
vtype="B", name=f"K_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}"
),
model.addVar(
vtype="B",
name=f"MUL_K_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}",
),
)
for m in alleles[a]
}
for a in alleles
}
# Add a binary variable for each allele/mutation pair, where mutation DOES NOT
# belong to that allele,that will indicate whether such mutation will be assigned
# to that allele or not.
# Second variable stands for the corresponding VA * VNEW product
# (binary product transformation).
VNEW: dict = {
a: {
m: (
model.addVar(
vtype="B", name=f"N_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}"
),
model.addVar(
vtype="B",
name=f"MUL_N_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}",
),
)
for m in mutations
if gene.has_coverage(a[0].major, m.pos) and m not in alleles[a]
}
for a in alleles
}
# Add an error variable for each mutation, and populate error constraints
VERR = {
m: model.addVar(lb=-model.INF, ub=model.INF, name=f"E_{m.pos}_{m.op}")
for m in mutations
}
constraints = {m: 0 for m in mutations}
for m in mutations:
for a in alleles:
if m in alleles[a]:
constraints[m] += model.prod(VKEEP[a][m][1], [VA[a], VKEEP[a][m][0]])
# Add this *only* if CN of this region in a given allele is positive
elif gene.has_coverage(a[0].major, m.pos):
constraints[m] += model.prod(VNEW[a][m][1], [VA[a], VNEW[a][m][0]])
# Fill the constraints for non-variations (i.e. reference genome matches)
for pos in set(m.pos for m in constraints):
ref_m = Mutation(pos, "_") # type: ignore
VERR[ref_m] = model.addVar(lb=-model.INF, ub=model.INF, name=f"E_{pos}_REF")
constraints[ref_m] = 0
for a in alleles:
if not gene.has_coverage(a[0].major, pos):
continue
# Does this allele contain any mutation at the position `pos`?
# Insertions are not counted as they always contribute to `_`.
present_muts = [m for m in alleles[a] if m.pos == pos and m[1][:3] != "ins"]
assert len(present_muts) < 2
if len(present_muts) == 1:
constraints[ref_m] += VA[a] - VKEEP[a][present_muts[0]][1]
else:
constraints[ref_m] += VA[a]
muts = [m for m in VNEW[a] if m.pos == pos and m[1][:3] != "ins"]
for m in muts:
constraints[ref_m] -= VNEW[a][m][1]
# We ensure that only one additional mutation can be selected here
model.addConstr(
model.quicksum(VNEW[a][m][0] for m in muts) <= 1,
name=f"CONE_{pos}_{a[0].major}_{a[0].minor}_{a[1]}",
)
# Ensure that each constraint matches the observed coverage
debug_info["cn"] = dict(major_sol.cn_solution.solution)
debug_info["major"] = {s.major: v for s, v in major_sol.solution.items()}
debug_info["data"] = []
for m, expr in sorted(constraints.items()):
scov = coverage.single_copy(m, major_sol.cn_solution)
# If scov = 0, no mutations should be selected at that locus
# (enforced by other constraints)
cov = coverage[m] / scov if scov > 0 else 0
model.addConstr(expr + VERR[m] >= cov, name=f"CCOV_{m.pos}_{m.op}")
model.addConstr(expr + VERR[m] <= cov, name=f"CCOV_{m.pos}_{m.op}")
debug_info["data"].append((m.pos, m.op, coverage[m]))
# TODO: downscale ambiguous mutations (right now score == 1 for all mutations)
score = {}
for m, v in VERR.items():
s = 1
# ops = coverage._coverage[m.pos].get(m.op, [])
# if ops:
# mx = max(mq for mq, _ in ops)
# sx = sum(math.log(1 + mq) / math.log(1 + mx) for mq, _ in ops) / len(ops)
# s += 1 - sx
score[model.varName(v)] = s
# Enforce the following rules:
# 1) Each mutation is assigned only to alleles that are present in the solution
for a, mv in VKEEP.items():
for m, v in mv.items():
model.addConstr(
v[0] <= VA[a],
name=f"CVK_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}",
)
for a, mv in VNEW.items():
for m, v in mv.items():
model.addConstr(
v[0] <= VA[a],
name=f"CVN_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}",
)
# 2) Each allele must express ALL of its functional mutations
for a in alleles:
for m in alleles[a]:
if gene.is_functional(m):
assert m in VKEEP[a]
model.addConstr(
VKEEP[a][m][0] >= VA[a],
name=f"CFUNC_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}",
)
# 3) No allele can include a mutation with coverage 0
for a in VKEEP:
for m, v in VKEEP[a].items():
if not gene.has_coverage(a[0].major, m.pos):
model.addConstr(
v[0] <= 0,
name=f"CZERO_{m.pos}_{m.op}_{a[0].major}_{a[0].minor}_{a[1]}",
)
# 4) Avoid extra mutations if there is an existing mutation at the corresponding
# locus (either from the definition or added via VNEW)
for pos in set(m.pos for m in constraints):
for a in alleles:
mp = [VKEEP[a][m][1] for m in VKEEP[a] if m.pos == pos]
ma = [VNEW[a][m][1] for m in VNEW[a] if m.pos == pos]
# TODO: add support for extra insertions!
if len(ma) > 1: # useless if == 1 since variables are binary
model.addConstr(
model.quicksum(ma) <= 1,
name=f"CSINGLE_{pos}_{a[0].major}_{a[0].minor}_{a[1]}",
)
if len(ma) + len(mp) > 1:
model.addConstr(
model.quicksum(mp + ma) <= 1,
name=f"CSINGLEFULL_{pos}_{a[0].major}_{a[0].minor}_{a[1]}",
)
# 5) Make sure that each mutation copy has at least one read supporting it
for m in mutations:
expr = model.quicksum(VKEEP[a][m][1] for a in alleles if m in VKEEP[a])
expr += model.quicksum(VNEW[a][m][1] for a in alleles if m in VNEW[a])
if major_sol.cn_solution.position_cn(m.pos) == 0 or coverage[m] == 0:
model.addConstr(expr <= 0, name=f"CNOCOV_{m.pos}_{m.op}")
else:
model.addConstr(expr <= coverage[m], name=f"CMAXCOV_{m.pos}_{m.op}")
# Ensure that at least one allele picks an existing non-filtered mutation
model.addConstr(expr >= 1, name=f"CMINONE_{m.pos}_{m.op}")
# 6) Do the same for non-mutations
for pos in set(m.pos for m in constraints):
expr = 0
max_mut = 0
for a in alleles:
e = [VKEEP[a][m][1] for m in VKEEP[a] if m.pos == pos]
e += [VNEW[a][m][1] for m in VNEW[a] if m.pos == pos]
max_mut = max(max_mut, len(e))
expr += len(e) * VA[a] - model.quicksum(e)
if isinstance(expr, int) and expr == 0:
continue
m = Mutation(pos, "_")
if major_sol.cn_solution.position_cn(m.pos) == 0:
model.addConstr(expr <= 0, name=f"CNOCOV_{m.pos}_{m.op}")
else:
# If there is no coverage at `pos` at all despite CN being > 0 (or if there
# is coverage only on a functional mutation that cannot be selected),
# allow the allele to select a non-existent non-mutation to prevent
# infeasibility. Should not happen with "sane" datasets...
model.addConstr(
expr
<= max(major_sol.cn_solution.position_cn(m.pos), coverage[m], max_mut),
name=f"CMAXCOV_{m.pos}_{m.op}",
)
# 7) Respect phasing
VPHASEERR = []
VPHASE = {}
modes = collections.defaultdict(int)
if coverage.profile.phase and coverage.sam:
mut_pos = {m.pos for m in mutations}
for rr, rv in coverage.sam.phases.items():
c = []
for k, v in rv.items():
if k in mut_pos:
c.append((k, v))
c = sorted(c)
if len(c) > 1:
modes[tuple(c)] += 1
log.debug("[minor] number of phases= {}", len(modes))
log.debug("[minor] number of alleles= {}", len(alleles))
if len(modes) * len(alleles) > coverage.profile.minor_phase_vars:
max_sample = len(modes) * (
coverage.profile.minor_phase_vars / (len(modes) * len(alleles))
)
log.debug("[minor] downsampling to= {}", max_sample)
skip = len(modes) / max_sample
if max_sample < len(modes):
mi = list(modes.items())
modes = dict(mi[i] for i in range(0, len(mi), int(skip)))
with Timing("[minor] Model setup"):
addVar = lambda n: model.addVar(vtype="B", name=n, update=False) # noqa
vars = 0
for ri, (rr, cnt) in enumerate(modes.items()):
r = dict(rr)
for ai, a in enumerate(alleles):
pos, neg, e = [], [], 0
for m in mutations:
if m.pos not in r or not gene.has_coverage(a[0].major, m.pos):
continue
if m in VKEEP[a]:
(pos if m.op == r[m.pos] else neg).append(VKEEP[a][m][0])
elif m in VNEW[a]:
(pos if m.op == r[m.pos] else neg).append(VNEW[a][m][0])
if len(pos) + len(neg) > 1:
v = VPHASE[ai, ri] = addVar(f"PH_{ai}_{ri}")
vars += 1
model.addConstr(VPHASE[ai, ri] <= VA[a], name=f"PH_{ai}_{ri}")
e = 0
for i, vm in enumerate(pos):
v_vp = model.prod(addVar(f"PHASE2_{ai}_{ri}_{i}"), [v, vm])
vars += 1
e += v - v_vp
for i, vm in enumerate(neg):
v_vp = model.prod(addVar(f"PHASE3_{ai}_{ri}_{i}"), [v, vm])
vars += 1
e += v_vp
VPHASEERR.append(cnt * e)
for ri, _ in enumerate(modes):
v = [
VPHASE[ai, ri] for ai, _ in enumerate(alleles) if (ai, ri) in VPHASE
]
if v:
e = model.quicksum(v)
model.addConstr(e <= 1, name=f"PHASE4_{ri}_1")
model.addConstr(e >= 1, name=f"PHASE4_{ri}_2")
model.update()
log.debug("[minor] phasing setup done, vars= {}", vars)
# Objective: minimize the absolute sum of errors ...
objective = 0
o_error = model.abssum((v for v in VERR.values()), coeffs=score)
objective += o_error
# ... and penalize the misses ...
o_penal = coverage.profile.minor_miss * model.quicksum(
len(VKEEP[a]) * VA[a] for a in VKEEP
)
o_penal -= coverage.profile.minor_miss * model.quicksum(
v[1] for a in VKEEP for _, v in VKEEP[a].items()
)
# ... and additions ...
cnt = 0
for a in VNEW:
for _, v in VNEW[a].items():
# HACK:
# Add cnt/10000 to select the smallest allele if there is a tie
o_penal += coverage.profile.minor_add * (1 + cnt / 1000000) * v[0]
cnt += 1
# ... and novel functional mutations from the major model...
for m in {m for a in VNEW for m in VNEW[a]}:
vars = [
VNEW[a][m][0]
for a in VNEW
if m in VNEW[a]
if gene.is_functional(m)
if m not in gene.alleles[a[0].major].func_muts
]
if vars:
vo = model.addVar(vtype="B", name=f"VNEWOR_{m.pos}_{m.op}")
model.addConstr(vo <= model.quicksum(vars), name=f"VNEWOR1_{m.pos}_{m.op}")
for vi, v in enumerate(vars):
model.addConstr(vo >= v, name=f"VNEWOR2_{m.pos}_{m.op}_{vi}")
o_penal += coverage.profile.minor_add / 2 * vo
objective += o_penal
# ... and phasing error!
o_phase = coverage.profile.minor_phase * model.quicksum(VPHASEERR)
objective += o_phase
model.setObjective(objective)
# Solve the model
results = {}
with Timing("[minor] Model solution"):
for status, opt, sol in model.solutions():
solution = []
for allele, value in VA.items():
if model.getValue(value) <= 0:
continue
if coverage.profile.phase and coverage.sam:
ai = next(i for i, a in enumerate(alleles) if allele == a)
_print_phase(
model, gene, mutations, modes, VPHASE, VKEEP, VNEW, allele, ai
)
added: List[Mutation] = []
missing: List[Mutation] = []
for m, mv in VKEEP[allele].items():
if not model.getValue(mv[0]):
missing.append(m)
for m, mv in VNEW[allele].items():
if model.getValue(mv[0]):
added.append(m)
else:
copies = coverage.single_copy(m, major_sol.cn_solution)
copies = coverage[m] / copies if copies > 0 else 0
if abs(copies - major_sol.cn_solution.max_cn()) > 1e-5:
continue
# HACK: add homozygous mutation to _all_ alleles
# Works only if a mutation is unambiguiusly homozygous;
# otherwise, it will fall back to the model defaults
if m not in alleles[allele]:
added.append(m)
solution.append(
SolvedAllele(
gene,
allele[0].major,
allele[0].minor,
allele[0].added + added,
missing,
)
)
sol = MinorSolution(
score=opt,
solution=solution,
major_solution=major_sol,
profile=coverage.profile,
)
_ = estimate_diplotype(gene, sol)
debug_info["sol"] = [(s.minor, s.added, s.missing) for s in solution]
debug_info["diplotype"] = sol.diplotype
log.debug(
f"[minor] status= {status}; opt= {opt:.2f} "
+ "(error= {:.2f}, penal={:.2f}, phase= {:.2f}) "
+ f"solution= {sol._solution_nice()}",
model.getValue(o_error),
model.getValue(o_penal),
model.getValue(o_phase),
)
if str(sol) not in results:
results[str(sol)] = sol
if len(results) >= max_solutions:
break
if not results:
log.debug("[minor] solution= []")
debug_info["sol"] = []
if "ALDY_IIS" in os.environ: # Set to debug infeasible models
model.model.computeIIS()
model.dump("minor.iis.ilp")
return sorted(results.values(), key=lambda x: str(x.get_minor_diplotype()))
def _print_candidates(gene, alleles, cn_sol, coverage, muts):
"""Pretty-print the list of allele candidates and their mutations."""
def print_mut(m):
copies = coverage.single_copy(m, cn_sol)
copies = coverage[m] / copies if copies > 0 else 0
impact = gene.get_functional(m)
return (
f" {gene.get_rsid(m):12} {str(m):15} "
+ f"{gene.get_refseq(m, from_atg=True):10} "
+ f"(cov={coverage[m]:4}, cn= {copies:3.1f}"
+ (f"; impact={impact})" if impact else ")")
)
log.debug("[minor] candidate mutations=")
for m in sorted(muts):
if coverage[m]:
log.debug(print_mut(m))
log.trace("[minor] candidate alleles=")
muts = muts.copy()
for sa in natsorted(alleles, key=lambda x: (x.major, x.minor)):
am = (
set(gene.alleles[sa.major].func_muts)
| set(gene.alleles[sa.major].minors[sa.minor].neutral_muts)
| set(sa.added)
)
miss = sum(1 for a in am if coverage[a] == 0)
log.trace(f" *{sa.minor} (major= *{sa.major}; miss= {miss})")
for m in sorted(am):
if m in muts:
muts.remove(m)
log.trace(" {}", print_mut(m))
if len(muts) > 0:
log.trace(" Other mutations:")
for m in sorted(muts):
log.trace(" {}", print_mut(m))
def _print_phase(model, gene, mutations, modes, VPHASE, VKEEP, VNEW, allele, ai):
"""Pretty-print phasing results."""
rds = [
r
for ri, r in enumerate(modes.items())
if (ai, ri) in VPHASE
if model.getValue(VPHASE[ai, ri])
]
log.trace("[phase] {} (reads= {})", allele[0].minor, len(rds))
for m in sorted(mutations):
sm = f" {m} "
if m in gene.alleles[allele[0].major].func_muts:
sm += "*"
elif m in gene.alleles[allele[0].major].minors[allele[0].minor].neutral_muts:
sm += "#"
else:
sm += " "
if m in VKEEP[allele]:
sm += "K" if model.getValue(VKEEP[allele][m][0]) else "-"
elif m in VNEW[allele]:
sm += "N" if model.getValue(VNEW[allele][m][0]) else "-"
else:
sm += " "
ph = collections.defaultdict(int)
for r in rds:
dr = dict(r[0])
if m.pos in dr:
c = dr[m.pos]
if ">" in c:
c = c[2]
elif c.startswith("del"):
c = f"-{len(c)-3}"
ph[c] += r[1]
if len(ph):
p = sorted(ph.items())
log.trace(
"[phase] {}\t{}",
sm,
"; ".join(f"{o} ({n})" for o, n in p),
)