Skip to content
This repository was archived by the owner on Feb 25, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 155 additions & 36 deletions openalexapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
Copyright 2022 Dennis Priskorn
"""
import logging
from typing import Optional, List
from typing import Optional, List, Union

import backoff # type: ignore
import requests
from pydantic import BaseModel, EmailStr

from openalexapi.basetype import OpenAlexBaseType
from openalexapi.work import Work
from openalexapi.author import Author, DehydratedAuthor
from openalexapi.concept import Concept, DehydratedConcept
from openalexapi.venue import Venue, DehydratedVenue, HostVenue
from openalexapi.institution import Institution, DehydratedInstitution


logger = logging.getLogger(__name__)

Expand All @@ -20,14 +26,48 @@ class OpenAlex(BaseModel):
:parameter=email
"""
email: Optional[EmailStr]
base_url = "https://api.openalex.org/"
page_limit: int = 50
_base_url: str = "https://api.openalex.org/"
_headers: dict = {
"Accept": "application/json",
"User-Agent": f"OpenAlexAPI https://github.com/dpriskorn/OpenAlexAPI"
}
    # Convenience dict: dehydrated entities lack works_api_url, and the endpoints are
    # inconsistently named (institution vs institutions, host_venue vs venue)
_works_urls: dict = {
Author: _base_url+"works?filter=author.id:",
DehydratedAuthor: _base_url+"works?filter=author.id:",
Concept: _base_url+"works?filter=concept.id:",
DehydratedConcept: _base_url+"works?filter=concept.id:",
Institution: _base_url+"works?filter=institutions.id:",
DehydratedInstitution: _base_url+"works?filter=institutions.id:",
Venue: _base_url+"works?filter=host_venue.id:",
DehydratedVenue: _base_url+"works?filter=host_venue.id:",
HostVenue: _base_url+"works?filter=host_venue.id:"
}
_entities_prefixes: dict = {
'A':Author,
'C':Concept,
'I':Institution,
'V':Venue,
'W':Work
}

class Config:
underscore_attrs_are_private = True

def set_email(self,email: EmailStr):
self.email = email
self._headers = {
"Accept": "application/json",
"User-Agent": f"OpenAlexAPI https://github.com/dpriskorn/OpenAlexAPI mailto:{self.email}"
}

@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def get_single_work(self, id: str) -> Optional[Work]:
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def get_single_entity(self, id: str) -> Union[Author,Concept,Institution,Venue,Work]:
"""This models the single work entity endpoint

:parameter id can be and OpenAlex ID e.g. "W123" or a namespace ID like "doi:10.123"
Expand All @@ -39,27 +79,35 @@ def get_single_work(self, id: str) -> Optional[Work]:
"Please be nice and supply your email as the first argument "
"when calling this class to get into the polite pool. This way "
"OpenAlex can contact you if needed.")
url = self.base_url + "works/" + id
id =id.replace("https://openalex.org/", "")
if id[0] in self._entities_prefixes: etype = self._entities_prefixes[id[0]]
else: raise ValueError("Id prefix does not correspond to a valid entity.")
url = self._base_url + etype.__name__.lower() + 's/' + id
logger.debug(f"Fetching {url}")
headers = {
"Accept": "application/json",
"User-Agent": f"OpenAlexAPI https://github.com/dpriskorn/OpenAlexAPI mailto:{self.email}"
}
response = requests.get(url, headers=headers)
response = requests.get(url, headers=self._headers)
if response.status_code == 200:
return Work(**response.json())
return etype(**response.json())
elif response.status_code == 404:
return None
else:
raise ValueError(f"Got {response.status_code} from OpenAlex")

# TODO: Adapt this to support multiple namespaces
@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def hydrate_entity(self, dehydrated_entity: Union[DehydratedAuthor, DehydratedConcept, DehydratedInstitution, DehydratedVenue, HostVenue]) -> Union[Author,Concept,Institution,Venue]:
return self.get_single_entity(dehydrated_entity.id)

# TODO: Adapt this to support multiple namespaces
@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def get_multiple_works(self, ids: List[str]) -> List[Work]:
def get_entities(self, ids: List[str]) -> Union[List[Author],List[Concept],List[Institution],List[Venue],List[Work]]:
"""Fetches multiple works by OpenAlex IDs. Note this does not support
alternative namespaces.

Expand All @@ -72,25 +120,31 @@ def get_multiple_works(self, ids: List[str]) -> List[Work]:
"Please be nice and supply your email as the first argument "
"when calling this class to get into the polite pool. This way "
"OpenAlex can contact you if needed.")
headers = {
"Accept": "application/json",
"User-Agent": f"OpenAlexAPI https://github.com/dpriskorn/OpenAlexAPI mailto:{self.email}"
}
ids = [s.replace("https://openalex.org/", "") for s in ids]
works = []
# Limit of 50 is imposed by OpenAlex API
for i in range(0, len(ids), 50):
url_ids = '|'.join(ids[i:i+50])
url = self.base_url + f"works?filter=openalex_id:{url_ids}&per_page=50"
logger.debug(f"Fetching works {i} through {i+50}")
response = requests.get(url, headers=headers)
if ids[0][0] in self._entities_prefixes: etype = self._entities_prefixes[ids[0][0]]
else: raise ValueError("Id prefix does not correspond to a valid entity.")
entities = []
for i in range(0, len(ids), self.page_limit):
url_ids = '|'.join(ids[i:i+self.page_limit])
url = self._base_url + f"{etype.__name__.lower()}s?filter=openalex_id:{url_ids}&per_page={self.page_limit}"
logger.debug(f"Fetching {etype.__name__.lower()}s {i} through {i+self.page_limit}")
response = requests.get(url, headers=self._headers)
if response.status_code == 200:
works += [Work(**w) for w in response.json()['results']]
entities += [etype(**e) for e in response.json()['results']]
elif response.status_code == 403:
raise ValueError("Got error 403. Are you using OpenAlex IDs?")
else:
raise ValueError(f"Got {response.status_code} from OpenAlex")
return works
return entities

@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def hydrate_entities(self, dehydrated_entities: Union[List[DehydratedAuthor],List[DehydratedConcept],List[DehydratedInstitution],List[DehydratedVenue],List[HostVenue]]) -> Union[List[Author],List[Concept],List[Institution],List[Venue]]:
return self.get_entities([i.id for i in dehydrated_entities])


@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
Expand All @@ -102,7 +156,7 @@ def get_related_works(self, work: Work) -> List[Work]:

:parameter work is OpenAlex Work
"""
return self.get_multiple_works(work.related_works)
return self.get_entities(work.related_works)

@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
Expand All @@ -114,7 +168,7 @@ def get_referenced_works(self, work: Work) -> List[Work]:

:parameter work is OpenAlex Work
"""
return self.get_multiple_works(work.referenced_works)
return self.get_entities(work.referenced_works)

@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
Expand All @@ -132,16 +186,12 @@ def get_cited_by_works(self, work: Work, limit: int = None) -> List[Work]:
"Please be nice and supply your email as the first argument "
"when calling this class to get into the polite pool. This way "
"OpenAlex can contact you if needed.")
headers = {
"Accept": "application/json",
"User-Agent": f"OpenAlexAPI https://github.com/dpriskorn/OpenAlexAPI mailto:{self.email}"
}
per_page = 200 if limit is None else min(200, limit)
per_page = self.page_limit if limit is None else min(self.page_limit, limit)
works = []
cursor = '*'
while cursor:
url = f"{work.cited_by_api_url}&per_page={per_page}&cursor={cursor}"
response = requests.get(url, headers=headers)
response = requests.get(url, headers=self._headers)
if response.status_code == 200:
works += [Work(**w) for w in response.json()['results']]
cursor = response.json()['meta']['next_cursor']
Expand All @@ -150,3 +200,72 @@ def get_cited_by_works(self, work: Work, limit: int = None) -> List[Work]:
if limit and len(works) >= limit:
break
return works[:limit]

@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def get_associated_works(self, entity: Union[Author, DehydratedAuthor, Institution, DehydratedInstitution, Concept, DehydratedConcept, Venue, DehydratedVenue], limit: int = None) -> List[Work]:
"""Fetches all works associated with the entity, up to some limit.

:parameter work is OpenAlex Institution, Venue, Author
:parameter limit is the maximum number of works to return
"""
if self.email is None:
print("OpenAlex has 2 pools for clients. "
"Please be nice and supply your email as the first argument "
"when calling this class to get into the polite pool. This way "
"OpenAlex can contact you if needed.")
per_page = self.page_limit if limit is None else min(self.page_limit, limit)
works = []
cursor = '*'
while cursor:
url = f"{self._works_urls[type(entity)]}{entity.id}&per_page={per_page}&cursor={cursor}"
response = requests.get(url, headers=self._headers)
if response.status_code == 200:
works += [Work(**w) for w in response.json()['results']]
cursor = response.json()['meta']['next_cursor']
else:
raise ValueError(f"Got {response.status_code} from OpenAlex")
if limit and len(works) >= limit:
break
return works[:limit]

@backoff.on_exception(backoff.expo,
(requests.exceptions.Timeout,
requests.exceptions.ConnectionError),
max_time=60,
on_backoff=print(f"Backing off"))
def search_entities(self, query: str, entity_type= Union["Author","Concept","Institution","Venue","Work"], limit: int = None) -> Union[List[Author],List[Concept],List[Institution],List[Venue],List[Work]]:
"""Fetches all works associated with the entity, up to some limit.

:parameter work is OpenAlex Institution, Venue, Author
:parameter limit is the maximum number of works to return
"""



if self.email is None:
print("OpenAlex has 2 pools for clients. "
"Please be nice and supply your email as the first argument "
"when calling this class to get into the polite pool. This way "
"OpenAlex can contact you if needed.")

if entity_type[0] in self._entities_prefixes: etype = self._entities_prefixes[entity_type[0]]
else: raise ValueError("Id prefix does not correspond to a valid entity.")

per_page = self.page_limit if limit is None else min(self.page_limit, limit)
entities = []
cursor = '*'
while cursor:
url = f"{self._base_url}{etype.__name__.lower()}s?search=\"{query}\"&per_page={per_page}&cursor={cursor}"
response = requests.get(url, headers=self._headers)
if response.status_code == 200:
entities += [etype(**e) for e in response.json()['results']]
cursor = response.json()['meta']['next_cursor']
else:
raise ValueError(f"Got {response.status_code} from OpenAlex")
if limit and len(entities) >= limit:
break
return entities[:limit]
31 changes: 28 additions & 3 deletions openalexapi/author.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
"""
Copyright 2022 Dennis Priskorn
"""
from typing import Optional
from typing import Optional, List

from openalexapi.basetype import OpenAlexBaseType

from openalexapi.ids import Ids
from openalexapi.institution import DehydratedInstitution
from openalexapi.year import Year
from openalexapi.concept import DehydratedConcept

class Author(OpenAlexBaseType):
display_name: Optional[str]
orcid: Optional[str]

display_name_alternatives: Optional[List[str]]
works_count: Optional[int]
cited_by_count: Optional[int]
ids: Optional[Ids]
last_known_institution: Optional[DehydratedInstitution]
counts_by_year: Optional[List[Year]]
works_api_url: Optional[str]
updated_date: Optional[str]
created_date: Optional[str]
x_concepts: Optional[List[DehydratedConcept]]

class Config:
arbitrary_types_allowed = True

Expand All @@ -20,3 +33,15 @@ def orcid_url(self):
@property
def orcid_id(self):
return self.orcid.replace("https://orcid.org/", "")

class DehydratedAuthor(OpenAlexBaseType):
    """A stripped-down Author as embedded in other OpenAlex entities."""
    display_name: Optional[str]
    # Full ORCID URL — presumably "https://orcid.org/<id>" form; see orcid_id.
    orcid: Optional[str]

    @property
    def orcid_url(self) -> Optional[str]:
        """Return the ORCID URL (stored in URL form already)."""
        return self.orcid

    @property
    def orcid_id(self) -> Optional[str]:
        """Return the bare ORCID identifier, or None when orcid is absent."""
        # Bug fix: orcid is Optional, and calling .replace on None raised
        # AttributeError; now missing ORCIDs yield None.
        if self.orcid is None:
            return None
        return self.orcid.replace("https://orcid.org/", "")
12 changes: 6 additions & 6 deletions openalexapi/authorship.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

from pydantic import BaseModel

from openalexapi.author import Author
from openalexapi.institution import Institution

from openalexapi.author import DehydratedAuthor
from openalexapi.institution import DehydratedInstitution

class Authorship(BaseModel):
    """Links a work to one author and that author's institutions.

    Uses the dehydrated entity models, since works embed partial
    author/institution records rather than full ones.
    """
    author_position: str
    author: Optional[DehydratedAuthor]
    institutions: Optional[List[DehydratedInstitution]]
    # Affiliation exactly as it appeared on the work, unparsed.
    raw_affiliation_string: Optional[str]

24 changes: 23 additions & 1 deletion openalexapi/concept.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
"""
Copyright 2022 Dennis Priskorn
"""
from typing import Optional
from typing import Optional, List

from openalexapi.basetype import OpenAlexBaseType
from openalexapi.ids import Ids
from openalexapi.year import Year


class DehydratedConcept(OpenAlexBaseType):
    """A stripped-down Concept as embedded in works, ancestors and related-concept lists."""
    # Wikidata URL for the concept — presumably "https://www.wikidata.org/wiki/Q..."; confirm against API.
    wikidata: Optional[str]
    display_name: Optional[str]
    # Hierarchy level — presumably 0 = root of the concept tree; confirm against OpenAlex docs.
    level: Optional[int]

class Concept(OpenAlexBaseType):
wikidata: Optional[str]
display_name: Optional[str]
level: Optional[int]
score: Optional[float]
description: Optional[str]
works_count: Optional[int]
cited_by_count: Optional[int]
ids: Optional[Ids]
image_url: Optional[str]
image_thumbnail_url:Optional[str]
score: Optional[float] #used for ancestors and related concepts
#TODO: international
ancestors: Optional[List[DehydratedConcept]]
related_concepts: Optional[List[DehydratedConcept]]
counts_by_year: Optional[List[Year]]
works_api_url: Optional[str]
updated_date: Optional[str]
created_date: Optional[str]

class Config:
arbitrary_types_allowed = True
Expand All @@ -22,3 +43,4 @@ def wikidata_id(self):
@property
def wikidata_wiki_url(self):
return self.wikidata

Loading