class Stream
Async-native SSE body built on Protocol::HTTP::Body::Writable.
Falcon's protocol-rack passes Protocol::HTTP::Body::Readable subclasses through untouched — no Enumerable wrapping, no intermediate buffering. This gives us true async streaming with backpressure for free.
The gospel (protocol-http) teaches:
- Writable is a producer/consumer queue body
- write() pushes chunks; read() pops them (blocks when empty)
- close_write signals EOF (reader gets nil)
- Client disconnect raises on next write()
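The queue semantics listed above can be illustrated without Falcon or protocol-http at all. The following is only an analogy built on Ruby's core Thread::Queue, not the way Writable is implemented: `push` stands in for write(), `pop` for read(), and `close` for close_write. The method name `drain_queue_demo` is hypothetical, and an unbounded Thread::Queue does not model backpressure.

```ruby
# Analogy only: models the write/read/close_write contract with Thread::Queue.
# It is not how protocol-http implements Writable.
def drain_queue_demo
  queue = Thread::Queue.new

  producer = Thread.new do
    queue.push("data: one\n\n") # like write(): enqueue a chunk
    queue.push("data: two\n\n")
    queue.close                 # like close_write: signal EOF
  end

  chunks = []
  while (chunk = queue.pop)     # like read(): blocks when empty, nil at EOF
    chunks << chunk
  end
  producer.join

  # Pushing after close raises, loosely like a write() after a disconnect.
  late_error =
    begin
      queue.push("data: late\n\n")
      nil
    rescue ClosedQueueError => e
      e.class
    end

  [chunks, late_error]
end

chunks, late_error = drain_queue_demo
puts chunks.length # prints 2
puts late_error    # prints ClosedQueueError
```

The consumer loop ends on the first nil, which is the same signal the reader of a Writable body gets after close_write.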
Usage:
stream = A2A::SSE::Stream.new
Async do
  stream.event("task" => { ... })
  stream.event("statusUpdate" => { ... })
  stream.finish
end
# Return as the Rack body; Falcon streams it natively.
[200, { "content-type" => "text/event-stream" }, stream]
Definitions
def event(data, type: nil, id: nil)
Emit an SSE event.
Implementation
def event(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)
  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id
  # SSE spec: each line of data gets its own `data:` prefix.
  # For single-line JSON this is one line; multi-line is handled correctly.
  payload.each_line do |line|
    buf << "data: #{line.chomp}\n"
  end
  buf << "\n" # blank line terminates the event
  write(buf)
end
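To see what `event` puts on the wire, the framing can be pulled into a standalone function. This is a minimal sketch: `sse_frame` is a hypothetical helper that mirrors the logic above but returns the frame as a String instead of calling write().

```ruby
require "json"

# Hypothetical helper mirroring the framing in `event`; it returns the
# frame rather than writing it to a stream.
def sse_frame(data, type: nil, id: nil)
  payload = data.is_a?(String) ? data : JSON.generate(data)
  buf = String.new
  buf << "event: #{type}\n" if type
  buf << "id: #{id}\n" if id
  # Each line of the payload gets its own `data:` prefix.
  payload.each_line { |line| buf << "data: #{line.chomp}\n" }
  buf << "\n" # blank line terminates the event
end

# Multi-line String payload with an event type and id.
print sse_frame("first\nsecond", type: "note", id: 7)
# event: note
# id: 7
# data: first
# data: second
# (blank line)

# Hash payload is serialized to single-line JSON.
print sse_frame({ "task" => { "id" => "t1" } })
# data: {"task":{"id":"t1"}}
# (blank line)
```

A client reassembles the multi-line payload by joining the `data:` lines with newlines, so the original String survives the round trip.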
def finish
Signal end-of-stream. The reader will receive nil on next read(), closing the SSE connection.
Implementation
def finish
  close_write
end
def self.headers
Convenience: the SSE headers to return in the Rack response.
Implementation
def self.headers
  SSE_HEADERS
end
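The value of SSE_HEADERS is not shown in this section. As an assumption only, a constant of this kind might look like the sketch below; `EXAMPLE_SSE_HEADERS` and `example_sse_response` are hypothetical names, and only the content-type entry is taken from the usage example above. The cache-control entry is a commonly used addition, not something the source confirms.

```ruby
# Hypothetical stand-in for SSE_HEADERS; the real constant may differ.
EXAMPLE_SSE_HEADERS = {
  "content-type" => "text/event-stream", # taken from the usage example
  "cache-control" => "no-cache"          # assumption: common for SSE responses
}.freeze

# Builds a Rack-style response triple around any body object.
def example_sse_response(body)
  [200, EXAMPLE_SSE_HEADERS, body]
end

status, headers, _body = example_sse_response(["data: hello\n\n"])
puts status                  # prints 200
puts headers["content-type"] # prints text/event-stream
```

Freezing the hash keeps one shared instance safe to hand out from a class-level accessor such as `self.headers`.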