-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodule.py
More file actions
218 lines (192 loc) · 7.33 KB
/
module.py
File metadata and controls
218 lines (192 loc) · 7.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import json
import sys
import time

import requests
def print_blue_text(text):
    """Write ``text`` to standard output in blue using ANSI escape codes."""
    # ESC[34m switches the foreground to blue; ESC[0m resets it afterwards.
    start_blue, reset = "\033[34m", "\033[0m"
    print(start_blue + text + reset)
def print_red_text(text):
    """Write ``text`` to standard output in red using ANSI escape codes."""
    # ESC[31m switches the foreground to red; ESC[0m resets it afterwards.
    start_red, reset = "\033[31m", "\033[0m"
    print(start_red + text + reset)
def get_new_access_token(refresh_token, client_id, client_secret):
    """Exchange a refresh token for a new access token.

    Posts a form-encoded ``refresh_token`` grant to the TradeStation OAuth
    token endpoint.

    Args:
        refresh_token: Value sent as the ``refresh_token`` form field.
        client_id: Value sent as the ``client_id`` form field.
        client_secret: Value sent as the ``client_secret`` form field.

    Returns:
        An ``(access_token, expires_in)`` tuple read from the JSON response.
        ``expires_in`` is ``None`` if the response does not include it.

    Raises:
        Exception: If the endpoint does not answer with HTTP 200, or answers
            200 without an ``access_token`` field.
        requests.exceptions.RequestException: On connection errors or when
            the request times out.
    """
    token_url = 'https://signin.tradestation.com/oauth/token'
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    payload = {
        'grant_type': 'refresh_token',
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
    }
    # Without a timeout requests waits indefinitely on a stalled connection,
    # which would freeze the refresh loop while the current token expires.
    response = requests.post(token_url, data=payload, headers=headers, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Failed to get a new access token. Response: {response.text}")

    data = response.json()
    access_token = data.get('access_token')
    if not access_token:
        # A 200 without a token must not be handed on as a usable credential.
        raise Exception(f"Failed to get a new access token. Response: {response.text}")
    return access_token, data.get('expires_in')
def refresh_access_token():
    """Keep the stored access token fresh until interrupted or an error occurs.

    Loads the client credentials and the refresh token from two JSON files,
    then loops: request a new access token, write it to ``access_token.json``
    under the ``accessToken`` key, and sleep until shortly before it expires.

    The loop ends when any exception is raised inside it (the error is
    printed and the function returns) or on Ctrl+C, which exits the process
    with status 0.

    Raises:
        OSError, json.JSONDecodeError, KeyError: If the credential files
            cannot be read or lack the ``client_id`` / ``refresh`` keys.
    """
    # NOTE: the three paths below are placeholders and must be edited before
    # use. The raw-string prefix keeps both backslashes of every pair.
    with open(r"C:\\Users\\INSERT A VALID PATH HERE\\private.json", 'r') as file:
        private_data = json.load(file)
    with open(r"C:\\Users\\INSERT A VALID PATH HERE\\refreshToken.json", 'r') as file:
        refresh = json.load(file)

    client_id = private_data['client_id']
    client_secret = private_data.get('client_secret', '')
    refresh_token = refresh['refresh']

    while True:
        try:
            access_token, expires_in = get_new_access_token(refresh_token, client_id, client_secret)
            with open(r"C:\\Users\\INSERT A VALID PATH HERE\\access_token.json", "w") as json_file:
                json.dump({"accessToken": access_token}, json_file)
            print(f"New access token aquired. Expires in: {expires_in} seconds")
            # Refresh a minute before expiry. For short lifetimes fall back to
            # half the lifetime so sleep() never receives a negative value and
            # the loop never spins without pausing.
            time.sleep(max(expires_in - 60, expires_in / 2, 1))
        except KeyboardInterrupt:
            print_red_text("Shutting down the refresh loop...")
            sys.exit(0)
        except Exception as e:
            # Any failure (network, bad response, file write) stops the loop.
            print(f"Error: {e}")
            break
def load_access_token(path=None):
    """Return the access token stored in the access-token JSON file.

    Args:
        path: Optional location of the JSON file. When omitted, the
            original hard-coded location is used, so existing zero-argument
            callers behave exactly as before.

    Returns:
        The value stored under the ``accessToken`` key.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If the JSON object has no ``accessToken`` key.
    """
    if path is None:
        path = 'C:\\Users\\admin\\Desktop\\Million Dollar Business\\argonaut\\access_token.json'
    with open(path, encoding='utf-8') as file:
        return json.load(file)['accessToken']
def placeTradeFlat(quantityEnter, direction):
    """Submit a plain market order to the TradeStation simulator.

    The account (``SIM2504240F``), symbol (``MNQU23``), route and GTC
    duration are hard-coded. The failure messages indicate this is used to
    exit a position.

    Args:
        quantityEnter: Value sent as the order's ``Quantity``.
        direction: Value sent as the order's ``TradeAction``.

    Raises:
        Exception: Any error while sending the order, decoding the response,
            or an HTTP error status is re-raised after a critical message is
            printed, so the caller can stop the program.
    """
    access_token = load_access_token()
    symbol = 'MNQU23'
    url = 'https://sim-api.tradestation.com/v3/orderexecution/orders'
    headers = {
        'content-type': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    data = {
        'AccountID': 'SIM2504240F',
        'Symbol': symbol,
        'Quantity': quantityEnter,
        'OrderType': 'Market',
        'TradeAction': direction,
        'TimeInForce': {'Duration': 'GTC'},
        'Route': 'Intelligent',
    }
    try:
        # A timeout keeps a stalled connection from blocking the exit forever.
        response = requests.post(url, headers=headers, json=data, timeout=30)
        response_data = response.json()
        print(response_data)
        # Only claim success when the API actually accepted the request; a
        # 4xx/5xx (e.g. expired token) must be treated as a failed exit.
        response.raise_for_status()
        print(f'Order for {direction} {symbol} placed successfully.')
    except Exception:
        print_red_text("\n------ CRITICAL ------")
        print_red_text("Position exit order failed.")
        print_red_text("---- KILL PROGRAM ----")
        raise
def placeTrade(quantityEnter, quantityEnter2, direction, tsdirec, tspoints):
    """Submit a market order with an attached trailing-stop order.

    Sends one request to the TradeStation simulator containing a market
    order plus an OSO group holding a ``StopMarket`` order with a trailing
    stop. The account (``SIM2504240F``), symbol (``MNQU23``), route and DAY
    duration are hard-coded. On success the ``OrderID`` of the first order in
    the response is written to ``ts_orderid.txt``.

    Failures are reported as red warnings and are not raised.

    Args:
        quantityEnter: Quantity of the market order (sent as a string).
        quantityEnter2: Quantity of the trailing-stop order (sent as a string).
        direction: ``TradeAction`` of the market order.
        tsdirec: ``TradeAction`` of the trailing-stop order.
        tspoints: Trailing-stop ``Amount`` (sent as a string).
    """
    access_token = load_access_token()
    account_id = 'SIM2504240F'
    symbol = 'MNQU23'
    url = 'https://sim-api.tradestation.com/v3/orderexecution/orders'
    headers = {
        'content-type': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    trailing_stop_order = {
        "AccountID": account_id,
        "TimeInForce": {"Duration": "DAY"},
        "Quantity": str(quantityEnter2),
        "OrderType": "StopMarket",
        "Symbol": symbol,
        "TradeAction": str(tsdirec),
        "Route": "Intelligent",
        "AdvancedOptions": {
            "TrailingStop": {"Amount": str(tspoints)},
        },
    }
    data = {
        "AccountID": account_id,
        "TimeInForce": {"Duration": "DAY"},
        "Quantity": str(quantityEnter),
        "OrderType": "Market",
        "Symbol": symbol,
        "TradeAction": str(direction),
        "Route": "Intelligent",
        "OSOs": [
            {"Type": "NORMAL", "Orders": [trailing_stop_order]},
        ],
    }
    try:
        # A timeout keeps a stalled connection from blocking the caller.
        response = requests.post(url, headers=headers, json=data, timeout=30)
        response_data = response.json()
        print(response_data)
        # Do not report success for a request the API rejected.
        response.raise_for_status()
        print(f'Order for {direction} {symbol} placed successfully.')
        try:
            # NOTE(review): the first returned order is recorded as the
            # trailing-stop ID; confirm the API lists the stop order first
            # rather than the parent market order.
            first_order_id = response_data['Orders'][0]['OrderID']
            print(f'Order ID for the trailingstop is {first_order_id}')
            path = 'C:\\Users\\admin\\Desktop\\Million Dollar Business\\argonaut\\ts_orderid.txt'
            with open(path, 'w') as f:
                f.write(first_order_id)
        except Exception as e:
            print_red_text("\n------ WARNING ------")
            print_red_text("Stop ID logging failed.")
            print_red_text(f"Details: {e}")
            print_red_text("---- END WARNING ----")
    except Exception as e:
        print_red_text("\n------ WARNING ------")
        print_red_text("Order placement loop failed.")
        print_red_text(f"Details: {e}")
        print_red_text("---- END WARNING ----")
def cancel_order():
    """Cancel the stop order whose ID is stored in ``ts_orderid.txt``.

    Reads the order ID from the file, sends a DELETE request for it to the
    TradeStation simulator, and empties the file when the API answers with
    HTTP 200. Any other status is reported in red.

    If the file is missing or holds no ID, nothing is sent: there is no
    order to cancel, and a request for ``.../orders/None`` or ``.../orders/``
    could only fail.
    """
    path = 'C:\\Users\\admin\\Desktop\\Million Dollar Business\\argonaut\\ts_orderid.txt'
    try:
        with open(path, 'r') as f:
            order_id = f.read().strip()
    except FileNotFoundError:
        order_id = None

    if not order_id:
        # Covers both a missing file and the empty file left by a prior cancel.
        print("\nNo stop order ID on file; nothing to cancel.")
        return

    access_token = load_access_token()
    url = f"https://sim-api.tradestation.com/v3/orderexecution/orders/{order_id}"
    headers = {"Authorization": f"Bearer {access_token}"}
    # A timeout keeps a stalled connection from blocking the caller.
    response = requests.delete(url, headers=headers, timeout=30)
    if response.status_code == 200:
        print("\nStop order successfully canceled.")
        # Clear the stored ID so the same order is not cancelled twice.
        with open(path, 'w') as f:
            f.write("")
    else:
        print_red_text(f"Failed to cancel the stop order. Status code: {response.status_code}, Response: {response.text}")
def openTrade():
    """Report whether the simulator account holds an open MNQU23 position.

    Queries the positions endpoint for account ``SIM2504240F`` filtered to
    symbol ``MNQU23``.

    Returns:
        ``True`` if the response's ``Positions`` entry is non-empty,
        ``False`` if it is empty, and ``None`` (after printing a warning)
        when the response has no usable ``Positions`` entry.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts,
            or a response body that is not valid JSON.
    """
    url = 'https://sim-api.tradestation.com/v3/brokerage/accounts/SIM2504240F/positions?symbol=MNQU23'
    access_token = load_access_token()
    headers = {
        'content-type': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    # A timeout keeps a stalled connection from blocking the caller.
    response = requests.get(url, headers=headers, timeout=30)
    data = response.json()
    try:
        return bool(data["Positions"])
    except (KeyError, TypeError):
        # The payload carried no "Positions" entry. An expired token is the
        # failure the original message assumes; the status and body are
        # printed as well so other causes can be diagnosed.
        print_red_text("\n------ WARNING ------")
        print_red_text("Your access token is expired.")
        print_red_text(f"Status code: {response.status_code}, Response: {response.text}")
        print_red_text("---- END WARNING ----")
        return None