-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathescape_task.py
More file actions
183 lines (158 loc) · 7.91 KB
/
escape_task.py
File metadata and controls
183 lines (158 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from village.custom_classes.task import Task
import random
from sound_functions import crescendo_looming_sound
import time
import softcode_functions
from pathlib import Path
from bpod_mock import BpodMock
#from gpiozero import LED
class EscapeBehavior(Task):
def __init__(self):
super().__init__()
self.info = """
Escape Behavior Task
-------------------
Animals are placed in an open arena with a shelter located in one side.
There is a trigger zone defined by the user. When the animal enters this zone,
there is a chance that a looming sound is played, prompting the animal to escape
to the shelter.
"""
# create a list to hold the states
self.states_visited = []
# states are lists of [START,END,MSG]
self.animal_in_trigger_zone = False
def start(self):
#self.led = LED(18)
# Open the raw file once and keep it open
self.output_file = Path(self.rt_session_path)
# I thought I fixed this bug before... but apparently not. It should be a "w" instead of "a".
self.raw_file = self.output_file.open("w")
self.raw_file.write("TRIAL;START;END;MSG;VALUE\n")
# create a crescendo sound
self.crescendo_sound = crescendo_looming_sound(
amp_start=self.settings.starting_amplitude,
amp_end=self.settings.ending_amplitude,
ramp_duration=self.settings.ramp_duration,
ramp_down_duration=self.settings.ramp_down_duration,
hold_duration=self.settings.hold_duration,
n_repeats=self.settings.n_repeats,
)
# create a mock bpod object to log events
self.bpod_mock = BpodMock()
# sleep during exploration time
time.sleep(self.settings.exploration_time)
def create_trial(self):
#self.led.off()
self.trial_start_time = time.time()
# record it in the mock bpod
self.bpod_mock.reset_trial()
self.bpod_mock.current_trial["Trial start timestamp"] = self.trial_start_time
# stop any sound that might be playing
softcode_functions.function1()
# innitiate grace period state
self.current_state = [time.time() , "" , "grace_period"]
# reset the timer for triggering attempts
self.triggering_time_reset = time.time() - self.settings.time_between_sound_triggering_attempts
# load the sound
softcode_functions.function2()
# define the trigger zone as the second area in the cam_box
# TODO: add a warning if this is not on
self.trigger_zone = self.cam_box.areas[int(self.settings.trigger_area) - 1]
# get the current camera frame
self.last_camera_frame = self.cam_box.frame_number
# while loop to go through states
loop_counter = 0
while True:
loop_counter += 1
if loop_counter % 1000 == 0:
print(f"Escape Task Loop {loop_counter}, with x y positions {self.cam_box.x_position} {self.cam_box.y_position} and state {self.current_state[2]} for area {self.trigger_zone}")
time.sleep(0.001)
# if the frame of the camera has not changed, skip the rest of the loop
if self.cam_box.frame_number == self.last_camera_frame:
continue
# if the frame has changed, update the last camera frame
self.last_camera_frame = self.cam_box.frame_number
match self.current_state[2]:
case "grace_period":
if (time.time() - self.current_state[0]) > self.settings.grace_period:
if self.animal_in_trigger_zone:
self.change_state_to("animal_inside_trigger_zone")
else:
self.change_state_to("animal_outside_trigger_zone")
case "animal_outside_trigger_zone":
# self.cam_box.log("out")
# check if the animal is in the trigger zone
if self.animal_in_trigger_zone:
# self.cam_box.log("in")
#self.led.on()
self.register_event("animal_entered_trigger_zone")
self.change_state_to("animal_inside_trigger_zone")
case "animal_inside_trigger_zone":
# check if the animal is still in the trigger zone
if not self.animal_in_trigger_zone:
self.register_event("animal_exited_trigger_zone")
self.change_state_to("animal_outside_trigger_zone")
continue
# check if the time between triggering attempts has passed
if (time.time() - self.triggering_time_reset) < self.settings.time_between_sound_triggering_attempts:
continue
# if the time has passed, decide whether to play the sound or not
# check if we play the looming sound
if random.random() < self.settings.looming_sound_probability:
# play the sound
softcode_functions.function3()
# register the event in the raw file
self.register_event("sound_played")
self.change_state_to("sound_triggered")
else:
self.register_event("sound_not_played")
# set a timer to wait before trying to trigger the sound again
self.triggering_time_reset = time.time()
case "sound_triggered":
# wait for some time to let the animal escape and stuff
time.sleep(self.settings.time_to_wait_after_sound)
# finish the trial
self.register_event("trial_end")
return
def after_trial(self):
# write the end time of the ending state
self.current_state[1] = time.time()
# append the current state to the list of states visited
self.states_visited.append(self.current_state)
# write the states visited to the raw file
for state in self.states_visited:
self.raw_file.write(f"{self.current_trial};{state[0]};{state[1]};STATE_{state[2]};\n")
# also register it in the mock bpod
self.bpod_mock.record_state(state[2], [state[0], state[1]])
# write the location of the trigger zone
# TODO: use register_value function instead
self.raw_file.write(f"{self.current_trial};;;trigger_zone;{self.trigger_zone}\n")
self.bpod_mock.current_trial["trigger_zone"] = self.trigger_zone
# reset the list of states visited
self.states_visited = []
# reset the current state
self.current_state = ['', '', 'none']
# write to file the trial start and end
self.raw_file.write(f"{self.current_trial};{self.trial_start_time};{time.time()};TRIAL;\n")
# reset the mock bpod
self.bpod_mock.reset_trial()
def close(self):
# close the file
if not self.raw_file.closed:
self.raw_file.close()
def change_state_to(self, new_state: str):
# update the current state ending time
self.current_state[1] = time.time()
# Register the state change event
self.register_event(f"_Transition_to_{new_state}")
# append the current state to the list of states visited
self.states_visited.append(self.current_state)
# update the current state to the new state
self.current_state = [time.time(), '', new_state]
return
def register_event(self, msg: str, value = '') -> None:
time_of_event = time.time()
self.raw_file.write(f"{self.current_trial};{time_of_event};{''};{msg};{value}\n")
self.bpod_mock.record_event(msg, time_of_event)
self.bpod_mock.record_message(msg)
return