Skip to article frontmatterSkip to article content

Dual Location Specification

Once we have specialized insertion tools, we need a way for the agent to specify where it wants to insert the hints. This is a surprisingly tricky problem. ## TODO: audit the prose to make sure it's not too LLMy

One approach is to use line numbers. Line numbers are precise, but they’re also brittle. If the code changes, the line numbers might become invalid.

Another approach is to use context matching. The agent can specify a few lines of code that appear before or after the insertion point. This is more flexible, but it can be ambiguous if the same context appears in multiple places.

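Our plain Python agent uses a hybrid approach. The insertion tools accept both a line_number and a context_before argument, plus an optional context_after argument that narrows a context match to the place where the following line also matches. The agent can provide either line_number or context_before, or both.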

tools.py
"""Tool execution for plain DafnyBench implementation with hint insertion tools."""

import subprocess
import tempfile
from pathlib import Path

# Tool definitions (Anthropic API format)
#
# All five insertion tools share the same three optional location arguments and
# differ only in their name, description, and the name of the required hint
# argument, so they are generated from one factory instead of being spelled out
# five times.


def _location_properties() -> dict[str, dict[str, str]]:
    """Return a fresh copy of the location arguments shared by every insertion tool.

    A new dict is built on each call so that no two tool schemas share nested
    objects; mutating one schema can never leak into another.
    """
    return {
        "line_number": {
            "type": "integer",
            "description": "Line number (1-indexed, optional)",
        },
        "context_before": {
            "type": "string",
            "description": "Line before insertion point (optional)",
        },
        "context_after": {
            "type": "string",
            "description": "Line after insertion point (optional)",
        },
    }


def _make_insert_tool(
    name: str, description: str, hint_param: str, hint_description: str
) -> dict:
    """Build the schema for one insertion tool.

    Args:
        name: Tool name exposed to the model
        description: Tool description exposed to the model
        hint_param: Name of the single required string argument
        hint_description: Description of that argument

    Returns:
        Tool definition dict whose properties are the hint argument followed
        by the shared location arguments
    """
    return {
        "name": name,
        "description": description,
        "input_schema": {
            "type": "object",
            "properties": {
                hint_param: {"type": "string", "description": hint_description},
                **_location_properties(),
            },
            "required": [hint_param],
        },
    }


INSERT_INVARIANT_TOOL = _make_insert_tool(
    "insert_invariant",
    "Insert a loop invariant at specified location",
    "invariant",
    "Invariant expression",
)

INSERT_ASSERTION_TOOL = _make_insert_tool(
    "insert_assertion",
    "Insert an assertion at specified location",
    "assertion",
    "Assertion expression",
)

INSERT_PRECONDITION_TOOL = _make_insert_tool(
    "insert_precondition",
    "Insert a function precondition (requires clause) at specified location",
    "precondition",
    "Precondition expression",
)

INSERT_POSTCONDITION_TOOL = _make_insert_tool(
    "insert_postcondition",
    "Insert a function postcondition (ensures clause) at specified location",
    "postcondition",
    "Postcondition expression",
)

INSERT_MEASURE_TOOL = _make_insert_tool(
    "insert_measure",
    "Insert a termination measure (decreases clause) at specified location",
    "measure",
    "Decreases expression",
)

VERIFY_DAFNY_TOOL = {
    "name": "verify_dafny",
    "description": "Verify the current code state with all hints inserted so far. "
    "Returns verification results and full rendered code.",
    "input_schema": {
        "type": "object",
        "properties": {},  # No parameters - reads from state
    },
}

# Every tool offered to the model, in the order they are advertised.
TOOLS = [
    INSERT_INVARIANT_TOOL,
    INSERT_ASSERTION_TOOL,
    INSERT_PRECONDITION_TOOL,
    INSERT_POSTCONDITION_TOOL,
    INSERT_MEASURE_TOOL,
    VERIFY_DAFNY_TOOL,
]

# ===== Phase 1: State Management =====


def get_code_state(messages: list[dict]) -> str | None:
    """Extract current code state from message history.

    Searches backwards through messages for most recent code state marker.
    Returns None if no state found (initialization case).

    Args:
        messages: Message history containing code states

    Returns:
        Current code string or None if no state found
    """
    for msg in reversed(messages):
        if msg["role"] == "user":
            # Check for list of tool_result blocks
            if isinstance(msg["content"], list):
                for block in msg["content"]:
                    if block.get("type") == "tool_result":
                        content = block.get("content", "")
                        if "=== CURRENT_CODE_STATE ===" in content:
                            # Extract code between markers
                            start = content.find("```dafny\n")
                            end = content.find("\n```", start)
                            if start != -1 and end != -1:
                                return content[start + 9 : end]
            # Check for string content with state marker
            elif isinstance(msg["content"], str):
                if "=== CURRENT_CODE_STATE ===" in msg["content"]:
                    start = msg["content"].find("```dafny\n")
                    end = msg["content"].find("\n```", start)
                    if start != -1 and end != -1:
                        return msg["content"][start + 9 : end]
    return None


def update_code_state(messages: list[dict], new_code: str) -> None:
    """Record a new code state at the end of the message history.

    Renders the configured ``state_update_template`` with ``code=new_code`` and
    appends the result as a user message. The template comes from project
    config; presumably it contains the marker and fence that get_code_state
    looks for - confirm against the config.

    Args:
        messages: Message history to update (modified in place)
        new_code: New code state to store
    """
    # Imported at call time rather than at module import.
    from evals.dafnybench.plain.config import get_config

    template = get_config().prompt.state_update_template
    messages.append({"role": "user", "content": template.format(code=new_code)})


# ===== Phase 2: Insertion Utilities =====


def get_indentation(line: str) -> str:
    """Return the run of whitespace that starts a line.

    A line made up entirely of whitespace is returned unchanged.

    Args:
        line: Line of code

    Returns:
        Leading whitespace string (empty if the line has none)
    """
    without_indent = line.lstrip()
    indent_width = len(line) - len(without_indent)
    return line[:indent_width]


def find_insertion_point(
    code_lines: list[str],
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> tuple[int, str] | tuple[None, str]:
    """Find where to insert hint using line number or context matching.

    Returns:
        (line_index, indentation) on success
        (None, error_message) on failure

    Priority:
        1. If line_number is provided and in range, use it (0-indexed
           internally). Line ``len(code_lines) + 1`` is valid and means
           "append after the last line".
        2. If line_number is out of range and context_before is provided,
           fall back to context matching.
        3. If only context_before is provided, search for the single line
           containing it; the hint goes directly after that line.

    Context matching compares stripped substrings. When context_after is
    given, a line only matches if the line following it contains
    context_after. Zero matches or more than one match is an error.

    The returned indentation is copied from the line above the insertion
    point, or from the first line when inserting at the very top.

    Args:
        code_lines: Lines of code
        line_number: 1-indexed line number (optional)
        context_before: Line before insertion point (optional)
        context_after: Line after insertion point for disambiguation (optional)

    Returns:
        Either (index, indentation) or (None, error_message)
    """
    line_error: str | None = None

    if line_number is not None:
        # Line-based insertion (convert to 0-indexed)
        idx = line_number - 1
        if 0 <= idx <= len(code_lines):
            if idx > 0:
                indent = get_indentation(code_lines[idx - 1])
            elif idx < len(code_lines):
                indent = get_indentation(code_lines[idx])
            else:
                # Inserting into an empty list of lines.
                indent = ""
            return idx, indent

        # The upper bound is len + 1 because appending at the end is accepted.
        line_error = (
            f"Line number {line_number} out of range (1-{len(code_lines) + 1})"
        )
        if context_before is None:
            return None, line_error
        # Otherwise fall through: a stale line number should not defeat a
        # context that can still locate the insertion point.

    if context_before is None:
        return None, "Must provide either line_number or context_before"

    needle_before = context_before.strip()
    needle_after = context_after.strip() if context_after is not None else None

    # Each match is the index directly after a line containing context_before.
    matches = [
        i + 1
        for i, line in enumerate(code_lines)
        if needle_before in line
        and (
            needle_after is None
            or (i + 1 < len(code_lines) and needle_after in code_lines[i + 1])
        )
    ]

    if len(matches) == 1:
        idx = matches[0]
        return idx, get_indentation(code_lines[idx - 1])

    if not matches:
        context_error = f"Context not found: '{context_before}'"
    else:
        context_error = (
            f"Ambiguous context: '{context_before}' matches {len(matches)} locations. "
            f"Use context_after or line_number to disambiguate."
        )

    if line_error is not None:
        # Both strategies failed; report both so the caller can fix either.
        return None, f"{line_error}. {context_error}"
    return None, context_error


def format_hint_line(hint_type: str, hint_content: str, indentation: str) -> str:
    """Format a hint with proper indentation and syntax.

    Clause-style hints ("invariant", "requires", "ensures", "decreases") are
    emitted as ``<indent><keyword> <expression>``. An "assert" hint is a Dafny
    statement and must be terminated, so a ``;`` is appended when the content
    does not already end with ``;`` or with the closing ``}`` of a ``by``
    block. Content that is already terminated is left untouched.

    Args:
        hint_type: "invariant", "assert", "requires", "ensures", "decreases"
        hint_content: The actual hint expression
        indentation: Leading whitespace

    Returns:
        Formatted hint line (no trailing newline)
    """
    if hint_type == "assert":
        trimmed = hint_content.rstrip()
        # Blank content is left alone; there is nothing sensible to terminate.
        if trimmed and not trimmed.endswith((";", "}")):
            hint_content = f"{trimmed};"
    return f"{indentation}{hint_type} {hint_content}"


# ===== Phase 3: Specialized Insertion Tools =====


def insert_hint(
    messages: list[dict],
    hint_type: str,
    hint_content: str,
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> dict[str, str | bool]:
    """Insert one hint line into the current code state and return the result.

    Reads the current code from the message history, locates the insertion
    point, and inserts a formatted hint line there. The message history is
    not modified by this function.

    Returns:
        dict with keys:
            - success (bool): True if insertion succeeded
            - message (str): Success/error message
            - code (str): Code after insertion on success; the unchanged code
              if no insertion point was found; empty if there was no code state

    Args:
        messages: Message history containing code state
        hint_type: Type of hint to insert
        hint_content: Content of the hint
        line_number: Optional line number (1-indexed)
        context_before: Optional context before insertion point
        context_after: Optional context after insertion point
    """
    source = get_code_state(messages)
    if source is None:
        return {
            "success": False,
            "message": "Error: No code state found. This is a system error.",
            "code": "",
        }

    lines = source.split("\n")

    # The second element is the indentation on success, the error text on failure.
    position, detail = find_insertion_point(
        lines, line_number, context_before, context_after
    )
    if position is None:
        return {
            "success": False,
            "message": detail,
            "code": source,
        }

    lines.insert(position, format_hint_line(hint_type, hint_content, detail))

    # DON'T update state here - let the agent do it after collecting all tool results
    # This prevents breaking tool_use/tool_result pairing in message history
    return {
        "success": True,
        "message": f"✓ {hint_type.capitalize()} inserted at line {position + 1}",
        "code": "\n".join(lines),
    }


def insert_invariant(
    messages: list[dict],
    invariant: str,
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> dict[str, str | bool]:
    """Insert a loop invariant by delegating to insert_hint with the "invariant" keyword.

    Args:
        messages: Message history
        invariant: Invariant expression
        line_number: Optional line number (1-indexed)
        context_before: Optional context before insertion
        context_after: Optional context after insertion

    Returns:
        Result dict with success, message, and code
    """
    return insert_hint(
        messages,
        hint_type="invariant",
        hint_content=invariant,
        line_number=line_number,
        context_before=context_before,
        context_after=context_after,
    )


def insert_assertion(
    messages: list[dict],
    assertion: str,
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> dict[str, str | bool]:
    """Insert an assertion by delegating to insert_hint with the "assert" keyword.

    Args:
        messages: Message history
        assertion: Assertion expression
        line_number: Optional line number (1-indexed)
        context_before: Optional context before insertion
        context_after: Optional context after insertion

    Returns:
        Result dict with success, message, and code
    """
    return insert_hint(
        messages,
        hint_type="assert",
        hint_content=assertion,
        line_number=line_number,
        context_before=context_before,
        context_after=context_after,
    )


def insert_precondition(
    messages: list[dict],
    precondition: str,
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> dict[str, str | bool]:
    """Insert a precondition by delegating to insert_hint with the "requires" keyword.

    Args:
        messages: Message history
        precondition: Precondition expression
        line_number: Optional line number (1-indexed)
        context_before: Optional context before insertion
        context_after: Optional context after insertion

    Returns:
        Result dict with success, message, and code
    """
    return insert_hint(
        messages,
        hint_type="requires",
        hint_content=precondition,
        line_number=line_number,
        context_before=context_before,
        context_after=context_after,
    )


def insert_postcondition(
    messages: list[dict],
    postcondition: str,
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> dict[str, str | bool]:
    """Insert a postcondition by delegating to insert_hint with the "ensures" keyword.

    Args:
        messages: Message history
        postcondition: Postcondition expression
        line_number: Optional line number (1-indexed)
        context_before: Optional context before insertion
        context_after: Optional context after insertion

    Returns:
        Result dict with success, message, and code
    """
    return insert_hint(
        messages,
        hint_type="ensures",
        hint_content=postcondition,
        line_number=line_number,
        context_before=context_before,
        context_after=context_after,
    )


def insert_measure(
    messages: list[dict],
    measure: str,
    line_number: int | None = None,
    context_before: str | None = None,
    context_after: str | None = None,
) -> dict[str, str | bool]:
    """Insert a termination measure by delegating to insert_hint with the "decreases" keyword.

    Args:
        messages: Message history
        measure: Decreases expression
        line_number: Optional line number (1-indexed)
        context_before: Optional context before insertion
        context_after: Optional context after insertion

    Returns:
        Result dict with success, message, and code
    """
    return insert_hint(
        messages,
        hint_type="decreases",
        hint_content=measure,
        line_number=line_number,
        context_before=context_before,
        context_after=context_after,
    )


# ===== Phase 4: Updated verify_dafny =====


def verify_dafny(
    messages: list[dict], *, timeout: float = 30
) -> dict[str, str | bool]:
    """Verify current code state using Dafny compiler.

    Reads code from message history state, writes it to a temporary ``.dfy``
    file, runs ``dafny verify`` on it, and returns the results including the
    full code. The temporary file is removed afterwards. Code containing a
    ``{:verify false}`` attribute (in any letter case or internal spacing) is
    rejected without running Dafny.

    Args:
        messages: Message history containing code state
        timeout: Seconds to wait for Dafny before giving up

    Returns:
        dict with keys:
            - success (bool): True if verification succeeded
            - message (str): Success/failure message
            - code (str): Full current code with all hints
            - stdout (str): Standard output from Dafny
            - stderr (str): Standard error from Dafny
    """
    import re  # used only by this function

    # Get current code state
    code = get_code_state(messages)
    if code is None:
        return {
            "success": False,
            "message": "Error: No code state found. Insert hints first.",
            "code": "",
            "stdout": "",
            "stderr": "",
        }

    # Check for verification bypass attempts (following DafnyBench methodology).
    # Whitespace inside the attribute is tolerated so that spacing variants
    # such as "{:verify  false}" cannot slip past the check.
    if re.search(r"\{:\s*verify\s+false\s*\}", code, flags=re.IGNORECASE):
        return {
            "success": False,
            "message": "Invalid code: contains {:verify false} which bypasses verification. "
            "You must properly verify the code with correct annotations.",
            "code": code,
            "stdout": "",
            "stderr": "Bypass attempt detected",
        }

    # Write to temporary file. The encoding is pinned to UTF-8 so the file
    # contents do not depend on the platform's locale encoding.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".dfy", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write(code)
        temp_path = tmp.name

    try:
        # Run Dafny verification with timeout
        result = subprocess.run(
            ["dafny", "verify", temp_path],
            capture_output=True,
            text=True,
            timeout=timeout,
        )

        # Success needs a clean exit AND a literal "0 errors" in the summary;
        # the word boundary keeps "10 errors" from counting as "0 errors".
        success = (
            result.returncode == 0
            and re.search(r"\b0 errors\b", result.stdout) is not None
        )

        if success:
            message = "✓ Verification succeeded! All checks passed."
        else:
            # Prefer stderr for the failure details, falling back to stdout.
            error_output = result.stderr if result.stderr else result.stdout
            message = f"Verification failed:\n\n{error_output}"

        return {
            "success": success,
            "message": message,
            "code": code,  # Return full code
            "stdout": result.stdout,
            "stderr": result.stderr,
        }

    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "message": f"Verification timed out after {timeout} seconds. "
            "The program may be too complex or contain infinite loops.",
            "code": code,
            "stdout": "",
            "stderr": "Timeout",
        }
    finally:
        # Cleanup temporary file
        Path(temp_path).unlink(missing_ok=True)

The find_insertion_point function

The find_insertion_point function first checks for a line number. If it’s provided, it uses that. Otherwise, it uses the context to find the insertion point. If the context is not found or is ambiguous, it returns an error message to the agent.

This dual location specification makes the tools more robust and user-friendly for the agent. It’s a great example of how to design tool APIs with the agent in mind.