"""
Parsers for BLAST output formats 6 (tab-separated) and 7 (tab-separated with comment lines between queries).
"""
import logging
from collections import namedtuple
from typing import List
# Module-level logger named after this module; the lowercase name is kept on
# purpose, which is why pylint's invalid-name check is silenced on this line.
log = logging.getLogger(__name__) # pylint: disable=invalid-name
def reader(fileobj, t: int = 7) -> 'BlastParser':
    """
    Creates a reader for files in BLAST format

    Example::

        with open(blast_file) as infile:
            reader = blast.reader(infile)
            for hit in reader:
                print(hit)

    Args:
      fileobj: iterable yielding lines in blast format. For format 7 the
               object must additionally provide ``readline()``, because
               the format 7 parser reads the first line on construction.
      t: number of blast format type (6 or 7)

    Returns:
      A parser object that yields one hit per data row when iterated.

    Raises:
      NotImplementedError: if ``t`` is neither 6 nor 7.
    """
    if t == 7:
        return Fmt7Parser(fileobj)
    if t == 6:
        return Fmt6Parser(fileobj)
    raise NotImplementedError(
        f"reading BLAST output format {t} is not supported (use 6 or 7)"
    )
def writer(fileobj, t: int = 7) -> 'BlastWriter':
    """
    Creates a writer for files in BLAST format

    Example::

        with open(blast_file, "w") as outfile:
            with blast.writer(outfile) as writer:
                for hit in hits:
                    writer.write_hit(hit)

    The format 7 writer buffers hits per query; using it as a context
    manager makes sure the last buffered set is flushed on exit.

    Args:
      fileobj: writable file-like object receiving the output
      t: number of blast format type (only 7 is implemented)

    Returns:
      A writer object offering ``write_hit()``.

    Raises:
      NotImplementedError: if ``t`` is not 7.
    """
    if t == 7:
        return Fmt7Writer(fileobj)
    raise NotImplementedError(
        f"writing BLAST output format {t} is not supported (use 7)"
    )
class BlastBase(object):
    "Base class for BLAST readers and writers"

    @staticmethod
    def tupleofint(text):
        """Convert a ';'-separated list of integers into a tuple of int.

        Args:
          text: field content, e.g. ``"9606;10090"`` or ``"N/A"``

        Returns:
          Tuple of the parsed integers. An empty tuple is returned for
          ``"N/A"`` and, after logging a warning, for unparsable content.
        """
        if text == "N/A":
            return tuple()
        try:
            return tuple(int(i) for i in text.split(';'))
        except ValueError:
            # Best effort: a malformed field must not abort parsing a file.
            log.warning("Error parsing BLAST file at line='%s'", text)
            return tuple()

    #: Map between field long names (as printed in the "# Fields:" header
    #: of format 7) and short names (as passed to ``-outfmt``).
    #: Entries that are commented out have no long name recorded yet.
    FIELD_MAP = {
        "query id": "qseqid",  # Query Seq-id
        # "": "qgi",  # Query GI
        "query acc.": "qacc",  # Query accession
        "query acc.ver": "qaccver",  # Query accession.version
        "query length": "qlen",  # Query sequence length
        "subject id": "sseqid",  # Subject Seq-id
        # "": "sallseqid",  # All subject Seq-id(s), separated by ';'
        # "": "sgi",  # Subject GI
        # "": "sallgi",  # All subject GIs
        "subject acc.": "sacc",  # Subject accession
        "subject acc.ver": "saccver",  # Subject accession.version
        # "": "sallacc",  # All subject accessions
        # "": "slen",  # Subject sequence length
        "q. start": "qstart",  # Start of alignment in query
        "q. end": "qend",  # End of alignment in query
        "s. start": "sstart",  # Start of alignment in subject
        "s. end": "send",  # End of alignment in subject
        # "": "qseq",  # Aligned part of query sequence
        # "": "sseq",  # Aligned part of subject sequence
        "evalue": "evalue",  # Expect value
        "bit score": "bitscore",  # Bit score
        "score": "score",  # Raw score
        "alignment length": "length",  # Alignment length
        "% identity": "pident",  # Percentage of identical matches
        "mismatches": "mismatch",  # Number of mismatches
        # "": "positive",  # Number of positive-scoring matches
        "gap opens": "gapopen",  # Number of gap openings
        # "": "gaps",  # Total number of gaps
        # "": "ppos",  # Percentage of positive-scoring matches
        # "": "frames",  # Query and subject frames separated by a '/'
        "query frame": "qframe",  # Query frame
        "sbjct frame": "sframe",  # Subject frame
        # "": "btop",  # Blast traceback operations (BTOP)
        # "": "staxid",  # Subject Taxonomy ID
        # "": "ssciname",  # Subject Scientific Name
        # "": "scomname",  # Subject Common Name
        # "": "sblastname",  # Subject Blast Name
        # "": "sskingdom",  # Subject Super Kingdom
        "subject tax ids": "staxids",  # sorted unique ';'-separated Subject Taxonomy ID(s)
        # "": "sscinames",  # unique Subject Scientific Name(s)
        # "": "scomnames",  # unique Subject Common Name(s)
        # "": "sblastnames",  # unique Subject Blast Name(s)
        # "": "sskingdoms",  # unique Subject Super Kingdom(s)
        "subject title": "stitle",  # Subject Title
        # "": "salltitles",  # All Subject Title(s) separated by '<>'
        "subject strand": "sstrand",  # Subject Strand
        # "": "qcovs",  # Query Coverage per Subject
        # "": "qcovhsp",  # Query Coverage per HSP
        # "": "qcovus",  # Query Coverage per Unique Subject (blastn only)
    }

    #: Reversed map from short to long name
    FIELD_REV_MAP = {
        value: key
        for key, value in FIELD_MAP.items()
    }

    #: Map defining types of fields (short name -> converter applied to the
    #: raw column text). Fields not listed here are kept as strings.
    FIELD_TYPE = {
        'pident': float,
        'length': int,
        'mismatch': int,
        'gapopen': int,
        'qstart': int,
        'qend': int,
        'qlen': int,
        'sstart': int,
        'send': int,
        'evalue': float,
        'bitscore': float,
        'score': float,
        'sframe': int,
        'qframe': int,
        'stitle': str,
        # Store the plain function: a staticmethod object itself is not
        # callable inside the class body on older Python versions.
        'staxids': tupleofint.__func__,
    }
class BlastParser(BlastBase):
    """Base class for BLAST readers

    Subclasses implement :meth:`get_fields` and ``__iter__``; both raise
    ``NotImplementedError`` here.
    """

    def get_fields(self):
        """Return the names of the fields contained in each hit."""
        raise NotImplementedError()

    def __iter__(self):
        """Iterate over the hits of the parsed file."""
        raise NotImplementedError()
class BlastWriter(BlastBase):
    """Base class for BLAST writers

    Subclasses implement :meth:`write_hit`, which raises
    ``NotImplementedError`` here.
    """

    def write_hit(self, hit):
        """Write (or queue for writing) a single hit."""
        raise NotImplementedError()
class Fmt7Parser(BlastParser):
    """
    Parses BLAST results in format '7' (tab-separated with comment lines)

    The comment lines preceding each set of hits are used to track the
    current query, database, field list and announced number of hits.
    Iterating the parser yields one ``BlastHit`` namedtuple per data row.

    Field names that have no entry in ``FIELD_MAP`` are used verbatim as
    namedtuple field names, so they must be valid Python identifiers.
    """
    PAT_FIELDS = "# Fields: "
    PAT_QUERY = "# Query: "
    PAT_DATABASE = "# Database: "
    PAT_HITSFOUND = " hits found"

    def __init__(self, fileobj):
        """
        Args:
          fileobj: file-like object; must provide ``readline()`` and be
                   iterable line by line.

        Raises:
          ValueError: if the first line does not contain ``BLAST``.
        """
        self.fileobj = fileobj
        self.fields = None
        self.query = "undefined"
        self.database = "undefined"
        # Initialised here so that isfirsthit() and rows that are not
        # preceded by a "hits found" line do not raise AttributeError.
        self.hits = 0
        self.hit = 0
        self.Hit = None
        self._converters = []
        if "BLAST" not in fileobj.readline():
            raise ValueError("not a BLAST7 formatted file")

    def get_fields(self) -> List[str]:
        """Returns list of available field names

        Format 7 specifies which columns it contains in comment lines, allowing
        this parser to be agnostic of the selection of columns made when running
        BLAST.

        Returns:
          List of field names (e.g. ``['sacc', 'qacc', 'evalue']``), or
          ``None`` as long as no ``# Fields:`` line has been read.
        """
        return self.fields

    def _set_fields(self, header: str) -> None:
        """Update field names, hit type and converters from a fields line."""
        names = [
            self.FIELD_MAP.get(name, name)
            for name in header[len(self.PAT_FIELDS):].strip().split(", ")
        ]
        # The header is repeated for every query; only rebuild the
        # (comparatively expensive) namedtuple class when it changed.
        if self.Hit is None or names != self.fields:
            self.fields = names
            self.Hit = namedtuple("BlastHit", names)
            self._converters = [self.FIELD_TYPE.get(name) for name in names]

    def __iter__(self):
        for line in self.fileobj:
            if line.startswith("#"):
                if line.startswith(self.PAT_FIELDS):
                    self._set_fields(line)
                elif line.startswith(self.PAT_QUERY):
                    self.query = line[len(self.PAT_QUERY):].strip()
                elif line.startswith(self.PAT_DATABASE):
                    self.database = line[len(self.PAT_DATABASE):].strip()
                elif line.strip().endswith(self.PAT_HITSFOUND):
                    # "# <n> hits found" starts a new set of hits
                    self.hits = int(line.split()[1])
                    self.hit = 0
                # any other comment line carries nothing we track
                continue

            row = line.strip()
            if not row:
                continue  # tolerate blank lines
            if self.Hit is None:
                raise ValueError(
                    "BLAST7 data row found before any '# Fields: ' line"
                )
            self.hit += 1
            yield self.Hit(*[
                convert(value) if convert else value
                for convert, value in zip(self._converters, row.split('\t'))
            ])

    def isfirsthit(self) -> bool:
        """Returns `True` if the current hit is the first hit for the current
        query"""
        return self.hit == 1
class Fmt6Parser(BlastParser):
    """Parser for BLAST format 6 (tab-separated, no comment lines)

    Format 6 carries no header, so the standard twelve columns are
    assumed. Iterating the parser yields one ``BlastHit`` namedtuple per
    non-blank line.
    """
    #: Default field names
    fields = ("qseqid sseqid pident length mismatch gapopen "
              "qstart qend sstart send evalue bitscore").split()
    #: Converter for each field, ``None`` where the text is kept as is
    field_types = [BlastParser.FIELD_TYPE.get(n, None) for n in fields]
    #: Type of the objects yielded on iteration
    Hit = namedtuple("BlastHit", fields)

    def __init__(self, fileobj):
        """
        Args:
          fileobj: iterable yielding lines in BLAST format 6
        """
        self.fileobj = fileobj

    def get_fields(self):
        """Returns the list of field names of the yielded hits"""
        return self.fields

    def __iter__(self):
        for line in self.fileobj:
            row = line.rstrip("\r\n")
            if not row.strip():
                # A blank (e.g. trailing) line would otherwise fail with a
                # TypeError for missing namedtuple arguments.
                continue
            yield self.Hit(*[
                convert(value) if convert else value
                for value, convert in zip(row.split("\t"), self.field_types)
            ])
class Fmt7Writer(BlastWriter):
    """Writer for BLAST format 7 (tab-separated with comment lines)

    Hits are buffered until a hit for a different query arrives (or the
    writer is left as a context manager); the buffered set is then
    written as header, ``# <n> hits found`` line and one row per hit.
    """

    def __init__(self, fileobj):
        """
        Args:
          fileobj: writable file-like object receiving the output
        """
        self.fileobj = fileobj
        # NOTE(review): ``ymp`` is not imported in the visible part of this
        # module; confirm the import exists and that ``ymp.version`` is a str.
        self.toolname = "YMP writer " + ymp.version
        self.query = None
        self.database = "undefined"
        self.fields = "undefined"
        self.hits = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, ext_value, tb):
        # Flush whatever is still buffered; exceptions are not suppressed.
        self.write_hitset()

    @staticmethod
    def _format_value(value):
        """Render one field of a hit as column text."""
        if isinstance(value, (tuple, list)):
            # Counterpart of the ';'-separated list parsing on the reader
            # side, where "N/A" stands for an empty list.
            return ";".join(str(item) for item in value) if value else "N/A"
        return str(value)

    def write_hitset(self):
        """Write the buffered hits of the current query and empty the buffer

        Does nothing if no hits are buffered.
        """
        if not self.hits:
            return
        self.query = self.hits[0].qacc
        self.fields = self.hits[0]._fields
        # NOTE(review): write_header() is not defined in the visible part
        # of this class; confirm it exists and ends its output with "\n".
        self.write_header()
        # Same wording the format 7 parser looks for (" hits found").
        self.fileobj.write(f"# {len(self.hits)} hits found\n")
        self.fileobj.write("".join(
            "\t".join(self._format_value(value) for value in hit) + "\n"
            for hit in self.hits
        ))
        # Start a fresh buffer; otherwise every following hit of the next
        # query would compare against the old first hit and flush again.
        self.hits = []

    def write_hit(self, hit):
        """Queue a hit, flushing the previous set when the query changes

        Args:
          hit: namedtuple-like hit object providing ``qacc`` and ``_fields``
        """
        if self.hits and hit.qacc != self.hits[0].qacc:
            self.write_hitset()
        self.hits.append(hit)