class Fluent::SecureForwardInput
Constants
- DEFAULT_SECURE_LISTEN_PORT
- HOSTNAME_PLACEHOLDERS
Attributes
nodes[R]
read_interval[R]
sessions[R]
socket_interval[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_secure_forward.rb, line 90
# Creates the plugin instance.
#
# Runs the superclass initializer first and then clears the cached
# certificate/key pair so that #certificate builds it on first use.
def initialize
  super
  # #certificate only treats the pair as cached when BOTH values are set,
  # so both start out as nil instead of leaving @key undefined.
  @cert = nil
  @key = nil
end
Public Instance Methods
certificate()
click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 201
# Returns the server certificate and its private key, creating them on the
# first call and reusing the stored pair afterwards.
#
# The source of the pair depends on the configuration:
# * @cert_path set    - the key is read from @private_key_path and the
#   certificates from @cert_path; the first certificate becomes @cert and
#   the remaining ones are stored in @client_ca.
# * @ca_cert_path set - a server pair is generated through
#   CertUtil.generate_server_pair using the CA settings.
# * neither           - a self-signed pair is generated.
#
# @return [Array] two elements: the certificate and the key
def certificate
  return @cert, @key if @cert && @key

  @client_ca = nil

  if @cert_path
    @key = OpenSSL::PKey::RSA.new(File.read(@private_key_path), @private_key_passphrase)
    chain = Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path)
    @cert = chain.shift
    # Whatever is left after removing the first certificate.
    @client_ca = chain
    return @cert, @key
  end

  # Options shared by both certificate-generating code paths.
  subject_opts = {
    private_key_length: @generate_private_key_length,
    country: @generate_cert_country,
    state: @generate_cert_state,
    locality: @generate_cert_locality,
    common_name: @generate_cert_common_name,
  }

  @cert, @key =
    if @ca_cert_path
      ca_opts = {
        ca_cert_path: @ca_cert_path,
        ca_key_path: @ca_private_key_path,
        ca_key_passphrase: @ca_private_key_passphrase,
      }
      Fluent::SecureForward::CertUtil.generate_server_pair(ca_opts.merge(subject_opts))
    else
      Fluent::SecureForward::CertUtil.generate_self_signed_server_pair(subject_opts)
    end

  return @cert, @key
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_secure_forward.rb, line 112
# Validates the configuration and derives the runtime settings from it.
#
# Steps, in order:
# 1. Substitute hostname placeholders in +conf+ (using the 'hostname' entry
#    of +conf+ when present, otherwise Socket.gethostname) and hand the
#    result to the superclass.
# 2. In secure mode, check that the certificate related parameters are
#    complete; otherwise log a warning.
# 3. Convert the millisecond intervals to seconds.
# 4. Build @nodes from @clients (address, shared key, permitted users).
# 5. Default the generated certificate's common name to @self_hostname and
#    load/generate the certificate so that failures surface at startup.
#
# @param conf the configuration element for this plugin
# @return [true]
# @raise [Fluent::ConfigError] when a required parameter is missing, no
#   certificate can be read from cert_path, or a client entry is invalid
def configure(conf)
  hostname =
    if conf.has_key?('hostname')
      conf['hostname'].to_s
    else
      Socket.gethostname
    end
  replace_hostname_placeholder(conf, hostname)

  super

  if @secure
    if @cert_path
      raise Fluent::ConfigError, "private_key_path required" unless @private_key_path
      raise Fluent::ConfigError, "private_key_passphrase required" unless @private_key_passphrase

      if Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path).empty?
        raise Fluent::ConfigError, "no valid certificates in cert_path: #{@cert_path}"
      end
    elsif @ca_cert_path
      raise Fluent::ConfigError, "ca_private_key_path required" unless @ca_private_key_path
      raise Fluent::ConfigError, "ca_private_key_passphrase required" unless @ca_private_key_passphrase
    else
      raise Fluent::ConfigError, "cert_path or ca_cert_path required for secure communication"
    end
  else
    log.warn "'insecure' mode has vulnerability for man-in-the-middle attacks for clients (output plugins)."
  end

  # Intervals are configured in milliseconds; keep them in seconds.
  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @nodes = []
  @clients.each do |client|
    # Exactly one of 'host' / 'network' has to be given per client.
    if client.host && client.network
      raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
    end
    unless client.host || client.network
      raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
    end

    resolved =
      if client.host
        begin
          IPSocket.getaddress(client.host)
        rescue SocketError
          raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
        end
      end

    address =
      begin
        IPAddr.new(resolved || client.network)
      rescue ArgumentError
        raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
      end

    permitted_users =
      if client.users
        client.users.split(',')
      else
        nil
      end

    @nodes << {
      address: address,
      shared_key: (client.shared_key || @shared_key),
      users: permitted_users
    }
  end

  @generate_cert_common_name ||= @self_hostname

  # To check whether certificates are successfully generated/loaded at startup time
  self.certificate

  true
end
on_message(msg)
click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 282
# Emits the events carried by one decoded message.
#
# +msg+ is an array whose first element is the tag. The second element
# selects the format:
# * String - PackedForward: wrapped in a MessagePackEventStream and emitted
#   as a stream.
# * Array  - Forward: each entry is a [time, record] pair, collected into a
#   MultiEventStream and emitted as a stream.
# * other  - Message: +msg+ is [tag, time, record] and is emitted as a
#   single event.
#
# A time of 0 is replaced by Fluent::Engine.now. Within one Forward message
# the current time is fetched at most once, so all zero-time entries of the
# message share the same timestamp.
#
# @param msg [Array] the decoded message
def on_message(msg)
  # NOTE: copy&paste from Fluent::ForwardInput#on_message(msg)
  # TODO: format error
  tag = msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    es = Fluent::MessagePackEventStream.new(entries, @cached_unpacker)
    router.emit_stream(tag, es)
  when Array
    # Forward
    es = Fluent::MultiEventStream.new
    # Declared outside the block on purpose: a variable first assigned inside
    # the block would be block-local and reset for every entry, which defeats
    # the `||=` caching below.
    now = nil
    entries.each do |e|
      time = e[0].to_i
      time = (now ||= Fluent::Engine.now) if time == 0
      es.add(time, e[1])
    end
    router.emit_stream(tag, es)
  else
    # Message
    time = msg[1]
    time = Fluent::Engine.now if time == 0
    router.emit(tag, time, msg[2])
  end
end
replace_hostname_placeholder(conf, hostname)
click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 97
# Replaces every HOSTNAME_PLACEHOLDERS occurrence in the values of +conf+,
# and recursively in the values of all its nested elements, with +hostname+.
# The elements are modified in place.
#
# Only values that respond to both #include? and #gsub are considered;
# other values are left untouched.
#
# @param conf the configuration element to rewrite; must provide #keys,
#   #[], #[]= and #elements
# @param hostname [String] the text inserted for each placeholder
def replace_hostname_placeholder(conf, hostname)
  replace_element = ->(element) {
    element.keys.each do |key|
      value = element[key]
      next unless value && value.respond_to?(:include?) && value.respond_to?(:gsub)
      next unless HOSTNAME_PLACEHOLDERS.any? { |ph| value.include?(ph) }

      element[key] = HOSTNAME_PLACEHOLDERS.inject(value) do |result, ph|
        # Block form: the hostname is inserted literally. With a replacement
        # string, sequences such as "\0" or "\1" in the hostname would be
        # interpreted as back-references.
        result.gsub(ph) { hostname }
      end
    end
    element.elements.each { |child| replace_element.call(child) }
  }
  replace_element.call(conf)
end
run()
click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 235
# Body of the SSL socket server thread.
#
# Builds an SSL context from #certificate, opens a TCP server on
# @bind/@port, wraps it in an SSL server (stored in @sock) and then accepts
# connections in a loop. Every accepted socket is wrapped in a Session that
# is appended to @sessions; sessions reporting closed? are pruned after each
# accept.
#
# An OpenSSL::SSL::SSLError whose message starts with 'SSL_accept SYSCALL'
# ends the method quietly; any other SSLError is re-raised.
def run # sslsocket server thread
  log.trace "setup for ssl sessions"
  cert, key = self.certificate

  context = OpenSSL::SSL::SSLContext.new(@ssl_version)
  if @secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    context.set_params({})

    ### follow httpclient configuration by nahi
    # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
    # The fallback below is the OpenSSL >1.0.0 default.
    context.ciphers = @ssl_ciphers || "ALL:!aNULL:!eNULL:!SSLv2"
  end

  context.cert = cert
  context.key = key
  context.extra_chain_cert = @client_ca if @client_ca

  log.trace "start to listen", bind: @bind, port: @port
  tcp_server = TCPServer.new(@bind, @port)

  log.trace "starting SSL server", bind: @bind, port: @port
  @sock = OpenSSL::SSL::SSLServer.new(tcp_server, context)
  # The SSL session is not established as part of accept.
  @sock.start_immediately = false

  begin
    log.trace "accepting sessions"
    loop do
      while (socket = @sock.accept)
        log.trace "accept tcp connection (ssl session not established yet)"
        @sessions << Session.new(self, socket)

        # cleanup closed session instance
        @sessions.delete_if(&:closed?)
        log.trace "session instances:", all: @sessions.size, closed: @sessions.count(&:closed?)
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    # signal trap on accept
    raise unless e.message.start_with?('SSL_accept SYSCALL')
  end
end
select_authenticate_users(node, username)
click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 193
# Returns the entries of @users whose username equals +username+ and that
# +node+ permits.
#
# When +node+ is nil or its :users entry is nil, no restriction applies.
# Otherwise +username+ must be contained in node[:users]; if it is not, the
# result is empty.
#
# @param node [Hash, nil] node description; only its :users entry is read
# @param username [String] the name to look up
# @return [Array] the matching user entries (possibly empty)
def select_authenticate_users(node, username)
  permitted = node.nil? ? nil : node[:users]

  # Whether the name is on the node's list does not depend on the individual
  # user entry, so decide it once instead of once per entry.
  return [] if !permitted.nil? && !permitted.include?(username)

  @users.select { |u| u.username == username }
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_secure_forward.rb, line 186
# Stops the plugin: kills the listener thread and waits for it, shuts down
# every session in @sessions, and closes the server socket if one exists.
def shutdown
  @listener.kill
  @listener.join
  @sessions.each(&:shutdown)
  # #start resets @sock to nil and only #run assigns it, so it is still nil
  # when the listener never got as far as creating the SSL server.
  @sock.close if @sock
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_secure_forward.rb, line 177
# Starts the plugin: calls the superclass implementation, seeds OpenSSL's
# random number generator, resets the session list and server socket, and
# launches the listener thread that executes #run.
def start
  super
  OpenSSL::Random.seed(SecureRandom.random_bytes(16))
  @sessions = []
  @sock = nil
  @listener = Thread.new(&method(:run))
  # Enable the flag. A bare `@listener.abort_on_exception` only reads the
  # current value and has no effect, so an exception in the listener would
  # otherwise end that thread without the failure propagating.
  @listener.abort_on_exception = true
end