| name | cocoindex |
| description | Comprehensive toolkit for developing with the CocoIndex library. Use when users need to create data transformation pipelines (flows), write custom functions, or operate flows via CLI or API. Covers building ETL workflows for AI data processing, including embedding documents into vector databases, building knowledge graphs, creating search indexes, or processing data streams with incremental updates. |
CocoIndex is an ultra-performant real-time data transformation framework for AI with incremental processing. This skill enables building indexing flows that extract data from sources, apply transformations (chunking, embedding, LLM extraction), and export to targets (vector databases, graph databases, relational databases).
Core capabilities:
Key features:
For detailed documentation: https://cocoindex.io/docs/
Search documentation: https://cocoindex.io/docs/search?q=url%20encoded%20keyword
Use when users request:
Ask clarifying questions to understand:
Data source:
Transformations:
Target:
Guide user to add CocoIndex with appropriate extras to their project based on their needs:
Required dependency:
- cocoindex - Core functionality, CLI, and most built-in functions

Optional extras (add as needed):

- cocoindex[embeddings] - For SentenceTransformer embeddings (when using SentenceTransformerEmbed)
- cocoindex[colpali] - For ColPali image/document embeddings (when using ColPaliEmbedImage or ColPaliEmbedQuery)
- cocoindex[lancedb] - For LanceDB target (when exporting to LanceDB)
- cocoindex[embeddings,lancedb] - Multiple extras can be combined

What's included:

- embeddings extra: SentenceTransformers library for local embedding models
- colpali extra: ColPali engine for multimodal document/image embeddings
- lancedb extra: LanceDB client library for LanceDB vector database support

Users can install using their preferred package manager (pip, uv, poetry, etc.) or add to pyproject.toml.
For installation details: https://cocoindex.io/docs/getting_started/installation
Check existing environment first:
- Check if COCOINDEX_DATABASE_URL exists in environment variables
- If not set, the typical local default is postgres://cocoindex:cocoindex@localhost/cocoindex

For flows requiring LLM APIs (embeddings, extraction):
Guide user to create .env file:
# Database connection (required - internal storage)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
# LLM API keys (add the ones you need)
OPENAI_API_KEY=sk-... # For OpenAI (generation + embeddings)
ANTHROPIC_API_KEY=sk-ant-... # For Anthropic (generation only)
GOOGLE_API_KEY=... # For Gemini (generation + embeddings)
VOYAGE_API_KEY=pa-... # For Voyage (embeddings only)
# Ollama requires no API key (local)
For more LLM options: https://cocoindex.io/docs/ai/llm
Create basic project structure:
# main.py
from dotenv import load_dotenv
import cocoindex

@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Flow definition here
    pass

if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    my_flow.update()
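On a first run the flow's backing tables and target schema must exist. A hedged variant of the entry point that calls setup() before update(), using the same setup(report_to_stdout=True) API shown in the flow methods section below:

if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    my_flow.setup(report_to_stdout=True)  # create internal/target schema on first run
    my_flow.update()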
Follow this structure:
@cocoindex.flow_def(name="DescriptiveName")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# 1. Import source data
data_scope["source_name"] = flow_builder.add_source(
cocoindex.sources.SourceType(...)
)
# 2. Create collector(s) for outputs
collector = data_scope.add_collector()
# 3. Transform data (iterate through rows)
with data_scope["source_name"].row() as item:
# Apply transformations
item["new_field"] = item["existing_field"].transform(
cocoindex.functions.FunctionName(...)
)
...
# Nested iteration (e.g., chunks within documents)
with item["nested_table"].row() as nested_item:
# More transformations
nested_item["embedding"] = nested_item["text"].transform(...)
# Collect data for export
collector.collect(
field1=nested_item["field1"],
field2=item["field2"],
generated_id=cocoindex.GeneratedField.UUID
)
# 4. Export to target
collector.export(
"target_name",
cocoindex.targets.TargetType(...),
primary_key_fields=["field1"],
vector_indexes=[...] # If needed
)
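For orientation, here is a minimal concrete flow assembled from this template and the built-in pieces shown later in this skill (LocalFile source, SplitRecursively, SentenceTransformerEmbed, Postgres target). Treat it as a sketch to adapt, not a canonical example.

@cocoindex.flow_def(name="TextEmbedding")
def text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # 1. Import markdown files from a local directory
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="documents", included_patterns=["*.md"])
    )

    # 2. Collector for the chunk embeddings
    doc_embeddings = data_scope.add_collector()

    # 3. Chunk each document, embed each chunk, and collect
    with data_scope["documents"].row() as doc:
        doc["chunks"] = doc["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language="markdown", chunk_size=2000, chunk_overlap=500,
        )
        with doc["chunks"].row() as chunk:
            chunk["embedding"] = chunk["text"].transform(
                cocoindex.functions.SentenceTransformerEmbed(
                    model="sentence-transformers/all-MiniLM-L6-v2"
                )
            )
            doc_embeddings.collect(
                id=cocoindex.GeneratedField.UUID,
                filename=doc["filename"],
                text=chunk["text"],
                embedding=chunk["embedding"],
            )

    # 4. Export to a Postgres table with a vector index
    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )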
Key principles:
- Use .row() to iterate through table data
- Assign transformation results back to row fields: item["new_field"] = item["existing_field"].transform(...), NOT local variables like new_field = item["existing_field"].transform(...)

Common mistakes to avoid:
❌ Wrong: Using local variables for transformations
with data_scope["files"].row() as file:
summary = file["content"].transform(...) # ❌ Local variable
summaries_collector.collect(filename=file["filename"], summary=summary)
✅ Correct: Assigning to row fields
with data_scope["files"].row() as file:
file["summary"] = file["content"].transform(...) # ✅ Field assignment
summaries_collector.collect(filename=file["filename"], summary=file["summary"])
❌ Wrong: Creating unnecessary dataclasses to mirror flow fields
from dataclasses import dataclass

@dataclass
class FileSummary:  # ❌ Unnecessary - CocoIndex manages fields automatically
    filename: str
    summary: str
    embedding: list[float]

# This dataclass is never used in the flow!
IMPORTANT: The patterns listed below are common starting points; they cannot exhaustively enumerate every scenario. When user requirements don't match an existing pattern, compose the building blocks (sources, transformations, custom functions, targets) described in this skill.
Common starting patterns (use references for detailed examples):
For text embedding: Load references/flow_patterns.md and refer to "Pattern 1: Simple Text Embedding"
For code embedding: Load references/flow_patterns.md and refer to "Pattern 2: Code Embedding with Language Detection"
For LLM extraction + knowledge graph: Load references/flow_patterns.md and refer to "Pattern 3: LLM-based Extraction to Knowledge Graph"
For live updates: Load references/flow_patterns.md and refer to "Pattern 4: Live Updates with Refresh Interval"
For custom functions: Load references/flow_patterns.md and refer to "Pattern 5: Custom Transform Function"
For reusable query logic: Load references/flow_patterns.md and refer to "Pattern 6: Transform Flow for Reusable Logic"
For concurrency control: Load references/flow_patterns.md and refer to "Pattern 7: Concurrency Control"
Example of pattern composition:
If a user asks to "index images from S3, generate captions with a vision API, and store in Qdrant", combine:

- The AmazonS3 source (see the source examples below)
- A custom function that calls the vision API (Pattern 5: Custom Transform Function)
- The Qdrant target (see the target examples below)

No single pattern covers this exact scenario, but the building blocks are composable.
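A hedged sketch of that composition, reusing the AmazonS3 and Qdrant examples from this skill plus a hypothetical caption_image custom function. The vision API call, the binary=True source flag, and the field names are assumptions to verify:

@cocoindex.op.function(cache=True, behavior_version=1)
def caption_image(img_bytes: bytes) -> str:
    """Hypothetical helper: call a vision API and return a caption string."""
    return "caption"  # placeholder - call your vision API of choice here

@cocoindex.flow_def(name="ImageCaptionIndex")
def image_caption_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Source: images from S3 (binary content)
    data_scope["images"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(bucket="my-bucket", prefix="images/", binary=True)
    )
    captions = data_scope.add_collector()

    with data_scope["images"].row() as image:
        # Custom function (Pattern 5) generates a caption per image
        image["caption"] = image["content"].transform(caption_image)
        # Embed the caption so it is searchable as a vector
        image["caption_embedding"] = image["caption"].transform(
            cocoindex.functions.EmbedText(
                api_type=cocoindex.LlmApiType.OPENAI,
                model="text-embedding-3-small",
            )
        )
        captions.collect(
            filename=image["filename"],
            caption=image["caption"],
            embedding=image["caption_embedding"],
        )

    # Target: Qdrant collection
    captions.export(
        "image_captions",
        cocoindex.targets.Qdrant(collection_name="image_captions"),
        primary_key_fields=["filename"],
    )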
Guide user through testing:
# 1. Run with setup
cocoindex update --setup -f main # -f force setup without confirmation prompts
# 2. Start a server and redirect users to CocoInsight
cocoindex server -ci main
# Then open CocoInsight at https://cocoindex.io/cocoinsight
CocoIndex has a type system independent of programming languages. All data types are determined at flow definition time, making schemas clear and predictable.
IMPORTANT: When to define types:
Type annotation requirements:

- Return types of custom functions must be specific CocoIndex types
- Arguments of custom functions can be relaxed: use Any, dict[str, Any], or omit annotations; the engine already knows the types

Why specific return types matter: Custom function return types let CocoIndex infer field types throughout the flow without processing real data. This enables creating proper target schemas (e.g., vector indexes with fixed dimensions).
Common type categories:
Primitive types: str, int, float, bool, bytes, datetime.date, datetime.datetime, uuid.UUID
Vector types (embeddings): Specify dimension in return type if you plan to export as vectors to targets, as most targets require a fixed vector dimension
- cocoindex.Vector[cocoindex.Float32, typing.Literal[768]] - 768-dim float32 vector (recommended)
- list[float] without dimension also works

Struct types: Dataclass, NamedTuple, or Pydantic model

- Return types must name a specific struct (e.g., Person)
- Arguments can be relaxed to dict[str, Any] or Any

Table types:

- KTable: dict[K, V] where K = key type (primitive or frozen struct), V = Struct type
- LTable: list[R] where R = Struct type
- Arguments can be relaxed to dict[Any, Any] or list[Any]

Json type: cocoindex.Json for unstructured/dynamic data
Optional types: T | None for nullable values
Examples:
from dataclasses import dataclass
from typing import Any, Literal
import cocoindex

@dataclass
class Person:
    name: str
    age: int

# ✅ Vector with dimension (recommended for vector search)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
    """Generate 768-dim embedding - dimension needed for vector index."""
    # ... embedding logic ...
    return embedding  # numpy array or list of 768 floats

# ✅ Struct return type, relaxed argument
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
    """Argument can be dict[str, Any], return must be specific Struct."""
    return Person(name=person["name"], age=person["age"])

# ✅ LTable return type
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
    """Return type specifies list of specific Struct."""
    return [p for p in people if p.age >= 18]

# ❌ Wrong: dict[str, str] is not a valid specific CocoIndex type
# @cocoindex.op.function(...)
# def bad_example(person: Person) -> dict[str, str]:
#     return {"name": person.name}
For comprehensive data types documentation: https://cocoindex.io/docs/core/data_types
When users need custom transformation logic, create custom functions.
Use standalone function when:
Use spec+executor when:
@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
    """
    Function description.

    Args:
        input_arg: Description
        optional_arg: Optional description
    """
    # Transformation logic
    return {"result": f"processed-{input_arg}"}
Requirements:
- Decorate with @cocoindex.op.function()
- Enable cache=True for expensive ops, with behavior_version (required with cache)

# 1. Define configuration spec
class MyFunction(cocoindex.op.FunctionSpec):
    """Configuration for MyFunction."""
    model_name: str
    threshold: float = 0.5

# 2. Define executor
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
    spec: MyFunction  # Required: link to spec
    model = None      # Instance variables for state

    def prepare(self) -> None:
        """Optional: run once before execution."""
        # Load model, setup connections, etc.
        self.model = load_model(self.spec.model_name)

    def __call__(self, text: str) -> dict:
        """Required: execute for each data row."""
        # Use self.spec for configuration
        # Use self.model for loaded resources
        result = self.model.process(text)
        return {"result": result}
When to enable cache:
Important: Increment behavior_version when function logic changes to invalidate cache.
For detailed examples and patterns, load references/custom_functions.md.
For more on custom functions: https://cocoindex.io/docs/custom_ops/custom_functions
Setup flow (create resources):
cocoindex setup main
One-time update:
cocoindex update main
# With auto-setup
cocoindex update --setup main
# Force reset everything before setup and update
cocoindex update --reset main
Live update (continuous monitoring):
cocoindex update main.py -L
# Requires refresh_interval on source or source-specific change capture
Drop flow (remove all resources):
cocoindex drop main.py
Inspect flow:
cocoindex show main.py:FlowName
Test without side effects:
cocoindex evaluate main.py:FlowName --output-dir ./test_output
For complete CLI reference, load references/cli_operations.md.
For CLI documentation: https://cocoindex.io/docs/core/cli
Basic setup:
from dotenv import load_dotenv
import cocoindex
load_dotenv()
cocoindex.init()
@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
# ... flow definition ...
pass
One-time update:
stats = my_flow.update()
print(f"Processed {stats.total_rows} rows")
# Async
stats = await my_flow.update_async()
Live update:
# As context manager
with cocoindex.FlowLiveUpdater(my_flow) as updater:
    # Updater runs in background
    # Your application logic here
    pass
# Manual control
updater = cocoindex.FlowLiveUpdater(
my_flow,
cocoindex.FlowLiveUpdaterOptions(
live_mode=True,
print_stats=True
)
)
updater.start()
# ... application logic ...
updater.wait()
Setup/drop:
my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()
Query with transform flows:
@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(model="...")
    )
# Use in flow for indexing
doc["embedding"] = text_to_embedding(doc["content"])
# Use for querying
query_embedding = text_to_embedding.eval("search query")
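To close the loop on querying, a hedged sketch of using the evaluated embedding against a Postgres/pgvector target. psycopg, pgvector, and numpy are not CocoIndex dependencies; the table and column names (doc_embeddings, filename, text, embedding) are assumptions matching the export examples in this skill, and this assumes the Postgres target shares the default COCOINDEX_DATABASE_URL database:

import os

import numpy as np
import psycopg
from pgvector.psycopg import register_vector

# Embed the query with the same transform flow used at indexing time
query_vector = np.asarray(text_to_embedding.eval("how do live updates work?"), dtype=np.float32)

with psycopg.connect(os.environ["COCOINDEX_DATABASE_URL"]) as conn:
    register_vector(conn)
    rows = conn.execute(
        # "<=>" is pgvector's cosine-distance operator
        "SELECT filename, text FROM doc_embeddings ORDER BY embedding <=> %s LIMIT 5",
        (query_vector,),
    ).fetchall()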
For complete API reference and patterns, load references/api_operations.md.
For API documentation: https://cocoindex.io/docs/core/flow_methods
SplitRecursively - Chunk text intelligently
doc["chunks"] = doc["content"].transform(
cocoindex.functions.SplitRecursively(),
language="markdown", # or "python", "javascript", etc.
chunk_size=2000,
chunk_overlap=500
)
ParseJson - Parse JSON strings
data = json_string.transform(cocoindex.functions.ParseJson())
DetectProgrammingLanguage - Detect language from filename
file["language"] = file["filename"].transform(
cocoindex.functions.DetectProgrammingLanguage()
)
SentenceTransformerEmbed - Local embedding model
# Requires: cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.SentenceTransformerEmbed(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)
EmbedText - LLM API embeddings
This is the recommended way to generate embeddings using LLM APIs (OpenAI, Voyage, etc.).
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.EmbedText(
api_type=cocoindex.LlmApiType.OPENAI,
model="text-embedding-3-small",
)
)
ColPaliEmbedImage - Multimodal image embeddings
# Requires: cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)
ExtractByLlm - Extract structured data with LLM
This is the recommended way to use LLMs for extraction and summarization tasks. It supports both structured outputs (dataclasses, Pydantic models) and simple text outputs (str).
import dataclasses

# For structured extraction
@dataclasses.dataclass
class ProductInfo:
    name: str
    price: float
    category: str
item["product_info"] = item["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=ProductInfo,
instruction="Extract product information"
)
)
# For text summarization/generation
file["summary"] = file["content"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=str,
instruction="Summarize this document in one paragraph"
)
)
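The extracted struct's fields can then be collected for export. A hedged sketch: field access by name on a struct-typed value is assumed here, and the collector and field names are illustrative:

products_collector.collect(
    filename=item["filename"],
    name=item["product_info"]["name"],
    price=item["product_info"]["price"],
    category=item["product_info"]["category"],
)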
Browse all sources: https://cocoindex.io/docs/sources/
Browse all targets: https://cocoindex.io/docs/targets/
LocalFile:
cocoindex.sources.LocalFile(
path="documents",
included_patterns=["*.md", "*.txt"],
excluded_patterns=["**/.*", "node_modules"]
)
AmazonS3:
cocoindex.sources.AmazonS3(
bucket="my-bucket",
prefix="documents/",
aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)
Postgres:
cocoindex.sources.Postgres(
connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
query="SELECT id, content FROM documents"
)
Postgres (with vector support):
collector.export(
"target_name",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
vector_indexes=[
cocoindex.VectorIndexDef(
field_name="embedding",
metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
)
]
)
Qdrant:
collector.export(
"target_name",
cocoindex.targets.Qdrant(collection_name="my_collection"),
primary_key_fields=["id"]
)
LanceDB:
# Requires: cocoindex[lancedb]
collector.export(
"target_name",
cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
primary_key_fields=["id"]
)
Neo4j (nodes):
collector.export(
"nodes",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Nodes(label="Entity")
),
primary_key_fields=["id"]
)
Neo4j (relationships):
collector.export(
"relationships",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Relationships(
rel_type="RELATES_TO",
source=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
),
target=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
)
)
),
primary_key_fields=["id"]
)
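Both Neo4j examples above reference a neo4j_conn auth entry. A hedged sketch of defining it; the Neo4jConnection spec name and its fields are assumptions to verify against the targets docs:

neo4j_conn = cocoindex.add_auth_entry(
    "Neo4jConnection",
    cocoindex.targets.Neo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password",
    ),
)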
Troubleshooting common issues:

- Flow not found: verify with cocoindex show main.py; pass --app-dir if not running from the project root
- Database connection problems: confirm .env has COCOINDEX_DATABASE_URL, test with psql $COCOINDEX_DATABASE_URL, and use --env-file to point at a custom location
- Setup or schema issues: run cocoindex setup main.py, or reset with cocoindex drop main.py && cocoindex setup main.py
- Live updates not picking up changes: add refresh_interval to the source (or use source-specific change capture)
- Memory or throughput pressure: tune max_inflight_rows and max_inflight_bytes, or set in .env: COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS

This skill includes comprehensive reference documentation for common patterns and operations:
Load these references when users need:
For comprehensive documentation: https://cocoindex.io/docs/
Search specific topics: https://cocoindex.io/docs/search?q=url%20encoded%20keyword