diff --git a/python/startup.py b/python/startup.py index 63eec96..10ca0b9 100644 --- a/python/startup.py +++ b/python/startup.py @@ -1,27 +1,79 @@ # Container for the Startup module. See the Estimates-Program wiki page for more # details: https://github.com/SANDAG/Estimates-Program/wiki/Startup +import pandas as pd import sqlalchemy as sql + import python.utils as utils +import python.tests as tests def run_startup(debug: bool): - """Control function to call the correct functions in the correct order""" - # Startup requires no input data - # Startup requires no processing of input data - _insert_outputs(debug) + """Orchestrator function to grab MGRA data, validate, and insert. + Inserts MGRA geography data from SANDAG's GeoAnalyst database into the + production database. The data could be directly inserted via a single SQL + statement but it is instead brought into Python to allow for validation + and to be written out to csv files for debugging purposes. -def _insert_outputs(debug: bool): - """Insert output data related to the Startup module""" + Functionality is segmented into functions for code encapsulation: + _get_startup_inputs - Get MGRA data from GeoAnalyst + _validate_startup_inputs - Validate MGRA data + _insert_startup_outputs - Insert MGRA data to the production database - # Skip insertion if running in debug mode - if debug: - return + Args: + debug (bool): Whether to run in debug mode + """ + mgra = _get_startup_inputs() + _validate_startup_inputs(mgra) - # Insert the MGRA geography + _insert_startup_outputs(mgra, debug) + + +def _get_startup_inputs() -> pd.DataFrame: + """Get input data related to the Startup module""" with utils.ESTIMATES_ENGINE.connect() as con: - with open(utils.SQL_FOLDER / "insert_mgra.sql") as file: - query = sql.text(file.read()) - con.execute(query, {"run_id": utils.RUN_ID, "mgra": utils.MGRA_VERSION}) - con.commit() + with open(utils.SQL_FOLDER / "startup/get_mgra.sql") as file: + mgra = pd.read_sql_query( + 
sql=sql.text(file.read()), + con=con, + params={ + "run_id": utils.RUN_ID, + "mgra_version": utils.MGRA_VERSION, + "insert_switch": 0, # return tabular data only + }, # type: ignore + ) + + return mgra + + +def _validate_startup_inputs(mgra: pd.DataFrame) -> None: + """Validate input data related to the Startup module""" + tests.validate_data( + "MGRA Geography", + mgra, + row_count={"key_columns": {"mgra"}}, + negative={}, + null={}, + ) + + +def _insert_startup_outputs(mgra: pd.DataFrame, debug: bool) -> None: + """Insert output data related to the Startup module""" + # Save locally if in debug mode + if debug: + mgra.to_csv(utils.DEBUG_OUTPUT_FOLDER / "inputs_mgra.csv", index=False) + else: + # Insert the MGRA geography to the database + with utils.ESTIMATES_ENGINE.connect() as con: + with open(utils.SQL_FOLDER / "startup/get_mgra.sql") as file: + query = sql.text(file.read()) + con.execute( + query, + { + "run_id": utils.RUN_ID, + "mgra_version": utils.MGRA_VERSION, + "insert_switch": 1, # write data to database + }, + ) + con.commit() diff --git a/sql/create_objects.sql b/sql/create_objects.sql index 4237610..5bb075d 100644 --- a/sql/create_objects.sql +++ b/sql/create_objects.sql @@ -33,6 +33,7 @@ CREATE TABLE [inputs].[controls_ase] ( CONSTRAINT [fk_inputs_controls_ase_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), CONSTRAINT [chk_non_negative_inputs_controls_ase] CHECK ([value] >= 0) ) +GO CREATE TABLE [inputs].[controls_tract] ( [run_id] INT NOT NULL, @@ -45,6 +46,7 @@ CREATE TABLE [inputs].[controls_tract] ( CONSTRAINT [fk_inputs_controls_tract_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), CONSTRAINT [chk_non_negative_inputs_controls_tract] CHECK ([value] >= 0) ) +GO CREATE TABLE [inputs].[controls_city] ( [run_id] INT NOT NULL, @@ -57,10 +59,13 @@ CREATE TABLE [inputs].[controls_city] ( CONSTRAINT [fk_inputs_controls_city_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), CONSTRAINT 
[chk_non_negative_inputs_controls_city] CHECK ([value] >= 0) ) +GO CREATE TABLE [inputs].[mgra] ( [run_id] INT NOT NULL, [mgra] INT NOT NULL, + [2010_census_blockgroup] NVARCHAR(12) NOT NULL, + [2020_census_blockgroup] NVARCHAR(12) NOT NULL, [2010_census_tract] NVARCHAR(11) NOT NULL, [2020_census_tract] NVARCHAR(11) NOT NULL, [puma00] nvarchar(5) NOT NULL, @@ -72,6 +77,7 @@ CREATE TABLE [inputs].[mgra] ( CONSTRAINT [fk_inputs_mgra_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]) -- No non-negative CHECK here as these values are directly pulled from [GeoDepot] ) WITH (DATA_COMPRESSION = PAGE) +GO CREATE TABLE [inputs].[special_mgras] ( [id] INT IDENTITY(1,1), @@ -142,6 +148,7 @@ CREATE TABLE [outputs].[ase] ( CONSTRAINT [fk_outputs_ase_mgra] FOREIGN KEY ([run_id], [mgra]) REFERENCES [inputs].[mgra] ([run_id], [mgra]), CONSTRAINT [chk_non_negative_outputs_ase] CHECK ([value] >= 0) ) +GO -- For purposes of data insertion speed, only non-zero ASE data is inserted into -- [outputs].[ase]. 
In case you want the full table with zeros, you can use the below @@ -222,6 +229,7 @@ BEGIN AND [shell].[ethnicity] = [ase].[ethnicity] RETURN; END +GO CREATE TABLE [outputs].[gq] ( [run_id] INT NOT NULL, @@ -235,6 +243,7 @@ CREATE TABLE [outputs].[gq] ( CONSTRAINT [fk_outputs_gq_mgra] FOREIGN KEY ([run_id], [mgra]) REFERENCES [inputs].[mgra] ([run_id], [mgra]), CONSTRAINT [chk_non_negative_outputs_gq] CHECK ([value] >= 0) ) +GO CREATE TABLE [outputs].[hh] ( [run_id] INT NOT NULL, @@ -248,6 +257,7 @@ CREATE TABLE [outputs].[hh] ( CONSTRAINT [fk_outputs_hh_mgra] FOREIGN KEY ([run_id], [mgra]) REFERENCES [inputs].[mgra] ([run_id], [mgra]), CONSTRAINT [chk_non_negative_outputs_hh] CHECK ([value] >= 0) ) +GO CREATE TABLE [outputs].[hh_characteristics] ( [run_id] INT NOT NULL, @@ -261,6 +271,7 @@ CREATE TABLE [outputs].[hh_characteristics] ( CONSTRAINT [fk_outputs_hh_characteristics_mgra] FOREIGN KEY ([run_id], [mgra]) REFERENCES [inputs].[mgra] ([run_id], [mgra]), CONSTRAINT [chk_non_negative_outputs_hh_characteristics] CHECK ([value] >= 0) ) +GO CREATE TABLE [outputs].[hs] ( [run_id] INT NOT NULL, diff --git a/sql/insert_mgra.sql b/sql/insert_mgra.sql deleted file mode 100644 index 7f0df4a..0000000 --- a/sql/insert_mgra.sql +++ /dev/null @@ -1,174 +0,0 @@ -/* -For an input MGRA alias from the [GeoAnalyst].[geography].[geography] table, -return the MGRA zones, their shapes, and the one-to-many cross references to -the following geographies and insert these records into [inputs].[mgra]: - [2010_census_tract] - [2020_census_tract] - [puma00] - [puma10] - [puma20] - [cities_2020] -*/ - -DECLARE @run_id integer = :run_id; -DECLARE @mgra nvarchar(10) = :mgra; - -with [mgra] AS ( - SELECT - [zone].[zone] AS [mgra], - [zone].[shape] - FROM [GeoAnalyst].[geography].[zone] - INNER JOIN [GeoAnalyst].[geography].[geography] - ON [zone].[geography_id] = [geography].[geography_id] - WHERE [geography].[alias] = @mgra -), -[xref_2010_census_tract] AS ( - SELECT - [from_zone].[zone] 
AS [mgra], - [to_zone].[zone] AS [2010_census_tract] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[xref] - ON [xref_zone].[xref_id] = [xref].[xref_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [from_geo] - ON [xref].[from_geography_id] = [from_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [to_geo] - ON [xref].[to_geography_id] = [to_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [from_zone] - ON [xref_zone].[from_zone_id] = [from_zone].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [to_zone] - ON [xref_zone].[to_zone_id] = [to_zone].[zone_id] - WHERE - [from_geo].[alias] = @mgra - AND [to_geo].[alias] = '2010_census_tract' - AND CASE WHEN @mgra = 'mgra15' THEN 25 -- One to one xref between Series 15 MGRA and 2010 census tract - ELSE NULL END = [xref].[xref_id] -), -[xref_2020_census_tract] AS ( - SELECT - [from_zone].[zone] AS [mgra], - [to_zone].[zone] AS [2020_census_tract] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[xref] - ON [xref_zone].[xref_id] = [xref].[xref_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [from_geo] - ON [xref].[from_geography_id] = [from_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [to_geo] - ON [xref].[to_geography_id] = [to_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [from_zone] - ON [xref_zone].[from_zone_id] = [from_zone].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [to_zone] - ON [xref_zone].[to_zone_id] = [to_zone].[zone_id] - WHERE - [from_geo].[alias] = @mgra - AND [to_geo].[alias] = '2020_census_tract' -), -[xref_puma00] AS ( - SELECT - [from_zone].[zone] AS [mgra], - [to_zone].[zone] AS [puma00] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[xref] - ON [xref_zone].[xref_id] = [xref].[xref_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [from_geo] - ON 
[xref].[from_geography_id] = [from_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [to_geo] - ON [xref].[to_geography_id] = [to_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [from_zone] - ON [xref_zone].[from_zone_id] = [from_zone].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [to_zone] - ON [xref_zone].[to_zone_id] = [to_zone].[zone_id] - WHERE - [from_geo].[alias] = CASE WHEN @mgra = 'mgra15' THEN 'mgra15pt' ELSE @mgra END - AND [to_geo].[alias] = 'puma00' -), -[xref_puma10] AS ( - SELECT - [from_zone].[zone] AS [mgra], - [to_zone].[zone] AS [puma10] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[xref] - ON [xref_zone].[xref_id] = [xref].[xref_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [from_geo] - ON [xref].[from_geography_id] = [from_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [to_geo] - ON [xref].[to_geography_id] = [to_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [from_zone] - ON [xref_zone].[from_zone_id] = [from_zone].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [to_zone] - ON [xref_zone].[to_zone_id] = [to_zone].[zone_id] - WHERE - [from_geo].[alias] = CASE WHEN @mgra = 'mgra15' THEN 'mgra15pt' ELSE @mgra END - AND [to_geo].[alias] = 'puma10' -), -[xref_puma20] AS ( - SELECT - [from_zone].[zone] AS [mgra], - [to_zone].[zone] AS [puma20] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[xref] - ON [xref_zone].[xref_id] = [xref].[xref_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [from_geo] - ON [xref].[from_geography_id] = [from_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [to_geo] - ON [xref].[to_geography_id] = [to_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [from_zone] - ON [xref_zone].[from_zone_id] = [from_zone].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [to_zone] - ON 
[xref_zone].[to_zone_id] = [to_zone].[zone_id] - WHERE - [from_geo].[alias] = CASE WHEN @mgra = 'mgra15' THEN 'mgra15pt' ELSE @mgra END - AND [to_geo].[alias] = 'puma20' -), -[xref_cities_2020] AS ( - SELECT - [from_zone].[zone] AS [mgra], - [to_zone].[name] AS [cities_2020] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[xref] - ON [xref_zone].[xref_id] = [xref].[xref_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [from_geo] - ON [xref].[from_geography_id] = [from_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[geography] AS [to_geo] - ON [xref].[to_geography_id] = [to_geo].[geography_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [from_zone] - ON [xref_zone].[from_zone_id] = [from_zone].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [to_zone] - ON [xref_zone].[to_zone_id] = [to_zone].[zone_id] - WHERE - [from_geo].[alias] = @mgra - AND [to_geo].[alias] = 'cities_2020' -) -INSERT INTO [inputs].[mgra] ( - [run_id], - [mgra], - [2010_census_tract], - [2020_census_tract], - [puma00], - [puma10], - [puma20], - [cities_2020], - [shape] -) -SELECT - @run_id AS [run_id], - CONVERT(int, [mgra].[mgra]) AS [mgra], - [2010_census_tract], - [2020_census_tract], - [puma00], - [puma10], - [puma20], - [cities_2020], - [mgra].[shape] -FROM [mgra] -INNER JOIN [xref_2010_census_tract] - ON [mgra].[mgra] = [xref_2010_census_tract].[mgra] -INNER JOIN [xref_2020_census_tract] - ON [mgra].[mgra] = [xref_2020_census_tract].[mgra] -INNER JOIN [xref_puma00] - ON [mgra].[mgra] = [xref_puma00].[mgra] -INNER JOIN [xref_puma10] - ON [mgra].[mgra] = [xref_puma10].[mgra] -INNER JOIN [xref_puma20] - ON [mgra].[mgra] = [xref_puma20].[mgra] -INNER JOIN [xref_cities_2020] - ON [mgra].[mgra] = [xref_cities_2020].[mgra] \ No newline at end of file diff --git a/sql/startup/get_mgra.sql b/sql/startup/get_mgra.sql new file mode 100644 index 0000000..9064ea4 --- /dev/null +++ b/sql/startup/get_mgra.sql @@ -0,0 +1,179 @@ +/* +For an 
input MGRA alias from the [GeoAnalyst].[geography].[geography] table, +return the MGRA zones, their shapes, and the one-to-many cross references to +the following geographies: + [2010_census_blockgroup] + [2020_census_blockgroup] + [2010_census_tract] + [2020_census_tract] + [puma00] + [puma10] + [puma20] + [cities_2020] + +The @insert_switch parameter acts as a switch where a value of 1 inserts data +to the [inputs].[mgra] table and a value of 0 returns the tabular result set +without the shape attribute. This is used to validate data in Python without +needing to handle the shape attribute. +*/ +SET NOCOUNT ON; +DECLARE @insert_switch BIT = :insert_switch; +DECLARE @run_id INTEGER = :run_id; +DECLARE @mgra_version NVARCHAR(10) = :mgra_version; + + +-- Get MGRA data from [GeoAnalyst] and INSERT to temporary table +DROP TABLE IF EXISTS [#inputs_mgra]; +WITH [mgra] AS ( + SELECT + [zone].[zone] AS [mgra], + [zone].[shape] + FROM [GeoAnalyst].[geography].[zone] + INNER JOIN [GeoAnalyst].[geography].[geography] + ON [zone].[geography_id] = [geography].[geography_id] + WHERE [geography].[alias] = @mgra_version +), +[xref_2010_census_blockgroup] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [2010_census_blockgroup] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 87 ELSE NULL END + ) +), +[xref_2020_census_blockgroup] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [2020_census_blockgroup] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 90 ELSE NULL END + ) +), +[xref_2010_census_tract] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [2010_census_tract] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 25 ELSE NULL END + ) +), +[xref_2020_census_tract] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [2020_census_tract] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 16 ELSE NULL 
END + ) +), +[xref_puma00] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [puma00] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 43 ELSE NULL END + ) +), +[xref_puma10] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [puma10] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 42 ELSE NULL END + ) +), +[xref_puma20] AS ( + SELECT + [from_zone] AS [mgra], + [to_zone] AS [puma20] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 41 ELSE NULL END + ) +), +[xref_cities_2020] AS ( + SELECT + [from_zone] AS [mgra], + [to_name] AS [cities_2020] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 74 ELSE NULL END + ) +) +SELECT + @run_id AS [run_id], + CONVERT(int, [mgra].[mgra]) AS [mgra], + [2010_census_blockgroup], + [2020_census_blockgroup], + [2010_census_tract], + [2020_census_tract], + [puma00], + [puma10], + [puma20], + [cities_2020], + [mgra].[shape] +INTO [#inputs_mgra] +FROM [mgra] +INNER JOIN [xref_2010_census_blockgroup] + ON [mgra].[mgra] = [xref_2010_census_blockgroup].[mgra] +INNER JOIN [xref_2020_census_blockgroup] + ON [mgra].[mgra] = [xref_2020_census_blockgroup].[mgra] +INNER JOIN [xref_2010_census_tract] + ON [mgra].[mgra] = [xref_2010_census_tract].[mgra] +INNER JOIN [xref_2020_census_tract] + ON [mgra].[mgra] = [xref_2020_census_tract].[mgra] +INNER JOIN [xref_puma00] + ON [mgra].[mgra] = [xref_puma00].[mgra] +INNER JOIN [xref_puma10] + ON [mgra].[mgra] = [xref_puma10].[mgra] +INNER JOIN [xref_puma20] + ON [mgra].[mgra] = [xref_puma20].[mgra] +INNER JOIN [xref_cities_2020] + ON [mgra].[mgra] = [xref_cities_2020].[mgra] + + +-- INSERT data into [inputs].[mgra] if @insert_switch is set +-- Otherwise return the tabular data only without the shape attribute +IF @insert_switch = 1 +BEGIN + INSERT INTO [inputs].[mgra] ( + [run_id], + [mgra], + [2010_census_blockgroup], + 
[2020_census_blockgroup], + [2010_census_tract], + [2020_census_tract], + [puma00], + [puma10], + [puma20], + [cities_2020], + [shape] + ) + SELECT + [run_id], + [mgra], + [2010_census_blockgroup], + [2020_census_blockgroup], + [2010_census_tract], + [2020_census_tract], + [puma00], + [puma10], + [puma20], + [cities_2020], + [shape] + FROM [#inputs_mgra] + ORDER BY [mgra] +END +ELSE IF @insert_switch = 0 +BEGIN + SELECT + [run_id], + [mgra], + [2010_census_blockgroup], + [2020_census_blockgroup], + [2010_census_tract], + [2020_census_tract], + [puma00], + [puma10], + [puma20], + [cities_2020] + FROM [#inputs_mgra] + ORDER BY [mgra] +END \ No newline at end of file