This repository was archived by the owner on Sep 16, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspringerWrapper.py
More file actions
243 lines (194 loc) · 7.17 KB
/
springerWrapper.py
File metadata and controls
243 lines (194 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python3
import json
from typing import Union
import urllib.parse, urllib.request
import xml.etree.ElementTree as ET
from .wrapperInterface import WrapperInterface
class SpringerWrapper(WrapperInterface):
def __init__(self, apiKey: str):
self.apiKey = apiKey
self.__resultFormat = "json"
self.__collection = "metadata"
self.__startRecord = 1
self.__parameters = {}
# Endpoint used for the query
@property
def endpoint(self) -> str:
return "http://api.springernature.com"
# A dictionary that contains the available result formats for each collection
@property
def allowedResultFormats(self) -> {str: [str]}:
return {
"meta/v2": ["pam", "jats", "json", "jsonp", "jsonld"],
"metadata": ["pam", "json", "jsonp"],
"openaccess": ["jats", "json", "jsonp"],
"integro": ["xml"]
}
# The result format that will be used for the query
@property
def resultFormat(self) -> str:
return self.__resultFormat
# resultFormat must be settable
@resultFormat.setter
def resultFormat(self, value: str):
# Strip leading and trailing whitespace and convert to lower case
value = str(value).strip().lower()
# Check if the format is supported by the selected collection
if value in self.allowedResultFormats[self.collection]:
self.__resultFormat = value
else:
raise ValueError(f"Illegal format {value} for collection {self.collection}")
# Collection being used for the query
@property
def collection(self) -> str:
return self.__collection
# collection must be settable
@collection.setter
def collection(self, value: str):
# Strip leading and trailing whitespace and convert to lower case
value = str(value).strip().lower()
if not value in self.allowedResultFormats:
raise ValueError(f"Unknown collection {value}")
# Adjust resultFormat
if self.resultFormat not in self.allowedResultFormats[value]:
self.resultFormat = self.allowedResultFormats[value][0]
print(f"Illegal resultFormat for collection. Setting to {self.resultFormat}")
self.__collection = value
# Maximum number of results that the API will return
@property
def maxRecords(self) -> int:
if self.collection == "openaccess":
return 20
return 100
# Dictionary of allowed keys and their allowed values for searchField()
@property
def allowedSearchFields(self) -> {str: [str]}:
return {
"doi":[], "subject":[], "keyword":[], "pub":[], "year":[],
"onlinedate":[], "onlinedatefrom":[], "onlinedateto": [],
"country":[], "isbn":[], "issn":[], "journalid":[],
"topicalcollection":[], "journalonlinefirst":["true"],
"date":[], "issuetype":[], "issue":[], "volume":[],
"type":["Journal", "Book"]
}
# Specify value for a given search parameter for manual search
def searchField(self, key: str, value):
# Convert to lowercase and strip leading and trailing whitespace
key = str(key).strip().lower()
value = str(value).strip()
if len(value) == 0:
raise ValueError(f"Value is empty")
# Check if key and value are allowed as combination
if key in self.allowedSearchFields:
if len(self.allowedSearchFields[key]) == 0 or value in self.allowedSearchFields[key]:
self.__parameters[key] = value
else:
raise ValueError(f"Illegal value {value} for search-field {key}")
else:
raise ValueError(f"Searches against {key} are not supported")
# Reset all search parameters
def resetAllFields(self):
self.__parameters = {}
# Reset a search parameter
def resetField(self, key: str):
if key in self.__parameters:
del self.__parameters[key]
else:
raise ValueError(f"Field {key} is not set.")
# Build API query without the search terms
def queryPrefix(self) -> str:
url = self.endpoint
url += "/" + str(self.collection)
url += "/" + str(self.resultFormat)
url += "?api_key=" + str(self.apiKey)
url += "&s=" + str(self.__startRecord)
url += "&p=" + str(self.maxRecords)
return url
# Build a manual query from the keys and values specified by searchField
def buildQuery(self) -> str:
if len(self.__parameters) == 0:
raise ValueError("No search-parameters set.")
url = self.queryPrefix()
url += "&q="
# Add url encoded key, value pair to query
for key, value in self.__parameters.items():
url += key + ":" + urllib.parse.quote_plus(value) + "+"
url = url[:-1]
return url
# Builds a search group by inserting the connector between the search terms
def buildGroup(self, items: [str], connector: str) -> str:
group = "("
# connect with OR and negate group if connector is NOT
if connector == "NOT":
group = "-" + group
connector = "OR"
# Insert and combine
group += ("+" + connector + "+").join(items)
group += ")"
return group
# Translate a search in the wrapper input format into a query that the wrapper api understands
def translateQuery(self, query: dict) -> str:
url = self.queryPrefix()
url += "&q="
groups = query["search_groups"].copy()
for i in range(len(groups)):
for j in range(len(groups[i]["search_terms"])):
term = groups[i]["search_terms"][j]
# Enclose seach term in quotes if it contains a space to prevent splitting
if " " in term:
term = '"' + term + '"'
# Urlencode search term
groups[i]["search_terms"][j] = urllib.parse.quote(term)
groups[i] = self.buildGroup(groups[i]["search_terms"], groups[i]["match"])
url += self.buildGroup(groups, query["match"])
return url
# Set the index from which the returned results start
# (Necessary if there are more hits for a query than the maximum number of returned results.)
def startAt(self, value: int):
self.__startRecord = int(value)
# Format raw resonse to set format
def formatResponse(self, response: bytes, query: str):
if self.resultFormat == "json" or self.resultFormat == "jsonld":
# Load into dict
response = json.loads(response)
# Modify response to fit the defined wrapper output format
response["dbquery"] = response.pop("query")
response["query"] = query
del response["facets"]
del response["apiMessage"]
for record in response["records"]:
record["uri"] = record["url"][0]["value"]
del record["url"]
del record["identifier"]
authors = []
for author in record["creators"]:
authors.append(author["creator"])
record["authors"] = authors
del record["creators"]
record["pages"] = {
"first": record.pop("startingPage") if "startingPage" in record else "",
"last": record.pop("endingPage") if "endingPage" in record else ""
}
if self.collection == "openaccess":
record["openAccess"] = True
else:
record["openAccess"] = (record.pop("openaccess") == "true")
return response
elif self.resultFormat == "xml" or self.resultFormat == "pam":
return ET.ElementTree(ET.fromstring(response))
else:
print(f"No formatter defined for {self.resultFormat}. Returning raw response.")
return response
# Make the call to the API
# If no query is given, use the manual search specified by searchField() calls
def callAPI(self, query: Union[dict, None] = None, raw: bool = False, dry: bool = False):
if not query:
url = self.buildQuery()
else:
url = self.translateQuery(query)
if dry:
return url
response = urllib.request.urlopen(url).read()
if raw:
return response
return self.formatResponse(response, query)