class PubSub

Fiber-safe pub/sub for task update streaming.

Following the conventions of async-job and protocol-http:

Each subscriber gets an Async::Queue. State mutations push events to all subscribers for the affected task. Terminal states close all subscribers for that task.

Usage:

pub_sub = A2A::Store::PubSub.new

Subscribe (returns an Async::Queue):

queue = pub_sub.subscribe("task-123")

In another fiber, consume events:

Async do
  while event = queue.dequeue
    process(event)
  end
end

Publish an event to all subscribers:

pub_sub.notify("task-123", type: :status, data: { ... })

Close all subscribers for a task (terminal state):

pub_sub.close("task-123")
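The usage steps above can be exercised end to end with a small stand-in. This is only a sketch under stated assumptions: `SketchQueue` and `SketchPubSub` are hypothetical names, `SketchQueue` wraps the standard library's `Thread::Queue` so the example runs without the async gem (the real class hands out `Async::Queue` instances and is consumed from fibers, not threads), the constructor is a guess because it is not shown on this page, and the event payloads are made up for illustration.

```ruby
# Stand-in for Async::Queue so the sketch runs without the async gem.
# Only the two calls used on this page are provided.
class SketchQueue
  def initialize
    @queue = Thread::Queue.new
  end

  def enqueue(item)
    @queue.push(item)
  end

  def dequeue
    @queue.pop # blocks the calling thread until an item is available
  end
end

# Mirrors subscribe, notify and close as listed on this page.
class SketchPubSub
  def initialize
    # Assumed: an unknown task id starts with an empty subscriber list.
    @subscribers = Hash.new { |hash, task_id| hash[task_id] = [] }
  end

  def subscribe(task_id)
    queue = SketchQueue.new
    @subscribers[task_id] << queue
    queue
  end

  def notify(task_id, event)
    @subscribers[task_id].each { |queue| queue.enqueue(event) }
  end

  def close(task_id)
    @subscribers[task_id].each { |queue| queue.enqueue(nil) } # sentinel
    @subscribers.delete(task_id)
  end
end

# Collects events from a queue until the nil sentinel arrives.
def collect_events(queue)
  events = []
  while (event = queue.dequeue)
    events << event
  end
  events
end

pub_sub = SketchPubSub.new
queue = pub_sub.subscribe("task-123")
consumer = Thread.new { collect_events(queue) }

# Hypothetical payloads; the page elides the real event shape.
pub_sub.notify("task-123", type: :status, data: { state: "working" })
pub_sub.notify("task-123", type: :status, data: { state: "completed" })
pub_sub.close("task-123")

received = consumer.value
puts received.length # two events; the sentinel itself is not collected
```

The consumer loop ends because `nil` is falsy, so the sentinel doubles as the loop's exit condition. That also means an event that is itself `nil` or `false` would end the loop early.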

Definitions

def subscribe(task_id)

Subscribe to updates for a task. Returns an Async::Queue that will receive events. A nil sentinel signals end of stream.

Implementation

def subscribe(task_id)
  queue = Async::Queue.new
  @subscribers[task_id] << queue
  queue
end

def unsubscribe(task_id, queue)

Remove a specific subscriber queue. The task's entry is dropped once its last subscriber is removed.

Implementation

def unsubscribe(task_id, queue)
  @subscribers[task_id].delete(queue)
  @subscribers.delete(task_id) if @subscribers[task_id].empty?
end
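A consumer that stops before the task reaches a terminal state (for example, when a client disconnects) stays registered unless it calls `unsubscribe`, and `notify` keeps enqueueing into its queue. One way to make that cleanup automatic is an `ensure` wrapper. The sketch below is an illustration, not part of the library: `with_subscription` and `SketchRegistry` are hypothetical names, `Thread::Queue` stands in for `Async::Queue`, and the default-block Hash is an assumption about the constructor.

```ruby
# Hypothetical helper: yields a subscription and always unsubscribes,
# even if the consumer raises or returns early.
def with_subscription(pub_sub, task_id)
  queue = pub_sub.subscribe(task_id)
  yield queue
ensure
  pub_sub.unsubscribe(task_id, queue) if queue
end

# Minimal stand-in exposing the same subscribe/unsubscribe/task_count calls.
class SketchRegistry
  def initialize
    # Assumed storage; the real constructor is not shown on this page.
    @subscribers = Hash.new { |hash, task_id| hash[task_id] = [] }
  end

  def subscribe(task_id)
    queue = Thread::Queue.new
    @subscribers[task_id] << queue
    queue
  end

  def unsubscribe(task_id, queue)
    @subscribers[task_id].delete(queue)
    @subscribers.delete(task_id) if @subscribers[task_id].empty?
  end

  def task_count
    @subscribers.size
  end
end

registry = SketchRegistry.new
begin
  with_subscription(registry, "task-123") { |_queue| raise IOError, "client disconnected" }
rescue IOError
  # The consumer failed, but the ensure clause still unsubscribed it.
end
puts registry.task_count
```

Under the same storage assumption, calling `unsubscribe` after `close` has already removed the task is harmless: the lookup yields an empty list, nothing is deleted from it, and the empty entry is dropped again.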

def notify(task_id, event)

Push an event to all subscribers for a task.

Implementation

def notify(task_id, event)
  @subscribers[task_id].each do |queue|
    queue.enqueue(event)
  end
end
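The constructor is not shown on this page, but `@subscribers[task_id] << queue` in `subscribe` suggests a Hash whose default block stores an empty list on first access. If that assumption holds, a plain read such as the one in `notify` (or `subscriber_count`) also stores an empty entry for a task that has no subscribers, and `task_count` would then include it. The sketch below shows only the Ruby Hash behaviour, with a hypothetical `autovivifying_store` helper, and contrasts it with `Hash#fetch`, which does not trigger the default block.

```ruby
# Assumed storage shape: a Hash whose default block stores an empty list on first read.
def autovivifying_store
  Hash.new { |hash, task_id| hash[task_id] = [] }
end

# A notify-style read for a task nobody subscribed to.
store = autovivifying_store
store["task-without-subscribers"].each { |queue| queue.push(:event) }
puts store.size # 1: the read stored an empty list under the key

# Hash#fetch with an explicit default leaves the hash untouched.
store = autovivifying_store
store.fetch("task-without-subscribers", []).each { |queue| queue.push(:event) }
puts store.size # 0
```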

def close(task_id)

Close all subscribers for a task. Sends a nil sentinel to each subscriber queue, then removes all subscriptions for the task.

Implementation

def close(task_id)
  @subscribers[task_id].each do |queue|
    queue.enqueue(nil) # sentinel: end of stream
  end
  @subscribers.delete(task_id)
end
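One consequence of the listing above is that `close` only reaches queues registered at the time of the call. A subscription made afterwards gets a fresh queue that never receives the sentinel unless `close` is called again, so callers presumably check the task's state before subscribing (that check is an assumption and is not shown on this page). The sketch replays the sequence on a bare table, with `Thread::Queue` standing in for `Async::Queue`, an assumed default-block Hash, and a hypothetical `close_then_subscribe` helper.

```ruby
# Replays close followed by subscribe on a stand-in subscriber table.
def close_then_subscribe(task_id)
  subscribers = Hash.new { |hash, key| hash[key] = [] } # assumed storage

  early = Thread::Queue.new
  subscribers[task_id] << early # subscribe before the terminal state

  subscribers[task_id].each { |queue| queue.push(nil) } # close: send sentinel
  subscribers.delete(task_id)                           # close: drop subscriptions

  late = Thread::Queue.new
  subscribers[task_id] << late # subscribe after the terminal state

  { early: early, late: late }
end

queues = close_then_subscribe("task-123")
puts queues[:early].pop.inspect # nil: the sentinel
puts queues[:late].empty?       # true: nothing queued, a dequeue here would wait
```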

def subscriber_count(task_id)

Number of active subscribers for a task.

Implementation

def subscriber_count(task_id)
  @subscribers[task_id].size
end

def task_count

Total number of tasks with active subscribers.

Implementation

def task_count
  @subscribers.size
end