Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async with AsyncCircleClient(api_token="YOUR_TOKEN") as client:
- [Auth API](docs/auth-api.md) -- headless auth token management
- [Models](docs/models.md) -- complete models reference
- [Webhooks](docs/webhooks.md) -- signature verification and payload parsing
- [Limitations](docs/limitations.md) -- known Circle API limitations (mentions, polls, moderators)

## Project Structure

Expand Down
100 changes: 100 additions & 0 deletions docs/limitations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Circle API Limitations

Known limitations of the Circle.so API that affect SDK operations. These are platform-level restrictions, not SDK bugs.

## Mentions

Circle uses Rails Signed Global IDs (SGIDs) to resolve @mentions. The SGID is a cryptographically signed token generated server-side that maps to a `CommunityMember` record.

**What works:**
- Reading mentions from existing posts via `tiptap_body.community_members` and `sgids_to_object_map`
- Reusing a known SGID to embed a mention in a new post or comment

**What does not work:**
- Creating mentions with just a `community_member_id` or email -- the mention node renders as blank text
- Generating SGIDs via the API -- no endpoint returns member SGIDs
- Sending `community_members` in the request body -- Circle ignores it and resolves from SGIDs only

**Workaround:** Manually mention a member once in the Circle UI, retrieve the post via API to capture their SGID from `tiptap_body.sgids_to_object_map`, store it, and reuse it in future API posts. SGIDs have no expiry.

```python
# Reading a mention SGID from an existing post
post = client.admin.posts.show_post(post_id)
tiptap = post.tiptap_body # dict
for sgid, obj in tiptap["sgids_to_object_map"].items():
if obj["type"] == "CommunityMember":
print(f'{obj["name"]}: sgid={sgid}')

# Using a captured SGID in a new post
client.admin.posts.create_post(
space_id=space_id,
name="Post with mention",
status="published",
tiptap_body={
"body": {
"type": "doc",
"content": [{
"type": "paragraph",
"content": [
{"type": "text", "text": "Hey "},
{"type": "mention", "attrs": {"sgid": captured_sgid}},
{"type": "text", "text": " check this out"},
]
}]
},
"community_members": [{
"id": member_id, "name": "Member Name",
"user_id": user_id, "sgid": captured_sgid,
"type": "CommunityMember",
}],
"sgids_to_object_map": {
captured_sgid: {
"id": member_id, "name": "Member Name",
"user_id": user_id, "sgid": captured_sgid,
"type": "CommunityMember",
}
}
},
)
```

## Polls

Polls use the same SGID mechanism as mentions. Each poll is a server-side object referenced by a signed token.

**What works:**
- Reading poll data from existing posts via `tiptap_body.polls` and `sgids_to_object_map`
- Reading poll titles, options, status, and closing dates
- Embedding an existing poll in a new post by reusing its SGID (same poll instance, shared votes)

**What does not work:**
- Creating new polls via any API endpoint -- no `/polls` endpoint exists
- Sending poll data in `tiptap_body.polls`, `polls_attributes`, or as inline node attrs -- all ignored
- Creating a poll node without an SGID -- the node is stripped from the post

**Workaround:** Create polls in the Circle UI. Use the API to read poll data for reporting and dashboards.

```python
# Reading polls from a post
post = client.admin.posts.show_post(post_id)
tiptap = post.tiptap_body # dict
for poll in tiptap.get("polls", []):
print(f'Poll: {poll["title"]} (id={poll["id"]})')
print(f' Status: {poll["status"]}, Closes: {poll["closes_at"]}')
for opt in poll["poll_options"]:
print(f' - {opt["value"]} (id={opt["id"]})')
```

## Other Limitations

| Feature | Limitation |
|---|---|
| Moderator assignment | Per-space moderator roles can only be set in the Circle UI, not via API |
| Bulk member add | No bulk endpoint -- members must be added one at a time |
| Post move | Posts cannot be moved between spaces via API |
| Comment authorship | Comments are always authored as the API token owner; no `user_email` override (posts do support `user_email`) |
| Space slugs | Circle appends a hash to slugs even when explicitly set via API |
| Inherited access | `show_space_member` returns space group inherited access as if it were direct space membership |
| Resend invitation | No endpoint to resend a member invitation |
| Workflows | No API for Circle automations or workflows |
| Poll votes | Vote counts and individual votes are not returned in the API response |
50 changes: 35 additions & 15 deletions src/circle/api/headless_chat_notif_members.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from circle.constants import HEADLESS_V1_PREFIX as _P
from circle.http import AsyncTransport, SyncTransport
from circle.models.headless.chat import (
ChatRoom, ChatRoomList, ChatRoomMessage, ChatRoomMessages, ChatRoomParticipantList,
ChatRoom, ChatRoomDetail, ChatRoomList, ChatRoomMessage, ChatRoomMessages, ChatRoomParticipantList,
ChatThread, ChatThreadList, UnreadChatRooms, CreateReactionResponse,
)
from circle.models.headless.notifications import (
Expand Down Expand Up @@ -47,6 +47,22 @@ def _page_params(page=1, per_page=10):
return {"page": page, "per_page": per_page}


def _text_to_tiptap(text: str) -> Dict[str, Any]:
"""Convert plain text to Circle's tiptap rich_text_body format."""
content = []
for p in text.split("\n\n"):
if not p.strip():
continue
lines = p.split("\n")
para_content: List[Dict[str, Any]] = []
for i, line in enumerate(lines):
if i > 0:
para_content.append({"type": "hardBreak"})
para_content.append({"type": "text", "text": line})
content.append({"type": "paragraph", "content": para_content})
return {"body": {"type": "doc", "content": content}}


class HeadlessChatNotifMembersClient:
def __init__(self, transport: SyncTransport) -> None:
self._t = transport
Expand All @@ -55,17 +71,19 @@ def __init__(self, transport: SyncTransport) -> None:
def list_chat_rooms(self, *, page: int = 1, per_page: int = 10) -> ChatRoomList:
return ChatRoomList.model_validate(self._t.request("GET", f"{_P}/messages", params=_page_params(page, per_page)))

def create_chat_room(self, **kw: Any) -> ChatRoom:
return ChatRoom.model_validate(self._t.request("POST", f"{_P}/messages", json={"chat_room": kw}))
def create_chat_room(self, **kw: Any) -> ChatRoomDetail:
return ChatRoomDetail.model_validate(self._t.request("POST", f"{_P}/messages", json={"chat_room": kw}))

def get_chat_room(self, uuid: str) -> ChatRoom:
return ChatRoom.model_validate(self._t.request("GET", f"{_P}/messages/{uuid}"))
def get_chat_room(self, uuid: str) -> ChatRoomDetail:
return ChatRoomDetail.model_validate(self._t.request("GET", f"{_P}/messages/{uuid}"))

def list_chat_messages(self, chat_room_uuid: str, **kw: Any) -> ChatRoomMessages:
return ChatRoomMessages.model_validate(
self._t.request("GET", f"{_P}/messages/{chat_room_uuid}/chat_room_messages", params=_chat_msg_params(**kw)))

def create_chat_message(self, chat_room_uuid: str, **kw: Any) -> Dict[str, Any]:
def create_chat_message(self, chat_room_uuid: str, *, body: Optional[str] = None, **kw: Any) -> Dict[str, Any]:
if body is not None and "rich_text_body" not in kw:
kw["rich_text_body"] = _text_to_tiptap(body)
return self._t.request("POST", f"{_P}/messages/{chat_room_uuid}/chat_room_messages", json=kw)

def get_chat_message(self, chat_room_uuid: str, message_id: int) -> ChatRoomMessage:
Expand All @@ -83,8 +101,8 @@ def list_chat_participants(self, chat_room_uuid: str, *, page: int = 1, per_page
return ChatRoomParticipantList.model_validate(
self._t.request("GET", f"{_P}/messages/{chat_room_uuid}/chat_room_participants", params=_page_params(page, per_page)))

def update_chat_participant(self, chat_room_uuid: str, participant_id: int, **kw: Any) -> ChatRoom:
return ChatRoom.model_validate(
def update_chat_participant(self, chat_room_uuid: str, participant_id: int, **kw: Any) -> ChatRoomDetail:
return ChatRoomDetail.model_validate(
self._t.request("PUT", f"{_P}/messages/{chat_room_uuid}/chat_room_participants/{participant_id}", json=kw))

def mark_chat_as_read(self, uuid: str) -> None:
Expand Down Expand Up @@ -194,17 +212,19 @@ def __init__(self, transport: AsyncTransport) -> None:
async def list_chat_rooms(self, *, page: int = 1, per_page: int = 10) -> ChatRoomList:
return ChatRoomList.model_validate(await self._t.request("GET", f"{_P}/messages", params=_page_params(page, per_page)))

async def create_chat_room(self, **kw: Any) -> ChatRoom:
return ChatRoom.model_validate(await self._t.request("POST", f"{_P}/messages", json={"chat_room": kw}))
async def create_chat_room(self, **kw: Any) -> ChatRoomDetail:
return ChatRoomDetail.model_validate(await self._t.request("POST", f"{_P}/messages", json={"chat_room": kw}))

async def get_chat_room(self, uuid: str) -> ChatRoom:
return ChatRoom.model_validate(await self._t.request("GET", f"{_P}/messages/{uuid}"))
async def get_chat_room(self, uuid: str) -> ChatRoomDetail:
return ChatRoomDetail.model_validate(await self._t.request("GET", f"{_P}/messages/{uuid}"))

async def list_chat_messages(self, chat_room_uuid: str, **kw: Any) -> ChatRoomMessages:
return ChatRoomMessages.model_validate(
await self._t.request("GET", f"{_P}/messages/{chat_room_uuid}/chat_room_messages", params=_chat_msg_params(**kw)))

async def create_chat_message(self, chat_room_uuid: str, **kw: Any) -> Dict[str, Any]:
async def create_chat_message(self, chat_room_uuid: str, *, body: Optional[str] = None, **kw: Any) -> Dict[str, Any]:
if body is not None and "rich_text_body" not in kw:
kw["rich_text_body"] = _text_to_tiptap(body)
return await self._t.request("POST", f"{_P}/messages/{chat_room_uuid}/chat_room_messages", json=kw)

async def get_chat_message(self, chat_room_uuid: str, message_id: int) -> ChatRoomMessage:
Expand All @@ -222,8 +242,8 @@ async def list_chat_participants(self, chat_room_uuid: str, *, page: int = 1, pe
return ChatRoomParticipantList.model_validate(
await self._t.request("GET", f"{_P}/messages/{chat_room_uuid}/chat_room_participants", params=_page_params(page, per_page)))

async def update_chat_participant(self, chat_room_uuid: str, participant_id: int, **kw: Any) -> ChatRoom:
return ChatRoom.model_validate(
async def update_chat_participant(self, chat_room_uuid: str, participant_id: int, **kw: Any) -> ChatRoomDetail:
return ChatRoomDetail.model_validate(
await self._t.request("PUT", f"{_P}/messages/{chat_room_uuid}/chat_room_participants/{participant_id}", json=kw))

async def mark_chat_as_read(self, uuid: str) -> None:
Expand Down
34 changes: 34 additions & 0 deletions src/circle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,40 @@ def close(self) -> None:
self._admin_transport.close()
self._bearer_transport.close()

def headless_as_user(self, *, email: Optional[str] = None,
community_member_id: Optional[int] = None) -> _HeadlessNamespace:
"""Authenticate as a user and return a headless namespace for DMs, notifications, etc.

Requires the client to be initialized with a Headless Auth API token
(generated in Circle admin > Developers > Tokens > Headless Auth).

The returned namespace has a short-lived access token. Create a new one
for each session.

Args:
email: User's email address.
community_member_id: User's community member ID.

Returns:
A ``_HeadlessNamespace`` authenticated as the specified user.

Example:
>>> client = CircleClient(api_token="HEADLESS_AUTH_TOKEN")
>>> headless = client.headless_as_user(email="user@example.com")
>>> rooms = headless.chat_notif_members.list_chat_rooms()
>>> headless.chat_notif_members.create_chat_message(uuid, body="Hello!")
"""
token = self.auth.create_auth_token(
email=email, community_member_id=community_member_id
)
url = self._bearer_transport._base_url
rl = self._bearer_transport._rate_limiter
user_transport = SyncTransport(
api_token=token.access_token, base_url=url,
auth_scheme="Bearer", rate_limit=rl._rate if rl else None,
)
return _HeadlessNamespace(user_transport)

def __enter__(self) -> CircleClient:
return self

Expand Down
3 changes: 2 additions & 1 deletion src/circle/models/headless/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ class ChatRoomDetail(CircleModel):


class ChatRoom(CircleModel):
"""Deprecated: use ChatRoomDetail directly. Kept for backward compatibility."""
chat_room: Optional[ChatRoomDetail] = None

ChatRoomList = PaginatedResponse[ChatRoom]
ChatRoomList = PaginatedResponse[ChatRoomDetail]


class ChatThreadRoom(CircleModel):
Expand Down
Loading