forked from RafalKucharskiPK/query_PT
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
247 lines (205 loc) · 8.72 KB
/
Copy pathmain.py
File metadata and controls
247 lines (205 loc) · 8.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import os
import time
import subprocess
import requests
import pandas as pd
import glob
from time import sleep
#PARAMS
from utils import get_latest, merge_batches
SLEEP_SINGLE = 0.1 # pause (seconds) between single queries so that we do not flood the server
SLEEP_BATCH = 10 # pause (seconds) after each stored batch of requests, not to flood the server
SLEEP_RESTART = 60 # pause (seconds) in main() before terminating the server when query_dataset requests a restart
BATCH_SIZE = 50 # store results every BATCH_SIZE trips (one CSV per batch) and give the server a break
# query_dataset returns -1 (restart requested) once the batch index reaches this value;
# 999999 effectively means "never restart".
RESTART_EVERY = 999999 # batch index at which a server restart is requested
OTP_API = "http://localhost:8080/otp/routers/default/plan" # OTP plan endpoint queried for every trip
QUERY_MODES = "TRANSIT,WALK" # value sent as the OTP 'mode' query parameter
MAX_WALK_DISTANCE = 2000 # value sent as the OTP 'maxWalkDistance' parameter (presumably meters — TODO confirm)
def make_query(row):
    """
    Create the OTP plan query parameters for a single trip (one row of requests).

    :param row: trip record exposing ``origin_x``/``origin_y``,
        ``destination_x``/``destination_y`` and a datetime-like ``treq``
        (request time with ``hour``, ``minute``, ``day``, ``month``, ``year``)
    :return: dict of query-string parameters for the OTP plan endpoint
    """
    query = dict()
    # Places are sent as "<y>,<x>" (presumably lat,lon — TODO confirm column meaning)
    query['fromPlace'] = "{},{}".format(row.origin_y, row.origin_x)
    query['toPlace'] = "{},{}".format(row.destination_y, row.destination_x)
    treq = row.treq
    hour, minute = int(treq.hour), int(treq.minute)
    # Conventional 12-hour clock: 0 -> 12am (midnight), 12 -> 12pm (noon), 13 -> 01pm.
    ampm = 'am' if hour < 12 else 'pm'
    hour12 = hour % 12 or 12
    # Zero-pad minutes; otherwise 08:05 would be sent as "08:5am".
    query['time'] = "{:02d}:{:02d}{}".format(hour12, minute, ampm)
    query['date'] = "{:02d}-{:02d}-{}".format(int(treq.month), int(treq.day), treq.year)
    query['mode'] = QUERY_MODES
    query['maxWalkDistance'] = MAX_WALK_DISTANCE
    query['arriveBy'] = 'false'
    return query
def parse_OTP_response(response, prices=None, base_fares=None):
    '''
    parses OTP response (.json) and populates dictionary of PT trip attributes

    The itinerary with the smallest ``duration`` is used (the first one wins on
    ties). Its fare is a base fare, charged once per distinct operator/mode
    combination, plus a per-km price for every leg.

    :param response: decoded JSON of the OTP plan response (a dict)
    :param prices: optional ``{mode: price per km}`` table; defaults to the
        module-level ``mode_prices``
    :param base_fares: optional ``{mode: base fare}`` table; defaults to the
        module-level ``mode_base_fares``
    :return: one row of resulting database: ``{'success': False}`` when there is
        no plan or the plan has no itinerary, otherwise the attributes of the
        shortest itinerary
    '''
    plan = response.get('plan')
    itineraries = plan.get('itineraries') if plan else None
    if not itineraries:
        # No plan, or a plan without any itinerary: there is nothing to pick.
        return {'success': False}

    # Resolve the fare tables lazily, only when they are actually needed.
    if prices is None:
        prices = mode_prices
    if base_fares is None:
        base_fares = mode_base_fares

    # find the shortest itinerary (min keeps the first one on ties)
    shortest = min(itineraries, key=lambda it: it['duration'])

    modes = list()
    total_fare = 0
    # Track combinations of operator and mode that we've seen
    encountered_operator_modes = set()
    for leg in shortest['legs']:
        mode = leg['mode']
        leg_duration = int(leg['duration'])
        leg_distance = int(leg['distance'])
        # Operator if available (None when none of these keys holds a value)
        operator = leg.get('agencyId') or leg.get('agency') or leg.get('operator')
        # Combine operator and mode to create a unique key
        operator_mode_key = f"{operator}_{mode}" if operator else mode
        # Base fare: once per new operator-mode combination, for modes that have one
        if mode in base_fares and operator_mode_key not in encountered_operator_modes:
            encountered_operator_modes.add(operator_mode_key)
            total_fare += base_fares[mode]
        # Distance-based fare for this leg (assuming leg distance is in meters)
        distance_km = leg_distance / 1000
        total_fare += prices.get(mode, 0) * distance_km  # 0 per km for unknown modes
        modes.append([mode, leg_duration, leg_distance])

    return {'success': True,
            "n_itineraries": len(itineraries),
            'duration': shortest['duration'],
            'walkDistance': shortest['walkDistance'],
            'transfers': shortest['transfers'],
            'transitTime': shortest['transitTime'],
            'waitingTime': shortest['waitingTime'],
            'fare': round(total_fare, 2),  # rounded to 2 decimal places
            'modes': modes}
def query_dataset(PATH, OUTPATH, BATCHES_PATH = None, timeout=120):
    """
    Query OTP for every trip in ``PATH`` and store the parsed results per batch.

    Trips are read from the CSV at ``PATH`` (first column as index, ``treq``
    parsed as datetime). When ``get_latest(OUTPATH)`` returns a positive index,
    only rows from that index label onward are processed. Every ``BATCH_SIZE``
    trips the results are written to ``<BATCHES_PATH>/<min_idx>_<max_idx>.csv``.

    :param PATH: CSV with the trips to query
    :param OUTPATH: merged output path, passed to ``get_latest`` to resume
    :param BATCHES_PATH: directory for the per-batch CSVs (required; created if missing)
    :param timeout: seconds to wait for a single OTP response before giving up
    :return: -1 when a scheduled server restart is due (batch index reached
        ``RESTART_EVERY``), 1 when all batches were processed
    :raises ValueError: if ``BATCHES_PATH`` is not given
    """
    if BATCHES_PATH is None:
        # Fail before querying anything instead of crashing at the first save.
        raise ValueError("BATCHES_PATH is required to store the batch results")
    os.makedirs(BATCHES_PATH, exist_ok=True)

    df = pd.read_csv(PATH, index_col=[0]).sort_index()  # load the csv
    df.treq = pd.to_datetime(df.treq)
    first_index = get_latest(OUTPATH)
    if first_index > 0:
        df = df.loc[first_index:]
    print('trips processed so far: ', first_index)
    n_trips = df.shape[0]
    print('trips to process ', n_trips)

    # ceil(n_trips / BATCH_SIZE) batches -- zero when nothing is left, so an empty
    # frame never reaches DataFrame.apply (which would then return a DataFrame
    # whose columns would be iterated below as if they were queries).
    n_batches = (n_trips + BATCH_SIZE - 1) // BATCH_SIZE
    for batch in range(n_batches):
        start_batch_idx = BATCH_SIZE * batch
        end_batch_idx = min(start_batch_idx + BATCH_SIZE, n_trips)
        batch_df = df.iloc[start_batch_idx:end_batch_idx]  # process this batch only
        print(start_batch_idx, end_batch_idx)
        queries = batch_df.apply(make_query, axis=1)  # make OTP query for each trip in batch
        ret_list = list()
        for trip_id, query in queries.items():
            try:
                r = requests.get(OTP_API, params=query, timeout=timeout)
                ret = parse_OTP_response(r.json())
            except Exception as err:
                # best effort: one failing trip must not abort the whole batch
                print(f"Exception occured: {err}")
                ret = {'success': False}
            ret['id'] = trip_id
            print(trip_id, ret['success'])
            if not ret['success']:
                print('Not found for: ', query)
            ret_list.append(ret)
            sleep(SLEEP_SINGLE)  # Time in seconds
        if ret_list:
            batch_out = pd.DataFrame(ret_list).set_index('id').sort_index()
            batch_name = '{}_{}.csv'.format(batch_df.index.min(), batch_df.index.max())
            batch_out.to_csv(os.path.join(BATCHES_PATH, batch_name))
            # report against the actual batch size (the last batch may be smaller)
            print('batch {} saved with {} out of {} trips success'.format(batch,
                                                                          batch_out[batch_out.success].shape[0],
                                                                          batch_out.shape[0]))
        sleep(SLEEP_BATCH)  # Time in seconds
        if batch >= RESTART_EVERY:
            print("scheduled server restart")
            return -1
    return 1
def main(start_server = True):
    """
    Run the pipeline: optionally start OTP, query every trip, then merge the batches.

    :param start_server: when True, launch the OTP jar (``--build CITY_PATH
        --inMemory``) and wait until its stdout log contains
        'Grizzly server running'; when False, a server is expected to be
        reachable at ``OTP_API`` already.
    :raises RuntimeError: if the started OTP process exits before it is running
    """
    OTP_PATH = "otp-2.3.0-shaded.jar"  # path to OTP executable
    CITY_PATH = "data"  # folder with osm, gtfs files and/or graph.obj
    PATH = 'georequests.csv'  # path with trips to query
    OUTPATH = PATH[:-4] + "_PT.csv"  # merged results, e.g. georequests_PT.csv
    BATCHES_PATH = 'batches'  # folder for the per-batch result CSVs
    # Batch files from earlier runs are kept (not cleared).
    os.makedirs(BATCHES_PATH, exist_ok=True)

    p = None
    if start_server:
        print('starting server')
        # run java server; its output is logged to stdout.txt / stderr.txt
        with open("stdout.txt", "wb") as out, open("stderr.txt", "wb") as err:
            p = subprocess.Popen(['java', '-Xmx12G', '-jar', OTP_PATH, '--build', CITY_PATH, '--inMemory'],
                                 stdout=out, stderr=err)
        # Poll the log until the server reports it is up; close the log each time.
        while True:
            with open('stdout.txt', encoding='utf-8', errors='replace') as log:
                if 'Grizzly server running' in log.read():
                    print('server_running')
                    break
            if p.poll() is not None:
                # the process died: waiting longer would loop forever
                raise RuntimeError(
                    'OTP server exited with code {} before it was running; '
                    'see stderr.txt'.format(p.returncode))
            time.sleep(1)
    try:
        flag = query_dataset(PATH, OUTPATH, BATCHES_PATH)
        if flag < 0:
            print('terminating server')
            time.sleep(SLEEP_RESTART)
        else:
            print('flag positive')
    finally:
        # stop the server we started, even if querying raised
        if p is not None:
            p.terminate()
    print('merging processed batches')
    merge_batches(path=BATCHES_PATH, out_path=OUTPATH, remove=False)
def test_server(dataset_path, n_samples=5, timeout=120):
    """
    Smoke-test a running OTP server on a few randomly sampled trips.

    :param dataset_path: CSV with trips (same format as read by ``query_dataset``)
    :param n_samples: number of trips to sample (capped at the number of rows)
    :param timeout: seconds to wait for each OTP response
    :return: list of parsed results, one dict per sampled trip, each with its ``id``
    """
    df = pd.read_csv(dataset_path, index_col=[0])  # load the csv
    # make_query reads treq.hour/.minute/..., so parse it like query_dataset does
    df['treq'] = pd.to_datetime(df['treq'])
    # DataFrame.sample raises when asked for more rows than there are
    batch_df = df.sample(min(n_samples, df.shape[0]))
    print('test server on {} sample trips'.format(batch_df.shape[0]))
    results = list()
    if batch_df.empty:
        return results
    queries = batch_df.apply(make_query, axis=1)  # make OTP query for each sampled trip
    for trip_id, query in queries.items():
        try:
            r = requests.get(OTP_API, params=query, timeout=timeout)
            ret = parse_OTP_response(r.json())
            print(query, ret)
        except Exception as err:
            print(f"Exception occured: {err}")
            ret = {'success': False}
        ret['id'] = trip_id
        print(trip_id, ret['success'])
        results.append(ret)
    return results
# Fare tables read by parse_OTP_response. They are defined at module level (not
# under the __main__ guard) so they also exist when this file is imported, e.g.
# to call query_dataset or test_server from another script.

# Prices per kilometer for different modes
mode_prices = {
    'BUS': 0.20,
    'SUBWAY': 0.20,
    'RAIL': 0.12,
    'WALK': 0.00,
    'BICYCLE': 0.00,
    'TRAM': 0.20,
    'FERRY': 0.15,
}

# Base fares, charged once per operator/mode combination of an itinerary
mode_base_fares = {
    'BUS': 1.0,  # €1.00 base fare
    'SUBWAY': 1.0,  # €1.00 base fare
    'RAIL': 3.0,  # €3.00 base fare
    'TRAM': 1.0,  # €1.00 base fare
    'FERRY': 1.5,  # €1.50 base fare
    # Walking and cycling typically don't have base fares
}

if __name__ == "__main__":
    main(start_server=False)