class PubSub
Fiber-safe pub/sub for task update streaming.
Following the gospel (async-job / protocol-http):
- Async::Queue is fiber-safe (no locks needed)
- enqueue/dequeue yield the fiber cooperatively
- nil sentinel signals end of stream
Each subscriber gets an Async::Queue. State mutations push events to all subscribers for the affected task. Terminal states close all subscribers for that task.
Usage:
pub_sub = A2A::Store::PubSub.new
Subscribe (returns an Async::Queue)
queue = pub_sub.subscribe("task-123")
In another fiber, consume events:
Async do while event = queue.dequeue process(event) end end
Publish an event to all subscribers:
pub_sub.notify("task-123", type: :status, data: { ... })
Close all subscribers for a task (terminal state):
pub_sub.close("task-123")
Definitions
def subscribe(task_id)
Subscribe to updates for a task. Returns an Async::Queue that will receive events. A nil sentinel signals end of stream.
Implementation
def subscribe(task_id)
queue = Async::Queue.new
@subscribers[task_id] << queue
queue
end
def unsubscribe(task_id, queue)
Remove a specific subscriber queue.
Implementation
def unsubscribe(task_id, queue)
@subscribers[task_id].delete(queue)
@subscribers.delete(task_id) if @subscribers[task_id].empty?
end
def notify(task_id, event)
Push an event to all subscribers for a task.
Implementation
def notify(task_id, event)
@subscribers[task_id].each do |queue|
queue.enqueue(event)
end
end
def close(task_id)
Close all subscribers for a task. Sends nil sentinel and removes all subscriptions.
Implementation
def close(task_id)
@subscribers[task_id].each do |queue|
queue.enqueue(nil) # sentinel: end of stream
end
@subscribers.delete(task_id)
end
def subscriber_count(task_id)
Number of active subscribers for a task.
Implementation
def subscriber_count(task_id)
@subscribers[task_id].size
end
def task_count
Total number of tasks with active subscribers.
Implementation
def task_count
@subscribers.size
end