tso: validate callee id for tso requests #10600
bufferflies wants to merge 12 commits into tikv:master
Signed-off-by: bufferflies <1045931706@qq.com>
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it.
Walkthrough
Clients now attach a CalleeId header to TSO requests; servers validate it against their advertise listen host and return FailedPrecondition on mismatch. Clients remove stale per-URL gRPC connections via ServiceDiscovery.RemoveClientConn and retry with a new connection. ServiceDiscovery gained RemoveClientConn; callee-mismatch error detection was added.
Sequence Diagram(s)
sequenceDiagram
participant Client as TSO Client
participant SD as ServiceDiscovery
participant Conn as gRPC Conn
participant Server as TSO Server
rect rgba(100,200,100,0.5)
Note over Client,Server: Initial request with CalleeId
Client->>Client: extract calleeID from serverURL
Client->>SD: GetOrCreateGRPCConn(url)
SD-->>Client: *grpc.ClientConn
Client->>Server: TsoRequest (CalleeId header)
Server->>Server: parse advertise host & compare CalleeId
alt match
Server-->>Client: Response
else mismatch
Server-->>Client: FailedPrecondition (mismatch)
end
end
rect rgba(100,150,200,0.5)
Note over Client,SD: Recovery on mismatch
Client->>Client: IsCalleeMismatch(err)
Client->>SD: RemoveClientConn(url)
SD->>Conn: Close() and delete cache entry
Client->>SD: GetOrCreateGRPCConn(url) [retry]
SD-->>Client: new *grpc.ClientConn
Client->>Server: TsoRequest (retry)
Server-->>Client: Response
end
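The recovery path in the diagram above can be sketched roughly as follows. This is a minimal, dependency-free illustration, not the PR's code: `conn`, `discovery`, `sendFunc`, `calleeIDFromURL`, `requestWithRecovery`, and `errCalleeMismatch` are hypothetical stand-ins for `*grpc.ClientConn`, the service discovery interface, the TSO stream send, and the FailedPrecondition error. It also assumes a single retry and a single goroutine.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errCalleeMismatch is a hypothetical stand-in for the server's
// FailedPrecondition "mismatch callee id" status.
var errCalleeMismatch = errors.New("mismatch callee id")

// conn is a hypothetical stand-in for *grpc.ClientConn.
type conn struct {
	id     int
	closed bool
}

// discovery is a hypothetical, single-goroutine stand-in for ServiceDiscovery.
type discovery struct {
	conns  map[string]*conn
	nextID int
}

// GetOrCreateGRPCConn returns the cached connection for url or creates one.
func (d *discovery) GetOrCreateGRPCConn(url string) *conn {
	if c, ok := d.conns[url]; ok {
		return c
	}
	d.nextID++
	c := &conn{id: d.nextID}
	d.conns[url] = c
	return c
}

// RemoveClientConn closes the cached connection and deletes the cache entry.
func (d *discovery) RemoveClientConn(url string) {
	if c, ok := d.conns[url]; ok {
		c.closed = true
		delete(d.conns, url)
	}
}

// calleeIDFromURL strips the scheme so the ID has host:port form.
func calleeIDFromURL(serverURL string) string {
	if i := strings.Index(serverURL, "://"); i >= 0 {
		return serverURL[i+3:]
	}
	return serverURL
}

// sendFunc simulates sending one TSO request over a connection.
type sendFunc func(c *conn, calleeID string) (int64, error)

// requestWithRecovery sends once; on a callee mismatch it drops the cached
// connection and retries once on a freshly created one.
func requestWithRecovery(d *discovery, url string, send sendFunc) (int64, error) {
	calleeID := calleeIDFromURL(url)
	ts, err := send(d.GetOrCreateGRPCConn(url), calleeID)
	if err == nil || !errors.Is(err, errCalleeMismatch) {
		return ts, err
	}
	d.RemoveClientConn(url)
	return send(d.GetOrCreateGRPCConn(url), calleeID)
}

func main() {
	d := &discovery{conns: map[string]*conn{}}
	// The first connection (id 1) plays the stale one that reaches the wrong server.
	send := func(c *conn, calleeID string) (int64, error) {
		if c.id == 1 {
			return 0, fmt.Errorf("tso request failed: %w", errCalleeMismatch)
		}
		return 42, nil
	}
	ts, err := requestWithRecovery(d, "http://127.0.0.1:2379", send)
	fmt.Println(ts, err)
}
```

Retrying only once keeps the sketch from looping if the new connection also lands on the wrong endpoint; whether the real client bounds its retries this way is not stated in the walkthrough.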
/ping @iosmanthus
🧹 Nitpick comments (1)
client/errs/errs.go (1)
42-48: Harden callee-mismatch detection to include gRPC status code check.
Line 47 currently relies only on substring matching against error messages. This can produce false positives if other gRPC errors coincidentally contain "mismatch callee id". Adding a status code check first narrows the scope to only codes.FailedPrecondition errors that match the message.
♻️ Proposed refactor
 import (
     "strings"

     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
     "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"

     "github.com/pingcap/errors"
 )
@@
 func IsCalleeMismatch(err error) bool {
     if err == nil {
         return false
     }
-    return strings.Contains(err.Error(), MismatchCalleeIDErr)
+    cause := errors.Cause(err)
+    if status.Code(cause) != codes.FailedPrecondition {
+        return false
+    }
+    return strings.Contains(status.Convert(cause).Message(), MismatchCalleeIDErr)
 }
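To illustrate why the code check matters, here is a small standard-library-only sketch of the same "code first, message second" test. It deliberately does not use the gRPC `status` package: `code`, `statusError`, and `isCalleeMismatch` are hypothetical stand-ins, so treat it as an illustration of the idea rather than the refactor itself.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// code is a hypothetical stand-in for the gRPC status code type.
type code int

const (
	codeUnknown code = iota
	codeFailedPrecondition
)

// mismatchCalleeIDErr mirrors the message fragment quoted in the review.
const mismatchCalleeIDErr = "mismatch callee id"

// statusError is a hypothetical stand-in for a gRPC status error.
type statusError struct {
	Code    code
	Message string
}

func (e *statusError) Error() string {
	return fmt.Sprintf("rpc error: code = %d desc = %s", e.Code, e.Message)
}

// isCalleeMismatch checks the status code first and only then the message,
// so an unrelated error that merely quotes the text is not matched.
func isCalleeMismatch(err error) bool {
	var st *statusError
	if !errors.As(err, &st) {
		return false
	}
	if st.Code != codeFailedPrecondition {
		return false
	}
	return strings.Contains(st.Message, mismatchCalleeIDErr)
}

func main() {
	match := &statusError{Code: codeFailedPrecondition, Message: "mismatch callee id, need a:1 but got b:2"}
	other := &statusError{Code: codeUnknown, Message: "upstream said: mismatch callee id"}
	fmt.Println(isCalleeMismatch(match), isCalleeMismatch(other), isCalleeMismatch(nil))
}
```

With substring matching alone, the second error would be classified as a callee mismatch and trigger a needless reconnect.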
@iosmanthus: adding LGTM is restricted to approvers and reviewers in OWNERS files.
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: iosmanthus.
Signed-off-by: bufferflies <1045931706@qq.com>
Signed-off-by: tongjian <1045931706@qq.com>
Codecov Report
❌ Your patch check has failed because the patch coverage (53.22%) is below the target coverage (74.00%). You can increase the patch coverage or adjust the target coverage.
Additional details and impacted files
@@ Coverage Diff @@
## master #10600 +/- ##
==========================================
- Coverage 78.96% 78.92% -0.05%
==========================================
Files 532 532
Lines 71883 72018 +135
==========================================
+ Hits 56766 56838 +72
- Misses 11093 11137 +44
- Partials 4024 4043 +19
}
cc, ok := c.svcDiscovery.GetClientConns().Load(url)
if !ok {
	cc, err := c.svcDiscovery.GetOrCreateGRPCConn(url)
Do we need to cover discovery or unary? They also reuse the same gRPC connection, such as FindGroupByKeyspaceID / GetMinTS.
Any TSO service node can handle FindGroupByKeyspaceID / GetMinTS, so it doesn't matter if the node mismatches.
}

// RemoveClientConn removes and closes the gRPC connection of the given URL.
func (c *tsoServiceDiscovery) RemoveClientConn(url string) {
Do we also need to clear the cached *grpc.ClientConn in serviceClient?
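As a rough sketch of what "remove and close" can look like for a per-URL cache, the example below uses a `sync.Map` and its `LoadAndDelete` method so that only one caller ever closes a given entry. `connCache` and `fakeConn` are hypothetical names; the real `tsoServiceDiscovery` fields are not shown in this thread, so this is an assumption about the shape, not the PR's implementation.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// fakeConn is a hypothetical stand-in for *grpc.ClientConn; only Close matters here.
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// connCache is a hypothetical per-URL connection cache.
type connCache struct {
	conns sync.Map // url (string) -> io.Closer
}

// RemoveClientConn atomically removes the entry for url and closes it.
// LoadAndDelete ensures two concurrent callers cannot both close the same entry.
func (c *connCache) RemoveClientConn(url string) {
	v, loaded := c.conns.LoadAndDelete(url)
	if !loaded {
		return
	}
	if closer, ok := v.(io.Closer); ok {
		_ = closer.Close()
	}
}

func main() {
	cache := &connCache{}
	cc := &fakeConn{}
	cache.conns.Store("http://127.0.0.1:2379", cc)
	cache.RemoveClientConn("http://127.0.0.1:2379")
	_, stillCached := cache.conns.Load("http://127.0.0.1:2379")
	fmt.Println(cc.closed, stillCached)
}
```

Note that this only clears the map entry. Any other component that kept its own pointer to the same connection would still hold a closed connection afterwards, which is the concern the question above raises.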
Signed-off-by: tongjian <1045931706@qq.com>
🧹 Nitpick comments (2)
pkg/mcs/tso/server/grpc_service.go (1)
104-112: Callee ID validation logic is correct.
The validation correctly:
- Skips when either calleeID or host is empty (graceful degradation)
- Returns FailedPrecondition which triggers client-side reconnection via IsCalleeMismatch()
- Contains "mismatch callee id" in error message matching client detection logic
One minor observation: The error message uses s.GetAdvertiseListenAddr() (full URL like http://127.0.0.1:2379) but calleeID is just host:port (127.0.0.1:2379). Consider using host instead for consistency:
♻️ Optional: Use consistent format in error message
 if calleeID := header.GetCalleeId(); calleeID != "" && host != "" {
     if calleeID != host {
         return status.Errorf(
             codes.FailedPrecondition,
             "mismatch callee id, need %s but got %s",
-            s.GetAdvertiseListenAddr(), calleeID,
+            host, calleeID,
         )
     }
 }
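The three properties listed above can be exercised in isolation with a small sketch. It assumes a free function `validateCalleeID` and a sentinel `errFailedPrecondition` standing in for the gRPC `codes.FailedPrecondition` status; both names are hypothetical, and the message uses `host` as the review suggests.

```go
package main

import (
	"errors"
	"fmt"
)

// errFailedPrecondition is a hypothetical stand-in for a
// codes.FailedPrecondition gRPC status.
var errFailedPrecondition = errors.New("failed precondition")

// validateCalleeID skips the check when either side is empty (for example an
// older client that sends no callee ID) and rejects the request otherwise
// only if the two host:port values differ.
func validateCalleeID(calleeID, host string) error {
	if calleeID == "" || host == "" {
		return nil
	}
	if calleeID != host {
		return fmt.Errorf("%w: mismatch callee id, need %s but got %s",
			errFailedPrecondition, host, calleeID)
	}
	return nil
}

func main() {
	fmt.Println(validateCalleeID("127.0.0.1:2379", "127.0.0.1:2379"))
	fmt.Println(validateCalleeID("", "127.0.0.1:2379"))
	fmt.Println(validateCalleeID("127.0.0.1:3379", "127.0.0.1:2379"))
}
```

Skipping on an empty value is what keeps the check backward compatible: requests without a callee ID behave as they did before.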
pkg/mcs/tso/server/config_test.go (1)
131-140: Use subtest's t for require instance in table-driven subtests.
The re instance is created from the outer t (line 102), but it's used inside subtests. If an assertion fails, the error reporting may not correctly associate with the specific subtest.
♻️ Suggested fix
 for _, tt := range tests {
     t.Run(tt.name, func(t *testing.T) {
+        re := require.New(t)
         s := &Server{
             cfg: &Config{
                 AdvertiseListenAddr: tt.advertiseAddr,
             },
         }
         re.Equal(tt.expected, s.getAdvertiseListenHost())
     })
 }
parsed, err := url.Parse(advertiseListenAddr)
if err != nil || parsed.Host == "" {
	return ""
func (s *Server) getAdvertiseListenHost() string {
Why do we need to use CAS here?
This function is called from many threads concurrently, so there is a race risk. I think a plain store is also fine, since last-write-wins is acceptable here.
Why? The address won't change after the server has started.
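The last point can be made concrete with a short sketch: if the host is parsed once before the server starts handling requests and never written again, concurrent readers need neither CAS nor an atomic store. The names `server` and `newServer` are hypothetical, and returning an error on a bad address is a choice made for this sketch only.

```go
package main

import (
	"fmt"
	"net/url"
	"sync"
)

// server is a hypothetical stand-in for the TSO server type.
type server struct {
	// advertiseListenHost is written once in newServer and is read-only
	// afterwards, so concurrent reads need no synchronization.
	advertiseListenHost string
}

// newServer parses the advertise address once, before any goroutine can read it.
func newServer(advertiseListenAddr string) (*server, error) {
	parsed, err := url.Parse(advertiseListenAddr)
	if err != nil || parsed.Host == "" {
		return nil, fmt.Errorf("invalid advertise listen address: %q", advertiseListenAddr)
	}
	return &server{advertiseListenHost: parsed.Host}, nil
}

func (s *server) getAdvertiseListenHost() string { return s.advertiseListenHost }

func main() {
	s, err := newServer("http://127.0.0.1:2379")
	if err != nil {
		panic(err)
	}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.getAdvertiseListenHost() // read-only access from many goroutines
		}()
	}
	wg.Wait()
	fmt.Println(s.getAdvertiseListenHost())
}
```

This relies on the value being set before the goroutines are started; lazy initialization on first request would need `sync.Once` or an atomic instead.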
Signed-off-by: bufferflies <1045931706@qq.com>
206d15c to 4470418
Signed-off-by: tongjian <1045931706@qq.com>
Signed-off-by: tongjian <1045931706@qq.com>
addr := cfg.GetAdvertiseListenAddr()
parsed, err := url.Parse(addr)
if err != nil {
	panic(fmt.Sprintf("invalid advertise listen address: %s", addr))
Issue Number
ref #10516, close #10552
What problem does this PR solve?
The TSO client can keep using a stale gRPC connection after service endpoint changes. This patch carries a callee ID in TSO requests and lets the server reject requests that land on the wrong endpoint, so the client can drop the stale connection and reconnect.
What is changed and how does it work?
- Add CalleeId to TSO request headers
- Add RemoveClientConn to service discovery implementations so stale gRPC connections can be closed and recreated
- Update github.com/pingcap/kvproto to latest v0.0.0-20260414083400-4388bfaaedab
- gofmt/test-stub follow-up required for make check
Check List
Validation
- go test ./pkg/mcs/tso/server -count=1
- cd client && go test ./errs ./servicediscovery ./clients/tso -run TestDoesNotExist -count=1
- make check
author: @iosmanthus
cp 88e4f7d1
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Chores