Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions gotests/cgo_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ package main
#include <bpf/bpf.h>
#include <bpf/libbpf.h>

struct netdata_ringbuf_stats {
uint64_t samples;
uint64_t bytes;
};

#ifdef LIBBPF_MAJOR_VERSION
static int netdata_libbpf_probe_bpf_map_type(unsigned int map_type)
{
Expand Down Expand Up @@ -113,6 +118,99 @@ static int netdata_close_fd(int fd)
{
return close(fd);
}

static int netdata_ring_buffer_sample_cb(void *ctx, void *data, size_t size)
{
struct netdata_ringbuf_stats *stats = ctx;

(void)data;
if (stats) {
stats->samples++;
stats->bytes += size;
}

return 0;
}

static struct netdata_ringbuf_stats *netdata_ringbuf_stats_new(void)
{
return calloc(1, sizeof(struct netdata_ringbuf_stats));
}

static void netdata_ringbuf_stats_free(struct netdata_ringbuf_stats *stats)
{
free(stats);
}

static uint64_t netdata_ringbuf_stats_samples(const struct netdata_ringbuf_stats *stats)
{
return stats ? stats->samples : 0;
}

static uint64_t netdata_ringbuf_stats_bytes(const struct netdata_ringbuf_stats *stats)
{
return stats ? stats->bytes : 0;
}

static struct ring_buffer *netdata_ring_buffer_new(int map_fd, struct netdata_ringbuf_stats *stats)
{
return ring_buffer__new(map_fd, netdata_ring_buffer_sample_cb, stats, NULL);
}

static int netdata_ring_buffer_poll(struct ring_buffer *rb, int timeout_ms)
{
return ring_buffer__poll(rb, timeout_ms);
}

static uint64_t netdata_ring_buffer_avail_data(const struct ring_buffer *rb)
{
struct ring *ring = ring_buffer__ring((struct ring_buffer *)rb, 0);

if (!ring)
return 0;

return (uint64_t)ring__avail_data_size(ring);
}

static uint64_t netdata_ring_buffer_size(const struct ring_buffer *rb)
{
struct ring *ring = ring_buffer__ring((struct ring_buffer *)rb, 0);

if (!ring)
return 0;

return (uint64_t)ring__size(ring);
}

static void netdata_ring_buffer_free(struct ring_buffer *rb)
{
ring_buffer__free(rb);
}

static struct user_ring_buffer *netdata_user_ring_buffer_new(int map_fd)
{
return user_ring_buffer__new(map_fd, NULL);
}

static int netdata_user_ring_buffer_submit_u64(struct user_ring_buffer *rb, uint64_t value)
{
void *sample;

errno = 0;
sample = user_ring_buffer__reserve(rb, sizeof(value));
if (!sample)
return errno ? -errno : -1;

memcpy(sample, &value, sizeof(value));
user_ring_buffer__submit(rb, sample);

return 0;
}

static void netdata_user_ring_buffer_free(struct user_ring_buffer *rb)
{
user_ring_buffer__free(rb);
}
*/
import "C"

Expand All @@ -129,6 +227,8 @@ const (
bpfMapTypeArray = uint32(C.BPF_MAP_TYPE_ARRAY)
bpfMapTypePerCPUHash = uint32(C.BPF_MAP_TYPE_PERCPU_HASH)
bpfMapTypePerCPUArray = uint32(C.BPF_MAP_TYPE_PERCPU_ARRAY)
bpfMapTypeRingBuf = uint32(C.BPF_MAP_TYPE_RINGBUF)
bpfMapTypeUserRingBuf = uint32(C.BPF_MAP_TYPE_USER_RINGBUF)
)

type bpfObject struct {
Expand All @@ -147,6 +247,15 @@ type bpfLink struct {
ptr *C.struct_bpf_link
}

type ringBuffer struct {
ptr *C.struct_ring_buffer
stats *C.struct_netdata_ringbuf_stats
}

type userRingBuffer struct {
ptr *C.struct_user_ring_buffer
}

type mapMeta struct {
Name string
FD int
Expand Down Expand Up @@ -365,3 +474,70 @@ func closeFD(fd int) error {

return nil
}

func newRingBuffer(mapFD int) (*ringBuffer, int) {
stats := C.netdata_ringbuf_stats_new()
if stats == nil {
return nil, -int(C.ENOMEM)
}

rb := C.netdata_ring_buffer_new(C.int(mapFD), stats)
if err := int(C.netdata_libbpf_get_error(unsafe.Pointer(rb))); err != 0 {
C.netdata_ringbuf_stats_free(stats)
return nil, err
}

return &ringBuffer{ptr: rb, stats: stats}, 0
}

func (rb *ringBuffer) free() {
if rb == nil {
return
}

if rb.ptr != nil {
C.netdata_ring_buffer_free(rb.ptr)
}
if rb.stats != nil {
C.netdata_ringbuf_stats_free(rb.stats)
}
}

func (rb *ringBuffer) poll(timeoutMS int) int {
return int(C.netdata_ring_buffer_poll(rb.ptr, C.int(timeoutMS)))
}

func (rb *ringBuffer) samples() uint64 {
return uint64(C.netdata_ringbuf_stats_samples(rb.stats))
}

func (rb *ringBuffer) bytes() uint64 {
return uint64(C.netdata_ringbuf_stats_bytes(rb.stats))
}

func (rb *ringBuffer) availData() uint64 {
return uint64(C.netdata_ring_buffer_avail_data(rb.ptr))
}

func (rb *ringBuffer) size() uint64 {
return uint64(C.netdata_ring_buffer_size(rb.ptr))
}

func newUserRingBuffer(mapFD int) (*userRingBuffer, int) {
rb := C.netdata_user_ring_buffer_new(C.int(mapFD))
if err := int(C.netdata_libbpf_get_error(unsafe.Pointer(rb))); err != 0 {
return nil, err
}

return &userRingBuffer{ptr: rb}, 0
}

func (rb *userRingBuffer) free() {
if rb != nil && rb.ptr != nil {
C.netdata_user_ring_buffer_free(rb.ptr)
}
}

func (rb *userRingBuffer) submitUint64(value uint64) int {
return int(C.netdata_user_ring_buffer_submit_u64(rb.ptr, C.uint64_t(value)))
}
128 changes: 123 additions & 5 deletions gotests/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,18 @@ func detectSupportedMapTypes(rhfVersion int, kernelVersion int) map[uint32]bool
bpfMapTypeArray: kernelVersion >= netdataMinimumEBPFKernel || rhfVersion > 0,
bpfMapTypePerCPUHash: fallbackPerCPUMapSupport(rhfVersion, kernelVersion),
bpfMapTypePerCPUArray: fallbackPerCPUMapSupport(rhfVersion, kernelVersion),
}

for _, mapType := range []uint32{bpfMapTypeHash, bpfMapTypeArray, bpfMapTypePerCPUHash, bpfMapTypePerCPUArray} {
bpfMapTypeRingBuf: false,
bpfMapTypeUserRingBuf: false,
}

for _, mapType := range []uint32{
bpfMapTypeHash,
bpfMapTypeArray,
bpfMapTypePerCPUHash,
bpfMapTypePerCPUArray,
bpfMapTypeRingBuf,
bpfMapTypeUserRingBuf,
} {
if probe := probeMapTypeSupport(mapType); probe >= 0 {
supported[mapType] = probe > 0
}
Expand All @@ -684,14 +693,25 @@ func mapTypeName(mapType uint32) string {
return "percpu_hash"
case bpfMapTypePerCPUArray:
return "percpu_array"
case bpfMapTypeRingBuf:
return "ringbuf"
case bpfMapTypeUserRingBuf:
return "user_ringbuf"
default:
return fmt.Sprintf("type_%d", mapType)
}
}

func writeSupportedMapTypes(w io.Writer, supported map[uint32]bool) {
names := make([]string, 0, 4)
for _, mapType := range []uint32{bpfMapTypeHash, bpfMapTypeArray, bpfMapTypePerCPUHash, bpfMapTypePerCPUArray} {
names := make([]string, 0, 6)
for _, mapType := range []uint32{
bpfMapTypeHash,
bpfMapTypeArray,
bpfMapTypePerCPUHash,
bpfMapTypePerCPUArray,
bpfMapTypeRingBuf,
bpfMapTypeUserRingBuf,
} {
if supported[mapType] {
names = append(names, fmt.Sprintf("\"%s\"", mapTypeName(mapType)))
}
Expand Down Expand Up @@ -1121,6 +1141,18 @@ func isPerCPUMapType(mapType uint32) bool {
return mapType == bpfMapTypePerCPUArray || mapType == bpfMapTypePerCPUHash
}

func isRingBufferMapType(mapType uint32) bool {
return mapType == bpfMapTypeRingBuf || mapType == bpfMapTypeUserRingBuf
}

func isUserRingBufferMapType(mapType uint32) bool {
return mapType == bpfMapTypeUserRingBuf
}

func supportsMapKeyValueIO(mapType uint32) bool {
return !isRingBufferMapType(mapType)
}

func roundUpSize(value int, align int) int {
return ((value + align - 1) / align) * align
}
Expand Down Expand Up @@ -1232,10 +1264,93 @@ func controllerJSON(w io.Writer, fd int, meta mapMeta, nprocesses int) {
filled+zero, filled, zero)
}

func testRingBufferMap(w io.Writer, meta mapMeta, iterations int) {
mode := "ringbuf_consumer"
var (
rb *ringBuffer
urb *userRingBuffer
setupErr int
)

if isUserRingBufferMapType(meta.Type) {
mode = "user_ringbuf_producer"
urb, setupErr = newUserRingBuffer(meta.FD)
} else {
rb, setupErr = newRingBuffer(meta.FD)
}

fmt.Fprintf(w,
" \"%s\" : {\n \"Info\" : { \"Length\" : { \"Key\" : %d, \"Value\" : %d},\n"+
" \"Type\" : %d,\n"+
" \"FD\" : %d,\n"+
" \"Data\" : [\n",
meta.Name, meta.KeySize, meta.ValueSize, meta.Type, meta.FD)

var prevSamples uint64
var prevBytes uint64
for i := 0; i < iterations; i++ {
time.Sleep(5 * time.Second)

opErr := setupErr
opResult := 0
var iterSamples uint64
var iterBytes uint64
var availData uint64
var ringSize uint64

if rb != nil {
opResult = rb.poll(0)
if opResult < 0 {
opErr = opResult
}

samples := rb.samples()
bytes := rb.bytes()
iterSamples = samples - prevSamples
iterBytes = bytes - prevBytes
prevSamples = samples
prevBytes = bytes
availData = rb.availData()
ringSize = rb.size()
} else if urb != nil {
opResult = urb.submitUint64(uint64(i + 1))
if opResult < 0 {
opErr = opResult
}
}

if i > 0 {
fmt.Fprint(w, ",\n")
}
fmt.Fprintf(w,
" { \"Iteration\" : %d, \"Mode\" : \"%s\", \"Setup\" : %d, "+
"\"Operation Result\" : %d, \"Samples\" : %d, \"Bytes\" : %d, "+
"\"Available\" : %d, \"Ring Size\" : %d, \"Error Code\" : %d, "+
"\"Error Message\" : \"%s\" }",
i, mode, boolToInt(setupErr == 0), opResult, iterSamples, iterBytes,
availData, ringSize, opErr, describeError(opErr))
}
fmt.Fprint(w, "\n")

if rb != nil {
rb.free()
}
if urb != nil {
urb.free()
}
}

func testMaps(w io.Writer, obj *bpfObject, ctrl string, iterations int, nprocesses int) {
tables := 0
for m := obj.firstMap(); m != nil; m = obj.nextMap(m) {
meta := m.meta()
if !supportsMapKeyValueIO(meta.Type) {
testRingBufferMap(w, meta, iterations)
fmt.Fprint(w, " ]\n }\n },\n")
tables++
continue
}

values := allocateTableData(meta, nprocesses)
fmt.Fprintf(w,
" \"%s\" : {\n \"Info\" : { \"Length\" : { \"Key\" : %d, \"Value\" : %d},\n"+
Expand Down Expand Up @@ -1266,6 +1381,9 @@ func fillCtrl(obj *bpfObject, ctrl string, mapLevel int, nprocesses int) {
}

meta := m.meta()
if !supportsMapKeyValueIO(meta.Type) {
return
}
values := []uint64{1, uint64(mapLevel), 0, 0, 0, 0}
key := make([]byte, meta.KeySize)
value := make([]byte, mapValueLength(meta, nprocesses))
Expand Down
Loading