| name | mz-parallel-workload |
| description | Extend the parallel-workload stress framework, which runs random SQL concurrently to catch panics and unexpected errors (not performance; see mz-benchmark). Trigger: "parallel workload", "parallel-workload", "action.py" in the context of parallel workload, or testing for panics/unexpected errors under concurrency. Also triggers on "add this to parallel workload" or a bug that panics under concurrent DDL/DML. |
Extending Parallel Workload
The parallel-workload framework stress-tests Materialize by running random SQL actions concurrently across multiple threads. It catches panics and unexpected query errors; it does not verify that query results are correct.
When to Use This Framework
Add coverage here when a bug manifests as:
- A panic under concurrent operations
- An unexpected error from a query that should succeed (or fail gracefully)
- A crash or connection loss triggered by specific DDL/DML combinations
How It Works
- The orchestrator spawns N worker threads, each assigned an action list (read, fetch, write, dml_nontrans, or ddl) based on the configured Complexity.
- Each worker randomly selects actions from its list using weighted random choice and executes them in a loop until the runtime expires.
- If an action raises a QueryError, the worker checks action.errors_to_ignore(exe). If the error matches an entry in that list, it is logged and ignored; otherwise, the test fails.
- All threads share a Database object that tracks created objects (tables, views, clusters, etc.) with thread-safe locking.
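The worker loop described above can be modeled in a few lines of plain Python. This is a simplified, hypothetical sketch, not the framework's code:

- QueryError, Action and run_worker are stand-ins.
- Actions take no Executor.
- An error is assumed to "match" when one of the errors_to_ignore() strings occurs as a substring of its message.

```python
import random
import time


class QueryError(Exception):
    """Hypothetical stand-in for the framework's QueryError."""

    def __init__(self, msg: str) -> None:
        super().__init__(msg)
        self.msg = msg


class Action:
    """Simplified action: run() returns False when it skips itself."""

    def __init__(self, rng: random.Random) -> None:
        self.rng = rng

    def run(self) -> bool:
        raise NotImplementedError

    def errors_to_ignore(self) -> list[str]:
        return []


def run_worker(
    action_list: list[tuple[type[Action], int]], seed: int, runtime_s: float
) -> dict[str, int]:
    """Pick weighted-random actions until runtime_s elapses; count outcomes."""
    rng = random.Random(seed)
    actions = [cls(rng) for cls, _ in action_list]
    weights = [weight for _, weight in action_list]
    stats = {"ran": 0, "skipped": 0, "ignored": 0}
    deadline = time.monotonic() + runtime_s
    while time.monotonic() < deadline:
        action = rng.choices(actions, weights=weights)[0]
        try:
            stats["ran" if action.run() else "skipped"] += 1
        except QueryError as e:
            # Assumed rule: ignore if any listed string occurs in the message.
            if any(pattern in e.msg for pattern in action.errors_to_ignore()):
                stats["ignored"] += 1  # expected error: note it and continue
            else:
                raise  # unexpected error: propagate so the test fails
    return stats


class FlakyAction(Action):
    def run(self) -> bool:
        if self.rng.random() < 0.5:
            raise QueryError('view "v1" does not exist')  # simulated error
        return True

    def errors_to_ignore(self) -> list[str]:
        return ["does not exist"]


class SkipAction(Action):
    def run(self) -> bool:
        return False  # precondition not met, e.g. nothing to operate on


stats = run_worker([(FlakyAction, 3), (SkipAction, 1)], seed=42, runtime_s=0.05)
```

Any QueryError that matches no listed string propagates out of run_worker. This is how an unexpected error surfaces as a test failure in this model.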
Adding a New Action
Step 1: Define the Action Class
Add your class to action.py. Subclass Action and implement run(), returning True if the action executed and False if it skipped itself:
```python
class MyNewAction(Action):
    def run(self, exe: Executor) -> bool:
        exe.execute("SELECT my_new_feature()", http=Http.RANDOM)
        return True
```
Step 2: Override errors_to_ignore() If Needed
If your action can produce expected errors (e.g., an object it references was concurrently dropped), extend the base list:
```python
def errors_to_ignore(self, exe: Executor) -> list[str]:
    result = super().errors_to_ignore(exe)
    result.extend([
        "my expected error message",
    ])
    if exe.db.complexity == Complexity.DDL:
        result.extend(["does not exist"])
    return result
```
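The sketch below shows how the super() chain composes the ignore list per complexity and how such a list would be checked. It is self-contained and hypothetical:

- Complexity, Action and is_ignored are stand-ins.
- The complexity is passed in directly instead of being read from exe.db.
- Matching is assumed to be a substring test.

```python
from enum import Enum


class Complexity(Enum):
    """Hypothetical stand-in; only DDL appears in the snippet above."""

    DML = "dml"
    DDL = "ddl"


class Action:
    def errors_to_ignore(self, complexity: Complexity) -> list[str]:
        # Return a fresh list on every call so subclasses can extend() it
        # without mutating shared state.
        return ["hypothetical base error"]


class MyNewAction(Action):
    def errors_to_ignore(self, complexity: Complexity) -> list[str]:
        result = super().errors_to_ignore(complexity)
        result.extend(["my expected error message"])
        if complexity == Complexity.DDL:
            # Under DDL complexity, other threads may drop referenced objects.
            result.extend(["does not exist"])
        return result


def is_ignored(msg: str, patterns: list[str]) -> bool:
    """Assumed matching rule: any pattern occurring as a substring of msg."""
    return any(pattern in msg for pattern in patterns)


action = MyNewAction()
ddl_patterns = action.errors_to_ignore(Complexity.DDL)
dml_patterns = action.errors_to_ignore(Complexity.DML)
```

Gating "does not exist" on DDL keeps the ignore list narrow. In the non-DDL case, a missing object is a real bug that should fail the run.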
Step 3: Register in an Action List
Add your action class and weight to the appropriate list at the bottom of action.py:
```python
ddl_action_list = ActionList(
    [
        ...
        (MyNewAction, 5),
        ...
    ],
    autocommit=True,
)
```
Which list to use:
- read_action_list: SELECT, SUBSCRIBE, COPY TO (autocommit=False)
- fetch_action_list: DECLARE CURSOR / FETCH (autocommit=False)
- write_action_list: INSERT, COPY FROM (autocommit=False)
- dml_nontrans_action_list: DELETE, UPDATE, INSERT RETURNING (autocommit=True)
- ddl_action_list: CREATE/DROP/ALTER/RENAME (autocommit=True)
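Weights in an action list are relative. Assuming the worker draws with standard weighted random choice (as random.choices does), each action's selection probability is its weight divided by the sum of all weights in that list. The action names and weights below are hypothetical and illustrative only.

```python
import random
from collections import Counter


def selection_probabilities(entries: list[tuple[str, int]]) -> dict[str, float]:
    """Probability of each entry under weighted choice: weight / total."""
    total = sum(weight for _, weight in entries)
    return {name: weight / total for name, weight in entries}


# Hypothetical DDL list; real names and weights live in action.py.
ddl_entries = [("CreateViewAction", 10), ("DropViewAction", 5), ("MyNewAction", 5)]
probs = selection_probabilities(ddl_entries)

# Empirical check: draw with a seeded RNG, as a worker would.
rng = random.Random(42)
names = [name for name, _ in ddl_entries]
weights = [weight for _, weight in ddl_entries]
counts = Counter(rng.choices(names, weights=weights, k=20_000))
```

Under this model, adding (MyNewAction, 5) to a list lowers every other action's share of that list. Choose the weight with the existing totals in mind.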
Common Patterns
Picking a Random Database Object
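```python
def run(self, exe: Executor) -> bool:
    with exe.db.lock:
        if not exe.db.tables:
            return False
        table = self.rng.choice(exe.db.tables)
    # exe.db.lock is released; hold only the per-object lock from here on.
    with table.lock:
        # Re-check: another thread may have dropped the table meanwhile.
        if table not in exe.db.tables:
            return False
        exe.execute(f"ALTER TABLE {table} ...", http=Http.RANDOM)
    return True
```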
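The two-phase locking pattern can be exercised without Materialize. The sketch below is hypothetical: Table, Db and drop_random_table are stand-ins for the framework's objects and actions.

It works like this:

- The global lock is held only while picking a candidate.
- The per-object lock guards the membership re-check and the removal.
- Removals here do not take db.lock, so the list can shrink between the length check and rng.choice. The sketch therefore also catches IndexError.

Four threads race to drop 50 tables, and each table must be removed exactly once.

```python
import random
import threading
from dataclasses import dataclass, field


@dataclass(eq=False)  # identity equality: each instance is a distinct object
class Table:
    name: str
    lock: threading.Lock = field(default_factory=threading.Lock)


@dataclass
class Db:
    tables: list[Table]
    lock: threading.Lock = field(default_factory=threading.Lock)


def drop_random_table(db: Db, rng: random.Random) -> bool:
    # Phase 1: hold the global lock only long enough to pick a candidate.
    with db.lock:
        if not db.tables:
            return False
        try:
            table = rng.choice(db.tables)
        except IndexError:
            return False  # list shrank between len() and indexing
    # Phase 2: lock just this object and re-check that it still exists.
    with table.lock:
        if table not in db.tables:
            return False  # another thread dropped it after we picked it
        # Real code would execute a DROP TABLE statement here.
        db.tables.remove(table)
    return True


db = Db(tables=[Table(f"t{i}") for i in range(50)])
results: list[bool] = []
results_lock = threading.Lock()


def worker(seed: int) -> None:
    rng = random.Random(seed)
    for _ in range(100):
        dropped = drop_random_table(db, rng)
        with results_lock:
            results.append(dropped)


threads = [threading.Thread(target=worker, args=(seed,)) for seed in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A thread that loses a race simply gets False and moves on. This mirrors the "return False, don't raise" rule.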
Respecting Object Limits
```python
def run(self, exe: Executor) -> bool:
    if len(exe.db.tables) >= MAX_TABLES:
        return False
    ...
```
Limits are defined in database.py: MAX_TABLES=5, MAX_VIEWS=15, MAX_CLUSTERS, MAX_SCHEMAS, etc.
Creating a New Database Object
```python
def run(self, exe: Executor) -> bool:
    # Check the limit and allocate a unique ID under the global lock.
    with exe.db.lock:
        if len(exe.db.views) >= MAX_VIEWS:
            return False
        view_id = exe.db.view_id
        exe.db.view_id += 1
    try:
        schema = self.rng.choice(exe.db.schemas)
    except IndexError:  # schemas can be dropped concurrently
        return False
    with schema.lock:
        if schema not in exe.db.schemas:
            return False
        # base_object selection is elided here.
        view = View(self.rng, view_id, base_object, schema, ...)
        view.create(exe)
    exe.db.views.append(view)
    return True
```
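Incrementing a shared counter such as view_id is a read-modify-write. Two threads that interleave between the read and the write can end up with the same ID. The sketch below puts the limit check and the ID allocation in one short critical section and confirms that IDs stay unique across eight threads. Db and reserve_view_id are hypothetical stand-ins, and MAX_VIEWS uses the value listed above.

```python
import threading
from dataclasses import dataclass, field
from typing import Optional

MAX_VIEWS = 15  # per the limits listed in this guide


@dataclass
class Db:
    view_id: int = 0
    views: list[str] = field(default_factory=list)
    lock: threading.Lock = field(default_factory=threading.Lock)


def reserve_view_id(db: Db) -> Optional[int]:
    """Check the limit and take the next ID inside one short critical section."""
    with db.lock:
        if len(db.views) >= MAX_VIEWS:
            return None  # the action would return False here
        view_id = db.view_id
        db.view_id += 1  # read-modify-write: only safe while holding the lock
    return view_id


db = Db()  # db.views stays empty, so every reservation succeeds
ids: list[Optional[int]] = []
ids_lock = threading.Lock()


def worker() -> None:
    for _ in range(1000):
        view_id = reserve_view_id(db)
        with ids_lock:
            ids.append(view_id)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

full = Db(views=[f"v{i}" for i in range(MAX_VIEWS)])  # already at the limit
```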
Dropping a Database Object
```python
def run(self, exe: Executor) -> bool:
    with exe.db.lock:
        # Never drop below a small floor of views.
        if len(exe.db.views) <= 2:
            return False
        view = self.rng.choice(exe.db.views)
    with view.lock:
        if view not in exe.db.views:
            return False
        exe.execute(f"DROP VIEW {view}", http=Http.RANDOM)
        exe.db.views.remove(view)
    return True
```
Scenario-Specific Actions
Some actions only run in specific scenarios:
```python
def run(self, exe: Executor) -> bool:
    if exe.db.scenario != Scenario.Rename:
        return False
    ...
```
Using Prepared Statements
```python
def run(self, exe: Executor) -> bool:
    query = "SELECT ..."
    if self.rng.choice([True, False]):
        self.stmt_id += 1
        self.exe_prepared(query, f"mystmt{self.stmt_id}", exe)
    else:
        exe.execute(query, http=Http.RANDOM)
    return True
```
Extending an Existing Action
Often the simplest approach is to add new SQL variants to an existing action's run() method. For example, to test a new expression type, add it to expression.py. To test a new system flag, add it to FlipFlagsAction.flags.
Adding a System Flag to FlipFlagsAction
In the FlipFlagsAction class, add an entry to the flags dictionary:
```python
flags: dict[str, list[str]] = {
    ...
    "my_new_flag": ["true", "false"],
    ...
}
```
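The sketch below shows how a flags dictionary like this one can drive random flag flips: pick a flag, then one of its listed values, both from the seeded RNG. It is hypothetical:

- The dictionary contents and random_flag_statement are illustrative.
- It assumes the flip is issued as Materialize's ALTER SYSTEM SET statement.

Check FlipFlagsAction.run() for what the framework actually executes.

```python
import random

# Hypothetical subset of FlipFlagsAction.flags; real names and values differ.
flags: dict[str, list[str]] = {
    "my_new_flag": ["true", "false"],
    "my_other_flag": ["0", "1000"],
}


def random_flag_statement(rng: random.Random) -> str:
    """Pick one flag and one of its allowed values with the seeded RNG."""
    name = rng.choice(list(flags))
    value = rng.choice(flags[name])
    return f"ALTER SYSTEM SET {name} = {value}"


stmt = random_flag_statement(random.Random(7))
```

Listing only values the flag accepts keeps the flip itself from erroring. This leaves any panics or unexpected errors to come from the flag's effect on concurrent queries.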
Running
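```bash
# Default configuration
bin/mzcompose --find parallel-workload run default

# Pin complexity, scenario, runtime, and seed (e.g., to reproduce a failure)
bin/mzcompose --find parallel-workload run default \
    --complexity=ddl \
    --scenario=regression \
    --runtime=300 \
    --seed=42
```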
Important Design Rules
- Minimize locking: Use exe.db.lock only briefly to pick objects, then switch to per-object locks. Excessive locking reduces the chance of finding real race conditions; when a race produces an expected error, add it to errors_to_ignore instead of widening the lock.
- Return False, don't raise: If a precondition isn't met (empty list, at capacity), return False to skip the action.
- Handle IndexError from rng.choice: Objects can be concurrently removed. Either check the length first under the lock, or catch IndexError and return False.
- Track state changes: If you create an object, append it to the appropriate exe.db list. If you drop one, remove it.
- Use self.rng: Always use the action's seeded random.Random instance, never random.choice() directly. This makes failures reproducible via --seed.
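Two of the rules above can be checked with the standard library alone:

- random.Random.choice raises IndexError on an empty sequence.
- A random.Random instance seeded with the same value produces the same sequence of choices on every run. Thread interleaving can still vary between runs.

The pick and draws helpers below are hypothetical illustrations, not framework functions.

```python
import random
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def pick(rng: random.Random, items: Sequence[T]) -> Optional[T]:
    """Return a random item, or None if the sequence is (or became) empty."""
    try:
        return rng.choice(items)
    except IndexError:  # random.choice raises IndexError on an empty sequence
        return None


def draws(seed: int, n: int) -> list[Optional[str]]:
    rng = random.Random(seed)  # one seeded instance, never the global random
    tables = ["t1", "t2", "t3"]
    return [pick(rng, tables) for _ in range(n)]


run_a = draws(42, 10)
run_b = draws(42, 10)  # same seed, same sequence of choices
```

An action that gets None back would return False, following the "return False, don't raise" rule.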