databricks-python-sdk
// Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.
| name | databricks-python-sdk |
| description | Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs. |
This skill provides guidance for Databricks SDK, Databricks Connect, CLI, and REST API.
SDK Documentation: https://databricks-sdk-py.readthedocs.io/en/latest/
GitHub Repository: https://github.com/databricks/databricks-sdk-py
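- Python environment: use the existing .venv, or use uv to create one
- Install Databricks Connect: uv pip install databricks-connect
- Install the SDK: uv pip install databricks-sdk
- Default profile: DEFAULT
- Config file: ~/.databrickscfg
- Environment variables: DATABRICKS_HOST, DATABRICKS_TOKEN
Use databricks-connect to run Spark code locally against a Databricks cluster.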
from databricks.connect import DatabricksSession
# Auto-detects 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()
# With explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()
# Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table")
df.show()
IMPORTANT: Do NOT set .master("local[*]") - this will cause issues with Databricks Connect.
For operations that are not yet in the SDK, or that are overly complex via the SDK, call the REST API directly:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Direct API call using authenticated client
response = w.api_client.do(
    method="GET",
    path="/api/2.0/clusters/list"
)
# POST with body
response = w.api_client.do(
    method="POST",
    path="/api/2.0/jobs/run-now",
    body={"job_id": 123}
)
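When to use: Prefer SDK methods when available. Fall back to api_client.do only for operations that the SDK does not cover yet or that are overly complex to express through it.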
# Check version (should be >= 0.278.0)
databricks --version
# Use specific profile
databricks --profile MY_PROFILE clusters list
# Common commands
databricks clusters list
databricks jobs list
databricks workspace list /Users/me
For a per-API table of method signatures and direct doc URLs (Clusters, Jobs, SQL warehouses, Unity Catalog, Serving, Vector Search, etc.), see references/doc-index.md. Worked examples for the main API surfaces live under examples/.
The SDK documentation follows a predictable URL pattern:
Base: https://databricks-sdk-py.readthedocs.io/en/latest/
Workspace APIs: /workspace/{category}/{service}.html
Account APIs: /account/{category}/{service}.html
Authentication: /authentication.html
DBUtils: /dbutils.html
| Category | Services |
|---|---|
| compute | clusters, cluster_policies, command_execution, instance_pools, libraries |
| catalog | catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| jobs | jobs |
| sql | warehouses, statement_execution, queries, alerts, dashboards |
| serving | serving_endpoints |
| vectorsearch | vector_search_indexes, vector_search_endpoints |
| pipelines | pipelines |
| workspace | repos, secrets, workspace, git_credentials |
| files | files, dbfs |
| ml | experiments, model_registry |
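The URL pattern and category table above can be combined into a small helper. This is only a sketch: the function name `sdk_doc_url` and its `scope` parameter are hypothetical and not part of the SDK, and the helper does not check that the resulting page exists.

```python
BASE_URL = "https://databricks-sdk-py.readthedocs.io/en/latest"


def sdk_doc_url(category: str, service: str, scope: str = "workspace") -> str:
    """Build a documentation URL of the form /{scope}/{category}/{service}.html."""
    if scope not in ("workspace", "account"):
        raise ValueError(f"scope must be 'workspace' or 'account', got {scope!r}")
    return f"{BASE_URL}/{scope}/{category}/{service}.html"


# Example lookups taken from the category table
print(sdk_doc_url("compute", "clusters"))
print(sdk_doc_url("sql", "statement_execution"))
```

The generated URL should still be fetched and checked before relying on it, since not every category/service pair is guaranteed to have a page.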
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi... # Personal Access Token
# Auto-detect credentials from environment
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Explicit token auth
w = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",
    token="dapi..."
)
# Azure Service Principal
w = WorkspaceClient(
    host="https://adb-xxx.azuredatabricks.net",
    azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...",
    azure_tenant_id="tenant-id",
    azure_client_id="client-id",
    azure_client_secret="secret"
)
# Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
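To see which profile names are available before passing one to `WorkspaceClient(profile=...)`, the config file can be read with the standard library, since ~/.databrickscfg uses INI syntax. The sketch below is not an SDK feature: `list_profiles` is a hypothetical helper, and it assumes each profile stores its workspace URL under a `host` key.

```python
import configparser
from pathlib import Path


def list_profiles(cfg_path: Path) -> dict[str, str]:
    """Map each profile name in an INI-style config file to its host value."""
    # configparser normally hides [DEFAULT] and merges it into every section.
    # Renaming the special section makes [DEFAULT] show up as a normal profile.
    parser = configparser.ConfigParser(
        default_section="__unused__",
        interpolation=None,  # values may contain '%' characters
    )
    parser.read(cfg_path)  # a missing file simply yields no profiles
    return {name: parser[name].get("host", "") for name in parser.sections()}


if __name__ == "__main__":
    for profile, host in list_profiles(Path.home() / ".databrickscfg").items():
        print(f"{profile}: {host}")
```

Only the host is read here, so tokens and secrets stored in the file are never printed.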
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
from datetime import timedelta
# List all clusters
for cluster in w.clusters.list():
    print(f"{cluster.cluster_name}: {cluster.state}")
# Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")
# Create a cluster (returns Wait object)
wait = w.clusters.create(
    cluster_name="my-cluster",
    spark_version=w.clusters.select_spark_version(latest=True),
    node_type_id=w.clusters.select_node_type(local_disk=True),
    num_workers=2
)
cluster = wait.result()  # Wait for cluster to be running
# Or use create_and_wait for blocking call
cluster = w.clusters.create_and_wait(
    cluster_name="my-cluster",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30)
)
# Start/terminate/permanently delete
w.clusters.start(cluster_id="...").result()
w.clusters.delete(cluster_id="...")  # Terminates (stops) the cluster; there is no clusters.stop()
w.clusters.permanent_delete(cluster_id="...")  # Removes the cluster permanently
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html
from databricks.sdk.service.jobs import Task, NotebookTask
# List jobs
for job in w.jobs.list():
    print(f"{job.job_id}: {job.settings.name}")
# Create a job
created = w.jobs.create(
    name="my-job",
    tasks=[
        Task(
            task_key="main",
            notebook_task=NotebookTask(notebook_path="/Users/me/notebook"),
            existing_cluster_id="0123-456789-abcdef"
        )
    ]
)
# Run a job now
run = w.jobs.run_now_and_wait(job_id=created.job_id)
print(f"Run completed: {run.state.result_state}")
# Get run output (output belongs to a task run, so pass the task's run_id, not the parent run's)
output = w.jobs.get_run_output(run_id=run.tasks[0].run_id)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html
from databricks.sdk.service.sql import StatementState
# Execute SQL query
response = w.statement_execution.execute_statement(
    warehouse_id="abc123",
    statement="SELECT * FROM catalog.schema.table LIMIT 10",
    wait_timeout="30s"
)
# Check status and get results
if response.status.state == StatementState.SUCCEEDED:
    for row in response.result.data_array:
        print(row)
# For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n(
    statement_id=response.statement_id,
    chunk_index=0
)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html
# List warehouses
for wh in w.warehouses.list():
    print(f"{wh.name}: {wh.state}")
# Get warehouse
warehouse = w.warehouses.get(id="abc123")
# Create warehouse
created = w.warehouses.create_and_wait(
    name="my-warehouse",
    cluster_size="Small",
    max_num_clusters=1,
    auto_stop_mins=15
)
# Start/stop
w.warehouses.start(id="abc123").result()
w.warehouses.stop(id="abc123").result()
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html
# List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"):
    print(f"{table.full_name}: {table.table_type}")
# Get table info
table = w.tables.get(full_name="main.default.my_table")
print(f"Columns: {[c.name for c in table.columns]}")
# Check if table exists (the call returns a response object; the boolean is in .table_exists)
exists = w.tables.exists(full_name="main.default.my_table").table_exists
Doc (Catalogs): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html Doc (Schemas): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html
# List catalogs
for catalog in w.catalogs.list():
    print(catalog.name)
# Create catalog
w.catalogs.create(name="my_catalog", comment="Description")
# List schemas
for schema in w.schemas.list(catalog_name="main"):
    print(schema.name)
# Create schema
w.schemas.create(name="my_schema", catalog_name="main")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html
from databricks.sdk.service.catalog import VolumeType
# List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"):
    print(f"{vol.full_name}: {vol.volume_type}")
# Create managed volume
w.volumes.create(
    catalog_name="main",
    schema_name="default",
    name="my_volume",
    volume_type=VolumeType.MANAGED
)
# Read volume info
vol = w.volumes.read(name="main.default.my_volume")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html
# Upload file to volume (the context manager closes the local file afterwards)
with open("local_file.csv", "rb") as f:
    w.files.upload(
        file_path="/Volumes/main/default/my_volume/data.csv",
        contents=f
    )
# Download file (the response exposes the payload as a binary stream in .contents)
resp = w.files.download(file_path="/Volumes/main/default/my_volume/data.csv")
content = resp.contents.read()
# List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"):
    print(f"{entry.name}: {entry.is_directory}")
# Upload/download with progress (parallel)
w.files.upload_from(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    source_path="/local/path/large.parquet",
    use_parallel=True
)
w.files.download_to(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    destination="/local/output/",
    use_parallel=True
)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole
# List endpoints
for ep in w.serving_endpoints.list():
    print(f"{ep.name}: {ep.state}")
# Get endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")
# Query endpoint
response = w.serving_endpoints.query(
    name="my-endpoint",
    inputs={"prompt": "Hello, world!"}
)
# For chat/completions endpoints (messages are ChatMessage objects, not plain dicts)
response = w.serving_endpoints.query(
    name="my-chat-endpoint",
    messages=[ChatMessage(role=ChatMessageRole.USER, content="Hello!")]
)
# Get OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
Doc (Indexes): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html Doc (Endpoints): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_endpoints.html
# List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"):
    print(idx.name)
# Query index
results = w.vector_search_indexes.query_index(
    index_name="main.default.my_index",
    columns=["id", "text", "embedding"],
    query_text="search query",
    num_results=10
)
for doc in results.result.data_array:
    print(doc)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
# List pipelines
for pipeline in w.pipelines.list_pipelines():
    print(f"{pipeline.name}: {pipeline.state}")
# Get pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")
# Start pipeline update
w.pipelines.start_update(pipeline_id="abc123")
# Stop pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html
# List secret scopes
for scope in w.secrets.list_scopes():
    print(scope.name)
# Create scope
w.secrets.create_scope(scope="my-scope")
# Put secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")
# Get secret (returns GetSecretResponse; its value field is base64-encoded)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")
# List secrets in scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"):
    print(s.key)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html
# Access dbutils through WorkspaceClient
dbutils = w.dbutils
# File system operations
files = dbutils.fs.ls("/")
dbutils.fs.cp("dbfs:/source", "dbfs:/dest")
dbutils.fs.rm("dbfs:/path", recurse=True)
# Secrets (same as w.secrets but dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
The Databricks SDK is fully synchronous. All calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with asyncio.to_thread() to avoid blocking the event loop.
import asyncio
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# WRONG - blocks the event loop
async def get_clusters_bad():
    return list(w.clusters.list())  # BLOCKS!
# CORRECT - runs in thread pool
async def get_clusters_good():
    return await asyncio.to_thread(lambda: list(w.clusters.list()))
# CORRECT - for simple calls
async def get_cluster(cluster_id: str):
    return await asyncio.to_thread(w.clusters.get, cluster_id)
# CORRECT - FastAPI endpoint
from fastapi import FastAPI
app = FastAPI()
@app.get("/clusters")
async def list_clusters():
    clusters = await asyncio.to_thread(lambda: list(w.clusters.list()))
    return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]
@app.post("/query")
async def run_query(sql: str, warehouse_id: str):
    # Wrap the blocking SDK call
    response = await asyncio.to_thread(
        w.statement_execution.execute_statement,
        statement=sql,
        warehouse_id=warehouse_id,
        wait_timeout="30s"
    )
    return response.result.data_array
Note: WorkspaceClient().config.host is NOT a network call - it just reads config. No need to wrap property access.
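The effect of `asyncio.to_thread()` can be tried without a workspace. In the sketch below, `blocking_call` is a hypothetical stand-in for a synchronous SDK call (it only sleeps), and `fetch_all` shows how several such calls can overlap while the event loop stays responsive.

```python
import asyncio
import time


def blocking_call(name: str, seconds: float = 0.2) -> str:
    """Stand-in for a synchronous SDK call; blocks the calling thread."""
    time.sleep(seconds)  # simulates network latency
    return f"{name}: done"


async def fetch_all(names: list[str]) -> list[str]:
    # Each blocking call runs in a worker thread, so the calls overlap
    # and the event loop remains free for other coroutines.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_call, name) for name in names)
    )


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(fetch_all(["a", "b", "c"]))
    print(results, f"elapsed: {time.perf_counter() - start:.2f}s")
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which thread finishes first.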
from datetime import timedelta
# Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait(
    cluster_name="test",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30)
)
# Pattern 2: Use Wait object
wait = w.clusters.create(...)
cluster = wait.result()  # Blocks until ready
# Pattern 3: Manual polling with callback
def progress(cluster):
    print(f"State: {cluster.state}")
cluster = w.clusters.wait_get_cluster_running(
    cluster_id="...",
    timeout=timedelta(minutes=30),
    callback=progress
)
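For resources that have no `*_and_wait` or `wait_*` method, the same poll-with-callback idea can be written by hand. The following is a generic sketch, not the SDK's implementation: `poll_until` and all of its parameters are hypothetical, and the usage replaces a real cluster with a fixed sequence of state strings.

```python
import time
from datetime import timedelta
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    timeout: timedelta = timedelta(minutes=20),
    interval: float = 1.0,
    callback: Optional[Callable[[T], None]] = None,
) -> T:
    """Call fetch() repeatedly until is_done(result) is true or the timeout expires."""
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        current = fetch()
        if callback is not None:
            callback(current)  # progress hook, invoked on every poll
        if is_done(current):
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(f"timed out after {timeout}")
        time.sleep(interval)


# Usage with a stand-in sequence of states instead of a real cluster
states = iter(["PENDING", "PENDING", "RUNNING"])
seen: list[str] = []
final = poll_until(
    fetch=lambda: next(states),
    is_done=lambda s: s == "RUNNING",
    timeout=timedelta(seconds=5),
    interval=0.01,
    callback=seen.append,
)
print(final, seen)
```

With the SDK, `fetch` would typically be a lambda around a `get` call, and `is_done` would compare the returned state against the desired one.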
# All list methods return iterators that handle pagination automatically
for job in w.jobs.list():  # Fetches all pages
    print(job.settings.name)
# For manual control: limit sets the page size per request; the iterator still walks every page
response = w.jobs.list(limit=10)
for job in response:
    print(job)
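To stop early rather than walk every page, a lazy iterator can be capped with `itertools.islice`. The sketch below uses `fake_list`, a hypothetical stand-in generator that mimics page-by-page fetching. It assumes the SDK's list iterators are similarly lazy, which is worth verifying for the specific service in use.

```python
from itertools import islice
from typing import Iterator


def fake_list(total: int, page_size: int) -> Iterator[int]:
    """Stand-in for an auto-paginating list method: yields items page by page."""
    for page_start in range(0, total, page_size):
        # With a real client, each page would correspond to one HTTP request.
        yield from range(page_start, min(page_start + page_size, total))


# Take only the first 5 items; the generator is never advanced past them,
# so the remaining pages are not produced.
first_five = list(islice(fake_list(total=100, page_size=10), 5))
print(first_five)
```

The same wrapper works around any iterator, for example `islice(w.jobs.list(), 5)`.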
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists
try:
    cluster = w.clusters.get(cluster_id="invalid-id")
except NotFound:
    print("Cluster not found")
except PermissionDenied:
    print("Access denied")
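A common variation on the pattern above is to treat a missing resource as `None` instead of printing a message. The helper `get_or_none` below is hypothetical and not part of the SDK. The `ImportError` fallback exists only so the sketch runs where databricks-sdk is not installed.

```python
from typing import Callable, Optional, TypeVar

try:
    from databricks.sdk.errors import NotFound
except ImportError:
    class NotFound(Exception):
        """Stand-in used only when databricks-sdk is not installed."""

T = TypeVar("T")


def get_or_none(fetch: Callable[[], T]) -> Optional[T]:
    """Return fetch(), or None if the resource does not exist."""
    try:
        return fetch()
    except NotFound:
        return None
    # Other errors (for example PermissionDenied) propagate to the caller.


# Intended usage with a real client (not executed here):
# cluster = get_or_none(lambda: w.clusters.get(cluster_id="0123-456789-abcdef"))
```

Only `NotFound` is swallowed so that authentication or permission problems are not silently mistaken for a missing resource.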
If I'm unsure about a method, I should:
Check the documentation URL pattern:
https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html
Common categories:
- /workspace/compute/clusters.html
- /workspace/jobs/jobs.html
- /workspace/catalog/tables.html
- /workspace/sql/warehouses.html
- /workspace/serving/serving_endpoints.html
Fetch and verify before providing guidance on parameters or return types.