-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathextract_data.py
More file actions
163 lines (132 loc) · 5.95 KB
/
extract_data.py
File metadata and controls
163 lines (132 loc) · 5.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import json
import os
import os.path as op
import struct
import subprocess
import sys
import time
from hgpaktool import HGPAKFile
# Tkinter is optional: it is only used further below to show a folder-picker
# dialog when the PCBANKS directory has not been configured yet.
try:
    from tkinter import Tk, filedialog
    has_tkinter = True
except ModuleNotFoundError:
    has_tkinter = False
# Per-user configuration folder; falls back to the home directory when the
# APPDATA environment variable is unset (i.e. outside Windows).
CFG_FOLDER = op.join(os.environ.get("APPDATA", op.expanduser("~")), "MBINCompiler-test-data")
# Relative directory that extracted files are written into.
OUTPUT_FOLDER = "data"
def update_index(fpath, new_data: dict[str, dict[str, int]]) -> dict[str, dict[str, int]]:
    """Merge ``new_data`` into the JSON index at ``fpath`` and rewrite it.

    The index maps pak names to ``{filename: namehash}`` dicts. Entries for
    paks already present are updated in place; unseen paks are added whole.

    Returns the merged index.
    """
    # Load the index as it currently exists on disk.
    with open(fpath, "r") as f:
        index: dict[str, dict[str, int]] = json.load(f)
    # Fold each incoming pak's contents into the index, creating the pak's
    # entry on first sight.
    for pakname, contents in new_data.items():
        index.setdefault(pakname, {}).update(contents)
    # Persist the merged result.
    with open(fpath, "w") as f:
        json.dump(index, f, indent=1)
    return index
def parse_index(fpath) -> set[int]:
    """Return the set of all namehashes recorded in the index JSON at ``fpath``."""
    with open(fpath, "r") as f:
        index = json.load(f)
    # Flatten the per-pak {filename: namehash} dicts into one set of ints.
    return {int(h) for contents in index.values() for h in contents.values()}
def extract_data(data: dict[str, dict[str, int]], pcbanks_dir: str) -> int:
    """Unpack every file listed in ``data`` into ``OUTPUT_FOLDER``.

    ``data`` maps pak filenames (relative to ``pcbanks_dir``) to the
    ``{filename: namehash}`` dicts of files to extract from that pak.

    Returns the total number of files unpacked.
    """
    total = 0
    for pakname, contents in data.items():
        pak_path = op.join(pcbanks_dir, pakname)
        with HGPAKFile(pak_path) as pak:
            # Iterating the dict yields its keys (the filenames to extract).
            total += pak.unpack(OUTPUT_FOLDER, list(contents), upper=True)
    return total
if __name__ == "__main__":
    # Ensure the per-user configuration folder exists.
    if not op.exists(CFG_FOLDER):
        os.makedirs(CFG_FOLDER, exist_ok=True)
    # Check to see if we have the PCBANKS folder configured.
    do_configure = False
    pcbanks_dir = None
    settings_fpath = op.join(CFG_FOLDER, "settings.json")
    if not op.exists(settings_fpath):
        do_configure = True
    if not do_configure:
        with open(settings_fpath, "r") as f:
            settings = json.load(f)
        if isinstance(settings, dict):
            # An empty or missing "PCBANKS_dir" value also forces reconfiguration.
            if not (pcbanks_dir := settings.get("PCBANKS_dir")):
                do_configure = True
        else:
            do_configure = True
    if do_configure:
        if has_tkinter:
            # Ask the user to pick the PCBANKS directory via a folder dialog.
            root = Tk()
            root.withdraw()
            pcbanks_dir = filedialog.askdirectory(title="PCBANKS directory")
            with open(settings_fpath, "w") as f:
                json.dump({"PCBANKS_dir": pcbanks_dir}, f)
        else:
            # No GUI available: write an empty settings file for manual editing,
            # open the config folder, and exit.
            with open(settings_fpath, "w") as f:
                json.dump({"PCBANKS_dir": ""}, f)
            print(
                f"Tkinter not installed! Opening {CFG_FOLDER!r} and exiting. Please fill in the "
                "settings file manually."
            )
            # List form avoids shell quoting issues with paths containing spaces.
            subprocess.call(["explorer", CFG_FOLDER])
            sys.exit(1)
    if not pcbanks_dir:
        print(
            "There was an issue resolving your PCBANKS directory, please set it manually in the "
            "settings.json file."
        )
        subprocess.call(["explorer", CFG_FOLDER])
        sys.exit(1)
    t0 = time.perf_counter()
    filename_pak_map: dict[str, str] = {}  # Map filenames to the pak they are in.
    file_namehash_map: dict[str, int] = {}  # Map filename to the namehash contained.
    found_namehashes = set()  # Set of found namehashes so we can determine if there are new ones.
    # Extract a tonne of info from the pak files.
    for pak_fname in os.listdir(pcbanks_dir):
        if pak_fname.endswith(".pak"):
            with HGPAKFile(op.join(pcbanks_dir, pak_fname)) as pak:
                for _fname in pak.filenames:
                    if _fname.lower().endswith(".mbin"):
                        filename_pak_map[_fname] = pak_fname
                        # Only read the first 0x20 bytes of the mbin file since we only need the
                        # header: 12 pad bytes, u32 namehash, u64 guid, 8 pad bytes (little-endian).
                        for fname, _data in pak.extract(_fname, max_bytes=0x20):
                            # The guid is not needed here; unpack it to a throwaway name.
                            namehash, _guid = struct.unpack("<12xIQ8x", _data)
                            if namehash == 0x2E:
                                # Skip a broken file...
                                continue
                            file_namehash_map[fname] = namehash
                            found_namehashes.add(namehash)
    # Parse the existing index.json file to see what we currently track in this repo.
    current_namehashes = parse_index("index.json")
    new_namehashes = found_namehashes - current_namehashes
    # List of newly taken namehashes so that we only add each new one once.
    taken_new_namehashes = set()
    new_data: dict[str, dict[str, int]] = {}
    # For each file in the namehash map, check to see if it's in the new set of found namehashes.
    # If it is, then add the info to the new data and update.
    if new_namehashes:
        for fname, namehash in file_namehash_map.items():
            if namehash in new_namehashes and namehash not in taken_new_namehashes:
                # Get the pak the file is in and write the data to the dict.
                pak_name = filename_pak_map[fname]
                if pak_name in new_data:
                    new_data[pak_name][fname] = namehash
                else:
                    new_data[pak_name] = {fname: namehash}
                taken_new_namehashes.add(namehash)
    # Update the index.
    updated_data = update_index("index.json", new_data)
    # Extract all the files in the index. Git will handle whether they are new or not.
    num_extracted = extract_data(updated_data, pcbanks_dir)
    print(f"Process complete in {time.perf_counter() - t0:.6f}s")
    print("Summary:")
    print(f" - Updated the index with {len(new_namehashes)} new namehashes")
    print(f" - Extracted {num_extracted} files")