according to docs, i can setup an auth interceptor like this:
class AuthInterceptorSync:
"""Synchronous interceptor to add authentication headers to all requests"""
def __init__(self, token: str):
self.token = token
def on_start(self, ctx: RequestContext) -> None:
"""Add authorization header when RPC starts"""
ctx.request_headers()["authorization"] = self.token
def on_end(self, token: None, ctx: RequestContext) -> None:
"""Called when RPC ends"""
pass
But on_start is never called.
When i instead defined intercept_unary_sync, it worked.
class AuthInterceptorSync:
"""Synchronous interceptor to add authentication headers to all requests"""
def __init__(self, token: str):
self.token = token
def intercept_unary_sync(
self,
call_next: Callable[[REQ, RequestContext], RES],
request: REQ,
ctx: RequestContext,
) -> RES:
"""Intercepts a unary RPC.
Args:
call_next: A callable to invoke to continue processing, either to another
interceptor or the actual RPC. Generally will be called with the same
request the interceptor received but the request can be replaced as
needed. Can be skipped if returning a response from the interceptor
directly.
request: The request message.
ctx: The request context.
Returns:
The response message.
"""
ctx.request_headers()["authorization"] = self.token
return call_next(request, ctx)
according to docs, i can setup an auth interceptor like this:
But
on_startis never called.When i instead defined
intercept_unary_sync, it worked.