403Webshell
Server IP : 66.29.132.122  /  Your IP : 3.144.252.224
Web Server : LiteSpeed
System : Linux business142.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : admazpex ( 531)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/alt/ruby32/share/rubygems/rubygems/gemcutter_utilities/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/alt/ruby32/share/rubygems/rubygems/gemcutter_utilities//webauthn_listener.rb
# frozen_string_literal: true

require_relative "webauthn_listener/response"

##
# The WebauthnListener class retrieves an OTP after a user successfully WebAuthns with the Gem host.
# An instance opens a socket using the TCPServer instance given and listens for a request from the Gem host.
# The request should be a GET request to the root path and contains the OTP code in the form
# of a query parameter `code`. The listener will return the code which will be used as the OTP for
# API requests.
#
# Types of responses sent by the listener after receiving a request:
#   - 200 OK: OTP code was successfully retrieved
#   - 204 No Content: If the request was an OPTIONS request
#   - 400 Bad Request: If the request did not contain a query parameter `code`
#   - 404 Not Found: The request was not to the root path
#   - 405 Method Not Allowed: OTP code was not retrieved because the request was not a GET/OPTIONS request
#
# Example usage:
#
#   thread = Gem::GemcutterUtilities::WebauthnListener.listener_thread("https://rubygems.example", server)
#   thread.join
#   otp = thread[:otp]
#   error = thread[:error]
#

module Gem::GemcutterUtilities
  class WebauthnListener
    attr_reader :host

    def initialize(host)
      @host = host
    end

    def self.listener_thread(host, server)
      Thread.new do
        thread = Thread.current
        thread.abort_on_exception = true
        thread.report_on_exception = false
        thread[:otp] = new(host).wait_for_otp_code(server)
      rescue Gem::WebauthnVerificationError => e
        thread[:error] = e
      ensure
        server.close
      end
    end

    def wait_for_otp_code(server)
      loop do
        socket = server.accept
        request_line = socket.gets

        method, req_uri, _protocol = request_line.split(" ")
        req_uri = URI.parse(req_uri)

        responder = SocketResponder.new(socket)

        unless root_path?(req_uri)
          responder.send(NotFoundResponse.for(host))
          raise Gem::WebauthnVerificationError, "Page at #{req_uri.path} not found."
        end

        case method.upcase
        when "OPTIONS"
          responder.send(NoContentResponse.for(host))
          next # will be GET
        when "GET"
          if otp = parse_otp_from_uri(req_uri)
            responder.send(OkResponse.for(host))
            return otp
          end
          responder.send(BadRequestResponse.for(host))
          raise Gem::WebauthnVerificationError, "Did not receive OTP from #{host}."
        else
          responder.send(MethodNotAllowedResponse.for(host))
          raise Gem::WebauthnVerificationError, "Invalid HTTP method #{method.upcase} received."
        end
      end
    end

    private

    def root_path?(uri)
      uri.path == "/"
    end

    def parse_otp_from_uri(uri)
      require "cgi"

      return if uri.query.nil?
      CGI.parse(uri.query).dig("code", 0)
    end

    class SocketResponder
      def initialize(socket)
        @socket = socket
      end

      def send(response)
        @socket.print response.to_s
        @socket.close
      end
    end
  end
end

Youez - 2016 - github.com/yon3zu
LinuXploit