diff --git a/.gitignore b/.gitignore
index aaef126..8d26df3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -120,4 +120,8 @@ test_suite.tar.gz
invent.min.tar.gz
static/default.css
static/default.min.css
-static/*.zip
\ No newline at end of file
+static/*.zip
+
+# All temp files
+/temp
+temp/
\ No newline at end of file
diff --git a/examples/tests/chart/chart_helper.py b/examples/tests/chart/chart_helper.py
new file mode 100644
index 0000000..a0a8dc7
--- /dev/null
+++ b/examples/tests/chart/chart_helper.py
@@ -0,0 +1,10 @@
+def run(chart_type, data, options, plugin_code):
+ namespace = {
+ "chart_type": chart_type,
+ "data": data,
+ "options": options,
+ }
+ exec(plugin_code, namespace, namespace)
+ if "result" not in namespace:
+ raise ValueError("Plugin code must assign to `result`.")
+ return namespace["result"]
diff --git a/examples/tests/chart/config.json b/examples/tests/chart/config.json
new file mode 100644
index 0000000..0870c7b
--- /dev/null
+++ b/examples/tests/chart/config.json
@@ -0,0 +1,6 @@
+{
+ "files": {
+ "/static/invent.min.tar.gz": "./*",
+ "./chart_helper.py": ""
+ }
+}
\ No newline at end of file
diff --git a/examples/tests/chart/index.html b/examples/tests/chart/index.html
new file mode 100644
index 0000000..cbb8f02
--- /dev/null
+++ b/examples/tests/chart/index.html
@@ -0,0 +1,23 @@
+
+
+
+
+
+ Chart Donkey Interactive Test
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/tests/chart/main.py b/examples/tests/chart/main.py
new file mode 100644
index 0000000..93e6a6d
--- /dev/null
+++ b/examples/tests/chart/main.py
@@ -0,0 +1,159 @@
+import invent
+import sys
+import os
+from pathlib import Path
+from invent.tools import make_helper
+from invent.ui import *
+
+from invent.tools.common_ui import (
+ back_link_widget,
+ fail_html,
+ make_status_setter,
+ pass_html,
+ publish_run,
+ wait_html,
+)
+
+await invent.setup()
+
+
+chart = Chart(
+ chart_type="bar",
+ data={
+ "labels": ["A", "B", "C"],
+ "datasets": [
+ {
+ "label": "Values",
+ "data": [3, 5, 2],
+ "backgroundColor": "rgba(54, 162, 235, 0.5)",
+ }
+ ],
+ },
+ options={"plugins": {"legend": {"display": True}}},
+)
+
+status_label = Label(text="Donkey starting...")
+_set_chart_status = make_status_setter(invent, status_label, channel="chart")
+
+default_code = (
+ "# Inputs: chart_type, data, options\n"
+ "# Assign a dict to result with optional data/options keys.\n"
+ "new_data = dict(data)\n"
+ "datasets = [dict(ds) for ds in data.get('datasets', [])]\n"
+ "if datasets:\n"
+ " first = dict(datasets[0])\n"
+ " values = list(first.get('data', []))\n"
+ " first['data'] = [value * 2 for value in values]\n"
+ " first['label'] = 'Values x2'\n"
+ " datasets[0] = first\n"
+ "new_data['datasets'] = datasets\n"
+ "result = {'data': new_data}\n"
+)
+
+code_editor = CodeEditor(
+ code=default_code,
+ language="python",
+ min_height="260px",
+)
+
+assert_worker = Html(html=wait_html("Worker not started."))
+assert_run = Html(html=wait_html("Code not run."))
+
+HELPER_CHANNEL = "chart-helper"
+
+make_helper(src="chart_helper.py", channel=HELPER_CHANNEL)
+
+
+def handle_helper_status(msg):
+ state = getattr(msg, "state", None)
+ detail = getattr(msg, "detail", None)
+ if state == "starting":
+ _set_chart_status("Starting donkey worker...")
+ elif state == "ready":
+ _set_chart_status("Donkey ready. Press Run Code.")
+ assert_worker.html = pass_html("Donkey worker ready.")
+ elif state == "busy":
+ _set_chart_status("Running code...")
+ elif state == "error":
+ _set_chart_status(f"Failed to start donkey: {detail}")
+ assert_worker.html = fail_html(f"Donkey worker failed: {detail}")
+
+
+def handle_helper_result(msg):
+ if msg.function != "run":
+ return
+ if msg.error:
+ _set_chart_status(f"Worker error: {msg.error}")
+ assert_run.html = fail_html(f"Code run failed: {msg.error}")
+ return
+ payload = msg.result
+ if not isinstance(payload, dict):
+ assert_run.html = fail_html("Result must be a dict.")
+ _set_chart_status("Invalid chart result.")
+ return
+ if "data" in payload:
+ chart.data = payload["data"]
+ if "options" in payload:
+ chart.options = payload["options"]
+ assert_run.html = pass_html("Code run succeeded.")
+ _set_chart_status("Done. Chart updated from donkey result.")
+
+
+invent.subscribe(handle_helper_status, HELPER_CHANNEL, "status")
+invent.subscribe(handle_helper_result, HELPER_CHANNEL, "result")
+
+
+async def handle_controls(message):
+ if getattr(message.source, "name", "") != "run_chart_code":
+ return
+ publish_run(
+ invent,
+ channel=HELPER_CHANNEL,
+ function="run",
+ args=[
+ chart.chart_type,
+ chart.data,
+ chart.options,
+ code_editor.code or "",
+ ],
+ )
+
+
+invent.subscribe(
+ handle_controls,
+ to_channel="chart-controls",
+ when_subject=["press"],
+)
+
+app = invent.App(
+ name="Chart Donkey Interactive Test",
+ pages=[
+ Page(
+ id="chart-donkey-test",
+ children=[
+ back_link_widget(),
+ Label(text="# Chart Donkey Interactive Test"),
+ Label(
+ text=(
+ "Run Python code in a donkey worker to transform "
+ "chart data and apply the result back to the "
+ "widget."
+ )
+ ),
+ chart,
+ Button(
+ text="Run Code",
+ name="run_chart_code",
+ channel="chart-controls",
+ ),
+ code_editor,
+ status_label,
+ Label(text="## Assertions"),
+ assert_worker,
+ assert_run,
+ ],
+ ),
+ ],
+)
+
+invent.go()
diff --git a/examples/tests/codeeditor/codeeditor_helper.py b/examples/tests/codeeditor/codeeditor_helper.py
new file mode 100644
index 0000000..d0adf31
--- /dev/null
+++ b/examples/tests/codeeditor/codeeditor_helper.py
@@ -0,0 +1,6 @@
+def run(editor_code, plugin_code):
+ namespace = {"editor_code": editor_code}
+ exec(plugin_code, namespace, namespace)
+ if "result" not in namespace:
+ raise ValueError("Plugin code must assign to `result`.")
+ return namespace["result"]
diff --git a/examples/tests/codeeditor/config.json b/examples/tests/codeeditor/config.json
new file mode 100644
index 0000000..a6bf99c
--- /dev/null
+++ b/examples/tests/codeeditor/config.json
@@ -0,0 +1,6 @@
+{
+ "files": {
+ "/static/invent.min.tar.gz": "./*",
+ "./codeeditor_helper.py": ""
+ }
+}
diff --git a/examples/tests/codeeditor/index.html b/examples/tests/codeeditor/index.html
new file mode 100644
index 0000000..61cffdf
--- /dev/null
+++ b/examples/tests/codeeditor/index.html
@@ -0,0 +1,23 @@
+
+
+
+
+
+ CodeEditor Donkey Interactive Test
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/tests/codeeditor/main.py b/examples/tests/codeeditor/main.py
new file mode 100644
index 0000000..7c300e5
--- /dev/null
+++ b/examples/tests/codeeditor/main.py
@@ -0,0 +1,152 @@
+import invent
+import sys
+import os
+from pathlib import Path
+from invent.tools import make_helper
+from invent.ui import *
+
+from invent.tools.common_ui import (
+ back_link_widget,
+ fail_html,
+ make_status_setter,
+ pass_html,
+ publish_run,
+ wait_html,
+)
+
+await invent.setup()
+
+
+source_editor = CodeEditor(
+ code=(
+ "Invent donkey plugins are composable.\n"
+ "This text comes from the source editor."
+ ),
+ language="python",
+ min_height="120px",
+)
+
+plugin_editor = CodeEditor(
+ code=(
+ "# Available variable: editor_code\n"
+ "lines = editor_code.splitlines()\n"
+ "summary = {\n"
+ " 'line_count': len(lines),\n"
+ " 'char_count': len(editor_code),\n"
+ " 'preview': lines[0] if lines else '',\n"
+ "}\n"
+ "result = {'output': str(summary)}\n"
+ ),
+ language="python",
+ min_height="260px",
+)
+
+output_label = Label(text="Output appears here after run.")
+status_label = Label(text="Donkey starting...")
+_set_codeeditor_status = make_status_setter(
+ invent, status_label, channel="codeeditor"
+)
+
+assert_worker = Html(html=wait_html("Worker not started."))
+assert_run = Html(html=wait_html("Code not run."))
+
+HELPER_CHANNEL = "codeeditor-helper"
+
+make_helper(src="codeeditor_helper.py", channel=HELPER_CHANNEL)
+
+
+def handle_helper_status(msg):
+ state = getattr(msg, "state", None)
+ detail = getattr(msg, "detail", None)
+ if state == "starting":
+ _set_codeeditor_status("Starting donkey worker...")
+ elif state == "ready":
+ _set_codeeditor_status("Donkey ready. Press Run Code.")
+ assert_worker.html = pass_html("Donkey worker ready.")
+ elif state == "busy":
+ _set_codeeditor_status("Running plugin...")
+ elif state == "error":
+ _set_codeeditor_status(f"Failed to start donkey: {detail}")
+ assert_worker.html = fail_html(f"Donkey worker failed: {detail}")
+
+
+def handle_helper_result(msg):
+ if msg.function != "run":
+ return
+ if msg.error:
+ _set_codeeditor_status(f"Worker error: {msg.error}")
+ assert_run.html = fail_html(f"Code run failed: {msg.error}")
+ return
+ payload = msg.result
+ if not isinstance(payload, dict):
+ assert_run.html = fail_html("Result must be a dict.")
+ return
+ out = payload.get("output")
+ if out is None:
+ assert_run.html = fail_html("Result must include `output`.")
+ return
+ output_label.text = str(out)
+ assert_run.html = pass_html("Code run succeeded.")
+ _set_codeeditor_status("Done. Plugin updated output label.")
+
+
+invent.subscribe(handle_helper_status, HELPER_CHANNEL, "status")
+invent.subscribe(handle_helper_result, HELPER_CHANNEL, "result")
+
+
+async def handle_controls(message):
+ if getattr(message.source, "name", "") != "run_codeeditor_plugin":
+ return
+ publish_run(
+ invent,
+ channel=HELPER_CHANNEL,
+ function="run",
+ args=[
+ source_editor.code or "",
+ plugin_editor.code or "",
+ ],
+ )
+
+
+invent.subscribe(
+ handle_controls,
+ to_channel="codeeditor-controls",
+ when_subject=["press"],
+)
+
+app = invent.App(
+ name="CodeEditor Donkey Interactive Test",
+ pages=[
+ Page(
+ id="codeeditor-donkey-test",
+ children=[
+ back_link_widget(),
+ Label(text="# CodeEditor Donkey Interactive Test"),
+ Label(
+ text=(
+ "Run plugin code in a donkey worker. The plugin "
+ "reads source editor text via context and writes "
+ "an output message."
+ )
+ ),
+ Label(text="## Source Editor (context input)"),
+ source_editor,
+ Label(text="## Plugin Code"),
+ plugin_editor,
+ Button(
+ text="Run Code",
+ name="run_codeeditor_plugin",
+ channel="codeeditor-controls",
+ ),
+ Label(text="## Output"),
+ output_label,
+ status_label,
+ Label(text="## Assertions"),
+ assert_worker,
+ assert_run,
+ ],
+ ),
+ ],
+)
+
+invent.go()
diff --git a/examples/tests/webcam/config.json b/examples/tests/webcam/config.json
new file mode 100644
index 0000000..7068849
--- /dev/null
+++ b/examples/tests/webcam/config.json
@@ -0,0 +1,6 @@
+{
+ "files": {
+ "/static/invent.min.tar.gz": "./*",
+ "./opencv_helper.py": ""
+ }
+}
\ No newline at end of file
diff --git a/examples/tests/webcam/index.html b/examples/tests/webcam/index.html
new file mode 100644
index 0000000..d123bd5
--- /dev/null
+++ b/examples/tests/webcam/index.html
@@ -0,0 +1,59 @@
+
+
+
+
+
+ Test Card
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/tests/webcam/main.py b/examples/tests/webcam/main.py
new file mode 100644
index 0000000..2050693
--- /dev/null
+++ b/examples/tests/webcam/main.py
@@ -0,0 +1,171 @@
+import invent
+import sys
+import os
+from pathlib import Path
+from invent.tools import make_helper
+from invent.ui import *
+
+from invent.tools.common_ui import (
+ fail_html,
+ make_status_setter,
+ pass_html,
+ publish_run,
+ wait_html,
+)
+
+await invent.setup()
+
+preview_webcam = Webcam(
+ photo_output="download",
+)
+
+opencv_webcam = Webcam(
+ photo_output="preview",
+ preview_layout="side-by-side",
+ mode="photo",
+)
+
+opencv_status = Label(text="Donkey starting...")
+_set_opencv_status = make_status_setter(
+ invent, opencv_status, channel="opencv"
+)
+
+
+default_code = (
+ "# Available variables: image_bgr, image_rgb, grey, cv2, np\n"
+ "# Set result_image (or result) to a numpy ndarray.\n"
+ "# Example: start with the current frame and modify it however you like.\n\n"
+ "result_image = image_bgr.copy()\n"
+)
+
+opencv_code_editor = CodeEditor(
+ code=default_code,
+ language="python",
+ min_height="280px",
+)
+
+assert_worker = Html(html=wait_html("Worker not started."))
+assert_run = Html(html=wait_html("Code not run."))
+
+HELPER_CHANNEL = "opencv-helper"
+
+make_helper(
+ src="opencv_helper.py",
+ channel=HELPER_CHANNEL,
+ config={"packages": ["opencv-python", "numpy"]},
+)
+
+
+def handle_helper_status(msg):
+ state = getattr(msg, "state", None)
+ detail = getattr(msg, "detail", None)
+ if state == "starting":
+ _set_opencv_status("Starting Donkey worker...")
+ elif state == "ready":
+ _set_opencv_status("Donkey ready. Capture a photo and run your code.")
+ assert_worker.html = pass_html("Donkey worker ready.")
+ elif state == "busy":
+ _set_opencv_status("Running code...")
+ elif state == "error":
+ _set_opencv_status(f"Failed to start donkey worker: {detail}")
+ assert_worker.html = fail_html(f"Donkey worker failed: {detail}")
+
+
+def handle_helper_result(msg):
+ if msg.function != "process":
+ return
+ if msg.error:
+ _set_opencv_status(f"Worker error: {msg.error}")
+ assert_run.html = fail_html(f"Code run failed: {msg.error}")
+ return
+ result = msg.result
+ getter = getattr(result, "get", None)
+ if callable(getter) and getter("ok"):
+ processed = getter("data_url")
+ if processed:
+ opencv_webcam.show_image(processed)
+ assert_run.html = pass_html("Code run succeeded.")
+ _set_opencv_status("Done. Custom OpenCV code executed.")
+ return
+ assert_run.html = fail_html("Worker result did not include ok/data_url.")
+ _set_opencv_status(
+ f"Worker returned no displayable result ({type(result).__name__})."
+ )
+
+
+invent.subscribe(handle_helper_status, HELPER_CHANNEL, "status")
+invent.subscribe(handle_helper_result, HELPER_CHANNEL, "result")
+
+
+def _latest_capture_data_url():
+ capture = opencv_webcam.latest_capture(media_type="photo")
+ if capture is None:
+ return None
+ return capture.get("data_url")
+
+
+async def handle_opencv_controls(message):
+ button_name = getattr(message.source, "name", "")
+
+ if button_name != "run_code_button":
+ return
+
+ data_url = _latest_capture_data_url()
+ if not data_url:
+ _set_opencv_status("Capture a photo first, then run an action.")
+ return
+
+ code = opencv_code_editor.code or ""
+ if not code.strip():
+ _set_opencv_status("Write some OpenCV code first.")
+ return
+
+ publish_run(
+ invent,
+ channel=HELPER_CHANNEL,
+ function="process",
+ args=[code, data_url],
+ )
+
+
+invent.subscribe(
+ handle_opencv_controls,
+ to_channel="opencv-controls",
+ when_subject=["press"],
+)
+
+app = invent.App(
+ name="Theme Testcard",
+ pages=[
+ Page(
+ id="testcard",
+ children=[
+ Label(text="# Invent Test Card"),
+ Label(
+ text="This is a test card for the Invent framework. It includes all the different widgets and components in the framework, so that we can see how they look with different themes applied."
+ ),
+ Label(text="## Standard webcam"),
+ preview_webcam,
+ Label(text="## OpenCV webcam playground"),
+ Label(
+ text=(
+ "Take a photo, write your OpenCV code, and then press **Run Code**."
+ )
+ ),
+ opencv_webcam,
+ Button(
+ text="Run Code",
+ name="run_code_button",
+ channel="opencv-controls",
+ ),
+ opencv_code_editor,
+ opencv_status,
+ Label(text="## Assertions"),
+ assert_worker,
+ assert_run,
+ ],
+ ),
+ ],
+)
+
+invent.go()
diff --git a/examples/tests/webcam/opencv_helper.py b/examples/tests/webcam/opencv_helper.py
new file mode 100644
index 0000000..06fc94d
--- /dev/null
+++ b/examples/tests/webcam/opencv_helper.py
@@ -0,0 +1,65 @@
+import base64
+import cv2
+import numpy as np
+
+
+def _decode_data_url(data_url):
+ if not data_url or "," not in data_url:
+ raise ValueError("Expected an image data URL")
+ payload = data_url.split(",", 1)[1]
+ binary = base64.b64decode(payload)
+ buf = np.frombuffer(binary, dtype=np.uint8)
+ image = cv2.imdecode(buf, cv2.IMREAD_COLOR)
+ if image is None:
+ raise ValueError("Could not decode input image")
+ return image
+
+
+def _encode_png_data_url(image):
+ ok, encoded = cv2.imencode(".png", image)
+ if not ok:
+ raise ValueError("Could not encode processed image")
+ payload = base64.b64encode(encoded.tobytes()).decode("ascii")
+ return "data:image/png;base64," + payload
+
+
+def process(user_code, data_url):
+ image_bgr = _decode_data_url(data_url)
+ image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
+ grey = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
+
+ namespace = {
+ "cv2": cv2,
+ "np": np,
+ "image_bgr": image_bgr,
+ "image_rgb": image_rgb,
+ "grey": grey,
+ "result_image": None,
+ "result": None,
+ }
+
+ exec(user_code, namespace, namespace)
+
+ result = namespace.get("result_image")
+ if result is None:
+ result = namespace.get("result")
+ if result is None:
+ result = image_bgr
+
+ if not isinstance(result, np.ndarray):
+ raise ValueError(
+ "Your code must assign a numpy ndarray to result_image or result"
+ )
+
+ if result.ndim == 2:
+ result = cv2.cvtColor(result, cv2.COLOR_GRAY2BGR)
+ elif result.ndim == 3 and result.shape[2] == 3:
+ pass
+ else:
+ raise ValueError("Unsupported result_image shape")
+
+ return {
+ "ok": True,
+ "kind": "user_code",
+ "data_url": _encode_png_data_url(result),
+ }
diff --git a/src/invent/themes/default.css b/src/invent/themes/default.css
index bb2c709..e325def 100644
--- a/src/invent/themes/default.css
+++ b/src/invent/themes/default.css
@@ -2699,7 +2699,7 @@ figure.invent-avatar:focus-visible {
.invent-webcam-shutter-container,
.shutter-container {
justify-self: center;
- text-align: center;
+ text-align: center;
}
.invent-webcam-mode-btn,
@@ -2773,6 +2773,73 @@ figure.invent-avatar:focus-visible {
z-index: 1;
}
+/* Media row: transparent by default so video box stacks normally in the
+ * flex-column webcam container. Activated side-by-side with modifier class. */
+.invent-webcam-media-row {
+ display: contents;
+}
+
+/* Capture preview in stacked mode: full-width img below the video. */
+.invent-webcam-preview-col {
+ width: 100%;
+}
+
+.invent-webcam-preview-col .invent-webcam-capture-preview {
+ border-radius: 12px;
+ display: block;
+ height: auto;
+ width: 100%;
+}
+
+.invent-webcam-preview-col .invent-webcam-capture-preview.hidden {
+ display: none;
+}
+
+/* Side-by-side mode: media row becomes a flex row, both columns equal size. */
+.invent-webcam--side-by-side {
+ min-height: 0;
+}
+
+.invent-webcam--side-by-side .invent-webcam-media-row {
+ display: flex;
+ gap: var(--spacing-lg);
+ align-items: flex-start;
+}
+
+.invent-webcam--side-by-side .invent-webcam-media-row .invent-webcam-box {
+ flex: 1;
+ min-width: 0;
+ min-height: 0;
+ width: auto;
+}
+
+.invent-webcam--side-by-side .invent-webcam-preview-col {
+ aspect-ratio: 4 / 3;
+ background: var(--webcam-bg);
+ border-radius: 12px;
+ display: flex;
+ align-items: center;
+ justify-content: center;
+ flex: 1;
+ min-width: 0;
+ overflow: hidden;
+ width: auto;
+}
+
+.invent-webcam--side-by-side .invent-webcam-preview-col .invent-webcam-capture-preview {
+ border-radius: 0;
+ display: block;
+ height: 100%;
+ object-fit: cover;
+ width: 100%;
+}
+
+@media (max-width: 600px) {
+ .invent-webcam--side-by-side .invent-webcam-media-row {
+ flex-direction: column;
+ }
+}
+
@keyframes invent-webcam-recording-ring {
0% {
border: var(--border-width) solid var(--webcam-text-primary);
@@ -2841,4 +2908,4 @@ figure.invent-avatar:focus-visible {
.shutter-container {
justify-self: center;
}
-}
\ No newline at end of file
+}
diff --git a/src/invent/tools/__init__.py b/src/invent/tools/__init__.py
index e69de29..135444b 100644
--- a/src/invent/tools/__init__.py
+++ b/src/invent/tools/__init__.py
@@ -0,0 +1,40 @@
+from .device import (
+ ChartDonkeyAdapter,
+ CodeEditorDonkeyAdapter,
+ DonkeyConnection,
+ DONKEY_BUSY,
+ DONKEY_CREATING,
+ DONKEY_ERROR,
+ DONKEY_KILLED,
+ DONKEY_READY,
+ WebcamDonkeyAdapter,
+ WidgetDonkeyAdapter,
+ create_donkey_connection,
+)
+from .donkey_plugin import (
+ DonkeyPluginFlow,
+ make_assertion_callbacks,
+ make_plugin_runner,
+)
+from .test_helpers import StatusProxy, fail_html, pass_html, wait_html
+
+__all__ = [
+ "ChartDonkeyAdapter",
+ "CodeEditorDonkeyAdapter",
+ "DonkeyConnection",
+ "DonkeyPluginFlow",
+ "make_assertion_callbacks",
+ "make_plugin_runner",
+ "DONKEY_BUSY",
+ "DONKEY_CREATING",
+ "DONKEY_ERROR",
+ "DONKEY_KILLED",
+ "DONKEY_READY",
+ "WebcamDonkeyAdapter",
+ "WidgetDonkeyAdapter",
+ "create_donkey_connection",
+ "StatusProxy",
+ "fail_html",
+ "pass_html",
+ "wait_html",
+]
diff --git a/src/invent/tools/device.py b/src/invent/tools/device.py
index 937caad..7297ba0 100644
--- a/src/invent/tools/device.py
+++ b/src/invent/tools/device.py
@@ -1,6 +1,436 @@
+import invent
+import asyncio
+import json
+from pyscript import document, js_import, window
+from pyscript.ffi import to_js
+
+# Datastore flags for Donkey worker status.
+DONKEY_CREATING = "_DEVICE_DONKEY_CREATING"
+DONKEY_READY = "_DEVICE_DONKEY_READY"
+DONKEY_BUSY = "_DEVICE_DONKEY_BUSY"
+DONKEY_ERROR = "_DEVICE_DONKEY_ERROR"
+DONKEY_KILLED = "_DEVICE_DONKEY_KILLED"
+
+
"""
-Vibrate
-Microphone
-Flashlight
-Camera
+NOTE: I am using execute("from _opencv_worker import *") which
+pulls everything into the worker's global scope, which is the only
+reliable way to define persistent callables in a donkey worker that I
+could find. Both
+ - process() feeds large multiline strings line-by-line which
+ prevents this sort of data from using it, and
+ - execute() alone seems to scope defs locally and discard them
+ after the call returns.
"""
+
+# The OpenCV worker code written to the worker's virtual filesystem as a
+# proper Python module.
+_OPENCV_WORKER_MODULE = r"""
+import base64
+import cv2
+import numpy as np
+
+
+def _decode_data_url(data_url):
+ if not data_url or "," not in data_url:
+ raise ValueError("Expected an image data URL")
+ payload = data_url.split(",", 1)[1]
+ binary = base64.b64decode(payload)
+ buf = np.frombuffer(binary, dtype=np.uint8)
+ image = cv2.imdecode(buf, cv2.IMREAD_COLOR)
+ if image is None:
+ raise ValueError("Could not decode input image")
+ return image
+
+
+def _encode_png_data_url(image):
+ ok, encoded = cv2.imencode(".png", image)
+ if not ok:
+ raise ValueError("Could not encode processed image")
+ payload = base64.b64encode(encoded.tobytes()).decode("ascii")
+ return "data:image/png;base64," + payload
+
+
+def worker_run_user_code(user_code, data_url):
+ image_bgr = _decode_data_url(data_url)
+ image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
+ grey = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
+
+ namespace = {
+ "cv2": cv2,
+ "np": np,
+ "image_bgr": image_bgr,
+ "image_rgb": image_rgb,
+ "grey": grey,
+ "result_image": None,
+ "result": None,
+ }
+
+ exec(user_code, namespace, namespace)
+
+ result = namespace.get("result_image")
+ if result is None:
+ result = namespace.get("result")
+ if result is None:
+ result = image_bgr
+
+ if not isinstance(result, np.ndarray):
+ raise ValueError(
+ "Your code must assign a numpy ndarray to result_image or result"
+ )
+
+ if result.ndim == 2:
+ result = cv2.cvtColor(result, cv2.COLOR_GRAY2BGR)
+ elif result.ndim == 3 and result.shape[2] == 3:
+ pass
+ else:
+ raise ValueError("Unsupported result_image shape")
+
+ return {
+ "ok": True,
+ "kind": "user_code",
+ "data_url": _encode_png_data_url(result),
+ }
+"""
+
+_DONKEY_RUNTIME_MODULE = r"""
+import json
+
+
+def invent_run_code(code, context_json):
+ namespace = {}
+ if context_json:
+ context = json.loads(context_json)
+ if not isinstance(context, dict):
+ raise ValueError("context must decode to a dict")
+ namespace.update(context)
+ exec(code, namespace, namespace)
+ if "result" not in namespace:
+ raise ValueError("Your code must assign a value to `result`")
+ return namespace["result"]
+"""
+
+_PYSCRIPT_CORE = "https://pyscript.net/releases/2026.3.1/core.js"
+
+
+def _ensure_terminal_div(terminal_id="donkey-terminal"):
+ """
+ Create a hidden terminal container in the DOM if needed.
+ """
+ if document.getElementById(terminal_id) is None:
+ div = document.createElement("div")
+ div.id = terminal_id
+ div.style.display = "none"
+ document.body.appendChild(div)
+ return f"#{terminal_id}"
+
+
+class DonkeyConnection:
+ """General donkey wrapper with datastore-oriented execution."""
+
+ def __init__(self, donkey, result_key=None):
+ self._donkey = donkey
+ self._result_key = result_key
+ self._ready = False
+
+ @property
+ def ready(self):
+ return self._ready
+
+ def _set_status(self, status):
+ if self._result_key:
+ invent.datastore[self._result_key] = status
+
+ async def initialize(self):
+ self._set_status(DONKEY_BUSY)
+ await self.execute(
+ "open('_invent_runtime.py', 'w').write("
+ f"{_DONKEY_RUNTIME_MODULE!r})"
+ )
+ await self.execute("from _invent_runtime import *")
+ self._ready = True
+ self._set_status(DONKEY_READY)
+
+ async def execute(self, code):
+ if not self._ready and "from _invent_runtime import *" not in code:
+ self._set_status(DONKEY_BUSY)
+ try:
+ result = await self._donkey.execute(code)
+ if self._ready:
+ self._set_status(DONKEY_READY)
+ return result
+ except Exception as exc:
+ self._set_status(f"{DONKEY_ERROR}: {exc}")
+ raise
+
+ async def evaluate(self, expression):
+ if not self._ready:
+ raise RuntimeError("Donkey is not ready yet")
+ self._set_status(DONKEY_BUSY)
+ try:
+ result = await self._donkey.evaluate(expression)
+ self._set_status(DONKEY_READY)
+ return result
+ except Exception as exc:
+ self._set_status(f"{DONKEY_ERROR}: {exc}")
+ raise
+
+ async def run_code(self, code, result_key, context=None):
+ """Execute code and store structured result in datastore."""
+ context_json = json.dumps(context or {})
+ expression = (
+ "__import__('json').dumps({"
+ "'ok': True, "
+ "'result': invent_run_code("
+ f"{code!r}, {context_json!r}"
+ ")})"
+ )
+ try:
+ payload = await self.evaluate(expression)
+ invent.datastore[result_key] = json.loads(payload)
+ except Exception as exc:
+ invent.datastore[result_key] = {
+ "ok": False,
+ "error": f"{DONKEY_ERROR}: {exc}",
+ }
+
+ async def kill(self):
+ await self._donkey.kill()
+ self._ready = False
+ self._set_status(DONKEY_KILLED)
+
+
+class WidgetDonkeyAdapter:
+ """
+ Base adapter for attaching donkey logic to a widget-like target.
+
+ Subclasses must implement:
+ - _context()
+ - _apply_result(payload)
+ """
+
+ def __init__(self, status_key, result_key):
+ self._status_key = status_key
+ self._result_key = result_key
+ self._connection = None
+
+ @property
+ def ready(self):
+ return self._connection is not None and self._connection.ready
+
+ async def initialize(self):
+ # Allow subclasses to request extra packages by defining
+ # `self._packages` prior to initialize() being called.
+ packages = getattr(self, "_packages", None)
+ self._connection = await create_donkey_connection(
+ result_key=self._status_key, packages=packages
+ )
+ # Give subclasses a chance to prepare the worker (write
+ # modules, import helpers, etc.). Default is no-op.
+ await self._prepare_worker(self._connection)
+
+ def _context(self):
+ raise NotImplementedError()
+
+ def _apply_result(self, payload):
+ raise NotImplementedError()
+
+ async def run(self, code):
+ if not self.ready:
+ raise RuntimeError("Donkey is not ready yet")
+ await self._connection.run_code(
+ code=code,
+ result_key=self._result_key,
+ context=self._context(),
+ )
+ payload = invent.datastore.get(self._result_key)
+ try:
+ return self._apply_result(payload)
+ except Exception as exc:
+ result = {
+ "ok": False,
+ "error": f"{DONKEY_ERROR}: {exc}",
+ }
+ invent.datastore[self._result_key] = result
+ return result
+
+ async def _prepare_worker(self, connection):
+ """Optional hook called after the connection is created.
+
+ Subclasses may override to write files into the worker and
+ import helper modules. The default implementation does
+ nothing.
+ """
+ return None
+
+ async def kill(self):
+ if self._connection is not None:
+ await self._connection.kill()
+
+
+class ChartDonkeyAdapter(WidgetDonkeyAdapter):
+ """
+ Attach donkey-driven Python logic to a Chart widget.
+
+ The adapter expects plugin code to assign `result` as a dictionary
+ containing optional `data` and `options` keys for chart updates.
+ """
+
+ def __init__(self, chart_widget, status_key, result_key):
+ super().__init__(status_key=status_key, result_key=result_key)
+ self._chart = chart_widget
+
+ def _context(self):
+ return {
+ "chart_type": self._chart.chart_type,
+ "data": self._chart.data,
+ "options": self._chart.options,
+ }
+
+ def _apply_result(self, payload):
+ if not isinstance(payload, dict):
+ raise ValueError("Result payload must be a dict.")
+ if not payload.get("ok"):
+ return payload
+ result = payload.get("result")
+ if not isinstance(result, dict):
+ raise ValueError("Result value must be a dict.")
+ if "data" in result:
+ self._chart.data = result["data"]
+ if "options" in result:
+ self._chart.options = result["options"]
+ return payload
+
+
+class CodeEditorDonkeyAdapter(WidgetDonkeyAdapter):
+ """
+ Run donkey logic from a CodeEditor and update an output target.
+
+ The output target must provide a writable `text` attribute.
+ """
+
+ def __init__(
+ self,
+ code_editor_widget,
+ output_widget,
+ status_key,
+ result_key,
+ ):
+ super().__init__(status_key=status_key, result_key=result_key)
+ self._code_editor = code_editor_widget
+ self._output = output_widget
+
+ def _context(self):
+ return {
+ "editor_code": self._code_editor.code or "",
+ }
+
+ def _apply_result(self, payload):
+ if not isinstance(payload, dict):
+ raise ValueError("Result payload must be a dict.")
+ if not payload.get("ok"):
+ return payload
+ result = payload.get("result")
+ if not isinstance(result, dict):
+ raise ValueError("Result value must be a dict.")
+ message = result.get("output")
+ if message is None:
+ raise ValueError("Result must include an `output` value.")
+ self._output.text = str(message)
+ return payload
+
+
+class WebcamDonkeyAdapter(WidgetDonkeyAdapter):
+ """
+ Attach OpenCV donkey logic to a Webcam widget.
+
+ The worker executes user code with a pre-built image namespace
+ (image_bgr, image_rgb, grey, cv2, np). User code must assign a
+ numpy ndarray to `result_image` or `result`. The processed frame
+ is displayed via the webcam widget's `show_image()` method.
+ """
+
+ def __init__(self, webcam_widget, status_key, result_key):
+ super().__init__(status_key=status_key, result_key=result_key)
+ self._webcam = webcam_widget
+ # Request OpenCV and numpy packages when creating the worker.
+ self._packages = ["opencv-python", "numpy"]
+
+ async def _prepare_worker(self, connection):
+ # Write the OpenCV helper module into the worker and import it.
+ await connection.execute(
+ f"open('_opencv_worker.py', 'w').write({_OPENCV_WORKER_MODULE!r})"
+ )
+ await connection.execute("from _opencv_worker import *")
+
+ def _context(self):
+ # Not used; OpenCV worker uses its own entrypoint.
+ return {}
+
+ def _apply_result(self, payload):
+ if not isinstance(payload, dict):
+ raise ValueError("Result payload must be a dict.")
+ if not payload.get("ok"):
+ return payload
+ data_url = payload.get("data_url")
+ if not data_url:
+ raise ValueError("Result must include a data_url value.")
+ self._webcam.show_image(data_url)
+ return payload
+
+ async def run(self, code):
+ if not self.ready:
+ raise RuntimeError("Donkey is not ready yet")
+
+ capture = self._webcam.latest_capture(media_type="photo")
+ if capture is None:
+ raise ValueError("No photo captured yet. Take a photo first.")
+ data_url = capture.get("data_url")
+ if not data_url:
+ raise ValueError("Latest capture has no data_url.")
+
+ expression = (
+ "__import__('json').dumps(worker_run_user_code("
+ f"{code!r}, {data_url!r}"
+ "))"
+ )
+ try:
+ payload_str = await self._connection.evaluate(expression)
+ payload = json.loads(payload_str)
+ invent.datastore[self._result_key] = payload
+ except Exception as exc:
+ payload = {"ok": False, "error": f"{DONKEY_ERROR}: {exc}"}
+ invent.datastore[self._result_key] = payload
+
+ try:
+ return self._apply_result(payload)
+ except Exception as exc:
+ result = {"ok": False, "error": f"{DONKEY_ERROR}: {exc}"}
+ invent.datastore[self._result_key] = result
+ return result
+
+
+async def create_donkey_connection(result_key=None, packages=None):
+ """
+ Create a donkey connection for Python code execution.
+
+ Each call creates a new worker instance. The framework manages
+ worker options internally.
+ """
+ if result_key:
+ invent.datastore[result_key] = DONKEY_CREATING
+
+ (core,) = await js_import(_PYSCRIPT_CORE)
+ terminal_selector = _ensure_terminal_div()
+ options = to_js(
+ {
+ "type": "py",
+ "persistent": True,
+ "terminal": terminal_selector,
+ "config": {"packages": packages or []},
+ }
+ )
+ donkey = await core.donkey(options)
+ connection = DonkeyConnection(donkey, result_key=result_key)
+ await connection.initialize()
+ return connection
diff --git a/src/invent/tools/donkey_plugin.py b/src/invent/tools/donkey_plugin.py
new file mode 100644
index 0000000..081102c
--- /dev/null
+++ b/src/invent/tools/donkey_plugin.py
@@ -0,0 +1,143 @@
+"""Helpers for building user-facing donkey plugin pages."""
+
+
+class DonkeyPluginFlow:
+ """
+ Handle common donkey plugin page workflow.
+
+ This keeps page-level `main.py` files small by centralising startup,
+ run-state text, and standard error handling.
+ """
+
+ def __init__(self, adapter, status_widget):
+ self._adapter = adapter
+ self._status = status_widget
+
+ @staticmethod
+ def _error_from_result(result):
+ if isinstance(result, dict):
+ error = result.get("error")
+ if error:
+ return str(error)
+ return "Unknown error."
+
+ async def ensure_worker(
+ self,
+ *,
+ ready_text="Donkey ready.",
+ starting_text="Starting donkey worker...",
+ ):
+ if self._adapter.ready:
+ self._status.text = ready_text
+ return {"ok": True}
+ self._status.text = starting_text
+ try:
+ await self._adapter.initialize()
+ self._status.text = ready_text
+ return {"ok": True}
+ except Exception as exc:
+ error = str(exc)
+ self._status.text = f"Failed to start donkey: {error}"
+ return {"ok": False, "error": error}
+
+ async def run_code(
+ self,
+ code,
+ *,
+ running_text="Running code...",
+ success_text="Done.",
+ not_ready_text="Donkey not ready.",
+ empty_code_text="Write plugin code first.",
+ ):
+ if not self._adapter.ready:
+ self._status.text = not_ready_text
+ return {"ok": False, "error": not_ready_text}
+ if not (code or "").strip():
+ self._status.text = empty_code_text
+ return {"ok": False, "error": empty_code_text}
+ self._status.text = running_text
+ result = await self._adapter.run(code)
+ if isinstance(result, dict) and result.get("ok"):
+ self._status.text = success_text
+ return result
+ error = self._error_from_result(result)
+ self._status.text = f"Worker error: {error}"
+ return {"ok": False, "error": error}
+
+
+def make_plugin_runner(
+ *,
+ adapter,
+ status_widget,
+ code_getter,
+ ready_text="Donkey ready. Press Run Code.",
+ success_text="Done.",
+ on_worker_ready=None,
+ on_worker_error=None,
+ on_run_success=None,
+ on_run_error=None,
+):
+ """
+ Return standard `ensure_worker` and `run_code` callables.
+
+ This helper reduces repetitive workflow wiring in user-facing pages.
+ """
+ flow = DonkeyPluginFlow(adapter=adapter, status_widget=status_widget)
+
+ async def ensure_worker():
+ result = await flow.ensure_worker(ready_text=ready_text)
+ if result.get("ok"):
+ if callable(on_worker_ready):
+ on_worker_ready(result)
+ return result
+ if callable(on_worker_error):
+ on_worker_error(result)
+ return result
+
+ async def run_code():
+ result = await flow.run_code(
+ code_getter(),
+ success_text=success_text,
+ )
+ if result.get("ok"):
+ if callable(on_run_success):
+ on_run_success(result)
+ return result
+ if callable(on_run_error):
+ on_run_error(result)
+ return result
+
+ return flow, ensure_worker, run_code
+
+
+def make_assertion_callbacks(
+ *,
+ worker_assert_widget,
+ run_assert_widget,
+ pass_html,
+ fail_html,
+):
+ """Return standard assertion callbacks for plugin runner hooks."""
+
+ def on_worker_ready(_result):
+ worker_assert_widget.html = pass_html("Donkey worker started.")
+
+ def on_worker_error(result):
+ error = result.get("error", "Unknown error.")
+ worker_assert_widget.html = fail_html(
+ f"Donkey worker failed to start: {error}"
+ )
+
+ def on_run_success(_result):
+ run_assert_widget.html = pass_html("Code run succeeded.")
+
+ def on_run_error(result):
+ error = result.get("error", "Unknown error.")
+ run_assert_widget.html = fail_html(f"Code run failed: {error}")
+
+ return {
+ "on_worker_ready": on_worker_ready,
+ "on_worker_error": on_worker_error,
+ "on_run_success": on_run_success,
+ "on_run_error": on_run_error,
+ }
diff --git a/src/invent/tools/test_helpers.py b/src/invent/tools/test_helpers.py
new file mode 100644
index 0000000..c5491ba
--- /dev/null
+++ b/src/invent/tools/test_helpers.py
@@ -0,0 +1,39 @@
+"""Small helpers shared by interactive donkey test pages."""
+
+import invent
+
+_PASS = "color:green;font-family:monospace;margin:4px 0"
+_FAIL = "color:red;font-family:monospace;margin:4px 0"
+_WAIT = "color:#555;font-family:monospace;margin:4px 0"
+
+
+def pass_html(text):
+ return f'[PASS] {text}
'
+
+
+def fail_html(text):
+ return f'[FAIL] {text}
'
+
+
+def wait_html(text):
+ return f'[ ] {text}
'
+
+
+class StatusProxy:
+ """Proxy a label while publishing status updates."""
+
+ def __init__(self, status_label, channel):
+ self._label = status_label
+ self._channel = channel
+
+ @property
+ def text(self):
+ return self._label.text
+
+ @text.setter
+ def text(self, value):
+ self._label.text = value
+ invent.publish(
+ invent.Message("status", status=value),
+ to_channel=self._channel,
+ )
diff --git a/src/invent/ui/core/property.py b/src/invent/ui/core/property.py
index c60989f..b17d447 100644
--- a/src/invent/ui/core/property.py
+++ b/src/invent/ui/core/property.py
@@ -406,11 +406,15 @@ def validate(self, value):
length = len(value)
if self.min_length and length < self.min_length:
raise ValidationError(
- _("The length of the value is less than min_value allowed.")
+ _(
+ "The length of the value is less than min_value allowed."
+ )
)
if self.max_length and length > self.max_length:
raise ValidationError(
- _("The length of the value is more than max_value allowed.")
+ _(
+ "The length of the value is more than max_value allowed."
+ )
)
return value
diff --git a/src/invent/ui/widgets/chart.py b/src/invent/ui/widgets/chart.py
index f39f700..bbb0e7c 100644
--- a/src/invent/ui/widgets/chart.py
+++ b/src/invent/ui/widgets/chart.py
@@ -40,15 +40,36 @@
# Module-level Chart.js reference, loaded once on first use.
_chart_js = None
+_chart_js_error = None
+
+# Candidate ESM sources for Chart.js. We try these in order.
+_CHART_JS_SOURCES = [
+ "https://esm.sh/chart.js@4.5.1/auto?bundle-deps",
+ "https://cdn.jsdelivr.net/npm/chart.js@4.5.1/auto/+esm",
+ "https://esm.run/chart.js/auto",
+]
async def _ensure_chart_js():
"""
Load Chart.js the first time a Chart widget is rendered.
"""
- global _chart_js
- if _chart_js is None:
- (_chart_js,) = await js_import("https://esm.run/chart.js/auto")
+ global _chart_js, _chart_js_error
+ if _chart_js is not None:
+ return
+ if _chart_js_error is not None:
+ raise RuntimeError(_chart_js_error)
+ for source in _CHART_JS_SOURCES:
+ try:
+ (_chart_js,) = await js_import(source)
+ _chart_js_error = None
+ return
+ except Exception as ex:
+ _chart_js_error = f"{type(ex).__name__}: {ex}"
+ raise RuntimeError(
+ "Could not load Chart.js from any configured source. "
+ f"Last error: {_chart_js_error}"
+ )
class Chart(Widget):
@@ -135,6 +156,11 @@ def _update_chart(self):
during render().
"""
if self.parent:
+ if _chart_js is None:
+ raise RuntimeError(
+ "Chart.js is not loaded. "
+ "Cannot render chart without JS runtime."
+ )
chart_args = {
"data": self.data,
"responsive": True,
@@ -149,6 +175,12 @@ def _update_chart(self):
self.chart_instance.options = to_js(self.options)
self.chart_instance.update()
else:
+ existing = _chart_js.Chart.getChart(
+ self.chart_canvas._dom_element
+ )
+ destroy = getattr(existing, "destroy", None)
+ if callable(destroy):
+ destroy()
self.chart_instance = _chart_js.Chart.new(
self.chart_canvas._dom_element, to_js(chart_args)
)
@@ -165,10 +197,19 @@ async def _init_chart(self):
Load Chart.js then trigger the initial render. Runs as a background
task started by render() so that render() itself stays synchronous.
"""
- await _ensure_chart_js()
- window.requestAnimationFrame(
- create_proxy(lambda x: self._update_chart())
- )
+ try:
+ await _ensure_chart_js()
+ window.requestAnimationFrame(
+ create_proxy(lambda x: self._update_chart())
+ )
+ except Exception as ex:
+ if self.chart_instance:
+ try:
+ self.chart_instance.destroy()
+ except Exception:
+ pass
+ self.chart_instance = None
+ print(f"Chart initialisation failed: {ex}")
def render(self):
"""
diff --git a/src/invent/ui/widgets/webcam.py b/src/invent/ui/widgets/webcam.py
index ed297fe..9e691af 100644
--- a/src/invent/ui/widgets/webcam.py
+++ b/src/invent/ui/widgets/webcam.py
@@ -19,23 +19,33 @@
limitations under the License.
"""
-from invent.i18n import _
+import asyncio
import time
+
+from invent.i18n import _
from invent.ui.core import (
Widget,
ChoiceProperty,
BooleanProperty,
Event,
)
-from pyscript.web import div, video, button, canvas
+from pyscript import window
+from pyscript.web import div, video, button, canvas, img
from pyscript.ffi import create_proxy
class Webcam(Widget):
"""
- A webcam widget with photo capture and video recording capabilities.
+ A webcam widget with photo capture and video recording.
"""
+ photo_output = ChoiceProperty(
+ _("How captured photos are handled: downloaded, previewed, or both."),
+ default_value="download",
+ choices=["download", "preview", "both"],
+ group="behavior",
+ )
+
mode = ChoiceProperty(
_("Webcam mode: photo, video, or both."),
default_value="both",
@@ -49,9 +59,17 @@ class Webcam(Widget):
group="style",
)
+ preview_layout = ChoiceProperty(
+ _("Layout of the capture preview relative to the live video feed."),
+ default_value="stacked",
+ choices=["stacked", "side-by-side"],
+ group="style",
+ )
+
photo_captured = Event(
_("Sent when a photo is captured."),
webcam=_("The Webcam widget that captured the photo."),
+ capture=_("The captured photo metadata."),
)
video_recorded = Event(
@@ -59,46 +77,98 @@ class Webcam(Widget):
webcam=_("The Webcam widget that recorded the video."),
)
- @classmethod
- def icon(cls):
- return '' # noqa
+ def __init__(self, *args, **kwargs):
+ self._captures = []
+ self._capture_counter = 0
+ super().__init__(*args, **kwargs)
+
+ # Capture management
+ def _store_capture(self, capture):
+ capture = dict(capture)
+ capture.setdefault("type", "photo")
+ capture.setdefault("timestamp", self._timestamp())
+ self._capture_counter += 1
+ capture.setdefault(
+ "id",
+ f"{capture['type']}-{self._timestamp()}-{self._capture_counter}",
+ )
+ self._captures.append(capture)
- def trigger(self):
- """
- Trigger the current action (capture photo or start/stop recording).
- """
- if hasattr(self, "_shutter_btn"):
- self._shutter_btn.click()
+ preview_enabled = self.photo_output in ("preview", "both")
+
+ if capture["type"] == "photo" and preview_enabled:
+ self._show_capture_preview(capture)
+
+ return capture
+
+ def captures(self, media_type=None):
+ if media_type is None:
+ return list(self._captures)
+ return [c for c in self._captures if c.get("type") == media_type]
+
+ def latest_capture(self, media_type=None):
+ captures = self.captures(media_type=media_type)
+ return captures[-1] if captures else None
+
+ # Preview helpers
+ def _show_capture_preview(self, capture):
+ if not hasattr(self, "_capture_preview"):
+ return
+ data_url = capture.get("data_url")
+ if not data_url:
+ return
+ self._capture_preview.src = data_url
+ self._capture_preview.classes.remove("hidden")
+
+ def _hide_capture_preview(self):
+ if hasattr(self, "_capture_preview"):
+ self._capture_preview.classes.add("hidden")
+
+ def _refresh_capture_preview(self):
+ preview_enabled = self.photo_output in ("preview", "both")
+ if not preview_enabled:
+ self._hide_capture_preview()
+ return
+ latest_photo = self.latest_capture(media_type="photo")
+ if latest_photo:
+ self._show_capture_preview(latest_photo)
+ else:
+ self._hide_capture_preview()
+ def show_image(self, data_url):
+ """Display any image in the preview panel, replacing the current content."""
+ if not hasattr(self, "_capture_preview"):
+ return
+ self._capture_preview.src = data_url
+ self._capture_preview.classes.remove("hidden")
+
+ # Mode switching
def set_mode(self, mode):
- """
- Set the active webcam mode when switching is enabled.
- """
+ """Set the active webcam mode when switching is enabled."""
if self.mode != "both":
return
if mode in ["photo", "video"]:
self._active_mode = mode
+ mode_label = (
+ "Video Mode"
+ if self._current_mode() == "video"
+ else "Photo Mode"
+ )
if hasattr(self, "_mode_buttons"):
self._update_mode_buttons()
if hasattr(self, "_mode_indicator"):
- self._mode_indicator._dom_element.textContent = (
- self._mode_label()
- )
+ self._mode_indicator._dom_element.textContent = mode_label
if hasattr(self, "_shutter_btn"):
self._set_shutter_text()
def _current_mode(self):
- """
- Return the active mode used for behavior and UI labels.
- """
if self.mode == "both":
return getattr(self, "_active_mode", "photo")
return self.mode
+ # Photo capture
def capture_photo(self):
- """
- Capture a photo from the current video stream.
- """
+ """Capture a photo from the current video stream."""
if hasattr(self, "_canvas") and hasattr(self, "_video_elem"):
video_el = self._video_elem._dom_element
canvas_el = self._canvas._dom_element
@@ -109,35 +179,43 @@ def capture_photo(self):
canvas_el.height = height
ctx = canvas_el.getContext("2d")
- ctx.drawImage(
- video_el,
- 0,
- 0,
- width,
- height,
+ ctx.drawImage(video_el, 0, 0, width, height)
+
+ capture = self._store_capture(
+ {
+ "type": "photo",
+ "timestamp": self._timestamp(),
+ "data_url": self._canvas._dom_element.toDataURL(
+ "image/jpeg"
+ ),
+ }
)
- # Trigger download
- self._download_canvas_as_image()
- self.publish(self.photo_captured, webcam=self)
+ download_enabled = self.photo_output in (
+ "download",
+ "both",
+ )
+ preview_enabled = self.photo_output in ("preview", "both")
+
+ if download_enabled:
+ self._download_canvas_as_image(capture)
+
+ if not preview_enabled:
+ self._hide_capture_preview()
+
+ self.publish(self.photo_captured, webcam=self, capture=capture)
+
+ # Status helpers
def _set_status(self, text):
- """
- Update the status indicator text.
- """
if not hasattr(self, "_status_elem"):
return
self._status_elem._dom_element.textContent = text
def _timestamp(self):
- """
- Return a millisecond timestamp for generated filenames.
- """
return int(time.time() * 1000)
+ # Video recording
def start_recording(self):
- """
- Start recording video from the webcam.
- """
if hasattr(self, "_recorder"):
if not self._recording and self._recorder.state == "inactive":
self._recorded_chunks = []
@@ -148,9 +226,6 @@ def start_recording(self):
self._set_status("Recording...")
def stop_recording(self):
- """
- Stop recording and trigger download of the video file.
- """
if (
hasattr(self, "_recorder")
and self._recording
@@ -162,24 +237,21 @@ def stop_recording(self):
self._set_shutter_text()
self._set_status("Saving video...")
+ # Callbacks
def on_mode_changed(self):
- """
- Apply all mode-dependent configuration to the already-rendered DOM.
- The framework calls this after the widget's properties are set,
- so self.mode is reliable here (unlike during render()).
- """
if not hasattr(self, "_controls"):
- # render() hasn't run yet; nothing to configure
return
- # Set active mode
self._active_mode = "photo" if self.mode == "both" else self.mode
+ mode_label = (
+ "Video Mode" if self._current_mode() == "video" else "Photo Mode"
+ )
- # Rebuild controls: remove any existing modes container, then
- # prepend one only when mode='both'.
controls_el = self._controls._dom_element
- # Remove any previously inserted modes container
- if hasattr(self, "_modes_container"):
+ if (
+ hasattr(self, "_modes_container")
+ and self._modes_container is not None
+ ):
try:
controls_el.removeChild(self._modes_container._dom_element)
except Exception:
@@ -196,30 +268,31 @@ def on_mode_changed(self):
else:
self._mode_buttons = []
- # Update shutter label and mode indicator text
self._set_shutter_text()
if hasattr(self, "_mode_indicator"):
- self._mode_indicator._dom_element.textContent = self._mode_label()
+ self._mode_indicator._dom_element.textContent = mode_label
- # Apply show_mode_indicator
if hasattr(self, "_indicators"):
if self.show_mode_indicator:
self._indicators.classes.remove("hidden")
else:
self._indicators.classes.add("hidden")
- def _mode_label(self):
- """
- Return the display label for the current mode.
- """
- if self._current_mode() == "video":
- return "Video Mode"
- return "Photo Mode"
+ def on_photo_output_changed(self):
+ if not hasattr(self, "_capture_preview"):
+ return
+ self._refresh_capture_preview()
+
+ def on_preview_layout_changed(self):
+ if not hasattr(self, "element"):
+ return
+ if self.preview_layout == "side-by-side":
+ self.element.classes.add("invent-webcam--side-by-side")
+ else:
+ self.element.classes.remove("invent-webcam--side-by-side")
+ # UI helpers
def _update_mode_buttons(self):
- """
- Update the visual state of mode buttons based on current mode.
- """
for btn_info in self._mode_buttons:
btn = btn_info["element"]
btn_mode = btn_info["mode"]
@@ -231,9 +304,6 @@ def _update_mode_buttons(self):
btn.classes.remove("active")
def _set_shutter_text(self):
- """
- Keep shutter text aligned with current mode and recording state.
- """
if not hasattr(self, "_shutter_btn"):
return
is_recording = getattr(self, "_recording", False)
@@ -243,24 +313,23 @@ def _set_shutter_text(self):
text = "Take"
self._shutter_btn._dom_element.textContent = text
- def _download_canvas_as_image(self):
- """
- Download the canvas content as an image file.
- """
+ # Download helper
+ def _download_canvas_as_image(self, capture=None):
try:
- from pyscript import window
+ capture = capture or self.latest_capture(media_type="photo")
+ if capture and capture.get("data_url"):
+ data_url = capture["data_url"]
+ else:
+ data_url = self._canvas._dom_element.toDataURL("image/jpeg")
link = window.document.createElement("a")
- link.href = self._canvas._dom_element.toDataURL("image/jpeg")
+ link.href = data_url
link.download = f"photo-{self._timestamp()}.jpg"
link.click()
except Exception as e:
print(f"Error downloading photo: {e}")
def _on_shutter_click(self, event):
- """
- Handle shutter button clicks.
- """
if self._current_mode() == "photo":
self.capture_photo()
else:
@@ -269,10 +338,9 @@ def _on_shutter_click(self, event):
else:
self.start_recording()
+ # Webcam stream
def _setup_webcam_stream(self):
try:
- from pyscript import window
-
navigator = window.navigator
if not navigator.mediaDevices:
print("Camera not supported in this browser")
@@ -305,20 +373,13 @@ async def get_stream():
print(f"Camera access denied or error: {e}")
self._set_status("Camera access denied")
- # Handle promise
- import asyncio
-
asyncio.create_task(get_stream())
except Exception as e:
print(f"Error setting up webcam: {e}")
def _setup_recorder(self, stream):
- """
- Set up the MediaRecorder for video recording.
- """
try:
- from pyscript import window
def on_dataavailable(event):
if event.data.size > 0:
@@ -353,11 +414,8 @@ def on_stop(event):
except Exception as e:
print(f"Error setting up recorder: {e}")
+ # Determine Camera Mode (video or photo)
def _build_mode_buttons(self):
- """
- Build and return the mode toggle container for mode='both'.
- Populates self._mode_buttons as a side effect.
- """
self._mode_buttons = []
def build_mode_button(mode_name):
@@ -386,15 +444,13 @@ def build_mode_button(mode_name):
def render(self):
"""
- Render the webcam widget with controls.
- Mode-dependent configuration is applied in on_mode_changed,
- which the framework calls after properties are set.
+ Render the webcam widget.
"""
- # Hidden canvas for photo capture
+ # hidden canvas for photo capture
self._canvas = canvas()
self._canvas.classes.add("invent-webcam-canvas-hidden")
- # Video element for preview
+ # live video element
self._video_elem = video()
self._video_elem.id = f"{self.id}-video"
self._video_elem.autoplay = True
@@ -404,10 +460,8 @@ def render(self):
def on_video_ready(event):
video_el = self._video_elem._dom_element
canvas_el = self._canvas._dom_element
- width = video_el.videoWidth or 1280
- height = video_el.videoHeight or 720
- canvas_el.width = width
- canvas_el.height = height
+ canvas_el.width = video_el.videoWidth or 1280
+ canvas_el.height = video_el.videoHeight or 720
self._video_elem._dom_element.addEventListener(
"loadedmetadata", create_proxy(on_video_ready)
@@ -417,7 +471,7 @@ def on_video_ready(event):
video_container.classes.add("invent-webcam-box")
video_container.classes.add("webcam-box")
- # Shutter button
+ # shutter button
self._shutter_btn = button("Take")
self._shutter_btn.id = f"{self.id}-shutter"
self._shutter_btn.classes.add("invent-webcam-shutter")
@@ -430,14 +484,12 @@ def on_video_ready(event):
shutter_container.classes.add("invent-webcam-shutter-container")
shutter_container.classes.add("shutter-container")
- # Controls container: starts with just the shutter;
- # on_mode_changed inserts mode toggle buttons when mode='both'.
self._controls = div(shutter_container)
self._controls.classes.add("invent-webcam-actions")
self._controls.classes.add("actions")
self._shutter_container = shutter_container
- # Status indicators
+ # status indicators
self._status_elem = div("Initializing camera...")
self._status_elem.id = f"{self.id}-status"
self._status_elem.classes.add("invent-webcam-status")
@@ -451,18 +503,32 @@ def on_video_ready(event):
self._indicators.classes.add("invent-webcam-indicators")
self._indicators.classes.add("indicators")
- # Main container
+ # capture preview image
+ self._capture_preview = img()
+ self._capture_preview.id = f"{self.id}-capture-preview"
+ self._capture_preview.classes.add("invent-webcam-capture-preview")
+ self._capture_preview.classes.add("capture-preview")
+ self._capture_preview.classes.add("hidden")
+
+ # Wrap video and preview in a media row
+ self._preview_col = div(self._capture_preview)
+ self._preview_col.classes.add("invent-webcam-preview-col")
+
+ self._media_row = div(video_container, self._preview_col)
+ self._media_row.classes.add("invent-webcam-media-row")
+
element = div(
self._canvas,
- video_container,
+ self._media_row,
self._controls,
self._indicators,
id=self.id,
)
+
element.classes.add("invent-webcam")
element.classes.add("webcam-container")
- # Initialize the webcam stream
+ # Start the camera stream!
self._setup_webcam_stream()
return element