Skip to content
This repository was archived by the owner on Feb 23, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(COMMKIT_SRCS
src/subscriber.cpp
src/subscriberimpl.cpp
src/topic.cpp
src/capi.cpp
)

set(CMAKE_POSITION_INDEPENDENT_CODE True)
Expand Down
141 changes: 141 additions & 0 deletions commkit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from ctypes import *
import capnp
libcommkit = CDLL('libcommkit.dylib')

class COpts(Structure):
_fields_ = [
("reliable", c_bool),
]

class CPayload(Structure):
_fields_ = [
("bytes", POINTER(c_ubyte)),
("len", c_size_t),
("sequence", c_uint64),
("source_timestamp", c_uint64),
]

def to_python(self, topic):
return Payload(topic.decode_msg(string_at(self.bytes, self.len)), self.sequence, self.source_timestamp)

class Topic:
def __init__(self, name, datatype, reliable=False):
self.name = name
self.datatype = datatype
self.reliable = reliable

def is_capnp(self):
return isinstance(self.datatype, capnp._StructModule)

def datatype_name(self):
if self.is_capnp():
return "{:016x}".format(self.datatype.schema.node.id)
else:
return self.datatype

def decode_msg(self, bytes):
if self.is_capnp():
return self.datatype.from_bytes(bytes)
else:
return bytes

def encode_msg(self, msg):
if self.is_capnp():
return bytearray(msg.to_bytes())
else:
return bytearray(msg)

class Node:
def __init__(self, name):
self._as_parameter_ = libcommkit.commkit_node_create(name)

def from_param(self): return self._as_parameter_

def __del__(self):
libcommkit.commkit_node_destroy(self)

def subscribe(self, topic):
opts = COpts()
opts.reliable = topic.reliable
return Subscriber(libcommkit.commkit_subscriber_create(self, topic.name, topic.datatype_name(), byref(opts)), topic)

def publish(self, topic):
opts = COpts()
opts.reliable = topic.reliable
return Publisher(libcommkit.commkit_publisher_create(self, topic.name, topic.datatype_name(), byref(opts)), topic)

class Subscriber:
def __init__(self, obj, topic):
self._as_parameter_ = obj
self.topic = topic

def from_param(self): return self._as_parameter_

def __del__(self):
libcommkit.commkit_subscriber_destroy(self)

def wait_for_message(self):
libcommkit.commkit_wait_for_message(self)

def matched_publishers(self):
return libcommkit.commkit_matched_publishers(self)

def peek(self):
p = CPayload()
if libcommkit.commkit_peek(self, byref(p)):
return p.to_python(self.topic)
else:
return None

def take(self):
p = CPayload()
if libcommkit.commkit_take(self, byref(p)):
return p.to_python(self.topic)
else:
return None

class Payload:
def __init__(self, data, sequence, source_timestamp):
self.data = data
self.sequence = sequence
self.source_timestamp = source_timestamp

class Publisher:
def __init__(self, obj, topic):
self._as_parameter_ = obj
self.topic = topic

def from_param(self): return self._as_parameter_

def __del__(self):
libcommkit.commkit_publisher_destroy(self)

def publish(self, data):
data = self.topic.encode_msg(data)
return libcommkit.commkit_publish(self, byref(c_ubyte.from_buffer(data)), len(data))

def matched_subscribers(self):
return libcommkit.commkit_matched_subscribers(self)


def fntype(fn, res, args):
f = getattr(libcommkit, fn)
f.argtypes = args
f.restype = res

fntype('commkit_node_create', c_void_p, (c_char_p,))
fntype('commkit_node_destroy', None, (c_void_p,))

fntype('commkit_subscriber_create', c_void_p, (c_void_p, c_char_p, c_char_p, POINTER(COpts)))
fntype('commkit_subscriber_destroy', None, (c_void_p,))

fntype('commkit_publisher_create', c_void_p, (c_void_p, c_char_p, c_char_p, POINTER(COpts)))
fntype('commkit_publisher_destroy', None, (c_void_p,))

fntype('commkit_wait_for_message', None, (c_void_p,))
fntype('commkit_matched_publishers', None, (c_void_p,))
fntype('commkit_peek', c_bool, (c_void_p, POINTER(CPayload)))
fntype('commkit_take', c_bool, (c_void_p, POINTER(CPayload)))

fntype('commkit_publish', None, (c_void_p, POINTER(c_ubyte), c_size_t))
fntype('commkit_matched_subscribers', None, (c_void_p,))
46 changes: 45 additions & 1 deletion include/commkit/commkit.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
/*
* Helper to include common apis.
*/

#include <commkit/visibility.h>
#ifdef __cplusplus
#include <commkit/callback.h>
#include <commkit/chrono.h>
#include <commkit/types.h>
Expand All @@ -13,3 +14,46 @@
#include <commkit/publisher.h>
#include <commkit/subscriber.h>
#include <commkit/rtps.h>

extern "C" {
#endif

typedef struct {
uint8_t *bytes;
size_t len;
int64_t sequence;
uint64_t source_timestamp;
} commkit_payload;

struct commkit_opts {
bool reliable;
};

typedef struct commkit_opts commkit_subscriber_opts;
typedef struct commkit_opts commkit_publisher_opts;

typedef struct commkit_node commkit_node;
typedef struct commkit_subscriber commkit_subscriber;
typedef struct commkit_publisher commkit_publisher;

COMMKIT_API commkit_node* commkit_node_create(const char* name);
COMMKIT_API void commkit_node_destroy(commkit_node* node);

COMMKIT_API commkit_subscriber* commkit_subscriber_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_subscriber_opts *opts);
COMMKIT_API void commkit_subscriber_destroy(commkit_subscriber* sub);

COMMKIT_API commkit_publisher* commkit_publisher_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_publisher_opts *opts);
COMMKIT_API void commkit_publisher_destroy(commkit_publisher* pub);

COMMKIT_API void commkit_wait_for_message(commkit_subscriber* sub);
COMMKIT_API int commkit_matched_publishers(commkit_subscriber* sub);
COMMKIT_API bool commkit_peek(commkit_subscriber* sub, commkit_payload* payload);
COMMKIT_API bool commkit_take(commkit_subscriber* sub, commkit_payload* payload);

COMMKIT_API bool commkit_publish(commkit_publisher* pub, uint8_t* data, size_t len);
COMMKIT_API int commkit_matched_subscribers(commkit_publisher* pub);

#ifdef __cplusplus
}
#endif

108 changes: 108 additions & 0 deletions src/capi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include <commkit/commkit.h>

struct commkit_node {
commkit::Node node;
};

struct commkit_subscriber {
commkit::SubscriberPtr s;
};

struct commkit_publisher {
commkit::PublisherPtr p;
};

commkit_node* commkit_node_create(const char* name) {
commkit_node* n = new commkit_node;
n->node.init(name);

return n;
}

void commkit_node_destroy(commkit_node* node) {
delete node;
}

commkit_subscriber* commkit_subscriber_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_subscriber_opts *opts) {
commkit::Topic t(topic_name, datatype, 4096);
commkit_subscriber* s = new commkit_subscriber;
s->s = node->node.createSubscriber(t);

commkit::SubscriptionOpts sub_opts;
sub_opts.reliable = opts->reliable;

if (!s->s->init(sub_opts)) {
return nullptr;
}

return s;
}

void commkit_subscriber_destroy(commkit_subscriber* sub) {
delete sub;
}

commkit_publisher* commkit_publisher_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_publisher_opts *opts) {
commkit::Topic t(topic_name, datatype, 4096);
commkit_publisher* p = new commkit_publisher;
p->p = node->node.createPublisher(t);

commkit::PublicationOpts pub_opts;
pub_opts.reliable = opts->reliable;
pub_opts.history = 0;

if (!p->p->init(pub_opts)) {
return nullptr;
}

return p;
}

void commkit_publisher_destroy(commkit_publisher* pub) {
delete pub;
}

void commkit_wait_for_message(commkit_subscriber* sub) {
sub->s->waitForMessage();
}

int commkit_matched_publishers(commkit_subscriber* sub) {
return sub->s->matchedPublishers();
}

bool commkit_peek(commkit_subscriber* sub, commkit_payload* payload) {
commkit::Payload p;
if (sub->s->peek(&p)) {
payload->bytes = p.bytes;
payload->len = p.len;
payload->sequence = p.sequence;
payload->source_timestamp = std::chrono::time_point_cast<std::chrono::nanoseconds>(p.sourceTimestamp).time_since_epoch().count();
return true;
} else {
return false;
}
}

bool commkit_take(commkit_subscriber* sub, commkit_payload* payload) {
commkit::Payload p;
if (sub->s->take(&p)) {
payload->bytes = p.bytes;
payload->len = p.len;
payload->sequence = p.sequence;
payload->source_timestamp = std::chrono::time_point_cast<std::chrono::nanoseconds>(p.sourceTimestamp).time_since_epoch().count();
return true;
} else {
return false;
}
}

bool commkit_publish(commkit_publisher* pub, uint8_t* data, size_t len) {
return pub->p->publish(data, len);
}

int commkit_matched_subscribers(commkit_publisher* pub) {
return pub->p->matchedSubscribers();
}