Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions fluent-plugin-oceanbase-logs/.env.example
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# --- Required (OceanBase Cloud API) ---
OCEANBASE_ACCESS_KEY_ID=your_access_key_id
OCEANBASE_ACCESS_KEY_SECRET=your_access_key_secret
# Instance ID and tenant ID must both come from the console; do not reuse the same value for both.
OCEANBASE_INSTANCE_ID=ob317v4uif****
OCEANBASE_TENANT_ID=t4louaeei****

# --- Optional ---
# LOKI_URL=http://loki:3100
Expand Down
6 changes: 4 additions & 2 deletions fluent-plugin-oceanbase-logs/example/fluentd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
access_key_id "#{ENV['OCEANBASE_ACCESS_KEY_ID']}"
access_key_secret "#{ENV['OCEANBASE_ACCESS_KEY_SECRET']}"

instance_id "#{ENV['OCEANBASE_INSTANCE_ID']}"
tenant_id "#{ENV['OCEANBASE_TENANT_ID']}"
<target>
instance_id ""
tenant_id ""
</target>

# API endpoint (override with OCEANBASE_ENDPOINT)
endpoint "#{ENV['OCEANBASE_ENDPOINT'] || 'api-cloud-cn.oceanbase.com'}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ services:
networks:
- obs

fluentd-oceanbase-demo:
fluentd-oceanbase-demo1:
image: fluent/fluentd:v1.16-1
platform: linux/amd64
container_name: fluentd-oceanbase-demo
container_name: fluentd-oceanbase-demo1
user: root
command:
- /bin/sh
Expand All @@ -37,13 +37,48 @@ services:
mkdir -p /var/log/fluentd
exec fluentd -c /fluentd/etc/fluentd-to-loki.conf
volumes:
- ./fluentd-to-loki.conf:/fluentd/etc/fluentd-to-loki.conf:ro
- ./fluentd-to-loki1.conf:/fluentd/etc/fluentd-to-loki.conf:ro
environment:
LOKI_URL: "${LOKI_URL:-http://loki:3100}"
OCEANBASE_ACCESS_KEY_ID: "${OCEANBASE_ACCESS_KEY_ID}"
OCEANBASE_INSTANCE1: "${OCEANBASE_INSTANCE1}"
OCEANBASE_TENANT1: "${OCEANBASE_TENANT1}"
OCEANBASE_INSTANCE2: "${OCEANBASE_INSTANCE2}"
OCEANBASE_TENANT2: "${OCEANBASE_TENANT2}"
OCEANBASE_ACCESS_KEY_SECRET: "${OCEANBASE_ACCESS_KEY_SECRET}"
OCEANBASE_ENDPOINT: "${OCEANBASE_ENDPOINT:-api-cloud-cn.oceanbase.com}"
OCEANBASE_FETCH_INTERVAL: "${OCEANBASE_FETCH_INTERVAL:-60}"
OCEANBASE_LOOKBACK_SECONDS: "${OCEANBASE_LOOKBACK_SECONDS:-600}"
OCEANBASE_DB_NAME: "${OCEANBASE_DB_NAME:-}"
OCEANBASE_SEARCH_KEYWORD: "${OCEANBASE_SEARCH_KEYWORD:-}"
OCEANBASE_PROJECT_ID: "${OCEANBASE_PROJECT_ID:-}"
networks:
- obs
depends_on:
- loki

fluentd-oceanbase-demo2:
image: fluent/fluentd:v1.16-1
platform: linux/amd64
container_name: fluentd-oceanbase-demo2
user: root
command:
- /bin/sh
- -c
- |
gem install fluent-plugin-oceanbase-logs fluent-plugin-grafana-loki --no-document
mkdir -p /var/log/fluentd
exec fluentd -c /fluentd/etc/fluentd-to-loki.conf
volumes:
- ./fluentd-to-loki2.conf:/fluentd/etc/fluentd-to-loki.conf:ro
environment:
LOKI_URL: "${LOKI_URL:-http://loki:3100}"
OCEANBASE_ACCESS_KEY_ID: "${OCEANBASE_ACCESS_KEY_ID}"
OCEANBASE_INSTANCE1: "${OCEANBASE_INSTANCE1}"
OCEANBASE_TENANT1: "${OCEANBASE_TENANT1}"
OCEANBASE_INSTANCE2: "${OCEANBASE_INSTANCE2}"
OCEANBASE_TENANT2: "${OCEANBASE_TENANT2}"
OCEANBASE_ACCESS_KEY_SECRET: "${OCEANBASE_ACCESS_KEY_SECRET}"
OCEANBASE_INSTANCE_ID: "${OCEANBASE_INSTANCE_ID}"
OCEANBASE_TENANT_ID: "${OCEANBASE_TENANT_ID}"
OCEANBASE_ENDPOINT: "${OCEANBASE_ENDPOINT:-api-cloud-cn.oceanbase.com}"
OCEANBASE_FETCH_INTERVAL: "${OCEANBASE_FETCH_INTERVAL:-60}"
OCEANBASE_LOOKBACK_SECONDS: "${OCEANBASE_LOOKBACK_SECONDS:-600}"
Expand All @@ -67,7 +102,9 @@ services:
- obs
depends_on:
- loki
- fluentd-oceanbase-demo1
- fluentd-oceanbase-demo2

networks:
obs:
driver: bridge
driver: bridge
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,30 @@
access_key_id "#{ENV['OCEANBASE_ACCESS_KEY_ID']}"
access_key_secret "#{ENV['OCEANBASE_ACCESS_KEY_SECRET']}"

instance_id "#{ENV['OCEANBASE_INSTANCE_ID']}"
tenant_id "#{ENV['OCEANBASE_TENANT_ID']}"
<target>
instance_id "#{ENV['OCEANBASE_INSTANCE1']}"
tenant_id "#{ENV['OCEANBASE_TENANT1']}"
</target>

endpoint "#{ENV['OCEANBASE_ENDPOINT'] || 'api-cloud-cn.oceanbase.com'}"
fetch_interval "#{ENV['OCEANBASE_FETCH_INTERVAL'] || 30}"
lookback_seconds "#{ENV['OCEANBASE_LOOKBACK_SECONDS'] || 600}"

sql_text_length "#{ENV['OCEANBASE_SQL_TEXT_LENGTH'] || 65535}"
<target>
instance_id "#{ENV['OCEANBASE_INSTANCE2']}"
tenant_id "#{ENV['OCEANBASE_TENANT2']}"
</target>

deduplicate true
include_metadata true

<storage>
@type local
persistent true
path /var/log/fluentd/oceanbase_slow_sql.state
</storage>
</source>

<source>
@type oceanbase_logs
tag oceanbase.top_sql
log_type top_sql

access_key_id "#{ENV['OCEANBASE_ACCESS_KEY_ID']}"
access_key_secret "#{ENV['OCEANBASE_ACCESS_KEY_SECRET']}"

instance_id "#{ENV['OCEANBASE_INSTANCE_ID']}"
tenant_id "#{ENV['OCEANBASE_TENANT_ID']}"

endpoint "#{ENV['OCEANBASE_ENDPOINT'] || 'api-cloud-cn.oceanbase.com'}"
fetch_interval "#{ENV['OCEANBASE_FETCH_INTERVAL'] || 60}"
endpoint "api-cloud-cn.oceanbase.com"
fetch_interval "#{ENV['OCEANBASE_FETCH_INTERVAL'] || 30}"
lookback_seconds "#{ENV['OCEANBASE_LOOKBACK_SECONDS'] || 600}"

sql_text_length "#{ENV['OCEANBASE_SQL_TEXT_LENGTH'] || 65535}"

db_name "#{ENV['OCEANBASE_DB_NAME'] || 'test'}"
search_keyword "#{ENV['OCEANBASE_SEARCH_KEYWORD'] || 'SELECT'}"
project_id "#{ENV['OCEANBASE_PROJECT_ID']}"

deduplicate true
include_metadata true

<storage>
@type local
persistent true
path /var/log/fluentd/oceanbase_top_sql.state
path /var/log/fluentd/oceanbase_slow_sql.state
</storage>
</source>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<source>
@type oceanbase_logs
tag oceanbase.slow_sql
log_type slow_sql

access_key_id "#{ENV['OCEANBASE_ACCESS_KEY_ID']}"
access_key_secret "#{ENV['OCEANBASE_ACCESS_KEY_SECRET']}"

<target>
instance_id "#{ENV['OCEANBASE_INSTANCE1']}"
tenant_id "#{ENV['OCEANBASE_TENANT1']}"
</target>

<target>
instance_id "#{ENV['OCEANBASE_INSTANCE2']}"
tenant_id "#{ENV['OCEANBASE_TENANT2']}"
</target>


endpoint "api-cloud-cn.oceanbase.com"
fetch_interval "#{ENV['OCEANBASE_FETCH_INTERVAL'] || 30}"
lookback_seconds "#{ENV['OCEANBASE_LOOKBACK_SECONDS'] || 600}"

sql_text_length "#{ENV['OCEANBASE_SQL_TEXT_LENGTH'] || 65535}"

deduplicate true
include_metadata true

<storage>
@type local
persistent true
path /var/log/fluentd/oceanbase_slow_sql.state
</storage>
</source>

<filter oceanbase.**>
@type record_transformer
enable_ruby true
renew_record true
<record>
log ${r = record; require('json'); u = (r.dig("userName") || "").to_s; u = "unknown" if u.empty?; ip = (r.dig("userClientIp") || r.dig("clientIp") || "").to_s; tid = (r.dig("traceId") || "").to_s; t = (r.dig("requestTime") || "").to_s; et_ms = r.dig("executeTime"); qt_s = et_ms.nil? ? 0.0 : et_ms.to_f / 1000.0; lock_s = (r.dig("queueTime") || 0).to_f; rs = (r.dig("returnRows") || 0); mr = (r.dig("memstoreReadRows") || 0).to_f; sr = (r.dig("ssstoreReadRows") || 0).to_f; re = (mr + sr).to_i; db = (r.dig("dbName") || "").to_s; sql = (r.dig("fullSqlText") || "").to_s; rid = r.dig("requestId"); ltype = (r.dig("ob_log_type") || "").to_s; ts_unix = 0; begin; ts_unix = Time.parse(t).utc.to_i if !t.empty?; rescue; end; ub = sprintf("\u0023 User@Host: %s%c%s%c @ %c%s%c", u, 91, u, 93, 91, ip, 93); ub = ub + sprintf(" Id: %s", rid) if rid; parts = Array.new; parts.push("\u0023 Time: " + t) unless t.empty?; parts.push(sprintf("\u0023 Log_type: %s", ltype)) unless ltype.empty?; parts.push(ub); parts.push("\u0023 Trace_id: " + tid) unless tid.to_s.empty?; parts.push(sprintf("\u0023 Query_time: %.6f Lock_time: %.6f Rows_sent: %s Rows_examined: %s", qt_s, lock_s, rs.to_s, re.to_s)); parts.push(sprintf("use %s%c", db, 59)) unless db.empty?; parts.push(sprintf("SET timestamp=%s%c", ts_unix.to_s, 59)) if ts_unix > 0; parts.push(sql); mysql_part = parts.join("\n"); keys = %w(requestTime clientIp traceId executeTime queueTime returnRows rowsExaminedApprox dbName fullSqlText userName userClientIp clientId clientPort requestId sqlTextShort elapsedTime cpuTime memstoreReadRows ssstoreReadRows diskReads affectedRows blockCacheHit blockIndexCacheHit bloomFilterCacheHit decodeTime executorRpc expectedWorkerCount getPlanTime hitPlan inner netTime netWaitTime partitionCount planId planType retCode retryCount rowCacheHit rpcCount server sqlId sqlType tableScan transHash usedWorkerCount waitCount waitEvent ob_instance_id ob_tenant_id ob_log_type query_start_time query_end_time); h = Hash.new; keys.each do |k|; if k == "clientId"; h.store(k, r.dig("clientIp") || r.dig("userClientIp")); elsif k == "rowsExaminedApprox"; h.store(k, ((r.dig("memstoreReadRows") || 0).to_f + (r.dig("ssstoreReadRows") || 0).to_f).to_i); elsif r.key?(k); h.store(k, r.dig(k)); end; end; mysql_part + "\n\n" + "\u0023 ob_api_payload\uFF1A" + JSON.pretty_generate(h)}
ob_instance_id ${record.dig("ob_instance_id")}
ob_tenant_id ${record.dig("ob_tenant_id")}
dbName ${record.dig("dbName")}
sqlType ${record.dig("sqlType")}
userName ${record.dig("userName")}
ob_log_type ${record.dig("ob_log_type")}
</record>
</filter>

<match oceanbase.**>
@type loki
url "#{ENV['LOKI_URL'] || 'http://loki:3100'}"

drop_single_key true

extra_labels {"job":"oceanbase-logs"}

<label>
ob_instance_id
ob_tenant_id
ob_log_type
dbName
sqlType
userName
</label>

remove_keys ob_log_type,message

<buffer>
@type memory
flush_interval 10s
chunk_limit_size 1m
retry_max_interval 30s
retry_forever true
</buffer>
</match>
Loading