-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatax.cpp
More file actions
91 lines (75 loc) · 2.61 KB
/
datax.cpp
File metadata and controls
91 lines (75 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include "datax.h"
#include <fstream>
#include <sstream>
#include <string_view>
#include <utility>
#include <dlfcn.h>
using datax::DataX;
/// Looks up an environment variable by name.
///
/// @param variable Name of the environment variable to read.
/// @return The variable's value, or an empty string when it is not set.
std::string Getenv(const std::string &variable) {
    const char *raw = getenv(variable.c_str());
    // getenv yields a null pointer for unset variables; map that to "".
    return raw != nullptr ? std::string(raw) : std::string("");
}
// LOAD_FUNCTION(type, name)
//
// Resolves one symbol from the library referenced by `library_handle_` and
// stores it in `name`.
//
//   type - used twice: as the function-pointer type the result is cast to,
//          and (stringified) as the symbol name passed to dlsym.
//   name - the variable that receives the resolved pointer.
//
// Throws datax::Exception when the resolved pointer is null. dlerror() may
// itself return a null pointer (no error recorded), and streaming a null
// `const char *` into an ostream is undefined behaviour, so the result is
// captured first and replaced by a fixed text in that case.
#define LOAD_FUNCTION(type, name) do { \
    name = (type) dlsym(library_handle_, #type); \
    if (name == nullptr) { \
        const char *dl_message = dlerror(); \
        std::ostringstream oss; \
        oss << (dl_message != nullptr ? dl_message : "symbol resolved to null") \
            << " loading function " << #type; \
        throw datax::Exception(oss.str()); \
    } \
} while(0)
/// Loads `libdatax-sdk.so`, resolves the SDK entry points into the member
/// function pointers and then calls the SDK's initialize function.
///
/// @throws datax::Exception if the library cannot be opened or if any of the
///         required symbols cannot be resolved.
DataX::DataX() {
    library_handle_ = dlopen("libdatax-sdk.so", RTLD_LOCAL | RTLD_NOW);
    if (library_handle_ == nullptr) {
        std::ostringstream oss;
        oss << dlerror() << " loading libdatax-sdk.so";
        throw datax::Exception(oss.str());
    }
    try {
        LOAD_FUNCTION(datax_sdk_v2_initialize, initialize_);
        LOAD_FUNCTION(datax_sdk_v2_next, next_);
        LOAD_FUNCTION(datax_sdk_v2_emit, emit_);
        LOAD_FUNCTION(datax_sdk_v2_message_stream, message_stream_);
        LOAD_FUNCTION(datax_sdk_v2_message_reference, message_reference_);
        LOAD_FUNCTION(datax_sdk_v2_message_data, message_data_);
        LOAD_FUNCTION(datax_sdk_v2_message_data_size, message_data_size_);
        LOAD_FUNCTION(datax_sdk_v2_message_close, message_close_);
    } catch (...) {
        // A constructor that throws never gets its destructor run, so the
        // handle has to be released here or it is leaked on every failed
        // construction attempt.
        dlclose(library_handle_);
        library_handle_ = nullptr;
        throw;
    }
    initialize_();
}
/// Destructor. Performs no explicit work: in particular the handle obtained
/// from dlopen in the constructor is not closed here.
DataX::~DataX() = default;
/// Reads and parses the JSON configuration file.
///
/// The file location is taken from the `DATAX_CONFIGURATION` environment
/// variable; when that is unset or empty, `/datax/configuration` is used.
///
/// @return The parsed configuration document.
nlohmann::json DataX::Configuration() {
    const std::string fromEnvironment = Getenv("DATAX_CONFIGURATION");
    const std::string path =
        fromEnvironment.empty() ? std::string("/datax/configuration") : fromEnvironment;

    std::ifstream input(path);
    return nlohmann::json::parse(input);
}
/// Obtains the next message from the SDK and converts it to a datax::Message.
///
/// The stream and reference are copied out of the SDK message and its payload
/// is decoded from CBOR into JSON. The two `false` arguments to `from_cbor`
/// are nlohmann's `strict` and `allow_exceptions` flags, so trailing bytes are
/// tolerated and malformed input yields a discarded JSON value rather than a
/// parse exception.
///
/// The SDK message handle is closed on every path out of this function,
/// including when copying or decoding throws (e.g. on allocation failure).
///
/// @return The decoded message.
datax::Message DataX::Next() {
    datax::Message message;
    auto msg = next_();
    try {
        message.Stream = message_stream_(msg);
        message.Reference = message_reference_(msg);
        auto data = message_data_(msg);
        auto data_size = message_data_size_(msg);
        message.Data = nlohmann::json::from_cbor(std::string_view((const char *) data, data_size), false, false);
    } catch (...) {
        // Do not leak the SDK-side message when an exception escapes.
        message_close_(msg);
        throw;
    }
    message_close_(msg);
    return message;
}
/// Serializes a JSON message to CBOR and hands it to the SDK's emit function.
///
/// @param message   JSON document to send.
/// @param reference Reference string passed through to the SDK alongside the
///                  encoded payload.
void DataX::Emit(const nlohmann::json &message, const std::string &reference) {
    auto encoded = nlohmann::json::to_cbor(message);
    // The SDK entry point is called with a 32-bit length.
    const auto encoded_size = static_cast<int32_t>(encoded.size());
    emit_(encoded.data(), encoded_size, reference.c_str());
}
std::shared_ptr<DataX> datax::DataX::Instance() {
static std::shared_ptr<DataX> instance(new DataX);
return instance;
}
std::shared_ptr<DataX> datax::New() {
return DataX::Instance();
}
datax::Exception::Exception(std::string message) : message_(std::move(message)) {}
/// Returns the stored description as a null-terminated C string.
///
/// The pointer refers to storage owned by this exception object.
const char *datax::Exception::what() const noexcept {
    const char *text = message_.c_str();
    return text;
}