OPVS Protocol

The Agent Executor

An Agent Executor contains the core logic for how an OPVS agent processes requests and generates responses or events. The OPVS Python SDK provides an abstract base class, `opvs.server.agent_execution.AgentExecutor`, that you subclass and implement.

AgentExecutor Interface

The `AgentExecutor` class defines two primary methods:

  • `async def execute(self, context: RequestContext, event_queue: EventQueue)`: Handles incoming requests that expect a response or a stream of events. It processes the user's input (available via `context`) and uses the `event_queue` to send back `Message`, `Task`, `TaskStatusUpdateEvent`, or `TaskArtifactUpdateEvent` objects.
  • `async def cancel(self, context: RequestContext, event_queue: EventQueue)`: Handles requests to cancel an ongoing task.

The `RequestContext` provides information about the incoming request, such as the user's message and any existing task details. The `EventQueue` is used by the executor to send events back to the client.
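The shape of this interface can be sketched with Python's `abc` module. This is a minimal illustration, not the SDK's actual source: `RequestContext` and `EventQueue` below are empty stand-in classes, and `NoOpExecutor` is a hypothetical subclass. Only the two method signatures are taken from the description above.

```python
import asyncio
from abc import ABC, abstractmethod


class RequestContext:
    """Stand-in for the SDK's RequestContext (assumption: details omitted)."""


class EventQueue:
    """Stand-in for the SDK's EventQueue (assumption: details omitted)."""


class AgentExecutor(ABC):
    """Illustrative sketch of the two-method interface described above."""

    @abstractmethod
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Process a request and publish events to the queue."""

    @abstractmethod
    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Handle a request to cancel an ongoing task."""


class NoOpExecutor(AgentExecutor):
    """Hypothetical subclass: both methods must be overridden to instantiate it."""

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        return None

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        return None


# Instantiating the abstract class directly fails; a complete subclass works.
try:
    AgentExecutor()
    abstract_instantiable = True
except TypeError:
    abstract_instantiable = False

executor = NoOpExecutor()
asyncio.run(executor.execute(RequestContext(), EventQueue()))
```

A subclass that leaves either method unimplemented fails at instantiation time in this sketch, so a missing `cancel` is caught before any request is served.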

Hello World Agent Executor

Let's look at `agent_executor.py`. It defines `HelloWorldAgentExecutor`.

  • The Agent (`HelloWorldAgent`): This is a simple helper class that encapsulates the actual "business logic".

```python
class HelloWorldAgent:
    """Hello World Agent."""

    async def invoke(self) -> str:
        """Invoke the Hello World agent to generate a response."""
        return 'Hello, World!'
```

It has a simple `invoke` method that returns the string "Hello, World!".
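Because `invoke` is a coroutine, it has to be awaited or driven by an event loop. One minimal way to try the class on its own, outside any server, is `asyncio.run` from the standard library. The class is repeated here only so the snippet is self-contained.

```python
import asyncio


class HelloWorldAgent:
    """Hello World Agent (copied from above so this snippet runs on its own)."""

    async def invoke(self) -> str:
        return 'Hello, World!'


# asyncio.run drives the coroutine to completion and returns its result.
result = asyncio.run(HelloWorldAgent().invoke())
print(result)  # prints: Hello, World!
```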

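  • The Executor (`HelloWorldAgentExecutor`): This class implements the `AgentExecutor` interface.
  • `__init__`:

```python
# Import path as stated in the introduction of this page.
from opvs.server.agent_execution import AgentExecutor


class HelloWorldAgentExecutor(AgentExecutor):
    """Test AgentProxy Implementation."""

    def __init__(self) -> None:
        # Hold a single agent instance for the lifetime of the executor.
        self.agent = HelloWorldAgent()
```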

It instantiates the `HelloWorldAgent`.

  • `execute`:

```python
async def execute(
    self,
    context: RequestContext,
    event_queue: EventQueue,
) -> None:
    """Execute the agent process and enqueue the final response."""
    task = context.current_task or new_task(context.message)
    await event_queue.enqueue_event(task)

    await event_queue.enqueue_event(
        TaskStatusUpdateEvent(
            task_id=context.task_id,
            context_id=context.context_id,
            status=TaskStatus(
                state=TaskState.TASK_STATE_WORKING,
                message=new_agent_text_message('Processing request...'),
            ),
        )
    )

    result = await self.agent.invoke()

    await event_queue.enqueue_event(
        TaskArtifactUpdateEvent(
            task_id=context.task_id,
            context_id=context.context_id,
            artifact=new_text_artifact(name='result', text=result),
        )
    )
    await event_queue.enqueue_event(
        TaskStatusUpdateEvent(
            task_id=context.task_id,
            context_id=context.context_id,
            status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED),
        )
    )
```

When a `message/send` or `message/stream` request comes in (both are handled by `execute` in this simplified executor):

  • a. It retrieves the current task from the context or creates a new one, enqueuing it as the first event.
  • b. It enqueues a `TaskStatusUpdateEvent` with a state of `TASK_STATE_WORKING` to indicate the agent has begun processing.
  • c. It calls `self.agent.invoke()` to execute the actual business logic (which simply returns "Hello, World!").
  • d. It enqueues a `TaskArtifactUpdateEvent` containing the result text.
  • e. Finally, it enqueues a `TaskStatusUpdateEvent` with a state of `TASK_STATE_COMPLETED` to conclude the task.

  • `cancel`: The Hello World example's `cancel` method simply raises an exception, indicating that cancellation is not supported for this basic agent.

```python
async def cancel(
    self, context: RequestContext, event_queue: EventQueue
) -> None:
    """Raise exception as cancel is not supported."""
    raise Exception('cancel not supported')
```
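
Since `cancel` is a coroutine, the exception surfaces at the point where the call is awaited. The sketch below shows this with a trimmed-down stand-in for the executor: it has no SDK base class, `None` is passed for both arguments, and `try_cancel` is a hypothetical caller. How the real request handler reports the failure to the client is not covered here.

```python
import asyncio


class HelloWorldAgentExecutor:
    """Trimmed stand-in: only the cancel method from the example above."""

    async def cancel(self, context, event_queue) -> None:
        raise Exception('cancel not supported')


async def try_cancel(executor: HelloWorldAgentExecutor) -> str:
    """Hypothetical caller that awaits cancel and reports what happened."""
    try:
        await executor.cancel(context=None, event_queue=None)
    except Exception as exc:
        return f'cancel failed: {exc}'
    return 'cancelled'


outcome = asyncio.run(try_cancel(HelloWorldAgentExecutor()))
print(outcome)  # prints: cancel failed: cancel not supported
```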

The `AgentExecutor` acts as the bridge between the OPVS protocol (managed by the request handler and server application) and your agent's specific logic. It receives context about the request and uses an event queue to communicate results or updates back.
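
To make the bridging role concrete, the sketch below replays the `execute` flow (steps a to e above) against stand-in types and records the order of the events. Everything except the control flow is an assumption: `RecordingQueue`, `FakeContext`, `SketchExecutor`, and the `Event` dataclass with its string `kind` and `payload` values are hypothetical simplifications, not SDK types, and the real SDK events carry more fields.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical simplified event: a kind plus a payload string."""
    kind: str
    payload: str = ''


@dataclass
class FakeContext:
    """Stand-in for RequestContext with only the ids used here."""
    task_id: str = 'task-1'
    context_id: str = 'ctx-1'


@dataclass
class RecordingQueue:
    """Stand-in for EventQueue that simply records what is enqueued."""
    events: list = field(default_factory=list)

    async def enqueue_event(self, event: Event) -> None:
        self.events.append(event)


class HelloWorldAgent:
    async def invoke(self) -> str:
        return 'Hello, World!'


class SketchExecutor:
    """Mirrors steps a to e of the walkthrough using the stand-in types."""

    def __init__(self) -> None:
        self.agent = HelloWorldAgent()

    async def execute(self, context: FakeContext, event_queue: RecordingQueue) -> None:
        await event_queue.enqueue_event(Event('task', context.task_id))  # step a
        await event_queue.enqueue_event(Event('status', 'working'))      # step b
        result = await self.agent.invoke()                               # step c
        await event_queue.enqueue_event(Event('artifact', result))       # step d
        await event_queue.enqueue_event(Event('status', 'completed'))    # step e


# Play the role of the request handler: build the inputs, run, inspect events.
queue = RecordingQueue()
asyncio.run(SketchExecutor().execute(FakeContext(), queue))
summary = [(e.kind, e.payload) for e in queue.events]
print(summary)
```

The executor never talks to the client directly in this sketch. It only writes to the queue, which is what lets the surrounding server decide whether to return a single response or stream the events.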