Source code for pod5.tools.pod5_repack

"""
Tool for repacking pod5 files to potentially improve performance
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import sys
import typing
from pathlib import Path
from pod5.tools.utils import PBAR_DEFAULTS
from tqdm.auto import tqdm

import pod5 as p5
import pod5.repack
from pod5.tools.parsers import prepare_pod5_repack_argparser, run_tool


[docs]def repack_pod5_file(src: Path, dest: Path): """Repack the source pod5 file into dest""" repacker = pod5.repack.Repacker() with p5.Reader(src) as reader: with p5.Writer(dest) as writer: # Add all reads to the repacker repacker_output = repacker.add_output(writer) repacker.add_all_reads_to_output(repacker_output, reader) repacker.wait(show_pbar=False, finish=True)
[docs]def repack_pod5( inputs: typing.List[Path], output: Path, threads: int, force_overwrite: bool ): """Given a list of pod5 files, repack their contents and write files 1-1""" # Create output directory if required if not output.is_dir(): output.mkdir(parents=True, exist_ok=True) # Remove existing files if required for input_filename in inputs: output_filename = output / input_filename.name if output_filename.exists(): if output_filename == input_filename: print(f"Refusing to overwrite {input_filename} inplace") sys.exit(1) if force_overwrite: output_filename.unlink() else: print("Refusing to overwrite output without --force-overwrite") sys.exit(1) disable_pbar = not bool(int(os.environ.get("POD5_PBAR", 1))) futures = {} with ProcessPoolExecutor(max_workers=threads) as executor: pbar = tqdm( total=len(inputs), disable=disable_pbar, unit="Files", **PBAR_DEFAULTS ) for src in inputs: dest = output / src.name futures[executor.submit(repack_pod5_file, src=src, dest=dest)] = dest for future in as_completed(futures): tqdm.write(f"Finished {futures[future]}") pbar.update(1) pbar.close() print("Done")
[docs]def main(): run_tool(prepare_pod5_repack_argparser())
if __name__ == "__main__": main()