From d53e204f9c61434e1518de897b5a5f4b6a7842d2 Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Mon, 23 May 2016 11:27:36 -0700 Subject: [PATCH] Add C and Python APIs --- CMakeLists.txt | 1 + commkit.py | 141 ++++++++++++++++++++++++++++++++++++++ include/commkit/commkit.h | 46 ++++++++++++- src/capi.cpp | 108 +++++++++++++++++++++++++++++ 4 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 commkit.py create mode 100644 src/capi.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a38e471..f034780 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,7 @@ set(COMMKIT_SRCS src/subscriber.cpp src/subscriberimpl.cpp src/topic.cpp + src/capi.cpp ) set(CMAKE_POSITION_INDEPENDENT_CODE True) diff --git a/commkit.py b/commkit.py new file mode 100644 index 0000000..24091ef --- /dev/null +++ b/commkit.py @@ -0,0 +1,141 @@ +from ctypes import * +import capnp +libcommkit = CDLL('libcommkit.dylib') + +class COpts(Structure): + _fields_ = [ + ("reliable", c_bool), + ] + +class CPayload(Structure): + _fields_ = [ + ("bytes", POINTER(c_ubyte)), + ("len", c_size_t), + ("sequence", c_uint64), + ("source_timestamp", c_uint64), + ] + + def to_python(self, topic): + return Payload(topic.decode_msg(string_at(self.bytes, self.len)), self.sequence, self.source_timestamp) + +class Topic: + def __init__(self, name, datatype, reliable=False): + self.name = name + self.datatype = datatype + self.reliable = reliable + + def is_capnp(self): + return isinstance(self.datatype, capnp._StructModule) + + def datatype_name(self): + if self.is_capnp(): + return "{:016x}".format(self.datatype.schema.node.id) + else: + return self.datatype + + def decode_msg(self, bytes): + if self.is_capnp(): + return self.datatype.from_bytes(bytes) + else: + return bytes + + def encode_msg(self, msg): + if self.is_capnp(): + return bytearray(msg.to_bytes()) + else: + return bytearray(msg) + +class Node: + def __init__(self, name): + self._as_parameter_ = libcommkit.commkit_node_create(name) + + def from_param(self): return self._as_parameter_ + + def __del__(self): + libcommkit.commkit_node_destroy(self) + + def subscribe(self, topic): + opts = COpts() + opts.reliable = topic.reliable + return Subscriber(libcommkit.commkit_subscriber_create(self, topic.name, topic.datatype_name(), byref(opts)), topic) + + def publish(self, topic): + opts = COpts() + opts.reliable = topic.reliable + return Publisher(libcommkit.commkit_publisher_create(self, topic.name, topic.datatype_name(), byref(opts)), topic) + +class Subscriber: + def __init__(self, obj, topic): + self._as_parameter_ = obj + self.topic = topic + + def from_param(self): return self._as_parameter_ + + def __del__(self): + libcommkit.commkit_subscriber_destroy(self) + + def wait_for_message(self): + libcommkit.commkit_wait_for_message(self) + + def matched_publishers(self): + return libcommkit.commkit_matched_publishers(self) + + def peek(self): + p = CPayload() + if libcommkit.commkit_peek(self, byref(p)): + return p.to_python(self.topic) + else: + return None + + def take(self): + p = CPayload() + if libcommkit.commkit_take(self, byref(p)): + return p.to_python(self.topic) + else: + return None + +class Payload: + def __init__(self, data, sequence, source_timestamp): + self.data = data + self.sequence = sequence + self.source_timestamp = source_timestamp + +class Publisher: + def __init__(self, obj, topic): + self._as_parameter_ = obj + self.topic = topic + + def from_param(self): return self._as_parameter_ + + def __del__(self): + libcommkit.commkit_publisher_destroy(self) + + def publish(self, data): + data = self.topic.encode_msg(data) + return libcommkit.commkit_publish(self, byref(c_ubyte.from_buffer(data)), len(data)) + + def matched_subscribers(self): + return libcommkit.commkit_matched_subscribers(self) + + +def fntype(fn, res, args): + f = getattr(libcommkit, fn) + f.argtypes = args + f.restype = res + +fntype('commkit_node_create', c_void_p, (c_char_p,)) +fntype('commkit_node_destroy', None, (c_void_p,)) + +fntype('commkit_subscriber_create', c_void_p, (c_void_p, c_char_p, c_char_p, POINTER(COpts))) +fntype('commkit_subscriber_destroy', None, (c_void_p,)) + +fntype('commkit_publisher_create', c_void_p, (c_void_p, c_char_p, c_char_p, POINTER(COpts))) +fntype('commkit_publisher_destroy', None, (c_void_p,)) + +fntype('commkit_wait_for_message', None, (c_void_p,)) +fntype('commkit_matched_publishers', None, (c_void_p,)) +fntype('commkit_peek', c_bool, (c_void_p, POINTER(CPayload))) +fntype('commkit_take', c_bool, (c_void_p, POINTER(CPayload))) + +fntype('commkit_publish', None, (c_void_p, POINTER(c_ubyte), c_size_t)) +fntype('commkit_matched_subscribers', None, (c_void_p,)) diff --git a/include/commkit/commkit.h b/include/commkit/commkit.h index 31892e8..a857e3a 100644 --- a/include/commkit/commkit.h +++ b/include/commkit/commkit.h @@ -3,7 +3,8 @@ /* * Helper to include common apis. */ - +#include +#ifdef __cplusplus #include #include #include @@ -13,3 +14,46 @@ #include #include #include + +extern "C" { +#endif + +typedef struct { + uint8_t *bytes; + size_t len; + int64_t sequence; + uint64_t source_timestamp; +} commkit_payload; + +struct commkit_opts { + bool reliable; +}; + +typedef struct commkit_opts commkit_subscriber_opts; +typedef struct commkit_opts commkit_publisher_opts; + +typedef struct commkit_node commkit_node; +typedef struct commkit_subscriber commkit_subscriber; +typedef struct commkit_publisher commkit_publisher; + +COMMKIT_API commkit_node* commkit_node_create(const char* name); +COMMKIT_API void commkit_node_destroy(commkit_node* node); + +COMMKIT_API commkit_subscriber* commkit_subscriber_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_subscriber_opts *opts); +COMMKIT_API void commkit_subscriber_destroy(commkit_subscriber* sub); + +COMMKIT_API commkit_publisher* commkit_publisher_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_publisher_opts *opts); +COMMKIT_API void commkit_publisher_destroy(commkit_publisher* pub); + +COMMKIT_API void commkit_wait_for_message(commkit_subscriber* sub); +COMMKIT_API int commkit_matched_publishers(commkit_subscriber* sub); +COMMKIT_API bool commkit_peek(commkit_subscriber* sub, commkit_payload* payload); +COMMKIT_API bool commkit_take(commkit_subscriber* sub, commkit_payload* payload); + +COMMKIT_API bool commkit_publish(commkit_publisher* pub, uint8_t* data, size_t len); +COMMKIT_API int commkit_matched_subscribers(commkit_publisher* pub); + +#ifdef __cplusplus +} +#endif + diff --git a/src/capi.cpp b/src/capi.cpp new file mode 100644 index 0000000..4e9cd9c --- /dev/null +++ b/src/capi.cpp @@ -0,0 +1,108 @@ +#include + +struct commkit_node { + commkit::Node node; +}; + +struct commkit_subscriber { + commkit::SubscriberPtr s; +}; + +struct commkit_publisher { + commkit::PublisherPtr p; +}; + +commkit_node* commkit_node_create(const char* name) { + commkit_node* n = new commkit_node; + n->node.init(name); + + return n; +} + +void commkit_node_destroy(commkit_node* node) { + delete node; +} + +commkit_subscriber* commkit_subscriber_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_subscriber_opts *opts) { + commkit::Topic t(topic_name, datatype, 4096); + commkit_subscriber* s = new commkit_subscriber; + s->s = node->node.createSubscriber(t); + + commkit::SubscriptionOpts sub_opts; + sub_opts.reliable = opts->reliable; + + if (!s->s->init(sub_opts)) { + return nullptr; + } + + return s; +} + +void commkit_subscriber_destroy(commkit_subscriber* sub) { + delete sub; +} + +commkit_publisher* commkit_publisher_create(commkit_node* node, const char* topic_name, const char* datatype, const commkit_publisher_opts *opts) { + commkit::Topic t(topic_name, datatype, 4096); + commkit_publisher* p = new commkit_publisher; + p->p = node->node.createPublisher(t); + + commkit::PublicationOpts pub_opts; + pub_opts.reliable = opts->reliable; + pub_opts.history = 0; + + if (!p->p->init(pub_opts)) { + return nullptr; + } + + return p; +} + +void commkit_publisher_destroy(commkit_publisher* pub) { + delete pub; +} + +void commkit_wait_for_message(commkit_subscriber* sub) { + sub->s->waitForMessage(); +} + +int commkit_matched_publishers(commkit_subscriber* sub) { + return sub->s->matchedPublishers(); +} + +bool commkit_peek(commkit_subscriber* sub, commkit_payload* payload) { + commkit::Payload p; + if (sub->s->peek(&p)) { + payload->bytes = p.bytes; + payload->len = p.len; + payload->sequence = p.sequence; + payload->source_timestamp = std::chrono::time_point_cast(p.sourceTimestamp).time_since_epoch().count(); + return true; + } else { + return false; + } +} + +bool commkit_take(commkit_subscriber* sub, commkit_payload* payload) { + commkit::Payload p; + if (sub->s->take(&p)) { + payload->bytes = p.bytes; + payload->len = p.len; + payload->sequence = p.sequence; + payload->source_timestamp = std::chrono::time_point_cast(p.sourceTimestamp).time_since_epoch().count(); + return true; + } else { + return false; + } +} + +bool commkit_publish(commkit_publisher* pub, uint8_t* data, size_t len) { + return pub->p->publish(data, len); +} + +int commkit_matched_subscribers(commkit_publisher* pub) { + return pub->p->matchedSubscribers(); +} + + +