diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index cc2f8d0c..c4c570f8 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -745,7 +745,7 @@ def metrics_by_status(time: Time) "COALESCE(SUM(failed_message_count), 0) AS failed", "COALESCE(SUM(" \ "queued_message_count + publishing_message_count + " \ - "published_message_count + failed_message_count), 0) AS total" + "failed_message_count), 0) AS total" ).take counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i) diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 4f7a836c..5ae3eaae 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -95,7 +95,7 @@ def find_by_id(id:) def all ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - publishers = Models::Publisher.includes(:signals).all + publishers = Models::Publisher.includes(:signals).order(created_at: :asc).all publishers.map do |publisher| { @@ -295,9 +295,7 @@ def create_heartbeat_thread(id:, hostname:, process_id:, ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do start_rtt = process.clock_gettime(process::CLOCK_MONOTONIC) - publisher = Models::Publisher.lock.find(id) - end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC) rtt = end_rtt - start_rtt @@ -316,18 +314,21 @@ def create_heartbeat_thread(id:, hostname:, process_id:, publisher_thread = Models::Thread .select( - "COALESCE(SUM(queued_message_count), 0) AS queued_message_count, - COALESCE(SUM(publishing_message_count), 0) AS publishing_message_count, - COALESCE(SUM(published_message_count), 0) AS published_message_count, - COALESCE(SUM(failed_message_count), 0) AS failed_message_count, - MAX(updated_at) AS last_updated_at" + "COALESCE(SUM(publishing_message_count), 0) AS publishing_message_count, " \ + "COALESCE(SUM(published_message_count), 0) AS published_message_count, " \ + "COALESCE(SUM(failed_message_count), 0) AS failed_message_count, " \ + "MAX(publishing_message_count_last_updated_at) " \ + "AS publishing_message_count_last_updated_at, " \ + "MAX(published_message_count_last_updated_at) " \ + "AS published_message_count_last_updated_at, " \ + "MAX(failed_message_count_last_updated_at) " \ + "AS failed_message_count_last_updated_at" ) .where(hostname: hostname, process_id: process_id) .group(:hostname, :process_id) .take current_counts = { - "queued" => publisher_thread&.queued_message_count || 0, "publishing" => publisher_thread&.publishing_message_count || 0, "published" => publisher_thread&.published_message_count || 0, "failed" => publisher_thread&.failed_message_count || 0 @@ -339,10 +340,14 @@ def create_heartbeat_thread(id:, hostname:, process_id:, h[status] = ((count - prev) / time_delta).round(0) end - latency = 0 + timestamps = [ + publisher_thread.publishing_message_count_last_updated_at, + publisher_thread.published_message_count_last_updated_at, + publisher_thread.failed_message_count_last_updated_at + ].compact - if !publisher_thread.nil? - latency = (current_utc_time - publisher_thread.last_updated_at).round(0) + if !timestamps.empty? + last_message_update = timestamps.max end publisher.update!( @@ -364,7 +369,7 @@ def create_heartbeat_thread(id:, hostname:, process_id:, "count" => current_counts["failed"], "throughput" => throughput_by_status["failed"] }, - "latency" => latency, + "last_message_update" => last_message_update&.iso8601, "cpu" => cpu, "rss" => rss, "rtt" => rtt diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index 3fdeb04f..eeb41fe6 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -14,14 +14,11 @@ <% else %> Name + Status Created Updated - Status Throughput - Latency - CPU - RSS - RTT + Metrics Action <% end %> @@ -34,15 +31,56 @@ href="<%= outboxer_path("/publisher/#{publisher[:id]}#{normalise_query_string(time_zone: denormalised_query_params[:time_zone])}") %>"> <%= publisher[:name] %> - - <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> - <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> + +
+
+ Concurrency: <%= publisher[:settings]['concurrency'] %>, + Poll: <%= publisher[:settings]['poll_interval'] %>s, + Heartbeat: <%= publisher[:settings]['heartbeat_interval'] %>s, + Tick: <%= publisher[:settings]['tick_interval'] %>s +
+
<%= publisher[:status] %> - <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> - <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %> - <%= publisher[:metrics]['cpu'].round(0) %>% - <%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %> - <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %> + <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> + +
+ + +
+ Heartbeat: + <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> +
+ + +
+ Message: + <% if publisher[:metrics]['last_message_update'] %> + <%= Outboxer::Web.time_ago_in_words(Time.iso8601(publisher[:metrics]['last_message_update'])) %> + <% else %> + - + <% end %> +
+ <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> + +
+ +
+ CPU: + <%= publisher[:metrics]['cpu'].round(0) %>% +
+ +
+ RSS: + <%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %> +
+ +
+ RTT: + <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %> +
+ +
+
@@ -112,7 +150,7 @@ Status Count - Last Update + Updated Latency diff --git a/lib/outboxer/web/views/layout.erb b/lib/outboxer/web/views/layout.erb index 9ebfb294..b3fa80d4 100644 --- a/lib/outboxer/web/views/layout.erb +++ b/lib/outboxer/web/views/layout.erb @@ -55,7 +55,6 @@
Outboxer - <%= Outboxer::Web.pretty_number(number: message_metrics_by_status[:published][:count]) %>