Source code for aldy.gene

# 786
# Aldy source: gene.py
#   This file is subject to the terms and conditions defined in
#   file 'LICENSE', which is part of this source code package.


from typing import Tuple, Dict, List, Optional, Set, NamedTuple
from dataclasses import dataclass, field
from enum import Enum
from natsort import natsorted
import os
import yaml
import collections

from .common import (
    GRange,
    AldyException,
    allele_name,
    sorted_tuple,
    seq_to_amino,
    rev_comp,
    log,
    freezekey,
)


[docs] class Mutation(NamedTuple): """ Mutation description. Immutable. :param pos: Reference genome position (0-based). :param op: Variant operation in HGVS-like format: - `X>Y`: a SNP from X to Y - `insX`: an insertion of X - `delX`: a deletion of X (X, Y are both of format `[ACGT]+`). """ pos: int op: str def __str__(self): return f"{self.pos + 1}.{self.op}"
[docs] @dataclass class MinorAllele: """ Minor allele description. :param name: Minor allale name. :param alt_name: List of alternative names. :param neutral_muts: Netural mutations that describe the minor allele. :param activity: Activity indicator (see PharmVar for details). :param evidence: Evidence indicator (see PharmVar for details). :param pharmvar: PharmVar ID. """ name: str alt_name: Optional[str] = None neutral_muts: Set[Mutation] = field(default_factory=set) _activity: Optional[str] = "unknown" evidence: Optional[str] = None pharmvar: Optional[str] = None def __str__(self): muts = ", ".join(map(str, self.neutral_muts)) return f"Minor({self.name}; [{muts}])" @property def activity(self): return self._activity or "unknown"
[docs] @dataclass class MajorAllele: """ Major allele description. :param name: Major allele name. :param cn_config: Copy-number configuration. :param func_muts: Functional mutations that describe the major allele. :param minors: Minor alleles (suballeles) that are derived from this major allele. """ name: str cn_config: str = "1" func_muts: Set[Mutation] = field(default_factory=set) minors: Dict[str, MinorAllele] = field(default_factory=dict)
[docs] def get_minor_mutations(self, minor: str): """Sequence of mutations that describe a minor allele.""" for m in self.func_muts: yield m for m in self.minors[minor].neutral_muts: yield m
[docs] class CNConfigType(Enum): """Enumeration describing the type of a copy-number configuration""" DEFAULT = 0 LEFT_FUSION = 1 RIGHT_FUSION = 2 DELETION = 3 CUSTOM = 4
[docs] @dataclass class CNConfig: """ Copy-number (CN) configuration description. Immutable. :param cn: Value of the expected region copy number in each gene. For example, `cn[0]["e1"] == 1` means that the exon 1 of the main gene (ID 0) has one copy (and thus should be present) in the configuration `cn`. :param kind: Type of the copy-number configuration. :param alleles: Allele IDs that have this CN configuration. :param description: Human-readable description (e.g. "deletion"). """ cn: List[Dict[str, int]] kind: CNConfigType alleles: Set[str] description: str = "" @property def vector(self): return "|".join("".join(str(g[r]) for r in self.cn[0]) for g in self.cn) def __str__(self): a = " ".join(natsorted(self.alleles)) return f"CNConfig({str(self.kind)[13:]}; vector={self.vector}; alleles=[{a}])"
[docs] @dataclass class Gene: """Gene (and associated pseudogenes) description relative to a reference genome.""" # Basic information name: str """Gene name (e.g. _CYP2D6_).""" version: str """Database version.""" pharmvar: Optional[str] """PharmVar ID.""" ensembl: Optional[str] """ENSEMBL gene ID.""" # Structure refseq: str """RefSeq sequence ID.""" seq: str """RefSeq sequence (typically `*1` allele).""" genome: str """Reference genome version (e.g., hg19).""" chr: str """Reference genome chromosome.""" strand: int """RefSeq sequence strand within the reference genome.""" chr_to_ref: Dict[int, int] """Position mapping from the reference to the RefSeq sequence.""" ref_to_chr: Dict[int, int] """Position mapping from the RefSeq sequence to the reference.""" pseudogenes: List[str] """Pseudogene names. A pseudogene has ID greater than zero.""" regions: List[Dict[str, GRange]] """ Collection of regions (names and ranges) for each gene in the database. Maps a gene ID to a dictionary that maps a gene region name (e.g. `"e9"` for exon 9) to its region in the reference genome. Gene 0 is the main gene. """ exons: List[Tuple[int, int]] """List of RefSeq exon coordinates.""" aminoacid: str """Aminoacid sequence of the main gene.""" # Mutations mutations: Dict[Tuple[int, str], Tuple[Optional[str], str, int, int, str]] """Maps `(position, mutation)` to the corresponding :py:class:`Mutation`.""" random_mutations: Set[Mutation] """Set of mutations that can occur in any allele.""" do_copy_number: bool """Set if the gene is subject to structural variations.""" cn_configs: Dict[str, CNConfig] """ Copy-number configurations associated with the gene. `1` (akin to `*1`) is the default CN configuration (no structural variations present). """ unique_regions: List[str] """List of genic regions used for copy-number calling.""" # Alleles alleles: Dict[str, MajorAllele] """Major star-alleles in the gene.""" common_tandems: List[Tuple] """ List of common allele tandems. Used in diplotype assignment heuristics. For example, the fact that `*13` is always followed by `*1` (encoded as `('13', '1')`) will be used to group `*13` and `*1` together within the same haplotype (e.g. `*13+*1`). """ def __init__( self, path: Optional[str], name: Optional[str] = None, yml: Optional[str] = None, genome: Optional[str] = None, ) -> None: """ Initialize the object with the database description specified in a YAML file. :param path: Location of the YAML file. :param name: Gene name. :param yml: YML file contents (used if `path` is `None`). :param genome: Reference genome version. :raises: :py:class:`aldy.common.AldyException`: if the path is not set. """ if path and os.path.exists(path): with open(path) as file: yml = file.read() if not name: name = os.path.split(path)[-1].split(".")[0].upper() if not yml or not name: raise AldyException("Either a path or a name should be given") yml = yaml.safe_load(yml) self._yml = yml # for debugging / raw API access if genome: self.genome = genome else: self.genome = list(yml["reference"]["mappings"].keys())[0] # type: ignore self._init_basic(yml) self._init_regions(yml) self._init_alleles(yml) self._init_partials()
[docs] def region_at(self, pos: int) -> Optional[Tuple[int, str]]: """:returns: Gene ID and a region that covers the position.""" return self._region_at.get(pos, None)
def _reverse_op(self, op: str) -> str: if ">" in op: l, r = op.split(">") return f"{rev_comp(l)}>{rev_comp(r)}" elif op[:3] == "ins": return f"ins{rev_comp(op[3:])}" elif op[:3] == "del": assert "ins" not in op, "del+ins not yet supported" return f"del{rev_comp(op[3:])}" return op
[docs] def get_functional(self, mut, infer=True) -> Optional[str]: """ :returns: String describing the mutation effect if a mutation is functional; otherwise `None`. """ pos, op = mut if (pos, op) in self.mutations: return self.mutations[pos, op][0] # Calculate based on aminoacid change if pos not in self.chr_to_ref: return None pos = self.chr_to_ref[pos] if infer and any(s <= pos < e for s, e in self.exons): if ">" not in op: return "indel" if self.strand < 0: op = self._reverse_op(op) # pos -= 1 if op[2] == "N": return None if self.seq[pos] != op[0]: log.warn(f"Bad mutation: {op[0]} != {self.seq[pos]}") seq = "".join( ( self.seq[s:pos] + op[2] + self.seq[pos + 1 : e] if s <= pos < e else self.seq[s:e] ) for s, e in self.exons ) amino = seq_to_amino(seq) if amino != self.aminoacid: pos = next(i for i, a in enumerate(amino) if a != self.aminoacid[i]) return f"{self.aminoacid[pos]}{pos + 1}{amino[pos]}" return None
[docs] def is_functional(self, mut, infer=True) -> bool: """:returns: `True` if a mutation is functional.""" return self.get_functional(mut, infer) is not None
[docs] def get_rsid(self, *args, default=True) -> str: """:returns: rsID if a mutation has it; otherwise `"-"`.""" assert 1 <= len(args) <= 2 if len(args) == 2: pos, op = args else: pos, op = args[0] res = "-" if (pos, op) in self.mutations: res = self.mutations[pos, op][1] return res if res != "-" or not default else f"{pos + 1}.{op}"
[docs] def get_allele(self, name): if name in self.removed: name = self.removed[name] for an, a in self.alleles.items(): if name in a.minors: return (a, a.minors[name]) return None
[docs] def get_refseq(self, *args, from_atg=False) -> str: """:returns: `True` if a mutation is functional.""" assert 1 <= len(args) <= 2 atg_start = self.exons[0][0] if len(args) == 2: pos, op = args else: pos, op = args[0] if (pos, op) in self.mutations: pos, op = self.mutations[pos, op][3:5] if from_atg: if pos >= atg_start: pos = pos - atg_start + 1 else: pos = pos - atg_start return f"{pos + 1}{op}" return "-"
[docs] def deletion_allele(self) -> Optional[str]: """ :returns: The ID of the deletion allele (or `None` if gene has no such allele). """ try: return next( a for a, cn in self.cn_configs.items() if cn.kind == CNConfigType.DELETION ) except StopIteration: return None
[docs] def has_coverage(self, a: str, pos: int): """:returns: `True` if a major allele contains the given mutation.""" m = self.region_at(pos) if m: return self.cn_configs[self.alleles[a].cn_config].cn[m[0]][m[1]] > 0 return False
[docs] def get_wide_region(self): """:returns: The genomic region that covers all specified gene regions.""" mi = min(r.start for g in self.regions for r in g.values()) ma = max(r.end for g in self.regions for r in g.values()) return GRange(self.chr, mi, ma)
def __contains__(self, i: int): return i in self.chr_to_ref def __getitem__(self, i): s, e = self._lookup_range if isinstance(i, slice): i, j = i.start, i.stop if j <= s or i >= e: return "N" * (j - i) loff = max(0, s - i) roff = max(0, j - e) if loff: i = s if roff: j = e return ("N" * loff) + self._lookup_seq[i - s : j - s] + ("N" * roff) else: assert isinstance(i, int) if not s <= i < e: return "N" return self._lookup_seq[i - s] def __str__(self): return f"Gene({self.name})" def __repr__(self): return f"Gene({self.name})" def _init_basic(self, yml) -> None: """ Read basic gene properties (`name`, `seq` and `region`). All database YAML coordinates are indexed starting from 1. Aldy internally uses 0-based indexing. """ self.name = yml["name"] self.version = f"{yml.get('version', 'v?')} ({yml.get('generated', '-')})" self.refseq = yml["reference"]["name"] self.pharmvar = yml.get("pharmvar", None) self.ensembl = yml.get("ensembl", None) self.notes = yml.get("notes", "") self.seq = yml["reference"]["seq"].replace("\n", "") if "patches" in yml["reference"]: seq = list(self.seq) for [pos, nuc] in yml["reference"]["patches"]: seq[pos - 1] = nuc self.seq = "".join(seq) if self.genome not in yml["reference"]["mappings"]: raise AldyException("Gene {self.name} not compatible with {self.genome}") self.chr, start, end, strand, cigar = yml["reference"]["mappings"][self.genome] self.chr_to_ref = {} self.ref_to_chr = {} self.strand = 1 if strand == "+" else -1 pos_ref = 0 if self.strand > 0 else (len(self.seq) - 1) pos_chr = start - 1 for i in cigar.split(): op, sz = i[0], int(i[1:]) if op == "M": for idx in range(sz): self.chr_to_ref[pos_chr + idx] = pos_ref + idx * self.strand self.ref_to_chr[pos_ref + idx * self.strand] = pos_chr + idx pos_chr += sz pos_ref += sz * self.strand elif op == "I": pos_ref += sz * self.strand elif op == "D": pos_chr += sz else: raise AldyException("Invalid CIGAR string") self._lookup_range = (start - 1, end - 1) self._lookup_seq = "".join( ( ( rev_comp(self.seq[self.chr_to_ref[i]]) if self.strand < 0 else self.seq[self.chr_to_ref[i]] ) if i in self.chr_to_ref else "N" ) for i in range(*self._lookup_range) ) self.exons = sorted((s - 1, e - 1) for [s, e] in yml["reference"]["exons"]) self.aminoacid = seq_to_amino("".join(self.seq[s:e] for [s, e] in self.exons)) self.cpic = {} self.cpic_scores = None cpic = yml["reference"].get("cpic", {}) if "scores" in cpic and "conditions" in cpic: for fn, s in cpic["conditions"].items(): self.cpic[fn] = s self.cpic_scores = cpic["scores"] elif "table" in cpic: for fn, ll in cpic["table"].items(): self.cpic[fn] = {tuple(sorted(i)) for i in ll} def _init_regions(self, yml) -> None: """ Calculate the genic regions and pseudogenes (`regions`, `unique_regions` and `pseudogenes`). Prepare :py:method:`aldy.gene.Gene.region_at` call. """ self.regions = [] # Each pseudogene is associated with an index > 0 self.pseudogenes = [] for i, g in enumerate(yml["structure"]["genes"]): if i > 0: self.pseudogenes.append(g) regions = {} num_exons = 0 for name, coord in yml["structure"]["regions"][self.genome].items(): if name[0] == "e" and name[1:].isdigit(): # this is exon num_exons += 1 if i * 2 + 1 < len(coord) and not coord[i * 2] <= coord[i * 2 + 1]: raise AldyException( f"Malformed YML file {self.name} (structure:regions:{name})" ) regions[name] = GRange(self.chr, coord[i * 2] - 1, coord[i * 2 + 1] - 1) for e in range(1, num_exons): # fill introns r = [regions[f"e{e}"], regions[f"e{e + 1}"]][:: self.strand] regions[f"i{e}"] = GRange(self.chr, r[0][2], r[1][1]) self.regions.append( dict(sorted(regions.items(), key=lambda x: x[1])[:: self.strand]) ) #: dict[int, (int, str)]: #: reverse lookup (gene, region) of gene regions given a location #: within the reference genome. for d in self.regions: regs = sorted((r.start, r.end) for r in d.values()) for ri in range(1, len(regs)): if regs[ri - 1][1] < regs[ri][0]: raise AldyException( "Region {}-{} is not annotated", regs[ri - 1][1], regs[ri][0] ) self._region_at = { i: (g, r) for g, d in enumerate(self.regions) for r, rng in d.items() for i in range(rng.start, rng.end) } for i in self.chr_to_ref: if i not in self._region_at: raise AldyException( f"Position {i} not within a named region of {self.name}" ) self.unique_regions = yml["structure"]["cn_regions"] def _init_alleles(self, yml) -> None: """ Initialize allele (`alleles`, `common_tandems`) and copy number configurations (`cn_configs`). Initialize old notation lookup table (`old_notation`). :raise: :py:class:`aldy.common.AldyException` if allele name cannot be found. """ alleles = {} fusions_left = {} fusions_right = {} custom_cn = {} # allele ID of the deletion allele (i.e. whole gene is missing). deletion_allele = None def process_mutation(name, pos, op, info): if pos == self.name and op.startswith("deletion:"): custom_cn[name] = op[9:].split(",") elif pos in self.pseudogenes: assert pos == self.pseudogenes[0] # TODO: relax later if op[-1] == "-": fusions_left[name] = op[:-1] else: if op[-1] == "+": op = op[:-1] fusions_right[name] = op else: orig_op, orig_pos = op, pos if info == []: info = ["-"] rsid, function = info[0], info[1] if len(info) > 1 else None if self.strand < 0: if ">" in op: l, r = op.split(">") op = f"{rev_comp(l)}>{rev_comp(r)}" pos = pos + len(l) - 1 elif op[:3] == "ins": op = f"ins{rev_comp(op[3:])}" pos += 1 elif op[:3] == "del": if "ins" in op[3:]: pd, pi = op[3:].split("ins") op = f"del{rev_comp(pd)}ins{rev_comp(pi)}" pos = pos + len(pd) - 1 else: pos = pos + len(op) - 4 op = f"del{rev_comp(op[3:])}" pos -= 1 # Cast to 0-based index if pos not in self.ref_to_chr: log.warn(f"Ignoring {pos}.{op} in {name} (not in {self.refseq})") elif self.region_at(self.ref_to_chr[pos]) is None: log.warn(f"Ignoring {pos}.{op} in {name} (not in named region)") else: self.mutations.setdefault( (self.ref_to_chr[pos], op), (function, rsid, pos, orig_pos - 1, orig_op), ) yield Mutation(self.ref_to_chr[pos], op) self.mutations = {} self.yml_mutations = yml["alleles"] self.random_mutations = set() for pos, op, *info in yml["alleles"].get("random", []): if isinstance(pos, str) and pos == "ignored": continue for m in process_mutation("random", pos, op, info): self.random_mutations.add(m) self.mutation_groups = {} for name, muts in yml["alleles"].get("groups", {}).items(): self.mutation_groups[name] = set() for pos, op, *info in muts: for m in process_mutation(name, pos, op, info): self.mutation_groups[name].add(m) for name, allele in yml["alleles"].items(): if name in ["random", "groups"]: continue if allele.get("ignored", False): continue name = allele_name(name) mutations: Set[Mutation] = set() if [self.name, "deletion"] in allele["mutations"]: deletion_allele = name else: for pos, op, *info in allele["mutations"]: if isinstance(pos, str) and pos == "ignored": continue if pos == self.name and op in self.mutation_groups: log.warn(f"Ignoring {op} in {name}") continue for m in process_mutation(name, pos, op, info): mutations.add(m) alt_name = allele.get("label", None) if alt_name: alt_name = allele_name(alt_name) alleles[name] = MinorAllele( name, alt_name, mutations, allele.get("activity", "unknown"), allele.get("evidence", None), allele.get("pharmvar", None), ) self.common_tandems: List[tuple] = [] if "tandems" in yml["structure"]: self.common_tandems = [tuple(y) for y in yml["structure"]["tandems"]] # Set copy number configurations # TODO: currently fusions only cover the space between the main gene and # the first pseudogene. Multi-pseudogene fusions are not supported. self.do_copy_number = bool( deletion_allele or len(fusions_left) or len(fusions_right) or len(custom_cn) ) self.cn_configs: Dict[str, CNConfig] = dict() for i, _ in enumerate(self.pseudogenes): if list(self.regions[i + 1].keys()) != list(self.regions[0].keys()): raise AldyException("Invalid database structure") rank = {a: ai for ai, a in enumerate(self.regions[0])} inverse_cn: Dict[tuple, str] = dict() # Left fusions are PSEUDOGENE + GENE fusions for a, brk in fusions_left.items(): cn = [ {r: int(rank[r] >= rank[brk]) for r in self.regions[0]}, {r: int(rank[r] < rank[brk]) for r in self.regions[1]}, ] key = freezekey(cn) if key not in inverse_cn: self.cn_configs[a] = CNConfig( cn, CNConfigType.LEFT_FUSION, {a}, f"{self.pseudogenes[0]} fusion until {brk}", ) inverse_cn[key] = a else: self.cn_configs[inverse_cn[key]].alleles.add(a) # Deletion is a special kind of left fusion if deletion_allele is not None: cn = [{r: 0 for r in self.regions[0]}] if len(self.pseudogenes) > 0: cn.append({r: 1 for r in self.regions[1]}) self.cn_configs[deletion_allele] = CNConfig( cn, CNConfigType.DELETION, {deletion_allele}, f"{self.name} deletion", ) # Right fusions GENE + PSEUDOGENE + whole copy of PSEUDOGENE fusions for a, brk in fusions_right.items(): cn = [ {r: int(rank[r] < rank[brk]) for r in self.regions[0]}, {r: 1 + int(rank[r] >= rank[brk]) for r in self.regions[1]}, ] key = freezekey(cn) if key not in inverse_cn: self.cn_configs[a] = CNConfig( cn, CNConfigType.RIGHT_FUSION, {a}, f"{self.pseudogenes[0]} conservation after {brk}", ) inverse_cn[key] = a else: self.cn_configs[inverse_cn[key]].alleles.add(a) # Custom deletions (main gene only) for a, items in custom_cn.items(): cn = [{r: int(r not in items) for r in self.regions[0]}] if len(self.regions) > 1: cn.append({r: 1 for r in self.regions[1]}) key = freezekey(cn) if key not in inverse_cn: self.cn_configs[a] = CNConfig( cn, CNConfigType.CUSTOM, {a}, f"{self.name} deletion in {','.join(items)}", ) inverse_cn[key] = a else: self.cn_configs[inverse_cn[key]].alleles.add(a) # Normal CN case used_alleles = {a for cn in self.cn_configs.values() for a in cn.alleles} has_pseudogenes = len(self.pseudogenes) > 0 default_cn = [ {r: 1 for r in self.regions[g]} for g in range(1 + has_pseudogenes) ] self.cn_configs = {min(v.alleles): v for v in self.cn_configs.values()} self.cn_configs["1"] = CNConfig( default_cn, CNConfigType.DEFAULT, set(alleles.keys()) - used_alleles, "Standard copy-number configuration", ) # Set CN of "empty" regions to zero (e.g. CYP2D6.pce) for _, conf in self.cn_configs.items(): for g, r in enumerate(conf.cn): for rg in r: if self.regions[g][rg].end - self.regions[g][rg].start <= 0: r[rg] = 0 # Set up major and minor allele structures alleles_inverse: Dict[tuple, set] = collections.defaultdict(set) for a in alleles: cn_config = next( cn for cn, conf in self.cn_configs.items() if a in conf.alleles ) fn_muts = sorted( m for m in alleles[a].neutral_muts if self.is_functional(m) ) alleles_inverse[cn_config, tuple(fn_muts)].add(a) # Ensure that each distinct major allele has unique prefix used_names: Dict[str, int] = {} new_names = {} changed_cn_configs = {} for key, minors in alleles_inverse.items(): an = min(minors) name = an.split(".")[0] # remove ".0XX" suffix if name in used_names: # special case name = alleles[an].alt_name if alleles[an].alt_name else an if name in used_names: # Append letter used_names[name] += 1 log.debug(f"[gene] renaming {name} -> {name}:{used_names[name]}") name += f":{used_names[name]}" used_names[name] = 1 else: used_names[name] = 1 if an in self.cn_configs and an != name: self.cn_configs[name] = self.cn_configs[an] changed_cn_configs[an] = name del self.cn_configs[an] new_names[key] = name self.alleles: Dict[str, MajorAllele] = dict() for key, minors in alleles_inverse.items(): self.alleles[new_names[key]] = MajorAllele( name=new_names[key], cn_config=changed_cn_configs.get(key[0], key[0]), func_muts=set(key[1]), minors={ sa: MinorAllele( sa, alleles[sa].alt_name, set(alleles[sa].neutral_muts) - set(key[1]), alleles[sa].activity, alleles[sa].evidence, alleles[sa].pharmvar, ) for sa in natsorted(minors) }, ) def _init_partials(self) -> None: """ Construct partial major alleles. If a major allele is cut in half by a fusion, we will create a new major allele that contains the functional mutations that have survived the fusion event. .. note:: This is currently supported only for left fusions. """ def preserved_mutations(f, m): def filter_f(m): m = self.region_at(m.pos) if m: return self.cn_configs[f].cn[m[0]][m[1]] > 0 return False return set(filter(filter_f, m)) for f in filter( lambda x: self.cn_configs[x].kind == CNConfigType.LEFT_FUSION, self.cn_configs, ): # Do not extend left fusions that are already defined # by some functional mutations # HACK: This is hacky but works well for now if len(self.alleles[f].func_muts) > 0: continue add: Dict[tuple, MajorAllele] = {} for an, a in self.alleles.items(): # We only make partial major alleles from non-fused major alleles if a.cn_config != "1": continue new_name = f"{f}#{an}" new_muts = preserved_mutations(f, a.func_muts) key = sorted_tuple(new_muts) if key in add: add[key].minors.update( { f"{f}#{san}": MinorAllele( f"{f}#{san}", None, preserved_mutations(f, sa.neutral_muts), sa.activity, sa.evidence, sa.pharmvar, ) for san, sa in a.minors.items() } ) else: add[key] = MajorAllele( name=new_name, cn_config=f, func_muts=new_muts, minors={ f"{f}#{san}": MinorAllele( f"{f}#{san}", neutral_muts=preserved_mutations(f, sa.neutral_muts), ) for san, sa in a.minors.items() }, ) # Remove fusion (will be replaced at least by allele "1#{f}") del self.alleles[f] self.alleles.update({a.name: a for a in add.values()}) self.removed = {} for an, a in self.alleles.items(): # Clean up minor alleles (as many might be identical after a fusion). # Put a reference to the cleaned-up alleles in `alt_name` field. minors: Dict[tuple, List[str]] = collections.defaultdict(list) for s in a.minors: key = sorted_tuple(a.minors[s].neutral_muts) minors[key].append(s) for sa in minors.values(): if len(sa) > 1: for s in sa: if s != min(sa) and "#" not in s: self.removed[s] = min(sa) log.debug( f"Removing {self.name}*{s} as it is the same as" f" {min(sa)}" ) self.alleles[an] = MajorAllele( self.alleles[an].name, self.alleles[an].cn_config, self.alleles[an].func_muts, { min(sa): MinorAllele( min(sa), a.minors[min(sa)].alt_name, set(nm), a.minors[min(sa)].activity, a.minors[min(sa)].evidence, a.minors[min(sa)].pharmvar, ) for nm, sa in minors.items() }, ) # TODO: prune identical post-fusion alleles # (e.g. *2 after *13A and *13B is the same--- no need to have 2 items). for _, v in self.cn_configs.items(): v.alleles.clear() for a in self.alleles.values(): self.cn_configs[a.cn_config].alleles.add(a.name)