// "Latch platform for bioinformatics workflows. Build pipelines with Latch SDK, @workflow/@task decorators, deploy serverless workflows, LatchFile/LatchDir, Nextflow/Snakemake integration."
| name | latchbio-integration |
| description | Latch platform for bioinformatics workflows. Build pipelines with Latch SDK, @workflow/@task decorators, deploy serverless workflows, LatchFile/LatchDir, Nextflow/Snakemake integration. |
Latch is a Python framework for building and deploying bioinformatics workflows as serverless pipelines. Built on Flyte, create workflows with @workflow/@task decorators, manage cloud data with LatchFile/LatchDir, configure resources, and integrate Nextflow/Snakemake pipelines.
The Latch platform provides four main areas of functionality: workflow development, data management, resource configuration, and verified workflows.
# Install the Latch SDK with uv's pip interface
python3 -m uv pip install latch
# Log in to Latch from the command line
latch login
# Initialize a new workflow named "my-workflow"
latch init my-workflow
# Register the "my-workflow" workflow to the platform
latch register my-workflow
Prerequisites:
from latch import workflow, small_task
from latch.types import LatchFile
# Example task: accepts one LatchFile and returns one LatchFile.
@small_task
def process_file(input_file: LatchFile) -> LatchFile:
    """Process a single file"""
    # Processing logic
    # NOTE(review): `output_file` is not defined in this snippet; it stands in
    # for the LatchFile that the processing logic above is expected to produce.
    return output_file
@workflow
def my_workflow(input_file: LatchFile) -> LatchFile:
    """
    My bioinformatics workflow
    Args:
    input_file: Input data file
    """
    # Single-step workflow: hand the input to the task by keyword, then
    # surface the task's output as the workflow's output.
    processed = process_file(input_file=input_file)
    return processed
This skill should be used when encountering any of the following scenarios:
Workflow Development:
@workflow, @task decorators
Data Management:
latch:/// paths
Resource Configuration:
Verified Workflows:
latch.verified module
This skill includes comprehensive reference documentation organized by capability:
Read this for:
Key topics:
latch init and latch register commands
@workflow and @task decorators
Read this for:
Key topics:
latch:/// path format
Read this for:
Key topics:
@small_task, @large_task, @small_gpu_task, @large_gpu_task
@custom_task with precise specifications
Read this for:
Key topics:
latch.verified module imports
from latch import workflow, small_task, large_task
from latch.types import LatchFile, LatchDir
# Quality-control step of the RNA-seq example; takes and returns a LatchFile.
@small_task
def quality_control(fastq: LatchFile) -> LatchFile:
    """Run FastQC"""
    # NOTE(review): `qc_output` is not defined in this snippet; the FastQC
    # invocation that would produce it is omitted from the example.
    return qc_output
# Alignment step of the RNA-seq example; the only task here using @large_task.
@large_task
def alignment(fastq: LatchFile, genome: str) -> LatchFile:
    """STAR alignment"""
    # NOTE(review): `bam_output` is not defined in this snippet; the STAR
    # invocation that would produce it is omitted from the example.
    return bam_output
# Quantification step of the RNA-seq example; takes and returns a LatchFile.
@small_task
def quantification(bam: LatchFile) -> LatchFile:
    """featureCounts"""
    # NOTE(review): `counts` is not defined in this snippet; the featureCounts
    # invocation that would produce it is omitted from the example.
    return counts
@workflow
def rnaseq_pipeline(
    input_fastq: LatchFile,
    genome: str,
    output_dir: LatchDir,
) -> LatchFile:
    """RNA-seq analysis pipeline"""
    # Three tasks chained linearly: QC -> alignment -> quantification.
    # NOTE(review): `output_dir` is accepted but never passed to any task.
    qc_out = quality_control(fastq=input_fastq)
    # NOTE(review): alignment consumes the QC task's output rather than
    # `input_fastq`; confirm quality_control is meant to emit reads that
    # can be aligned and not only a report.
    aligned_bam = alignment(fastq=qc_out, genome=genome)
    return quantification(bam=aligned_bam)
from latch import workflow, small_task, large_gpu_task
from latch.types import LatchFile
# Preparation step of the GPU example; takes and returns a LatchFile.
@small_task
def preprocess(input_file: LatchFile) -> LatchFile:
    """Prepare data"""
    # NOTE(review): `processed` is not defined in this snippet; the
    # preparation logic that would produce it is omitted from the example.
    return processed
# GPU step of the example; declared with the @large_gpu_task decorator.
@large_gpu_task
def gpu_computation(data: LatchFile) -> LatchFile:
    """GPU-accelerated analysis"""
    # NOTE(review): `results` is not defined in this snippet; the GPU
    # analysis that would produce it is omitted from the example.
    return results
@workflow
def gpu_pipeline(input_file: LatchFile) -> LatchFile:
    """Pipeline with GPU tasks"""
    # Preparation runs in the @small_task; only the analysis step uses
    # the @large_gpu_task.
    prepared = preprocess(input_file=input_file)
    analysis = gpu_computation(data=prepared)
    return analysis
from latch import workflow, small_task
from latch.registry.table import Table
from latch.registry.record import Record
from latch.types import LatchFile
@small_task
def process_and_track(sample_id: str, table_id: str) -> str:
    """Process sample and update Registry

    Looks up the first record whose ``sample_id`` matches, runs ``process``
    on its ``fastq_file`` value, and writes the status and result back to
    that record.

    Args:
        sample_id: Value matched against the ``sample_id`` field of records.
        table_id: ID of the Registry table that is queried and updated.

    Returns:
        The string ``"Success"`` once the record has been updated.

    Raises:
        ValueError: If no record in the table matches ``sample_id``.
    """
    # NOTE(review): confirm `Table.get`, `Record.list`, `.values` and
    # `.update(values=...)` against the installed Latch SDK. Its documented
    # Registry interface appears to be `Table(id=...)`, `list_records()`,
    # `Record.get_values()` and `Table.update()` with `upsert_record(...)`.

    # Get sample from registry
    # NOTE(review): `table` is never used below; presumably the call is kept
    # to check that the table exists. Confirm, or drop it.
    table = Table.get(table_id=table_id)
    records = Record.list(table_id=table_id, filter={"sample_id": sample_id})
    if not records:
        # Fail with a message naming the missing sample instead of the bare
        # IndexError that `records[0]` would raise on an empty result.
        raise ValueError(
            f"No record with sample_id {sample_id!r} found in table {table_id!r}"
        )
    sample = records[0]

    # Process
    input_file = sample.values["fastq_file"]
    # NOTE(review): `process` is not defined in this snippet; it stands in
    # for the actual analysis function.
    output = process(input_file)

    # Update registry
    sample.update(values={"status": "completed", "result": output})
    return "Success"
@workflow
def registry_workflow(sample_id: str, table_id: str) -> str:
    """Workflow integrated with Registry"""
    # The return annotation is required: Flyte-based workflows derive their
    # outputs from it, so returning the task's result without a declared
    # output type is rejected. `str` matches process_and_track's return type.
    return process_and_track(sample_id=sample_id, table_id=table_id)
Registration Failures:
latch login
--verbose flag for detailed logs
Resource Problems:
Data Access:
latch:/// path format
Type Errors:
For issues or questions: