Source code for pyhiv.report.utils

"""
Utility functions for PyHIV reporting module.
"""

import ast
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

import pandas as pd

from pyhiv.report.constants import NumericOffsets, K03455Config


[docs] def ungap(seq: str) -> str: """ Remove gaps from sequence. Parameters ---------- seq : str The input sequence with gaps. Returns ------- str The ungapped sequence. """ return seq.replace("-", "").replace(".", "")
[docs] def first_last_nongap_idx(seq: str) -> Tuple[int, int]: """ Return the first and last indices of non-gap characters in a sequence. Parameters ---------- seq : str The input sequence with gaps. Returns ------- Tuple[int, int] A tuple containing the first and last indices of non-gap characters. """ if not seq or all(c in "-." for c in seq): return 0, 0 first = next((i for i, c in enumerate(seq) if c not in "-."), 0) last = len(seq) - 1 - next((i for i, c in enumerate(reversed(seq)) if c not in "-."), 0) return first, last
[docs] def read_alignment_fasta(fpath: Path) -> Tuple[str, str, str, str]: """ Read alignment FASTA file and return headers and sequences. Parameters ---------- fpath : Path Path to the alignment FASTA file. Returns ------- Tuple[str, str, str, str] A tuple containing: - Reference header - Reference sequence (aligned) - User header - User sequence (aligned) """ if not fpath.exists(): raise FileNotFoundError(f"Alignment FASTA not found: {fpath}") headers, seqs, cur, cur_header = [], [], [], None with open(fpath, "r", encoding="utf-8") as fh: for line in fh: line = line.strip() if not line: continue if line.startswith(">"): if cur_header is not None: seqs.append("".join(cur)) cur = [] cur_header = line[1:].strip() headers.append(cur_header) else: cur.append(line) if cur_header is not None: seqs.append("".join(cur)) if len(seqs) != 2: raise ValueError(f"Expected 2 sequences in {fpath}, found {len(seqs)}.") idx_ref = 0 if "reference" in headers[0].lower() else (1 if "reference" in headers[1].lower() else 0) idx_usr = 1 - idx_ref return headers[idx_ref], seqs[idx_ref], headers[idx_usr], seqs[idx_usr]
[docs] def parse_present_regions(cell: Any) -> List[str]: """ Parse present regions from a table cell into a list of region strings. Parameters ---------- cell : Any The table cell containing present regions. Returns ------- List[str] A list of present region strings. """ if cell is None: return [] cell = str(cell).strip() if not cell or cell == "-": return [] try: val = ast.literal_eval(cell) except Exception: val = cell if isinstance(val, (list, tuple)): return [str(x).strip().strip("'\"") for x in val] if isinstance(val, str): return [p.strip().strip("'\"") for p in val.split(",") if p.strip()] return []
[docs] def parse_features(cell: Any) -> Dict[str, Tuple[int, int]]: """ Parse features from table cell. Parameters ---------- cell : Any The table cell containing features. Returns ------- Dict[str, Tuple[int, int]] A dictionary mapping feature names to (start, end) tuples. """ if cell is None or (isinstance(cell, float) and pd.isna(cell)): return {} if isinstance(cell, dict): return {str(k): (int(v[0]), int(v[1])) for k, v in cell.items()} d = ast.literal_eval(str(cell)) return {str(k): (int(v[0]), int(v[1])) for k, v in d.items()}
[docs] def is_special_reference(accession: str, ref_header: str) -> bool: """ Check if reference is special (K03455). Parameters ---------- accession : str The accession number of the reference. ref_header : str The header of the reference sequence. Returns ------- bool True if the reference is K03455, False otherwise. """ return (accession or "").strip() == "K03455" or "K03455-B" in (ref_header or "")
_CANON_PATTERNS = [ (re.compile(r"^\s*5\s*'? *ltr\s*$", re.I), "5' LTR"), (re.compile(r"^\s*gag\s*$", re.I), "gag"), (re.compile(r"^\s*pol(\s*cds)?\s*$", re.I), "pol"), (re.compile(r"^\s*vif(\s*cds)?\s*$", re.I), "vif"), (re.compile(r"^\s*vpr(\s*cds)?\s*$", re.I), "vpr"), (re.compile(r"^\s*vpu(\s*cds)?\s*$", re.I), "vpu"), (re.compile(r"^\s*tat(\s*exon)?\s*(?:1|i)\s*$", re.I), "tat 1"), (re.compile(r"^\s*tat(\s*exon)?\s*(?:2|ii)\s*$", re.I), "tat 2"), (re.compile(r"^\s*rev(\s*exon)?\s*(?:1|i)\s*$", re.I), "rev 1"), (re.compile(r"^\s*rev(\s*exon)?\s*(?:2|ii)\s*$", re.I), "rev 2"), (re.compile(r"^\s*env(\s*cds)?\s*$", re.I), "env"), (re.compile(r"^\s*nef(\s*cds)?\s*$", re.I), "nef"), (re.compile(r"^\s*3\s*'? *ltr\s*$", re.I), "3' LTR"), ]
[docs] def canon_label(label: str) -> Optional[str]: """ Canonicalize gene label for K03455. Parameters ---------- label : str The input gene label. Returns ------- Optional[str] The canonical gene label, or None if not recognized. """ s = (label or "").strip() if not s: return None for pattern, canonical in _CANON_PATTERNS: if pattern.match(s): return canonical # Direct or case-insensitive match to configured target regions if s in K03455Config.TARGET_REGIONS: return s s_lower = s.lower() for target in K03455Config.TARGET_REGIONS: if target.lower() == s_lower: return target return None
[docs] def normalize_features(raw_features: Dict[str, Tuple[int, int]], special: bool) -> Dict[str, Tuple[int, int]]: """ Normalize features based on reference type. Parameters ---------- raw_features : Dict[str, Tuple[int, int]] Raw features mapping. special : bool Whether the reference is special (K03455). Returns ------- Dict[str, Tuple[int, int]] Normalized features mapping. """ raw_features = raw_features or {} if not special: return {str(k): (int(v[0]), int(v[1])) for k, v in raw_features.items()} out = {} for k, (s, e) in raw_features.items(): canon = canon_label(k) if canon in K03455Config.TARGET_REGIONS: out[canon] = (int(s), int(e)) return {k: v for k, v in out.items() if k in K03455Config.TARGET_REGIONS}
[docs] def normalize_present_regions(regions: List[str], special: bool) -> List[str]: """ Normalize present regions based on reference type. Parameters ---------- regions : List[str] List of raw present regions. special : bool Whether the reference is special (K03455). Returns ------- List[str] Normalized list of present regions. """ regions = regions or [] if not special: return regions out = [] for r in regions: canon = canon_label(r) if canon in K03455Config.TARGET_REGIONS: out.append(canon) return out
[docs] def build_ref_to_alignment_map(ref_aligned: str) -> Tuple[Dict[int, int], int]: """ Build mapping from reference coordinates to alignment coordinates. Parameters ---------- ref_aligned : str The reference sequence with alignment gaps. Returns ------- Tuple[Dict[int, int], int] A tuple containing: - A dictionary mapping reference positions to alignment indices. - The length of the aligned reference sequence. """ mapping = {} ref_pos = 0 for aln_idx, ch in enumerate(ref_aligned): if ch not in "-.": ref_pos += 1 mapping[ref_pos] = aln_idx return mapping, len(ref_aligned)
[docs] def project_features_to_alignment(features_genomic: Dict[str, Tuple[int, int]], ref_map: Dict[int, int]) -> Dict[str, Tuple[int, int]]: """ Project genomic features to alignment coordinates. Parameters ---------- features_genomic : Dict[str, Tuple[int, int]] Genomic features mapping. ref_map : Dict[int, int] Reference to alignment mapping. Returns ------- Dict[str, Tuple[int, int]] Features projected to alignment coordinates. """ projected = {} for gene, (gstart, gend) in features_genomic.items(): if gstart in ref_map and gend in ref_map: astart, aend = ref_map[gstart], ref_map[gend] if aend >= astart: projected[gene] = (astart, aend) return projected
[docs] def get_numeric_offsets_non_special(gene: str) -> tuple[float, float]: """ Get numeric offsets for non-K03455 references using NumericOffsets. Parameters ---------- gene : str The gene name. Returns ------- tuple[float, float] A tuple containing (start_offset, end_offset). """ return NumericOffsets.get_offsets(gene)
[docs] def build_alignment_path(sequence: str, alignments_dir: Path) -> Path: """ Build path to alignment FASTA file. Parameters ---------- sequence : str The name or identifier of the sequence. alignments_dir : Path The directory containing alignment FASTA files. Returns ------- Path The path to the alignment FASTA file. """ p = alignments_dir / f"best_alignment_{sequence}.fasta" return p if p.exists() else (alignments_dir / f"{sequence}.fasta")