| name | frappe-tweaks-async-tasks |
| description | Expert guidance for enqueueing and managing Async Tasks in Frappe Tweaks. Use when working with enqueue_async_task, enqueue_safe_async_task, bulk_enqueue_async_task, bulk_enqueue_safe_async_task, toggle_dispatcher, Async Task Log, Async Task Type, implementing concurrency limits, per-method priority ordering, task cancellation, batch/bulk task submission, document action tasks (document_type/document_name/document_action), dispatcher control, auto-retry on failure (max_retries, retry_delay, retry_count), or choosing between Async Tasks and standard frappe.enqueue background jobs. |
Async Tasks Expert
Expert guidance for the Frappe Tweaks Async Task system — a managed, observable alternative to raw frappe.enqueue.
When to Use Async Tasks vs frappe.enqueue
| Concern | frappe.enqueue | Async Tasks |
|---|
| Observability | No built-in log | Full log: status, timing, errors, memory |
| Concurrency control | None | Per-method concurrency_limit |
| Priority ordering | None | Per-method priority + per-task at_front |
| Cancellation | Cannot cancel | Cancel from UI or code |
| Server Script support | No | Yes (call_whitelisted_function=True) |
| Deduplication | Optional (job_id) | Via dispatcher deduplication |
| Auto-retry on failure | No | Yes (max_retries + retry_delay) |
Use Async Tasks when any of the following apply:
- You need to see task status, errors, or execution time in the UI
- Multiple agents/jobs can enqueue the same method and you need a cap on concurrency
- You want tasks to queue up and respect a priority order rather than flood workers
- The method is a whitelisted function or Server Script
- You need to be able to cancel tasks programmatically
- You want failed tasks to be automatically retried after a configurable delay
Stick with frappe.enqueue when:
- The job is fire-and-forget with no UI visibility required
- You are inside a framework hook that already handles retries/logging (e.g., Sync Jobs)
Public API
from tweaks.tweaks.doctype.async_task_log.async_task_log import (
enqueue_async_task,
enqueue_safe_async_task,
bulk_enqueue_async_task,
bulk_enqueue_safe_async_task,
)
from tweaks.tweaks.doctype.async_task_log.async_task_log_dispatch import (
toggle_dispatcher,
can_dispatch_now,
)
enqueue_async_task
task = enqueue_async_task(
method=None,
queue="default",
timeout=300,
job_name=None,
document_type=None,
document_name=None,
document_action=None,
max_retries=0,
retry_delay=None,
at_front=False,
call_whitelisted_function=False,
batch_id=None,
batch_order=None,
arguments=None,
**kwargs,
)
Rules:
- Pass either
method or all three of document_type + document_name + document_action. Passing neither raises ValueError.
- When the document fields are used,
method is automatically derived as the doctype controller dotted path + ".{action}" (e.g. "erpnext.accounts.doctype.sales_invoice.sales_invoice.submit").
job_name is an optional human-readable label stored as the task title. When omitted, it is set to method automatically in before_insert.
- Inner function resolution is applied at execution time: if the document controller defines
_submit, it is called instead of submit (mirroring Document.queue_action).
- Use
arguments={"queue": "short"} (not queue= in **kwargs) whenever a method argument name collides with an enqueue_async_task API parameter — arguments takes priority over **kwargs and its keys are never intercepted by the function signature.
max_retries=0 (the default) disables automatic retries. Set to a positive integer to enable them.
retry_delay is the number of seconds to wait (measured from the task's modified timestamp on failure) before the next retry attempt. None or 0 means retry on the next dispatch cycle.
enqueue_safe_async_task
Shorthand for enqueue_async_task(..., call_whitelisted_function=True). Use when calling whitelisted functions or Server Scripts by name. Accepts the same job_name, max_retries and retry_delay parameters.
task = enqueue_safe_async_task(
"myapp.api.sync_customer",
queue="short",
customer_id="CUST-0001",
)
bulk_enqueue_async_task
Create multiple Async Task Log documents in one call and dispatch them together as a single ordered batch.
tasks = bulk_enqueue_async_task(
tasks=[
{"method": "myapp.utils.process_invoice", "invoice_name": "INV-001"},
{"method": "myapp.utils.process_invoice", "invoice_name": "INV-002", "at_front": True},
],
batch_id="my-import-run-abc",
queue="short",
)
How it works:
- If
batch_id is not given, a random UUID is assigned so all tasks share the same batch.
- Each task's
batch_order is set sequentially (0, 1, 2 …) in insertion order.
- The dispatcher is internally suspended while inserting to prevent partial dispatches, then resumed atomically once all documents are committed.
- A single
dispatch_async_tasks job is enqueued at the end, respecting concurrency limits for all inserted tasks.
Key behaviour:
- Extra
**kwargs are merged into every task dict (useful for shared fields like queue or timeout).
- Individual task dicts can still override merged values before calling
enqueue_async_task.
- Returns the
batch_id string (auto-generated if not provided); use it to query Async Task Log by batch_id to track completion.
bulk_enqueue_safe_async_task
Shorthand for bulk_enqueue_async_task(..., call_whitelisted_function=True). Use when each method is a whitelisted function or Server Script name.
bulk_enqueue_safe_async_task(
tasks=[
{"method": "myapp.api.sync_customer", "customer_id": "CUST-0001"},
{"method": "myapp.api.sync_customer", "customer_id": "CUST-0002"},
],
queue="short",
)
Passing a Callable
from myapp.utils import process_invoice
task = enqueue_async_task(process_invoice, invoice_name="INV-001")
Document Action Shorthand
Use document_type + document_name + document_action instead of method when you want to call a method on a specific document in the background. This is the async-task equivalent of doc.queue_action().
task = enqueue_async_task(
document_type="Sales Invoice",
document_name="SINV-0042",
document_action="submit",
queue="long",
)
task = enqueue_async_task(
document_type="My Doctype",
document_name="MY-001",
document_action="my_custom_method",
some_arg="value",
)
Execution behaviour:
frappe.get_doc(document_type, document_name) is called inside the worker.
doc.unlock() is called to release any file lock left from the caller.
- Inner function resolution: if
doc._submit exists and document_action="submit", _submit is called instead.
getattr(doc, action)(**kwargs) is invoked with any extra kwargs.
Async Task Type concurrency is keyed on the derived method string, so you can set concurrency_limit on "erpnext.accounts.doctype.sales_invoice.sales_invoice.submit" as usual.
Async Task Type (optional configuration)
Create an Async Task Type document with method matching the dotted path to configure:
| Field | Default | Effect |
|---|
priority | 0 | Higher = dispatched first among Pending tasks |
concurrency_limit | 0 (unlimited) | Max simultaneous Queued/Started tasks for this method |
is_standard | False | Protect from accidental deletion in production |
Creating via code (fixtures/patches):
frappe.get_doc({
"doctype": "Async Task Type",
"method": "myapp.utils.heavy_import",
"priority": 10,
"concurrency_limit": 2,
"is_standard": 1,
}).insert(ignore_if_duplicate=True)
Status Lifecycle
Pending → Queued → Started → Finished
↘ Failed ──(auto-retry)──→ Pending
↘ Canceled (from Pending, Queued, or Started)
- Pending: Created, waiting for dispatcher to promote it.
- Queued: Pushed to RQ, waiting for a worker.
- Started: Worker picked it up.
- Finished / Failed: Terminal states.
error_message populated on failure.
- Canceled: Manually canceled; RQ job stopped if already Queued/Started.
- Auto-retry: When a task has
max_retries > 0 and retry_count < max_retries, the dispatcher automatically resets it to Pending after the retry_delay has elapsed since the last failure. retry_count is incremented each time a retry is triggered.
A realtime event async_task_status is published to the creating user on every status transition (Queued, Started, Finished, Failed, Canceled). Use frappe.async_tasks.show_progress (see JavaScript API below) for the idiomatic high-level approach. For raw access:
frappe.realtime.on("async_task_status", ({ name, job_name, status, message, error, batch_id, batch_done, batch_total }) => { ... })
You can also push a custom message alongside a status update by calling notify_status() directly:
task.notify_status(message="Processing row 42 of 100...")
To send a progress notification from inside the executing method (where you don't have the task document), use the notify_task_status utility:
from tweaks.tweaks.doctype.async_task_log.async_task_log import notify_task_status
def my_long_running_job(items):
for i, item in enumerate(items):
process(item)
notify_task_status(message=f"Processed {i + 1} of {len(items)}")
notify_task_status resolves the current RQ job, looks up the matching Async Task Log, and calls notify_status() on it. It is a no-op when called outside a worker context.
Structured progress messages
Pass a dict with a progress key to emit granular progress information. The frappe.async_tasks.show_progress handler understands this format and will drive the progress bar with exact count/total values instead of the default status-based percentages.
notify_task_status(message={
"progress": {
"count": i + 1,
"total": len(items),
"description": f"Processing item {i + 1} of {len(items)}…",
}
})
All three fields (count, total, description) are optional — include only the ones you need. When any field is absent the JS handler falls back to the default step-based value for that part of the progress bar.
JavaScript API
A thin JS namespace frappe.async_tasks is available on every desk page, provided by tweaks/public/js/tweaks/async_tasks.js.
frappe.async_tasks.show_progress(name, title?, handler?, hide_on_completion?)
High-level wrapper around frappe.realtime.on("async_task_status", …) that drives a frappe.show_progress bar for a given task. Use this instead of wiring the raw realtime event by hand.
| Parameter | Type | Required | Description |
|---|
name | string | Yes | Async Task Log document name to track. |
title | string | No | Progress bar title. Defaults to "Task {name}". |
handler | function | No | Optional callback invoked on every status event: ({ name, status, message, error, job_name }) => void. |
hide_on_completion | boolean | No | Whether to hide/close the bar on terminal status. Default true. |
Behaviour:
- Immediately shows the progress bar at 10 % ("Pending").
- Registers the
async_task_status realtime listener so no transition event is missed.
- On Failed, calls
frappe.throw(error) to surface the error message in a dialog.
- The realtime listener is automatically deregistered once a terminal status (
Finished, Failed, Canceled) is received.
- When
message is an object with a progress property, the handler reads progress.count, progress.total, and progress.description to drive the progress bar with exact values. Any missing field falls back to the default step-based value.
Important: handler is called on every matching event (Pending, Queued, Started, Finished, Failed, Canceled), not only on terminal ones. If your handler should only fire once (e.g. to transition to a second phase), guard it with a status check.
frappe.call({
method: "myapp.api.start_import",
callback(r) {
frappe.async_tasks.show_progress(
r.message,
__("Importing data"),
({ status }) => {
if (status === "Finished") frappe.show_alert(__("Import complete!"))
}
)
},
})
Server-side structured progress (drives the bar with exact count/total):
from tweaks.tweaks.doctype.async_task_log.async_task_log import notify_task_status
def my_long_running_job(items):
for i, item in enumerate(items):
process(item)
notify_task_status(message={
"progress": {
"count": i + 1,
"total": len(items),
"description": f"Processing item {i + 1} of {len(items)}…",
}
})
frappe.async_tasks.show_batch_progress(batch_id, title?, handler?, hide_on_completion?)
Pure batch progress tracker. Shows a frappe.show_progress bar that counts individual task completions using server-authoritative Redis counters embedded in every async_task_status event. The client owns no counter state — missed events are harmless.
| Parameter | Type | Required | Description |
|---|
batch_id | string | Yes | Batch identifier returned by the server. |
title | string | No | Progress bar title. Defaults to "Processing…". |
handler | function | No | Called on every matching event: ({ batch_id, status, message, batch_done, batch_total, job_name }) => void. |
hide_on_completion | boolean | No | Whether to hide/close the bar on completion. Default true. |
Behaviour:
- Immediately shows an indeterminate bar (
0 / 100, "Starting…") while waiting for the first event.
- Listens for
async_task_status events; filters by batch_id and drops events where batch_total == null (tasks arrived before Redis keys were written).
- Each event carries
batch_done and batch_total from Redis (set atomically by the server). The default bar label is "{job_name}: {status_label}" (or just "{status_label}" when no job_name). Structured msg.progress overrides count/total/description as usual.
handler is called on every matching event (Pending, Queued, Started, Finished, Failed, Canceled). Guard with status === 'Finished' (or a count check) if you only want to act once on completion.
- The realtime listener is deregistered when a terminal-status event brings
batch_done >= batch_total.
- Does not handle the coordinator task — that is the caller's responsibility (see below).
Usage:
frappe.async_tasks.show_batch_progress(
batch_id,
__("Creating records"),
({ status, batch_done, batch_total }) => {
if (status !== 'Finished' || batch_done < batch_total) return
frappe.show_alert({
message: __("{0} of {1} created", [batch_done, batch_total]),
indicator: "green",
})
listview.refresh()
},
)
Two-phase coordinator → batch pattern
When a coordinator task resolves a list of items and then calls bulk_enqueue_async_task to fan out, orchestrate the two phases in the caller:
frappe.call({
method: "myapp.api.bulk_create",
callback(r) {
const { batch_id, coordinator_task_name } = r.message
const onBatchComplete = ({ status, batch_done, batch_total }) => {
if (status !== 'Finished' || batch_done < batch_total) return
frappe.show_alert({
message: __("{0} of {1} created", [batch_done, batch_total]),
indicator: "green",
})
listview.refresh()
}
frappe.async_tasks.show_progress(
coordinator_task_name,
__("Creating records"),
({ status }) => {
if (status === "Finished")
frappe.async_tasks.show_batch_progress(batch_id, __("Creating records"), onBatchComplete)
},
false,
)
},
})
Key point: Guard the show_batch_progress call with status === "Finished" because handler in show_progress is called on every event. Without the guard, show_batch_progress would be registered once per event (Pending, Queued, Started, Finished), leading to duplicate completion callbacks.
Server-side coordinator pattern:
import uuid
from tweaks.tweaks.doctype.async_task_log.async_task_log import (
enqueue_async_task,
bulk_enqueue_async_task,
notify_task_status,
)
@frappe.whitelist()
def bulk_create(items):
batch_id = str(uuid.uuid4())
coordinator = enqueue_async_task(
"myapp.api._coordinator",
queue="short",
job_name="Bulk Create",
arguments={"items": items, "batch_id": batch_id},
)
return {"batch_id": batch_id, "coordinator_task_name": coordinator.name}
def _coordinator(items, batch_id):
notify_task_status(message=f"Resolved {len(items)} items. Enqueueing…")
tasks = [{"method": "myapp.api.process_item", "batch_id": batch_id, "arguments": {"item": i}} for i in items]
bulk_enqueue_async_task(tasks, batch_id=batch_id)
Cancellation
task = frappe.get_doc("Async Task Log", task_name)
task.cancel()
Retry
Failed tasks can be retried manually or automatically.
Manual retry
task = frappe.get_doc("Async Task Log", task_name)
task.retry()
task.retry(now=True)
retry() raises if called on a non-Failed/non-Canceled task.
Automatic retry
Set max_retries (and optionally retry_delay) when creating the task:
task = enqueue_async_task(
"myapp.utils.sync_customer",
max_retries=3,
retry_delay=60,
customer_id="CUST-001",
)
The dispatcher calls retry_failed_tasks() at the start of every dispatch pass. It queries all Failed tasks where max_retries > 0 and retry_count < max_retries, then resets those whose retry_delay has elapsed since their modified timestamp. Each retry increments retry_count. When retry_count reaches max_retries the task stays Failed and is no longer picked up automatically.
Key points:
retry_count tracks how many automatic (or manual) retries have been triggered, not how many failures occurred.
retry_delay=None (or 0) means retry on the very next dispatch cycle.
- Per-task errors in
retry_failed_tasks are logged and do not block retries of other tasks.
Observability
Each Async Task Log document stores:
job_name — human-readable label used as the document title (defaults to method)
started_at, ended_at, time_taken
peak_memory_usage (RSS, KB)
error_message with full traceback on failure
debug_log if frappe.debug_log is populated
job_id linking to the underlying RQ Job
document_type, document_name, document_action — persisted when created via the document action shorthand; used at execution time to re-fetch the document and call the action
max_retries, retry_delay, retry_count — retry configuration and current retry counter
Dispatcher Control
The dispatcher can be suspended site-wide, which prevents any new tasks from being promoted to Queued. Already-Queued workers continue running.
toggle_dispatcher (whitelisted)
Requires System Manager role. Persists the suspended/running state as a site default so it survives process restarts.
from tweaks.tweaks.doctype.async_task_log.async_task_log_dispatch import toggle_dispatcher
toggle_dispatcher(enable=False)
toggle_dispatcher(enable=True)
Can also be called via the HTTP API (it is @frappe.whitelist()):
POST /api/method/tweaks.tweaks.doctype.async_task_log.async_task_log_dispatch.toggle_dispatcher
{ "enable": 1 } // or 0
can_dispatch_now
Returns True when the dispatcher is running (not suspended). Use this guard before triggering dispatch manually:
from tweaks.tweaks.doctype.async_task_log.async_task_log_dispatch import can_dispatch_now
if can_dispatch_now():
enqueue_dispatch_async_tasks()
Note: bulk_enqueue_async_task internally suspends the dispatcher while inserting tasks and calls _set_dispatcher_state directly (bypassing the System Manager permission check that wraps the public toggle_dispatcher). Never call toggle_dispatcher from inside a background worker for internal use — use _set_dispatcher_state instead.
Dispatch & Recovery
Tasks are never dropped. The dispatcher runs:
- After every new task insert (
after_insert hook)
- After every task completes (success or failure)
- Via the scheduler (
all event) as a recovery mechanism for missed triggers
See references/implementation.md for the full dispatch algorithm and concurrency internals.
See references/comparison_with_prepared_report.md for a side-by-side schema and implementation comparison with Frappe's built-in Prepared Report.
See references/comparison_with_rq_job.md for a side-by-side schema and lifecycle comparison with Frappe's built-in RQ Job virtual DocType.
See references/comparison_with_scheduled_job.md for a side-by-side schema and implementation comparison with Frappe's Scheduled Job Type / Scheduled Job Log.
Source Code
tweaks/tweaks/doctype/async_task_log/async_task_log.py — Public API + Document controller
tweaks/tweaks/doctype/async_task_log/async_task_log_dispatch.py — Dispatch algorithm
tweaks/tweaks/doctype/async_task_type/async_task_type.py — Type configuration