-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy path3_create_admin_features.py
More file actions
201 lines (156 loc) · 7.43 KB
/
3_create_admin_features.py
File metadata and controls
201 lines (156 loc) · 7.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import os
import argparse
import yaml
import pandas as pd
import geopandas as gpd
from functools import reduce
from feature_extraction.area_extractor import extract_admin_boundary_area_km2
from feature_extraction.osm_data_extractor import extract_all_osm_features
# TODO: TEMP renaming fix
# Maps a feature keyword to the name of the sub-directory (under the data base
# directory) that holds that feature's exported CSV files.
# NOTE(review): 'landcover' -> 'LandsatStats' and 'landsat' -> 'LandCoverRatios'
# look crossed over; presumably this is the temporary renaming workaround the
# TODO above refers to -- confirm before "correcting" it.
DIR_NAME_MAPPING = dict(
    landcover='LandsatStats',
    landsat='LandCoverRatios',
    annualntl='AnnualNTL_Stats',
    vida_buildings='VidaBuildings',
)
def merge_dataframes(dataframes, admin_level):
    """
    Merge a list of DataFrames on the admin-level code column.

    All frames are outer-joined on ``ADM{admin_level}_PCODE``. The
    ``ADM{admin_level}_PT`` name column is kept only from the first frame and
    dropped from every later frame before merging.

    Args:
        dataframes (list): List of DataFrames to merge. Neither the list nor
            the frames in it are modified.
        admin_level (int): Admin level used to build the column names.

    Returns:
        DataFrame: Merged DataFrame.

    Raises:
        TypeError: If ``dataframes`` is empty (raised by ``reduce``, which is
            called without an initial value).
    """
    merge_key = f"ADM{admin_level}_PCODE"
    name_col = f"ADM{admin_level}_PT"

    # Build a fresh list instead of assigning back into `dataframes`, so the
    # caller's list is not rewritten behind its back. Only frames after the
    # first lose the name column; frames without it are passed through as-is.
    frames = [
        frame.drop(columns=[name_col])
        if position > 0 and name_col in frame.columns
        else frame
        for position, frame in enumerate(dataframes)
    ]

    return reduce(
        lambda left, right: pd.merge(left, right, on=[merge_key], how='outer'),
        frames,
    )
def prepare_boundaries_gdf(path, admin_level, original_admin_code_column, original_admin_name_column):
    """
    Load a boundary file and rename its admin code/name columns to the
    standard ``ADM{level}_PCODE`` / ``ADM{level}_PT`` names.

    Parameters:
        path (str): Path to the file containing the geospatial data.
        admin_level (int): Administrative level inserted into the
            standardised column names.
        original_admin_code_column (str): Column in the file that holds the
            administrative code.
        original_admin_name_column (str): Column in the file that holds the
            administrative name.

    Returns:
        GeoDataFrame: The loaded data with the two columns renamed.
    """
    boundaries = gpd.read_file(path)
    # Printed so the user can check the configured column names against the file.
    print(f"Columns for boundary file: {boundaries.columns}")

    level = str(admin_level)
    column_renames = {
        original_admin_code_column: f"ADM{level}_PCODE",
        original_admin_name_column: f"ADM{level}_PT",
    }
    return boundaries.rename(columns=column_renames)
def read_features(base_dir, keyword, year, admin_level):
    """
    Read and concatenate all feature CSV files for one feature type.

    Files are read from
    ``<base_dir>/<DIR_NAME_MAPPING[keyword]>/<year>/adm<admin_level>``.

    Args:
        base_dir (str): The base directory where data is stored.
        keyword (str): Feature keyword; must be a key of DIR_NAME_MAPPING.
        year (int): The year of the data to read.
        admin_level (int): The administrative level of the data.

    Returns:
        DataFrame: All CSV files in the directory concatenated row-wise (in
            sorted file-path order, with a fresh index), excluding the
            'system:index' and '.geo' columns when present.

    Raises:
        KeyError: If ``keyword`` is not in DIR_NAME_MAPPING.
        FileNotFoundError: If the directory does not exist or contains no
            ``.csv`` files.
    """
    features_dir = os.path.join(
        base_dir, DIR_NAME_MAPPING[keyword], str(year), f"adm{admin_level}")

    # os.listdir returns entries in arbitrary order; sort so the rows are
    # concatenated in the same order on every run and every filesystem.
    csv_paths = sorted(
        os.path.join(features_dir, fname)
        for fname in os.listdir(features_dir)
        if fname.endswith(".csv")
    )
    if not csv_paths:
        raise FileNotFoundError(f"No CSV files found in {features_dir}")

    merged_features = pd.concat(
        [pd.read_csv(csv_path) for csv_path in csv_paths], ignore_index=True)

    # Drop the export bookkeeping columns, but only those actually present.
    unwanted = [c for c in ('system:index', '.geo') if c in merged_features.columns]
    return merged_features.drop(columns=unwanted)
def extract_features_for_administrative_unit(config):
    """
    Extract, merge and save features for every unit of one admin level.

    Combines the admin-unit areas, the per-keyword feature CSVs read by
    read_features(), and the OSM features into a single table keyed on the
    admin code column, then writes it to
    ``<output_extracted_features_dir>/admin_<admin_level>_extracted_features.csv``.

    Args:
        config (dict): Configuration dictionary containing:
            - admin_information: Dict with admin_level, boundary_path,
              admin_code_column, admin_name_column.
            - data_information: Dict with base_dir, year,
              output_extracted_features_dir.

    Returns:
        None. The merged table is written to disk; the output directory is
        created if it does not exist.

    Raises:
        KeyError: If a required configuration key is missing.
    """
    # Extract admin information from config
    admin_info = config['admin_information']
    admin_level = admin_info['admin_level']
    boundary_file_path = admin_info['boundary_path']
    admin_code_column = admin_info['admin_code_column']
    admin_name_column = admin_info['admin_name_column']

    # Extract data information from config
    data_info = config['data_information']
    data_base_dir = data_info['base_dir']
    data_year = data_info['year']

    # Prepare output directory and file path
    output_dir = data_info['output_extracted_features_dir']
    os.makedirs(output_dir, exist_ok=True)
    output_fpath = os.path.join(
        output_dir, f"admin_{admin_level}_extracted_features.csv")

    # Load admin boundaries with standardised code/name column names
    admin_boundaries_gdf = prepare_boundaries_gdf(
        boundary_file_path, admin_level, admin_code_column, admin_name_column)

    print(f"Extracting features for each admin level {admin_level} unit...")

    # Obtain area (km²) of each admin unit
    print(f"Calculating area (km2) for each admin level {admin_level} unit")
    admin_areas = extract_admin_boundary_area_km2(admin_boundaries_gdf, admin_level)

    # Fixed: this message was missing its f-prefix and printed the literal
    # text "{admin_level}" instead of the level.
    print(f"Extracting gee features for each admin level {admin_level} unit")
    # TODO: move to config
    feature_keywords = ['landcover', 'landsat', 'annualntl', 'vida_buildings']
    # Read features for each keyword (dict keeps keyword order for the merge)
    feature_data = {
        feature_keyword: read_features(
            data_base_dir, feature_keyword, data_year, admin_level)
        for feature_keyword in feature_keywords
    }

    print("Extracting features from OSM maps")
    # OSM data is read from <base_dir>/osm/<year>; path components are joined
    # individually rather than with a hard-coded '/' separator.
    osm_shapefile_directory_path = os.path.join(
        data_base_dir, 'osm', str(data_year))
    extracted_osm_data = extract_all_osm_features(
        osm_shapefile_directory_path, admin_boundaries_gdf, admin_level)

    print("Merging extracted features into one csv file")
    # Areas go first: merge_dataframes keeps the name column from the first
    # frame only.
    feature_dataframes = [
        admin_areas, *feature_data.values(), extracted_osm_data
    ]
    merged_data = merge_dataframes(feature_dataframes, admin_level)

    # Drop the count column if it exists
    if 'count' in merged_data.columns:
        merged_data = merged_data.drop(columns=['count'])

    # TODO: bring fix to appropriate data download script
    # Rename ambiguous NTL columns
    merged_data = merged_data.rename(columns={
        'mean': 'NTL_mean',
        'stdDev': 'NTL_stdDev',
    })

    # Save merged data to CSV
    merged_data.to_csv(output_fpath, index=False)
    print(f"Admin features saved to {output_fpath}")
if __name__ == "__main__":
    # Command-line entry point: one positional argument, the YAML config path.
    cli = argparse.ArgumentParser(description="Process configuration file")
    cli.add_argument(
        'config_file',
        type=str,
        help='Path to the configuration YAML file',
    )
    cli_args = cli.parse_args()

    # Load the configuration, then run the extraction with it.
    with open(cli_args.config_file, 'r') as config_stream:
        loaded_config = yaml.safe_load(config_stream)

    extract_features_for_administrative_unit(loaded_config)