class Stream

Async-native SSE body built on Protocol::HTTP::Body::Writable.

Falcon's protocol-rack passes Protocol::HTTP::Body::Readable subclasses through untouched — no Enumerable wrapping, no intermediate buffering. This gives us true async streaming with backpressure for free.

The gospel (protocol-http) teaches:

Usage:

stream = A2A::SSE::Stream.new

Async do
  stream.event("task" => { ... })
  stream.event("statusUpdate" => { ... })
  stream.finish
end

# Return as Rack body; Falcon streams it natively
[200, { "content-type" => "text/event-stream" }, stream]

Definitions

def event(data, type: nil, id: nil)

Emit an SSE event.

Implementation

def event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)

  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id

  # SSE spec: each line of data gets its own `data:` prefix.
  # For single-line JSON this is one line; multi-line is handled correctly.
  payload.each_line do |line|
    buf << "data: #{line.chomp}\n"
  end
  buf << "\n" # blank line terminates the event

  write(buf)
end
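
To make the wire format concrete, the following is a minimal standalone sketch that mirrors the framing logic above outside the class. The helper name `format_sse_event` is hypothetical (it is not part of `A2A::SSE::Stream`), and it returns the frame as a string instead of calling `write`.

```ruby
require "json"

# Hypothetical helper mirroring Stream#event's framing logic.
# It returns the frame rather than writing it to a body.
def format_sse_event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)

  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id

  # One `data:` line per line of payload.
  payload.each_line { |line| buf << "data: #{line.chomp}\n" }

  buf << "\n" # blank line terminates the event
end

# A Hash is serialized to single-line JSON, so it yields one data line.
single = format_sse_event({ "task" => { "id" => "t1" } }, type: "task", id: 7)

# A multi-line String yields one data line per input line.
multi = format_sse_event("first\nsecond")

puts single
puts multi
```

Under these assumptions, `single` carries an `event:` line, an `id:` line, and one `data:` line, while `multi` carries two `data:` lines; both end with the blank line that delimits an SSE event.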

def finish

Signal end-of-stream. The reader will receive nil on next read(), closing the SSE connection.

Implementation

def finish
  close_write
end
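
The end-of-stream behaviour can be illustrated with a small analogy. The sketch below uses Ruby's core `Thread::Queue` as a stand-in for the body's internal buffer; it is an assumption made for illustration, not the actual `Protocol::HTTP::Body::Writable` implementation. It shows a reader loop that stops when it receives `nil` after the writer has closed its side.

```ruby
# Stand-in for the body's buffer. Thread::Queue is Ruby core,
# not protocol-http; this only illustrates the nil-on-close pattern.
queue = Thread::Queue.new

# Writer side: enqueue two already-framed events, then close.
queue << "data: one\n\n"
queue << "data: two\n\n"
queue.close # plays the role of finish: no further writes are accepted

# Reader side: pop returns nil once the queue is closed and empty,
# which ends the loop.
received = []
while (chunk = queue.pop)
  received << chunk
end

puts received.length
```

In this stand-in, chunks enqueued before the close are still delivered to the reader, and only then does `pop` return `nil`.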

def self.headers

Convenience: the SSE headers to return in the Rack response.

Implementation

def self.headers
  SSE_HEADERS
end