| name | affinity-python-sdk |
| description | Writes Python code using the Affinity SDK for CRM data access and automation. |
| when_to_use | Use when user asks to write Python scripts for Affinity, mentions affinity-sdk, typed IDs, async client, pagination, or Affinity Python code. |
Affinity Python SDK
IMPORTANT: Write Operations Require Explicit User Request
Always use read-only mode by default. Only allow writes when the user explicitly requests data modification.
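from affinity import Affinity
from affinity.policies import Policies, WritePolicy

# Read-only (use this by default)
with Affinity.from_env(policies=Policies(write=WritePolicy.DENY)) as client:
    ...

# Writes allowed (only after the user explicitly asks to modify data)
with Affinity.from_env() as client:
    ...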
Installation
pip install affinity-sdk
# Or, with the optional dotenv extra for .env file support:
pip install "affinity-sdk[dotenv]"
Client Initialization
import asyncio
from affinity import Affinity, AsyncAffinity
from affinity.policies import Policies, WritePolicy

# Synchronous client
with Affinity.from_env(load_dotenv=True, policies=Policies(write=WritePolicy.DENY)) as client:
    me = client.whoami()
    companies = client.companies.all()

# Asynchronous client (async with must run inside a coroutine)
async def main() -> None:
    async with AsyncAffinity.from_env(policies=Policies(write=WritePolicy.DENY)) as client:
        companies = await client.companies.all()

asyncio.run(main())
Multi-Source Tasks: Output Only the Summary
When a task combines data from multiple Affinity sources (e.g., person + interactions + list entries), fetch everything in one script and print only the relevant summary. Never dump raw model_dump_json() output: it floods the conversation with hundreds of lines that the agent must parse just to extract a few facts.
Do this when: combining entity details with interactions, cross-referencing list entries with entities, generating reports from multiple queries.
A single SDK call is fine when: fetching one entity, listing one page of results, or performing a single write.
Bad: dumping raw models
person = client.persons.get(PersonId(123))
print(person.model_dump_json(indent=2))
Good: extract and print only what's needed
person = client.persons.get(PersonId(123))
print(f"Name: {person.first_name} {person.last_name}")
print(f"Email: {person.primary_email}")
Full example: deals with no recent contact
"""Find pipeline deals with no contact in 30 days."""
from datetime import datetime, timedelta, timezone
from affinity import Affinity
from affinity.policies import Policies, WritePolicy
from affinity.types import FieldType

with Affinity.from_env(load_dotenv=True, policies=Policies(write=WritePolicy.DENY)) as client:
    pipeline = client.lists.resolve(name="Dealflow")
    entries = client.lists.entries(pipeline.id).all(
        field_types=[FieldType.LIST],
        expand=["interactionDates"],
    )
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=30)
    # Pair each stale entry with its last interaction date (None means never contacted)
    stale = []
    for entry in entries:
        last = getattr(entry, "last_interaction_date", None)
        if last is None or last < cutoff:
            stale.append((entry, last))
    # Print only the summary, not the raw models
    print(f"Deals with no contact in 30 days: {len(stale)}/{len(entries)}")
    for entry, last in stale:
        age = f"{(now - last).days} days ago" if last else "never"
        print(f"  {entry.entity_name}: last contact {age}")
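The staleness rule used in the script can be exercised without API access. The following is a minimal sketch on plain datetimes; `is_stale`, `describe_last_contact`, and the sample data are illustrative names introduced here, not part of the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(last: Optional[datetime], now: datetime, max_age_days: int = 30) -> bool:
    """True when there was never any contact or the last one is older than the cutoff."""
    return last is None or last < now - timedelta(days=max_age_days)


def describe_last_contact(last: Optional[datetime], now: datetime) -> str:
    """Human-readable age that also handles the 'never contacted' case."""
    if last is None:
        return "never"
    return f"{(now - last).days} days ago"


# Hypothetical sample data standing in for list entries.
now = datetime(2024, 6, 30, tzinfo=timezone.utc)
samples = {
    "Acme": datetime(2024, 6, 20, tzinfo=timezone.utc),
    "Globex": datetime(2024, 4, 1, tzinfo=timezone.utc),
    "Initech": None,
}

stale = {
    name: describe_last_contact(last, now)
    for name, last in samples.items()
    if is_stale(last, now)
}
for name, age in stale.items():
    print(f"  {name}: last contact {age}")
```

Keeping the "never" case in a helper avoids formatting a missing date as a number of days.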
When to use bash instead
For simple 2-command pipelines (fetch an ID, then use it in a second command), a bash script with xaffinity session start + jq is lighter weight. See the CLI skill's "Multi-Source Tasks" section.
Typed IDs (ALWAYS USE)
Prevent mixing up entity types by using typed IDs:
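from affinity.types import (
    PersonId, CompanyId, ListId, ListEntryId,
    OpportunityId, FieldId, NoteId, UserId
)

# Good: wrap raw integers in the matching typed ID
person = client.persons.get(PersonId(123))
company = client.companies.get(CompanyId(456))
entries = client.lists.entries(ListId(789))

# Good: IDs read from SDK models (here, a previously fetched note) can be passed through as-is
creator = client.persons.get(note.creator_id)

# Bad: a raw int carries no entity type
person = client.persons.get(123)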
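To illustrate why typed IDs prevent mix-ups, here is a small sketch built on `int` subclasses. The `Demo*` classes and `get_person` are hypothetical stand-ins, not the SDK's classes, and whether the SDK enforces ID types at runtime or only through static type checking is not stated in this guide.

```python
class DemoPersonId(int):
    """Hypothetical stand-in for a typed person ID (not the SDK class)."""


class DemoCompanyId(int):
    """Hypothetical stand-in for a typed company ID (not the SDK class)."""


def get_person(person_id: DemoPersonId) -> str:
    # Reject anything that is not explicitly a person ID, including raw ints.
    if not isinstance(person_id, DemoPersonId):
        raise TypeError(f"expected DemoPersonId, got {type(person_id).__name__}")
    return f"person:{int(person_id)}"


print(get_person(DemoPersonId(123)))

try:
    # Same numeric value, wrong entity type.
    get_person(DemoCompanyId(123))
except TypeError as exc:
    print(f"rejected: {exc}")
```

The two IDs compare equal as integers, so only the type tells them apart; that is the information a raw `int` loses.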
Pagination Patterns
# One page
page = client.companies.list(limit=50)
for company in page.data:
    process(company)

# All results as a list, with the default, an explicit, or no limit
all_companies = client.companies.all()
companies = client.companies.all(limit=1000)
companies = client.companies.all(limit=None)

# Item-by-item iteration
for person in client.persons.iter():
    process(person)

# Page-by-page iteration
for page in client.companies.pages():
    for company in page.data:
        process(company)

# Progress callback
from affinity import PaginationProgress

def log_progress(p: PaginationProgress) -> None:
    print(f"Page {p.page_number}: {p.items_so_far} items")

for company in client.companies.all(on_progress=log_progress):
    ...
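How page iteration and item iteration relate can be sketched with an in-memory data source. Everything below (`DemoPage`, `fetch_page`, the cursor scheme) is a hypothetical stand-in meant to show the general pattern, not how the SDK implements `pages()` or `iter()`.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional

# Hypothetical records standing in for API results.
RECORDS = [f"company-{i}" for i in range(7)]


@dataclass
class DemoPage:
    data: List[str]
    next_cursor: Optional[int]  # None marks the last page


def fetch_page(cursor: int, limit: int) -> DemoPage:
    """Return one slice of RECORDS plus the cursor for the next slice."""
    end = cursor + limit
    return DemoPage(RECORDS[cursor:end], end if end < len(RECORDS) else None)


def pages(limit: int = 3) -> Iterator[DemoPage]:
    """Yield pages one at a time until the source reports no next cursor."""
    cursor: Optional[int] = 0
    while cursor is not None:
        page = fetch_page(cursor, limit)
        yield page
        cursor = page.next_cursor


def iter_items(limit: int = 3) -> Iterator[str]:
    """Flatten pages into a stream of items."""
    for page in pages(limit):
        yield from page.data


page_sizes = [len(p.data) for p in pages()]
all_items = list(iter_items())
print(page_sizes, len(all_items))
```

Both generators fetch a page only when the caller asks for more, which is why item or page iteration is usually preferable to building a full list for large result sets.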
Filtering (Custom Fields Only)
Note: Global-entity lists (companies, persons, opportunities) do NOT accept filter=. Server-side filtering is not supported on these endpoints, so the SDK raises ValueError rather than return silently unfiltered results. Use search_pages() for fuzzy search on name, domain, or email, or filter list entries instead (these support client-side filtering).
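from affinity import F
from affinity.types import ListId, SavedViewId  # assumption: SavedViewId is exported alongside the other typed IDs

# Fuzzy search on global entities
for page in client.companies.search_pages("Acme"):
    for company in page.data:
        ...
for page in client.persons.search_pages("alex@acme.com"):
    for person in page.data:
        ...

# Filter list entries by a custom field
entries = client.lists.entries(ListId(123)).list(
    filter=F.field("Department").equals("Sales")
)

# Read list entries through a saved view
entries = client.lists.entries(ListId(123)).list(saved_view_id=SavedViewId(456))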
Do NOT pass filter= to client.companies.list() / client.persons.list() / client.opportunities.list() — it raises ValueError with a hint to use search_pages().
Duplicate prevention on create
companies.create() and persons.create() default to if_not_exists=True. On conflict they raise DuplicateEntityError carrying the existing entity ID so callers can recover without creating a duplicate:
from affinity.exceptions import DuplicateEntityError
from affinity.models import CompanyCreate
from affinity.types import CompanyId

try:
    company = client.companies.create(
        CompanyCreate(name="Elssway", domain="elssway.com")
    )
except DuplicateEntityError as e:
    # Recover the existing record instead of creating a duplicate
    company = client.companies.get(CompanyId(e.existing_id))
Pass if_not_exists=False only when you deliberately want to create a distinct record that collides on name/domain.
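The recover-on-duplicate pattern can be tried against an in-memory stand-in. `DemoDuplicateError`, `DemoCompanies`, and `create_or_fetch` are hypothetical names for this sketch; the only behavior assumed from the SDK is that the duplicate error carries the existing entity's ID.

```python
from typing import Dict


class DemoDuplicateError(Exception):
    """Hypothetical stand-in for DuplicateEntityError; carries existing_id."""

    def __init__(self, existing_id: int) -> None:
        super().__init__(f"duplicate of {existing_id}")
        self.existing_id = existing_id


class DemoCompanies:
    """In-memory stand-in for a companies service that rejects duplicate domains."""

    def __init__(self) -> None:
        self._by_id: Dict[int, dict] = {}
        self._next_id = 1

    def create(self, name: str, domain: str) -> dict:
        for company_id, record in self._by_id.items():
            if record["domain"] == domain:
                raise DemoDuplicateError(company_id)
        record = {"id": self._next_id, "name": name, "domain": domain}
        self._by_id[record["id"]] = record
        self._next_id += 1
        return record

    def get(self, company_id: int) -> dict:
        return self._by_id[company_id]


def create_or_fetch(service: DemoCompanies, name: str, domain: str) -> dict:
    """Create the record, or fall back to the existing one on conflict."""
    try:
        return service.create(name, domain)
    except DemoDuplicateError as exc:
        return service.get(exc.existing_id)


service = DemoCompanies()
first = create_or_fetch(service, "Elssway", "elssway.com")
second = create_or_fetch(service, "Elssway Inc", "elssway.com")
print(first["id"] == second["id"])
```

The second call returns the original record unchanged, so the caller can decide separately whether an update is wanted.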
Services Reference
with Affinity.from_env() as client:
    # Entities
    client.persons.list() / .get() / .all() / .search()
    client.companies.list() / .get() / .all() / .search()
    client.opportunities.list() / .get() / .all()

    # Lists
    client.lists.list() / .get() / .all()
    client.lists.resolve(name="Pipeline Name")
    client.lists.get_fields(ListId(123))

    # List entries
    entries_service = client.lists.entries(ListId(123))
    entries_service.list() / .get() / .all()
    entries_service.add_person() / .add_company() / .add_opportunity()
    entries_service.update_field_value() / .batch_update_fields()

    # Notes, reminders, interactions
    client.notes.list() / .create()
    client.reminders.list() / .create()
    client.interactions.list(type=..., start_time=..., end_time=..., person_id=...)
    client.interactions.iter(type=..., start_time=..., person_id=...)

    # Account
    snapshot = client.rate_limits.snapshot()
    me = client.whoami()
Error Handling
from affinity.exceptions import (
    AffinityError,
    AuthenticationError,
    AuthorizationError,
    NotFoundError,
    ValidationError,
    MergedEntityError,
    CompanyMergedError,
    PersonMergedError,
    RateLimitError,
    ServerError,
    WriteNotAllowedError,
    TooManyResultsError,
)

try:
    company = client.companies.get(CompanyId(123))
except CompanyMergedError as e:
    # Follow the merge to the target record
    company = client.companies.get(CompanyId(e.target_id))
except NotFoundError:
    print("Company not found")
except RateLimitError as e:
    print(f"Rate limited. Retry after: {e.retry_after}")
except AffinityError as e:
    # Catch-all for remaining SDK errors; keep it last
    print(f"Error: {e}")
    if e.diagnostics:
        print(f"Request ID: {e.diagnostics.request_id}")
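When a read still surfaces a rate-limit error, one option is to wait for the advertised interval and try again a bounded number of times. This is only a sketch: `DemoRateLimitError` and `call_with_retry_after` are hypothetical names, and it assumes `retry_after` is a number of seconds. It is meant for reads; retrying writes risks duplicates, as the Retry Behavior section notes.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class DemoRateLimitError(Exception):
    """Hypothetical stand-in for RateLimitError; retry_after assumed to be seconds."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


def call_with_retry_after(
    fn: Callable[[], T],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, waiting retry_after seconds between rate-limited attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except DemoRateLimitError as exc:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            sleep(exc.retry_after)
    raise AssertionError("unreachable")


# Demo: a read that is rate limited twice, then succeeds.
calls = {"count": 0}


def flaky_read() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise DemoRateLimitError(retry_after=0.5)
    return "ok"


waits: List[float] = []  # record waits instead of actually sleeping
result = call_with_retry_after(flaky_read, sleep=waits.append)
print(result, waits)
```

Injecting `sleep` keeps the helper testable without real delays.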
Creating Records (requires explicit user approval)
from datetime import datetime, timedelta, timezone
from affinity.models import NoteCreate, ReminderCreate
from affinity.types import (
    NoteType, ReminderType,
    PersonId, CompanyId, ListId, ListEntryId, FieldId, UserId,
)

# Add entities to a list
entries_service = client.lists.entries(ListId(123))
entry = entries_service.add_person(PersonId(456))
entry = entries_service.add_company(CompanyId(789))

# Create a note
note = client.notes.create(NoteCreate(
    content="<p>Meeting notes</p>",
    type=NoteType.HTML,
    person_ids=[PersonId(123)],
))

# Create a reminder owned by the current user
me = client.whoami()
reminder = client.reminders.create(ReminderCreate(
    owner_id=UserId(me.user.id),
    type=ReminderType.ONE_TIME,
    content="Follow up",
    due_date=datetime.now(timezone.utc) + timedelta(days=7),
    person_id=PersonId(123),
))

# Update a single field value
entries_service.update_field_value(
    ListEntryId(456),
    FieldId(789),
    "New Value"
)

# Update several fields on one entry
entries_service.batch_update_fields(
    ListEntryId(456),
    {FieldId(789): "Value1", FieldId(790): "Value2"}
)
Field Selection
from affinity.types import FieldType

client.companies.list(field_types=[FieldType.ENRICHED])
client.persons.get(PersonId(123), field_types=[FieldType.GLOBAL, FieldType.RELATIONSHIP_INTELLIGENCE])

if company.fields.requested:
    for field_name, value in company.fields.data.items():
        print(f"{field_name}: {value}")
Resolving Fields by Name
Pattern A: Reading field values by name — use FieldResolver.get():
from affinity.field_resolver import FieldResolver
from affinity.types import ListId, FieldType, ResolveMode

fields = client.lists.get_fields(ListId(123))
resolver = FieldResolver(fields)

entries = client.lists.entries(ListId(123)).all(field_types=[FieldType.LIST])
for entry in entries:
    status = resolver.get(entry, "Status")
    owner = resolver.get(entry, "Owner", resolve=ResolveMode.TEXT)
FieldResolver.get() maps names to values. FieldResolver.find_field() maps names to metadata (including FieldId).
Pattern B: Getting FieldId for write operations — use FieldResolver.find_field():
fields = client.lists.get_fields(ListId(123))
resolver = FieldResolver(fields)

status_meta = resolver.find_field("Status")
if status_meta:
    entries_service = client.lists.entries(ListId(123))
    entries_service.update_field_value(ListEntryId(456), status_meta.id, "Active")
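The difference between looking up a value by name and looking up metadata by name can be sketched with plain dictionaries. `DemoFieldMeta` and `DemoResolver` are hypothetical stand-ins; the real FieldResolver may match names and shape values differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class DemoFieldMeta:
    """Hypothetical stand-in for field metadata (not the SDK model)."""

    id: str
    name: str


class DemoResolver:
    """Illustrative name lookup over a list of field metadata."""

    def __init__(self, fields: List[DemoFieldMeta]) -> None:
        self._by_name = {field.name: field for field in fields}

    def find_field(self, name: str) -> Optional[DemoFieldMeta]:
        # Name -> metadata (None when the list has no such field).
        return self._by_name.get(name)

    def get(self, values_by_id: Dict[str, Any], name: str) -> Any:
        # Name -> value, going through the metadata to find the field ID.
        meta = self.find_field(name)
        return values_by_id.get(meta.id) if meta else None


resolver = DemoResolver(
    [DemoFieldMeta("field-1", "Status"), DemoFieldMeta("field-2", "Owner")]
)
entry_values = {"field-1": "Active", "field-2": "Dana"}  # hypothetical entry data

print(resolver.get(entry_values, "Status"))
print(resolver.find_field("Missing"))
```

Reads only need the value, while writes need the field ID, which is why the two lookups are kept separate.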
Enriched Fields
Enriched fields (Phone Number, Source of Introduction, Industry, Location, Description, etc.) are returned on entity.fields.data like any other field when you request them via field_types=[FieldType.ENRICHED].
Most enriched fields are writable via the normal update_field_value() path using their FieldMetadata.id. A small number are purely derived (notably "Current Organization" on persons, which is computed from email domain) and cannot be written — the SDK raises EnrichedFieldNotWritableError (subclass of UnsupportedOperationError) for these.
from affinity import EnrichedFieldNotWritableError
For fresh post-write reads where you want to skip the in-memory field-metadata cache, pass skip_cache=True to client.fields.list(...).
Rate Limits
snapshot = client.rate_limits.snapshot()
print(f"Per-minute: {snapshot.api_key_per_minute.remaining}/{snapshot.api_key_per_minute.limit}")
print(f"Monthly: {snapshot.org_monthly.remaining}/{snapshot.org_monthly.limit}")
refreshed = client.rate_limits.refresh()
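Before starting a bulk job, the snapshot numbers can be used for a simple budget check. The sketch below uses a hypothetical `DemoBucket` in place of the snapshot's per-minute or monthly bucket and assumes `remaining` and `limit` are integers; the 10% reserve is an arbitrary illustration.

```python
from dataclasses import dataclass


@dataclass
class DemoBucket:
    """Hypothetical stand-in for one rate-limit bucket from a snapshot."""

    remaining: int
    limit: int


def can_afford(bucket: DemoBucket, planned_calls: int, reserve_percent: int = 10) -> bool:
    """True if planned_calls fit while leaving reserve_percent of the limit unused."""
    reserve = bucket.limit * reserve_percent // 100
    return bucket.remaining - planned_calls >= reserve


monthly = DemoBucket(remaining=500, limit=900)
print(can_afford(monthly, planned_calls=400))
print(can_afford(monthly, planned_calls=450))
```

Leaving a reserve keeps some quota available for other users of the same key or organization.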
Retry Behavior
- GET/HEAD: Automatic retries (3 by default) for rate limits and transient errors
- POST/PUT/PATCH/DELETE: No automatic retries (to avoid duplicates)
client = Affinity(api_key="key", max_retries=5)
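The retry rule described above can be expressed as a small decision function. This is an illustration of the policy as stated, not the SDK's internal code; `should_retry` and `SAFE_METHODS` are names introduced for this sketch.

```python
# Methods the policy treats as safe to repeat.
SAFE_METHODS = frozenset({"GET", "HEAD"})


def should_retry(method: str, retries_so_far: int, max_retries: int = 3) -> bool:
    """Retry only safe methods, and only while retries remain."""
    return method.upper() in SAFE_METHODS and retries_so_far < max_retries


print(should_retry("GET", 0))
print(should_retry("GET", 3))
print(should_retry("POST", 0))
```

A repeated POST could create the same record twice, which is why write methods are excluded regardless of how many retries remain.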
Documentation