write-script-python3
MUST use when writing Python scripts.
| name | write-script-python3 |
| description | MUST use when writing Python scripts. |
Place scripts in a folder.
After writing, tell the user which command fits what they want to do:
- wmill script preview <script_path>: the default when iterating on a local script. Runs the local file without deploying.
- wmill script run <path>: runs the script already deployed in the workspace. Use only when the user explicitly wants to test the deployed version, not local edits.
- wmill generate-metadata: regenerates the local .script.yaml (input schema) and .lock (resolved dependencies) for scripts you changed, and refreshes their content hashes in wmill-lock.yaml. Local files only, not a deploy. See "Keep metadata in sync" below.
- wmill sync push: deploys local changes to the workspace. Only suggest or run this when the user explicitly asks to deploy/publish/push, not when they say "run", "try", or "test".

If the user says "run the script", "try it", "test it", or "does it work" while there are local edits to the script file, use script preview. Do NOT push the script and then script run it: pushing is a deploy, and deploying just to test overwrites the workspace version with untested changes.
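Only use script run when the user explicitly wants to test the version already deployed in the workspace rather than local edits.
Only use sync push when the user explicitly asks to deploy, publish, or push.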
wmill-lock.yaml tracks a content hash for each item. Editing a script's content — most importantly adding or removing an import or changing main's arguments — invalidates that hash and leaves the .lock, the .script.yaml input schema, and the hash row out of date. Run wmill generate-metadata (scoped to what you touched) after such edits so the resolved lock, the auto-generated args UI (driven by .script.yaml), and wmill-lock.yaml all match the code. Leaving them stale produces spurious diffs in git-sync and CI.
This only writes local files (it is not a deploy), but it re-resolves dependencies, so it can bump unpinned versions (the same as deploying from the UI; expected, not a bug). So by default offer it and run it once the user agrees, rather than running it silently after every edit — unless the project's AGENTS.md opts into running metadata automatically (see the "Keeping metadata in sync" preference there). Either way YOU run the command, not the user. After running it, diff the regenerated .lock / .script.lock files and tell the user which dependency versions changed (e.g. requests 2.31.0 → 2.32.0), so they can catch an unwanted bump before deploying — even under Metadata: auto, since it's information, not a confirmation gate. Pin versions in code to keep them fixed.
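The "tell the user which dependency versions changed" step can be sketched as a small comparison helper. This is only an illustration: it assumes the lock content lists one `name==version` pin per line, which may not match every lock format, and the function names `parse_pins` and `changed_pins` are hypothetical.

```python
def parse_pins(lock_text: str) -> dict[str, str]:
    """Parse `name==version` lines; blank lines, comments, and other lines are skipped."""
    pins: dict[str, str] = {}
    for raw in lock_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "==" not in line:
            continue
        name, version = line.split("==", 1)
        pins[name.strip().lower()] = version.strip()
    return pins


def changed_pins(old_lock: str, new_lock: str) -> list[str]:
    """Return one readable line per added, removed, or bumped pin."""
    old, new = parse_pins(old_lock), parse_pins(new_lock)
    report = []
    for name in sorted(old.keys() | new.keys()):
        before, after = old.get(name), new.get(name)
        if before == after:
            continue
        if before is None:
            report.append(f"{name} added at {after}")
        elif after is None:
            report.append(f"{name} removed (was {before})")
        else:
            report.append(f"{name} {before} -> {after}")
    return report
```

Feeding it the lock text from before and after the regeneration gives a short list that can be relayed to the user as-is.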
With no path argument, generate-metadata regenerates only the items whose content hash drifted — not everything. Imports propagate: editing a script that others import marks every importer stale too, so a one-line change to a shared module can regenerate many locks (by design — their locks must reflect the imported code). If it touches more than you expect, run wmill generate-metadata --dry-run — it lists each stale item with a reason (content changed or depends on <path>) without changing anything — then narrow with a path argument (wmill generate-metadata f/foo) or --strict-folder-boundaries.
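The way staleness spreads through imports can be pictured with a toy model. This is not how wmill is implemented; it is a sketch that assumes an `imported_by` mapping (hypothetical) from each script path to the scripts that import it, and it assumes propagation is transitive. The reason strings mirror the ones `--dry-run` is described as printing.

```python
from collections import deque


def stale_items(changed: set[str], imported_by: dict[str, set[str]]) -> dict[str, str]:
    """Map each stale item to a reason, following importer edges breadth-first."""
    reasons = {path: "content changed" for path in changed}
    queue = deque(sorted(changed))
    while queue:
        current = queue.popleft()
        for importer in sorted(imported_by.get(current, ())):
            if importer not in reasons:
                # The importer's lock must reflect the code it imports.
                reasons[importer] = f"depends on {current}"
                queue.append(importer)
    return reasons


# Hypothetical workspace: one shared module imported by two scripts.
graph = {"f/shared/util": {"f/foo/a", "f/bar/b"}, "f/foo/a": {"f/foo/c"}}
for path, reason in sorted(stale_items({"f/shared/util"}, graph).items()):
    print(f"{path}: {reason}")
```

In this model a one-line edit to `f/shared/util` marks four items stale, which is the situation where narrowing with a path argument helps.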
If the on-disk .lock and .script.yaml are already correct and only wmill-lock.yaml needs its hashes refreshed (hash drift, or bootstrapping missing entries), use wmill generate-metadata rehash — it re-records hashes from disk with no backend round-trip and no dependency changes.
If the user hasn't already told you to run/test/preview the script, offer it as a one-sentence next step (e.g. "Want me to run wmill script preview with sample args?"). Do not present a multi-option menu.
If the user already asked to test/run/try the script in their original request, skip the offer and just execute wmill script preview <path> -d '<args>' directly — pick plausible args from the script's declared parameters. The shape varies by language: main(...) for code languages, the SQL dialect's own placeholder syntax ($1 for PostgreSQL, ? for MySQL/Snowflake, @P1 for MSSQL, @name for BigQuery, etc.), positional $1, $2, … for Bash, param(...) for PowerShell.
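For a Python script whose entrypoint is `main(param1: str, param2: int)`, the `-d` payload is a JSON object keyed by parameter name. The sketch below only builds and prints the command line; the script path `f/examples/greet.py` and the helper name `preview_command` are hypothetical.

```python
import json
import shlex


def preview_command(script_path: str, args: dict) -> list[str]:
    """Build the argv for `wmill script preview <path> -d '<args>'`."""
    return ["wmill", "script", "preview", script_path, "-d", json.dumps(args)]


cmd = preview_command("f/examples/greet.py", {"param1": "hello", "param2": 3})
# shlex.join adds the shell quoting needed around the JSON payload.
print(shlex.join(cmd))
```

Building the payload with `json.dumps` rather than by hand avoids quoting mistakes when argument values contain quotes or spaces.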
wmill script preview does not deploy, but it still executes script code and may cause side effects; run it yourself when the user asked to test/preview (or after confirming that execution is intended). wmill generate-metadata does not deploy either — it only writes local files (locks, schemas, hashes) — but offer it before running (or run automatically if the project's AGENTS.md opts in), per "Keep metadata in sync" above. Only wmill sync push deploys to the workspace — run it only when the user explicitly asks to deploy/publish/push.
For a visual open-the-script-in-the-dev-page preview (rather than script preview's run-and-print-result), use the preview skill.
Use wmill resource-type list --schema to discover available resource types.
The script must contain at least one function called main:
def main(param1: str, param2: int):
    # Your code here
    return {"result": param1, "count": param2}
Do not call the main function. Libraries are installed automatically.
On Windmill, credentials and configuration are stored in resources and passed as parameters to main.
You need to redefine the type of the resources that are needed before the main function as TypedDict:
from typing import TypedDict

class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str

def main(db: postgresql):
    # db contains the database connection details
    pass
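As an illustration of consuming such a resource, the sketch below turns the `postgresql` fields into a connection URI. The helper `build_dsn` is hypothetical, and the URI layout is an assumption about what a downstream driver expects. The return value deliberately leaves out the password so that secrets do not end up in job results.

```python
from typing import TypedDict
from urllib.parse import quote


class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str


def build_dsn(db: postgresql) -> str:
    """Percent-encode credentials so characters like '@' cannot break the URI."""
    user = quote(db["user"], safe="")
    password = quote(db["password"], safe="")
    return f"postgresql://{user}:{password}@{db['host']}:{db['port']}/{db['dbname']}"


def main(db: postgresql):
    dsn = build_dsn(db)  # pass this to the database driver of your choice
    # Report only non-secret details back to the caller.
    return {"host": db["host"], "dbname": db["dbname"], "dsn_built": bool(dsn)}
```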
Important rules:
Libraries are installed automatically. Do not show installation instructions.
import requests
import pandas as pd
from datetime import datetime
If an import name conflicts with a resource type:
# Wrong - don't rename the type
import stripe as stripe_lib
class stripe_type(TypedDict): ...

# Correct - rename the import
import stripe as stripe_sdk
class stripe(TypedDict):
    api_key: str
Import the windmill client for platform interactions:
import wmill
See the SDK documentation for available methods.
For preprocessor scripts, the function should be named preprocessor and receives an event parameter:
from typing import TypedDict, Literal, Any

class Event(TypedDict):
    kind: Literal["webhook", "http", "websocket", "kafka", "email", "nats", "postgres", "sqs", "mqtt", "gcp"]
    body: Any
    headers: dict[str, str]
    query: dict[str, str]

def preprocessor(event: Event):
    # Transform the event into flow input parameters
    return {
        "param1": event["body"]["field1"],
        "param2": event["query"]["id"]
    }
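A more defensive variant of the preprocessor may be useful if some events arrive without a `body` or `query`. That possibility is an assumption here, not something stated above. The sketch uses a simplified `Event` type with optional keys and returns `None` for missing fields instead of raising `KeyError`.

```python
from typing import Any, TypedDict


class Event(TypedDict, total=False):
    kind: str
    body: Any
    headers: dict[str, str]
    query: dict[str, str]


def preprocessor(event: Event) -> dict:
    """Map the trigger event to the arguments expected by main."""
    body = event.get("body")
    if not isinstance(body, dict):
        body = {}  # body is typed Any, so it may be a string, a list, or missing
    query = event.get("query") or {}
    return {"param1": body.get("field1"), "param2": query.get("id")}


def main(param1: Any, param2: Any):
    return {"param1": param1, "param2": param2}
```

The keys returned by `preprocessor` match the parameter names of `main`, which is what lets the returned dict serve as the input arguments.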
Windmill provides built-in support for S3-compatible storage operations.
To accept a file from S3 as input to a script, type the parameter with S3Object (imported from wmill):
import wmill
from wmill import S3Object

def main(file: S3Object):
    content = wmill.load_s3_file(file)
    # ...
from io import BufferedReader

import wmill
from wmill import S3Object

# Load file content from S3
content: bytes = wmill.load_s3_file(s3object)

# Load file as stream reader
reader: BufferedReader = wmill.load_s3_file_reader(s3object)

# Write file to S3
result: S3Object = wmill.write_s3_file(
    s3object,            # Target path (or None to auto-generate)
    file_content,        # bytes or BufferedReader
    s3_resource_path,    # Optional: specific S3 resource
    content_type,        # Optional: MIME type
    content_disposition  # Optional: Content-Disposition header
)
Import: import wmill
def worker_has_internal_server() -> bool
def get_mocked_api() -> Optional[dict]
def get_client() -> httpx.Client
def get(endpoint, raise_for_status = True, **kwargs) -> httpx.Response
def post(endpoint, raise_for_status = True, **kwargs) -> httpx.Response
def create_token(duration = dt.timedelta(days=1)) -> str
def run_script_async(path: str = None, hash_: str = None, args: dict = None, scheduled_in_secs: int = None) -> str
def run_script_by_path_async(path: str, args: dict = None, scheduled_in_secs: int = None) -> str
def run_script_by_hash_async(hash_: str, args: dict = None, scheduled_in_secs: int = None) -> str
def run_flow_async(path: str, args: dict = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str
def run_script(path: str = None, hash_: str = None, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any
def run_script_by_path(path: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any
def run_script_by_hash(hash_: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any
def run_inline_script_preview(content: str, language: str, args: dict = None) -> Any
def wait_job(job_id, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False)
def cancel_job(job_id: str, reason: str = None) -> str
def cancel_running() -> dict
def get_job(job_id: str) -> dict
def get_root_job_id(job_id: str | None = None) -> dict
def get_id_token(audience: str, expires_in: int | None = None) -> str
def get_job_status(job_id: str) -> JobStatus
def get_result(job_id: str, assert_result_is_not_none: bool = True) -> Any
def get_variable(path: str) -> str
def set_variable(path: str, value: str, is_secret: bool = False) -> None
def get_resource(path: str, none_if_undefined: bool = False, interpolated: bool = True) -> dict | None
def set_resource(value: Any, path: str, resource_type: str)
def list_resources(resource_type: str = None, page: int = None, per_page: int = None) -> list[dict]
def set_state(value: Any, path: str | None = None) -> None
def get_state(path: str | None = None) -> Any
def set_progress(value: int, job_id: Optional[str] = None)
def get_progress(job_id: Optional[str] = None) -> Any
def set_flow_user_state(key: str, value: Any) -> None
def get_flow_user_state(key: str) -> Any
def version()
def get_duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings | None
def get_polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings
def get_boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings
def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None) -> bytes
def load_s3_file_reader(s3object: S3Object | str, s3_resource_path: str | None) -> BufferedReader
def write_s3_file(s3object: S3Object | str | None, file_content: BufferedReader | bytes, s3_resource_path: str | None, content_type: str | None = None, content_disposition: str | None = None) -> S3Object
def delete_s3_object(s3object: S3Object | str, s3_resource_path: str | None = None) -> None
def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]
def sign_s3_object(s3_object: S3Object | str) -> S3Object
def get_presigned_s3_public_urls(s3_objects: list[S3Object | str], base_url: str | None = None) -> list[str]
def get_presigned_s3_public_url(s3_object: S3Object | str, base_url: str | None = None) -> str
def whoami() -> dict
def user() -> dict
def state_path() -> str
def state() -> Any
def set_shared_state_pickle(value: Any, path: str = 'state.pickle') -> None
def get_shared_state_pickle(path: str = 'state.pickle') -> Any
def set_shared_state(value: Any, path: str = 'state.json') -> None
def get_shared_state(path: str = 'state.json') -> None
def get_resume_urls(approver: str = None, flow_level: bool = None) -> dict
# ... WM_FLOW_JOB_ID, WM_FLOW_STEP_ID) to ensure it is run in the appropriate context.
def request_interactive_slack_approval(slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None
def username_to_email(username: str) -> str
def send_teams_message(conversation_id: str, text: str, success: bool = True, card_block: dict = None)
def datatable(name: str = 'main')
def ducklake(name: str = 'main')
def init_global_client(f)
def deprecate(in_favor_of: str)
def get_workspace() -> str
def get_version() -> str
def run_script_sync(hash: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any
def run_script_by_path_sync(path: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any
def duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings
def polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings
def boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings
def get_state_path() -> str
def parse_resource_syntax(s: str) -> Optional[str]
def parse_s3_object(s3_object: S3Object | str) -> S3Object
def parse_variable_syntax(s: str) -> Optional[str]
def append_to_result_stream(text: str) -> None
def stream_result(stream) -> None
def query(sql: str, *args) -> SqlQuery
# ... select_sql into ducklake table for one partition (or the whole table when partition is ...
# materialize engine: with unique_key it upserts within the slice (delete-by-key + insert); ... select_sql is trusted (your own query).
def upsert_partition(table: str, select_sql: str, partition: str = None, unique_key: str = None, partition_col: str = '_wm_partition', schema: str = None)
# ... partition, or the whole table when partition is None. NOTE: unlike upsert_partition, re-running the same ...
def append_partition(table: str, select_sql: str, partition: str = None, partition_col: str = '_wm_partition', schema: str = None)
def read(table: str, partition: str = None, partition_col: str = '_wm_partition', schema: str = None)
def fetch(result_collection: str | None = None)
def fetch_one()
def fetch_one_scalar()
def execute()
def infer_sql_type(value) -> str
def parse_sql_client_name(name: str) -> tuple[str, Optional[str]]
def task(_func = None, path: Optional[str] = None, tag: Optional[str] = None, timeout: Optional[int] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)
def task_script(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)
def task_flow(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)
# ... step() to ...
def workflow(func)
# ... fn inline and checkpoint the result. ... fn.
async def step(name: str, fn)
# ... seconds. ... asyncio.sleep.
async def sleep(seconds: int)
# ... get_resume_urls() (wrapped in step()) to obtain ... value (form data), approver, and approved.
async def wait_for_approval(timeout: int = 1800, form: dict | None = None, self_approval: bool = True) -> dict
# ... fn(item), which should be a @task. ... concurrency (default: all at once).
async def parallel(items, fn, concurrency: Optional[int] = None)
def commit_kafka_offsets(trigger_path: str, topic: str, partition: int, offset: int) -> None
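As one illustration of the signatures above, `get_state` and `set_state` can keep a cursor between runs so that a script only processes items it has not seen before. This is a sketch under assumptions: it presumes `get_state()` returns `None` when nothing has been stored yet, and the helper `select_new` is hypothetical. `wmill` is imported inside `main` only so that the pure helper can be exercised outside Windmill.

```python
from typing import Optional


def select_new(ids: list[int], last_seen: Optional[int]) -> tuple[list[int], Optional[int]]:
    """Return the ids above the stored cursor (sorted) and the updated cursor."""
    fresh = sorted(i for i in ids if last_seen is None or i > last_seen)
    new_cursor = fresh[-1] if fresh else last_seen
    return fresh, new_cursor


def main(ids: list[int]):
    import wmill  # available on Windmill workers

    fresh, cursor = select_new(ids, wmill.get_state())
    if fresh:
        # Persist the cursor so the next run skips what was handled here.
        wmill.set_state(cursor)
    return fresh
```

Keeping the selection logic separate from the `wmill` calls means it can be checked locally with plain asserts before running `wmill script preview`.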