A data analysis agent is an AI system that takes natural language questions about data and autonomously generates SQL queries, executes Python/pandas code, creates visualizations, and produces natural language summaries. This pattern powers tools such as ChatGPT Code Interpreter, Jupyter AI, and enterprise analytics platforms. This guide covers the architecture, sandboxed execution, and working code for building your own.
Data analysis agents follow a plan-execute-interpret loop. The agent receives a question, inspects the data schema, generates code (SQL or Python), executes it in a sandbox, interprets the results, and optionally generates visualizations.
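The loop can be sketched in a few lines of Python. This is a minimal, self-contained illustration: `fake_planner` and `run_sandboxed` are stand-ins for the real LLM planner and sandbox built later in this guide, not actual APIs.

```python
from dataclasses import dataclass

@dataclass
class Step:
    done: bool          # has the planner decided it can answer?
    code: str = ""      # code to execute if not done
    answer: str = ""    # final answer if done

def fake_planner(context: list) -> Step:
    # Stand-in for the LLM: answer once a result is available.
    if any(line.startswith("Result:") for line in context):
        return Step(done=True, answer=context[-1])
    return Step(done=False, code="1 + 1")

def run_sandboxed(code: str) -> str:
    # Stand-in for the sandbox: evaluate a trivial expression.
    return str(eval(code))

def analysis_loop(question: str, planner, max_steps: int = 5) -> str:
    """Plan-execute-interpret: generate code, run it, feed results back."""
    context = [f"Question: {question}"]
    for _ in range(max_steps):
        step = planner(context)
        if step.done:
            return step.answer
        context.append(f"Result: {run_sandboxed(step.code)}")
    return "Gave up after max_steps."
```

The key property is that execution results are appended to the context, so each planning step sees everything that happened before it.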
Before generating any query, the agent must understand the data structure. This means reading table schemas, column types, sample rows, and relationships.
```python
import sqlite3

def get_schema_info(db_path: str) -> str:
    """Extract schema information from a SQLite database."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Get all tables
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = cursor.fetchall()
    schema_info = []
    for (table_name,) in tables:
        cursor.execute(f"PRAGMA table_info({table_name})")
        columns = cursor.fetchall()
        col_defs = ", ".join(f"{c[1]} {c[2]}" for c in columns)
        schema_info.append(f"Table {table_name}: ({col_defs})")
        # Sample rows for context
        cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
        samples = cursor.fetchall()
        if samples:
            schema_info.append(f"  Sample: {samples[0]}")
    conn.close()
    return "\n".join(schema_info)
```
The agent converts natural language to SQL using the schema as context. Key techniques: include the full schema and a few sample rows in the prompt, restrict output to SELECT-only statements, cap the number of returned rows, and feed execution errors back to the model for a corrected retry.
All generated code must run in a sandbox to prevent data corruption, filesystem access, or resource exhaustion. The main options (in-process restriction, subprocess isolation, containers, and remote sandboxes) are compared later in this guide.
The agent generates matplotlib or plotly code based on the data shape and the question type (trend → line chart, comparison → bar chart, distribution → histogram).
A complete agent using OpenAI for reasoning and SQLite for data.
```python
import json, os, sqlite3, subprocess, tempfile

from openai import OpenAI

client = OpenAI()
MODEL = "gpt-4o"


def get_schema(db_path: str) -> str:
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = cur.fetchall()
    schema_parts = []
    for (tbl,) in tables:
        cur.execute(f"PRAGMA table_info({tbl})")
        cols = cur.fetchall()
        col_str = ", ".join(f"{c[1]} {c[2]}" for c in cols)
        schema_parts.append(f"CREATE TABLE {tbl} ({col_str});")
        cur.execute(f"SELECT * FROM {tbl} LIMIT 2")
        rows = cur.fetchall()
        if rows:
            schema_parts.append(f"-- Example row: {rows[0]}")
    conn.close()
    return "\n".join(schema_parts)


def execute_sql(db_path: str, query: str) -> str:
    """Execute SQL query safely with a row limit."""
    try:
        conn = sqlite3.connect(db_path)
        conn.execute("PRAGMA query_only = ON")  # Read-only mode
        cur = conn.cursor()
        cur.execute(query)
        columns = [desc[0] for desc in cur.description] if cur.description else []
        rows = cur.fetchmany(100)  # Limit results
        conn.close()
        if not rows:
            return "Query returned no results."
        header = " | ".join(columns)
        data = "\n".join(" | ".join(str(v) for v in row) for row in rows)
        return f"{header}\n{'-' * len(header)}\n{data}"
    except Exception as e:
        return f"SQL Error: {e}"


def execute_python_sandboxed(code: str, timeout: int = 30) -> str:
    """Execute Python code in a subprocess sandbox."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write(code)
        f.flush()
    try:
        result = subprocess.run(
            ["python3", f.name],
            capture_output=True,
            text=True,
            timeout=timeout,
            env={**os.environ, "MPLBACKEND": "Agg"},  # Non-interactive plots
        )
        output = result.stdout + result.stderr
        return output[:5000]
    except subprocess.TimeoutExpired:
        return "Execution timed out."
    finally:
        os.unlink(f.name)


TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "run_sql",
            "description": "Execute a SQL query against the database. Use for data retrieval.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "SQL SELECT query to execute"}
                },
                "required": ["query"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_python",
            "description": "Execute Python code for data analysis or visualization. Use pandas, matplotlib, etc.",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {"type": "string", "description": "Python code to execute"}
                },
                "required": ["code"],
            },
        },
    },
]


def analyze(question: str, db_path: str) -> str:
    """Run the data analysis agent."""
    schema = get_schema(db_path)
    messages = [
        {
            "role": "system",
            "content": (
                "You are a data analysis agent. You have access to a SQLite database.\n"
                f"Schema:\n{schema}\n\n"
                "Workflow: 1) Understand the question 2) Write SQL or Python to answer it "
                "3) Interpret the results 4) Generate visualizations if helpful "
                "5) Provide a clear natural language summary.\n"
                "Always use run_sql for queries and run_python for analysis/charts."
            ),
        },
        {"role": "user", "content": question},
    ]
    for _ in range(10):
        response = client.chat.completions.create(
            model=MODEL, messages=messages, tools=TOOLS
        )
        msg = response.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            return msg.content
        for tc in msg.tool_calls:
            name = tc.function.name
            args = json.loads(tc.function.arguments)
            if name == "run_sql":
                result = execute_sql(db_path, args["query"])
            elif name == "run_python":
                result = execute_python_sandboxed(args["code"])
            else:
                result = f"Unknown tool: {name}"
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": result,
            })
    return "Analysis incomplete."


# Usage
if __name__ == "__main__":
    report = analyze(
        "What are the top 5 products by revenue? Show a bar chart.",
        "sales.db",
    )
    print(report)
```
A graph-based approach with separate nodes for SQL generation, execution, analysis, and visualization.
```python
import json
import sqlite3
from typing import TypedDict

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")


class AnalysisState(TypedDict):
    question: str
    schema: str
    sql_query: str
    query_result: str
    python_code: str
    python_output: str
    analysis: str
    error: str
    retry_count: int


def inspect_schema(state: AnalysisState) -> dict:
    """Load database schema."""
    conn = sqlite3.connect("data.db")
    cur = conn.cursor()
    cur.execute("SELECT sql FROM sqlite_master WHERE type='table'")
    schemas = [row[0] for row in cur.fetchall() if row[0]]
    conn.close()
    return {"schema": "\n".join(schemas)}


def generate_sql(state: AnalysisState) -> dict:
    """Generate SQL from natural language question."""
    prompt = (
        f"Given this schema:\n{state['schema']}\n\n"
        f"Write a SQL query to answer: {state['question']}\n"
        f"Return ONLY the SQL query, no explanation."
    )
    if state.get("error"):
        prompt += f"\nPrevious error: {state['error']}\nFix the query."
    response = llm.invoke(prompt)
    sql = response.content.strip().strip("`").replace("sql\n", "")
    return {"sql_query": sql, "error": ""}


def execute_query(state: AnalysisState) -> dict:
    """Execute SQL query safely."""
    try:
        conn = sqlite3.connect("data.db")
        conn.execute("PRAGMA query_only = ON")
        cur = conn.cursor()
        cur.execute(state["sql_query"])
        cols = [d[0] for d in cur.description] if cur.description else []
        rows = cur.fetchmany(50)
        conn.close()
        result = json.dumps({"columns": cols, "rows": rows})
        return {"query_result": result}
    except Exception as e:
        return {"error": str(e), "retry_count": state.get("retry_count", 0) + 1}


def check_execution(state: AnalysisState) -> str:
    """Route based on execution success."""
    if state.get("error") and state.get("retry_count", 0) < 3:
        return "retry"
    return "analyze"


def analyze_results(state: AnalysisState) -> dict:
    """Generate natural language analysis and optional viz code."""
    response = llm.invoke(
        f"Question: {state['question']}\n"
        f"SQL: {state['sql_query']}\n"
        f"Results: {state['query_result']}\n\n"
        f"Provide: 1) A clear summary of the findings. "
        f"2) If a chart would be helpful, provide matplotlib Python code "
        f"that creates it. Include the data inline in the code."
    )
    return {"analysis": response.content}


# Build graph
workflow = StateGraph(AnalysisState)
workflow.add_node("inspect", inspect_schema)
workflow.add_node("generate_sql", generate_sql)
workflow.add_node("execute", execute_query)
workflow.add_node("analyze", analyze_results)
workflow.set_entry_point("inspect")
workflow.add_edge("inspect", "generate_sql")
workflow.add_edge("generate_sql", "execute")
workflow.add_conditional_edges("execute", check_execution, {
    "retry": "generate_sql",
    "analyze": "analyze",
})
workflow.add_edge("analyze", END)

app = workflow.compile()
result = app.invoke({
    "question": "What is the monthly revenue trend for the last 12 months?",
    "schema": "",
    "sql_query": "",
    "query_result": "",
    "python_code": "",
    "python_output": "",
    "analysis": "",
    "error": "",
    "retry_count": 0,
})
print(result["analysis"])
```
| Criteria | Single Agent (ReAct) | LangGraph Pipeline |
|---|---|---|
| Control flow | LLM decides next step | Explicit graph edges |
| Error recovery | LLM interprets errors | Conditional routing with retry counter |
| SQL validation | LLM self-checks | Can add validation node |
| Cost | Lower (fewer LLM calls for simple queries) | Higher (fixed nodes always execute) |
| Flexibility | Handles unexpected questions well | Best for known workflow patterns |
| Observability | Tool call logs | Full graph trace with state snapshots |
| Sandboxing | Per-tool isolation | Per-node isolation possible |
| Best for | Ad-hoc analysis, exploration | Production pipelines, dashboards |
Running LLM-generated code is inherently risky. Here are the main approaches, compared by isolation level:
| Strategy | Isolation | Performance | Setup Complexity |
|---|---|---|---|
| RestrictedPython | Medium (whitelist ops) | Fast (in-process) | Low |
| subprocess + timeout | Low (same machine) | Fast | Low |
| Docker container | High (full isolation) | Slower (container startup) | Medium |
| Cloud sandbox (Modal, E2B) | Very High (remote VM) | Slower (network) | Low (managed) |
| WebAssembly (Pyodide) | High (browser sandbox) | Medium | Medium |
Docker sandboxing example:
```python
import docker, tempfile, os

def execute_in_docker(code: str, timeout: int = 30) -> str:
    """Execute Python code in an isolated Docker container."""
    client = docker.from_env()
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, dir="/tmp"
    ) as f:
        f.write(code)
        script_path = f.name
    try:
        container = client.containers.run(
            "python:3.12-slim",
            f"python /script/{os.path.basename(script_path)}",
            volumes={"/tmp": {"bind": "/script", "mode": "ro"}},
            mem_limit="256m",
            cpu_period=100000,
            cpu_quota=50000,        # 50% of one CPU
            network_disabled=True,  # No network access
            detach=True,            # Run in background so we can enforce a timeout
        )
        try:
            container.wait(timeout=timeout)
            logs = container.logs().decode("utf-8")
        finally:
            container.remove(force=True)
        return logs[:5000]
    except Exception as e:
        return f"Execution error: {e}"
    finally:
        os.unlink(script_path)
```
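For the SQL side, SQLite's read-only pragma is easy to verify in isolation: once `query_only` is on, any write statement raises an error, so even a hostile generated query cannot modify the data. A minimal demonstration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (product TEXT, amount REAL)")
conn.execute("INSERT INTO sales VALUES ('widget', 9.99)")
conn.execute("PRAGMA query_only = ON")  # Everything after this is read-only

rows = conn.execute("SELECT * FROM sales").fetchall()  # Reads still work

try:
    conn.execute("DELETE FROM sales")  # Writes are rejected
except sqlite3.OperationalError as e:
    blocked = str(e)  # "attempt to write a readonly database"
```

Note that the pragma applies per connection, so it must be set on every connection the agent's tools open.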
For SQL safety, enforce read-only access: `PRAGMA query_only = ON` for SQLite, or a read-only database user for other engines.

The agent should select chart types based on the data and question:
| Question Pattern | Chart Type | Library |
|---|---|---|
| Trend over time | Line chart | matplotlib / plotly |
| Comparison between categories | Bar chart | matplotlib / plotly |
| Distribution of values | Histogram | matplotlib / seaborn |
| Part of whole | Pie/donut chart | matplotlib / plotly |
| Relationship between variables | Scatter plot | matplotlib / seaborn |
| Geographic data | Map | folium / plotly |
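The mapping above can be encoded as a small heuristic the agent consults before generating plotting code. This is a sketch: in practice the LLM usually makes this choice itself, and the keyword lists here are illustrative, not exhaustive.

```python
def pick_chart(question: str) -> str:
    """Map question keywords to a chart type, mirroring the table above."""
    q = question.lower()
    rules = [
        (("trend", "over time", "monthly", "growth"), "line"),
        (("compare", "top", "by category", "versus"), "bar"),
        (("distribution", "spread", "histogram"), "histogram"),
        (("share", "proportion", "percentage of"), "pie"),
        (("correlation", "relationship", "scatter"), "scatter"),
    ]
    for keywords, chart in rules:
        if any(k in q for k in keywords):
            return chart  # first matching rule wins
    return "bar"  # safe default for most aggregates
```

First-match-wins ordering means ambiguous questions ("top products versus last year") resolve to the earlier rule, which is a deliberate simplification.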