| name | connector-ops |
| description | Manage Unstract connectors - add, remove, or modify database, filesystem, and queue connectors. This skill handles backend code, JSON schemas, tests, logo fetching, and dependency management. Use when the user wants to create a new connector, delete an existing one, or modify connector behavior. |
Connector Operations Skill
Manage Unstract connectors with full lifecycle support: add, remove, and modify operations for database, filesystem, and queue connector types.
When to Use This Skill
- User requests adding a new connector (e.g., "add a Redis connector", "create MongoDB database connector")
- User requests removing a connector (e.g., "remove the Oracle connector", "delete Box filesystem")
- User requests modifying a connector (e.g., "add SSL support to MySQL", "update BigQuery schema")
Architecture Overview
Connectors live in /unstract/connectors/src/unstract/connectors/ with three categories:
| Type | Base Class | Directory | Mode |
|---|---|---|---|
| Database | UnstractDB | databases/ | ConnectorMode.DATABASE |
| Filesystem | UnstractFileSystem | filesystems/ | ConnectorMode.FILE_SYSTEM |
| Queue | UnstractQueue | queues/ | ConnectorMode.MANUAL_REVIEW |
Connector Structure
Each connector follows this structure:
connector_name/
├── __init__.py # Metadata dict with is_active flag
├── connector_name.py # Main connector class
├── constants.py # Optional constants
└── static/
    ├── json_schema.json # Configuration UI schema
    └── settings.yaml # Optional settings
Key Files
| File | Purpose |
|---|---|
| base.py | Root UnstractConnector abstract class |
| connectorkit.py | Singleton registry for all connectors |
| databases/unstract_db.py | Database connector base class |
| filesystems/unstract_file_system.py | Filesystem connector base class |
| queues/unstract_queue.py | Queue connector base class |
| databases/register.py | Auto-discovery for database connectors |
| filesystems/register.py | Auto-discovery for filesystem connectors |
Operation: ADD Connector
Step 1: Gather Requirements
Ask the user for:
- Connector name (e.g., "Redis", "MongoDB", "Wasabi")
- Connector type (database, filesystem, or queue)
- Brief description of use case
Step 2: Research the Service
Use web search to discover:
- Official Python library for the service
- Existing fsspec provider: check whether a mature fsspec-compatible library exists (e.g., s3fs for S3, adlfs for Azure, gcsfs for GCS, boxfs for Box). Always prefer wrapping an existing provider over building a custom one.
- All supported authentication modes (API key, OAuth, connection string, certificates, etc.)
- Required connection parameters for each auth mode
- Official logo/icon source
Document findings before proceeding. If no fsspec provider exists, note that a custom AbstractFileSystem subclass will be needed (see Step 5b).
Step 3: Generate Connector ID
Create a unique connector ID using this pattern:
f"{short_name}|{uuid4()}"
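As a minimal sketch, the pattern above could be wrapped in a small helper. The function name `make_connector_id` and the short name `redis` are illustrative assumptions, not names from the codebase. Because an assigned ID must never change, the idea is to generate it once and then record the resulting string rather than regenerate it.

```python
from uuid import UUID, uuid4


def make_connector_id(short_name: str) -> str:
    """Build an ID of the form '<short_name>|<uuid4>' (helper name is hypothetical)."""
    return f"{short_name}|{uuid4()}"


connector_id = make_connector_id("redis")
prefix, _, suffix = connector_id.partition("|")
print(prefix)                # redis
print(UUID(suffix).version)  # 4
```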
Step 4: Create Directory Structure
# Run only the command that matches the connector type
mkdir -p unstract/connectors/src/unstract/connectors/databases/{connector_name}/static
mkdir -p unstract/connectors/src/unstract/connectors/filesystems/{connector_name}/static
mkdir -p unstract/connectors/src/unstract/connectors/queues/{connector_name}/static
Step 5: Create Connector Files
5a. Create __init__.py
from .{connector_name} import {ClassName}

metadata = {
    "name": {ClassName}.__name__,
    "version": "1.0.0",
    "connector": {ClassName},
    "description": "{Description of the connector}",
    "is_active": True,
}
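The contents of register.py are not shown here, so the following is only an assumption about how the `is_active` flag could be consumed during auto-discovery. The function `collect_active` and the stand-in packages are hypothetical. The sketch shows why every connector package needs a complete `metadata` dict.

```python
from types import SimpleNamespace


def collect_active(modules):
    """Return {name: metadata} for modules whose metadata marks them active."""
    registry = {}
    for module in modules:
        metadata = getattr(module, "metadata", None)
        if not metadata or not metadata.get("is_active", False):
            continue  # skip packages without metadata or with is_active False
        registry[metadata["name"]] = metadata
    return registry


# Stand-ins for imported connector packages (hypothetical names)
redis_pkg = SimpleNamespace(metadata={"name": "Redis", "version": "1.0.0", "is_active": True})
legacy_pkg = SimpleNamespace(metadata={"name": "Legacy", "version": "1.0.0", "is_active": False})

print(sorted(collect_active([redis_pkg, legacy_pkg])))  # ['Redis']
```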
5b. Create Main Connector Class
Use the appropriate template from assets/templates/:
- database.py.template for database connectors
- filesystem.py.template for filesystem connectors
- queue.py.template for queue connectors
Read the template and adapt it for the specific service. Key methods to implement:
For Database Connectors:
- get_engine() → Return database connection
- sql_to_db_mapping() → Map Python types to DB types
- execute() → Execute queries (inherited, may override)
For Filesystem Connectors:
There are two approaches depending on whether an fsspec provider exists:
Approach 1: Wrapping an existing fsspec provider (PREFERRED)
When a library such as s3fs, adlfs, gcsfs, or boxfs exists, the connector only extends UnstractFileSystem and implements:
- get_fsspec_fs() → Return the provider's filesystem instance (e.g., S3FileSystem(...))
- test_credentials() → Verify connection works
- extract_metadata_file_hash() → Extract file hash from metadata
- is_dir_by_metadata() → Check if path is directory
- extract_modified_date() → Extract last modified date
- Do NOT implement ls, walk, info, etc.; the provider handles all file operations.
- See minio.py for reference.
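The three metadata helpers listed above could look roughly like the sketch below. It is written as plain functions for brevity, whereas in a connector they would be methods on the UnstractFileSystem subclass. The metadata keys `ETag` and `LastModified` are assumptions modelled on S3-style info dicts, so check what the chosen provider actually returns.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def extract_metadata_file_hash(metadata: dict[str, Any]) -> Optional[str]:
    """Return the content hash, assuming it is exposed as a quoted 'ETag' value."""
    etag = metadata.get("ETag")
    return etag.strip('"') if etag else None


def is_dir_by_metadata(metadata: dict[str, Any]) -> bool:
    """Treat entries whose 'type' is 'directory' as directories."""
    return metadata.get("type") == "directory"


def extract_modified_date(metadata: dict[str, Any]) -> Optional[datetime]:
    """Return the modification time as a timezone-aware UTC datetime, if present."""
    modified = metadata.get("LastModified")
    if modified is None:
        return None
    if modified.tzinfo is None:
        modified = modified.replace(tzinfo=timezone.utc)  # assume naive values are UTC
    return modified.astimezone(timezone.utc)
```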
Approach 2: Custom AbstractFileSystem (ONLY when no provider exists)
Create a custom AbstractFileSystem subclass with ONLY these core methods:
- ls(path, detail=True): List directory contents (REQUIRED)
- info(path): Get single item metadata (optimization, optional)
- _open(path, mode): Open a file for reading
- cat_file(path): Read file contents
- pipe_file(path, value): Write bytes (NOT write_bytes, which is an alias)
- rm(path): Delete a file
- mkdir(path): Create directory
CRITICAL: NEVER override listdir(), stat(), exists(), isdir(), isfile(), delete(), read_bytes(), or write_bytes(). These are provided by AbstractFileSystem and delegate to ls()/info()/pipe_file().
walk() exception: Override walk() ONLY if the filesystem has path normalization that differs from _strip_protocol (e.g., SharePoint maps "" to "root"), or if the service raises exceptions that aren't FileNotFoundError/OSError (the only types the base catches). If you override walk(), you MUST support the detail kwarg (return dicts when True, lists when False) and the on_error callback.
- See sharepoint.py for reference.
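The delegation rule can be illustrated with a toy model. `MiniBaseFS` below is a simplified stand-in written for this example, NOT fsspec's AbstractFileSystem, and `DictFS` is a hypothetical backend. The sketch only shows the principle: derived methods are written once in the base class in terms of the core methods, so a subclass that overrides the core methods gets the rest for free.

```python
class MiniBaseFS:
    """Toy stand-in for a base filesystem class; NOT fsspec itself."""

    # Core methods a subclass must provide
    def ls(self, path, detail=True):
        raise NotImplementedError

    def pipe_file(self, path, value):
        raise NotImplementedError

    # Derived methods: implemented once here, in terms of the core methods
    def info(self, path):
        parent = path.rsplit("/", 1)[0] if "/" in path else ""
        for entry in self.ls(parent, detail=True):
            if entry["name"] == path:
                return entry
        raise FileNotFoundError(path)

    def exists(self, path):
        try:
            self.info(path)
            return True
        except FileNotFoundError:
            return False

    def write_bytes(self, path, value):
        return self.pipe_file(path, value)  # alias that delegates to pipe_file()


class DictFS(MiniBaseFS):
    """Hypothetical backend storing files in a dict; overrides core methods only."""

    def __init__(self):
        self._files = {}

    def ls(self, path, detail=True):
        prefix = f"{path}/" if path else ""
        entries = [
            {"name": name, "type": "file", "size": len(data)}
            for name, data in sorted(self._files.items())
            if name.startswith(prefix) and "/" not in name[len(prefix):]
        ]
        return entries if detail else [e["name"] for e in entries]

    def pipe_file(self, path, value):
        self._files[path] = value


fs = DictFS()
fs.write_bytes("docs/a.txt", b"hello")  # routed through pipe_file()
print(fs.exists("docs/a.txt"))          # True
print(fs.ls("docs", detail=False))      # ['docs/a.txt']
```

Had `DictFS` overridden `exists()` or `write_bytes()` itself, it would have duplicated logic the base already owns, which is the maintenance burden the rule above is meant to avoid.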
For Queue Connectors:
- get_engine() → Return queue connection
- enqueue() → Add message to queue
- dequeue() → Get message from queue
- peek() → View next message without removing
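To make the queue contract concrete, here is an in-memory sketch of the four methods. It does not subclass UnstractQueue, and the real base class may use different signatures (for example a queue name argument), so treat the class name and parameters as assumptions.

```python
from collections import deque
from typing import Optional


class InMemoryQueue:
    """Hypothetical FIFO queue showing the enqueue/dequeue/peek semantics."""

    def __init__(self):
        self._items = deque()

    def get_engine(self):
        # A real connector would return a client for the queue service
        return self._items

    def enqueue(self, message: str) -> None:
        self._items.append(message)

    def dequeue(self) -> Optional[str]:
        # Removes and returns the oldest message, or None when empty
        return self._items.popleft() if self._items else None

    def peek(self) -> Optional[str]:
        # Returns the oldest message without removing it
        return self._items[0] if self._items else None


queue = InMemoryQueue()
queue.enqueue("first")
queue.enqueue("second")
print(queue.peek())     # first
print(queue.dequeue())  # first
print(queue.dequeue())  # second
```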
5c. Create JSON Schema
Generate static/json_schema.json based on researched auth modes:
{
  "title": "{Connector Display Name}",
  "type": "object",
  "allOf": [
    {
      "required": ["connectorName"],
      "properties": {
        "connectorName": {
          "type": "string",
          "title": "Name of the connector"
        }
      }
    },
    {
      "oneOf": [
      ]
    }
  ]
}
Use "format": "password" for sensitive fields.
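One way to see how the `oneOf` list gets filled is to assemble the schema in Python and dump it as JSON. This is a sketch under assumptions: the helper `build_schema`, the two auth modes, and the field names (`apiKey`, `user`, `password`) are hypothetical and should be replaced with whatever Step 2 uncovered.

```python
import json


def build_schema(title, auth_options):
    """Assemble a connector schema: common fields plus one oneOf entry per auth mode."""
    common = {
        "required": ["connectorName"],
        "properties": {
            "connectorName": {"type": "string", "title": "Name of the connector"}
        },
    }
    return {"title": title, "type": "object", "allOf": [common, {"oneOf": list(auth_options)}]}


# Hypothetical auth modes; sensitive fields carry "format": "password"
api_key_auth = {
    "title": "API Key",
    "required": ["apiKey"],
    "properties": {
        "apiKey": {"type": "string", "title": "API Key", "format": "password"}
    },
}
password_auth = {
    "title": "Username and Password",
    "required": ["user", "password"],
    "properties": {
        "user": {"type": "string", "title": "Username"},
        "password": {"type": "string", "title": "Password", "format": "password"},
    },
}

schema = build_schema("Example Service", [api_key_auth, password_auth])
print(json.dumps(schema, indent=2))
```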
Step 6: Fetch Logo
Execute the logo fetch script:
python .claude/skills/connector-ops/scripts/fetch_logo.py "{service_name}" "{output_path}"
The script tries these sources in order:
- SimpleIcons (simpleicons.org)
- Devicon (devicon.dev)
- Logo.dev API
- Web search for official logo
- Skip if not found (log warning)
Place logo at: /frontend/public/icons/connector-icons/{ConnectorName}.png
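The Important Notes section asks for spaces in icon names to be URL-encoded. A small sketch of producing that encoded form with the standard library follows; the helper name `encoded_icon_name` is an assumption for illustration.

```python
from urllib.parse import quote


def encoded_icon_name(display_name: str) -> str:
    """Return the URL-encoded icon file name for a connector display name."""
    return f"{quote(display_name)}.png"


print(encoded_icon_name("Google Drive"))  # Google%20Drive.png
print(encoded_icon_name("MinIO"))         # MinIO.png
```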
Step 7: Add Dependencies
Research the required Python packages and add them to pyproject.toml. Review the current dependencies first:
cat unstract/connectors/pyproject.toml
Step 8: Create Tests
Generate both mock-based and integration tests in unstract/connectors/tests/:
Mock-based Test (always runnable):
import unittest
from unittest.mock import Mock, patch


class Test{ClassName}(unittest.TestCase):
    def setUp(self):
        # Connector settings used by every test
        self.config = {
        }

    @patch("{module_path}.{connection_method}")
    def test_connection_params(self, mock_connect):
        # Replace the real connection call so no external service is needed
        mock_connect.return_value = Mock()
        connector = {ClassName}(self.config)
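For a fully runnable illustration of the mock-based pattern, the sketch below uses stand-ins: `FakeDriver` plays the role of a client library and `ExampleConnector` the role of a connector. Both are hypothetical and exist only so the test can execute without any external service.

```python
import unittest
from unittest.mock import Mock, patch


class FakeDriver:
    """Hypothetical stand-in for a client library."""

    @staticmethod
    def connect(host, port):
        raise RuntimeError("network access is not allowed in mock tests")


class ExampleConnector:
    """Hypothetical connector that opens its connection through FakeDriver."""

    def __init__(self, settings):
        self.host = settings["host"]
        self.port = int(settings.get("port", 5432))

    def get_engine(self):
        return FakeDriver.connect(host=self.host, port=self.port)


class TestExampleConnector(unittest.TestCase):
    def setUp(self):
        self.config = {"host": "db.example.test", "port": "6000"}

    @patch.object(FakeDriver, "connect")
    def test_connection_params(self, mock_connect):
        mock_connect.return_value = Mock(name="engine")
        connector = ExampleConnector(self.config)

        engine = connector.get_engine()

        # The connector should pass the parsed settings to the driver
        mock_connect.assert_called_once_with(host="db.example.test", port=6000)
        self.assertIs(engine, mock_connect.return_value)
```

Saved as a test module, this runs under `python -m pytest` or `python -m unittest` like the other mock tests.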
Integration Test (requires real service):
import os
import unittest


class Test{ClassName}Integration(unittest.TestCase):
    @unittest.skipUnless(
        os.getenv("{CONNECTOR}_HOST"),
        "Integration test requires {CONNECTOR}_* environment variables",
    )
    def test_real_connection(self):
        config = {
            "host": os.getenv("{CONNECTOR}_HOST"),
        }
        connector = {ClassName}(config)
        self.assertTrue(connector.test_credentials())
Step 9: Verify
Run verification in order:
cd unstract/connectors && python -m py_compile src/unstract/connectors/{type}/{name}/{name}.py
cd unstract/connectors && python -m pytest tests/{type}/test_{name}.py -v
cd unstract/connectors && python -m pytest tests/ -v --ignore=tests/{type}/test_{name}_integration.py
Step 10: Report to User
Provide summary:
## Connector Added: {Name}
**Files created:**
- `src/unstract/connectors/{type}/{name}/__init__.py`
- `src/unstract/connectors/{type}/{name}/{name}.py`
- `src/unstract/connectors/{type}/{name}/static/json_schema.json`
- `tests/{type}/test_{name}.py`
- `tests/{type}/test_{name}_integration.py`
- `frontend/public/icons/connector-icons/{Name}.png`
**Dependencies added:**
- `{package}~={version}`
**Verification:**
- Syntax check: PASSED
- Mock tests: PASSED (X tests)
- Test suite: PASSED
**To run integration tests:**
```bash
export {CONNECTOR}_HOST=your_host
export {CONNECTOR}_USER=your_user
export {CONNECTOR}_PASSWORD=your_password
cd unstract/connectors && python -m pytest tests/{type}/test_{name}_integration.py -v
```
Operation: REMOVE Connector
Step 1: Identify Connector
Locate the connector by name or ID:
```bash
# Search for connector
grep -r "class {Name}" unstract/connectors/src/
```
Step 2: Check Dependencies
Search for usages across the codebase:
grep -r "{connector_id}" --include="*.py" .
grep -r "from.*{connector_name}" --include="*.py" .
Warn user if connector is referenced elsewhere.
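Where grep is unavailable, the same check could be scripted. The sketch below is a rough Python equivalent of the searches above; the function name `find_references` and the sample file contents are assumptions made for the demonstration.

```python
import tempfile
from pathlib import Path


def find_references(root, needle):
    """Return (file, line number) pairs for .py files under root that contain needle."""
    hits = []
    for path in sorted(Path(root).rglob("*.py")):
        try:
            lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
        except OSError:
            continue  # unreadable file: skip rather than abort the scan
        for lineno, line in enumerate(lines, start=1):
            if needle in line:
                hits.append((str(path), lineno))
    return hits


# Demonstration on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp:
    pkg = Path(tmp) / "app"
    pkg.mkdir()
    (pkg / "uses.py").write_text("from connectors.oracle_db import OracleDB\n", encoding="utf-8")
    (pkg / "other.py").write_text("print('unrelated')\n", encoding="utf-8")
    demo = [(Path(p).name, n) for p, n in find_references(tmp, "oracle_db")]

print(demo)  # [('uses.py', 1)]
```

A non-empty result is the signal to warn the user before deleting anything.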
Step 3: Remove Files
rm -rf unstract/connectors/src/unstract/connectors/{type}/{name}/
rm -f unstract/connectors/tests/{type}/test_{name}*.py
rm -f frontend/public/icons/connector-icons/{Name}.png
Step 4: Clean Dependencies (Optional)
If the removed connector was the only user of a dependency, offer to remove it from pyproject.toml.
Step 5: Verify
cd unstract/connectors && python -c "from unstract.connectors.connectorkit import Connectorkit; Connectorkit()"
cd unstract/connectors && python -m pytest tests/ -v
Step 6: Report to User
## Connector Removed: {Name}
**Files deleted:**
- `src/unstract/connectors/{type}/{name}/` (directory)
- `tests/{type}/test_{name}.py`
- `tests/{type}/test_{name}_integration.py`
- `frontend/public/icons/connector-icons/{Name}.png`
**Verification:**
- Import check: PASSED
- Test suite: PASSED
Operation: MODIFY Connector
Step 1: Understand the Change
Ask user what modification is needed:
- Add new configuration field?
- Add new authentication mode?
- Fix a bug?
- Update dependency version?
- Change behavior?
Step 2: Locate Files
find unstract/connectors -name "*{connector_name}*" -type f
Step 3: Make Changes
Based on modification type:
Adding configuration field:
- Update static/json_schema.json with new field
- Update connector class __init__ to read new field
- Update usage of field in connector methods
- Update tests to cover new field
Adding authentication mode:
- Research new auth mode parameters
- Add new oneOf option in JSON schema
- Update connector class to handle new auth
- Add tests for new auth mode
Note: For connectors that support both OAuth and Client Credentials (like SharePoint/OneDrive), use the Dual Authentication pattern documented in references/connector_patterns.md. This pattern uses an auth_type field with JSON Schema dependencies to conditionally show/hide credential fields based on the selected authentication method.
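The authoritative version of the Dual Authentication pattern lives in references/connector_patterns.md. As a rough sketch of the general idea only, an `auth_type` selector combined with JSON Schema `dependencies` might be shaped like the dict below. The field names `client_id` and `client_secret`, the enum values, and the helper `fields_for` are all assumptions, and the OAuth branch is assumed to need no manually entered fields.

```python
dual_auth_schema = {
    "title": "Example Service",
    "type": "object",
    "required": ["connectorName", "auth_type"],
    "properties": {
        "connectorName": {"type": "string", "title": "Name of the connector"},
        "auth_type": {
            "type": "string",
            "title": "Authentication Type",
            "enum": ["oauth", "client_credentials"],
            "default": "oauth",
        },
    },
    "dependencies": {
        "auth_type": {
            "oneOf": [
                # OAuth selected: no extra manual fields in this sketch
                {"properties": {"auth_type": {"enum": ["oauth"]}}},
                # Client credentials selected: show ID and secret
                {
                    "properties": {
                        "auth_type": {"enum": ["client_credentials"]},
                        "client_id": {"type": "string", "title": "Client ID"},
                        "client_secret": {
                            "type": "string",
                            "title": "Client Secret",
                            "format": "password",
                        },
                    },
                    "required": ["client_id", "client_secret"],
                },
            ]
        }
    },
}


def fields_for(schema, auth_type):
    """List the fields a form would show for the selected auth_type."""
    fields = list(schema["properties"])
    for branch in schema["dependencies"]["auth_type"]["oneOf"]:
        props = branch["properties"]
        if auth_type in props["auth_type"]["enum"]:
            fields += [name for name in props if name != "auth_type"]
    return fields


print(fields_for(dual_auth_schema, "oauth"))
# ['connectorName', 'auth_type']
print(fields_for(dual_auth_schema, "client_credentials"))
# ['connectorName', 'auth_type', 'client_id', 'client_secret']
```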
Bug fix:
- Identify root cause
- Implement fix
- Add regression test
Dependency update:
- Update version in pyproject.toml
- Check for breaking changes
- Update connector code if needed
- Run tests to verify
Step 4: Update Tests
Ensure tests cover the modification:
- Add new test cases for new functionality
- Update existing tests if behavior changed
- Run full test suite
Step 5: Verify
cd unstract/connectors && python -m py_compile src/unstract/connectors/{type}/{name}/{name}.py
cd unstract/connectors && python -m pytest tests/{type}/test_{name}*.py -v
cd unstract/connectors && python -m pytest tests/ -v
Step 6: Report to User
## Connector Modified: {Name}
**Changes:**
- {Description of each change}
**Files modified:**
- {List of modified files}
**Tests:**
- Added: {count} new test(s)
- Modified: {count} existing test(s)
**Verification:**
- Syntax check: PASSED
- Connector tests: PASSED
- Full suite: PASSED
Reference Materials
Consult these files for detailed patterns:
- references/connector_patterns.md - Common patterns and anti-patterns
- references/json_schema_examples.md - JSON schema examples for all auth types
- references/test_patterns.md - Test patterns and fixtures
Asset Templates
Use these templates as starting points:
- assets/templates/database.py.template
- assets/templates/filesystem.py.template
- assets/templates/queue.py.template
- assets/templates/init.py.template
- assets/templates/json_schema_template.json
- assets/templates/test_mock.py.template
- assets/templates/test_integration.py.template
Important Notes
- Fork Safety: For connectors using Google APIs or gRPC, implement lazy loading to prevent SIGSEGV in Celery workers. See the Google Drive connector for the pattern.
- UUID Consistency: Once a connector ID is assigned, never change it. Existing installations may reference it.
- Schema Backwards Compatibility: When modifying schemas, ensure existing configurations remain valid.
- Icon Naming: Use PascalCase with spaces URL-encoded (e.g., Google%20Drive.png).
- Test Isolation: Mock tests should never require external services. Use @unittest.skipUnless for integration tests.
- fsspec Delegation: NEVER override listdir(), stat(), exists(), isdir(), isfile(), delete(), read_bytes(), or write_bytes() in custom AbstractFileSystem subclasses, and override walk() only in the exceptional cases described in Step 5b. These methods are provided by fsspec's base class and delegate to ls()/info()/pipe_file(). Overriding them creates a maintenance burden and introduces subtle bugs. See references/connector_patterns.md for the "Anti-Pattern: Reimplementing fsspec Base Methods" section.
- pipe_file vs write_bytes: When implementing file writes in a custom AbstractFileSystem, override pipe_file() (not write_bytes()). The base write_bytes() is an alias that delegates to pipe_file().
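The Google Drive connector remains the reference for the Fork Safety note. As a generic sketch of lazy loading only, client creation can be deferred until first use so that nothing fork-sensitive is built when the connector object is constructed. The class `LazyClientConnector` and the factory argument are hypothetical.

```python
from typing import Any, Callable, Optional


class LazyClientConnector:
    """Defers client creation until first use (generic sketch, not the Google Drive code)."""

    def __init__(self, settings: dict, client_factory: Callable[[dict], Any]):
        self._settings = dict(settings)
        self._client_factory = client_factory
        self._client: Optional[Any] = None  # nothing fork-sensitive is created here

    @property
    def client(self) -> Any:
        if self._client is None:
            # In a real connector, the import of the gRPC-based library would
            # also live inside the factory so it runs after the worker forks.
            self._client = self._client_factory(self._settings)
        return self._client


calls = []


def fake_factory(settings: dict) -> dict:
    """Stand-in for building a real API client; records each invocation."""
    calls.append(settings["project"])
    return {"project": settings["project"]}


connector = LazyClientConnector({"project": "demo"}, fake_factory)
print(len(calls))  # 0
connector.client
connector.client
print(len(calls))  # 1
```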