| name | preferences-railway-oriented-programming |
| description | Railway-oriented programming with Result types and workflow composition for error handling. Load when designing error handling pipelines or composing fallible operations. |
Railway-oriented programming
Overview
Railway-oriented programming (ROP) is a functional pattern for composing operations that can fail.
The mental model is a two-track railway: success track and failure track.
Once on the failure track, operations are skipped and errors propagate to the end.
This approach makes error handling explicit, composable, and type-safe.
The Result type
The foundation of ROP is a discriminated union representing success or failure.
Pattern: Result<T, E> in Python
from typing import Generic, TypeVar, Union
from dataclasses import dataclass
T = TypeVar('T')
E = TypeVar('E')
U = TypeVar('U')
@dataclass
class Success(Generic[T]):
value: T
@dataclass
class Failure(Generic[E]):
error: E
Result = Union[Success[T], Failure[E]]
def parseInput(raw: dict) -> Result[Input, ParseError]:
"""Parse untrusted input - can fail"""
try:
return Success(Input(**raw))
except Exception as e:
return Failure(ParseError(str(e)))
def validate(input: Input) -> Result[ValidInput, ValidationError]:
"""Validate business rules - can fail"""
errors = []
if not input.email or '@' not in input.email:
errors.append("email must contain @")
if not input.name:
errors.append("name is required")
if errors:
return Failure(ValidationError(errors))
return Success(ValidInput(email=input.email, name=input.name))
match parseInput(data):
case Success(input):
print(f"Parsed: {input}")
case Failure(error):
print(f"Parse error: {error}")
Pattern: Result in TypeScript
type Result<T, E> =
| { ok: true; value: T }
| { ok: false; error: E };
function success<T, E>(value: T): Result<T, E> {
return { ok: true, value };
}
function failure<T, E>(error: E): Result<T, E> {
return { ok: false, error };
}
function parseInput(raw: unknown): Result<Input, ParseError> {
try {
const input = Input.parse(raw);
return success(input);
} catch (e) {
return failure(new ParseError(e.message));
}
}
function validate(input: Input): Result<ValidInput, ValidationError> {
const errors: string[] = [];
if (!input.email || !input.email.includes('@')) {
errors.push('email must contain @');
}
if (!input.name) {
errors.push('name is required');
}
if (errors.length > 0) {
return failure(new ValidationError(errors));
}
return success({ email: input.email, name: input.name });
}
const result = parseInput(data);
if (result.ok) {
console.log(`Parsed: ${result.value}`);
} else {
console.log(`Error: ${result.error}`);
}
Pattern: Result in Go
type Result[T any] struct {
Value T
Error error
}
func Success[T any](value T) Result[T] {
return Result[T]{Value: value, Error: nil}
}
func Failure[T any](err error) Result[T] {
var zero T
return Result[T]{Value: zero, Error: err}
}
func (r Result[T]) IsOk() bool {
return r.Error == nil
}
func (r Result[T]) IsErr() bool {
return r.Error != nil
}
func ParseInput(raw map[string]any) Result[Input] {
input, err := parseInputInternal(raw)
if err != nil {
return Failure[Input](err)
}
return Success(input)
}
func Validate(input Input) Result[ValidInput] {
if input.Email == "" || !strings.Contains(input.Email, "@") {
return Failure[ValidInput](errors.New("email must contain @"))
}
if input.Name == "" {
return Failure[ValidInput](errors.New("name is required"))
}
return Success(ValidInput{Email: input.Email, Name: input.Name})
}
result := ParseInput(data)
if result.IsOk() {
fmt.Printf("Parsed: %v\n", result.Value)
} else {
fmt.Printf("Error: %v\n", result.Error)
}
The abstraction hierarchy
Result types participate in a hierarchy of abstractions, each more powerful than the last.
Understanding this hierarchy helps you choose the appropriate abstraction for each situation.
Functor: transforming success values
The functor interface (map) transforms the success value without affecting the error track.
Use when you have a pure function to apply to a successful result.
result.map(|account| account.balance)
This is the simplest and most widely applicable abstraction.
Every Result is a functor, and functors compose freely.
Applicative: combining independent results
The applicative interface (apply, map2, mapN) combines multiple independent results.
All validations run regardless of individual failures, collecting all errors.
Use when operations don't depend on each other's results.
validate_name(name)
.and(validate_email(email))
.and(validate_age(age))
.map3(|n, e, a| User::new(n, e, a))
Applicative is more powerful than functor because it can combine multiple Results, but less powerful than monad because it cannot express sequential dependencies.
Monad: sequencing dependent operations
The monad interface (flatMap, and_then, >>=) sequences operations where later steps depend on earlier results.
Execution short-circuits on first failure.
Use when each step needs the result of the previous step.
validate_account_no(no)
.and_then(|no| lookup_account(no))
.and_then(|account| validate_balance(account, amount))
.and_then(|account| debit(account, amount))
Monad is the most powerful abstraction in this hierarchy, but also the most specialized.
Not all applicatives are monads.
The least powerful abstraction principle
Prefer the simplest abstraction that solves your problem.
Functors compose more freely than applicatives.
Applicatives compose more freely than monads.
Using a more powerful abstraction than necessary restricts composability.
When in doubt, follow this decision tree:
- Can you just transform the success value? Use functor (
map)
- Are the operations independent of each other? Use applicative (
apply, mapN)
- Does each step depend on the previous result? Use monad (
bind, and_then)
Selection guidance
| Situation | Abstraction | Interface | Why |
|---|
| Transform success value | Functor | map | Simplest, most reusable, composes freely |
| Combine independent validations | Applicative | apply, mapN | Parallel execution, error accumulation |
| Chain dependent operations | Monad | bind, and_then | Sequential dependency, fail-fast |
See preferences-algebraic-laws for the mathematical laws each abstraction must satisfy, and preferences-theoretical-foundations for the categorical setting they live in.
bind: Monadic composition
Chain operations that can fail - short-circuit on first error.
Pattern: Sequential pipeline in Python
from typing import Callable
def bind(result: Result[T, E], f: Callable[[T], Result[U, E]]) -> Result[U, E]:
"""
Monadic bind: compose world-crossing functions.
If result is Success, apply f to the value.
If result is Failure, skip f and propagate error.
"""
match result:
case Success(value):
return f(value)
case Failure(error):
return Failure(error)
def bindAsync(
result: Result[T, E],
f: Callable[[T], Awaitable[Result[U, E]]]
) -> Awaitable[Result[U, E]]:
"""Bind for async operations"""
async def inner():
match result:
case Success(value):
return await f(value)
case Failure(error):
return Failure(error)
return inner()
def processOrder(raw: dict) -> Result[Order, Error]:
"""
Process order through pipeline:
1. Parse input (can fail)
2. Validate (can fail)
3. Save to DB (can fail)
Stop at first error - don't waste work.
"""
return (
bind(parseInput(raw), lambda input:
bind(validate(input), lambda valid:
bind(saveToDb(valid), lambda order:
Success(order))))
)
class ResultMonad:
def __init__(self, result: Result[T, E]):
self._result = result
def bind(self, f: Callable[[T], Result[U, E]]) -> 'ResultMonad[U, E]':
return ResultMonad(bind(self._result, f))
def unwrap(self) -> Result[T, E]:
return self._result
result = (
ResultMonad(parseInput(raw))
.bind(validate)
.bind(saveToDb)
.unwrap()
)
Pattern: Async pipeline in Python
async def processOrderAsync(raw: dict) -> Result[Order, Error]:
"""
Async pipeline with Result:
- Parse (sync)
- Validate (sync)
- Fetch from DB (async, can fail)
- Update DB (async, can fail)
"""
inputResult = parseInput(raw)
validResult = bind(inputResult, validate)
match validResult:
case Success(valid):
userResult = await fetchUser(valid.userId)
return await bindAsync(userResult, lambda user:
await updateUser(user, valid))
case Failure(error):
return Failure(error)
async def bindAsync(
result: Result[T, E],
f: Callable[[T], Awaitable[Result[U, E]]]
) -> Result[U, E]:
match result:
case Success(value):
return await f(value)
case Failure(error):
return Failure(error)
When to use bind (monadic style)
Use bind when:
- Steps depend on previous results: Next operation needs output of previous
- Want to short-circuit: Stop on first error, don't waste work
- Database operations: Skip writes if validation fails
- External API calls: Don't call next API if previous failed
- Expensive operations: Avoid unnecessary computation
Example scenarios:
- User registration: validate → check email not taken → create user → send email
- Order processing: validate → reserve inventory → charge card → create shipment
- Data pipeline: parse → validate → transform → load
apply: Applicative composition
Combine independent validations and collect all errors.
Pattern: Parallel validation in Python
def apply(
fResult: Result[Callable[[T], U], list[E]],
xResult: Result[T, list[E]]
) -> Result[U, list[E]]:
"""
Applicative apply: combine independent computations.
If both Success: apply function to value.
If either Failure: collect errors from both.
"""
match (fResult, xResult):
case (Success(f), Success(x)):
return Success(f(x))
case (Failure(e1), Success(_)):
return Failure(e1)
case (Success(_), Failure(e2)):
return Failure(e2)
case (Failure(e1), Failure(e2)):
return Failure(e1 + e2)
def map(f: Callable[[T], U], result: Result[T, E]) -> Result[U, E]:
"""Lift normal function to Result world"""
match result:
case Success(value):
return Success(f(value))
case Failure(error):
return Failure(error)
def validateUser(raw: dict) -> Result[User, list[ValidationError]]:
"""
Validate all fields independently.
Returns all validation errors, not just first.
Better UX than stopping at first error.
"""
emailResult = validateEmail(raw.get('email'))
nameResult = validateName(raw.get('name'))
ageResult = validateAge(raw.get('age'))
def makeUser(email: EmailAddress):
def withName(name: str):
def withAge(age: int):
return User(email=email, name=name, age=age)
return withAge
return withName
return apply(
apply(
map(makeUser, emailResult),
nameResult),
ageResult)
result = validateUser({
'email': 'invalid',
'name': '',
'age': '-5'
})
Pattern: Applicative in TypeScript
function apply<T, U, E>(
fResult: Result<(t: T) => U, E[]>,
xResult: Result<T, E[]>
): Result<U, E[]> {
if (fResult.ok && xResult.ok) {
return success(fResult.value(xResult.value));
}
if (!fResult.ok && !xResult.ok) {
return failure([...fResult.error, ...xResult.error]);
}
if (!fResult.ok) {
return failure(fResult.error);
}
return failure(xResult.error);
}
function map<T, U, E>(
f: (t: T) => U,
result: Result<T, E>
): Result<U, E> {
if (result.ok) {
return success(f(result.value));
}
return failure(result.error);
}
function validateUser(raw: unknown): Result<User, ValidationError[]> {
const emailResult = validateEmail(raw.email);
const nameResult = validateName(raw.name);
const ageResult = validateAge(raw.age);
const makeUser = (email: EmailAddress) =>
(name: string) =>
(age: number) =>
({ email, name, age });
return apply(
apply(
map(makeUser, emailResult),
nameResult),
ageResult);
}
Error accumulation with semigroups
For applicative validation to accumulate errors, the error type must form a semigroup (support associative combination).
NonEmptyList or NonEmptyVec are common choices because they guarantee at least one error when in the Failure case.
use nonempty::NonEmpty;
type ValidationResult<T> = Result<T, NonEmpty<ValidationError>>;
#[derive(Debug, Clone)]
enum ValidationError {
InvalidName(String),
InvalidEmail(String),
InvalidAge(String),
}
fn validate_name(name: &str) -> ValidationResult<String> {
if name.is_empty() {
Err(NonEmpty::new(ValidationError::InvalidName(
"name is required".into()
)))
} else {
Ok(name.to_string())
}
}
fn validate_email(email: &str) -> ValidationResult<String> {
if !email.contains('@') {
Err(NonEmpty::new(ValidationError::InvalidEmail(
"email must contain @".into()
)))
} else {
Ok(email.to_string())
}
}
fn validate_age(age: i32) -> ValidationResult<i32> {
if age < 0 {
Err(NonEmpty::new(ValidationError::InvalidAge(
"age must be positive".into()
)))
} else {
Ok(age)
}
}
fn validate_user(
name: &str,
email: &str,
age: i32
) -> ValidationResult<User> {
validate_name(name)
.and(validate_email(email))
.and(validate_age(age))
.map(|(n, e, a)| User::new(n, e, a))
}
let result = validate_user("", "invalid", -5);
The semigroup requirement means errors can be combined associatively.
NonEmptyVec satisfies this: (e1 + e2) + e3 == e1 + (e2 + e3).
This enables the applicative to collect errors from independent validations while maintaining type safety (at least one error when failed).
Pattern: Applicative with NonEmptyList in Python
from typing import Generic, TypeVar
from dataclasses import dataclass
T = TypeVar('T')
E = TypeVar('E')
@dataclass
class NonEmptyList(Generic[E]):
"""List guaranteed to have at least one element."""
head: E
tail: list[E]
def append(self, other: 'NonEmptyList[E]') -> 'NonEmptyList[E]':
"""Semigroup operation: combine two non-empty lists."""
return NonEmptyList(
head=self.head,
tail=self.tail + [other.head] + other.tail
)
def to_list(self) -> list[E]:
return [self.head] + self.tail
ValidationResult = Result[T, NonEmptyList[E]]
def validate_user_fields(
name: str,
email: str,
age: int
) -> ValidationResult[User]:
"""
Validate all fields independently.
Accumulates all errors using NonEmptyList semigroup.
"""
name_result = validate_name(name)
email_result = validate_email(email)
age_result = validate_age(age)
return apply(
apply(
map(lambda n: lambda e: lambda a: User(n, e, a), name_result),
email_result
),
age_result
)
result = validate_user_fields("", "bad-email", -1)
When to use apply (applicative style)
Use apply when:
- Validations are independent: Each check doesn't need results of others
- Want to collect all errors: Better UX to show all problems at once
- Can run in parallel: No dependencies means potential parallelism
- Form validation: Show all field errors to user
Example scenarios:
- User input validation: email, name, age all validated independently
- Configuration validation: check all required fields before proceeding
- Multi-field business rules: credit score AND income AND debt ratio
The key difference from monadic bind: applicative runs all validations regardless of individual failures, while bind short-circuits on first error.
Effect signatures
Make side effects explicit in function type signatures.
Pattern: Async operations that can fail
from typing import Awaitable
AsyncResult = Awaitable[Result[T, E]]
async def fetchUser(userId: UserId) -> AsyncResult[User, DatabaseError]:
"""
Effect signature says:
- This is async (Awaitable)
- Can fail (Result)
- Failure type is DatabaseError
"""
try:
row = await db.fetchrow(
"SELECT id, email, name FROM users WHERE id = $1",
userId.value
)
if row is None:
return Failure(DatabaseError("user not found"))
return Success(User(
id=UserId(value=row['id']),
email=EmailAddress(value=row['email']),
name=row['name']
))
except Exception as e:
return Failure(DatabaseError(str(e)))
async def updateUser(user: User) -> AsyncResult[User, DatabaseError]:
"""Effect: async write that can fail"""
try:
await db.execute(
"UPDATE users SET email = $1, name = $2 WHERE id = $3",
user.email.value,
user.name,
user.id.value
)
return Success(user)
except Exception as e:
return Failure(DatabaseError(str(e)))
async def updateUserEmail(
userId: UserId,
newEmail: EmailAddress
) -> AsyncResult[User, Error]:
"""
Railway-oriented pipeline:
1. Fetch user (async, can fail: not found, db error)
2. Update email field (pure, cannot fail)
3. Save user (async, can fail: db error)
"""
userResult = await fetchUser(userId)
match userResult:
case Success(user):
updatedUser = User(
id=user.id,
email=newEmail,
name=user.name
)
return await updateUser(updatedUser)
case Failure(error):
return Failure(error)
Pattern: Effect signatures in TypeScript
type AsyncResult<T, E> = Promise<Result<T, E>>;
async function fetchUser(
userId: UserId
): AsyncResult<User, DatabaseError> {
try {
const row = await db.query(
"SELECT id, email, name FROM users WHERE id = $1",
[userId]
);
if (row.rows.length === 0) {
return failure(new DatabaseError("not found"));
}
return success(parseUser(row.rows[0]));
} catch (e) {
return failure(new DatabaseError(e.message));
}
}
async function updateUser(
user: User
): AsyncResult<User, DatabaseError> {
try {
await db.query(
"UPDATE users SET email = $1, name = $2 WHERE id = $3",
[user.email, user.name, user.id]
);
return success(user);
} catch (e) {
return failure(new DatabaseError(e.message));
}
}
Combining Result with other effects
When Result must be combined with other effects (async, reader, state), monad transformers stack the effects.
ResultT for async + error
Most commonly, Result needs to be combined with async operations.
The standard pattern is Future<Result<T, E>> or Promise<Result<T, E>>.
type AsyncResult<T, E> = impl Future<Output = Result<T, E>>;
async fn fetch_and_validate(id: &str) -> Result<Account, Error> {
let data = fetch(id).await?;
validate(data)?;
Ok(Account::from(data))
}
async fn process_account(id: &str) -> Result<ProcessedAccount, Error> {
let account = fetch_and_validate(id).await?;
let verified = verify_account(account).await?;
let processed = process(verified).await?;
Ok(processed)
}
type AsyncResult<T, E> = Promise<Result<T, E>>;
async function fetchAndValidate(id: string): AsyncResult<Account, Error> {
const dataResult = await fetch(id);
if (!dataResult.ok) return dataResult;
const validResult = validate(dataResult.value);
if (!validResult.ok) return validResult;
return success(Account.from(validResult.value));
}
async function processAccount(id: string): AsyncResult<ProcessedAccount, Error> {
const accountResult = await fetchAndValidate(id);
if (!accountResult.ok) return accountResult;
const verifiedResult = await verifyAccount(accountResult.value);
if (!verifiedResult.ok) return verifiedResult;
const processedResult = await process(verifiedResult.value);
return processedResult;
}
from typing import Awaitable
AsyncResult = Awaitable[Result[T, E]]
async def fetch_and_validate(id: str) -> Result[Account, Error]:
"""
Async operation that can fail.
Combines async effect (await) with error effect (Result).
"""
data_result = await fetch(id)
match data_result:
case Success(data):
valid_result = validate(data)
match valid_result:
case Success(valid):
return Success(Account.from_valid(valid))
case Failure(error):
return Failure(error)
case Failure(error):
return Failure(error)
async def process_account(id: str) -> Result[ProcessedAccount, Error]:
account_result = await fetch_and_validate(id)
match account_result:
case Success(account):
verified_result = await verify_account(account)
match verified_result:
case Success(verified):
return await process(verified)
case Failure(error):
return Failure(error)
case Failure(error):
return Failure(error)
Effect ordering matters
The transformer stacking order determines behavior.
Future<Result<T, E>> gives a Future that resolves to a Result - this is the standard pattern.
Result<Future<T>, E> (less common) would give a Result containing a Future, which is rarely useful because you cannot meaningfully combine a successful Future with a failed computation.
For most domain code, prefer Future<Result<T, E>> / async fn() -> Result<T, E>.
This allows async operations to complete before error handling, matching the natural execution order.
Combining multiple effects with monad transformers
When stacking more than two effects, libraries provide transformer types; a transformer stack is one interpreter of a capability interface, not the interface itself (see preferences-theoretical-foundations):
-- Haskell: ReaderT + ExceptT + IO
type AppM a = ReaderT Config (ExceptT AppError IO) a
-- Unwrapped: Config -> IO (Either AppError a)
-- Three effects: Reader (config access), Except (errors), IO (effects)
runApp :: AppM a -> Config -> IO (Either AppError a)
runApp app config = runExceptT (runReaderT app config)
struct AppM<T> {
run: Box<dyn Fn(Config) -> Pin<Box<dyn Future<Output = Result<T, AppError>>>>>
}
impl<T> AppM<T> {
fn run(self, config: Config) -> impl Future<Output = Result<T, AppError>> {
(self.run)(config)
}
}
Most languages without native transformer support use a simpler pattern:
type AppM<T> = (config: Config) => Promise<Result<T, AppError>>;
async function processOrder(config: Config): Promise<Result<Order, AppError>> {
const validResult = await validateOrder(config);
if (!validResult.ok) return validResult;
return success(validResult.value);
}
Guidelines for effect composition
- Most common:
async fn() -> Result<T, E> / Future<Result<T, E>> for async + error
- Keep transformers shallow: Two or three effects maximum before complexity becomes unwieldy
- Document effect order: Make clear what each layer represents
- Use language idioms: async/await with Result is more idiomatic than custom transformers in most languages
- Consider effect libraries: fp-ts (TypeScript), cats-effect (Scala), polysemy (Haskell) when transformer stacks become complex
See preferences-theoretical-foundations, and in particular its effects-and-handlers material, for why a capability interface discharged by handlers, not a transformer tower, is the primitive: a transformer stack is one leaky interpreter of that interface, so keep any stack shallow and hidden behind a type alias rather than treating it as the integration ideal.
The two-track model
Transform all functions to uniform two-track shape for composition.
Transformation functions
def map(f: Callable[[T], U], result: Result[T, E]) -> Result[U, E]:
"""
Lift normal function to Result world.
One-track in, two-track out.
"""
match result:
case Success(value):
return Success(f(value))
case Failure(error):
return Failure(error)
def tee(f: Callable[[T], None]) -> Callable[[T], T]:
"""
Convert side-effect function to pass-through.
Useful for logging, metrics, etc.
"""
def wrapper(x: T) -> T:
f(x)
return x
return wrapper
def tryCatch(
f: Callable[[T], U],
errorHandler: Callable[[Exception], E]
) -> Callable[[T], Result[U, E]]:
"""
Convert exception-throwing function to Result-returning.
Catches exceptions and converts to Failure.
"""
def wrapper(x: T) -> Result[U, E]:
try:
return Success(f(x))
except Exception as e:
return Failure(errorHandler(e))
return wrapper
Uniform pipeline composition
def canonicalizeEmail(email: str) -> str:
return email.strip().lower()
def logUser(user: User) -> None:
logger.info(f"Processing user {user.id}")
def encryptPassword(password: str) -> str:
if len(password) < 8:
raise ValueError("password too short")
return bcrypt.hash(password)
def updateUserPipeline(raw: dict) -> Result[User, Error]:
"""
All functions transformed to two-track for uniform composition:
- parseInput: already two-track (returns Result)
- validate: already two-track (returns Result)
- canonicalizeEmail: lifted via map
- logUser: converted via tee + map
- encryptPassword: lifted via tryCatch
- saveToDb: already two-track
"""
return (
bind(parseInput(raw), lambda input:
bind(validate(input), lambda valid:
bind(
map(lambda v: canonicalizeEmail(v.email), Success(valid)),
lambda canonicalized:
bind(map(tee(logUser), Success(canonicalized)), lambda logged:
bind(
tryCatch(encryptPassword, lambda e: Error(str(e)))(valid.password),
lambda encrypted:
saveToDb(logged, encrypted)
)))))
)
Railway diagrams
Single-track function (one input, one output):
─────[ f ]─────
Switch function (one input, two outputs - Success or Failure):
───┬─[ f ]─── Success
└────────── Failure
Two-track function (two inputs, two outputs):
───┬─[ f ]─┬─── Success
│ └─── Failure (from success track)
───┴─────────── Failure (passthrough)
Pipeline of switches with bind:
───┬─[ f1 ]─┬─[ f2 ]─┬─[ f3 ]─┬─── Success
└────────┴────────┴────────┴─── Failure
All functions uniform after transformation:
───┬─[ switch ]─┬─[ map g ]─┬─[ tee h ]─┬─── Success
└────────────┴───────────┴───────────┴─── Failure
Error classification and handling strategies
Not all errors should be modeled the same way.
Understanding error categories helps choose appropriate handling strategies.
Three categories of errors
Following Wlaschin's classification from "Domain Modeling Made Functional":
1. Domain errors - Expected outcomes of domain operations
- Subject matter experts can describe them
- Part of normal workflow, not exceptional
- Have established procedures for handling
- Should be modeled explicitly as Result types
Examples:
- Validation failures (invalid email format)
- Business rule violations (insufficient funds)
- Not found errors (user doesn't exist)
- Convergence failures (model didn't converge)
2. Infrastructure errors - Technical/architectural failures
- Technical concerns outside domain logic
- May be transient (retry can help)
- Outside subject matter expert vocabulary
- Can model as Result or use exceptions
Examples:
- Network timeouts
- Database connection failures
- Disk full, out of memory
- Authentication/authorization failures
3. Panics - Unrecoverable system errors
- System in unknown state
- Usually programmer errors
- Cannot meaningfully continue
- Should use exceptions/panics
Examples:
- Division by zero (bug in code)
- Array index out of bounds (logic error)
- Null/None when value guaranteed (broken invariant)
- Stack overflow, out of memory
Decision tree
Ask: "Would a subject matter expert recognize this error as part of the domain?"
Is this a domain concept?
├─ Yes → Domain error
│ Model explicitly with Result
│ Example: OrderQuantityMustBePositive
│
└─ No → Ask: "Can we meaningfully continue?"
├─ Yes → Infrastructure error
│ Consider modeling as Result
│ Example: DatabaseTemporarilyUnavailable
│
└─ No → Panic
Use exceptions/panics
Example: IndexOutOfBounds (programmer error)
Handling strategies by category
Domain errors with Result:
from expression import Result, Ok, Error
@dataclass
class ValidationError:
field: str
reason: str
@dataclass
class InsufficientFunds:
account_id: str
balance: Decimal
requested: Decimal
DomainError = ValidationError | InsufficientFunds
def process_payment(
account: Account,
amount: Decimal
) -> Result[Payment, DomainError]:
"""Domain operation that can fail with known errors."""
if amount > account.balance:
return Error(InsufficientFunds(
account_id=account.id,
balance=account.balance,
requested=amount
))
return Ok(Payment(account=account, amount=amount))
Infrastructure errors - explicit or exception:
@dataclass
class DatabaseError:
operation: str
exception: str
async def save_to_database(
data: Data
) -> Result[SavedData, DatabaseError]:
try:
result = await db.save(data)
return Ok(result)
except Exception as e:
return Error(DatabaseError("save", str(e)))
async def save_to_database(data: Data) -> SavedData:
"""May raise DatabaseException."""
return await db.save(data)
Panics - always exceptions:
def get_first_element(items: list[T]) -> T:
"""
Get first element.
Precondition: items must be non-empty.
Raises AssertionError if violated (programmer error).
"""
assert len(items) > 0, "items must be non-empty"
return items[0]
from typing import NewType
NonEmptyList = NewType('NonEmptyList', list)
def get_first_element(items: NonEmptyList[T]) -> T:
"""Get first element. Type guarantees non-empty."""
return items[0]
Composing different error types
When workflows combine operations with different error types:
from typing import Union
WorkflowError = (
ValidationError |
InsufficientFunds |
DatabaseError
)
def process_order_workflow(
order: UnvalidatedOrder
) -> Result[OrderConfirmation, WorkflowError]:
"""Workflow combining domain and infrastructure errors."""
return (
validate_order(order)
.map_error(lambda e: e)
.bind(process_payment)
.map_error(lambda e: e)
.bind(save_to_database)
.map_error(lambda e: e)
)
Guidelines
- Default to Result for domain errors: If domain experts discuss it, model it explicitly
- Infrastructure errors are judgment calls: Model explicitly if need fine-grained control, use exceptions if want to fail fast
- Never catch panics in domain logic: Panics indicate bugs, not business scenarios
- Use types to prevent panics: Make invalid states unrepresentable instead of asserting
- Document error possibilities: Function signatures should show what can go wrong
Cross-language error handling
Python:
- Domain: Result from Expression library
- Infrastructure: Result or raise exceptions
- Panics: assert, raise RuntimeError
TypeScript:
- Domain: Either from fp-ts or Effect.Effect
- Infrastructure: Either or throw Error
- Panics: throw Error, assert
Rust:
- Domain: Result<T, E> with custom error types
- Infrastructure: Result or anyhow::Result
- Panics: panic!, assert
See also:
- domain-modeling.md#pattern-7-domain-errors-vs-infrastructure-errors for detailed examples
- Language-specific docs (python-development.md, typescript-nodejs-development.md, rust-development/02-error-handling.md) for error type hierarchies
Integration with data pipelines
SQLMesh models as pure functions
Treat SQLMesh models as pure, composable functions in the Result world.
MODEL (
name analytics.validated_events,
kind INCREMENTAL_BY_TIME_RANGE(time_column created_at),
dialect postgres
);
SELECT
event_id,
event_type,
payload,
created_at
FROM raw_events
WHERE
event_type IN ('UserCreated', 'UserUpdated', 'UserDeleted')
AND jsonb_typeof(payload) = 'object'
AND created_at IS NOT NULL;
MODEL (
name analytics.event_validation_failures,
kind FULL
);
SELECT
event_id,
event_type,
'invalid_event_type' as failure_reason
FROM raw_events
WHERE event_type NOT IN ('UserCreated', 'UserUpdated', 'UserDeleted')
UNION ALL
SELECT
event_id,
event_type,
'invalid_payload' as failure_reason
FROM raw_events
WHERE jsonb_typeof(payload) != 'object';
Composing models with Result semantics
MODEL (
name analytics.user_stats,
kind INCREMENTAL_BY_TIME_RANGE(time_column event_date)
);
SELECT
user_id,
DATE(created_at) as event_date,
COUNT(*) as event_count
FROM {{ ref('validated_events') }}
WHERE event_type = 'UserCreated'
GROUP BY user_id, DATE(created_at);
MODEL (
name analytics.data_quality_metrics,
kind FULL
);
SELECT
'event_validation' as check_name,
COUNT(*) as failure_count,
CURRENT_TIMESTAMP as checked_at
FROM {{ ref('event_validation_failures') }};
Testing railway-oriented code
Property-based testing for bind/apply
from hypothesis import given, strategies as st
@given(
st.integers(),
st.integers()
)
def test_bind_associativity(x: int, y: int):
"""(m >>= f) >>= g === m >>= (\x -> f x >>= g)"""
m = Success(x)
f = lambda a: Success(a + y)
g = lambda b: Success(b * 2)
left = bind(bind(m, f), g)
right = bind(m, lambda a: bind(f(a), g))
assert left == right
@given(st.text(), st.text(), st.text())
def test_apply_collects_errors(email: str, name: str, age: str):
"""Applicative should collect errors from all validations"""
result = validateUser({'email': email, 'name': name, 'age': age})
match result:
case Failure(errors):
expected_errors = 0
if '@' not in email:
expected_errors += 1
if not name:
expected_errors += 1
try:
if int(age) < 0:
expected_errors += 1
except:
expected_errors += 1
assert len(errors) == expected_errors
case Success(_):
assert '@' in email
assert name
assert int(age) >= 0
Error pipeline observability
Railway-oriented pipelines compose error handling at the type level, but operators also need runtime visibility into error flows.
The type system ensures errors are handled; observability ensures error patterns are detectable and diagnosable.
When a pipeline step switches from the success track to the failure track, this is an observability event.
Record it as a span event (not a separate span) on the current workflow span, including the step name that produced the error, the error classification (domain validation, infrastructure failure, timeout, etc.), and relevant context from the error value.
This makes the railway topology visible in traces — operators can see which step failed, what the error was, and how it propagated through the remaining pipeline.
Aggregate error distribution across pipeline steps as metrics.
A counter per step per error classification reveals which steps fail most frequently and what categories of failure dominate.
Domain validation errors (e.g., invalid input, business rule violations) are typically expected and informational.
Infrastructure errors (e.g., database unavailable, external service timeout) are operational signals that may indicate systemic problems.
The error classification from the Result type maps directly to metric labels.
For pipelines that accumulate errors (applicative validation using Validation or similar), the aggregated error set is the span event payload.
Record the count and categories of accumulated errors, not necessarily every individual error (which could be high-volume).
The key integration point: preferences-observability-engineering establishes that errors appearing in both trace context and error tracking is normal and useful.
Railway-oriented pipelines provide a natural point to emit to both — the failure track transition is where the structured error context is richest.
Cross-reference preferences-observability-engineering for the observability model and preferences-architectural-patterns for where error classification happens in layered architecture (application layer).
Integration with other preferences
See ~/.claude/skills/preferences-algebraic-data-types/SKILL.md for:
- How to model domain types that work with Result
- Sum types for error variants
- Newtypes for validated values
See ~/.claude/skills/preferences-schema-versioning/SKILL.md for:
- Configuring sqlc to generate Result-returning queries
- Database operations in railway-oriented style
See ~/.claude/skills/preferences-data-modeling/SKILL.md for:
- How ROP fits into data pipeline architecture
- Effect isolation at boundaries
- Capability-interface discharge for effect composition at boundaries
The transformer-stack technique above combines Result with async, reader, and state effects, and is correct as language-specific engineering; a stack is one interpreter of a capability interface, not the interface itself and not the integration ideal.
See preferences-theoretical-foundations, and in particular its effects-and-handlers material, for why a capability interface discharged by handlers, not a transformer tower, is the primitive, and for the conjectural internal language the design converges on.