Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ class AMQProxy::CLI

@listen_address = "localhost"
@listen_port = 5673
@tls_port = 5674
@http_port = 15673
@log_level : ::Log::Severity = ::Log::Severity::Info
@idle_connection_timeout : Int32 = 5
@term_timeout = -1
@term_client_close_timeout = 0
@server : AMQProxy::Server? = nil
@tls_context : OpenSSL::SSL::Context::Server?
@tls_key_path = ""
@tls_cert_path = ""
@tls_ciphers = ""

def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity
INI.parse(File.read(path)).each do |name, section|
Expand All @@ -36,8 +41,11 @@ class AMQProxy::CLI
section.each do |key, value|
case key
when "port" then @listen_port = value.to_i
when "tls_port" then @tls_port = value.to_i
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = ::Log::Severity.parse(value)
when "tls_cert_path" then @tls_cert_path = value
when "tls_key_path" then @tls_key_path = value
else raise "Unsupported config #{name}/#{key}"
end
end
Expand All @@ -51,6 +59,7 @@ class AMQProxy::CLI
def apply_env_variables
@listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address
@listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port
@tls_port = ENV["TLS_PORT"]?.try &.to_i || @tls_port
@http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port
@log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level
@idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout
Expand All @@ -77,6 +86,9 @@ class AMQProxy::CLI
@listen_address = v
end
parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i }
parser.on("-x PORT", "--port_tls=PORT", "Port with TLS to listen on (default: 5674)") { |v| @tls_port = v.to_i }
parser.on("-c CERT_PATH", "--cert_path=PATH", "Path for certificate chain") { |v| @tls_cert_path = v }
parser.on("-k KEY_PATH", "--key_path=PATH", "Path for certificate key") { |v| @tls_key_path = v }
parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i }
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
Expand Down Expand Up @@ -120,7 +132,20 @@ class AMQProxy::CLI
server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)

HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)

if @tls_key_path == ""
server.listen(@listen_address, @listen_port.to_i)
else
# start the TLS listener for amproxy

@tls_context = create_tls_context
reload_tls_context
if @tls_context
server.listen_tls(@listen_address, @tls_port.to_i, @tls_context)
else
server.listen(@listen_address, @listen_port.to_i)
end
end

shutdown

Expand Down Expand Up @@ -189,6 +214,22 @@ class AMQProxy::CLI
end
end

private def create_tls_context
context = OpenSSL::SSL::Context::Server.new
context.add_options(OpenSSL::SSL::Options.new(0x40000000)) # disable client initiated renegotiation
context
end

private def reload_tls_context
return unless tls = @tls_context

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm i might be missing some Crystal. But I dont understand this??

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copy/pasted much of the TLS functionality from LavinMQ. Commented lines (mentioned in your other comment) are from a testing procedure -- they either did not work or were not necessary. Such commented lines should be removed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a nil guard. @tls_context may be nil, but the thanks to the guard tls won't be nil.


# Note minimal configuration of TLS for now
tls.remove_options(OpenSSL::SSL::Options::NO_TLS_V1_2)
tls.certificate_chain = @tls_cert_path
tls.private_key = @tls_key_path.empty? ? @tls_cert_path : @tls_key_path
tls.ciphers = @tls_ciphers unless @tls_ciphers.empty?
end

struct Journal::LogFormat < ::Log::StaticFormatter
def run
source
Expand Down
25 changes: 15 additions & 10 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ require "amq-protocol"
require "./version"
require "./upstream"
require "./records"
require "./connection_info"

module AMQProxy
class Client
Log = ::Log.for(self)
getter credentials : Credentials
getter connection_info : ConnectionInfo
@channel_map = Hash(UInt16, UpstreamChannel?).new
@lock = Mutex.new
@frame_max : UInt32
@channel_max : UInt16
@heartbeat : UInt16
@last_heartbeat = Time.monotonic

def initialize(@socket : TCPSocket)
set_socket_options(@socket)
def initialize(@socket : IO, @connection_info : ConnectionInfo)
tune_ok, @credentials = negotiate(@socket)
@frame_max = tune_ok.frame_max
@channel_max = tune_ok.channel_max
Expand Down Expand Up @@ -55,10 +56,12 @@ module AMQProxy

# frames from enduser
def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(client: socket.remote_address.to_s)
Log.context.set(client: @connection_info.remote_address.to_s)
Log.debug { "Connected" }
i = 0u64
socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0
if @heartbeat > 0 && socket.responds_to?(:read_timeout=)
socket.read_timeout = (@heartbeat / 2).ceil.seconds
end
loop do
frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
@last_heartbeat = Time.monotonic
Expand Down Expand Up @@ -205,12 +208,14 @@ module AMQProxy
end

private def set_socket_options(socket = @socket)
socket.sync = false
socket.keepalive = true
socket.tcp_nodelay = true
socket.tcp_keepalive_idle = 60
socket.tcp_keepalive_count = 3
socket.tcp_keepalive_interval = 10
# For SSL sockets, configure the underlying TCP socket
tcp_socket = socket.is_a?(OpenSSL::SSL::Socket::Server) ? socket.io : socket
tcp_socket.sync = false
tcp_socket.keepalive = true
tcp_socket.tcp_nodelay = true
tcp_socket.tcp_keepalive_idle = 60
tcp_socket.tcp_keepalive_count = 3
tcp_socket.tcp_keepalive_interval = 10
end

private def negotiate(socket = @socket)
Expand Down
46 changes: 46 additions & 0 deletions src/amqproxy/connection_info.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
require "socket"

module AMQProxy
class ConnectionInfo
getter remote_address : IPAddress
getter local_address : IPAddress
property? ssl : Bool = false
property? ssl_verify : Bool = false
property ssl_version : String?
property ssl_cipher : String?
property ssl_key_alg : String?
property ssl_sig_alg : String?
Comment on lines +3 to +12

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LMQ codebase spotted!!! 🔥

property ssl_cn : String?

# Remote and local addresses from the server's perspective
def initialize(remote_address, local_address)
@remote_address = IPAddress.new(remote_address)
@local_address = IPAddress.new(local_address)
end

def self.local
src = Socket::IPAddress.new("127.0.0.1", 0)
dst = Socket::IPAddress.new("127.0.0.1", 0)
new(src, dst)
end

# Suspecting memory problem with Socket::IPAddress in Crystal 1.15.0
struct IPAddress
getter address : String
getter port : UInt16

def initialize(ip_address : Socket::IPAddress)
@address = ip_address.address
@port = ip_address.port.to_u16!
end

def to_s(io)
io << @address << ':' << @port
end

def loopback?
@address == "::1" || @address.starts_with? "127."
end
end
end
end
77 changes: 72 additions & 5 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ require "uri"
require "./channel_pool"
require "./client"
require "./upstream"
require "./connection_info"

module AMQProxy
class Server
Log = ::Log.for(self)
@clients_lock = Mutex.new
@clients = Array(Client).new
@closed = false

def self.new(url : URI)
tls = url.scheme == "amqps"
Expand Down Expand Up @@ -46,15 +48,65 @@ module AMQProxy
loop do
socket = server.accept? || break
begin
addr = socket.remote_address
spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}"
remote_addr = socket.remote_address
set_socket_options(socket)
conn_info = ConnectionInfo.new(remote_addr, socket.local_address)
spawn handle_connection(socket, conn_info), name: "Client#read_loop #{remote_addr}"
rescue IO::Error
next
end
end
Log.info { "Proxy stopping accepting connections" }
end

def listen_tls(address, port, context)
listen_tls(TCPServer.new(address, port), context)
end

def listen_tls(s : TCPServer, context)
# @listeners[s] = protocol
Log.info { "Listening on #{s.local_address} (TLS)" }
loop do # do not try to use while
client = s.accept? || break
next client.close if @closed
accept_tls(client, context)
end
rescue ex : IO::Error | OpenSSL::Error
abort "Unrecoverable error in TLS listener: #{ex.inspect_with_backtrace}"
# ensure
# @listeners.delete(s)
Comment on lines +76 to +77

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for those comments?

end

private def accept_tls(client, context)
if context
spawn(name: "Accept TLS socket") do
remote_addr = client.remote_address
set_socket_options(client)
ssl_client = OpenSSL::SSL::Socket::Server.new(client, context, sync_close: true)
# ssl_client = OpenSSL::SSL::Socket.new(client, context, sync_close: true)
set_buffer_size(ssl_client)
Log.debug { "#{remote_addr} connected with #{ssl_client.tls_version} #{ssl_client.cipher}" }
conn_info = ConnectionInfo.new(remote_addr, client.local_address)
conn_info.ssl = true
conn_info.ssl_version = ssl_client.tls_version
conn_info.ssl_cipher = ssl_client.cipher
handle_connection(ssl_client, conn_info)
rescue ex
Log.warn(exception: ex) { "Error accepting TLS connection from #{remote_addr}" }
client.close rescue nil
end
end
end

def listen_tls(bind, port, context)
listen_tls(TCPServer.new(bind, port), context)
end

private def set_buffer_size(socket)
socket.sync = true
socket.read_buffering = false
end

def stop_accepting_clients
@server.try &.close
end
Expand All @@ -73,20 +125,35 @@ module AMQProxy
end
end

private def handle_connection(socket, remote_address)
c = Client.new(socket)
private def handle_connection(socket, connection_info)
c = Client.new(socket, connection_info)
active_client(c) do
channel_pool = @channel_pools[c.credentials]
c.read_loop(channel_pool)
end
rescue IO::EOFError
# Client closed connection before/while negotiating
rescue ex # only raise from constructor, when negotating
Log.debug(exception: ex) { "Client negotiation failure (#{remote_address}) #{ex.inspect}" }
Log.debug(exception: ex) { "Client negotiation failure (#{connection_info.remote_address}) #{ex.inspect}" }
ensure
socket.close rescue nil
end

private def set_socket_options(socket)
# Note: Very minimal support for socket options for now
unless socket.remote_address.loopback?
# if keepalive = @config.tcp_keepalive
socket.keepalive = true
socket.tcp_keepalive_idle = 60
socket.tcp_keepalive_interval = 10
socket.tcp_keepalive_count = 3
# end
end
socket.tcp_nodelay = true # if @config.tcp_nodelay?
# @config.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v }
# @config.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v }
end

private def active_client(client, &)
@clients_lock.synchronize do
@clients << client
Expand Down