Introduction
I love Dagster and use it extensively for data orchestration and ML pipelines. You can read more about it in these posts: Remove Complexity from Your RAG Applications, RLHF with Dagster and Modal, and How to Fine-Tune LLMs. I also teach Dagster in my ML Engineering Course.
So I’ve always wondered if MCP could integrate well with Dagster. Let’s explore this possibility!
Dagster for ML and Vice Versa!
There are many ways to use Dagster in ML/AI workflows, see Dagster for ML-pipelines, Routing LLM prompts, AI Reference Architectures.
But what about when ML is going to use Dagster itself?
- Can an agent use Dagster as a tool?
- Can an agent leverage declarative assets as resources?
- Can an agent create new assets?
Spoiler: YES! Long story short, we will write Dagster MCP and connect Agent SDK from OpenAI to it.
Writing a Dagster MCP Server
Let’s vibe with a code-first version of the Dagster MCP, based on the Dagster GraphQL API. Here is my prompt for Cursor:
I want to create an MCP (Model Context Protocol) server for Dagster, a data pipeline orchestrator.
Here is information about MCP:
###
[copy-paste]https://github.com/modelcontextprotocol/python-sdk/blob/main/README.md
###
Here is the complete Dagster GraphQL documentation:
###
[copy-paste]https://docs.dagster.io/guides/operate/graphql
###
Here is the full schema output from this query
{
__schema {
types {
name
description
fields {
name
description
}
}
}
}
###
[copy-paste from this query]
###
Carefully analyze the documentation above and identify the most important functionalities that could be useful for LLM agents interacting with Dagster via MCP. These functionalities should be simple yet practical, something a real data engineering team can use and rely on.
Write an MCP server based on this! Use simple and minimal code, don't overengineer, prioritize simplicity and maintainability, and don't add comments.
After several iterations, extensive refactoring and testing, this is what I got! Not bad!
from mcp.server.fastmcp import FastMCP
import httpx
import json
from typing import Dict, Optional, Any, List, Union
class DagsterGraphqlClient:
"""Client for interacting with Dagster GraphQL API"""
def __init__(self, graphql_url: str = "http://localhost:3000/graphql"):
self.graphql_url = graphql_url
async def execute_graphql(self, query: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Execute a GraphQL query against the Dagster API"""
async with httpx.AsyncClient() as client:
response = await client.post(
self.graphql_url,
json={"query": query, "variables": variables or {}}
)
return response.json()
async def list_repositories(self) -> Dict[str, Union[List[Dict[str, str]], str]]:
"""List all available Dagster repositories"""
query = """
query {
repositoriesOrError {
... on RepositoryConnection {
nodes {
name
location {
name
}
}
}
}
}
"""
result = await self.execute_graphql(query)
if "errors" in result:
return {"error": result["errors"][0]["message"]}
repos = result.get("data", {}).get("repositoriesOrError", {}).get("nodes", [])
return {"repositories": [{"name": r["name"], "location": r["location"]["name"]} for r in repos]}
# See full code on github https://github.com/kyryl-opens-ml/mcp-server-dagster/blob/main/src/mcp_dagster/server.py
async def get_asset_info(self, asset_key: str) -> Dict[str, Any]:
"""Get detailed information about a specific asset"""
query = """
query AssetQuery($assetKey: AssetKeyInput!) {
assetOrError(assetKey: $assetKey) {
... on Asset {
key {
path
}
definition {
description
}
}
... on AssetNotFoundError {
message
}
}
}
"""
asset_key_parts = asset_key.split("/")
variables = {
"assetKey": {"path": asset_key_parts}
}
result = await self.execute_graphql(query, variables)
if "errors" in result:
return {"error": result["errors"][0]["message"]}
asset_data = result.get("data", {}).get("assetOrError", {})
if "message" in asset_data:
return {"error": asset_data["message"]}
formatted_asset = {
"asset_key": "/".join(asset_data.get("key", {}).get("path", [])),
"description": asset_data.get("definition", {}).get("description")
}
jobs_query = """
query AssetJobsQuery($assetKey: AssetKeyInput!) {
assetOrError(assetKey: $assetKey) {
... on Asset {
definition {
jobNames
}
}
}
}
"""
jobs_result = await self.execute_graphql(jobs_query, variables)
if "errors" not in jobs_result:
jobs_data = jobs_result.get("data", {}).get("assetOrError", {}).get("definition", {})
if "jobNames" in jobs_data:
formatted_asset["materialized_by_jobs"] = jobs_data["jobNames"]
return formatted_asset
mcp = FastMCP("Dagster Explorer", description="A simple MCP server for Dagster", dependencies=["httpx"])
dagster_client = DagsterGraphqlClient()
@mcp.tool()
async def list_repositories() -> dict:
"""List all available Dagster repositories"""
return await dagster_client.list_repositories()
@mcp.tool()
async def list_jobs(repository_location: str, repository_name: str) -> dict:
"""List all jobs in a specific repository"""
return await dagster_client.list_jobs(repository_location, repository_name)
@mcp.tool()
async def list_assets(repository_location: str, repository_name: str) -> dict:
"""List all assets in a specific repository"""
return await dagster_client.list_assets(repository_location, repository_name)
@mcp.tool()
async def recent_runs(limit: int = 10) -> dict:
"""Get recent Dagster runs"""
return await dagster_client.get_recent_runs(limit)
@mcp.tool()
async def get_run_info(run_id: str) -> dict:
"""Get detailed information about a specific run"""
return await dagster_client.get_run_info(run_id)
@mcp.tool()
async def launch_run(repository_location: str, repository_name: str, job_name: str, run_config: str = "{}") -> dict:
"""Launch a Dagster job run"""
return await dagster_client.launch_run(repository_location, repository_name, job_name, run_config)
@mcp.tool()
async def materialize_asset(asset_key: str, repository_location: str, repository_name: str) -> dict:
"""Materialize a specific Dagster asset"""
return await dagster_client.materialize_asset(asset_key, repository_location, repository_name)
@mcp.tool()
async def terminate_run(run_id: str) -> dict:
"""Terminate an in-progress Dagster run"""
return await dagster_client.terminate_run(run_id)
@mcp.tool()
async def get_asset_info(asset_key: str) -> dict:
"""Get detailed information about a specific asset"""
return await dagster_client.get_asset_info(asset_key)
if __name__ == "__main__":
# Run with SSE transport
mcp.run(transport="sse")Connecting with Agent
Great! With the MCP server ready, we can connect Dagster to the Agent and see how it can help the data engineering team.
import asyncio
from agents import Agent, Runner, ModelSettings
from agents.mcp import MCPServerSse
async def main():
async with MCPServerSse(
name="Dagster MCP Server",
params={
"url": "http://0.0.0.0:8000/sse",
},
) as server:
tools = await server.list_tools()
print(f"Available tools: {[x.name for x in tools]}")
agent = Agent(
name="Dagster Explorer",
instructions="""You are a Data Engineering Assistant specializing in Dagster workflows.
You help users explore their Dagster instance, monitor pipelines, and troubleshoot issues.
When interacting with the Dagster instance:
1. Start by exploring available repositories, jobs, and assets
2. Provide clear, concise explanations of pipeline components
3. Help users materialize assets and monitor runs
4. Summarize complex Dagster information in a user-friendly way
Always confirm actions before materializing assets or launching jobs.""",
mcp_servers=[server],
model_settings=ModelSettings(tool_choice="auto", parallel_tool_calls=True),
)
while True:
user_input = input("\nEnter your question (or 'exit' to quit): ")
if user_input.lower() == 'exit':
break
print("\nProcessing your request...")
result = await Runner.run(starting_agent=agent, input=user_input)
print(f"\nResponse: {result.final_output}")
if __name__ == "__main__":
asyncio.run(main())
Make sure to run both dagster-mcp and dagster itself. For this testing, I used simple assets from the homepage of Dagster.
from dagster import asset
from pandas import DataFrame, read_html, get_dummies
from sklearn.linear_model import LinearRegression
@asset
def country_populations() -> DataFrame:
df = read_html("https://tinyurl.com/mry64ebh")[0]
df.columns = ["country", "pop2022", "pop2023", "change", "continent", "region"]
df["change"] = df["change"].str.rstrip("%").str.replace("−", "-").astype("float")
return df
@asset
def continent_change_model(country_populations: DataFrame) -> LinearRegression:
data = country_populations.dropna(subset=["change"])
return LinearRegression().fit(get_dummies(data[["continent"]]), data["change"])
@asset
def continent_stats(country_populations: DataFrame, continent_change_model: LinearRegression) -> DataFrame:
result = country_populations.groupby("continent").sum()
result["pop_change_factor"] = continent_change_model.coef_
return resultLet’s ask our data engineering agent some questions:
Q1: What assets are available in my Dagster instance and what do they do?
Response: Here are the available assets in your Dagster instance:
1. **continent_change_model**
- Description: Not provided
- Materialized by: `__ASSET_JOB`
2. **continent_stats**
- Description: Not provided
- Materialized by: `__ASSET_JOB`
3. **country_populations**
- Description: Not provided
- Materialized by: `__ASSET_JOB`
It seems that the descriptions are not available, so I can help you explore more or materialize any asset if needed. Let me know what you'd like to do!
Good! Pass.
Q2: Can you materialize the
continent_statsasset and show me the result?
Response: The materialization of the `continent_stats` asset is still in progress. I'll keep checking the status for you
Looks like it needs one more tool to wait for runs to finish, but when I checked the UI – assets really materialized after some time, so fails, but very close.
Q3: Check the status of recent runs and provide a summary of any failures
Response: The recent Dagster runs indicate that there were no failures. Here's a summary of the latest runs:
1. **Run ID**: `7cfd51b9-5518-4627-9566-6ba3deb700ff`
- **Status**: SUCCESS
- **Job Name**: `__ASSET_JOB`
- **Start Time**: 1744148542.15
- **End Time**: 1744148546.39
2. **Run ID**: `1f643943-834c-41df-99f6-f8a2f125d76d`
- **Status**: SUCCESS
- **Job Name**: `__ASSET_JOB`
- **Start Time**: 1744148355.01
- **End Time**: 1744148360.42
All recent runs were successful. If you have any specific actions or need further assistance, let me know!
Basic, but nevertheless — pass.
Let’s ask something we know current MCP implementation cannot do.
Q4: Create a new monthly aggregation asset that depends on
continent_stats.
Response: To create a new asset in Dagster, such as a "monthly aggregation" that depends on an existing asset like "continent_stats," you'll typically follow these steps:
1. **Define the Asset**: You'll need to create a new asset definition in your Dagster repository code. This involves specifying the transformation logic, dependencies, and any configuration needed.
2. **Specify Dependencies**: Ensure the new asset specifies that it depends on "continent_stats." This is typically done by including the `AssetIn.materializes` or equivalent parameter.
3. **Deployment**: Update your Dagster repository to include the new asset definition. This usually involves modifying Python code and redeploying your repository.
4. **Test and Materialize**: Once deployed, you can test the asset by materializing it through the Dagster interface or CLI.
Would you like to see the existing repositories and assets to understand the current state better, or do you need guidance on writing the asset code?
Good, I would call it a pass — because the agent did not call any tools, realized that there was no right tool for this task, and just provided guidance.
Afterthoughts!
And just like that — we switched the paradigm. Dagster, once the orchestrator, became orchestrated by an LLM agent. For the complete implementation and Python package, check out this repo: https://github.com/kyryl-opens-ml/mcp-server-dagster.
Super cool! I set it up already. Watching for more to come
Pingback: AI Engineering Product Template for Reliable Coding Agents