Aerofoil/bulkimport.py

import sys
import os
import subprocess
import zipfile
import shutil
import io
import json
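
# Bulk importer for archived Glider PRO houses. It scans a source directory for
# archives (.zip, .sit, .cpt, .sea, .bin, .hqx), unpacks them with the companion
# command-line tools (FTagData, unpacktool, gpr2gpa, ffmpeg, etc.), converts
# QuickTime movies and resource forks, and merges the results into ".gpf" house
# files in the output directory.

# Debug switches: when a flag is True, the corresponding intermediate files or
# directories are kept on disk for inspection instead of being deleted.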
debug_preserve_osx_dir = False
debug_preserve_temps = True
debug_preserve_resources = True
debug_preserve_qt = False
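
# Run an external tool, resolved as "<process_path>.exe", and return True on success.
# If output_lines is provided, captured stdout is split into it line by line.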
def invoke_command(process_path, args, output_lines=None):
    print("Running " + str(process_path) + " with " + str(args))
    args_concatenated = [process_path + ".exe"] + args
    if output_lines is not None:
        completed_process = subprocess.run(args_concatenated, capture_output=True)
    else:
        completed_process = subprocess.run(args_concatenated)
    if completed_process.returncode != 0:
        print("Process crashed/failed with return code " + str(completed_process.returncode))
        return False
    if output_lines is not None:
        output_lines.clear()
        output_lines.extend(completed_process.stdout.decode("utf-8", "ignore").splitlines(False))
    return True
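
# Recursively collect the paths of all regular files under dir_path into out_paths.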
def recursive_scan_dir(out_paths, dir_path):
    for entry in os.scandir(dir_path):
        if entry.is_dir():
            recursive_scan_dir(out_paths, entry.path)
        if entry.is_file():
            out_paths.append(entry.path)
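
# Extract a .zip archive, then run FTagData on each extracted file to create a
# "<name>.gpf" entry and rename the extracted file itself to "<name>.gpd".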
def decompress_zip(ftagdata_path, source_path, ts_path, decompress_path):
    with zipfile.ZipFile(source_path, "r") as zfile:
        zfile.extractall(decompress_path)
    file_names = []
    recursive_scan_dir(file_names, decompress_path)
    for path in file_names:
        if not invoke_command(ftagdata_path, [ts_path, path + ".gpf", "DATA", "DATA", "0", "0"]):
            return False
        os.replace(path, path + ".gpd")
    return True
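
# Handle entries extracted from a __MACOSX directory: each "._<name>.gpd" sidecar
# (an AppleDouble companion file, judging by the "._" prefix) is passed to ASADTool
# together with the path of the matching file in dir_path.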
def fixup_macos_dir(ftagdata_path, asadtool_path, ts_path, dir_path, osx_path):
    contents = []
    recursive_scan_dir(contents, osx_path)
    print("recursive_scan_dir results: " + str(contents))
    for content_path in contents:
        osx_rel_path = os.path.relpath(content_path, osx_path)
        osx_rel_dir, osx_rel_file = os.path.split(osx_rel_path)
        if osx_rel_file.startswith("._") and osx_rel_file.endswith(".gpd"):
            out_path = os.path.join(dir_path, osx_rel_dir, osx_rel_file[2:-4])
            if not invoke_command(asadtool_path, [content_path, ts_path, out_path]):
                return False
    return True
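
# Walk dir_path recursively, running fixup_macos_dir on every __MACOSX directory
# encountered and then deleting it (unless debug_preserve_osx_dir is set).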
def recursive_fixup_macosx_dir(ftagdata_path, asadtool_path, ts_path, dir_path):
    osx_path = os.path.join(dir_path, "__MACOSX")
    if os.path.isdir(osx_path):
        if not fixup_macos_dir(ftagdata_path, asadtool_path, ts_path, dir_path, osx_path):
            print("fixup_macos_dir failed?")
            return False
        if not debug_preserve_osx_dir:
            shutil.rmtree(osx_path)
    for entry in os.scandir(dir_path):
        if entry.is_dir():
            if not recursive_fixup_macosx_dir(ftagdata_path, asadtool_path, ts_path, entry.path):
                return False
    return True
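
# Convert extracted QuickTime movies (".mov.gpf" entries with their ".gpd"/".gpr"
# forks) into ".gpa" archives: flatten the movie if it still has a resource fork,
# probe its streams with ffprobe, extract BMP frames and an 8-bit mono WAV with
# ffmpeg, and pack them into a zip-based ".gpa" with a JSON frame-rate description.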
def convert_movies(tools_dir, dir_path):
    contents = []
    recursive_scan_dir(contents, dir_path)
    for content_path in contents:
        print("convert_movies content path: " + content_path)
        if content_path.endswith(".mov.gpf"):
            if not os.path.isfile(content_path[:-4] + ".gpd"):
                # Res-only movie, probably only contains external references, a.k.a. unusable
                os.remove(content_path)
                if os.path.isfile(content_path[:-4] + ".gpr"):
                    os.remove(content_path[:-4] + ".gpr")
            else:
                content_dir = os.path.dirname(content_path)
                mov_path = content_path[:-4]
                res_path = mov_path + ".gpr"
                data_path = mov_path + ".gpd"
                if os.path.isfile(res_path):
                    if not invoke_command(os.path.join(tools_dir, "flattenmov"), [data_path, res_path, mov_path]):
                        return False
                    if not debug_preserve_qt:
                        os.remove(res_path)
                        os.remove(data_path)
                else:
                    if os.path.isfile(mov_path):
                        os.remove(mov_path)
                    os.rename(data_path, mov_path)
                probe_lines = []
                if not invoke_command(os.path.join(tools_dir, "ffprobe"), ["-show_streams", mov_path], probe_lines):
                    return False
                v_index = None
                v_fps_num = None
                v_fps_denom = None
                a_index = None
                a_nbframes = None
                a_sample_rate = None
                current_fps = None
                current_index = None
                current_type = None
                current_nbframes = None
                current_sample_rate = None
                is_stream = False
                for l in probe_lines:
                    if is_stream:
                        if l == "[/STREAM]":
                            print("Closing stream: " + str(current_type) + " " + str(current_index) + " " + str(current_fps) + " " + str(current_nbframes) + " " + str(current_sample_rate))
                            if current_type == "video" and current_index is not None and current_fps is not None:
                                fps_list = current_fps.split("/")
                                v_index = current_index
                                v_fps_num = fps_list[0]
                                v_fps_denom = fps_list[1]
                            if current_type == "audio" and current_index is not None and current_nbframes is not None and current_sample_rate is not None:
                                a_index = current_index
                                a_nbframes = current_nbframes
                                a_sample_rate = current_sample_rate
                            current_fps = None
                            current_index = None
                            current_type = None
                            current_nbframes = None
                            current_sample_rate = None
                            is_stream = False
                        elif l.startswith("codec_type="):
                            current_type = l[11:]
                        elif l.startswith("index="):
                            current_index = l[6:]
                        elif l.startswith("r_frame_rate="):
                            current_fps = l[13:]
                        elif l.startswith("nb_frames="):
                            current_nbframes = l[10:]
                        elif l.startswith("sample_rate="):
                            current_sample_rate = l[12:]
                    elif l == "[STREAM]":
                        # Reset the per-stream fields before reading a new stream block
                        current_fps = None
                        current_nbframes = None
                        current_sample_rate = None
                        current_index = None
                        current_type = None
                        is_stream = True
                wav_path = None
                if a_index is not None:
                    sample_rate_int = int(a_sample_rate)
                    target_sample_rate = "22254"
                    if sample_rate_int == 11025 or sample_rate_int == 44100:
                        target_sample_rate = "22050"
                    elif sample_rate_int < 22000 or sample_rate_int > 23000:
                        target_sample_rate = a_sample_rate
                    wav_path = os.path.join(content_dir, "0.wav")
                    if not invoke_command(os.path.join(tools_dir, "ffmpeg"), ["-y", "-i", mov_path, "-ac", "1", "-ar", target_sample_rate, "-c:a", "pcm_u8", wav_path]):
                        return False
                if v_index is not None:
                    if not invoke_command(os.path.join(tools_dir, "ffmpeg"), ["-y", "-i", mov_path, os.path.join(content_dir, "%d.bmp")]):
                        return False
                if a_index is not None or v_index is not None:
                    with zipfile.ZipFile(mov_path + ".gpa", "w") as vid_archive:
                        metaf = io.StringIO()
                        if v_index is not None:
                            metaf.write("{\n")
                            metaf.write("\t\"frameRateNumerator\" : " + v_fps_num + ",\n")
                            metaf.write("\t\"frameRateDenominator\" : " + v_fps_denom + "\n")
                            metaf.write("}\n")
                        else:
                            metaf.write("{\n")
                            metaf.write("\t\"frameRateNumerator\" : " + a_nbframes + ",\n")
                            metaf.write("\t\"frameRateDenominator\" : " + a_sample_rate + "\n")
                            metaf.write("}\n")
                        vid_archive.writestr("muvi/0.json", metaf.getvalue(), compress_type=zipfile.ZIP_DEFLATED, compresslevel=9)
                        if v_index is not None:
                            frame_num = 1
                            bmp_name = str(frame_num) + ".bmp"
                            bmp_path = os.path.join(content_dir, bmp_name)
                            while os.path.isfile(bmp_path):
                                vid_archive.write(bmp_path, arcname=("PICT/" + bmp_name), compress_type=zipfile.ZIP_DEFLATED, compresslevel=9)
                                os.remove(bmp_path)
                                frame_num = frame_num + 1
                                bmp_name = str(frame_num) + ".bmp"
                                bmp_path = os.path.join(content_dir, bmp_name)
                        if a_index is not None:
                            vid_archive.write(wav_path, arcname="snd$20/0.wav", compress_type=zipfile.ZIP_DEFLATED, compresslevel=9)
                            os.remove(wav_path)
                if not debug_preserve_qt:
                    os.remove(mov_path)
    return True
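
# Find nested archives (.sea/.bin/.hqx/.zip/.cpt/.sit) among the extracted files,
# rename each one's ".gpd" data fork back to the archive name, and append it to
# source_paths so the main loop unpacks it on a later pass.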
def reprocess_children(source_paths, dir_path):
    reprocess_extensions = ["sea", "bin", "hqx", "zip", "cpt", "sit"]
    contents = []
    recursive_scan_dir(contents, dir_path)
    for ext in reprocess_extensions:
        full_ext = "." + ext + ".gpf"
        for content_path in contents:
            if content_path.endswith(full_ext):
                truncated_path = content_path[:-4]
                data_path = truncated_path + ".gpd"
                if os.path.isfile(data_path):
                    os.rename(data_path, truncated_path)
                    source_paths.append(truncated_path)
                    print("Requeueing subpath " + truncated_path)
    return True
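
# Convert resource forks: gpr2gpa turns each ".gpr" into a ".gpa" archive and dumps
# embedded QuickTime images into qt_convert_dir; any dumped ".mov" files are
# converted to BMP with ffmpeg and patched back into the ".gpa" as PICT entries
# using a gpr2gpa patch JSON.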
def convert_resources(tools_dir, ts_path, qt_convert_dir, dir_path):
    contents = []
    recursive_scan_dir(contents, dir_path)
    for content_path in contents:
        if content_path.endswith(".gpr"):
            if not invoke_command(os.path.join(tools_dir, "gpr2gpa"), [content_path, ts_path, content_path[:-4] + ".gpa", "-dumpqt", qt_convert_dir]):
                return False
            qt_convert_contents = []
            recursive_scan_dir(qt_convert_contents, qt_convert_dir)
            converted_pict_ids = []
            # Convert inline QuickTime PICT resources
            for convert_content_path in qt_convert_contents:
                if convert_content_path.endswith(".mov"):
                    if not invoke_command(os.path.join(tools_dir, "ffmpeg"), ["-y", "-i", convert_content_path, convert_content_path[:-4] + ".bmp"]):
                        return False
                    os.remove(convert_content_path)
                    converted_pict_ids.append(os.path.basename(convert_content_path[:-4]))
            if len(converted_pict_ids) > 0:
                print("Reimporting converted QuickTime PICTs")
                qt_convert_json_path = os.path.join(dir_path, "qt_convert.json")
                convert_dict = {}
                convert_dict["delete"] = []
                convert_dict["add"] = {}
                for pict_id in converted_pict_ids:
                    convert_dict["add"]["PICT/" + pict_id + ".bmp"] = os.path.join(qt_convert_dir, pict_id + ".bmp")
                with open(qt_convert_json_path, "w") as f:
                    json.dump(convert_dict, f)
                if not invoke_command(os.path.join(tools_dir, "gpr2gpa"), [content_path, ts_path, content_path[:-4] + ".gpa", "-patch", qt_convert_json_path]):
                    return False
                for pict_id in converted_pict_ids:
                    os.remove(os.path.join(qt_convert_dir, pict_id + ".bmp"))
                os.remove(qt_convert_json_path)
            if not debug_preserve_resources:
                os.remove(content_path)
    return True
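
# Collect finished houses: a ".gpf" whose "!!meta" entry begins with the "gliH"
# type code is merged with MergeGPF (together with a companion ".mov.gpf", if one
# exists) and moved into the output directory.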
def scoop_files(tools_dir, output_dir, dir_path):
    mergegpf_path = os.path.join(tools_dir, "MergeGPF")
    contents = []
    recursive_scan_dir(contents, dir_path)
    scooped_files = []
    for content_path in contents:
        if content_path.endswith(".gpf"):
            is_house = False
            with zipfile.ZipFile(content_path, "r") as zfile:
                meta_contents = None
                with zfile.open("!!meta", "r") as metafile:
                    meta_contents = metafile.read()
                # The first four bytes spell "gliH" (the Glider PRO house type code)
                if meta_contents[0] == 103 and meta_contents[1] == 108 and meta_contents[2] == 105 and meta_contents[3] == 72:
                    is_house = True
            if is_house:
                if not invoke_command(mergegpf_path, [content_path]):
                    return False
                scooped_files.append(content_path)
                mov_path = content_path[:-4] + ".mov.gpf"
                if os.path.isfile(mov_path):
                    if not invoke_command(mergegpf_path, [mov_path]):
                        return False
                    scooped_files.append(mov_path)
    for scoop_path in scooped_files:
        os.replace(scoop_path, os.path.join(output_dir, os.path.basename(scoop_path)))
    return True
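
# Driver for the whole import. The path attributes (source_dir, output_dir,
# tools_dir, ts_path, etc.) are assigned by the caller before run() is invoked.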
class ImportContext:
    def __init__(self):
        pass

    def run(self):
        os.makedirs(self.qt_convert_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        invoke_command(self.make_timestamp_path, [self.ts_path])
        print("Looking for input files in " + self.source_dir)
        source_paths = []
        recursive_scan_dir(source_paths, self.source_dir)
        pending_result_directories = []
        result_dir_index = 0
        while len(source_paths) > 0:
            source_path = source_paths[0]
            source_paths = source_paths[1:]
            unpack_dir = os.path.join(self.output_dir, str(len(pending_result_directories)))
            try:
                os.mkdir(unpack_dir)
            except FileExistsError:
                pass
            print("Attempting to unpack " + source_path)
            decompressed_ok = False
            should_decompress = True
            if source_path.endswith(".zip"):
                decompressed_ok = decompress_zip(self.ftagdata_path, source_path, self.ts_path, unpack_dir)
            elif source_path.endswith(".sit") or source_path.endswith(".cpt") or source_path.endswith(".sea"):
                decompressed_ok = invoke_command(os.path.join(self.tools_dir, "unpacktool"), [source_path, self.ts_path, unpack_dir, "-paranoid"])
            elif source_path.endswith(".bin"):
                decompressed_ok = invoke_command(os.path.join(self.tools_dir, "bin2gp"), [source_path, self.ts_path, os.path.join(unpack_dir, os.path.basename(source_path[:-4]))])
            elif source_path.endswith(".hqx"):
                decompressed_ok = invoke_command(os.path.join(self.tools_dir, "hqx2gp"), [source_path, self.ts_path, os.path.join(unpack_dir, os.path.basename(source_path[:-4]))])
            else:
                should_decompress = False
            if should_decompress and not decompressed_ok:
                return
            if decompressed_ok:
                pending_result_directories.append(unpack_dir)
            while result_dir_index < len(pending_result_directories):
                if not self.process_dir(pending_result_directories, result_dir_index, source_paths):
                    return
                result_dir_index = result_dir_index + 1
        # Clear temporaries
        if not debug_preserve_temps:
            for dir_path in pending_result_directories:
                shutil.rmtree(dir_path)

    def process_dir(self, all_dirs, dir_index, source_paths):
        root = all_dirs[dir_index]
        print("Processing directory " + root)
        if not recursive_fixup_macosx_dir(self.ftagdata_path, os.path.join(self.tools_dir, "ASADTool"), self.ts_path, root):
            return False
        if not convert_movies(self.tools_dir, root):
            return False
        if not convert_resources(self.tools_dir, self.ts_path, self.qt_convert_dir, root):
            return False
        if not reprocess_children(source_paths, root):
            return False
        if not scoop_files(self.tools_dir, self.output_dir, root):
            return False
        return True
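
# Entry point. Paths below are hard-coded for one particular machine; adjust them
# (or restore the commented-out sys.argv handling) to run the importer elsewhere.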
def main():
    import_context = ImportContext()
    #script_dir = sys.argv[0]
    #source_dir = sys.argv[1]
    import_context.source_dir = "C:\\Users\\Eric\\Downloads\\gliderfiles\\archives"
    #output_dir = sys.argv[2]
    import_context.output_dir = "C:\\Users\\Eric\\Downloads\\gliderfiles\\converted"
    import_context.qt_convert_dir = os.path.join(import_context.output_dir, "qtconvert")
    import_context.tools_dir = "D:\\src\\GlidePort\\x64\\Release"
    import_context.make_timestamp_path = os.path.join(import_context.tools_dir, "MakeTimestamp")
    import_context.ts_path = os.path.join(import_context.output_dir, "Timestamp.ts")
    import_context.ftagdata_path = os.path.join(import_context.tools_dir, "FTagData")
    import_context.run()

if __name__ == "__main__":
    main()