# celery-flower-mcp

Give your AI assistant full control over Celery: monitor workers, manage tasks, inspect queues.

Features · Quick Start · Configuration · Tools · Development · Contributing
## What is this?

`celery-flower-mcp` is a Model Context Protocol server that exposes the full Celery Flower REST API as MCP tools. Point it at your Flower instance and your AI assistant (Claude, Cursor, Windsurf, etc.) can:
- Monitor workers, tasks, and queues in real time
- Control worker pools: grow, shrink, autoscale, restart, shut down
- Manage tasks: apply, revoke, abort, set timeouts and rate limits
- Inspect queues: check depths, add/remove consumers
All 21 Flower API endpoints are covered.
## Features

- Full API coverage: every Flower REST endpoint exposed as an MCP tool
- Dependency injection via dishka: clean, testable architecture
- Pydantic Settings: typed configuration with `.env` file support
- Async throughout: built on `httpx` + FastMCP
- 65 tests: 49 unit tests (99% coverage) + 16 integration tests against real Flower
- Strict typing: mypy strict mode, fully annotated
## Quick Start

### Install via uvx

```shell
FLOWER_URL=http://localhost:5555 uvx celery-flower-mcp
```

### Install from source

```shell
git clone https://github.com/Darius1223/celery-flower-mcp
cd celery-flower-mcp
uv sync
uv run python -m source.main
```
### Claude Desktop

Add to `~/Library/Application Support/Claude/claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "celery-flower": {
      "command": "uvx",
      "args": ["celery-flower-mcp"],
      "env": {
        "FLOWER_URL": "http://localhost:5555"
      }
    }
  }
}
```
## Configuration

Configuration is read from environment variables or a `.env` file in the project root. Copy `.env.example` to get started:

```shell
cp .env.example .env
```

| Variable | Default | Description |
|---|---|---|
| `FLOWER_URL` | `http://localhost:5555` | Base URL of your Flower instance |
| `FLOWER_USERNAME` | (none) | Basic auth username |
| `FLOWER_PASSWORD` | (none) | Basic auth password |
| `FLOWER_API_TOKEN` | (none) | Bearer token (takes priority over basic auth) |
## Available Tools

### Workers (8 tools)

| Tool | Description |
|---|---|
| `list_workers` | List all workers; optionally filter by name, refresh live stats, or get status only |
| `shutdown_worker` | Gracefully shut down a worker |
| `restart_worker_pool` | Restart a worker's process pool |
| `grow_worker_pool` | Add N processes to a worker's pool |
| `shrink_worker_pool` | Remove N processes from a worker's pool |
| `autoscale_worker_pool` | Configure autoscale min/max bounds |
| `add_queue_consumer` | Make a worker start consuming from a queue |
| `cancel_queue_consumer` | Make a worker stop consuming from a queue |
### Tasks (11 tools)

| Tool | Description |
|---|---|
| `list_tasks` | List tasks with filters: state, worker, name, date range, search, pagination |
| `list_task_types` | List all registered task types across workers |
| `get_task_info` | Get full details for a task by UUID |
| `get_task_result` | Retrieve a task's result (with optional timeout) |
| `apply_task` | Execute a task synchronously and wait for the result |
| `async_apply_task` | Dispatch a task asynchronously; returns the task UUID |
| `send_task` | Send a task by name; no registration required on the worker side |
| `abort_task` | Abort a running task |
| `revoke_task` | Revoke a task; optionally terminate with a signal |
| `set_task_timeout` | Set soft and/or hard time limits for a task on a worker |
| `set_task_rate_limit` | Set a rate limit for a task on a worker (e.g. `100/m`) |
### Queues & Health (2 tools)

| Tool | Description |
|---|---|
| `get_queue_lengths` | Get the current depth of all configured queues |
| `healthcheck` | Check whether the Flower instance is reachable and healthy |
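Each tool ultimately maps to a Flower REST call. As an illustration (assuming Flower's documented endpoint layout and a hypothetical `tasks.add` task), `send_task` corresponds to a request like the one built here; the sketch only constructs the request, it does not send it:

```python
import json
import urllib.request

BASE = "http://localhost:5555"  # assumed local Flower instance

# send_task maps to Flower's POST /api/task/send-task/<task_name>.
# The JSON body carries the task arguments.
req = urllib.request.Request(
    url=f"{BASE}/api/task/send-task/tasks.add",
    data=json.dumps({"args": [2, 3]}).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
print(req.get_method(), req.full_url)
```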
## Architecture

```
source/
├── main.py        # FastMCP server entry point + dishka container wiring
├── settings.py    # Pydantic Settings: typed config from env / .env
├── client.py      # Async HTTP client wrapping the Flower REST API
├── providers.py   # dishka Provider: manages FlowerClient lifecycle
└── tools/
    ├── workers.py # 8 worker management tools
    ├── tasks.py   # 11 task management tools
    └── queues.py  # 2 queue / health tools
```

dishka manages the `FlowerClient` lifecycle: created once at startup, closed cleanly on shutdown via an async generator provider.
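Stripped of dishka specifics, the create-once/close-on-shutdown pattern looks roughly like this. `DummyClient` is a stand-in for the real `FlowerClient`; the actual provider uses dishka's async generator factories rather than `contextlib` directly:

```python
import asyncio
from contextlib import asynccontextmanager


class DummyClient:
    """Stand-in for FlowerClient: just tracks open/closed state."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


@asynccontextmanager
async def client_lifespan():
    client = DummyClient()     # created once at startup
    try:
        yield client           # handed to every tool for the app's lifetime
    finally:
        await client.aclose()  # closed cleanly on shutdown


async def main() -> DummyClient:
    async with client_lifespan() as client:
        assert not client.closed  # usable while the server runs
    return client


client = asyncio.run(main())
print(client.closed)  # True: cleanup ran when the scope exited
```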
## Development

```shell
make fmt       # auto-format with ruff
make lint      # lint with ruff
make typecheck # type-check with mypy (strict)
make test      # run 49 unit tests
make cov       # unit tests + coverage report
make all       # fmt + lint + typecheck
```
## Testing

The test suite is split into two layers.

**Unit tests** (`tests/`): fast, no external dependencies, use pytest-httpx to mock HTTP calls:

```shell
make test
# or
uv run pytest tests/ -m "not integration"
```
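The unit-test style can be sketched without pytest-httpx by stubbing the client directly. Both `list_workers` and `get_workers` here are hypothetical shapes for illustration, not the project's actual function names:

```python
import asyncio
from unittest.mock import AsyncMock


# Hypothetical tool: delegates to an injected async client.
async def list_workers(client) -> dict:
    return await client.get_workers()


def test_list_workers() -> None:
    client = AsyncMock()
    client.get_workers.return_value = {"worker1@host": {"status": True}}
    result = asyncio.run(list_workers(client))
    assert result == {"worker1@host": {"status": True}}
    client.get_workers.assert_awaited_once()  # exactly one HTTP-layer call


test_list_workers()
print("ok")
```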
**Integration tests** (`tests/integration/`): run against a real Flower instance backed by Redis and a live Celery worker, all managed by Docker Compose:

```shell
make integration
```

This command:

- Builds and starts the Docker Compose stack (`docker-compose.test.yml`): Redis, Celery worker, Flower
- Waits for Flower's `/healthcheck` endpoint to return OK
- Runs the 16 integration tests against `http://localhost:5555`
- Tears down the stack when done
The stack is defined in `docker-compose.test.yml`. The worker and Flower images are built from `tests/integration/Dockerfile.worker` and `tests/integration/Dockerfile.flower`.

To start the stack manually for exploratory testing:

```shell
docker compose -f docker-compose.test.yml up -d --build
# run tests, explore, etc.
make integration-down # stop + remove volumes
```

Integration tests use `pytest.mark.asyncio(loop_scope="session")` so all tests share one event loop; this avoids `RuntimeError: Event loop is closed` when httpx transports are cleaned up across test boundaries on Python 3.14.
## Contributing

See `CONTRIBUTING.md` for details on adding new tools or submitting a PR.

## Changelog

See `CHANGELOG.md`.

## License

MIT