diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..425b3c3 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -11,12 +11,17 @@ class AMQProxy::CLI @listen_address = "localhost" @listen_port = 5673 + @tls_port = 5674 @http_port = 15673 @log_level : ::Log::Severity = ::Log::Severity::Info @idle_connection_timeout : Int32 = 5 @term_timeout = -1 @term_client_close_timeout = 0 @server : AMQProxy::Server? = nil + @tls_context : OpenSSL::SSL::Context::Server? + @tls_key_path = "" + @tls_cert_path = "" + @tls_ciphers = "" def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity INI.parse(File.read(path)).each do |name, section| @@ -36,8 +41,11 @@ class AMQProxy::CLI section.each do |key, value| case key when "port" then @listen_port = value.to_i + when "tls_port" then @tls_port = value.to_i when "bind", "address" then @listen_address = value when "log_level" then @log_level = ::Log::Severity.parse(value) + when "tls_cert_path" then @tls_cert_path = value + when "tls_key_path" then @tls_key_path = value else raise "Unsupported config #{name}/#{key}" end end @@ -51,6 +59,7 @@ class AMQProxy::CLI def apply_env_variables @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port + @tls_port = ENV["TLS_PORT"]?.try &.to_i || @tls_port @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout @@ -77,6 +86,9 @@ class AMQProxy::CLI @listen_address = v end parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } + parser.on("-x PORT", "--port_tls=PORT", "Port with TLS to listen on (default: 5674)") { |v| @tls_port = v.to_i } + parser.on("-c CERT_PATH", "--cert_path=PATH", "Path for certificate chain") { |v| @tls_cert_path = v } + parser.on("-k KEY_PATH", "--key_path=PATH", "Path for certificate key") { |v| @tls_key_path = v } parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| @idle_connection_timeout = v.to_i @@ -120,7 +132,20 @@ class AMQProxy::CLI server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) HTTPServer.new(server, @listen_address, @http_port.to_i) - server.listen(@listen_address, @listen_port.to_i) + + if @tls_key_path == "" + server.listen(@listen_address, @listen_port.to_i) + else + # start the TLS listener for amproxy + + @tls_context = create_tls_context + reload_tls_context + if @tls_context + server.listen_tls(@listen_address, @tls_port.to_i, @tls_context) + else + server.listen(@listen_address, @listen_port.to_i) + end + end shutdown @@ -189,6 +214,22 @@ class AMQProxy::CLI end end + private def create_tls_context + context = OpenSSL::SSL::Context::Server.new + context.add_options(OpenSSL::SSL::Options.new(0x40000000)) # disable client initiated renegotiation + context + end + + private def reload_tls_context + return unless tls = @tls_context + + # Note minimal configuration of TLS for now + tls.remove_options(OpenSSL::SSL::Options::NO_TLS_V1_2) + tls.certificate_chain = @tls_cert_path + tls.private_key = @tls_key_path.empty? ? @tls_cert_path : @tls_key_path + tls.ciphers = @tls_ciphers unless @tls_ciphers.empty? + end + struct Journal::LogFormat < ::Log::StaticFormatter def run source diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index ffdc8a6..7cb2003 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -3,11 +3,13 @@ require "amq-protocol" require "./version" require "./upstream" require "./records" +require "./connection_info" module AMQProxy class Client Log = ::Log.for(self) getter credentials : Credentials + getter connection_info : ConnectionInfo @channel_map = Hash(UInt16, UpstreamChannel?).new @lock = Mutex.new @frame_max : UInt32 @@ -15,8 +17,7 @@ module AMQProxy @heartbeat : UInt16 @last_heartbeat = Time.monotonic - def initialize(@socket : TCPSocket) - set_socket_options(@socket) + def initialize(@socket : IO, @connection_info : ConnectionInfo) tune_ok, @credentials = negotiate(@socket) @frame_max = tune_ok.frame_max @channel_max = tune_ok.channel_max @@ -55,10 +56,12 @@ module AMQProxy # frames from enduser def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity - Log.context.set(client: socket.remote_address.to_s) + Log.context.set(client: @connection_info.remote_address.to_s) Log.debug { "Connected" } i = 0u64 - socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0 + if @heartbeat > 0 && socket.responds_to?(:read_timeout=) + socket.read_timeout = (@heartbeat / 2).ceil.seconds + end loop do frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) @last_heartbeat = Time.monotonic @@ -205,12 +208,14 @@ module AMQProxy end private def set_socket_options(socket = @socket) - socket.sync = false - socket.keepalive = true - socket.tcp_nodelay = true - socket.tcp_keepalive_idle = 60 - socket.tcp_keepalive_count = 3 - socket.tcp_keepalive_interval = 10 + # For SSL sockets, configure the underlying TCP socket + tcp_socket = socket.is_a?(OpenSSL::SSL::Socket::Server) ? socket.io : socket + tcp_socket.sync = false + tcp_socket.keepalive = true + tcp_socket.tcp_nodelay = true + tcp_socket.tcp_keepalive_idle = 60 + tcp_socket.tcp_keepalive_count = 3 + tcp_socket.tcp_keepalive_interval = 10 end private def negotiate(socket = @socket) diff --git a/src/amqproxy/connection_info.cr b/src/amqproxy/connection_info.cr new file mode 100644 index 0000000..f811486 --- /dev/null +++ b/src/amqproxy/connection_info.cr @@ -0,0 +1,46 @@ +require "socket" + +module AMQProxy + class ConnectionInfo + getter remote_address : IPAddress + getter local_address : IPAddress + property? ssl : Bool = false + property? ssl_verify : Bool = false + property ssl_version : String? + property ssl_cipher : String? + property ssl_key_alg : String? + property ssl_sig_alg : String? + property ssl_cn : String? + + # Remote and local addresses from the server's perspective + def initialize(remote_address, local_address) + @remote_address = IPAddress.new(remote_address) + @local_address = IPAddress.new(local_address) + end + + def self.local + src = Socket::IPAddress.new("127.0.0.1", 0) + dst = Socket::IPAddress.new("127.0.0.1", 0) + new(src, dst) + end + + # Suspecting memory problem with Socket::IPAddress in Crystal 1.15.0 + struct IPAddress + getter address : String + getter port : UInt16 + + def initialize(ip_address : Socket::IPAddress) + @address = ip_address.address + @port = ip_address.port.to_u16! + end + + def to_s(io) + io << @address << ':' << @port + end + + def loopback? + @address == "::1" || @address.starts_with? "127." + end + end + end +end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 4af02f4..6016ded 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -5,12 +5,14 @@ require "uri" require "./channel_pool" require "./client" require "./upstream" +require "./connection_info" module AMQProxy class Server Log = ::Log.for(self) @clients_lock = Mutex.new @clients = Array(Client).new + @closed = false def self.new(url : URI) tls = url.scheme == "amqps" @@ -46,8 +48,10 @@ module AMQProxy loop do socket = server.accept? || break begin - addr = socket.remote_address - spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}" + remote_addr = socket.remote_address + set_socket_options(socket) + conn_info = ConnectionInfo.new(remote_addr, socket.local_address) + spawn handle_connection(socket, conn_info), name: "Client#read_loop #{remote_addr}" rescue IO::Error next end @@ -55,6 +59,54 @@ module AMQProxy Log.info { "Proxy stopping accepting connections" } end + def listen_tls(address, port, context) + listen_tls(TCPServer.new(address, port), context) + end + + def listen_tls(s : TCPServer, context) + # @listeners[s] = protocol + Log.info { "Listening on #{s.local_address} (TLS)" } + loop do # do not try to use while + client = s.accept? || break + next client.close if @closed + accept_tls(client, context) + end + rescue ex : IO::Error | OpenSSL::Error + abort "Unrecoverable error in TLS listener: #{ex.inspect_with_backtrace}" + # ensure + # @listeners.delete(s) + end + + private def accept_tls(client, context) + if context + spawn(name: "Accept TLS socket") do + remote_addr = client.remote_address + set_socket_options(client) + ssl_client = OpenSSL::SSL::Socket::Server.new(client, context, sync_close: true) + # ssl_client = OpenSSL::SSL::Socket.new(client, context, sync_close: true) + set_buffer_size(ssl_client) + Log.debug { "#{remote_addr} connected with #{ssl_client.tls_version} #{ssl_client.cipher}" } + conn_info = ConnectionInfo.new(remote_addr, client.local_address) + conn_info.ssl = true + conn_info.ssl_version = ssl_client.tls_version + conn_info.ssl_cipher = ssl_client.cipher + handle_connection(ssl_client, conn_info) + rescue ex + Log.warn(exception: ex) { "Error accepting TLS connection from #{remote_addr}" } + client.close rescue nil + end + end + end + + def listen_tls(bind, port, context) + listen_tls(TCPServer.new(bind, port), context) + end + + private def set_buffer_size(socket) + socket.sync = true + socket.read_buffering = false + end + def stop_accepting_clients @server.try &.close end @@ -73,8 +125,8 @@ module AMQProxy end end - private def handle_connection(socket, remote_address) - c = Client.new(socket) + private def handle_connection(socket, connection_info) + c = Client.new(socket, connection_info) active_client(c) do channel_pool = @channel_pools[c.credentials] c.read_loop(channel_pool) @@ -82,11 +134,26 @@ module AMQProxy rescue IO::EOFError # Client closed connection before/while negotiating rescue ex # only raise from constructor, when negotating - Log.debug(exception: ex) { "Client negotiation failure (#{remote_address}) #{ex.inspect}" } + Log.debug(exception: ex) { "Client negotiation failure (#{connection_info.remote_address}) #{ex.inspect}" } ensure socket.close rescue nil end + private def set_socket_options(socket) + # Note: Very minimal support for socket options for now + unless socket.remote_address.loopback? + # if keepalive = @config.tcp_keepalive + socket.keepalive = true + socket.tcp_keepalive_idle = 60 + socket.tcp_keepalive_interval = 10 + socket.tcp_keepalive_count = 3 + # end + end + socket.tcp_nodelay = true # if @config.tcp_nodelay? + # @config.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } + # @config.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } + end + private def active_client(client, &) @clients_lock.synchronize do @clients << client