
class Client

Async HTTP client for the Matrix Client-Server API.

Every outbound request is authenticated with the appservice as_token. All methods are fiber-safe and run naturally inside Falcon's async reactor.

client = Async::Matrix::Client.new(config)
client.send_text("!room:example.com", "Hello world")
client.join_room("!room:example.com")

Definitions

DEFAULT_MAX_RETRIES = 3

Retry defaults

RATE_LIMIT_STATUS = 429

Status codes eligible for retry

DEFAULT_RESPONSE_SIZE_LIMIT = 50 * 1024 * 1024

Response size limits (bytes)

def api

Returns a Gateway that provides method-chained access to every Matrix Client-Server API endpoint. Chains are validated against the official OpenAPI path tree and terminated by .get(), .post(), .put(), or .delete().

client.api.account.whoami.get
client.api.createRoom.post(name: "Pub")
client.api.rooms("!room:ex.com").ban.post(user_id: "@bad:ex.com")
client.api.rooms("!room:ex.com").messages.get(dir: "b", limit: 10)

Implementation

def api
  Api::Gateway.new(self)
end

def media

Returns a Gateway rooted at /_matrix/media/v3 for media operations. Binary routes (upload/download/thumbnail) are automatically detected by the Chain and dispatched to the MediaClient.

client.media.upload.post(bytes, content_type: "image/png")
client.media.download("example.com", "abc123").get
client.media.thumbnail("example.com", "abc123").get(width: 64, height: 64)

Implementation

def media
  Api::Gateway.new(self, prefix: %w[_matrix media v3])
end

def media_client

Returns the binary media client used for upload/download operations. It is lazily initialized and shares the same config as this client.

Implementation

def media_client
  @media_client ||= MediaClient.new(@config)
end

def read_limited(response, limit)

Read the response body with a size limit. Raises ResponseTooLargeError if the body exceeds the limit. Checks Content-Length first (fast path), then enforces during streaming read (safe path).

Implementation

def read_limited(response, limit)
  body = response.body
  return nil unless body

  # Fast path: reject immediately if Content-Length exceeds limit
  if body.respond_to?(:length) && body.length && body.length > limit
    body.close
    raise ResponseTooLargeError.new(
      "M_TOO_LARGE",
      "Response Content-Length #{body.length} bytes exceeds limit of #{limit} bytes"
    )
  end

  # Streaming read with enforcement
  buffer = String.new(encoding: Encoding::BINARY)
  body.each do |chunk|
    buffer << chunk
    if buffer.bytesize > limit
      body.close
      raise ResponseTooLargeError.new(
        "M_TOO_LARGE",
        "Response body exceeds limit of #{limit} bytes"
      )
    end
  end
  buffer.empty? ? nil : buffer
end
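
The streaming safe path can be tried in isolation. The following is a minimal sketch, not the library's code: read_chunks_limited and TooLargeSketchError are hypothetical names, and any object that responds to each and yields String chunks stands in for the response body.

```ruby
# Hypothetical stand-in for ResponseTooLargeError.
class TooLargeSketchError < StandardError; end

# Accumulate chunks into a binary buffer and stop as soon as the
# running total exceeds the limit, so an oversized body is never
# buffered in full.
def read_chunks_limited(chunks, limit)
  buffer = String.new(encoding: Encoding::BINARY)
  chunks.each do |chunk|
    buffer << chunk
    if buffer.bytesize > limit
      raise TooLargeSketchError, "body exceeds limit of #{limit} bytes"
    end
  end
  # Mirror the documented behaviour: an empty body is reported as nil.
  buffer.empty? ? nil : buffer
end

read_chunks_limited(["ab", "cd"], 10) # => "abcd"
read_chunks_limited([], 10)           # => nil
```

Because the check runs after every chunk, the read overshoots the limit by at most one chunk before it aborts.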

def retryable_status?(status)

Whether the given HTTP status code should trigger a retry. 429 is only retried if @ignore_rate_limit is false. 502/503/504 are always retried.

Implementation

def retryable_status?(status)
  if status == RATE_LIMIT_STATUS
    !@ignore_rate_limit
  else
    GATEWAY_ERROR_STATUSES.include?(status)
  end
end
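
The retry decision is small enough to check by hand. This sketch assumes the gateway statuses are exactly 502, 503, and 504, as the description states; GATEWAY_STATUSES_SKETCH, retryable_sketch?, and the ignore_rate_limit keyword are hypothetical stand-ins for GATEWAY_ERROR_STATUSES, the method above, and the @ignore_rate_limit instance variable.

```ruby
RATE_LIMIT_SKETCH = 429
# Assumed contents, taken from the prose description above.
GATEWAY_STATUSES_SKETCH = [502, 503, 504].freeze

# The instance variable is replaced by a keyword argument so the
# method can be called without a client instance.
def retryable_sketch?(status, ignore_rate_limit: false)
  if status == RATE_LIMIT_SKETCH
    !ignore_rate_limit
  else
    GATEWAY_STATUSES_SKETCH.include?(status)
  end
end

retryable_sketch?(429)                          # => true
retryable_sketch?(429, ignore_rate_limit: true) # => false
retryable_sketch?(503)                          # => true
retryable_sketch?(404)                          # => false
```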

def compute_retry_delay(status, response, attempt)

Compute the delay before the next retry attempt.

For 429 (rate-limited): use the server's Retry-After header if present, falling back to exponential backoff. The value is capped at @max_retry_delay but not jittered, because the server is telling us exactly when to come back.

For 502/503/504 (gateway errors): exponential backoff with full jitter. Full jitter means the delay is drawn uniformly from 0 up to the calculated backoff (itself capped at @max_retry_delay), which is the AWS-recommended approach to avoid a thundering herd on shared homeservers.

Implementation

def compute_retry_delay(status, response, attempt)
  if status == RATE_LIMIT_STATUS
    server_delay = parse_retry_after(response)
    delay = server_delay || exponential_delay(attempt)
    [delay, @max_retry_delay].min
  else
    calculated = exponential_delay(attempt)
    rand(0.0..[calculated, @max_retry_delay].min)
  end
end
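
The two branches can be compared side by side with a standalone sketch. The function name retry_delay_sketch is hypothetical, and the base and cap keywords stand in for @retry_base_delay and @max_retry_delay; their defaults of 0.5 and 30.0 seconds are assumptions chosen for illustration, not values confirmed by the source.

```ruby
RATE_LIMITED = 429

# retry_after is the already-parsed Retry-After value in seconds, or nil.
def retry_delay_sketch(status, retry_after:, attempt:, base: 0.5, cap: 30.0)
  calculated = base * (2 ** (attempt - 1))
  if status == RATE_LIMITED
    # Trust the server's hint, but never wait longer than the cap.
    [retry_after || calculated, cap].min
  else
    # Full jitter: uniform draw between zero and the capped backoff.
    rand(0.0..[calculated, cap].min)
  end
end

retry_delay_sketch(429, retry_after: 120.0, attempt: 1) # => 30.0 (capped)
retry_delay_sketch(429, retry_after: nil, attempt: 3)   # => 2.0 (backoff fallback)
retry_delay_sketch(503, retry_after: nil, attempt: 3)   # random Float in 0.0..2.0
```

The rate-limit branch is deterministic, while the gateway branch spreads clients across the whole interval so they do not retry in lockstep.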

def exponential_delay(attempt)

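Returns base * 2^(attempt - 1), where the base is @retry_base_delay and attempt starts at 1. With a base of 0.5 seconds the series is 0.5, 1.0, 2.0, 4.0, ...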

Implementation

def exponential_delay(attempt)
  @retry_base_delay * (2 ** (attempt - 1))
end
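
As a quick check of the series, the formula can be evaluated outside the class. In this sketch exponential_delay_sketch is a hypothetical name, and the base_delay default of 0.5 is an assumption inferred from the documented series rather than a confirmed default.

```ruby
# base_delay stands in for @retry_base_delay.
def exponential_delay_sketch(attempt, base_delay: 0.5)
  base_delay * (2 ** (attempt - 1))
end

schedule = (1..5).map { |attempt| exponential_delay_sketch(attempt) }
# schedule => [0.5, 1.0, 2.0, 4.0, 8.0]
```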

def parse_retry_after(response)

Parse the Retry-After header. Supports both delta-seconds ("120") and HTTP-date ("Thu, 31 Dec 2026 23:59:59 GMT") formats per RFC 9110. Returns the number of seconds to wait as a Float (0.0 if the date is already in the past), or nil if the header is absent or unparseable.

Implementation

def parse_retry_after(response)
  value = response.headers["retry-after"]
  return nil unless value

  value = value.strip
  if value.match?(/\A\d+\z/)
    value.to_f
  else
    # HTTP-date format
    begin
      target = Time.httpdate(value)
      delay = target - Time.now
      delay > 0 ? delay : 0.0
    rescue ArgumentError
      nil
    end
  end
end
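
The parsing rules can be exercised without an HTTP response. This sketch takes the raw header value directly and accepts an injectable clock so the HTTP-date case is deterministic; parse_retry_after_sketch and its now keyword are hypothetical and not part of the client.

```ruby
require "time" # provides Time.httpdate

def parse_retry_after_sketch(value, now: Time.now)
  return nil unless value

  value = value.strip
  # Delta-seconds: a non-negative integer number of seconds.
  return value.to_f if value.match?(/\A\d+\z/)

  # Otherwise treat it as an HTTP-date and measure the distance from now.
  delay = Time.httpdate(value) - now
  delay > 0 ? delay : 0.0
rescue ArgumentError
  # Time.httpdate raises ArgumentError for anything it cannot parse.
  nil
end

now = Time.utc(2026, 12, 31, 23, 59, 0)
parse_retry_after_sketch("120")                                     # => 120.0
parse_retry_after_sketch("Thu, 31 Dec 2026 23:59:59 GMT", now: now) # => 59.0
parse_retry_after_sketch("Thu, 31 Dec 2026 23:00:00 GMT", now: now) # => 0.0
parse_retry_after_sketch("soon")                                    # => nil
```

Note that a fractional value such as "1.5" matches neither format, so it falls through to the HTTP-date branch and yields nil.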