forked from kurtmaia/google_covid_data
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
135 lines (119 loc) · 4.31 KB
/
utils.py
File metadata and controls
135 lines (119 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import textract
import pdfplumber
import sys
from urllib.request import urlopen, urlretrieve
import numpy as np
import pandas as pd
from tqdm import tqdm
import os
import pycountry
def getRegion(subtext,var_str = ["Retail & recreation","Grocery & pharmacy","Parks","Transit stations","Workplaces","Residential"]):
    """Build a ``{category: fraction}`` dict from one region's text lines.

    The strings at ``subtext[2]`` and ``subtext[10]`` are concatenated and
    split on the phrase "compared to baseline"; every non-empty piece is
    converted with ``p2f`` and paired, in order, with the names in
    ``var_str``. ``zip`` stops at the shorter of the two sequences, so
    surplus names or values are dropped.

    Args:
        subtext: Indexable sequence of strings with at least 11 items.
            NOTE(review): presumably items 2 and 10 are the two rows of
            percentage text in a region block of the report -- confirm
            against the PDF layout.
        var_str: Category names used as keys of the result.

    Returns:
        dict mapping category name to the value returned by ``p2f``.
    """
    marker = "compared to baseline"
    joined_rows = subtext[2] + subtext[10]
    fractions = []
    for piece in joined_rows.split(marker):
        # Splitting leaves empty strings (e.g. after the last marker); skip them.
        if len(piece) > 0:
            fractions.append(p2f(piece))
    return dict(zip(var_str, fractions))
def getRegionPerPage(text1):
    """Parse every complete 17-line group on a page into a per-region dict.

    ``text1`` is cut into consecutive groups of 17 items; a trailing group
    with fewer than 17 items is ignored. The first item of each group is
    used as the key and the whole group is handed to ``getRegion``. If two
    groups share the same first item, the later one wins.

    Args:
        text1: Sequence of text lines from one page.

    Returns:
        dict mapping each group's first line to the dict produced by
        ``getRegion`` for that group.
    """
    group_size = 17
    n_groups = int(len(text1) / group_size)
    return {
        text1[k * group_size]: getRegion(text1[k * group_size:(k * group_size + group_size)])
        for k in range(n_groups)
    }
def getRegionResults(pdfpath):
    """Collect per-region results from the inner pages of a report PDF.

    Pages with index 2 up to (but not including) the last page are read.
    Each page whose extracted text contains "compared to baseline" is split
    into lines and parsed with ``getRegionPerPage``; the per-page dicts are
    merged into one.

    Args:
        pdfpath: Path of the PDF file, passed to ``pdfplumber.open``.

    Returns:
        dict of merged per-region results (possibly empty), or ``None`` when
        the PDF has three pages or fewer.
    """
    marker = "compared to baseline"
    # The context manager closes the PDF even when a page fails to parse.
    with pdfplumber.open(pdfpath) as pdf:
        n_pages = len(pdf.pages)
        if n_pages <= 3:
            return None
        finalResult = {}
        for p in tqdm(range(2, n_pages - 1)):
            text = pdf.pages[p].extract_text()
            # Use a membership test: str.find() returns -1 (truthy) when the
            # marker is absent and 0 (falsy) when it starts the text, so its
            # result cannot be used directly as a condition. The ``text and``
            # guard skips pages that yield no text at all.
            if text and marker in text:
                finalResult.update(getRegionPerPage(text.split("\n")))
    return finalResult
def p2f(x):
    """Convert a percentage string such as ``"-12%"`` to a fraction (-0.12).

    Leading and trailing ``[``, ``%``, space and ``]`` characters are
    stripped before the number is parsed and divided by 100.

    Args:
        x: Text holding the percentage.

    Returns:
        The fraction as a float, or NaN when ``x`` cannot be converted; in
        that case the error is printed rather than raised.
    """
    try:
        return float(x.strip('[% ]')) / 100
    except Exception as e:
        # Best effort: report the problem and hand back NaN so callers can
        # keep building their tables.
        print(e)
        return float('nan')
def pdf2dict(pdfpath, var_str = ["Retail & recreation","Grocery & pharmacy","Parks","Transit stations","Workplaces","Residential"] ):
    """Extract the headline fraction for each category from a report PDF.

    The PDF text is obtained with ``textract`` and everything up to and
    including the first "About this data" is discarded. For each name in
    ``var_str`` the text following the first occurrence of that name is cut
    at the end of the next "compared to baseline"; the second-to-last line
    of that slice is converted with ``p2f``.

    NOTE(review): ``str.find`` returns -1 when a marker is missing and that
    case is not handled here, so the slices are then taken from unintended
    offsets -- confirm the inputs always contain the markers.

    Args:
        pdfpath: Path of the PDF file.
        var_str: Category names to look up; they become the result's keys.

    Returns:
        dict mapping each category name to the value returned by ``p2f``.
    """
    section_marker = "About this data"
    baseline_marker = "compared to baseline"

    raw_text = textract.process(pdfpath).decode()
    body = raw_text[(raw_text.find(section_marker) + len(section_marker)):]

    fractions = {}
    for label in var_str:
        after_label = body[(body.find(label) + len(label)):]
        cut = after_label.find(baseline_marker) + len(baseline_marker)
        lines = after_label[:cut].split("\n")
        # The last line holds the baseline phrase; the value is the line before it.
        fractions[label] = p2f(lines[-2])
    return fractions
def gpdfparser(countryCode, date, region = False):
    """Download one mobility report PDF and parse it into DataFrames.

    The report is fetched from gstatic.com into the scratch file
    "Googledata.pdf" in the current working directory, parsed, and the
    scratch file is removed again -- also when the download or the parsing
    raises.

    Args:
        countryCode: Country code inserted into the report URL.
        date: Date string inserted into the report URL and stored in the
            "date" column of the results.
        region: When true, region-level results are parsed as well.

    Returns:
        If ``region`` is false: a one-row DataFrame with a "country" column,
        one column per category returned by ``pdf2dict`` and a "date" column.
        If ``region`` is true: a two-item list ``[country_df, region_df]``
        where ``region_df`` has "region", "country" and "date" columns, or is
        ``None`` when ``getRegionResults`` returns ``None``.

    Raises:
        Whatever ``urlretrieve``, ``pdf2dict`` or ``getRegionResults`` raise;
        the exception propagates after the scratch file has been cleaned up.
    """
    pdf_file = "Googledata.pdf"
    url_pdf = "https://www.gstatic.com/covid19/mobility/"+date+"_"+countryCode+"_Mobility_Report_en.pdf"
    try:
        urlretrieve(url_pdf, pdf_file)

        cdt = pd.DataFrame({countryCode: pdf2dict(pdf_file)}).T
        cdt.index.names = ["country"]
        cdt = cdt.reset_index()
        cdt["date"] = date
        if not region:
            return cdt

        ans_region = getRegionResults(pdf_file)
        if ans_region is not None:
            ans_region = pd.DataFrame(ans_region).T
            ans_region.index.names = ["region"]
            ans_region = ans_region.reset_index()
            ans_region["country"] = countryCode
            ans_region["date"] = date
        return [cdt, ans_region]
    finally:
        # Always drop the scratch file. The existence check keeps a failed
        # download (no file written) from masking the original error with a
        # FileNotFoundError.
        if os.path.exists(pdf_file):
            os.remove(pdf_file)
def havedata(date):
    """Return whether the US mobility report for ``date`` can be opened.

    Args:
        date: Date string inserted into the report URL.

    Returns:
        ``True`` when ``urlopen`` succeeds on the US report URL for ``date``,
        ``False`` when opening it fails for any reason.
    """
    url_pdf = "https://www.gstatic.com/covid19/mobility/"+date+"_US_Mobility_Report_en.pdf"
    try:
        # Only reachability matters; the context manager closes the response
        # instead of leaving the connection open.
        with urlopen(url_pdf):
            return True
    except Exception:
        # Any failure to open the URL counts as "no data". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt and SystemExit propagate.
        return False
def countryname2acronym(c):
    """Look up the ``alpha_2`` code of the best fuzzy match for a country name.

    ``c`` is converted to ``str`` and passed to
    ``pycountry.countries.search_fuzzy``; the ``alpha_2`` attribute of the
    first match is returned.

    Args:
        c: Country name (any object; ``str(c)`` is what gets searched).

    Returns:
        The ``alpha_2`` value of the first match, or ``None`` when the lookup
        raises; the error is printed in that case.
    """
    try:
        matches = pycountry.countries.search_fuzzy(str(c))
        return matches[0].alpha_2
    except Exception as e:
        print(e)
        return None
def get_data_from_date(countrylist,date, useAcronym=False):
    """Parse the reports of several countries for one date.

    For every entry of ``countrylist`` the report is downloaded and parsed
    with ``gpdfparser(..., region=True)``. A "countryName" column holding the
    original entry is added to both the country-level and the region-level
    frame. A country whose name cannot be resolved is skipped; a country
    whose report fails to download or parse has the error printed and is
    skipped as well.

    Args:
        countrylist: Iterable of country names, or of country codes when
            ``useAcronym`` is true.
        date: Date string passed on to ``gpdfparser``.
        useAcronym: When true the entries are used as codes directly instead
            of being resolved with ``countryname2acronym``.

    Returns:
        Tuple ``(countryLevel, regionLevel)`` of DataFrames with a fresh
        integer index; each is an empty DataFrame when nothing was collected.
    """
    country_frames = []
    region_frames = []
    for c in countrylist:
        print("Parsing {} results...".format(c))
        cc = c if useAcronym else countryname2acronym(c)
        if cc is None:
            continue
        try:
            a1, a2 = gpdfparser(cc, date, region=True)
            a1["countryName"] = c
            country_frames.append(a1)
            if a2 is not None:
                a2["countryName"] = c
                region_frames.append(a2)
        except Exception as e:
            # Best effort per country: report and carry on with the next one.
            print(e)

    # DataFrame.append was removed in pandas 2.0; collect the pieces and
    # concatenate once. Empty pieces are dropped and an empty list is handled
    # explicitly because pd.concat([]) raises ValueError.
    country_frames = [f for f in country_frames if not f.empty]
    region_frames = [f for f in region_frames if not f.empty]
    countryLevel = pd.concat(country_frames, ignore_index=True) if country_frames else pd.DataFrame()
    regionLevel = pd.concat(region_frames, ignore_index=True) if region_frames else pd.DataFrame()
    return countryLevel, regionLevel
def get_google_data(countrylist,datelist = None, start=1,end=31, month = "03"):
    """Parse the reports of several countries over a list of dates.

    Args:
        countrylist: Iterable of country names passed to
            ``get_data_from_date`` for every date.
        datelist: Date strings to fetch. When ``None`` the dates
            ``"2020-<month>-<day>"`` are generated for ``day`` from ``start``
            to ``end`` inclusive, with the day zero-padded to two digits.
        start: First day of the generated range.
        end: Last day of the generated range (inclusive).
        month: Month text inserted into the generated dates.

    Returns:
        Tuple ``(countryLevel, regionLevel)`` of DataFrames holding the
        results of all dates with a fresh integer index; each is an empty
        DataFrame when nothing was collected.
    """
    if datelist is None:
        datelist = ["2020-{}-{:02d}".format(month,day) for day in range(start,end+1)]

    country_frames = []
    region_frames = []
    for fullday in datelist:
        try:
            a1, a2 = get_data_from_date(countrylist,fullday)
        except Exception:
            print("No data on {}".format(fullday))
            continue
        # Dates without data come back as empty frames; leave them out so
        # they do not take part in the concatenation.
        if not a1.empty:
            country_frames.append(a1)
        if not a2.empty:
            region_frames.append(a2)

    # DataFrame.append was removed in pandas 2.0; concatenate once instead.
    # pd.concat([]) raises ValueError, hence the explicit empty fallback.
    countryLevel = pd.concat(country_frames, ignore_index=True) if country_frames else pd.DataFrame()
    regionLevel = pd.concat(region_frames, ignore_index=True) if region_frames else pd.DataFrame()
    return countryLevel, regionLevel