Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ psycopg
selenium
webdriver-manager
sphinx
pre-commit
pre-commit
usaddress
47 changes: 47 additions & 0 deletions src/stac_utils/address.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

import usaddress
import logging

# Module-level logger used to report address-parsing failures
logger = logging.getLogger(__name__)

def parse_address(address: str) -> dict:
    """
    Parse a full address string into components.

    The string is tagged with ``usaddress.tag``. Street-level tags that are
    present are joined with single spaces, in the fixed order given by
    ``street_keys`` below, to form the street address.

    Args:
        address: A full address string (e.g., "123 Internet St, City, ST 12345")

    Returns:
        dict: Contains the keys 'street_address', 'city', 'state' and 'zip',
        each only when the corresponding component was tagged. An empty dict
        is returned for ``None`` or blank input, and when tagging raises
        (the failure is logged at error level).
    """
    # Nothing to parse: return early so that missing/blank input neither hits
    # the tagger nor produces an error log entry.
    if address is None or (isinstance(address, str) and not address.strip()):
        return {}

    # usaddress tags that together make up the street line, in output order.
    street_keys = (
        'AddressNumber', 'StreetNamePreDirectional', 'StreetNamePreModifier',
        'StreetNamePreType', 'StreetName', 'StreetNamePostType',
        'StreetNamePostDirectional', 'SubaddressType', 'SubaddressIdentifier',
        'OccupancyType', 'OccupancyIdentifier',
    )

    # Only the tagging call is expected to raise, so the try body is limited
    # to it. Parsing is best-effort: any failure yields an empty result.
    try:
        parsed, _address_type = usaddress.tag(address)
    except Exception as e:
        logger.error(f"Failed to parse address '{address}': {e}")
        return {}

    result = {}

    street_parts = [parsed[key] for key in street_keys if key in parsed]
    if street_parts:
        result['street_address'] = ' '.join(street_parts)

    if 'PlaceName' in parsed:
        result['city'] = parsed['PlaceName']
    if 'StateName' in parsed:
        result['state'] = parsed['StateName']
    if 'ZipCode' in parsed:
        result['zip'] = parsed['ZipCode']

    return result
28 changes: 28 additions & 0 deletions src/stac_utils/convert.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re

from typing import Any, Optional

def _convert(camel_input: str) -> str:
# from https://stackoverflow.com/a/46493824
Expand Down Expand Up @@ -35,3 +36,30 @@ def strip_dict(full_dict: dict):
:param full_dict: dict to clean up
:return: dict without None values"""
return {k: v for k, v in full_dict.items() if v is not None}

def get_first_value(row: dict, keys: list[str]) -> tuple[Optional[Any], Optional[str]]:
    """
    Return the first usable value found in ``row`` among ``keys``.

    Keys are tried in the order given. A value counts as usable when it is
    neither ``None`` nor the empty string; other falsy values such as ``0``
    or ``False`` are returned as-is.

    Returns:
        tuple: (value, key_used) for the first usable value, or (None, None)
        when none of the keys yields one.
    """
    for candidate in keys:
        found = row.get(candidate)
        if found is None:
            continue
        if found != "":
            return found, candidate
    return None, None


def get_all_values(row: dict, keys: list[str]) -> dict[str, Any]:
    """
    Collect every usable value in ``row`` for the given ``keys``.

    A value counts as usable when it is neither ``None`` nor the empty
    string. Entries appear in the order the keys were supplied.

    Returns:
        dict: {key: value} for each key that has a usable value.
    """
    present: dict[str, Any] = {}
    for name in keys:
        candidate = row.get(name)
        if candidate is not None and candidate != "":
            present[name] = candidate
    return present
104 changes: 86 additions & 18 deletions src/stac_utils/ngpvan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import os
import requests

from .convert import convert_to_snake_case, strip_dict
from .listify import listify
from .address import parse_address

from .convert import convert_to_snake_case, strip_dict, get_first_value, get_all_values
from .http import HTTPClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -178,36 +181,101 @@ def format_person_json(row: dict, id_key: str, has_identifier: bool) -> dict:
else:
print("No ID key used")

if row.get("email"):
formatted_json["emails"] = [{"email": row.get("email").strip()}]
emails = None

if row.get("emails"):
emails = listify(str(row.get("emails").strip()))
else:
email_keys = ["email", "email_address"]
email_value, _ = get_first_value(row, email_keys)

if email_value:
emails = listify(str(email_value).strip())

if emails:
formatted_json["emails"] = [{"email": e.strip()} for e in emails]

phones = None

if row.get("phone"):
if row.get("phones"):
phones = listify(str(row.get("phones").strip()))
else:
phone_keys = ["phone", "phone_number"]
phone_value, _ = get_first_value(row, phone_keys)

if phone_value:
phones = listify(str(phone_value).strip())

if phones:
formatted_json["phones"] = [
{"phoneNumber": str(row.get("phone")).replace(".0", "")}
{"phoneNumber": p.replace(".0", "")} for p in phones
]

if row.get("middle_name"):
formatted_json["middleName"] = row.get("middle_name")

if row.get("suffix"):
formatted_json["suffix"] = row.get("suffix")

address = {}

if row.get("street_address"):
address["addressLine1"] = row.get("street_address")
street_address_keys = ["street_address", "address", "address1", "address_1"]
street_values = get_all_values(row, street_address_keys)

if row.get("city"):
address["city"] = row.get("city")
if len(street_values) > 1:
logger.warning(
f"Multiple street address fields provided: {list(street_values.keys())}. "
f"Using first found value."
)

if row.get("state") or row.get("stateOrProvince"):
if row.get("state"):
address["stateOrProvince"] = row.get("state")
else:
address["stateOrProvince"] = row.get("stateOrProvince")
street_address, _ = get_first_value(row, street_address_keys)

has_city = row.get("city")
has_state = row.get("state") or row.get("stateOrProvince")
has_zip = row.get("zip") or row.get("zipOrPostalCode")

if street_address and not (has_city or has_state or has_zip):
parsed = parse_address(str(street_address))

if parsed.get('street_address'):
address["addressLine1"] = parsed['street_address']

if parsed.get('city'):
address["city"] = parsed['city']

if parsed.get('state'):
address["stateOrProvince"] = parsed['state']

if parsed.get('zip'):
address["zipOrPostalCode"] = parsed['zip']
else:
if street_address:
address["addressLine1"] = street_address

if row.get("city"):
address["city"] = row.get("city")

if row.get("state") or row.get("stateOrProvince"):
address["stateOrProvince"] = row.get("state") or row.get("stateOrProvince")

if row.get("zip") or row.get("zipOrPostalCode"):
address["zipOrPostalCode"] = row.get("zip") or row.get("zipOrPostalCode")

if row.get("street_address"):
address["addressLine1"] = row.get("street_address")

# Handle addressLine2 (multiple aliases supported)
address2_keys = ["address2", "address_2", "street_address_2"]
address2_value, address2_key = get_first_value(row, address2_keys)

if row.get("zip") or row.get("zipOrPostalCode"):
if row.get("zip"):
address["zipOrPostalCode"] = row.get("zip")
if address2_value:
if not address.get("addressLine1"):
logger.error(
f"addressLine2 field '{address2_key}' provided without addressLine1. "
f"Value: '{address2_value}'"
)
else:
address["zipOrPostalCode"] = row.get("zipOrPostalCode")
address["addressLine2"] = address2_value

if address:
formatted_json["addresses"] = [address]
Expand Down
Loading