Define ITelemetryClient interface

Task ID: task-2.1-telemetry-client-interface

#302

jadewang-db wants to merge 13 commits into main from …
Conversation
…sk ID: task-1.2-statement-telemetry-context
GetEndpointUrl now ensures the host has a scheme prefix before constructing the telemetry endpoint URL. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
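For context, a minimal sketch of the scheme-prefix handling this commit describes; the class name, method shape, and the "/telemetry-ext" path below are illustrative assumptions, not the driver's actual code:

using System;

internal static class TelemetryEndpoint
{
    // Sketch only: prepend https:// when the configured host has no scheme,
    // then build the telemetry endpoint URL from it.
    internal static string GetEndpointUrl(string host)
    {
        string baseUrl =
            host.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
            host.StartsWith("https://", StringComparison.OrdinalIgnoreCase)
                ? host
                : $"https://{host}";

        // The endpoint path here is a placeholder for illustration.
        return $"{baseUrl.TrimEnd('/')}/telemetry-ext";
    }
}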
Force-pushed from 3b6f01c to 51f461f
Use JsonFormatter with PreserveProtoFieldNames(true) instead of JsonFormatter.Default to produce snake_case field names (session_id, system_configuration, etc.) matching the JDBC driver and proto schema. Default produces camelCase (sessionId) which is incorrect. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
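A small sketch of the Google.Protobuf formatter configuration this commit describes; the variable names and the message instance are illustrative:

using Google.Protobuf;

// Configure the formatter to preserve proto field names so serialized JSON
// uses snake_case keys (session_id, system_configuration) instead of the
// default camelCase (sessionId).
JsonFormatter formatter = new JsonFormatter(
    JsonFormatter.Settings.Default.WithPreserveProtoFieldNames(true));

// telemetryLog stands in for a generated proto message instance
// (e.g. TelemetryFrontendLog); it is assumed to exist in scope here.
string json = formatter.Format(telemetryLog);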
Force-pushed from 51f461f to 5880af9
This file was accidentally committed and should not be tracked. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…task-2.2-telemetry-client-manager
Force-pushed from 5880af9 to 9447ddf
jadewang-db left a comment
Code review for the incremental changes in this stacked PR (ITelemetryClient interface + TelemetryClientManager + TelemetryClient implementation).
_cts.Cancel();

// Final flush of remaining events
await FlushAsync().ConfigureAwait(false);
Critical: Final flush is a no-op due to ordering bug.
At line 181, _disposed = true is set before the final FlushAsync() call at line 192. But FlushAsync() checks if (_disposed) return at line 121 and exits immediately. This means all remaining queued events will be silently dropped when the client closes.
Suggested fix: Move the _disposed = true assignment to after the final flush:
public async Task CloseAsync()
{
    if (_disposed) return;
    try
    {
        _flushTimer.Dispose();
        _cts.Cancel();
        await FlushAsync().ConfigureAwait(false); // flush FIRST
    }
    catch (Exception ex)
    {
        Debug.WriteLine($"[TRACE] TelemetryClient close error: {ex.Message}");
    }
    finally
    {
        _disposed = true; // THEN mark disposed
        _cts.Dispose();
        _flushLock.Dispose();
    }
}

Note: You'll also need a separate _closing flag or use Interlocked.CompareExchange to prevent concurrent CloseAsync calls from both flushing.
✅ Done - Fixed the ordering bug. Introduced a _closing flag that is set first (to prevent re-entry and to signal FlushAsync to use blocking wait), then flush runs, and _disposed = true is set last in the finally block. This ensures the final flush actually executes.
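For illustration, a minimal sketch of the close-ordering pattern this reply describes, using Interlocked.CompareExchange for the _closing flag. Field names mirror the snippet above, and the Debug.WriteLine is kept for consistency with it (a later comment replaces it with Activity events); the merged implementation may differ:

private int _closing;              // 0 = open, 1 = close in progress
private volatile bool _disposed;

public async Task CloseAsync()
{
    // Only the first caller proceeds; concurrent CloseAsync calls return immediately.
    if (Interlocked.CompareExchange(ref _closing, 1, 0) != 0) return;

    try
    {
        _flushTimer.Dispose();
        _cts.Cancel();
        await FlushAsync().ConfigureAwait(false); // final flush runs while _disposed is still false
    }
    catch (Exception ex)
    {
        Debug.WriteLine($"[TRACE] TelemetryClient close error: {ex.Message}");
    }
    finally
    {
        _disposed = true;          // marked only after the final flush
        _cts.Dispose();
        _flushLock.Dispose();
    }
}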
This comment was generated with GitHub MCP.
while (batch.Count < _config.BatchSize && _queue.TryDequeue(out TelemetryFrontendLog? log))
{
    batch.Add(log);
}
High: FlushAsync only drains one batch, remaining events are lost on close.
The while loop drains at most _config.BatchSize items. If the queue has more items than the batch size (e.g., during a burst of enqueues), only the first batch is exported. During normal operation, the timer handles subsequent batches. But during CloseAsync(), only one FlushAsync() call is made, so events beyond the first batch are dropped.
Suggested fix: Loop until the queue is empty:
// In FlushAsync, after the current batch export, continue draining:
while (!_queue.IsEmpty)
{
    batch.Clear();
    while (batch.Count < _config.BatchSize && _queue.TryDequeue(out var log))
        batch.Add(log);
    if (batch.Count > 0)
        await _exporter.ExportAsync(batch, ct).ConfigureAwait(false);
}

Or add a dedicated DrainAsync() method called from CloseAsync().
✅ Done - FlushAsync now drains all queued events in a loop (while (!_queue.IsEmpty)) rather than stopping after one batch. Each iteration exports up to BatchSize items.
This comment was generated with GitHub MCP.
/// </remarks>
public async Task ReleaseClientAsync(string host)
{
    if (_clients.TryGetValue(host, out TelemetryClientHolder? holder))
Critical: Race condition between GetOrCreateClient and ReleaseClientAsync.
Consider this interleaving:
1. Thread A calls ReleaseClientAsync("host") and decrements refCount to 0.
2. Thread B calls GetOrCreateClient("host"); AddOrUpdate finds the existing holder and increments refCount to 1.
3. Thread A calls TryRemove("host"), removes the holder, and closes the client.
4. Thread B now holds a reference to a closed client.
The issue is that Decrement → check zero → TryRemove is not atomic with respect to AddOrUpdate's update factory.
Suggested fix: Use a lock around the critical section, or use ConcurrentDictionary operations more carefully. One approach:
private readonly object _lock = new object();

public ITelemetryClient GetOrCreateClient(string host, ...)
{
    lock (_lock)
    {
        if (_clients.TryGetValue(host, out var existing))
        {
            existing._refCount++;
            return existing.Client;
        }
        var holder = new TelemetryClientHolder(new TelemetryClient(...));
        _clients[host] = holder;
        return holder.Client;
    }
}

public async Task ReleaseClientAsync(string host)
{
    TelemetryClientHolder? toClose = null;
    lock (_lock)
    {
        if (_clients.TryGetValue(host, out var holder))
        {
            holder._refCount--;
            if (holder._refCount == 0)
            {
                _clients.TryRemove(host, out toClose);
            }
        }
    }
    if (toClose != null)
        await toClose.Client.CloseAsync().ConfigureAwait(false);
}
✅ Done - Replaced ConcurrentDictionary + Interlocked with a Dictionary + explicit lock to make the ref count operations (increment/decrement + conditional add/remove) atomic. CloseAsync() is called outside the lock to avoid holding it during async I/O.
This comment was generated with GitHub MCP.
catch (Exception ex)
{
    // Swallow all exceptions per telemetry requirement
    Debug.WriteLine($"[TRACE] TelemetryClient flush error: {ex.Message}");
Medium: Use ActivitySource instead of Debug.WriteLine for tracing.
Per the C#/.NET coding guidelines in this project, structured tracing should use ActivitySource and Activity.Current?.AddEvent() rather than Debug.WriteLine. This applies to both line 146 and line 196.
Debug.WriteLine output is stripped in Release builds and doesn't integrate with distributed tracing systems.
✅ Done - Replaced Debug.WriteLine with Activity.Current?.AddEvent() using ActivityEvent with tagged error messages, consistent with the rest of the codebase (e.g., DatabricksStatement.cs, StatementExecutionStatement.cs).
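As a rough sketch of that pattern (the helper class, event name, and tag key below are illustrative, not the exact code in the driver):

using System;
using System.Diagnostics;

internal static class TelemetryTracing
{
    // Record a handled telemetry failure on the current Activity. This is a
    // no-op when no Activity is active, and unlike Debug.WriteLine it survives
    // Release builds and flows into distributed traces.
    internal static void TraceError(string operation, Exception ex)
    {
        Activity.Current?.AddEvent(new ActivityEvent(
            $"telemetry.{operation}.error",
            tags: new ActivityTagsCollection { { "exception.message", ex.Message } }));
    }
}

A catch block would then call, for example, TelemetryTracing.TraceError("flush", ex) instead of Debug.WriteLine.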
This comment was generated with GitHub MCP.
/// All properties have private setters to ensure immutability after construction.
/// Use the constructor or object initializer to set values.
/// </remarks>
internal sealed class TelemetrySessionContext
Medium: Class documentation says "Immutable" but all properties have public setters.
The XML doc says "Immutable per-connection session-level telemetry context" and "All properties have private setters to ensure immutability after construction", but all properties are declared with { get; set; } (public setters).
Either:
- Change setters to { get; internal set; } or { get; private set; } and use a constructor/builder
- Update the documentation to reflect that the class is mutable
This is important because immutability is a thread-safety guarantee for shared state across statements.
✅ Done - Changed all property setters to internal set and updated documentation to accurately describe the class as "populated during connection initialization" with internal setters restricting mutation to the driver assembly.
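A minimal sketch of the resulting shape; the property names below are illustrative and may not match the actual TelemetrySessionContext members:

internal sealed class TelemetrySessionContext
{
    // internal setters: populated once during connection initialization by the
    // driver assembly, then treated as read-only by consumers.
    public string? SessionId { get; internal set; }
    public string? HostName { get; internal set; }
    public string? DriverVersion { get; internal set; }
}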
This comment was generated with GitHub MCP.
_queue.Enqueue(log);

// Trigger flush if batch size reached
if (_queue.Count >= _config.BatchSize)
Low: _queue.Count on ConcurrentQueue is O(n) on some runtimes.
On .NET Framework (net472 target), ConcurrentQueue<T>.Count can be expensive as it takes a snapshot. Since this is called on every Enqueue, consider using a separate Interlocked counter for the queue size if performance is a concern, or accept that the batch-size trigger is best-effort (which is fine since the timer handles periodic flushing).
✅ Done - Added an Interlocked counter (_queueCount) that is incremented on Enqueue and decremented on TryDequeue. The batch-size check now uses this O(1) counter instead of _queue.Count.
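A sketch of that counter pattern as a fragment of the client class; the member names and the Enqueue method shape follow this thread and are assumptions about the merged code:

private int _queueCount; // tracked separately; ConcurrentQueue<T>.Count can be O(n) on net472

public void Enqueue(TelemetryFrontendLog log)
{
    _queue.Enqueue(log);

    // Trigger flush if batch size reached, using the O(1) counter.
    if (Interlocked.Increment(ref _queueCount) >= _config.BatchSize)
    {
        _ = FlushAsync(); // best-effort; the timer remains the periodic fallback
    }
}

// At every successful dequeue inside FlushAsync:
// if (_queue.TryDequeue(out TelemetryFrontendLog? log)) { Interlocked.Decrement(ref _queueCount); batch.Add(log); }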
This comment was generated with GitHub MCP.
if (_disposed) return;

// Prevent concurrent flushes
if (!await _flushLock.WaitAsync(0, ct).ConfigureAwait(false))
Low: WaitAsync(0) means concurrent flush requests are silently dropped.
If a timer-triggered flush is in progress and Enqueue triggers another flush (batch size reached), the second flush is silently skipped because WaitAsync(0) returns false immediately. This is acceptable for timer-based flushing, but during CloseAsync, it means the final flush could be skipped if a timer flush is still running.
Consider using WaitAsync(ct) (blocking wait) in CloseAsync's final flush path, or at least documenting this behavior.
✅ Done - FlushAsync now uses a blocking WaitAsync(ct) when _closing is true (during CloseAsync), ensuring the final flush waits for any in-progress timer-triggered flush to complete before draining. Non-close flushes still use WaitAsync(0) for non-blocking behavior.
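Roughly, the entry of FlushAsync would then look like the fragment below; this is a sketch reusing the _closing and _flushLock names from this thread, not the exact merged code:

private async Task FlushAsync(CancellationToken ct = default)
{
    if (_disposed) return;

    if (_closing == 1)
    {
        // Close path: block until any in-progress timer flush finishes so the
        // final drain is never skipped.
        await _flushLock.WaitAsync(ct).ConfigureAwait(false);
    }
    else if (!await _flushLock.WaitAsync(0, ct).ConfigureAwait(false))
    {
        // Normal path: another flush is already running, skip this one.
        return;
    }

    try
    {
        // ... drain the queue and export batches ...
    }
    finally
    {
        _flushLock.Release();
    }
}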
This comment was generated with GitHub MCP.
public void GetOrCreateClient_NewHost_CreatesClient()
{
    // Arrange
    TelemetryClientManager manager = TelemetryClientManager.GetInstance();
Medium: Tests use singleton TelemetryClientManager.GetInstance() causing shared state across tests.
All tests operate on the same singleton instance. While unique GUID-based host names prevent key collisions, the tests leave orphan entries in the singleton's dictionary (e.g., GetOrCreateClient_NewHost_CreatesClient creates a client but never releases it). Over time this leaks TelemetryClient instances with running timers.
Consider either:
- Making TelemetryClientManager accept a constructor for testability (internal + InternalsVisibleTo)
- Adding cleanup in each test that calls ReleaseClientAsync for created hosts
- Using IDisposable / a test fixture to clean up after tests
✅ Done - Added an internal TelemetryClientManager(bool forTesting) constructor. All tests now create their own isolated instance instead of using the singleton. Tests also properly clean up by calling ReleaseClientAsync in finally blocks. Extracted a GetClients() helper to reduce reflection boilerplate.
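A sketch of what an isolated, self-cleaning test might look like after that change; the xUnit-style [Fact] attribute, the forTesting constructor argument, and the TelemetryConfiguration parameter are assumptions for illustration:

[Fact]
public async Task GetOrCreateClient_NewHost_CreatesClient()
{
    // Arrange: isolated instance via the internal test constructor, not the singleton.
    var manager = new TelemetryClientManager(forTesting: true);
    string host = $"test-host-{Guid.NewGuid():N}";

    try
    {
        // Act
        ITelemetryClient client = manager.GetOrCreateClient(host, new TelemetryConfiguration());

        // Assert
        Assert.NotNull(client);
    }
    finally
    {
        // Cleanup: release so no TelemetryClient (and its flush timer) outlives the test.
        await manager.ReleaseClientAsync(host);
    }
}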
This comment was generated with GitHub MCP.
- Fix final flush no-op by using _closing flag before _disposed
- Drain all batches in FlushAsync instead of just one
- Fix race condition in TelemetryClientManager with lock-based synchronization
- Replace Debug.WriteLine with Activity.Current?.AddEvent() for tracing
- Change TelemetrySessionContext setters to internal
- Use Interlocked counter for queue size instead of ConcurrentQueue.Count
- Use blocking wait in FlushAsync during close
- Use test-isolated TelemetryClientManager instances with proper cleanup

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🥞 Stacked PR
Use this link to review incremental changes.
What's Changed