|
import os |
|
import sys |
|
import argparse |
|
import subprocess |
|
import zipnn |
|
from concurrent.futures import ( |
|
ProcessPoolExecutor, |
|
as_completed, |
|
) |
|
from zipnn_decompress_file import ( |
|
decompress_file, |
|
) |
|
|
|
sys.path.append( |
|
os.path.abspath( |
|
os.path.join( |
|
os.path.dirname(__file__), |
|
"..", |
|
) |
|
) |
|
) |
|
|
|
|
|
def check_and_install_zipnn(): |
|
try: |
|
import zipnn |
|
except ImportError: |
|
print("zipnn not found. Installing...") |
|
subprocess.check_call( |
|
[ |
|
sys.executable, |
|
"-m", |
|
"pip", |
|
"install", |
|
"zipnn", |
|
] |
|
) |
|
import zipnn |
|
|
|
|
|
def decompress_zpn_files( |
|
dtype="", |
|
path=".", |
|
delete=False, |
|
force=False, |
|
max_processes=1, |
|
): |
|
|
|
file_list = [] |
|
directories_to_search = [ |
|
( |
|
path, |
|
[], |
|
os.listdir(path), |
|
) |
|
] |
|
for ( |
|
root, |
|
_, |
|
files, |
|
) in directories_to_search: |
|
for file_name in files: |
|
if file_name.endswith(".znn"): |
|
decompressed_path = file_name[:-4] |
|
if not force and os.path.exists( |
|
decompressed_path |
|
): |
|
user_input = ( |
|
input( |
|
f"{decompressed_path} already exists; overwrite (y/n)? " |
|
) |
|
.strip() |
|
.lower() |
|
) |
|
if user_input not in ( |
|
"y", |
|
"yes", |
|
): |
|
print( |
|
f"Skipping {file_name}..." |
|
) |
|
continue |
|
full_path = os.path.join( |
|
root, |
|
file_name, |
|
) |
|
file_list.append(full_path) |
|
|
|
with ProcessPoolExecutor( |
|
max_workers=max_processes |
|
) as executor: |
|
for file in file_list[:max_processes]: |
|
future_to_file = { |
|
executor.submit( |
|
decompress_file, |
|
file, |
|
dtype, |
|
delete, |
|
True, |
|
): file |
|
for file in file_list[ |
|
:max_processes |
|
] |
|
} |
|
|
|
file_list = file_list[max_processes:] |
|
while future_to_file: |
|
|
|
for future in as_completed( |
|
future_to_file |
|
): |
|
file = future_to_file.pop( |
|
future |
|
) |
|
try: |
|
future.result() |
|
except Exception as exc: |
|
print( |
|
f"File {file} generated an exception: {exc}" |
|
) |
|
|
|
if file_list: |
|
next_file = file_list.pop( |
|
0 |
|
) |
|
future_to_file[ |
|
executor.submit( |
|
decompress_file, |
|
next_file, |
|
dtype, |
|
delete, |
|
True, |
|
) |
|
] = next_file |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
check_and_install_zipnn() |
|
|
|
parser = argparse.ArgumentParser( |
|
description="Compresses all .znn files. (optional) dtype." |
|
) |
|
parser.add_argument( |
|
"--float32", |
|
action="store_true", |
|
help="A flag that triggers float32 compression.", |
|
) |
|
parser.add_argument( |
|
"--path", |
|
type=str, |
|
help="Path to folder of files to decompress. If left empty, checks current folder.", |
|
) |
|
parser.add_argument( |
|
"--delete", |
|
action="store_true", |
|
help="A flag that triggers deletion of a single compressed file instead of decompression", |
|
) |
|
parser.add_argument( |
|
"--force", |
|
action="store_true", |
|
help="A flag that forces overwriting when decompressing.", |
|
) |
|
parser.add_argument( |
|
"--max_processes", |
|
type=int, |
|
help="The amount of maximum processes.", |
|
) |
|
args = parser.parse_args() |
|
optional_kwargs = {} |
|
if args.float32: |
|
optional_kwargs["dtype"] = 32 |
|
if args.path is not None: |
|
optional_kwargs["path"] = args.path |
|
if args.delete: |
|
optional_kwargs["delete"] = args.delete |
|
if args.force: |
|
optional_kwargs["force"] = args.force |
|
if args.max_processes: |
|
optional_kwargs["max_processes"] = ( |
|
args.max_processes |
|
) |
|
|
|
decompress_zpn_files(**optional_kwargs) |
|
|