forked from pycontribs/python-crowd
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrowd.py
More file actions
401 lines (299 loc) · 12 KB
/
crowd.py
File metadata and controls
401 lines (299 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# Copyright 2012 Alexander Else <aelse@else.id.au>.
#
# This file is part of the python-crowd library.
#
# python-crowd is free software released under the BSD License.
# Please see the LICENSE file included in this distribution for
# terms of use. This LICENSE is also available at
# https://github.com/aelse/python-crowd/blob/master/LICENSE
import json
import requests
class CrowdServer(object):
    """Crowd server authentication object.

    This is a Crowd authentication class to be configured for a
    particular application (app_name) to authenticate users
    against a Crowd server (crowd_url).

    This module uses the Crowd JSON API for talking to Crowd.

    An application account must be configured in the Crowd server
    and permitted to authenticate users against one or more user
    directories prior to using this module.

    Please see the Crowd documentation for information about
    configuring additional applications to talk to Crowd.

    The ``ssl_verify`` parameter controls how and if certificates are
    verified. If ``True``, the SSL certificate will be verified.
    A CA_BUNDLE path can also be provided.

    The ``timeout`` parameter is used as the default ``timeout`` keyword
    for every request made through this object. ``None`` (the default)
    leaves the request without a timeout, as before.
    """

    def __init__(self, crowd_url, app_name, app_pass, ssl_verify=True,
                 timeout=None):
        """Configure the client and its underlying Requests session.

        Args:
            crowd_url: Base URL of the Crowd server.
            app_name: The application name configured in Crowd.
            app_pass: The application password configured in Crowd.
            ssl_verify: Assigned to the session's ``verify`` attribute.
            timeout:
                Default ``timeout`` passed to each request unless the
                caller supplies one explicitly.
        """
        self.crowd_url = crowd_url
        self.app_name = app_name
        self.app_pass = app_pass
        self.timeout = timeout
        self.rest_url = crowd_url.rstrip("/") + "/rest/usermanagement/1"

        # One shared session carries the application credentials and the
        # JSON content negotiation headers for every call.
        self.session = requests.Session()
        self.session.verify = ssl_verify
        self.session.auth = requests.auth.HTTPBasicAuth(app_name, app_pass)
        self.session.headers.update({
            "Content-type": "application/json",
            "Accept": "application/json"
        })

    def __str__(self):
        return "Crowd Server at %s" % self.crowd_url

    def __repr__(self):
        # The application password is masked on purpose: repr() output
        # routinely ends up in logs and tracebacks.
        return "<CrowdServer('%s', '%s', '%s')>" % \
            (self.crowd_url, self.app_name, "********")

    def _get(self, *args, **kwargs):
        """Wrapper around Requests for GET requests

        Returns:
            Response:
                A Requests Response object
        """
        # An explicit timeout from the caller wins over the default.
        kwargs.setdefault("timeout", self.timeout)
        return self.session.get(*args, **kwargs)

    def _post(self, *args, **kwargs):
        """Wrapper around Requests for POST requests

        Returns:
            Response:
                A Requests Response object
        """
        kwargs.setdefault("timeout", self.timeout)
        return self.session.post(*args, **kwargs)

    def _delete(self, *args, **kwargs):
        """Wrapper around Requests for DELETE requests

        Returns:
            Response:
                A Requests Response object
        """
        kwargs.setdefault("timeout", self.timeout)
        return self.session.delete(*args, **kwargs)

    def auth_ping(self):
        """Test that application can authenticate to Crowd.

        Attempts to authenticate the application user against
        the Crowd server. In order for user authentication to
        work, an application must be able to authenticate.

        Returns:
            bool:
                True if the application authentication succeeded.
        """
        url = self.rest_url + "/non-existent/location"
        response = self._get(url)

        # A 404 for the bogus location is treated as success; a 401 or any
        # other status (problem with the Crowd server?) is a failure.
        return response.status_code == 404

    def auth_user(self, username, password):
        """Authenticate a user account against the Crowd server.

        Attempts to authenticate the user against the Crowd server.

        Args:
            username: The account username.
            password: The account password.

        Returns:
            dict:
                A dict mapping of user attributes if the application
                authentication was successful. See the Crowd documentation
                for the authoritative list of attributes.
            None: If authentication failed.
        """
        response = self._post(self.rest_url + "/authentication",
                              data=json.dumps({"value": password}),
                              params={"username": username})

        # If authentication failed for any reason return None
        if not response.ok:
            return None

        # ...otherwise return a dictionary of user attributes
        return response.json()

    def get_session(self, username, password, remote="127.0.0.1"):
        """Create a session for a user.

        Attempts to create a user session on the Crowd server.

        Args:
            username: The account username.
            password: The account password.
            remote:
                The remote address of the user. This can be used
                to create multiple concurrent sessions for a user.
                The host you run this program on may need to be configured
                in Crowd as a trusted proxy for this to work.

        Returns:
            dict:
                A dict mapping of user attributes if the application
                authentication was successful. See the Crowd
                documentation for the authoritative list of attributes.
            None: If authentication failed.
        """
        params = {
            "username": username,
            "password": password,
            "validation-factors": {
                "validationFactors": [
                    {"name": "remote_address", "value": remote, }
                ]
            }
        }

        response = self._post(self.rest_url + "/session",
                              data=json.dumps(params),
                              params={"expand": "user"})

        # If authentication failed for any reason return None
        if not response.ok:
            return None

        # Otherwise return the user object
        return response.json()

    def validate_session(self, token, remote="127.0.0.1"):
        """Validate a session token.

        Validate a previously acquired session token against the
        Crowd server. This may be a token provided by a user from
        a http cookie or by some other means.

        Args:
            token: The session token.
            remote: The remote address of the user.

        Returns:
            dict:
                A dict mapping of user attributes if the application
                authentication was successful. See the Crowd
                documentation for the authoritative list of attributes.
            None: If authentication failed.
        """
        params = {
            "validationFactors": [
                {"name": "remote_address", "value": remote, }
            ]
        }

        url = self.rest_url + "/session/%s" % token
        response = self._post(url, data=json.dumps(params),
                              params={"expand": "user"})

        # For consistency between methods use None rather than False
        # If token validation failed for any reason return None
        if not response.ok:
            return None

        # Otherwise return the user object
        return response.json()

    def terminate_session(self, token):
        """Terminates the session token, effectively logging out the user
        from all crowd-enabled services.

        Args:
            token: The session token.

        Returns:
            True: If session terminated
            None: If session termination failed
        """
        url = self.rest_url + "/session/%s" % token
        response = self._delete(url)

        # For consistency between methods use None rather than False
        # If session termination failed for any reason return None
        if not response.ok:
            return None

        # Otherwise return True
        return True

    def add_user(self, username, **kwargs):
        """Add a user to the directory

        Args:
            username: The account username
            **kwargs: key-value pairs:
                password: mandatory
                email: mandatory
                first_name: optional
                last_name: optional
                display_name: optional
                active: optional (default True)

        Returns:
            True: Succeeded
            False: If unsuccessful

        Raises:
            ValueError:
                If ``password`` or ``email`` is missing, or if a keyword
                argument does not map onto a known user field.
        """
        # Check that mandatory elements have been provided
        if 'password' not in kwargs:
            raise ValueError("missing password")
        if 'email' not in kwargs:
            raise ValueError("missing email")

        # Populate data with default and mandatory values. The name
        # fields default to the username until overridden below.
        data = {
            "name": username,
            "first-name": username,
            "last-name": username,
            "display-name": username,
            "email": kwargs["email"],
            "password": {"value": kwargs["password"]},
            "active": True
        }

        # Put values from kwargs into data, translating python_style
        # names to the hyphenated keys used in the payload.
        for k, v in kwargs.items():
            # Special case: 'password' is already stored in wrapped form.
            if k == "password":
                continue
            new_k = k.replace("_", "-")
            if new_k not in data:
                raise ValueError("invalid argument %s" % k)
            data[new_k] = v

        response = self._post(self.rest_url + "/user",
                              data=json.dumps(data))

        # Only a 201 response counts as success.
        return response.status_code == 201

    def get_user(self, username):
        """Retrieve information about a user

        Args:
            username: The account username.

        Returns:
            dict: User information
            None: If no user or failure occurred
        """
        response = self._get(self.rest_url + "/user",
                             params={"username": username,
                                     "expand": "attributes"})

        if not response.ok:
            return None

        return response.json()

    def get_groups(self, username):
        """Retrieves a list of group names that have <username> as a direct member.

        Args:
            username: The account username.

        Returns:
            list:
                A list of strings of group names.
            None: If the request failed.
        """
        response = self._get(self.rest_url + "/user/group/direct",
                             params={"username": username})

        if not response.ok:
            return None

        return [g['name'] for g in response.json()['groups']]

    def get_nested_groups(self, username):
        """Retrieve a list of all group names that have <username> as a direct or indirect member.

        Args:
            username: The account username.

        Returns:
            list:
                A list of strings of group names.
            None: If the request failed.
        """
        response = self._get(self.rest_url + "/user/group/nested",
                             params={"username": username})

        if not response.ok:
            return None

        return [g['name'] for g in response.json()['groups']]

    def get_nested_group_users(self, groupname):
        """Retrieves a list of all users that directly or indirectly belong to the given groupname.

        Args:
            groupname: The group name.

        Returns:
            list:
                A list of strings of user names.
            None: If the request failed.
        """
        response = self._get(self.rest_url + "/group/user/nested",
                             params={"groupname": groupname,
                                     "start-index": 0,
                                     "max-results": 99999})

        if not response.ok:
            return None

        return [u['name'] for u in response.json()['users']]

    def user_exists(self, username):
        """Determines if the user exists.

        Args:
            username: The user name.

        Returns:
            True: If the user lookup succeeded.
            None:
                If the lookup failed for any reason. None rather than
                False is used for consistency with the other methods.
        """
        response = self._get(self.rest_url + "/user",
                             params={"username": username})

        if not response.ok:
            return None

        return True