-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathface_report.py
More file actions
206 lines (189 loc) · 9.58 KB
/
face_report.py
File metadata and controls
206 lines (189 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
###################################################################################
# Copyright (c) 2021 Rhombus Systems #
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy #
# of this software and associated documentation files (the "Software"), to deal #
# in the Software without restriction, including without limitation the rights #
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell #
# copies of the Software, and to permit persons to whom the Software is #
# furnished to do so, subject to the following conditions: #
# #
# The above copyright notice and this permission notice shall be included in all #
# copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE #
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, #
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE #
# SOFTWARE. #
###################################################################################
import requests
from datetime import datetime, timedelta
import time
import json
import calendar
import csv
import sys
import os
import argparse
import urllib3
#to disable warnings for not verifying host
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class faceProject:
def __init__(self, cli_args):
arg_parser = self.__initalize_argument_parser()
self.args = arg_parser.parse_args(cli_args)
self.api_url = "https://api2.rhombussystems.com"
self.api_sess = requests.session()
self.api_sess.headers = {
"x-auth-scheme": "api-token",
"x-auth-apikey": self.args.APIkey}
self.media_sess = requests.session()
self.media_sess.headers = {
"x-auth-scheme": "api-token",
"x-auth-apikey": self.args.APIkey}
today = datetime.now().replace(microsecond=0, second=0, minute=0)
self.end_time = today
self.start_time = (self.end_time - timedelta(days=365))
@staticmethod
def __initalize_argument_parser():
parser = argparse.ArgumentParser(
description= "Gets a report of the recent faces and downloads the pictures of each face."
)
#aruements avaiable for the user to customize
parser.add_argument('APIkey', type=str, help='Get this from your console')
parser.add_argument('-s', '--startTime', type=str, help='Add the end search time in yyyy-mm-dd~(0)0:00:00 or default to 1 hour before current time')
parser.add_argument('-e', '--endTime', type=str, help='Add the end search time in yyyy-mm-dd~(0)0:00:00 or default to current time')
parser.add_argument('-f', '--filter', type=str, help= 'Choose a filter', choices=['alert','trusted','named','other'], default='named')
parser.add_argument('-n', '--name', type = str, help = 'Searches for name')
parser.add_argument('-c', '--cameraName', type=str, help='Name of camera in the console')
parser.add_argument('--csv', type=str, help= 'Name the csv file', default='report')
parser.add_argument('-r', '--report', type=str, help='Name the folder for csv file and thumbnails', default='Report')
return parser
#decrypts the jpg and saves the image to a folder
def saving_img(self):
#url of the api
endpoint = self.thumbnail
resp = self.media_sess.get(endpoint,
verify=False)
content = resp.content
#opens the folder and writes the image to it
with open(self.args.report +'/' + self.name + '_' + str(self.count + 1) + '.jpg', 'wb') as f:
# write the data
f.write(content)
f.close()
self.fileName = (self.args.report +'/' + self.name + '_' + str(self.count + 1) + '.jpg')
#converts the ms time to a timestamp
def human_time(self, event):
event = event/1000
timestamp = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(event))
return timestamp
#converts the timestamp to ms time
def milliseconds_time(self, human):
# converts human time to ms. the +25200000 is to get it to local time and not GMT
ms_time = (calendar.timegm(time.strptime(human, '%Y-%m-%d~%H:%M:%S')) * 1000) + 25200000
return ms_time
#gets the api data about the cameras in the console and returns it
def camera_data(self):
# url of the api
endpoint = self.api_url + "/api/camera/getMinimalCameraStateList"
# any parameters
payload = {
}
resp = self.api_sess.post(endpoint, json=payload,
verify=False)
content = resp.content
self.data_camera = json.loads(content)
#gets the camera_name from the uuid
def camera_name(self, uuid, data_camera):
for value in data_camera['cameraStates']:
if uuid == value['uuid']:
return value['name']
#uses the api to get the recent faces
def recent_faces(self):
# url of the api
endpoint = self.api_url + "/api/face/getRecentFaceEventsV2"
#checks to see if the user put in a start and end time if not it defaults to one hour before current
if self.args.startTime:
start = self.milliseconds_time(self.args.startTime)
else:
start = int(round((time.time() - 3600) * 1000))
if self.args.endTime:
end = self.milliseconds_time(self.args.endTime)
else:
end = int(round(time.time() * 1000))
#any parameters
payload = {
"filter":{"types":[self.args.filter]}, #arguement to filter alert, trusted, named, or other
"interval":{
"end": end,
"start": start
}
}
resp = self.api_sess.post(endpoint, json=payload,
verify=False)
content = resp.content
data = json.loads(content)
return data
#adds the name, timestamp, camera name, and sighting number to a csv
def csv_add(self, value):
timestamp = self.human_time(value['eventTimestamp'])
self.csv_data.append([])
self.name = value['faceName']
self.csv_data[self.count].append(self.name)
self.csv_data[self.count].append(timestamp)
camera = self.camera_name(value['deviceUuid'], self.data_camera)
self.csv_data[self.count].append(camera)
self.thumbnail = self.mediaBaseURL + value['thumbnailS3Key']
self.saving_img()
self.csv_data[self.count].append(self.fileName)
with open(self.args.report + '/' + self.args.csv + '.csv', 'w', newline = '') as f:
writer = csv.writer(f) # create the csv writer
writer.writerow(self.header) # write the header
writer.writerows(self.csv_data) # write the data
def execute(self):
self.count = 0
self.mediaBaseURL = "https://media.rhombussystems.com/media/faces?s3ObjectKey="
self.header = ['Name', 'Date', 'Camera', 'Image File Name']
self.csv_data = []
data_recentFaces = self.recent_faces()
self.camera_data()
#gets a path and makes a directory file to the path
path = os.getcwd()
if os.path.exists(path + '/' + self.args.report) == False:
os.mkdir(path + '/' + self.args.report)
#checks if there is an arguement for a name to filter and only get instances with them
if self.args.name:
final_list = [event for event in data_recentFaces['faceEvents'] if event["faceName"] == self.args.name]
for value in final_list:
self.csv_add(value)
self.count += 1
#checks for an arguement to filter only certain cameras
elif self.args.cameraName:
for value in self.data_camera['cameraStates']:
if self.args.cameraName == value['name']:
uuid = value['uuid']
final_list = [event for event in data_recentFaces['faceEvents'] if event["deviceUuid"] == uuid]
for value in final_list:
self.csv_add(value)
self.count += 1
#checks for arguements of both certain cameras and a certain person to filter
elif self.args.cameraName and self.args.name:
for value in self.data_camera['cameraStates']:
if self.args.cameraName == value['name']:
uuid = value['uuid']
initial_list = [event for event in data_recentFaces['faceEvents'] if event["faceName"] == self.args.name]
final_list = [event for event in initial_list if event["deviceUuid"] == uuid]
for value in final_list:
self.csv_add(value)
self.count += 1
#if no filter arguements it just grabs all from that time
else:
for value in data_recentFaces['faceEvents']:
self.csv_add(value)
self.count += 1
if __name__ == "__main__":
engine = faceProject(sys.argv[1:])
engine.execute()