Source code for pyhiv.align.align_with_reference

from __future__ import annotations

import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Optional, Tuple, List

from pyhiv.align.famsa import pyfamsa_align
from pyhiv.loading import REFERENCE_GENOMES_FASTAS_DIR

try:
    from Bio import SeqIO
    from Bio.SeqRecord import SeqRecord
except ImportError: # pragma: no cover
    raise ImportError("BioPython is required for this module. Please install it via 'pip install biopython'.")

[docs] def process_alignment(test_seq: SeqRecord, ref_seq: SeqRecord) -> Optional[Tuple[int, str, str, str]]: """ Aligns test sequence with a reference sequence and calculates the score. Parameters ---------- test_seq: SeqRecord A BioPython SeqRecord object representing the test sequence. ref_seq: SeqRecord A BioPython SeqRecord object representing the reference sequence. Returns ------- Tuple[int, str, str, str] A tuple containing the alignment score, the aligned test sequence, the aligned reference sequence, and the reference sequence name. """ try: test_aligned, ref_aligned = pyfamsa_align(test_seq, ref_seq) score = calculate_alignment_score(test_aligned, ref_aligned) return score, test_aligned, ref_aligned, ref_seq.name except Exception as e: logging.error(f"Failed to process {ref_seq.name}: {e}") return None
[docs] def align_with_references(test_sequence: SeqRecord, references_dir: Optional[Path] = None, n_jobs: Optional[int] = None) -> Optional[Tuple[str, str, str]]: """ Aligns a test sequence with reference sequences in parallel and returns the best match. Parameters ---------- test_sequence: SeqRecord A BioPython SeqRecord object representing the test sequence. references_dir: Path, optional Path to the directory containing reference sequences. Defaults to REFERENCE_GENOMES_FASTAS_DIR, containing reference genomes for HIV-1 subtyping. n_jobs: int, optional Number of worker processes to use for parallel processing. Defaults to using all available CPU cores. Returns ------- Tuple[str, str, str] A tuple containing the test sequence, reference sequence, and the reference file name with the best alignment. """ num_workers = n_jobs or 1 references_dir = references_dir or REFERENCE_GENOMES_FASTAS_DIR if not isinstance(references_dir, Path) or not references_dir.exists(): logging.error("Invalid reference directory provided.") return None # Load reference sequences efficiently ref_sequences: List[SeqRecord] = [] for ref_file in references_dir.glob("*.fasta"): # Only process FASTA files try: with open(ref_file, "r") as handle: ref_sequences.extend(list(SeqIO.parse(handle, "fasta"))) except Exception as e: logging.error(f"Error reading {ref_file}: {e}") if not ref_sequences: logging.error("No valid reference sequences found.") return None best_alignment = None best_score = float('-inf') with ProcessPoolExecutor(max_workers=num_workers) as executor: futures = {executor.submit(process_alignment, test_sequence, ref): ref for ref in ref_sequences} for future in as_completed(futures): result = future.result() if result and result[0] > best_score: best_score = result[0] best_alignment = result[1:] return best_alignment
[docs] def calculate_alignment_score(seq1: str, seq2: str) -> int: """ Calculate the alignment score between two sequences. Parameters ---------- seq1: str First sequence to compare. seq2: str Second sequence to compare. Returns ------- int Number of positions where the sequences are equal. """ try: return sum(1 for seq1_nt, seq2_nt in zip(seq1, seq2) if seq1_nt.upper() == seq2_nt.upper() and seq1_nt != "-") except ValueError: logging.error("Sequences have different lengths, alignment might be incorrect.") return 0