Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ def metrics_by_status(time: Time)
"COALESCE(SUM(failed_message_count), 0) AS failed",
"COALESCE(SUM(" \
"queued_message_count + publishing_message_count + " \
"published_message_count + failed_message_count), 0) AS total"
"failed_message_count), 0) AS total"
).take

counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i)
Expand Down
31 changes: 18 additions & 13 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def find_by_id(id:)
def all
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
publishers = Models::Publisher.includes(:signals).all
publishers = Models::Publisher.includes(:signals).order(created_at: :asc).all

publishers.map do |publisher|
{
Expand Down Expand Up @@ -295,9 +295,7 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
start_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)

publisher = Models::Publisher.lock.find(id)

end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
rtt = end_rtt - start_rtt

Expand All @@ -316,18 +314,21 @@ def create_heartbeat_thread(id:, hostname:, process_id:,

publisher_thread = Models::Thread
.select(
"COALESCE(SUM(queued_message_count), 0) AS queued_message_count,
COALESCE(SUM(publishing_message_count), 0) AS publishing_message_count,
COALESCE(SUM(published_message_count), 0) AS published_message_count,
COALESCE(SUM(failed_message_count), 0) AS failed_message_count,
MAX(updated_at) AS last_updated_at"
"COALESCE(SUM(publishing_message_count), 0) AS publishing_message_count, " \
"COALESCE(SUM(published_message_count), 0) AS published_message_count, " \
"COALESCE(SUM(failed_message_count), 0) AS failed_message_count, " \
"MAX(publishing_message_count_last_updated_at) " \
"AS publishing_message_count_last_updated_at, " \
"MAX(published_message_count_last_updated_at) " \
"AS published_message_count_last_updated_at, " \
"MAX(failed_message_count_last_updated_at) " \
"AS failed_message_count_last_updated_at"
)
.where(hostname: hostname, process_id: process_id)
.group(:hostname, :process_id)
.take

current_counts = {
"queued" => publisher_thread&.queued_message_count || 0,
"publishing" => publisher_thread&.publishing_message_count || 0,
"published" => publisher_thread&.published_message_count || 0,
"failed" => publisher_thread&.failed_message_count || 0
Expand All @@ -339,10 +340,14 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
h[status] = ((count - prev) / time_delta).round(0)
end

latency = 0
timestamps = [
publisher_thread.publishing_message_count_last_updated_at,
publisher_thread.published_message_count_last_updated_at,
publisher_thread.failed_message_count_last_updated_at
].compact

if !publisher_thread.nil?
latency = (current_utc_time - publisher_thread.last_updated_at).round(0)
if !timestamps.empty?
last_message_update = timestamps.max
end

publisher.update!(
Expand All @@ -364,7 +369,7 @@ def create_heartbeat_thread(id:, hostname:, process_id:,
"count" => current_counts["failed"],
"throughput" => throughput_by_status["failed"]
},
"latency" => latency,
"last_message_update" => last_message_update&.iso8601,
"cpu" => cpu,
"rss" => rss,
"rtt" => rtt
Expand Down
66 changes: 52 additions & 14 deletions lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
<% else %>
<tr>
<th scope="col" class="dynamic-name-column">Name</th> <!-- Dynamic column -->
<th scope="col">Status</th>
<th scope="col">Created</th>
<th scope="col">Updated</th>
<th scope="col">Status</th>
<th scope="col">Throughput</th>
<th scope="col">Latency</th>
<th scope="col">CPU</th>
<th scope="col">RSS</th>
<th scope="col">RTT</th>
<th scope="col">Metrics</th>
<th scope="col">Action</th> <!-- New Action column -->
</tr>
<% end %>
Expand All @@ -34,15 +31,56 @@
href="<%= outboxer_path("/publisher/#{publisher[:id]}#{normalise_query_string(time_zone: denormalised_query_params[:time_zone])}") %>">
<%= publisher[:name] %>
</a>
</td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %></td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %></td>

<div class="text-muted small mt-1">
<div>
Concurrency: <%= publisher[:settings]['concurrency'] %>,
Poll: <%= publisher[:settings]['poll_interval'] %>s,
Heartbeat: <%= publisher[:settings]['heartbeat_interval'] %>s,
Tick: <%= publisher[:settings]['tick_interval'] %>s
</div>
</div> </td>
<td class="text-capitalize"><%= publisher[:status] %></td>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %></td>
<td><%= publisher[:metrics]['cpu'].round(0) %>%</td>
<td><%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %></td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %></td>
<td>
<div class="small text-muted">

<!-- Heartbeat / Metrics Updated -->
<div>
<strong>Heartbeat:</strong>
<%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %>
</div>

<!-- Last Message Published -->
<div>
<strong>Message:</strong>
<% if publisher[:metrics]['last_message_update'] %>
<%= Outboxer::Web.time_ago_in_words(Time.iso8601(publisher[:metrics]['last_message_update'])) %>
<% else %>
-
<% end %>
</div>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %></td>
<td>
<div class="small text-muted">

<div>
<strong>CPU:</strong>
<%= publisher[:metrics]['cpu'].round(0) %>%
</div>

<div>
<strong>RSS:</strong>
<%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %>
</div>

<div>
<strong>RTT:</strong>
<%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %>
</div>

</div>
</td>
<td>
<div class="d-flex gap-1">
<!-- Pause Button -->
Expand Down Expand Up @@ -112,7 +150,7 @@
<tr>
<th scope="col">Status</th>
<th scope="col">Count</th>
<th scope="col">Last Update</th>
<th scope="col">Updated</th>
<th scope="col">Latency</th>
</tr>
</thead>
Expand Down
24 changes: 16 additions & 8 deletions lib/outboxer/web/views/layout.erb
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@
<div class="container-fluid">
<a class="navbar-brand" href="<%= outboxer_path('') %>">
<i class="bi bi-envelope-open-fill"></i> Outboxer
<span class="badge bg-secondary"><%= Outboxer::Web.pretty_number(number: message_metrics_by_status[:published][:count]) %></span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarNav" aria-controls="navbarNav" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarNav">
<ul class="navbar-nav">
<% statuses = [
{ name: 'Queued', key: 'queued' },
{ name: 'Publishing', key: 'publishing' },
{ name: 'Failed', key: 'failed' }
{ name: 'Queued', key: 'queued', linkable: true },
{ name: 'Publishing', key: 'publishing', linkable: true },
{ name: 'Failed', key: 'failed', linkable: true },
{ name: 'Published', key: 'published', linkable: false },
] %>

<li class="nav-item">
Expand All @@ -77,10 +77,18 @@

<% statuses.each do |status| %>
<li class="nav-item">
<a class="nav-link <%= 'active' if denormalised_query_params[:status] == status[:key] %>"
href="<%= outboxer_path("/messages#{normalise_query_string(status: status[:key], time_zone: params[:time_zone])}") %>">
<%= status[:name] %> (<%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status[:key].to_sym][:count]) %>)
</a>
<% if status[:linkable] %>
<a class="nav-link <%= 'active' if denormalised_query_params[:status] == status[:key] %>"
href="<%= outboxer_path("/messages#{normalise_query_string(status: status[:key], time_zone: params[:time_zone])}") %>">
<%= status[:name] %>
(<%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status[:key].to_sym][:count]) %>)
</a>
<% else %>
<span class="nav-link disabled same-colour">
<%= status[:name] %>
(<%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status[:key].to_sym][:count]) %>)
</span>
<% end %>
</li>
<% end %>
</ul>
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
<td><%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %></td>
</tr>
<tr>
<th scope="row">RTT</th>
<th scope="row">Database RTT</th>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['rtt']) %></td>
</tr>
</tbody>
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/outboxer/message/metrics_by_status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module Outboxer
publishing: { count: 26, last_update: current_utc_time, latency: nil },
published: { count: 37, last_update: current_utc_time, latency: nil },
failed: { count: 48, last_update: current_utc_time, latency: nil },
total: { count: 126, last_update: nil, latency: nil }
total: { count: 89, last_update: nil, latency: nil }
)
end
end
Expand Down