inspect_ai.util
Store
The Store is used to record state and state changes.
The TaskState for each sample has a Store which can be used when solvers and/or tools need to coordinate changes to shared state. The Store can be accessed directly from the TaskState via state.store or can be accessed using the store() global function.
Note that changes to the store are automatically recorded to the transcript as a StoreEvent. To be serialised to the transcript, values and objects must be JSON serialisable (you can make objects with several fields serialisable using the @dataclass decorator or by inheriting from Pydantic BaseModel).
class Store
Methods
- get
-
Get a value from the store.
Provide a default to automatically initialise a named store value with the default when it does not yet exist.
def get(self, key: str, default: VT | None = None) -> VT | Any
key str -
Name of value to get
default VT | None -
Default value (defaults to None)
- set
-
Set a value into the store.
def set(self, key: str, value: Any) -> None
key str -
Name of value to set
value Any -
Value to set
- delete
-
Remove a value from the store.
def delete(self, key: str) -> None
key str -
Name of value to remove
- keys
-
View of keys within the store.
def keys(self) -> KeysView[str]
- values
-
View of values within the store.
def values(self) -> ValuesView[Any]
- items
-
View of items within the store.
def items(self) -> ItemsView[str, Any]
store
Get the currently active Store.
def store() -> Store
store_as
Get a Pydantic model interface to the store.
def store_as(model_cls: Type[SMT], instance: str | None = None) -> SMT
model_cls Type[SMT] -
Pydantic model type (must derive from StoreModel)
instance str | None -
Optional instance name for store (enables multiple instances of a given StoreModel type within a single sample)
StoreModel
Store-backed Pydantic BaseModel.
The model is initialised from a Store, so that Store should either already satisfy the validation constraints of the model OR you should provide Field(default=) annotations for all of your model fields (the latter approach is recommended).
class StoreModel(BaseModel)
Limits
message_limit
Limits the number of messages in a conversation.
The total number of messages in the conversation is compared to the limit (not just “new” messages).
These limits can be stacked.
This relies on “cooperative” checking - consumers must call check_message_limit() themselves whenever the message count is updated.
When a limit is exceeded, a LimitExceededError is raised.
def message_limit(limit: int | None) -> _MessageLimit
limit int | None -
The maximum conversation length (number of messages) allowed while the context manager is open. A value of None means unlimited messages.
token_limit
Limits the total number of tokens which can be used.
The counter starts when the context manager is opened and ends when it is closed.
These limits can be stacked.
This relies on “cooperative” checking - consumers must call check_token_limit() themselves whenever tokens are consumed.
When a limit is exceeded, a LimitExceededError is raised.
def token_limit(limit: int | None) -> _TokenLimit
limit int | None -
The maximum number of tokens that can be used while the context manager is open. Tokens used before the context manager was opened are not counted. A value of None means unlimited tokens.
time_limit
Limits the wall clock time which can elapse.
The timer starts when the context manager is opened and stops when it is closed.
These limits can be stacked.
When a limit is exceeded, the code block is cancelled and a LimitExceededError is raised.
Uses anyio’s cancellation scopes, meaning that the operations within the context manager block are cancelled if the limit is exceeded. The LimitExceededError is therefore raised at the level that the time_limit() context manager was opened, not at the level of the operation which caused the limit to be exceeded (e.g. a call to generate()). Ensure you handle LimitExceededError at the level of opening the context manager.
def time_limit(limit: float | None) -> _TimeLimit
limit float | None -
The maximum number of seconds that can pass while the context manager is open. A value of None means unlimited time.
working_limit
Limits the working time which can elapse.
Working time is the wall clock time minus any waiting time e.g. waiting before retrying in response to rate limits or waiting on a semaphore.
The timer starts when the context manager is opened and stops when it is closed.
These limits can be stacked.
When a limit is exceeded, a LimitExceededError is raised.
def working_limit(limit: float | None) -> _WorkingLimit
limit float | None -
The maximum number of seconds of working that can pass while the context manager is open. A value of None means unlimited time.
apply_limits
Apply a list of limits within a context manager.
Optionally catches any LimitExceededError raised by the applied limits, while allowing other limit errors from any other scope (e.g. the Sample level) to propagate.
Yields a LimitScope object which can be used once the context manager is closed to determine which, if any, limits were exceeded.
@contextmanager
def apply_limits(
limits: list[Limit], catch_errors: bool = False
) -> Iterator[LimitScope]
limits list[Limit] -
List of limits to apply while the context manager is open. Should a limit be exceeded, a LimitExceededError is raised.
catch_errors bool -
If True, catch any LimitExceededError raised by the applied limits. Callers can determine whether any limits were exceeded by checking the limit_error property of the LimitScope object yielded by this function. If False, all LimitExceededError exceptions will be allowed to propagate.
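The catch_errors semantics can be sketched with a stdlib context manager. `apply_limits_sketch`, `Scope`, and `LimitExceeded` below are illustrative stand-ins for apply_limits(), LimitScope, and LimitExceededError, not the real implementations:

```python
from contextlib import contextmanager
from typing import Iterator

class LimitExceeded(Exception):
    """Stand-in for inspect_ai's LimitExceededError."""

class Scope:
    """Stand-in for LimitScope: records which limit error (if any) was caught."""

    def __init__(self) -> None:
        self.limit_error: LimitExceeded | None = None

@contextmanager
def apply_limits_sketch(catch_errors: bool = False) -> Iterator[Scope]:
    scope = Scope()
    try:
        yield scope
    except LimitExceeded as ex:
        # record the error on the scope; swallow it only if requested
        scope.limit_error = ex
        if not catch_errors:
            raise

with apply_limits_sketch(catch_errors=True) as scope:
    raise LimitExceeded("message limit exceeded")

# once the context manager has exited, inspect which limit (if any) tripped
tripped = scope.limit_error is not None
```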
sample_limits
Get the top-level limits applied to the current Sample.
def sample_limits() -> SampleLimits
SampleLimits
Data class to hold the limits applied to a Sample.
This is used to return the limits from sample_limits().
@dataclass
class SampleLimits
Attributes
Limit
Base class for all limit context managers.
class Limit(abc.ABC)
Attributes
limit float | None -
The value of the limit being applied.
Can be None which represents no limit.
usage float -
The current usage of the resource being limited.
remaining float | None -
The remaining “unused” amount of the resource being limited.
Returns None if the limit is None.
LimitExceededError
Exception raised when a limit is exceeded.
In some scenarios this error may be raised when value >= limit to prevent another operation which is guaranteed to exceed the limit from being wastefully performed.
class LimitExceededError(Exception)
Concurrency
concurrency
Concurrency context manager.
A concurrency context can be used to limit the number of coroutines executing a block of code (e.g. calling an API). For example, here we limit concurrent calls to an API (‘api-name’) to 10:
async with concurrency("api-name", 10):
    ...  # call the api
Note that concurrency for model API access is handled internally via the max_connections generation config option. Concurrency for launching subprocesses is handled via the subprocess function.
@contextlib.asynccontextmanager
async def concurrency(
name: str, concurrency: int, key: str | None = None, visible: bool = True
) -> AsyncIterator[None]
name str -
Name for concurrency context. This serves as the display name for the context, and also as the unique context key (if the key parameter is omitted).
concurrency int -
Maximum number of coroutines that can enter the context.
key str | None -
Unique context key for this context. Optional. Used when the unique key isn’t human readable, e.g. it includes API tokens or account IDs, so that the more readable name can be presented to users (e.g. in the console UI).
visible bool -
Should context utilization be visible in the status bar.
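A named concurrency context behaves much like a keyed async semaphore. Below is a stdlib-only sketch of that idea (`concurrency_sketch` and the bookkeeping globals are illustrative; the real implementation also tracks utilization for display):

```python
import asyncio
from contextlib import asynccontextmanager

_semaphores: dict[str, asyncio.Semaphore] = {}
active = 0  # bookkeeping to demonstrate the throttling below
peak = 0

@asynccontextmanager
async def concurrency_sketch(name: str, limit: int):
    # one semaphore per context name, created on first use
    sem = _semaphores.setdefault(name, asyncio.Semaphore(limit))
    async with sem:
        yield

async def call_api(i: int) -> int:
    global active, peak
    async with concurrency_sketch("api-name", 2):
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # simulate the API call
        active -= 1
    return i

async def main() -> list[int]:
    return list(await asyncio.gather(*(call_api(i) for i in range(6))))

results = asyncio.run(main())  # at most 2 calls run at once
```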
subprocess
Execute and wait for a subprocess.
Convenience method for solvers, scorers, and tools to launch subprocesses. Automatically enforces a limit on concurrent subprocesses (defaulting to os.cpu_count() but controllable via the max_subprocesses eval config option).
async def subprocess(
args: str | list[str],
text: bool = True,
input: str | bytes | memoryview | None = None,
cwd: str | Path | None = None,
env: dict[str, str] = {},
capture_output: bool = True,
output_limit: int | None = None,
timeout: int | None = None,
concurrency: bool = True,
) -> Union[ExecResult[str], ExecResult[bytes]]
args str | list[str] -
Command and arguments to execute.
text bool -
Return stdout and stderr as text (defaults to True)
input str | bytes | memoryview | None -
Optional stdin for subprocess.
cwd str | Path | None -
Switch to directory for execution.
env dict[str, str] -
Additional environment variables.
capture_output bool -
Capture stderr and stdout into ExecResult (if False, then output is redirected to parent stderr/stdout)
output_limit int | None -
Maximum bytes to retain from stdout/stderr. If output exceeds this limit, only the most recent bytes are kept (older output is discarded). The process continues to completion.
timeout int | None -
Timeout. If the timeout expires then a TimeoutError will be raised.
concurrency bool -
Request that the concurrency() function is used to throttle concurrent subprocesses.
ExecResult
Execution result from call to subprocess().
@dataclass
class ExecResult(Generic[T])
Attributes
success bool -
Did the process exit with success.
returncode int -
Return code from process exit.
stdout T -
Contents of stdout.
stderr T -
Contents of stderr.
Display
display_counter
Display a counter in the UI.
def display_counter(caption: str, value: str) -> None
caption str -
The counter’s caption e.g. “HTTP rate limits”.
value str -
The counter’s value e.g. “42”.
display_type
Get the current console display type.
def display_type() -> DisplayType
DisplayType
Console display type.
DisplayType = Literal["full", "conversation", "rich", "plain", "log", "none"]
input_screen
Input screen for receiving user input.
Context manager that clears the task display and provides a screen for receiving console input.
@contextmanager
def input_screen(
header: str | None = None,
transient: bool | None = None,
width: int | None = None,
) -> Iterator[Console]
header str | None -
Header line to print above console content (defaults to printing no header)
transient bool | None -
Return to task progress display after the user completes input (defaults to True for normal sessions and False when trace mode is enabled).
width int | None -
Input screen width in characters (defaults to full width)
Utilities
span
Context manager for establishing a transcript span.
@contextlib.asynccontextmanager
async def span(name: str, *, type: str | None = None) -> AsyncIterator[None]
name str -
Step name.
type str | None -
Optional span type.
collect
Run and collect the results of one or more async coroutines.
Similar to asyncio.gather(), but also works when Trio is the async backend.
Automatically includes each task in a span(), which ensures that its events are grouped together in the transcript.
Using collect() in preference to asyncio.gather() is highly recommended for both Trio compatibility and more legible transcript output.
async def collect(*tasks: Awaitable[T]) -> list[T]
*tasks Awaitable[T] -
Tasks to run
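For a sense of the call shape: under asyncio, collect() behaves like asyncio.gather() with a span() around each task. The stdlib sketch below demonstrates the equivalent gather call (in inspect_ai code you would write `await collect(square(1), square(2), square(3))` instead):

```python
import asyncio

async def square(x: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop
    return x * x

async def main() -> list[int]:
    # with inspect_ai: results = await collect(square(1), square(2), square(3))
    return list(await asyncio.gather(square(1), square(2), square(3)))

results = asyncio.run(main())  # results preserve argument order
```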
resource
Read and resolve a resource to a string.
Resources are often used for templates, configuration, etc. They are sometimes hard-coded strings, and sometimes paths to external resources (e.g. in the local filesystem or remote stores e.g. s3:// or https://).
The resource() function will resolve its argument to a resource string. If a protocol-prefixed file name (e.g. s3://) or the path to a local file that exists is passed then it will be read and its contents returned. Otherwise, it will return the passed str directly. This function is mostly intended as a helper for other functions that take either a string or a resource path as an argument, and want to easily resolve them to the underlying content.
If you want to ensure that only local or remote files are consumed, specify type="file". For example: resource("templates/prompt.txt", type="file")
def resource(
resource: str,
type: Literal["auto", "file"] = "auto",
fs_options: dict[str, Any] = {},
) -> str
resource str -
Path to local or remote (e.g. s3://) resource, or for type="auto" (the default), a string containing the literal resource value.
type Literal['auto', 'file'] -
For “auto” (the default), interpret the resource as a literal string if it’s not a valid path. For “file”, always interpret it as a file path.
fs_options dict[str, Any] -
Optional. Additional arguments to pass through to the fsspec filesystem provider (e.g. S3FileSystem). Use {"anon": True} if you are accessing a public S3 bucket with no credentials.
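The local-path side of the resolution logic can be sketched as follows (`resource_sketch` is an illustrative, simplified stand-in: the real function also resolves remote stores like s3:// via fsspec):

```python
import os
import tempfile

def resource_sketch(resource: str, type: str = "auto") -> str:
    # an existing local file (or type="file") is read; otherwise the
    # string is returned as the literal resource value
    if type == "file" or os.path.isfile(resource):
        with open(resource, encoding="utf-8") as f:
            return f.read()
    elif type == "auto":
        return resource
    else:
        raise ValueError(f"unknown resource type: {type}")

# a literal string passes through unchanged
literal = resource_sketch("You are a helpful assistant.")

# a path to an existing file is read
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("prompt template")
contents = resource_sketch(tmp.name)
```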
throttle
Throttle a function to ensure it is called no more than every n seconds.
def throttle(seconds: float) -> Callable[..., Any]
seconds float -
Throttle time.
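The technique can be sketched with a stdlib decorator. This is an illustrative implementation, not inspect_ai's (details such as what a throttled call returns may differ); here throttled calls simply return the last result:

```python
import time
from functools import wraps
from typing import Any, Callable

def throttle_sketch(seconds: float) -> Callable[..., Any]:
    """Call the wrapped function at most once every `seconds`;
    throttled calls return the previous result."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        last_called = float("-inf")
        last_result: Any = None

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            nonlocal last_called, last_result
            now = time.monotonic()
            if now - last_called >= seconds:
                last_called = now
                last_result = func(*args, **kwargs)
            return last_result
        return wrapper
    return decorator

calls = 0

@throttle_sketch(60.0)
def refresh() -> int:
    global calls
    calls += 1
    return calls

refresh()
refresh()  # throttled: within 60s of the first call, so not re-invoked
```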
background
Run an async function in the background of the current sample.
Background functions must be run from an executing sample. The function will run as long as the current sample is running.
When the sample terminates, an anyio cancelled error will be raised in the background function. To catch this error and cleanup:
import anyio

async def run():
    try:
        ...  # background code
    except anyio.get_cancelled_exc_class():
        ...  # cleanup, then re-raise the cancellation
        raise

def background(
    func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
    *args: Unpack[PosArgsT],
) -> None
func Callable[[Unpack[PosArgsT]], Awaitable[Any]] -
Async function to run
*args Unpack[PosArgsT] -
Optional function arguments.
trace_action
Trace a long-running or potentially unreliable action.
Use trace actions when you want to collect data on the resolution (e.g. succeeded, cancelled, failed, timed out, etc.) and duration of an action.
Traces are written to the TRACE log level (which is just below HTTP and INFO). List and read trace logs with inspect trace list and related commands (see inspect trace --help for details).
@contextmanager
def trace_action(
logger: Logger, action: str, message: str, *args: Any, **kwargs: Any
) -> Generator[None, None, None]
logger Logger -
Logger to use for tracing (e.g. from getLogger(__name__))
action str -
Name of action to trace (e.g. ‘Model’, ‘Subprocess’, etc.)
message str -
Message describing action (can be a format string w/ args or kwargs)
*args Any -
Positional arguments for message format string.
**kwargs Any -
Named args for message format string.
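The pattern (time an action, record its resolution and duration at a custom log level) can be sketched with stdlib logging. Everything below is illustrative: `trace_action_sketch` is not the real implementation, and the numeric TRACE level is an assumption chosen only to sit below INFO:

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator

TRACE = 13  # illustrative value; the real TRACE level sits just below HTTP and INFO
logging.addLevelName(TRACE, "TRACE")

@contextmanager
def trace_action_sketch(logger: logging.Logger, action: str, message: str) -> Iterator[None]:
    start = time.monotonic()
    try:
        yield
        resolution = "succeeded"
    except Exception:
        resolution = "failed"
        raise
    finally:
        duration = time.monotonic() - start
        logger.log(TRACE, "%s: %s - %s (%.3fs)", action, message, resolution, duration)

logger = logging.getLogger(__name__)
logger.setLevel(TRACE)

# capture emitted records so we can inspect them
records: list[logging.LogRecord] = []
handler = logging.Handler()
handler.emit = records.append
logger.addHandler(handler)

with trace_action_sketch(logger, "Subprocess", "running ls"):
    pass  # the traced action
```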
trace_message
Log a message using the TRACE log level.
The TRACE log level is just below HTTP and INFO. List and read trace logs with inspect trace list and related commands (see inspect trace --help for details).
def trace_message(
logger: Logger, category: str, message: str, *args: Any, **kwargs: Any
) -> None
logger Logger -
Logger to use for tracing (e.g. from getLogger(__name__))
category str -
Category of trace message.
message str -
Trace message (can be a format string w/ args or kwargs)
*args Any -
Positional arguments for message format string.
**kwargs Any -
Named args for message format string.
Sandbox
sandbox
Get the SandboxEnvironment for the current sample.
def sandbox(name: str | None = None) -> SandboxEnvironment
name str | None -
Optional sandbox environment name.
sandbox_with
Get the SandboxEnvironment for the current sample that has the specified file.
async def sandbox_with(
file: str, on_path: bool = False, *, name: str | None = None
) -> SandboxEnvironment | None
file str -
Path to file to check for if on_path is False. If on_path is True, file should be a filename that exists on the system path.
on_path bool -
If True, file is a filename to be verified using “which”. If False, file is a path to be checked within the sandbox environments.
name str | None -
Optional sandbox environment name.
sandbox_default
Set the default sandbox environment for the current context.
@contextmanager
def sandbox_default(name: str) -> Iterator[None]
name str -
Sandbox to set as the default.
SandboxEnvironment
Environment for executing arbitrary code from tools.
Sandbox environments provide both an execution environment as well as a per-sample filesystem context to copy samples files into and resolve relative paths to.
class SandboxEnvironment(abc.ABC)
Methods
- exec
-
Execute a command within a sandbox environment.
The current working directory for execution will be the per-sample filesystem context.
Each output stream (stdout and stderr) is limited to 10 MiB. If exceeded, an
OutputLimitExceededError will be raised.
@abc.abstractmethod
async def exec(
    self,
    cmd: list[str],
    input: str | bytes | None = None,
    cwd: str | None = None,
    env: dict[str, str] = {},
    user: str | None = None,
    timeout: int | None = None,
    timeout_retry: bool = True,
    concurrency: bool = True,
) -> ExecResult[str]
cmd list[str] -
Command or command and arguments to execute.
input str | bytes | None -
Standard input (optional).
cwd str | None -
Current working dir (optional). If relative, will be relative to the per-sample filesystem context.
env dict[str, str] -
Environment variables for execution.
user str | None -
Optional username or UID to run the command as.
timeout int | None -
Optional execution timeout (seconds).
timeout_retry bool -
Retry the command in the case that it times out. Commands will be retried up to twice, with a timeout of no greater than 60 seconds for the first retry and 30 for the second.
concurrency bool -
For sandboxes that run locally, request that the concurrency() function be used to throttle concurrent subprocesses.
- write_file
-
Write a file into the sandbox environment.
If the parent directories of the file path do not exist they should be automatically created.
@abc.abstractmethod
async def write_file(self, file: str, contents: str | bytes) -> None
file str -
Path to file (relative file paths will resolve to the per-sample working directory).
contents str | bytes -
Text or binary file contents.
- read_file
-
Read a file from the sandbox environment.
File size is limited to 100 MiB.
When reading text files, implementations should preserve newline constructs (e.g. crlf should be preserved not converted to lf). This is equivalent to specifying
newline=""in a call to the Pythonopen()function.@abc.abstractmethod async def read_file(self, file: str, text: bool = True) -> Union[str | bytes]filestr-
Path to file (relative file paths will resolve to the per-sample working directory).
textbool-
Read as a utf-8 encoded text file.
- connection
-
Information required to connect to sandbox environment.
async def connection(self, *, user: str | None = None) -> SandboxConnection
user str | None -
User to login as.
- as_type
-
Verify and return a reference to a subclass of SandboxEnvironment.
def as_type(self, sandbox_cls: Type[ST]) -> ST
sandbox_cls Type[ST] -
Class of sandbox (subclass of SandboxEnvironment)
- default_polling_interval
-
Polling interval for sandbox service requests.
def default_polling_interval(self) -> float
- default_concurrency
-
Default max_sandboxes for this provider (None means no maximum)
@classmethod
def default_concurrency(cls) -> int | None
- task_init
-
Called at task startup to initialize resources.
@classmethod
async def task_init(
    cls, task_name: str, config: SandboxEnvironmentConfigType | None
) -> None
task_name str -
Name of task using the sandbox environment.
config SandboxEnvironmentConfigType | None -
Implementation defined configuration (optional).
- task_init_environment
-
Called at task startup to identify environment variables required by task_init for a sample.
Return 1 or more environment variables to request a dedicated call to task_init for samples that have exactly these environment variables (by default there is only one call to task_init for all of the samples in a task if they share a sandbox configuration).
This is useful for situations where config files are dynamic (e.g. through sample metadata variable interpolation) and end up yielding different images that need their own init (e.g. ‘docker pull’).
@classmethod
async def task_init_environment(
    cls, config: SandboxEnvironmentConfigType | None, metadata: dict[str, str]
) -> dict[str, str]
config SandboxEnvironmentConfigType | None -
Implementation defined configuration (optional).
metadata dict[str, str] -
Sample metadata field.
- sample_init
-
Initialize sandbox environments for a sample.
@classmethod
async def sample_init(
    cls,
    task_name: str,
    config: SandboxEnvironmentConfigType | None,
    metadata: dict[str, str],
) -> dict[str, "SandboxEnvironment"]
task_name str -
Name of task using the sandbox environment.
config SandboxEnvironmentConfigType | None -
Implementation defined configuration (optional).
metadata dict[str, str] -
Sample metadata field.
- sample_cleanup
-
Cleanup sandbox environments.
@classmethod
@abc.abstractmethod
async def sample_cleanup(
    cls,
    task_name: str,
    config: SandboxEnvironmentConfigType | None,
    environments: dict[str, "SandboxEnvironment"],
    interrupted: bool,
) -> None
task_name str -
Name of task using the sandbox environment.
config SandboxEnvironmentConfigType | None -
Implementation defined configuration (optional).
environments dict[str, 'SandboxEnvironment'] -
Sandbox environments created for this sample.
interrupted bool -
Was the task interrupted by an error or cancellation
- task_cleanup
-
Called at task exit as a last chance to cleanup resources.
@classmethod
async def task_cleanup(
    cls, task_name: str, config: SandboxEnvironmentConfigType | None, cleanup: bool
) -> None
task_name str -
Name of task using the sandbox environment.
config SandboxEnvironmentConfigType | None -
Implementation defined configuration (optional).
cleanup bool -
Whether to actually cleanup environment resources (False if --no-sandbox-cleanup was specified)
- cli_cleanup
-
Handle a cleanup invoked from the CLI (e.g. inspect sandbox cleanup).
@classmethod
async def cli_cleanup(cls, id: str | None) -> None
id str | None -
Optional ID to limit scope of cleanup.
- config_files
-
Standard config files for this provider (used for automatic discovery)
@classmethod
def config_files(cls) -> list[str]
- config_deserialize
-
Deserialize a sandbox-specific configuration model from a dict.
Override this method if you support a custom configuration model.
A basic implementation would be:
return MySandboxEnvironmentConfig(**config)
@classmethod
def config_deserialize(cls, config: dict[str, Any]) -> BaseModel
config dict[str, Any] -
Configuration dictionary produced by serializing the configuration model.
SandboxConnection
Information required to connect to sandbox.
class SandboxConnection(BaseModel)
Attributes
type str -
Sandbox type name (e.g. ‘docker’, ‘local’, etc.)
command str -
Shell command to connect to sandbox.
vscode_command list[Any] | None -
Optional vscode command (+args) to connect to sandbox.
ports list[PortMapping] | None -
Optional list of port mappings into container
container str | None -
Optional container name (does not apply to all sandboxes).
sandboxenv
Decorator for registering sandbox environments.
def sandboxenv(name: str) -> Callable[..., Type[T]]
name str -
Name of SandboxEnvironment type
sandbox_service
Run a service that is callable from within a sandbox.
The service makes available a set of methods to a sandbox for calling back into the main Inspect process.
To use the service from within a sandbox, either add it to the sys path or use importlib. For example, if the service is named ‘foo’:
import sys
sys.path.append("/var/tmp/sandbox-services/foo")
import foo

Or:

import importlib.util
spec = importlib.util.spec_from_file_location(
    "foo", "/var/tmp/sandbox-services/foo/foo.py"
)
foo = importlib.util.module_from_spec(spec)
spec.loader.exec_module(foo)

async def sandbox_service(
name: str,
methods: list[SandboxServiceMethod] | dict[str, SandboxServiceMethod],
until: Callable[[], bool],
sandbox: SandboxEnvironment,
user: str | None = None,
instance: str | None = None,
polling_interval: float | None = None,
started: anyio.Event | None = None,
requires_python: bool = True,
) -> None
name str -
Service name
methods list[SandboxServiceMethod] | dict[str, SandboxServiceMethod] -
Service methods.
until Callable[[], bool] -
Function used to check whether the service should stop.
sandbox SandboxEnvironment -
Sandbox to publish service to.
user str | None -
User to login as. Defaults to the sandbox environment’s default user.
instance str | None -
If you want multiple instances of a service in a single sandbox then use the instance param.
polling_interval float | None -
Polling interval for request checking. If not specified uses sandbox specific default (2 seconds if not specified, 0.2 seconds for Docker).
started anyio.Event | None -
Event to set when service has been started
requires_python bool -
Does the sandbox service require Python? Note that ALL sandbox services require Python unless they’ve injected an alternate implementation of the sandbox service client code.
Registry
registry_info
Lookup RegistryInfo for an object.
def registry_info(o: object) -> RegistryInfo
o object -
Object to lookup info for
registry_create
Create a registry object.
Creates objects registered via decorator (e.g. @task, @solver). Note that this can also create registered objects within Python packages, in which case the name of the package should be used as a prefix, e.g.
registry_create("scorer", "mypackage/myscorer", ...)
Objects within the Inspect package do not require a prefix, nor do objects from imported modules that aren’t in a package.
def registry_create(type: RegistryType, name: str, **kwargs: Any) -> object
type RegistryType -
Type of registry object to create
namestr-
Name of registry object to create
**kwargs Any -
Optional creation arguments
RegistryInfo
Registry information for registered object (e.g. solver, scorer, etc.).
class RegistryInfo(BaseModel)
Attributes
type RegistryType -
Type of registry object.
name str -
Registered name.
metadata dict[str, Any] -
Additional registry metadata.
RegistryType
Enumeration of registry object types.
These are the types of objects in this system that can be registered using a decorator (e.g. @task, @solver). Registered objects can in turn be created dynamically using the registry_create() function.
RegistryType = Literal[
"agent",
"approver",
"hooks",
"metric",
"modelapi",
"plan",
"sandboxenv",
"score_reducer",
"scorer",
"solver",
"task",
"tool",
"loader",
"scanner",
"scanjob",
]
JSON
JSONType
Valid types within JSON schema.
JSONType = Literal["string", "integer", "number", "boolean", "array", "object", "null"]
JSONSchema
JSON Schema for type.
class JSONSchema(BaseModel)
Attributes
type JSONType | None -
JSON type of tool parameter.
format str | None -
Format of the parameter (e.g. date-time).
description str | None -
Parameter description.
default Any -
Default value for parameter.
enum list[Any] | None -
Valid values for enum parameters.
items Optional[JSONSchema] -
Valid type for array parameters.
properties dict[str, JSONSchema] | None -
Valid fields for object parameters.
additionalProperties Optional[JSONSchema] | bool | None -
Are additional properties allowed?
anyOf list[JSONSchema] | None -
Valid types for union parameters.
required list[str] | None -
Required fields for object parameters.
json_schema
Provide a JSON Schema for the specified type.
Schemas can be automatically inferred for a wide variety of Python class types including Pydantic BaseModel, dataclasses, and typed dicts.
def json_schema(t: Type[Any]) -> JSONSchema
t Type[Any] -
Python type
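The idea of inferring a schema from a Python type can be sketched for the simple dataclass case. This stdlib-only sketch (`json_schema_sketch`, `_JSON_TYPES`) handles only scalar fields and returns a plain dict rather than a JSONSchema model; the real function supports a much wider range of types:

```python
from dataclasses import dataclass, fields
from typing import Any

# illustrative mapping from Python annotations to JSON Schema type names
_JSON_TYPES: dict[Any, str] = {
    str: "string", int: "integer", float: "number",
    bool: "boolean", list: "array", dict: "object",
}

def json_schema_sketch(t: type) -> dict[str, Any]:
    """Infer a (very) small subset of JSON Schema for a dataclass."""
    properties = {f.name: {"type": _JSON_TYPES[f.type]} for f in fields(t)}
    return {
        "type": "object",
        "properties": properties,
        "required": [f.name for f in fields(t)],
    }

@dataclass
class Point:
    x: int
    y: int
    label: str

schema = json_schema_sketch(Point)
```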