# 786
# Aldy source: diplotype.py
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
from typing import List, Dict, Any
from natsort import natsorted
import collections
import re
from .common import td
from .gene import Gene, Mutation
from .coverage import Coverage
from .solutions import MinorSolution
from .version import __version__ as version
OUTPUT_COLS = [
"Sample",
"Gene",
"SolutionID",
"Major",
"Minor",
"Copy",
"Allele",
"Location",
"Type",
"Coverage",
"Effect",
"dbSNP",
"Code",
"Status",
]
"""Output column descriptions"""
[docs]
def write_decomposition(
sample: str, gene: Gene, coverage: Coverage, sol_id: int, minor: MinorSolution, f
):
"""Write an allelic decomposition to the given file.
:param sample: Sample name.
:param gene: Gene instance.
:param sol_id: Solution ID (each solution must have a different ID).
:param minor: Minor star-allele solution to be written.
:param f: Output file.
"""
for copy, a in enumerate(minor.solution):
assert a.minor
mutations = set(gene.alleles[a.major].func_muts) | set(
gene.alleles[a.major].minors[a.minor].neutral_muts
)
mutations |= set(a.added)
mutations -= set(a.missing)
items = []
if len(mutations) > 0:
for m in sorted(mutations):
fn = gene.get_functional(m, False)
items.append(
[
sample,
gene.name,
sol_id,
minor.get_major_diplotype().replace(" ", ""),
";".join(ay.minor for ay in minor.solution if ay.minor),
copy,
a.minor,
m.pos,
m.op,
coverage[m],
fn if fn else "none",
gene.get_rsid(m, default=False),
"",
]
)
else:
items.append(
[
sample,
gene.name,
sol_id,
minor.get_major_diplotype().replace(" ", ""),
";".join(ay.minor for ay in minor.solution if ay.minor),
copy,
a.minor,
"",
"",
"",
"",
"",
"",
"",
]
)
for it in items:
print("\t".join(str(i) for i in it), file=f)
[docs]
def write_vcf(
sample: str, gene: Gene, coverage: Coverage, minors: List[MinorSolution], f
):
"""
Write an allelic decomposition in the VCF format to the given file.
:param sample: Sample name.
:param gene: Gene instance.
:param sol_id: Solution ID (each solution must have a different ID).
:param minor: Minor star-allele solution to be written.
:param f: Output file.
"""
header = f"""
##fileformat=VCFv4.2
##source=aldy-v{version} (gene: {gene.name}-{gene.version})
##INFO=<ID=ANN,Number=1,Type=String,Description="Location within {gene.name}">
##INFO=<ID=TYPE,Number=1,Type=String,Description="Mutation kind">
##INFO=<ID=GENE,Number=1,Type=String,Description="Gene">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=MA,Number=1,Type=String,Description="Major genotype star-allele calls">
##FORMAT=<ID=MI,Number=1,Type=String,Description="Minor genotype star-allele calls">
"""
all_mutations: Dict[Mutation, List[dict]] = {
m: [collections.defaultdict(int)] * len(minors)
for minor in minors
for a in minor.solution
if a.minor
for m in set(gene.alleles[a.major].func_muts)
| set(gene.alleles[a.major].minors[a.minor].neutral_muts)
| set(a.added)
}
for mi, minor in enumerate(minors):
for ai, a in enumerate(minor.solution):
assert a.minor
mutations = set(gene.alleles[a.major].func_muts) | set(
gene.alleles[a.major].minors[a.minor].neutral_muts
)
mutations |= set(a.added)
for m in mutations:
all_mutations[m][mi][ai] = 1
print(
td(header).strip()
+ "\n"
+ "\t".join(
["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"]
+ [
f"{sample}:{mi}:{m.get_major_diplotype().replace(' ', '')}"
for mi, m in enumerate(minors)
]
),
file=f,
)
for m in sorted(all_mutations):
ref = m.op[0] # TODO: should be genome nucleotide, not the RefSeq nucleotide?!
if m.op[1] == ">":
alt = m.op[2]
elif m.op[:3] == "ins":
alt = ref + m.op[3:]
else:
ref, alt = ".", f"{m.op[3:]}, ." # TODO: this is wrong?!
fm = gene.get_functional(m)
fm = fm.replace(" ", "_").replace("\t", "_").replace(";", "_") if fm else "none"
info = [
f"EFFECT={fm}",
"GENE=" + gene.name,
]
data = []
for mi, minor in enumerate(minors):
nall = len(minor.solution)
data.append(
{
"GT": "|".join(str(all_mutations[m][mi][i]) for i in range(nall)),
"DP": str(coverage[m]),
"MA": ",".join(
(
f"*{minor.solution[i].major}"
if all_mutations[m][mi][i] > 0
else "-"
)
for i in range(nall)
),
"MI": ",".join(
(
f"*{minor.solution[i].minor}"
if all_mutations[m][mi][i] > 0
else "-"
)
for i in range(nall)
),
}
)
pattern = (
"{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{info}\t"
+ "{format}\t{data}"
)
print(
pattern.format(
chrom=gene.chr,
pos=m.pos + 1,
id=gene.get_rsid(m, default=False),
ref=ref,
alt=alt,
qual=0,
filter="PASS",
info=";".join(info),
format=":".join(data[0].keys()),
data="\t".join(":".join(d.values()) for d in data),
),
file=f,
)
[docs]
def estimate_diplotype(gene: Gene, solution: MinorSolution) -> str:
"""Calculate the diplotype assignment for a minor solution.
Set the `diplotype` attribute of the :py:class:`aldy.solutions.MinorSolution`.
Uses the diplotype assignment heuristics to assign correct diplotypes.
This heuristics has no biological validity whatsoever---it is purely used
for pretty-printing the final solutions.
:returns: Diplotype assignment.
"""
del_allele = gene.deletion_allele()
# solution is the array of (major, minor) tuples
major_dict: Dict[str, List[int]] = collections.defaultdict(list)
for ai, a in enumerate(solution.solution):
n = str(a.major).split("#")[0] # chop off fusion suffix
components = re.split(r"(\d+)", n)
real = components[0] if components[0] != "" else components[1]
major_dict[real].append(ai)
if del_allele:
if len(solution.solution) == 0:
major_dict[del_allele].append(-1)
major_dict[del_allele].append(-1)
elif len(solution.solution) == 1:
major_dict[del_allele].append(-1)
diplotype: Any = [[], []]
dc = 0
# Handle tandems (heuristic that groups common tandems together,
# e.g. 1, 2, 13 -> 13+1/2 if [13,1] is a common tandem)
if len(solution.solution) > 2:
for ta, tb in gene.common_tandems:
while major_dict[ta] and major_dict[tb]:
diplotype[dc % 2].append((major_dict[ta][0], major_dict[tb][0]))
dc += 1
del major_dict[ta][0], major_dict[tb][0]
# Handle duplicates (heuristics that groups duplicate alleles together,
# e.g. 1, 1, 2 -> 1+1/2)
# First check should we split them (e.g. 1, 1, 1, 1 -> 1+1/1+1)?
xlen = lambda d: sum(2 if isinstance(n, tuple) else 1 for n in d) # noqa
if len(major_dict) == 1:
items = next(iter(major_dict.values()))
if len(items) % 2 == 0:
diplotype[dc % 2] += items[: len(items) // 2]
dc += 1
diplotype[dc % 2] += items[len(items) // 2 :]
dc += 1
major_dict.clear()
for _, items in major_dict.items():
if len(items) > 1:
if xlen(diplotype[dc % 2]) > xlen(diplotype[(dc + 1) % 2]):
dc += 1
diplotype[dc % 2] += items
items.clear()
dc += 1
# Handle the rest
for _, items in major_dict.items():
if items:
if xlen(diplotype[dc % 2]) > xlen(diplotype[(dc + 1) % 2]):
dc += 1
assert len(items) == 1
diplotype[dc % 2] += items
dc += 1
# Each diplotype should have at least one item
# e.g. 1, 1 -> becomes 1+1/_ due to duplicate heuristic -> fixed to 1/1
if len(diplotype[1]) == 0:
if len(diplotype[0]) > 1:
diplotype = diplotype[0][:-1], [diplotype[0][-1]]
elif len(diplotype[0]) == 1 and isinstance(diplotype[0][0], tuple):
diplotype = diplotype[0][0]
def flatten(d):
def key(x):
if isinstance(x, tuple):
return solution.get_major_name(x[0])
return solution.get_major_name(x)
for i in natsorted(d, key=key):
if isinstance(i, tuple):
yield from i
else:
yield i
# Make sure that the elements are sorted and that the tandems are grouped together
diplotype = natsorted(
[list(flatten(diplotype[0])), list(flatten(diplotype[1]))],
key=lambda x: [solution.get_major_name(y) for y in x],
)
solution.set_diplotype(diplotype)
return diplotype
[docs]
def estimate_cpic(gene: Gene, solution: MinorSolution) -> str:
"""Calculate the CPIC functionality for a minor solution.
:returns: CPIC functionality.
"""
activities = []
for ai, a in enumerate(solution.solution):
if a.extra_functionality():
activities.append("unknown")
else:
activities.append(gene.alleles[a.major].minors[a.minor].activity)
if "unknown" not in activities:
if gene.cpic_scores:
score = 0.0
for a in activities:
if a not in gene.cpic_scores:
break
score += gene.cpic_scores[a]
else:
for fn, [lo, hi] in gene.cpic.items():
if lo <= score <= hi:
return (score, fn)
else:
activities = tuple(sorted(activities))
i = [fn for fn, a in gene.cpic.items() if activities in a]
if len(i) == 1:
return (0, i[0])
return (0, "indeterminate")