Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.10.28T13.41.31.134Z.87861f3a.master
0.1.0+2025.11.03T14.41.21.167Z.3ff5ed7a.berickson.user.presets
2 changes: 1 addition & 1 deletion learning_observer/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.10.14T13.37.19.878Z.ec3193cb.master
0.1.0+2025.11.03T14.41.21.167Z.3ff5ed7a.berickson.user.presets
18 changes: 14 additions & 4 deletions learning_observer/learning_observer/dash_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import dash
from dash import Dash, html, clientside_callback, callback, Output, Input
import dash_bootstrap_components as dbc
from flask import request
import flask

from lo_dash_react_components import LOConnection

import learning_observer.auth
import learning_observer.constants
import learning_observer.prestartup
import learning_observer.paths
Expand Down Expand Up @@ -72,6 +73,14 @@ def static_url(filename):
return f"/static/{filename}"


def get_aiohttp_request_from_dash():
return flask.request.environ['aiohttp.request']


async def get_active_user_from_dash():
return await learning_observer.auth.get_active_user(get_aiohttp_request_from_dash())


test_layout = html.Div(children=[
html.H1(children='Test Case for Dash'),
LOConnection(
Expand Down Expand Up @@ -208,7 +217,8 @@ def load_dash_pages():
external_scripts=all_dash_resources('SCRIPTS'),
assets_folder=compile_dash_assets(),
assets_url_path='dash/assets',
update_title=None
update_title=None,
use_async=True
)

app.layout = html.Div([html.Div(id=impersonation_header_id), dash.page_container])
Expand Down Expand Up @@ -243,7 +253,7 @@ def load_dash_pages():
Output(impersonation_header_id, 'children'),
Input(impersonation_header_id, 'id') # triggers on page load
)
def update_impersonation_header(id):
async def update_impersonation_header(id):
'''Add impersonation header if we are impersonating
a user. This includes a 'Stop' button

Expand All @@ -254,7 +264,7 @@ def update_impersonation_header(id):
This does not feel like the optimal workflow for this, but it
does achieve the goal of wrapping a page in a header.
'''
session = asyncio.run(aiohttp_session.get_session(request.environ['aiohttp.request']))
session = await aiohttp_session.get_session(get_aiohttp_request_from_dash())
if learning_observer.constants.IMPERSONATING_AS in session:
return html.Div([
# TODO clean up text for who we are impersonating
Expand Down
101 changes: 93 additions & 8 deletions learning_observer/learning_observer/redis_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
the library.
'''

import asyncio
import functools

import pmss
import redis.asyncio
import redis.exceptions

import learning_observer.settings
from learning_observer.log_event import debug_log
Expand All @@ -35,21 +39,68 @@


REDIS_CONNECTION = None
_RECONNECT_LOCK = asyncio.Lock()


def _new_client():
"""
Create a new redis.asyncio client with current settings.
NOTE: We do not ping here to avoid extra round trips under heavy load.
"""
return redis.asyncio.Redis(
host=learning_observer.settings.pmss_settings.redis_host(types=['redis_connection']),
port=learning_observer.settings.pmss_settings.redis_port(types=['redis_connection']),
# TODO figure out how to properly use None from pmss
# password=learning_observer.settings.pmss_settings.redis_password(types=['redis_connection'])
)


async def _close_client(client):
"""
Best-effort, version-tolerant close that avoids raising on shutdown.
"""
if client is None:
return
try:
# redis-py 5.x provides aclose(); earlier versions may have async close() or sync close()
if hasattr(client, "aclose"):
await client.aclose()
elif hasattr(client, "close"):
maybe = client.close()
if asyncio.iscoroutine(maybe):
await maybe
# Also ensure underlying pool is dropped
if hasattr(client, "connection_pool") and client.connection_pool is not None:
try:
await client.connection_pool.disconnect(inuse_connections=True)
except TypeError:
# older versions are sync
client.connection_pool.disconnect()
except Exception:
# Don't let shutdown errors cascade
pass


async def _recreate_connection():
"""
Recreate the global client safely (once) and atomically.
"""
global REDIS_CONNECTION
async with _RECONNECT_LOCK:
old = REDIS_CONNECTION
# Another task may have already recreated it while we were waiting
REDIS_CONNECTION = _new_client()
# Close the old client outside the critical path (best effort)
await _close_client(old)


async def connect():
'''
Connect to redis
Initialize the global Redis client if needed (no ping).
'''
global REDIS_CONNECTION
if REDIS_CONNECTION is None:
REDIS_CONNECTION = redis.asyncio.Redis(
host=learning_observer.settings.pmss_settings.redis_host(types=['redis_connection']),
port=learning_observer.settings.pmss_settings.redis_port(types=['redis_connection']),
# TODO figure out how to properly use None from pmss
# password=learning_observer.settings.pmss_settings.redis_password(types=['redis_connection'])
)
await REDIS_CONNECTION.ping()
REDIS_CONNECTION = _new_client()


async def connection():
Expand All @@ -63,6 +114,36 @@ async def connection():
return REDIS_CONNECTION


_RETRY_EXC = (
redis.exceptions.ConnectionError, redis.exceptions.TimeoutError,
ConnectionError, TimeoutError,
RuntimeError
)


def _with_reconnect(fn):
"""
Decorator for async Redis ops:
- Try once.
- On connection/timeout/runtime failures: recreate client and retry once.
- No extra PINGs; only recreates on failure.
"""
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
try:
return await fn(*args, **kwargs)
except _RETRY_EXC as e:
# Log at debug level to avoid noise under high throughput
try:
debug_log(f"Redis op '{fn.__name__}' failed ({type(e).__name__}); recreating client and retrying once.")
except Exception:
pass
await _recreate_connection()
return await fn(*args, **kwargs)
return wrapper


@_with_reconnect
async def keys():
'''
Return all the keys in the database. This might take a while on production
Expand All @@ -71,27 +152,31 @@ async def keys():
return [key.decode('utf-8') for key in await (await connection()).keys()]


@_with_reconnect
async def get(key):
'''
Get a key. Returns a future.
'''
return await (await connection()).get(key)


@_with_reconnect
async def mget(keys):
'''
Get mutliple keys. Returns a future.
'''
return await (await connection()).mget(keys)


@_with_reconnect
async def set(key, value, expiry=None):
'''
Set a key. We should eventually do multi-sets. Returns a future.
'''
return await (await connection()).set(key, value, expiry)


@_with_reconnect
async def delete(key):
'''
Delete a key. Returns a future.
Expand Down
2 changes: 1 addition & 1 deletion modules/wo_classroom_text_highlighter/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.09.30T15.53.18.556Z.1c72d9e1.master
0.1.0+2025.11.03T14.41.21.167Z.3ff5ed7a.berickson.user.presets
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@
presets the user wants displayed.
TODO create a react component that does this
'''
import asyncio
from dash import html, dcc, clientside_callback, callback, Output, Input, State, ALL, exceptions, Patch, ctx
import dash_bootstrap_components as dbc

import learning_observer.constants as c
import learning_observer.dash_integration
import learning_observer.kvs
import learning_observer.stream_analytics.helpers as sa_helpers

import wo_classroom_text_highlighter.options

_prefix = 'option-preset'
Expand All @@ -16,8 +22,36 @@
_set_item = f'{_prefix}-set-item'
_remove_item = f'{_prefix}-remove-item'

# TODO many dashboards will likely use some form of presets tied to the user
# this ought to be generic enough that we can easily add it to each dashboard
def preset_components():
pass


async def get_preset_components():
return wo_classroom_text_highlighter.options.PRESETS
current_user = await learning_observer.dash_integration.get_active_user_from_dash()
if c.USER_ID not in current_user or not current_user[c.USER_ID]:
raise KeyError(f'User id not found in active user')
key = _make_key(current_user[c.USER_ID])
kvs = learning_observer.kvs.KVS()
presets = await kvs[key]
if presets is None:
presets = wo_classroom_text_highlighter.options.PRESETS
await kvs.set(key, presets)
return presets


def _make_key(user_id):
return sa_helpers.make_key(
preset_components,
{sa_helpers.KeyField.STUDENT: user_id},
sa_helpers.KeyStateType.INTERNAL
)


def create_layout():
async def create_layout():
kvs_presets = await get_preset_components()
add_preset = dbc.InputGroup([
dbc.Input(id=_add_input, placeholder='Preset name', type='text', value=''),
dbc.InputGroupText(html.I(className='fas fa-circle-question'), id=_add_help),
Expand All @@ -33,9 +67,7 @@ def create_layout():
return html.Div([
add_preset,
html.Div(id=_tray),
# TODO we ought to store the presets on the server instead of browser storage
# TODO we need to migrate the old options to new ones
dcc.Store(id=_store, data=wo_classroom_text_highlighter.options.PRESETS, storage_type='local')
dcc.Store(id=_store, data=kvs_presets, storage_type='local')
], id=_prefix)


Expand Down Expand Up @@ -88,6 +120,7 @@ def create_tray_items_from_store(ts, data):
return [html.Div(create_tray_item(preset), className='d-inline-block me-1 mb-1') for preset in reversed(data.keys())]


# TODO we should fetch from redis and then reset in redis
@callback(
Output(_store, 'data', allow_duplicate=True),
Input({'type': _remove_item, 'index': ALL}, 'submit_n_clicks'),
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ asyncio_redis # pubsub
asyncpg # used in prototypes
cookiecutter
cryptography
dash
dash[async]
dash_renderjson
docopt
dash-bootstrap-components
Expand Down
Loading