Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 89 additions & 1 deletion sunbeam-python/sunbeam/core/questions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import json
import logging
import os
import sys
import termios
import typing
from pathlib import Path
from typing import Callable
Expand Down Expand Up @@ -36,6 +38,47 @@ def render_default(self, default: str) -> Text: # type: ignore [override]
return Text(f"({default[:2]}{PASSWORD_MASK})", "prompt.default")


def _read_line_no_icanon(fd: int) -> str:
"""Read one line with ICANON disabled, bypassing the N_TTY 4096-byte limit.

The Linux kernel N_TTY line discipline silently truncates pasted input at
4095 characters in canonical (ICANON) mode. Clearing only ICANON removes
that limit while preserving all other terminal behaviour:
- ECHO stays on — user sees what they paste
- ISIG stays on — Ctrl+C / Ctrl+Z still generate signals
- IEXTEN stays on — Ctrl+V etc. still work
Terminal settings are always restored in the finally block, including on
KeyboardInterrupt. Only SIGKILL bypasses finally; run ``stty sane`` if
that ever leaves the terminal in a broken state.
"""
old = termios.tcgetattr(fd)
new = list(old)
new[3] &= ~termios.ICANON # clear only ICANON, touch nothing else
new[6] = list(new[6])
new[6][termios.VMIN] = 1 # return after each character
new[6][termios.VTIME] = 0 # no read timeout
termios.tcsetattr(fd, termios.TCSANOW, new)
try:
chars: list[str] = []
while True:
ch = os.read(fd, 1).decode("utf-8", errors="replace")
if ch in ("\r", "\n"):
break
elif ch == "\x7f": # backspace
if chars:
chars.pop()
elif ch == "\x03": # Ctrl+C — belt-and-suspenders, ISIG fires first
raise KeyboardInterrupt
else:
chars.append(ch)
return "".join(chars)
finally:
try:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
except OSError:
LOG.debug("Failed to restore terminal settings on fd %d", fd)


# workaround until https://github.com/Textualize/rich/issues/2994 is fixed
class StreamWrapper:
def __init__(self, read_stream, write_stream):
Expand All @@ -61,6 +104,30 @@ def write(self, s: str):
STREAM = StreamWrapper(sys.stdin, sys.stdout)


class LargeInputStreamWrapper(StreamWrapper):
"""StreamWrapper that bypasses the N_TTY 4096-byte canonical mode limit.

Used for certificate inputs where base64-encoded CA chains routinely
exceed 4095 characters. Falls back to normal readline() when stdin
is not a TTY (pipes, preseeds) so non-interactive paths are unaffected.
"""

def readline(self) -> str:
"""Read one line, bypassing the N_TTY 4096-byte limit when stdin is a TTY."""
try:
fd = self.read_stream.fileno()
if os.isatty(fd):
value = _read_line_no_icanon(fd)
return value if value else ""
except (AttributeError, OSError):
pass
value = self.read_stream.readline()
return "" if value == "\n" else value


LARGE_INPUT_STREAM = LargeInputStreamWrapper(sys.stdin, sys.stdout)

Comment thread
hemanthnakkina marked this conversation as resolved.

def get_stdin_reopen_tty() -> str:
"""Get stdin content and reopen tty if needed.

Expand All @@ -79,6 +146,7 @@ def get_stdin_reopen_tty() -> str:
# note(gboutry): Reassign stream wrapper read_stream
# to the new stdin.
STREAM.read_stream = sys.stdin
LARGE_INPUT_STREAM.read_stream = sys.stdin
LOG.debug("Reopened stdin to /dev/tty")
return stdin_input

Expand Down Expand Up @@ -148,6 +216,11 @@ def question_function(self) -> Callable[..., T]:
"""Allow subclasses to define the question function."""
raise NotImplementedError

@property
def _input_stream(self) -> StreamWrapper:
"""Stream used to read user input. Override for large inputs."""
return STREAM

def calculate_default(self, new_default: T | None = None) -> T | None:
"""Find the value to should be presented to the user as the default.

Expand Down Expand Up @@ -203,7 +276,7 @@ def ask(
console=self.console,
choices=new_choices or self.choices,
password=self.password,
stream=STREAM,
stream=self._input_stream,
)
if self.validation_function is not None and self.answer is not None:
try:
Expand All @@ -228,6 +301,21 @@ def question_function(self):
return Prompt.ask


class LargeInputPromptQuestion(PromptQuestion[T]):
"""PromptQuestion for inputs that may exceed the N_TTY 4096-byte limit.

Use this for certificate fields (certificate, ca-certificate, ca-chain)
where base64-encoded data — especially CA chains with multiple intermediates
— can easily exceed 4095 characters and be silently truncated by the
terminal line discipline in canonical mode.
"""

@property
def _input_stream(self) -> LargeInputStreamWrapper:
"""Use the large-input stream that clears ICANON on TTY reads."""
return LARGE_INPUT_STREAM


class PasswordPromptQuestion(Question[T]):
"""Ask the user for a password."""

Expand Down
6 changes: 3 additions & 3 deletions sunbeam-python/sunbeam/features/loadbalancer/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,18 @@ def _amphora_certificate_questions(
)

base: dict[str, questions.Question] = {
"certificate": questions.PromptQuestion(
"certificate": questions.LargeInputPromptQuestion(
cert_prompt, description=cert_description
)
}
base["ca_certificate"] = questions.PromptQuestion(
base["ca_certificate"] = questions.LargeInputPromptQuestion(
"CA certificate (base64 PEM)",
description=(
"The CA certificate that signed the certificate above, base64-encoded."
" This is passed to Octavia as the issuing CA."
),
)
base["ca_chain"] = questions.PromptQuestion(
base["ca_chain"] = questions.LargeInputPromptQuestion(
"CA chain (base64 PEM, optional)",
description=(
"Full certificate chain (intermediate + root CAs) base64-encoded."
Expand Down
4 changes: 2 additions & 2 deletions sunbeam-python/sunbeam/features/tls/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,13 @@ def certificate_questions(app: str, unit: str | None, subject: str):
# with unit name else with app name
if unit:
return {
"certificate": questions.PromptQuestion(
"certificate": questions.LargeInputPromptQuestion(
f"Base64 encoded Certificate for unit {unit} CSR Unique ID: {subject}",
),
}
else:
return {
"certificate": questions.PromptQuestion(
"certificate": questions.LargeInputPromptQuestion(
f"Base64 encoded Certificate for app {app} CSR Unique ID: {subject}",
),
}
Expand Down
Loading
Loading