-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbigquery.py
More file actions
67 lines (59 loc) · 2.57 KB
/
bigquery.py
File metadata and controls
67 lines (59 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Helpers for querying BigQuery through the googleapiclient "bigquery" v2 API
# and loading query results into per-column NumPy arrays.
__author__ = 'Samuel'
import pprint
import numpy as np
import json
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.client import GoogleCredentials
def _json_to_array(text):
    """Decode a JSON-encoded cell value (e.g. ``"[1, 2, 3]"``) into a NumPy array."""
    return np.array(json.loads(text))


# Column-type key -> (NumPy dtype, converter). run_query allocates each result
# column with the dtype and applies the converter to every raw cell value.
NUMPY_CONVERSIONS = {
    int: ('i8', int),
    float: ('f8', float),
    str: ('U128', str),
    'json_int': (np.dtype(object), _json_to_array),
}
def get_service():
    """Build and return an authorized BigQuery v2 API client.

    Loads the application-default credentials and, when that credential type
    reports that scoping is required, scopes it to the BigQuery OAuth scope
    before building the client.
    """
    creds = GoogleCredentials.get_application_default()
    needs_scope = creds.create_scoped_required()
    if needs_scope:
        bigquery_scope = 'https://www.googleapis.com/auth/bigquery'
        creds = creds.create_scoped(bigquery_scope)
    return build('bigquery', 'v2', credentials=creds)
def run_query(project, raw_query, columns_type=float,
              timeout_ms=1000 * 300, max_results=10000):
    """Run a BigQuery query and return its result as one NumPy array per column.

    Args:
        project: Project id passed as ``projectId`` to ``jobs.query`` and
            ``jobs.getQueryResults``.
        raw_query: Query text, sent verbatim as the request's ``query`` field.
        columns_type: Either a single key of ``NUMPY_CONVERSIONS`` (``int``,
            ``float``, ``str`` or ``'json_int'``) applied to every column, or
            an iterable holding one such key per result column, in schema
            order.
        timeout_ms: Value sent as ``timeoutMs`` in the ``jobs.query`` request.
        max_results: Page size (``maxResults``) for the initial query and for
            every subsequent ``getQueryResults`` page.

    Returns:
        ``(data, labels)``: ``data`` is a list of 1-D NumPy arrays, one per
        column, each of length ``totalRows``; ``labels`` lists the column
        names from the result schema. If the API raises ``HttpError`` the
        error content is printed and ``None`` is returned.

    Raises:
        RuntimeError: a response arrives without ``jobComplete`` set
            (the query did not finish while the request waited).
        ValueError: ``columns_type`` is a sequence whose length differs from
            the number of result columns.
    """
    def ensure_complete(response):
        # Incomplete responses carry no schema/totalRows/rows, so this must
        # run before any of those keys are read.
        if not response.get('jobComplete'):
            raise RuntimeError('Query execution timeout.')

    def fill_rows(response, data, converters, i):
        """Write one result page into ``data`` from row ``i``; return the next free row index."""
        ensure_complete(response)
        # NOTE(review): SQL NULL presumably arrives as a None cell value,
        # which the int/float converters reject; confirm and pick a NULL
        # representation if nullable columns are queried.
        for row in response.get('rows', []):
            for column, convert, cell in zip(data, converters, row['f']):
                column[i] = convert(cell['v'])
            i += 1
        return i

    job = get_service().jobs()
    body = {"timeoutMs": timeout_ms,
            "kind": "bigquery#queryRequest",
            "dryRun": False,
            "useQueryCache": True,
            "maxResults": max_results,
            "query": raw_query
            }
    try:
        response = job.query(projectId=project, body=body).execute()
        ensure_complete(response)
        results_args = {"projectId": project,
                        "jobId": response['jobReference']['jobId'],
                        "maxResults": max_results}
        fields = response['schema']['fields']
        labels = [field['name'] for field in fields]

        if isinstance(columns_type, str):
            # 'json_int' is a str and therefore iterable, but it is a single
            # conversion key, not a per-column sequence.
            columns_type = [columns_type] * len(fields)
        else:
            try:
                # Materialize so generators/tuples can be indexed and reused.
                columns_type = list(columns_type)
            except TypeError:
                # A bare type such as float: apply it to every column.
                columns_type = [columns_type] * len(fields)
        if len(columns_type) != len(fields):
            raise ValueError(
                'columns_type has %d entries but the query returned %d columns'
                % (len(columns_type), len(fields)))

        # Look converters up once per column instead of once per cell.
        converters = [NUMPY_CONVERSIONS[t][1] for t in columns_type]
        n_rows = int(response['totalRows'])
        data = [np.zeros(n_rows, dtype=NUMPY_CONVERSIONS[t][0])
                for t in columns_type]

        row_index = fill_rows(response, data, converters, 0)
        while "pageToken" in response:
            results_args["pageToken"] = response['pageToken']
            response = job.getQueryResults(**results_args).execute()
            row_index = fill_rows(response, data, converters, row_index)
        return data, labels
    except HttpError as err:
        # pformat returns the text; pprint.pprint would print it and return
        # None, leaving a literal "None" in this message.
        print('Error in querytableData: ', pprint.pformat(err.content))
        return None