"""
Tool for converting fast5 files to the pod5 format
"""
import datetime
import functools
import logging
import multiprocessing as mp
from multiprocessing.context import SpawnContext, SpawnProcess
import os
import sys
from time import perf_counter
import warnings
from pod5.pod5_types import CompressedRead
from tqdm.auto import tqdm
import uuid
from pathlib import Path
from queue import Empty
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
import h5py
import iso8601
import more_itertools
import vbz_h5py_plugin # noqa: F401
import pod5 as p5
from pod5.signal_tools import DEFAULT_SIGNAL_CHUNK_SIZE, vbz_compress_signal_chunked
from pod5.tools.parsers import pod5_convert_from_fast5_argparser, run_tool
from pod5.tools.utils import PBAR_DEFAULTS, is_pod5_debug, iterate_inputs
READ_CHUNK_SIZE = 400
TIMEOUT_SECONDS = 600
def _init_logging():
"""Initialise logging only if POD5_DEBUG is true"""
if not is_pod5_debug():
logger = logging.getLogger()
logger.addHandler(logging.NullHandler())
else:
datetime_now = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S")
pid = os.getpid()
logging.basicConfig(
level=logging.DEBUG,
filename=f"{datetime_now}-{pid}-pod5-c2f5.log",
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger()
return logger
logger = _init_logging()
[docs]def logged(log_return: bool = False, log_args: bool = False, log_time: bool = False):
"""Logging parameterised decorator"""
log_fn = logging.debug
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
uid = f"'{func.__name__}':{str(uuid.uuid4())[:8]}"
if log_args:
log_fn("{0}:{1}, {2}".format(uid, args, kwargs))
else:
log_fn("{0}".format(uid))
try:
started = perf_counter()
ret = func(*args, **kwargs)
except Exception as exc:
log_fn("{0}:Exception:{1}".format(uid, exc))
raise exc
if log_time:
duration_s = perf_counter() - started
log_fn("{0}:Done:{1:.3f}s".format(uid, duration_s))
if log_return:
log_fn("{0}:Returned:{1}".format(uid, ret))
return ret
return wrapper
return decorator
logged_all = logged(log_return=True, log_args=True, log_time=True)
[docs]@logged_all
def terminate_processes(processes: List[SpawnProcess]) -> None:
"""terminate all child processes"""
for proc in processes:
try:
proc.terminate()
except ValueError:
# Catch ValueError raised if proc is already closed
pass
return
[docs]class QueueManager:
[docs] def __init__(
self,
context: SpawnContext,
inputs: List[Path],
threads: int,
timeout: float,
) -> None:
"""Manager for balancing work queues"""
self._requests_size = threads * 2
self._inputs: mp.Queue = context.Queue(maxsize=len(inputs))
self._requests: mp.Queue = context.Queue(maxsize=self._requests_size)
self._data: mp.Queue = context.Queue()
self._exceptions: mp.Queue = context.Queue()
self._timeout = timeout
self._start(inputs=inputs)
def _await(self, queue: mp.Queue) -> Any:
"""Await the next item on a queue raising TimeoutError if failing"""
try:
item = queue.get(timeout=self._timeout)
return item
except Empty:
logger.fatal("Empty queue or timeout ")
raise TimeoutError(f"No progress in {self._timeout} seconds - quitting")
[docs] def enqueue_request(self) -> None:
self._requests.put(None, timeout=self._timeout)
[docs] def await_request(self) -> None:
"""Await a request for data"""
self._await(self._requests)
[docs] @logged()
def enqueue_data(
self, path: Optional[Path], reads: Union[List[CompressedRead], int, None]
) -> None:
"""
Enqueues an input path and either a list of compressed reads to be written, or
the total count of reads converted for that path.
Otherwise, if path is None, mark the child process as being empty.
"""
self._data.put((path, reads), timeout=self._timeout)
[docs] @logged(log_time=True)
def await_data(
self,
) -> Tuple[Optional[Path], Union[List[CompressedRead], int, None]]:
"""
Await compressed reads or the total count of reads compressed (file end) for
a input filepath. Enqueues the next request if necessary
"""
path, item = self._await(self._data)
# Check for the exhausted process sentinel value
if path is None:
return None, None
# Add another request if we received compressed reads
if isinstance(item, List):
self.enqueue_request()
return path, item
[docs] @logged(log_args=True)
def enqueue_exception(self, path: Path, exception: Exception, trace: str) -> None:
self._exceptions.put((path, exception, trace), timeout=self._timeout)
[docs] def get_exception(self) -> Optional[Tuple[Path, Exception, str]]:
"""Promptly get an exception if any"""
try:
# Use short timeout instead of get_nowait as we might call this method
# very shortly after enqueueing an exception
path, exc, trace = self._exceptions.get(timeout=0.01)
logger.exception(f"Encountered an exception in {path} - {exc}")
if trace:
logger.exception(f"Trace Exception {path}\n{trace}")
return path, exc, trace
except Empty:
pass
return None
@logged(log_return=True)
def _discard_and_close(self, queue: mp.Queue) -> int:
"""
Discard all remaining enqueued items and close a queue to nicely shutdown the
queue. Returns the number of discarded items
"""
count = 0
while True:
try:
queue.get(timeout=0.1)
count += 1
except Exception:
break
queue.close()
queue.join_thread()
return count
[docs] @logged(log_return=True)
def shutdown(self) -> Tuple[int, int, int, int]:
"""Shutdown all queues returning the counts of all remaining items"""
n_inputs = self._discard_and_close(self._inputs)
n_req = self._discard_and_close(self._requests)
n_data = self._discard_and_close(self._data)
n_exc = self._discard_and_close(self._exceptions)
if n_inputs > 0:
logger.warn("Unfinished inputs found during shutdown!")
if n_data > 0:
logger.warn("Unfinished data found during shutdown!")
if n_exc > 0:
logger.warn("Unfinished exceptions found during shutdown!")
return n_inputs, n_req, n_data, n_exc
@logged(log_args=True)
def _start(self, inputs: Iterable[Path]) -> None:
"""Enqueue all inputs for child processes to poll and set the requests size"""
for path in inputs:
if path.is_file():
self.enqueue_input(path)
for _ in range(self._requests_size):
self.enqueue_request()
[docs]class OutputHandler:
"""Class for managing p5.Writer handles"""
[docs] @logged(log_args=True)
def __init__(
self,
output_root: Path,
one_to_one: Optional[Path],
force_overwrite: bool,
):
self.output_root = output_root
self._one_to_one = one_to_one
self._force_overwrite = force_overwrite
self._input_to_output: Dict[Path, Path] = {}
self._open_writers: Dict[Path, p5.Writer] = {}
self._closed_writers: Set[Path] = set([])
def _open_writer(self, output_path: Path) -> p5.Writer:
"""Get the writer from existing handles or create a new one if unseen"""
if output_path in self._open_writers:
return self._open_writers[output_path]
if output_path in self._closed_writers:
raise FileExistsError(f"Trying to re-open a closed Writer to {output_path}")
if output_path.exists() and self._force_overwrite:
output_path.unlink()
writer = p5.Writer(output_path)
self._open_writers[output_path] = writer
return writer
[docs] def get_writer(self, input_path: Path) -> p5.Writer:
"""Get a Pod5Writer to write data from the input_path"""
if input_path not in self._input_to_output:
out_path = self.resolve_output_path(
path=input_path, root=self.output_root, relative_root=self._one_to_one
)
self._input_to_output[input_path] = out_path
output_path = self._input_to_output[input_path]
return self._open_writer(output_path=output_path)
[docs] @staticmethod
@logged_all
def resolve_one_to_one_path(path: Path, root: Path, relative_root: Path):
"""
Find the relative path between the input path and the relative root
"""
try:
relative = path.with_suffix(".pod5").relative_to(relative_root)
except ValueError as exc:
raise RuntimeError(
f"--one-to-one directory must be a relative parent of "
f"all input fast5 files. For {path} relative to {relative_root}"
) from exc
# Resolve the new final output path relative to the output directory
# This path is to a file with the equivalent filename(.pod5)
return root / relative
[docs] @staticmethod
@logged_all
def resolve_output_path(
path: Path, root: Path, relative_root: Optional[Path]
) -> Path:
"""
Resolve the output path. If relative_root is a path, resolve the relative output
path under root, otherwise, the output is either root or a new file within root
if root is a directory
"""
if relative_root is not None:
# Resolve the relative path to the one_to_one root path
out_path = OutputHandler.resolve_one_to_one_path(
path=path,
root=root,
relative_root=relative_root,
)
# Create directory structure if needed
out_path.parent.mkdir(parents=True, exist_ok=True)
return out_path
if root.is_dir():
# If the output path is a directory, the write the default filename
return root / "output.pod5"
# The provided output path is assumed to be a named file
return root
[docs] @logged()
def close_all(self):
"""Close all open writers"""
for path, writer in self._open_writers.items():
writer.close()
del writer
# Keep track of closed writers to ensure we don't overwrite our own work
self._closed_writers.add(path)
self._open_writers = {}
def __del__(self) -> None:
self.close_all()
[docs]class StatusMonitor:
"""Class for monitoring the status of the conversion"""
[docs] @logged_all
def __init__(self, paths: List[Path]):
# Estimate that a fast5 file will have 4k reads
self.path_reads = {path: 4000 for path in paths}
self.count_finished = 0
disable_pbar = not bool(int(os.environ.get("POD5_PBAR", 1)))
self.pbar = tqdm(
total=self.total_reads,
disable=disable_pbar,
desc=f"Converting {len(self.path_reads)} Fast5s",
unit="Reads",
leave=True,
**PBAR_DEFAULTS,
)
@property
def total_files(self) -> int:
return len(self.path_reads)
@property
def total_reads(self) -> int:
return sum(self.path_reads.values())
[docs] @logged(log_args=True)
def increment_reads(self, n: int) -> None:
"""Increment the reads status by n"""
self.pbar.update(n)
[docs] @logged(log_args=True)
def update_reads_total(self, path: Path, total: int) -> None:
"""Increment the reads status by n and update the total reads"""
self.path_reads[path] = total
self.pbar.total = self.total_reads
self.pbar.refresh()
[docs] @logged(log_args=True)
def write(self, msg: str, file: Any) -> None:
"""Write runtime message to avoid clobbering tqdm pbar"""
self.pbar.write(msg, file=file)
[docs] @logged()
def close(self) -> None:
"""Close the progress bar"""
self.pbar.close()
[docs]@logged_all
def is_multi_read_fast5(path: Path) -> bool:
"""
Assert that the given path points to a a multi-read fast5 file for which
direct-to-pod5 conversion is supported.
"""
try:
with h5py.File(path) as _h5:
# The "file_type" attribute might be present on supported multi-read fast5 files.
if _h5.attrs.get("file_type") == "multi-read":
return True
# No keys, assume multi-read but there shouldn't be anything to do which would
# cause an issue so pass silently
if len(_h5) == 0:
return True
# if there are "read_x" keys, this is a multi-read file
if any(key for key in _h5 if key.startswith("read_")):
return True
except Exception:
pass
return False
[docs]def decode_str(value: Union[str, bytes]) -> str:
"""Decode a h5py utf-8 byte string to python string"""
if isinstance(value, str):
return value
return value.decode("utf-8")
[docs]def convert_fast5_end_reason(fast5_end_reason: int) -> p5.EndReason:
"""
Return an EndReason instance from the given end_reason integer from a fast5 file.
This will handle the difference between fast5 and pod5 values for this enumeration
and set the default "forced" value for each fast5 enumeration value.
"""
# Expected fast5 enumeration:
# end_reason_dict = {
# "unknown": 0,
# "partial": 1, <-- Not used in pod5
# "mux_change": 2, <-- Remaining values are offset by +1
# "unblock_mux_change": 3,
# "data_service_unblock_mux_change": 4,
# "signal_positive": 5,
# "signal_negative": 6,
# }
# (0:unknown | 1:partial) => pod5 (0:unknown)
if fast5_end_reason < 2:
return p5.EndReason.from_reason_with_default_forced(p5.EndReasonEnum.UNKNOWN)
# Resolve the offset in enumeration values between both files
p5_scaled_end_reason = fast5_end_reason - 1
return p5.EndReason.from_reason_with_default_forced(
p5.EndReasonEnum(p5_scaled_end_reason)
)
[docs]def convert_datetime_as_epoch_ms(time_str: Optional[str]) -> datetime.datetime:
"""Convert the fast5 time string to timestamp"""
epoch = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=datetime.timezone.utc)
if time_str is None:
return epoch
try:
return iso8601.parse_date(decode_str(time_str))
except iso8601.iso8601.ParseError:
return epoch
[docs]def convert_run_info(
acq_id: str,
adc_max: int,
adc_min: int,
sample_rate: int,
context_tags: Dict[str, str],
device_type: str,
tracking_id: Dict[str, str],
) -> p5.RunInfo:
"""Create a Pod5RunInfo instance from parsed fast5 data"""
return p5.RunInfo(
acquisition_id=acq_id,
acquisition_start_time=convert_datetime_as_epoch_ms(
tracking_id["exp_start_time"]
),
adc_max=adc_max,
adc_min=adc_min,
context_tags={
str(key): decode_str(value) for key, value in context_tags.items()
},
experiment_name="",
flow_cell_id=decode_str(tracking_id.get("flow_cell_id", b"")),
flow_cell_product_code=decode_str(
tracking_id.get("flow_cell_product_code", b"")
),
protocol_name=decode_str(tracking_id["exp_script_name"]),
protocol_run_id=decode_str(tracking_id["protocol_run_id"]),
protocol_start_time=convert_datetime_as_epoch_ms(
tracking_id.get("protocol_start_time", None)
),
sample_id=decode_str(tracking_id["sample_id"]),
sample_rate=sample_rate,
sequencing_kit=decode_str(context_tags.get("sequencing_kit", b"")),
sequencer_position=decode_str(tracking_id.get("device_id", b"")),
sequencer_position_type=decode_str(tracking_id.get("device_type", device_type)),
software="python-pod5-converter",
system_name=decode_str(tracking_id.get("host_product_serial_number", b"")),
system_type=decode_str(tracking_id.get("host_product_code", b"")),
tracking_id={str(key): decode_str(value) for key, value in tracking_id.items()},
)
[docs]def convert_fast5_read(
fast5_read: h5py.Group,
run_info_cache: Dict[str, p5.RunInfo],
signal_chunk_size: int = DEFAULT_SIGNAL_CHUNK_SIZE,
) -> p5.CompressedRead:
"""
Given a fast5 read parsed from a fast5 file, return a pod5.Read object.
"""
channel_id = fast5_read["channel_id"]
raw = fast5_read["Raw"]
attrs = fast5_read.attrs
# Get the acquisition id
if "run_id" in attrs:
acq_id = decode_str(attrs["run_id"])
else:
acq_id = decode_str(fast5_read["tracking_id"].attrs["run_id"])
# Create new run_info if we've not seen this acquisition id before
if acq_id not in run_info_cache:
adc_min = 0
adc_max = 2047
device_type_guess = "promethion"
if channel_id.attrs["digitisation"] == 8192:
adc_min = -4096
adc_max = 4095
device_type_guess = "minion"
# Add new run_info to cache
run_info_cache[acq_id] = convert_run_info(
acq_id=acq_id,
adc_max=adc_max,
adc_min=adc_min,
sample_rate=int(channel_id.attrs["sampling_rate"]),
context_tags=dict(fast5_read["context_tags"].attrs),
device_type=device_type_guess,
tracking_id=dict(fast5_read["tracking_id"].attrs),
)
# Process attributes unique to this read
read_id = uuid.UUID(decode_str(raw.attrs["read_id"]))
pore = p5.Pore(
channel=int(channel_id.attrs["channel_number"]),
well=raw.attrs["start_mux"],
pore_type=decode_str(attrs.get("pore_type", b"not_set")),
)
calibration = p5.Calibration.from_range(
offset=channel_id.attrs["offset"],
adc_range=channel_id.attrs["range"],
digitisation=channel_id.attrs["digitisation"],
)
end_reason = convert_fast5_end_reason(raw.attrs.get("end_reason", 0))
# Signal conversion process
signal = raw["Signal"][()]
signal_chunks, signal_chunk_lengths = vbz_compress_signal_chunked(
signal, signal_chunk_size
)
return p5.CompressedRead(
read_id=read_id,
pore=pore,
calibration=calibration,
read_number=raw.attrs["read_number"],
start_sample=raw.attrs["start_time"],
median_before=raw.attrs["median_before"],
num_minknow_events=raw.attrs.get("num_minknow_events", 0),
tracked_scaling=p5.pod5_types.ShiftScalePair(
raw.attrs.get("tracked_scaling_shift", float("nan")),
raw.attrs.get("tracked_scaling_scale", float("nan")),
),
predicted_scaling=p5.pod5_types.ShiftScalePair(
raw.attrs.get("predicted_scaling_shift", float("nan")),
raw.attrs.get("predicted_scaling_scale", float("nan")),
),
num_reads_since_mux_change=raw.attrs.get("num_reads_since_mux_change", 0),
time_since_mux_change=raw.attrs.get("time_since_mux_change", 0.0),
end_reason=end_reason,
run_info=run_info_cache[acq_id],
signal_chunks=signal_chunks,
signal_chunk_lengths=signal_chunk_lengths,
)
[docs]def get_read_from_fast5(group_name: str, h5_file: h5py.File) -> Optional[h5py.Group]:
"""Read a group from a h5 file ensuring that it's a read"""
if not group_name.startswith("read_"):
return None
try:
return h5_file[group_name]
except KeyError as exc:
# Observed strange behaviour where h5py reports a KeyError with
# the message "Unable to open object". Report a failed read as warning
warnings.warn(
f"Failed to read key {group_name} from {h5_file.filename} : {exc}",
)
return None
[docs]def convert_fast5_file_chunk(
queues: QueueManager,
handle: h5py.File,
chunk: Iterable[str],
cache: Dict[str, p5.RunInfo],
signal_chunk_size: int,
) -> List[CompressedRead]:
reads: List[p5.CompressedRead] = []
# Allow request queue to throttle work
queues.await_request()
try:
for group_name in chunk:
f5_read = get_read_from_fast5(group_name, handle)
if f5_read is None:
continue
read = convert_fast5_read(f5_read, cache, signal_chunk_size)
reads.append(read)
except Exception as exc:
# Ensures that requests aren't exhausted
queues.enqueue_request()
raise exc
return reads
[docs]@logged_all
def convert_fast5_file(
path: Path,
queues: QueueManager,
signal_chunk_size: int = DEFAULT_SIGNAL_CHUNK_SIZE,
) -> int:
"""Convert the reads in a fast5 file"""
run_info_cache: Dict[str, p5.RunInfo] = {}
total_reads: int = 0
with h5py.File(str(path), "r") as _f5:
for chunk in more_itertools.chunked(_f5.keys(), READ_CHUNK_SIZE):
reads = convert_fast5_file_chunk(
queues, _f5, chunk, run_info_cache, signal_chunk_size
)
queues.enqueue_data(path, reads)
total_reads += len(reads)
return total_reads
[docs]@logged()
def issue_not_multi_read_exception(path: Path, queues: QueueManager):
logger.error(f"Input {path.name} is not a multi-read fast5")
queues.enqueue_exception(
path=path,
exception=TypeError(f"{path} is not a multi-read fast5 file."),
trace="",
)
logger.info(f"Enqueueing file end: {path.name} reads: 0")
queues.enqueue_data(path, 0)
[docs]@logged(log_time=True)
def convert_fast5_files(
queues: QueueManager,
signal_chunk_size: int = DEFAULT_SIGNAL_CHUNK_SIZE,
) -> None:
"""
Main function for converting fast5s available in queues.
Collections of converted reads are emplaced on the data_queue for writing in
the main process.
"""
while True:
path = queues.get_input()
if path is None:
logger.info("Inputs exhausted. Closing Process")
break
if not is_multi_read_fast5(path):
issue_not_multi_read_exception(path, queues)
continue
try:
total_reads = convert_fast5_file(path, queues, signal_chunk_size)
logger.info(f"Enqueueing file end: {path.name} reads: {total_reads}")
queues.enqueue_data(path, total_reads)
except Exception as exc:
import traceback
logger.error(f"Enqueueing exception: {path.name} {exc}")
queues.enqueue_exception(path, exc, traceback.format_exc())
logger.info("Enqueue sentinel")
queues.enqueue_data(None, None)
[docs]@logged(log_args=True)
def handle_exception(
exception: Tuple[Path, Exception, str],
output_handler: OutputHandler,
status: StatusMonitor,
strict: bool,
) -> None:
path, exc, trace = exception
status.write(str(exc), sys.stderr)
output_handler.set_input_complete(path)
if strict:
logger.fatal("Exception raised and --strict set")
logger.debug(f"trace: {trace}")
status.close()
raise exc
[docs]@logged_all
def process_conversion_tasks(
queues: QueueManager,
output_handler: OutputHandler,
status: StatusMonitor,
strict: bool,
threads: int,
) -> None:
"""Work through the queues of data until all work is done"""
count_complete_processes = 0
while count_complete_processes < threads:
# Always poll exceptions to ensure they're handled
exception = queues.get_exception()
if exception is not None:
handle_exception(
exception=exception,
output_handler=output_handler,
status=status,
strict=strict,
)
continue
path, data = queues.await_data()
# Handle exhausted processes
if path is None:
# Processed finished sentinel
count_complete_processes += 1
logger.info(
f"Got process end sentinel {count_complete_processes} of {threads}"
)
continue
# Update the progress bar with the total number of converted reads in the file
if isinstance(data, int):
status.update_reads_total(path, data)
output_handler.set_input_complete(path)
continue
# Write the incoming list of converted reads
writer = output_handler.get_writer(path)
logger.info(f"Writing {len(data)} reads to {path.name} using {writer}")
writer.add_reads(data)
status.increment_reads(len(data))
status.close()
[docs]@logged(log_time=True)
def convert_from_fast5(
inputs: List[Path],
output: Path,
recursive: bool = False,
threads: int = 10,
one_to_one: Optional[Path] = None,
force_overwrite: bool = False,
signal_chunk_size: int = DEFAULT_SIGNAL_CHUNK_SIZE,
strict: bool = False,
) -> None:
"""
Convert fast5 files found (optionally recursively) at the given input Paths
into pod5 file(s). If one_to_one is a Path then the new pod5 files are
created in a new relative directory structure within output relative to the the
one_to_one Path.
"""
if output.is_file() and not force_overwrite:
raise FileExistsError(
"Output path points to an existing file and --force-overwrite not set"
)
if len(output.parts) > 1:
output.parent.mkdir(parents=True, exist_ok=True)
pending_fast5s = list(iterate_inputs(inputs, recursive, "*.fast5"))
if not pending_fast5s:
logger.fatal(f"Found no *.fast5 files in inputs: {inputs}")
raise RuntimeError("Found no fast5 inputs to process - Exiting")
output_handler = OutputHandler(output, one_to_one, force_overwrite)
status = StatusMonitor(pending_fast5s)
threads = min(threads, len(pending_fast5s))
ctx = mp.get_context("spawn")
queues = QueueManager(
context=ctx,
inputs=pending_fast5s,
threads=threads,
timeout=TIMEOUT_SECONDS,
)
active_processes = []
for _ in range(threads):
process = ctx.Process(
target=convert_fast5_files,
args=(queues, signal_chunk_size),
daemon=True,
)
process.start()
active_processes.append(process)
try:
process_conversion_tasks(
queues=queues,
output_handler=output_handler,
status=status,
strict=strict,
threads=threads,
)
queues.shutdown()
for proc in active_processes:
proc.join()
proc.close()
except Exception as exc:
status.write(f"An unexpected error occurred: {exc}", file=sys.stderr)
terminate_processes(active_processes)
raise exc
finally:
output_handler.close_all()
logger.disabled = True
[docs]def main():
"""Main function for pod5_convert_from_fast5"""
run_tool(pod5_convert_from_fast5_argparser())
if __name__ == "__main__":
main()