Skip to content

Commit 50d9ffb

Browse files
authored
Merge pull request #2486 from aojea/exec_v5
implement pod exec websockets v5
2 parents 8115fb6 + 4846280 commit 50d9ffb

6 files changed

Lines changed: 348 additions & 10 deletions

File tree

.github/workflows/e2e-master.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
cluster_name: kubernetes-python-e2e-master-${{ matrix.python-version }}
2525
# The kind version to be used to spin the cluster up
2626
# this needs to be updated whenever a new Kind version is released
27-
version: v0.17.0
27+
version: v0.31.0
2828
# Update the config here whenever a new client snapshot is performed
2929
# This would eventually point to cluster with the latest Kubernetes version
3030
# as we sync with Kubernetes upstream

.github/workflows/e2e-release-35.0.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
cluster_name: kubernetes-python-e2e-release-35.0-${{ matrix.python-version }}
2525
# The kind version to be used to spin the cluster up
2626
# this needs to be updated whenever a new Kind version is released
27-
version: v0.17.0
27+
version: v0.31.0
2828
# Update the config here whenever a new client snapshot is performed
2929
# This would eventually point to cluster with the latest Kubernetes version
3030
# as we sync with Kubernetes upstream

kubernetes/base/stream/ws_client.py

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
STDERR_CHANNEL = 2
4040
ERROR_CHANNEL = 3
4141
RESIZE_CHANNEL = 4
42+
CLOSE_CHANNEL = 255
43+
44+
V4_CHANNEL_PROTOCOL = "v4.channel.k8s.io"
45+
V5_CHANNEL_PROTOCOL = "v5.channel.k8s.io"
4246

4347
class _IgnoredIO:
4448
def write(self, _x):
@@ -59,26 +63,40 @@ def __init__(self, configuration, url, headers, capture_all, binary=False):
5963
"""
6064
self._connected = False
6165
self._channels = {}
66+
self._closed_channels = set()
67+
self.subprotocol = None
6268
self.binary = binary
6369
self.newline = '\n' if not self.binary else b'\n'
6470
if capture_all:
6571
self._all = StringIO() if not self.binary else BytesIO()
6672
else:
6773
self._all = _IgnoredIO()
6874
self.sock = create_websocket(configuration, url, headers)
75+
self.subprotocol = getattr(self.sock, 'subprotocol', None)
76+
if not self.subprotocol and self.sock:
77+
headers_dict = self.sock.getheaders()
78+
if headers_dict:
79+
for k, v in headers_dict.items():
80+
if k.lower() == 'sec-websocket-protocol':
81+
self.subprotocol = v
82+
break
6983
self._connected = True
7084
self._returncode = None
7185

7286
def peek_channel(self, channel, timeout=0):
7387
"""Peek a channel and return part of the input,
7488
empty string otherwise."""
89+
if channel in self._closed_channels and channel not in self._channels:
90+
return b"" if self.binary else ""
7591
self.update(timeout=timeout)
7692
if channel in self._channels:
7793
return self._channels[channel]
78-
return ""
94+
return b"" if self.binary else ""
7995

8096
def read_channel(self, channel, timeout=0):
8197
"""Read data from a channel."""
98+
if channel in self._closed_channels and channel not in self._channels:
99+
return b"" if self.binary else ""
82100
if channel not in self._channels:
83101
ret = self.peek_channel(channel, timeout)
84102
else:
@@ -93,6 +111,7 @@ def readline_channel(self, channel, timeout=None):
93111
timeout = float("inf")
94112
start = time.time()
95113
while self.is_open() and time.time() - start < timeout:
114+
# Always try to drain the channel first
96115
if channel in self._channels:
97116
data = self._channels[channel]
98117
if self.newline in data:
@@ -104,6 +123,14 @@ def readline_channel(self, channel, timeout=None):
104123
else:
105124
del self._channels[channel]
106125
return ret
126+
127+
if channel in self._closed_channels:
128+
if channel in self._channels:
129+
ret = self._channels[channel]
130+
del self._channels[channel]
131+
return ret
132+
return b"" if self.binary else ""
133+
107134
self.update(timeout=(timeout - time.time() + start))
108135

109136
def write_channel(self, channel, data):
@@ -119,6 +146,14 @@ def write_channel(self, channel, data):
119146
payload = channel_prefix + data
120147
self.sock.send(payload, opcode=opcode)
121148

149+
def close_channel(self, channel):
150+
"""Close a channel (v5 protocol only)."""
151+
if self.subprotocol != V5_CHANNEL_PROTOCOL:
152+
return
153+
data = bytes([CLOSE_CHANNEL, channel])
154+
self.sock.send(data, opcode=ABNF.OPCODE_BINARY)
155+
self._closed_channels.add(channel)
156+
122157
def peek_stdout(self, timeout=0):
123158
"""Same as peek_channel with channel=1."""
124159
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout)
@@ -200,13 +235,24 @@ def update(self, timeout=0):
200235
return
201236
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
202237
data = frame.data
203-
if six.PY3 and not self.binary:
204-
data = data.decode("utf-8", "replace")
205-
if len(data) > 1:
238+
if len(data) > 0:
239+
# Parse channel from raw bytes to support v5 CLOSE signal AND avoid charset issues
206240
channel = data[0]
207-
if six.PY3 and not self.binary:
208-
channel = ord(channel)
241+
# In Py3, iterating bytes gives int, but indexing bytes gives int.
242+
# websocket-client frame.data might be bytes.
243+
244+
if channel == CLOSE_CHANNEL and self.subprotocol == V5_CHANNEL_PROTOCOL: # v5 CLOSE
245+
if len(data) > 1:
246+
# data[1] is already int in Py3 bytes
247+
close_chan = data[1]
248+
self._closed_channels.add(close_chan)
249+
return
250+
209251
data = data[1:]
252+
# Decode data if expected text
253+
if not self.binary:
254+
data = data.decode("utf-8", "replace")
255+
210256
if data:
211257
if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
212258
# keeping all messages in the order they received
@@ -476,7 +522,7 @@ def create_websocket(configuration, url, headers=None):
476522
header.append("sec-websocket-protocol: %s" %
477523
headers['sec-websocket-protocol'])
478524
else:
479-
header.append("sec-websocket-protocol: v4.channel.k8s.io")
525+
header.append("sec-websocket-protocol: %s,%s" % (V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL))
480526

481527
if url.startswith('wss://') and configuration.verify_ssl:
482528
ssl_opts = {

kubernetes/base/stream/ws_client_test.py

Lines changed: 222 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@
1313
# limitations under the License.
1414

1515
import unittest
16+
from unittest.mock import MagicMock, patch
1617

17-
from .ws_client import get_websocket_url
18+
from . import ws_client as ws_client_module
19+
from .ws_client import get_websocket_url, WSClient, V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL, CLOSE_CHANNEL, STDIN_CHANNEL
1820
from .ws_client import websocket_proxycare
1921
from kubernetes.client.configuration import Configuration
2022
import os
2123
import socket
2224
import threading
2325
import pytest
2426
from kubernetes import stream, client, config
27+
import websocket
2528

2629
try:
2730
import urllib3
@@ -123,6 +126,224 @@ def test_websocket_proxycare(self):
123126
assert dictval(connect_opts, 'http_proxy_auth') == expect_auth
124127
assert dictval(connect_opts, 'http_no_proxy') == expect_noproxy
125128

129+
130+
class WSClientProtocolTest(unittest.TestCase):
131+
"""Tests for WSClient V5 protocol handling"""
132+
133+
def setUp(self):
134+
# Mock configuration to avoid real connections in WSClient.__init__
135+
self.config_mock = MagicMock()
136+
self.config_mock.assert_hostname = False
137+
self.config_mock.api_key = {}
138+
self.config_mock.proxy = None
139+
self.config_mock.ssl_ca_cert = None
140+
self.config_mock.cert_file = None
141+
self.config_mock.key_file = None
142+
self.config_mock.verify_ssl = True
143+
144+
def test_create_websocket_header(self):
145+
"""Verify sec-websocket-protocol header requests v5 first"""
146+
# Patch WebSocket class in the module
147+
with patch.object(ws_client_module, 'WebSocket', autospec=True) as mock_ws_cls:
148+
mock_ws = mock_ws_cls.return_value
149+
150+
WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
151+
152+
mock_ws.connect.assert_called_once()
153+
call_args = mock_ws.connect.call_args
154+
# connect(url, **options)
155+
# check kwargs for 'header'
156+
kwargs = call_args[1]
157+
self.assertIn('header', kwargs)
158+
expected_header = f"sec-websocket-protocol: {V5_CHANNEL_PROTOCOL},{V4_CHANNEL_PROTOCOL}"
159+
self.assertIn(expected_header, kwargs['header'])
160+
161+
def test_close_channel_v5(self):
162+
"""Verify close_channel sends correct frame when v5 is negotiated"""
163+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
164+
mock_ws = MagicMock()
165+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
166+
mock_ws.connected = True
167+
mock_create.return_value = mock_ws
168+
169+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
170+
client.close_channel(0)
171+
172+
mock_ws.send.assert_called_with(bytes([CLOSE_CHANNEL, STDIN_CHANNEL]), opcode=websocket.ABNF.OPCODE_BINARY)
173+
174+
def test_close_channel_v4(self):
175+
"""Verify close_channel does nothing when v4 is negotiated"""
176+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
177+
mock_ws = MagicMock()
178+
mock_ws.subprotocol = V4_CHANNEL_PROTOCOL
179+
mock_ws.connected = True
180+
mock_create.return_value = mock_ws
181+
182+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
183+
client.close_channel(0)
184+
185+
mock_ws.send.assert_not_called()
186+
187+
def test_update_receives_close_v5(self):
188+
"""Verify update processes close signal when v5 is negotiated"""
189+
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
190+
patch('select.select') as mock_select:
191+
192+
mock_ws = MagicMock()
193+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
194+
mock_ws.connected = True
195+
mock_ws.sock.fileno.return_value = 10
196+
197+
# Setup frame with close signal for channel 0
198+
frame = MagicMock()
199+
frame.data = bytes([CLOSE_CHANNEL, STDIN_CHANNEL])
200+
mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame)
201+
202+
mock_create.return_value = mock_ws
203+
# Make select return ready
204+
mock_select.return_value = ([mock_ws.sock], [], [])
205+
206+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
207+
client.update(timeout=0)
208+
209+
self.assertIn(0, client._closed_channels)
210+
211+
def test_update_ignores_close_signal_v4(self):
212+
"""Verify update treats 0xFF as regular data (or ignores signal interpretation) when v4"""
213+
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
214+
patch('select.select') as mock_select:
215+
216+
mock_ws = MagicMock()
217+
mock_ws.subprotocol = V4_CHANNEL_PROTOCOL
218+
mock_ws.connected = True
219+
mock_ws.sock.fileno.return_value = 10
220+
221+
# Setup frame that looks like close signal but should be treated as data
222+
frame = MagicMock()
223+
frame.data = bytes([CLOSE_CHANNEL, STDIN_CHANNEL])
224+
mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame)
225+
226+
mock_create.return_value = mock_ws
227+
mock_select.return_value = ([mock_ws.sock], [], [])
228+
229+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True) # binary=True to avoid decode errors
230+
client.update(timeout=0)
231+
232+
# Should NOT be in closed channels
233+
self.assertNotIn(0, client._closed_channels)
234+
# Should be in data channels (channel 255 with data \x00)
235+
# Code: channel = data[0] (255), data = data[1:] (\x00)
236+
# if channel (255) not in _channels...
237+
self.assertIn(255, client._channels)
238+
self.assertEqual(client._channels[255], b'\x00')
239+
240+
def test_readline_channel_closed_with_leftover_data(self):
241+
"""Verify readline_channel flushes remaining buffer when channel is closed"""
242+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
243+
mock_ws = MagicMock()
244+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
245+
mock_ws.connected = True
246+
mock_create.return_value = mock_ws
247+
248+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False)
249+
250+
# Simulate some data in the channel buffer, and then close it
251+
client._channels[1] = "hello"
252+
client._closed_channels.add(1)
253+
254+
# First call to readline should flush "hello" even though there is no newline
255+
line1 = client.readline_channel(1)
256+
self.assertEqual(line1, "hello")
257+
258+
# Subsequent call should return empty string
259+
line2 = client.readline_channel(1)
260+
self.assertEqual(line2, "")
261+
262+
def test_readline_channel_closed_with_leftover_data_binary(self):
263+
"""Verify readline_channel flushes remaining buffer when channel is closed in binary mode"""
264+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
265+
mock_ws = MagicMock()
266+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
267+
mock_ws.connected = True
268+
mock_create.return_value = mock_ws
269+
270+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True)
271+
272+
# Simulate some bytes in the channel buffer, and then close it
273+
client._channels[1] = b"hello-binary"
274+
client._closed_channels.add(1)
275+
276+
# First call to readline should flush leftover bytes
277+
line1 = client.readline_channel(1)
278+
self.assertEqual(line1, b"hello-binary")
279+
280+
# Subsequent call should return empty bytes
281+
line2 = client.readline_channel(1)
282+
self.assertEqual(line2, b"")
283+
284+
def test_read_channel_closed_with_leftover_data(self):
285+
"""Verify read_channel drains leftover data and then short-circuits on closed channel"""
286+
with patch.object(ws_client_module, 'create_websocket') as mock_create:
287+
mock_ws = MagicMock()
288+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
289+
mock_ws.connected = True
290+
mock_ws.sock.fileno.return_value = 10
291+
mock_create.return_value = mock_ws
292+
293+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False)
294+
295+
# Simulate leftover data and closed channel
296+
client._channels[1] = "hello"
297+
client._closed_channels.add(1)
298+
299+
# First call should drain data
300+
data1 = client.read_channel(1)
301+
self.assertEqual(data1, "hello")
302+
303+
# Subsequent call should short-circuit and return empty string
304+
# Patch `update` to assert it is NOT called (short-circuit)
305+
with patch.object(client, 'update') as mock_update:
306+
data2 = client.read_channel(1)
307+
self.assertEqual(data2, "")
308+
mock_update.assert_not_called()
309+
310+
def test_peek_channel_closed_with_leftover_data(self):
311+
"""Verify peek_channel allows peeking leftover data and then short-circuits after draining"""
312+
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
313+
patch('select.poll') as mock_poll:
314+
mock_poll.return_value.poll.return_value = []
315+
mock_ws = MagicMock()
316+
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
317+
mock_ws.connected = True
318+
mock_ws.sock.fileno.return_value = 10
319+
mock_create.return_value = mock_ws
320+
321+
client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False)
322+
323+
# Simulate leftover data and closed channel
324+
client._channels[1] = "hello"
325+
client._closed_channels.add(1)
326+
327+
# First peek should return data without draining
328+
data1 = client.peek_channel(1)
329+
self.assertEqual(data1, "hello")
330+
331+
# Second peek should still return data
332+
data2 = client.peek_channel(1)
333+
self.assertEqual(data2, "hello")
334+
335+
# Drain it
336+
client.read_channel(1)
337+
338+
# Now peek should short-circuit and return empty string
339+
# Patch `update` to assert it is NOT called (short-circuit)
340+
with patch.object(client, 'update') as mock_update:
341+
data3 = client.peek_channel(1)
342+
self.assertEqual(data3, "")
343+
mock_update.assert_not_called()
344+
345+
346+
126347
@pytest.fixture(scope="module")
127348
def dummy_proxy():
128349
#Dummy Proxy

0 commit comments

Comments
 (0)