Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/source/userguide/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ The main takeaway points are:
.. note::
For more info, see the specifications for `activity <http://activitystrea.ms/specs/json/1.0/#activity>`_ and `object <http://activitystrea.ms/specs/json/1.0/#object>`_.

**Sunspear** also implements parts of some extensions to the specificiations. More specifically, `Audience Targeting <http://activitystrea.ms/specs/json/targeting/1.0/>`_ and `Responses <http://activitystrea.ms/specs/json/replies/1.0/>`_.
**Sunspear** also implements parts of some extensions to the specification. More specifically, `Audience Targeting <http://activitystrea.ms/specs/json/targeting/1.0/>`_ and `Responses <http://activitystrea.ms/specs/json/replies/1.0/>`_.

What it isn't
--------------

**Sunspear** strictly deals with storage and retrival of JSON activity stream items. It does not include all adquate indexes that allow you to build a fully fledged feed system.

For indexing, you'll probably want to use something like `Sandsnake <https://github.com/numan/sandsnake>`_, a sorted index backed by `redis <http://redis.io>`_.
For indexing, you'll probably want to use something like `Sandsnake <https://github.com/numan/sandsnake>`_, a sorted index backed by `redis <http://redis.io>`_.
6 changes: 4 additions & 2 deletions sunspear/activitystreams/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


class Model(object):

_required_fields = []
_media_fields = []
_reserved_fields = []
Expand Down Expand Up @@ -137,6 +138,7 @@ def __getitem__(self, key):


class Activity(Model):

_required_fields = ['verb', 'actor', 'object']
_media_fields = ['icon']
_reserved_fields = ['updated']
Expand All @@ -157,7 +159,7 @@ def _set_defaults(self, model_dict):

def get_parsed_sub_activity_dict(self, actor, content="", verb="reply", object_type="reply", \
collection="replies", activity_class=None, extra={}, published=None, **kwargs):
#TODO: Doesn't feel like this should be here Feels like it belongs in the backend.
# TODO: Doesn't feel like this should be here Feels like it belongs in the backend.

if published is None:
published = datetime.datetime.utcnow()
Expand Down Expand Up @@ -212,7 +214,7 @@ def get_parsed_sub_activity_dict(self, actor, content="", verb="reply", object_t
return _activity, parent_activity

def parse_data(self, data, *args, **kwargs):
#TODO Rename to jsonify_dict
# TODO Rename to jsonify_dict
_parsed_data = super(Activity, self).parse_data(data, *args, **kwargs)
for response_field in self._response_fields:
if response_field in _parsed_data:
Expand Down
6 changes: 2 additions & 4 deletions sunspear/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


class BaseBackend(object):

def clear_all_objects(self):
"""
Clears all objects from the backend.
Expand Down Expand Up @@ -99,7 +100,7 @@ def create_activity(self, activity, **kwargs):
new_obj = self.create_obj(value)
objs_created.append(new_obj)
except Exception:
#there was an error, undo everything we just did
# there was an error, undo everything we just did
self._rollback(objs_created, objs_modified)
raise

Expand Down Expand Up @@ -227,7 +228,6 @@ def create_obj(self, obj, **kwargs):
:type obj: dict
:param obj: obj we want to store in the backend

:raises: ``SunspearDuplicateEntryException`` if the record already exists in the database.
:return: dict representing the new obj.
"""
obj_id = self._extract_id(obj)
Expand Down Expand Up @@ -400,8 +400,6 @@ def _listify(self, list_or_string):
"""
if not isinstance(list_or_string, (list, tuple, set)):
list_or_string = [list_or_string]
else:
list_or_string = list_or_string
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆


return list_or_string

Expand Down
168 changes: 106 additions & 62 deletions sunspear/backends/database/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@
"""
from __future__ import absolute_import, unicode_literals

import calendar
import copy
import datetime
import json
import uuid

import six
from dateutil import tz
from dateutil.parser import parse

from sqlalchemy import create_engine, sql
from sqlalchemy.pool import QueuePool
from sunspear.activitystreams.models import Activity, Model, Object
from sunspear.backends.base import SUB_ACTIVITY_MAP, BaseBackend
from sunspear.exceptions import (SunspearDuplicateEntryException,
SunspearOperationNotSupportedException,
SunspearValidationException)
from sunspear.backends.base import BaseBackend
from sunspear.exceptions import SunspearOperationNotSupportedException


from . import schema

Expand All @@ -48,12 +46,12 @@
DB_ACTIVITY_FIELD_MAPPING = {
'id': 'id',
'verb': 'verb',
'actor': 'actor',
'object': 'object',
'target': 'target',
'author': 'author',
'generator': 'generator',
'provider': 'provider',
'actor': 'actor_id',
'object': 'object_id',
'target': 'target_id',
'author': 'author_id',
'generator': 'generator_id',
'provider': 'provider_id',
'content': 'content',
'published': 'published',
'updated': 'updated',
Expand All @@ -65,10 +63,14 @@

class DatabaseBackend(BaseBackend):

def __init__(self, db_connection_string=None, verbose=False, poolsize=10,
max_overflow=5, **kwargs):
self._engine = create_engine(db_connection_string, echo=verbose, poolclass=QueuePool,
pool_size=poolsize, max_overflow=max_overflow, convert_unicode=True)
def __init__(self, db_connection_string=None, verbose=False, poolsize=10, max_overflow=5, **kwargs):
self._engine = create_engine(
db_connection_string,
echo=verbose,
poolclass=QueuePool,
pool_size=poolsize,
max_overflow=max_overflow,
convert_unicode=True)

@property
def engine(self):
Expand All @@ -90,6 +92,22 @@ def likes_table(self):
def replies_table(self):
return schema.tables['replies']

@property
def cc_table(self):
return schema.tables['cc']

@property
def bcc_table(self):
return schema.tables['bcc']

@property
def to_table(self):
return schema.tables['to']

@property
def bto_table(self):
return schema.tables['bto']

def _get_connection(self):
return self.engine.connect()

Expand All @@ -113,7 +131,10 @@ def obj_create(self, obj, **kwargs):
obj_dict = self._get_parsed_and_validated_obj_dict(obj)
obj_db_schema_dict = self._obj_dict_to_db_schema(obj_dict)

self.engine.execute(self.objects_table.insert(), [obj_db_schema_dict])
if self.obj_exists(obj):
self.obj_update(obj)
else:
self.engine.execute(self.objects_table.insert(), [obj_db_schema_dict]).close()

return obj_dict

Expand Down Expand Up @@ -158,22 +179,38 @@ def activity_exists(self, activity, **kwargs):

return self.engine.execute(sql.select([sql.exists().where(activities_db_table.c.id == activity_id)])).scalar()

def activity_create(self, activity, **kwargs):
def activity_create(self, activity_dict, **kwargs):
"""
Creates an activity. This assumes the activity is already dehydrated (ie has refrences
Creates an activity. This assumes the activity is already dehydrated (ie has references
to the objects and not the actual objects itself)
"""
activity = Activity(activity, backend=self)
activity = Activity(activity_dict, backend=self)

activity.validate()
activity_dict = activity.get_parsed_dict()

activity_db_schema_dict = self._activity_dict_to_db_schema(activity_dict)

self.engine.execute(self.activities_table.insert(), [activity_db_schema_dict])
self.engine.execute(self.activities_table.insert(), [activity_db_schema_dict]).close()

obj_id = activity_dict['object']
activity_id = activity_dict['id']
for audience_field in ['cc', 'bcc', 'to', 'bto']:
if audience_field in activity_dict:
table = schema.tables[audience_field]
self.engine.execute(table.insert(), dict(
obj_id=obj_id,
activity_id=activity_id
)).close()

return self.get_activity(activity_dict)

def activity_delete(self, activity, **kwargs):
activity_id = activity['id']
statement = self.activities_table.delete().where(
self.activities_table.c.id == activity_id)
self.engine.execute(statement)

def _extract_activity_obj_key(self, obj_or_value):
activity_obj = None

Expand Down Expand Up @@ -216,44 +253,60 @@ def create_activity(self, activity, **kwargs):
ids_of_objs_with_no_dict.append(activity_obj_id)
activity[key] = activity_audience_targeting_objs

# For all of the objects in the activity, find out which ones actually already have existing
# objects in the database
obj_ids = self._flatten([ids_of_objs_with_no_dict, activity_objs.keys()])
for obj_id, obj in activity_objs.items():
self.obj_create(obj)

s = self._get_select_multiple_objects_query(obj_ids)
results = self.engine.execute(s).fetchall()
results = self._flatten(results)
return self.activity_create(activity, **kwargs)

objs_need_to_be_inserted = []
objs_need_to_be_updated = []
def activity_get(self,
activity_ids,
raw_filters=None,
filters="",
include_public=False,
audience_targeting=None,
aggregation_pipeline=None,
**kwargs):

if filters is None:
filters = {}
if audience_targeting is None:
audience_targeting = {}
if aggregation_pipeline is None:
aggregation_pipeline = []
activity_ids = self._listify(activity_ids) # TODO: likely don't need to listify here.

assert len(audience_targeting) == 1 or len(audience_targeting) == 0, "I can't be wrong about this. I hope"

"""
audience_activity_ids = None
for audience_type, object_id in audience_targeting.items():
# audience_table = schema.tables[audience_type]
# audience_query = sql.select([audience_table.c.activity_id]).where(
# audience_table.c.obj_id == object_id)

audience_activity_ids = self.engine.execute(sql.select([self.activities_table.c.id]).where(
self.activities_table.c.actor_id.in_(audience_targeting[audience_type])
)).fetchall()

# audience_activities_query = sql.select(['*']).where(self.activities_table.c.id.in_(audience_query))
# result_proxy = self.engine.execute(audience_activities_query)
"""
# if audience_activity_ids is not None and not include_public: # only filter then??
# activity_ids = list(set(audience_activity_ids) & (set(activity_ids)))
self.filter_activities_by_audience(activity_ids, audience_targeting)

for obj_id, obj in activity_objs.items():
parsed_validated_schema_dict = self._get_parsed_and_validated_obj_dict(obj)
parsed_validated_schema_dict = self._obj_dict_to_db_schema(parsed_validated_schema_dict)
if obj_id not in results:
objs_need_to_be_inserted.append(parsed_validated_schema_dict)
else:
objs_need_to_be_updated.append(parsed_validated_schema_dict)

# Upsert all objects for the activity
with self.engine.begin() as connection:
if objs_need_to_be_inserted:
connection.execute(self.objects_table.insert(), objs_need_to_be_inserted)
for obj in objs_need_to_be_updated:
connection.execute(
self.objects_table.update().where(self.objects_table.c.id == self._extract_id(obj)).values(**obj))

return_val = self.activity_create(activity, **kwargs)

return return_val

def activity_get(self, activity_ids, **kwargs):
activity_ids = self._listify(activity_ids)
activities = self._get_raw_activities(activity_ids, **kwargs)
activities = self.hydrate_activities(activities)

return activities

def filter_activities_by_audience(self, activity_ids, audience_targeting):
# s = sql.select(['*']).where(self.objects_table.c.id.in_(obj_ids))
# TODO: bcc, bto, etc..
cc_query = sql.select(['*']).where(self.cc_table.c.activity_id.in_(activity_ids))
res = self.engine.execute(cc_query).fetchall()
return None

def sub_activity_create(self, activity, actor, content, extra={}, sub_activity_verb="", published=None, **kwargs):
object_type = kwargs.get('object_type', sub_activity_verb)
sub_activity_model = self.get_sub_activity_model(sub_activity_verb)
Expand Down Expand Up @@ -295,21 +348,12 @@ def hydrate_activities(self, activities):

# replace the object ids with the hydrated objects
for activity in activities:
activity = self._dehydrate_object_keys(activity, objects_dict)
self._dehydrate_object_keys(activity, objects_dict)

return activities

def get_new_id(self):
"""
Generates a new unique ID. The default implementation uses uuid1 to
generate a unique ID.

:return: a new id
"""
return uuid.uuid1().hex

def _get_raw_activities(self, activity_ids, **kwargs):
activity_ids = map(self._extract_id, activity_ids)
activity_ids = map(self._extract_id, activity_ids) # Likely don't need to do this
if not activity_ids:
return []

Expand Down
Loading