diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb
index cc2f8d0c..c4c570f8 100644
--- a/lib/outboxer/message.rb
+++ b/lib/outboxer/message.rb
@@ -745,7 +745,7 @@ def metrics_by_status(time: Time)
"COALESCE(SUM(failed_message_count), 0) AS failed",
"COALESCE(SUM(" \
"queued_message_count + publishing_message_count + " \
- "published_message_count + failed_message_count), 0) AS total"
+ "failed_message_count), 0) AS total"
).take
counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i)
diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb
index 4f7a836c..5ae3eaae 100644
--- a/lib/outboxer/publisher.rb
+++ b/lib/outboxer/publisher.rb
@@ -95,7 +95,7 @@ def find_by_id(id:)
def all
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
- publishers = Models::Publisher.includes(:signals).all
+ publishers = Models::Publisher.includes(:signals).order(created_at: :asc).all
publishers.map do |publisher|
{
@@ -295,9 +295,7 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
start_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
-
publisher = Models::Publisher.lock.find(id)
-
end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
rtt = end_rtt - start_rtt
@@ -316,18 +314,21 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
publisher_thread = Models::Thread
.select(
- "COALESCE(SUM(queued_message_count), 0) AS queued_message_count,
- COALESCE(SUM(publishing_message_count), 0) AS publishing_message_count,
- COALESCE(SUM(published_message_count), 0) AS published_message_count,
- COALESCE(SUM(failed_message_count), 0) AS failed_message_count,
- MAX(updated_at) AS last_updated_at"
+ "COALESCE(SUM(publishing_message_count), 0) AS publishing_message_count, " \
+ "COALESCE(SUM(published_message_count), 0) AS published_message_count, " \
+ "COALESCE(SUM(failed_message_count), 0) AS failed_message_count, " \
+ "MAX(publishing_message_count_last_updated_at) " \
+ "AS publishing_message_count_last_updated_at, " \
+ "MAX(published_message_count_last_updated_at) " \
+ "AS published_message_count_last_updated_at, " \
+ "MAX(failed_message_count_last_updated_at) " \
+ "AS failed_message_count_last_updated_at"
)
.where(hostname: hostname, process_id: process_id)
.group(:hostname, :process_id)
.take
current_counts = {
- "queued" => publisher_thread&.queued_message_count || 0,
"publishing" => publisher_thread&.publishing_message_count || 0,
"published" => publisher_thread&.published_message_count || 0,
"failed" => publisher_thread&.failed_message_count || 0
@@ -339,10 +340,14 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
h[status] = ((count - prev) / time_delta).round(0)
end
- latency = 0
+ timestamps = [
+ publisher_thread.publishing_message_count_last_updated_at,
+ publisher_thread.published_message_count_last_updated_at,
+ publisher_thread.failed_message_count_last_updated_at
+ ].compact
- if !publisher_thread.nil?
- latency = (current_utc_time - publisher_thread.last_updated_at).round(0)
+ if !timestamps.empty?
+ last_message_update = timestamps.max
end
publisher.update!(
@@ -364,7 +369,7 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
"count" => current_counts["failed"],
"throughput" => throughput_by_status["failed"]
},
- "latency" => latency,
+ "last_message_update" => last_message_update&.iso8601,
"cpu" => cpu,
"rss" => rss,
"rtt" => rtt
diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb
index 3fdeb04f..eeb41fe6 100644
--- a/lib/outboxer/web/views/home.erb
+++ b/lib/outboxer/web/views/home.erb
@@ -14,14 +14,11 @@
<% else %>
| Name |
+ Status |
Created |
Updated |
- Status |
Throughput |
- Latency |
- CPU |
- RSS |
- RTT |
+ Metrics |
Action |
<% end %>
@@ -34,15 +31,56 @@
href="<%= outboxer_path("/publisher/#{publisher[:id]}#{normalise_query_string(time_zone: denormalised_query_params[:time_zone])}") %>">
<%= publisher[:name] %>
-
- <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> |
- <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> |
+
+
+
+ Concurrency: <%= publisher[:settings]['concurrency'] %>,
+ Poll: <%= publisher[:settings]['poll_interval'] %>s,
+ Heartbeat: <%= publisher[:settings]['heartbeat_interval'] %>s,
+ Tick: <%= publisher[:settings]['tick_interval'] %>s
+
+
<%= publisher[:status] %> |
- <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> |
- <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %> |
- <%= publisher[:metrics]['cpu'].round(0) %>% |
- <%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %> |
- <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %> |
+ <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> |
+
+
+
+
+
+ Heartbeat:
+ <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %>
+
+
+
+
+ Message:
+ <% if publisher[:metrics]['last_message_update'] %>
+ <%= Outboxer::Web.time_ago_in_words(Time.iso8601(publisher[:metrics]['last_message_update'])) %>
+ <% else %>
+ -
+ <% end %>
+
+ <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> |
+
+
+
+
+ CPU:
+ <%= publisher[:metrics]['cpu'].round(0) %>%
+
+
+
+ RSS:
+ <%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %>
+
+
+
+ RTT:
+ <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %>
+
+
+
+ |
@@ -112,7 +150,7 @@
| Status |
Count |
- Last Update |
+ Updated |
Latency |
diff --git a/lib/outboxer/web/views/layout.erb b/lib/outboxer/web/views/layout.erb
index 9ebfb294..b3fa80d4 100644
--- a/lib/outboxer/web/views/layout.erb
+++ b/lib/outboxer/web/views/layout.erb
@@ -55,7 +55,6 @@
Outboxer
- <%= Outboxer::Web.pretty_number(number: message_metrics_by_status[:published][:count]) %>
| |