Orchestrate Your Data via LLMs: Meet the Dagster MCP Server

Introduction

I love Dagster and use it extensively for data orchestration and ML pipelines. You can read more about it in these posts: Remove Complexity from Your RAG Applications, RLHF with Dagster and Modal, and How to Fine-Tune LLMs. I also teach Dagster in my ML Engineering Course.

So I’ve always wondered if MCP could integrate well with Dagster. Let’s explore this possibility!

Dagster for ML and Vice Versa!

There are many ways to use Dagster in ML/AI workflows, see Dagster for ML-pipelines, Routing LLM prompts, AI Reference Architectures.

But what about when ML is going to use Dagster itself?

  • Can an agent use Dagster as a tool?
  • Can an agent leverage declarative assets as resources?
  • Can an agent create new assets?

Spoiler: YES! Long story short, we will write Dagster MCP and connect Agent SDK from OpenAI to it.

Writing a Dagster MCP Server

Let’s vibe with a code-first version of the Dagster MCP, based on the Dagster GraphQL API. Here is my prompt for Cursor:

I want to create an MCP (Model Context Protocol) server for Dagster, a data pipeline orchestrator.

Here is information about MCP:  
###
[copy-paste]https://github.com/modelcontextprotocol/python-sdk/blob/main/README.md  
###

Here is the complete Dagster GraphQL documentation:

###
[copy-paste]https://docs.dagster.io/guides/operate/graphql  
###


Here is the full schema output from this query

{
  __schema {
    types {
      name
      description
      fields {
        name
        description
      }
    }
  }
}

###
[copy-paste from this query]
###


Carefully analyze the documentation above and identify the most important functionalities that could be useful for LLM agents interacting with Dagster via MCP. These functionalities should be simple yet practical, something a real data engineering team can use and rely on.

Write an MCP server based on this! Use simple and minimal code, don't overengineer, prioritize simplicity and maintainability, and don't add comments.

After several iterations, extensive refactoring and testing, this is what I got! Not bad!

from mcp.server.fastmcp import FastMCP
import httpx
import json
from typing import Dict, Optional, Any, List, Union

class DagsterGraphqlClient:
    """Client for interacting with Dagster GraphQL API"""
    
    def __init__(self, graphql_url: str = "http://localhost:3000/graphql"):
        self.graphql_url = graphql_url
        
    async def execute_graphql(self, query: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Execute a GraphQL query against the Dagster API"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.graphql_url,
                json={"query": query, "variables": variables or {}}
            )
            return response.json()
    
    async def list_repositories(self) -> Dict[str, Union[List[Dict[str, str]], str]]:
        """List all available Dagster repositories"""
        query = """
        query {
          repositoriesOrError {
            ... on RepositoryConnection {
              nodes {
                name
                location {
                  name
                }
              }
            }
          }
        }
        """
        result = await self.execute_graphql(query)
        if "errors" in result:
            return {"error": result["errors"][0]["message"]}
        
        repos = result.get("data", {}).get("repositoriesOrError", {}).get("nodes", [])
        return {"repositories": [{"name": r["name"], "location": r["location"]["name"]} for r in repos]}
    
    # See full code on github https://github.com/kyryl-opens-ml/mcp-server-dagster/blob/main/src/mcp_dagster/server.py

    async def get_asset_info(self, asset_key: str) -> Dict[str, Any]:
        """Get detailed information about a specific asset"""
        query = """
        query AssetQuery($assetKey: AssetKeyInput!) {
          assetOrError(assetKey: $assetKey) {
            ... on Asset {
              key {
                path
              }
              definition {
                description
              }
            }
            ... on AssetNotFoundError {
              message
            }
          }
        }
        """
        
        asset_key_parts = asset_key.split("/")
        variables = {
            "assetKey": {"path": asset_key_parts}
        }
        
        result = await self.execute_graphql(query, variables)
        if "errors" in result:
            return {"error": result["errors"][0]["message"]}
        
        asset_data = result.get("data", {}).get("assetOrError", {})
        
        if "message" in asset_data:
            return {"error": asset_data["message"]}
        
        formatted_asset = {
            "asset_key": "/".join(asset_data.get("key", {}).get("path", [])),
            "description": asset_data.get("definition", {}).get("description")
        }
        jobs_query = """
        query AssetJobsQuery($assetKey: AssetKeyInput!) {
          assetOrError(assetKey: $assetKey) {
            ... on Asset {
              definition {
                jobNames
              }
            }
          }
        }
        """
        
        jobs_result = await self.execute_graphql(jobs_query, variables)
        if "errors" not in jobs_result:
            jobs_data = jobs_result.get("data", {}).get("assetOrError", {}).get("definition", {})
            if "jobNames" in jobs_data:
                formatted_asset["materialized_by_jobs"] = jobs_data["jobNames"]
        
        return formatted_asset


mcp = FastMCP("Dagster Explorer", description="A simple MCP server for Dagster", dependencies=["httpx"])
dagster_client = DagsterGraphqlClient()

@mcp.tool()
async def list_repositories() -> dict:
    """List all available Dagster repositories"""
    return await dagster_client.list_repositories()

@mcp.tool()
async def list_jobs(repository_location: str, repository_name: str) -> dict:
    """List all jobs in a specific repository"""
    return await dagster_client.list_jobs(repository_location, repository_name)

@mcp.tool()
async def list_assets(repository_location: str, repository_name: str) -> dict:
    """List all assets in a specific repository"""
    return await dagster_client.list_assets(repository_location, repository_name)

@mcp.tool()
async def recent_runs(limit: int = 10) -> dict:
    """Get recent Dagster runs"""
    return await dagster_client.get_recent_runs(limit)

@mcp.tool()
async def get_run_info(run_id: str) -> dict:
    """Get detailed information about a specific run"""
    return await dagster_client.get_run_info(run_id)

@mcp.tool()
async def launch_run(repository_location: str, repository_name: str, job_name: str, run_config: str = "{}") -> dict:
    """Launch a Dagster job run"""
    return await dagster_client.launch_run(repository_location, repository_name, job_name, run_config)

@mcp.tool()
async def materialize_asset(asset_key: str, repository_location: str, repository_name: str) -> dict:
    """Materialize a specific Dagster asset"""
    return await dagster_client.materialize_asset(asset_key, repository_location, repository_name)

@mcp.tool()
async def terminate_run(run_id: str) -> dict:
    """Terminate an in-progress Dagster run"""
    return await dagster_client.terminate_run(run_id)

@mcp.tool()
async def get_asset_info(asset_key: str) -> dict:
    """Get detailed information about a specific asset"""
    return await dagster_client.get_asset_info(asset_key)

if __name__ == "__main__":
    # Run with SSE transport
    mcp.run(transport="sse")

Connecting with Agent

Great! With the MCP server ready, we can connect Dagster to the Agent and see how it can help the data engineering team.

import asyncio

from agents import Agent, Runner, ModelSettings
from agents.mcp import MCPServerSse


async def main():
    async with MCPServerSse(
        name="Dagster MCP Server",
        params={
            "url": "http://0.0.0.0:8000/sse",
        },
    ) as server:
        tools = await server.list_tools()
        print(f"Available tools: {[x.name for x in tools]}")
    
        agent = Agent(
            name="Dagster Explorer",
            instructions="""You are a Data Engineering Assistant specializing in Dagster workflows.
You help users explore their Dagster instance, monitor pipelines, and troubleshoot issues.
When interacting with the Dagster instance:
1. Start by exploring available repositories, jobs, and assets
2. Provide clear, concise explanations of pipeline components
3. Help users materialize assets and monitor runs
4. Summarize complex Dagster information in a user-friendly way
Always confirm actions before materializing assets or launching jobs.""",
            mcp_servers=[server],
            model_settings=ModelSettings(tool_choice="auto", parallel_tool_calls=True),
        )

        while True:
            user_input = input("\nEnter your question (or 'exit' to quit): ")
            if user_input.lower() == 'exit':
                break
                
            print("\nProcessing your request...")
            result = await Runner.run(starting_agent=agent, input=user_input)
            print(f"\nResponse: {result.final_output}")


if __name__ == "__main__":
    asyncio.run(main())

Make sure to run both dagster-mcp and dagster itself. For this testing, I used simple assets from the homepage of Dagster.

from dagster import asset
from pandas import DataFrame, read_html, get_dummies
from sklearn.linear_model import LinearRegression

@asset
def country_populations() -> DataFrame:
    df = read_html("https://tinyurl.com/mry64ebh")[0]
    df.columns = ["country", "pop2022", "pop2023", "change", "continent", "region"]
    df["change"] = df["change"].str.rstrip("%").str.replace("−", "-").astype("float")
    return df

@asset
def continent_change_model(country_populations: DataFrame) -> LinearRegression:
    data = country_populations.dropna(subset=["change"])
    return LinearRegression().fit(get_dummies(data[["continent"]]), data["change"])

@asset
def continent_stats(country_populations: DataFrame, continent_change_model: LinearRegression) -> DataFrame:
    result = country_populations.groupby("continent").sum()
    result["pop_change_factor"] = continent_change_model.coef_
    return result

Let’s ask our data engineering agent some questions:

Q1: What assets are available in my Dagster instance and what do they do?

Response: Here are the available assets in your Dagster instance:

1. **continent_change_model**
   - Description: Not provided
   - Materialized by: `__ASSET_JOB`

2. **continent_stats**
   - Description: Not provided
   - Materialized by: `__ASSET_JOB`

3. **country_populations**
   - Description: Not provided
   - Materialized by: `__ASSET_JOB`

It seems that the descriptions are not available, so I can help you explore more or materialize any asset if needed. Let me know what you'd like to do!

Good! Pass.

Q2: Can you materialize the continent_stats asset and show me the result?

Response: The materialization of the `continent_stats` asset is still in progress. I'll keep checking the status for you

Looks like it needs one more tool to wait for runs to finish, but when I checked the UI – assets really materialized after some time, so fails, but very close.

Q3: Check the status of recent runs and provide a summary of any failures

Response: The recent Dagster runs indicate that there were no failures. Here's a summary of the latest runs:

1. **Run ID**: `7cfd51b9-5518-4627-9566-6ba3deb700ff`
   - **Status**: SUCCESS
   - **Job Name**: `__ASSET_JOB`
   - **Start Time**: 1744148542.15
   - **End Time**: 1744148546.39

2. **Run ID**: `1f643943-834c-41df-99f6-f8a2f125d76d`
   - **Status**: SUCCESS
   - **Job Name**: `__ASSET_JOB`
   - **Start Time**: 1744148355.01
   - **End Time**: 1744148360.42

All recent runs were successful. If you have any specific actions or need further assistance, let me know!

Basic, but nevertheless — pass.

Let’s ask something we know current MCP implementation cannot do.

Q4: Create a new monthly aggregation asset that depends on continent_stats.

Response: To create a new asset in Dagster, such as a "monthly aggregation" that depends on an existing asset like "continent_stats," you'll typically follow these steps:

1. **Define the Asset**: You'll need to create a new asset definition in your Dagster repository code. This involves specifying the transformation logic, dependencies, and any configuration needed.

2. **Specify Dependencies**: Ensure the new asset specifies that it depends on "continent_stats." This is typically done by including the `AssetIn.materializes` or equivalent parameter.

3. **Deployment**: Update your Dagster repository to include the new asset definition. This usually involves modifying Python code and redeploying your repository.

4. **Test and Materialize**: Once deployed, you can test the asset by materializing it through the Dagster interface or CLI.

Would you like to see the existing repositories and assets to understand the current state better, or do you need guidance on writing the asset code?

Good, I would call it a pass — because the agent did not call any tools, realized that there was no right tool for this task, and just provided guidance.

Afterthoughts!

And just like that — we switched the paradigm. Dagster, once the orchestrator, became orchestrated by an LLM agent. For the complete implementation and Python package, check out this repo: https://github.com/kyryl-opens-ml/mcp-server-dagster.

2 thoughts on “Orchestrate Your Data via LLMs: Meet the Dagster MCP Server”

  1. Pingback: AI Engineering Product Template for Reliable Coding Agents

Leave a Reply to Jackson HardeeCancel reply

Scroll to Top

Discover more from Kyryl Opens ML

Subscribe now to keep reading and get access to the full archive.

Continue reading