Source code for genome_entropy.orf.finder

"""ORF finder wrapper using get_orfs binary."""

import subprocess
import tempfile
import re
import sys
from pathlib import Path
from typing import Dict, List, Literal

from ..config import DEFAULT_GENETIC_CODE_TABLE, GET_ORFS_BINARY
from ..errors import OrfFinderError
from ..logging_config import get_logger
from .types import OrfRecord

logger = get_logger(__name__)


[docs] def find_orfs( sequences: Dict[str, str], table_id: int = DEFAULT_GENETIC_CODE_TABLE, min_nt_length: int = 90, binary_path: str = GET_ORFS_BINARY, ) -> List[OrfRecord]: """Find ORFs in DNA sequences using get_orfs binary. This function wraps the external get_orfs binary (https://github.com/linsalrob/get_orfs). The binary must be installed and available in PATH or specified via binary_path. Args: sequences: Dictionary mapping sequence IDs to DNA sequences table_id: NCBI genetic code table ID (default: 11, bacterial) min_nt_length: Minimum ORF length in nucleotides (default: 90) binary_path: Path to get_orfs binary (default: from config/environment) Returns: List of OrfRecord objects Raises: OrfFinderError: If get_orfs binary is not found or fails """ logger.info( "Starting ORF finding for %d sequence(s) (table=%d, min_length=%d)", len(sequences), table_id, min_nt_length, ) # Check if binary exists try: logger.debug("Checking get_orfs binary at: %s", binary_path) result = subprocess.run( [binary_path, "-v"], capture_output=True, text=True, timeout=5, ) if result.returncode != 0: raise OrfFinderError(f"get_orfs binary check failed: {result.stderr}") logger.debug("get_orfs binary check successful") except FileNotFoundError: logger.error("get_orfs binary not found at: %s", binary_path) raise OrfFinderError( f"get_orfs binary not found at: {binary_path}\n" "Please install from https://github.com/linsalrob/get_orfs\n" "or set GET_ORFS_PATH environment variable." ) except subprocess.TimeoutExpired: logger.error("get_orfs binary check timed out") raise OrfFinderError("get_orfs binary check timed out") # Write sequences to temporary FASTA file with tempfile.NamedTemporaryFile( mode="w", suffix=".fasta", delete=False ) as tmp_fasta: tmp_fasta_path = tmp_fasta.name for seq_id, sequence in sequences.items(): tmp_fasta.write(f">{seq_id}\n{sequence}\n") logger.debug("Wrote sequences to temporary file: %s", tmp_fasta_path) try: # Run get_orfs # Expected command: get_orfs -f fasta_file -t table_id -l min_length cmd = [ binary_path, "-f", tmp_fasta_path, "-t", str(table_id), "-l", str(min_nt_length), ] logger.debug("Running command: %s", " ".join(cmd)) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, # 5 minute timeout ) if result.returncode != 0: logger.error( "get_orfs failed with return code %d: %s", result.returncode, result.stderr, ) raise OrfFinderError(f"get_orfs failed: {result.stderr}") logger.debug("get_orfs completed successfully") # Parse output orfs = _parse_get_orfs_output(result.stdout, sequences, table_id) logger.info("Found %d ORF(s) in %d sequence(s)", len(orfs), len(sequences)) return orfs finally: # Clean up temporary file Path(tmp_fasta_path).unlink(missing_ok=True) logger.debug("Cleaned up temporary file: %s", tmp_fasta_path)
def _parse_orf_header_line(header: str, table_id: int) -> OrfRecord: """Parse ORF header line from get_orfs output. Supports two formats: 1. Standard: >parent_id-orf_id [parent_id frame frame_num start end] 2. Simple: >orf_id [parent_id frame frame_num start end] Args: header: FASTA header line starting with > table_id: Genetic code table ID Returns: OrfRecord object Raises: ValueError: If header format is invalid """ # Pattern for standard format: >parent_id-orf_id [parent_id frame ...] FASTA_RE_STANDARD = re.compile( r""" ^> (?P<parent_id>[^-\s]+) # JQ995537 - (?P<orf_id>orf\d+) # orf14635 \s+ \[ (?P=parent_id) # repeat accession \s+frame\s+ (?P<frame>[+-]?\d+) \s+ (?P<start>\d+) \s+ (?P<end>\d+) \] $ """, re.VERBOSE, ) # Pattern for simple format: >orf_id [parent_id frame ...] FASTA_RE_SIMPLE = re.compile( r""" ^> (?P<orf_id>orf\d+) # orf1 \s+ \[ (?P<parent_id>[^\s]+) # JQ995537 \s+frame\s+ (?P<frame>[+-]?\d+) \s+ (?P<start>\d+) \s+ (?P<end>\d+) \] $ """, re.VERBOSE, ) # Try standard format first m = FASTA_RE_STANDARD.match(header) if not m: # Try simple format m = FASTA_RE_SIMPLE.match(header) if not m: raise ValueError("Header did not match expected format") data = m.groupdict() strand: Literal["+", "-"] = "+" if int(data["frame"]) < 0: strand = "-" orf = OrfRecord( parent_id=data["parent_id"], orf_id=data["orf_id"], start=int(data["start"]), end=int(data["end"]), frame=abs(int(data["frame"])), strand=strand, nt_sequence="", aa_sequence="", table_id=table_id, has_start_codon=False, has_stop_codon=False, ) return orf
[docs] def reverse_complement(seq: str) -> str: """Return the reverse complement of a DNA sequence.""" complement = str.maketrans("ACGTacgt", "TGCAtgca") return seq.translate(complement)[::-1]
def _extract_orf_dna_sequence(sequences: Dict[str, str], current_orf: OrfRecord) -> str: """ Extract the sequence from sequences """ if current_orf.parent_id not in sequences: raise ValueError( f"We were asked to extract an orf for {current_orf.parent_id} but it is not in our sequences: {sequences.keys()}" ) if current_orf.start < 1 or current_orf.end < 1: raise ValueError("Coordinates must be >= 1") if current_orf.end < current_orf.start: raise ValueError("End coordinate precedes start coordinate") # Convert to Python slicing (0-based, end-exclusive) dna = sequences[current_orf.parent_id] if current_orf.strand == "-": dna = reverse_complement(dna) orf_seq = dna[current_orf.start - 1 : current_orf.end] return orf_seq def _parse_get_orfs_output( output: str, sequences: Dict[str, str], table_id: int ) -> List[OrfRecord]: """Parse get_orfs output into OrfRecord objects. Format: >JQ995537-orf14635 [JQ995537 frame -3 96951 97093] Args: output: stdout from get_orfs sequences: Original DNA sequences (for validation) table_id: Genetic code table used Returns: List of OrfRecord objects """ logger.debug("Parsing get_orfs output") orfs = [] current_orf = None for line in output.strip().split("\n"): try: line = line.rstrip("\r\n") if line.startswith(">"): if current_orf is not None: current_orf.nt_sequence = _extract_orf_dna_sequence( sequences, current_orf ) current_orf.has_start_codon = ( True if "M" in current_orf.aa_sequence else False ) current_orf.has_stop_codon = ( True if "*" in current_orf.aa_sequence else False ) orfs.append(current_orf) logger.debug( "Parsed ORF: %s (length=%d nt)", current_orf.orf_id, len(current_orf.nt_sequence), ) current_orf = _parse_orf_header_line(line, table_id) elif current_orf is not None: current_orf.aa_sequence += line.strip() except (ValueError, IndexError) as e: logger.error("Error handling sequence line '%s': %s", line, e) print(f"Error handling sequence {line}", file=sys.stderr) raise Exception(f"Error as {e}") # Add the last ORF if present if current_orf is not None: current_orf.nt_sequence = _extract_orf_dna_sequence(sequences, current_orf) current_orf.has_start_codon = True if "M" in current_orf.aa_sequence else False current_orf.has_stop_codon = True if "*" in current_orf.aa_sequence else False orfs.append(current_orf) logger.debug( "Parsed ORF: %s (length=%d nt)", current_orf.orf_id, len(current_orf.nt_sequence), ) logger.debug("Parsed %d ORF(s) from get_orfs output", len(orfs)) return orfs