| name | pathling-python |
| description | Comprehensive cheat sheet for using the Pathling Python API. Use this skill when working with FHIR data in Python, running SQL on FHIR queries, using terminology functions, encoding FHIR resources, or any other Pathling Python operations. Trigger keywords include "pathling", "pathling python", "fhir encoding", "sql on fhir python", "terminology functions", "member_of", "translate", "subsumes", "PathlingContext". |
Pathling Python API cheat sheet
You are an expert in using the Pathling Python API for working with FHIR data in Python applications and data science workflows.
Installation
Prerequisites:
pip install pathling
Core concepts
PathlingContext
The main entry point for all Pathling operations. It manages the Spark session and provides access to data reading, encoding, and terminology functions.
Creating a basic context:
from pathling import PathlingContext
pc = PathlingContext.create()
Creating a context with terminology server authentication:
pc = PathlingContext.create(
terminology_server_url='https://ontology.nhs.uk/production1/fhir',
token_endpoint='https://ontology.nhs.uk/authorisation/auth/realms/nhs-digital-terminology/protocol/openid-connect/token',
client_id='[client ID]',
client_secret='[client secret]'
)
Creating a context with caching:
pc = PathlingContext.create(
terminology_server_url="http://localhost:8081/fhir",
terminology_verbose_request_logging=True,
cache_override_expiry=2_628_000,
cache_storage_type="disk",
cache_storage_path=".local/tx-cache"
)
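The value `2_628_000` used above is easier to read as a duration. As a quick sanity check, assuming `cache_override_expiry` is expressed in seconds (as the parameter list later in this sheet states), it works out to one twelfth of a 365-day year, or roughly one month:

```python
# Convert a cache expiry given in seconds into days.
SECONDS_PER_DAY = 24 * 60 * 60

cache_override_expiry = 2_628_000  # value used in the caching example
expiry_days = cache_override_expiry / SECONDS_PER_DAY

# One twelfth of a 365-day year, expressed in seconds.
one_month_seconds = 365 * SECONDS_PER_DAY // 12

print(f"{expiry_days:.1f} days")
print(cache_override_expiry == one_month_seconds)
```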
Accessing the Spark session:
pc.spark
pc.spark.sparkContext.setLogLevel("DEBUG")
Reading FHIR data
Reading NDJSON files
NDJSON is a common format for bulk FHIR data, with one JSON resource per line.
ndjson_dir = '/some/path/ndjson/'
json_resources = pc.spark.read.text(ndjson_dir)
patients = pc.encode(json_resources, 'Patient')
patients.select('id', 'gender', 'birthDate').show()
Using the DataSource API:
data = pc.read.ndjson("/some/file/location")
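To make the NDJSON format concrete, here is a minimal sketch in plain Python (no Pathling or Spark involved) that parses a small NDJSON string. The two Patient resources are made-up sample data.

```python
import json

# Hypothetical sample data: two FHIR Patient resources, one per line.
ndjson_text = (
    '{"resourceType": "Patient", "id": "p1", "gender": "female", "birthDate": "1980-05-17"}\n'
    '{"resourceType": "Patient", "id": "p2", "gender": "male", "birthDate": "1975-11-02"}\n'
)

# Each non-empty line is a complete JSON document.
resources = [json.loads(line) for line in ndjson_text.splitlines() if line.strip()]

for resource in resources:
    print(resource["id"], resource["gender"], resource["birthDate"])
```

This is why the manual approach reads the files with `spark.read.text`: each row of the resulting DataFrame holds one resource as a JSON string, ready to be passed to `pc.encode`.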
Reading FHIR Bundles
FHIR Bundles contain collections of related resources.
bundles_dir = '/some/path/bundles/'
bundles = pc.spark.read.text(bundles_dir, wholetext=True)
patients = pc.encode_bundle(bundles, 'Patient')
patients.select('id', 'gender', 'birthDate').show()
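As a rough illustration of the Bundle structure, the following plain-Python sketch pulls the Patient resources out of a hand-written Bundle. It only mimics the effect of selecting one resource type from a Bundle; it is not how Pathling implements `encode_bundle`, and both the sample resources and the `resources_of_type` helper are invented for this example.

```python
import json

# Hypothetical Bundle containing two different resource types.
bundle_json = """
{
  "resourceType": "Bundle",
  "type": "collection",
  "entry": [
    {"resource": {"resourceType": "Patient", "id": "p1", "gender": "female"}},
    {"resource": {"resourceType": "Condition", "id": "c1",
                  "subject": {"reference": "Patient/p1"}}}
  ]
}
"""


def resources_of_type(bundle: dict, resource_type: str) -> list:
    """Return the resources of the given type found in a Bundle's entries."""
    return [
        entry["resource"]
        for entry in bundle.get("entry", [])
        if entry.get("resource", {}).get("resourceType") == resource_type
    ]


bundle = json.loads(bundle_json)
patients = resources_of_type(bundle, "Patient")
print([p["id"] for p in patients])
```

Because a Bundle is usually a multi-line JSON document, the Spark example reads each file with `wholetext=True`, so that one row holds one whole Bundle rather than one line of it.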
Reading from Delta tables
data = pc.read.tables()
data.resource_types()
patients = data.read('Patient')
patients.count()
SQL on FHIR views
SQL on FHIR views project FHIR data into easy-to-use tabular forms.
Basic view with simple columns
result = data.view(
resource="Patient",
select=[
{"column": [{"path": "getResourceKey()", "name": "patient_id"}]},
{
"forEach": "address",
"column": [
{"path": "line.join('\\n')", "name": "street"},
{"path": "use", "name": "use"},
{"path": "city", "name": "city"},
{"path": "postalCode", "name": "zip"},
],
},
],
)
result.show()
View with where clause
view_ds = data.view(
resource='Patient',
select=[
{
'column': [
{'path': 'id', 'name': 'id'},
{'path': 'gender', 'name': 'gender'},
{'path': "telecom.where(system='phone').value", 'name': 'phone_numbers', 'collection': True},
]
}
],
where=[
{'path': "gender = 'male'"},
]
)
view_ds.show()
Nested forEach with forEachOrNull
view_ds = data.view(
resource='Patient',
select=[
{
'forEach': 'name',
'column': [
{'path': 'use', 'name': 'name_use'},
{'path': 'family', 'name': 'family_name'},
],
'select': [
{
'forEachOrNull': 'given',
'column': [
{'path': '$this', 'name': 'given_name'},
],
}
]
},
]
)
view_ds.show()
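The difference between `forEach` and `forEachOrNull` is easy to miss. Under the SQL on FHIR specification, `forEach` produces no rows when the collection is empty, whereas `forEachOrNull` still produces one row with null column values. The sketch below imitates that behaviour in plain Python for the `given` element. The helper names and sample data are hypothetical, and the code does not use Pathling.

```python
# Hypothetical sample: one name with given names, one without.
names = [
    {"family": "Smith", "given": ["Anna", "Marie"]},
    {"family": "Nguyen", "given": []},
]


def for_each(items):
    """Yield one row per item; an empty collection yields no rows."""
    yield from items


def for_each_or_null(items):
    """Yield one row per item, or a single None row if there are no items."""
    if items:
        yield from items
    else:
        yield None


rows_for_each = [
    (name["family"], given)
    for name in names
    for given in for_each(name["given"])
]
rows_for_each_or_null = [
    (name["family"], given)
    for name in names
    for given in for_each_or_null(name["given"])
]

print(rows_for_each)          # the name without given names is dropped
print(rows_for_each_or_null)  # the name without given names is kept, with None
```

Choosing `forEachOrNull` for the inner `given` selection therefore keeps names that have no given name in the output instead of silently dropping them.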
Terminology functions
Terminology functions require a FHIR terminology server to be configured.
Helper functions for creating Coding structs
to_coding:
from pathling import to_coding
coding_column = to_coding(df.CODE, 'http://snomed.info/sct')
coding_column = to_coding(df.CODE, 'http://snomed.info/sct', version='http://snomed.info/sct/32506021000036107/version/20250831')
to_snomed_coding:
from pathling import to_snomed_coding
coding_column = to_snomed_coding(df.CODE)
coding_column = to_snomed_coding(df.CODE, version='http://snomed.info/sct/32506021000036107/version/20250831')
to_loinc_coding:
from pathling.functions import to_loinc_coding
coding_column = to_loinc_coding(df.CODE)
Coding class:
from pathling import Coding
coding = Coding('http://snomed.info/sct', '232208008')
snomed_coding = Coding.of_snomed('232208008')
coding_literal = coding.to_literal()
to_ecl_value_set:
from pathling import to_ecl_value_set
viral_infection_ecl = """
<< 64572001|Disease| : (
<< 370135005|Pathological process| = << 441862004|Infectious process|,
<< 246075003|Causative agent| = << 49872002|Virus|
)
"""
value_set_uri = to_ecl_value_set(viral_infection_ecl)
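The URI produced from an ECL expression is a SNOMED CT implicit value set reference. FHIR defines the form `http://snomed.info/sct?fhir_vs=ecl/[expression]` for these, with the expression URL-encoded. The sketch below builds such a URI with the standard library only. `ecl_to_value_set_uri` is a hypothetical stand-in written for illustration, and the exact encoding that Pathling applies may differ in detail.

```python
from urllib.parse import quote, unquote

SNOMED_URI = "http://snomed.info/sct"


def ecl_to_value_set_uri(ecl: str) -> str:
    """Build a SNOMED CT implicit value set URI from an ECL expression."""
    # Percent-encode the expression so it is safe inside a query string.
    return f"{SNOMED_URI}?fhir_vs=ecl/{quote(ecl.strip(), safe='')}"


uri = ecl_to_value_set_uri("<< 64572001|Disease|")
print(uri)

# Decoding the part after "ecl/" recovers the original expression.
print(unquote(uri.split("ecl/", 1)[1]))
```

Seeing the URI form also explains why a plain value set URL, such as an HL7-defined ValueSet, can be passed to `member_of` directly without any conversion.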
member_of - Value set membership
Test if a code is a member of a value set.
from pathling import member_of, to_coding, to_snomed_coding, to_ecl_value_set
result = csv.select(
"CODE",
"DESCRIPTION",
member_of(
to_snomed_coding(csv.CODE),
to_ecl_value_set("<< 64572001")
).alias("VIRAL_INFECTION")
)
result.show()
result = csv.select(
"CODE",
"DESCRIPTION",
member_of(
to_coding(csv.CODE, 'http://loinc.org'),
'http://hl7.org/fhir/ValueSet/observation-vitalsignresult'
).alias("IS_VITAL_SIGN")
)
Alternative syntax using PathlingContext:
from pyspark.sql.functions import col
result = transformed_df.withColumn(
"Viral Infection",
pc.snomed.member_of(col("primary_diagnosis_concept"), "<< 64572001")
)
translate - Concept translation
Translate codes from one code system to another.
from pathling import translate, to_coding
result = pc.translate(
csv,
to_coding(csv.CODE, 'http://snomed.info/sct'),
'http://snomed.info/sct/900000000000207008?fhir_cm=900000000000497000',
output_column_name='READ_CODE'
)
result = result.withColumn('READ_CODE', result.READ_CODE.code)
result.select('CODE', 'DESCRIPTION', 'READ_CODE').show()
subsumes and subsumed_by - Subsumption testing
Test whether one code subsumes another, that is, whether the second code is equal to or a subtype of the first.
from pathling import subsumes, to_snomed_coding, Coding
left_coding = Coding('http://snomed.info/sct', '232208008')
right_coding_column = to_snomed_coding(csv.CODE)
result = pc.subsumes(
csv,
'IS_ENT',
left_coding=left_coding,
right_coding_column=right_coding_column
)
result.select('CODE', 'DESCRIPTION', 'IS_ENT').show()
Using subsumed_by (reverse order):
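from pathling import subsumed_by, to_snomed_coding, Coding
# subsumed_by is a column function: it tests whether the left coding is
# equal to, or a subtype of, the right coding.
result = csv.withColumn(
    'IS_SUBTYPE',
    subsumed_by(
        to_snomed_coding(csv.CODE),
        Coding('http://snomed.info/sct', '232208008')
    )
)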
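Subsumption is directional, which is what distinguishes the two functions: subsumes asks whether the right-hand code is the same as, or a descendant of, the left-hand code, while subsumed_by asks the reverse. The toy example below illustrates this with a made-up three-level hierarchy held in a dictionary. The codes are placeholders rather than real SNOMED CT concepts, the `toy_` functions are hypothetical, and in practice a terminology server answers the question rather than a local lookup.

```python
# Hypothetical hierarchy: child code -> list of parent codes.
PARENTS = {
    "ear-infection": ["ent-disorder"],
    "ent-disorder": ["disorder"],
    "disorder": [],
}


def toy_subsumes(left: str, right: str) -> bool:
    """True if `right` is equal to `left` or is one of its descendants."""
    if left == right:
        return True
    # Walk up from `right` through its ancestors looking for `left`.
    return any(toy_subsumes(left, parent) for parent in PARENTS.get(right, []))


def toy_subsumed_by(left: str, right: str) -> bool:
    """True if `left` is equal to `right` or is one of its descendants."""
    return toy_subsumes(right, left)


print(toy_subsumes("ent-disorder", "ear-infection"))     # broader code on the left
print(toy_subsumed_by("ent-disorder", "ear-infection"))  # broader code on the left
print(toy_subsumed_by("ear-infection", "ent-disorder"))  # narrower code on the left
```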
property_of - Retrieve code properties
Retrieve properties associated with codes in terminologies.
from pathling import property_of, to_snomed_coding, PropertyType
parents = csv.withColumn(
"PARENTS",
property_of(to_snomed_coding(csv.CODE), "parent", PropertyType.CODE)
)
exploded_parents = parents.selectExpr(
"CODE", "DESCRIPTION", "explode_outer(PARENTS) AS PARENT"
)
PropertyType values:
PropertyType.CODE - Returns an array of codes.
PropertyType.STRING - Returns an array of strings.
PropertyType.INTEGER - Returns an array of integers.
PropertyType.BOOLEAN - Returns an array of booleans.
PropertyType.DATETIME - Returns an array of timestamps.
PropertyType.DECIMAL - Returns an array of decimals.
display - Get preferred display term
Retrieve the preferred display term for codes.
from pathling import display, to_snomed_coding
with_displays = exploded_parents.withColumn(
"PARENT_DISPLAY",
display(to_snomed_coding(exploded_parents.PARENT))
)
with_displays.show()
Alternative syntax using PathlingContext:
from pyspark.sql.functions import col
transformed_df = source_df.withColumn(
"Primary Diagnosis Term",
pc.snomed.display(col("primary_diagnosis_concept"))
)
designation - Get alternative display terms
Retrieve alternative display terms (synonyms, translations, etc.).
from pathling import designation, to_snomed_coding, Coding
synonyms = csv.withColumn(
"SYNONYMS",
designation(
to_snomed_coding(csv.CODE),
Coding.of_snomed("900000000000013009")
)
)
exploded_synonyms = synonyms.selectExpr(
"CODE", "DESCRIPTION", "explode_outer(SYNONYMS) AS SYNONYM"
)
exploded_synonyms.show()
Common patterns
Grouping and categorising with SNOMED CT
from pathling import PathlingContext, to_snomed_coding, to_ecl_value_set, member_of
from pyspark.sql.functions import col, when
pc = PathlingContext.create()
viral_infection_ecl = "<< 64572001"
musculoskeletal_injury_ecl = "<< 263534002"
mental_health_ecl = "<< 40733004 |Mental state finding|"
categorised_df = df.withColumn(
"Viral Infection",
member_of(to_snomed_coding(col("diagnosis_code")), to_ecl_value_set(viral_infection_ecl))
).withColumn(
"Musculoskeletal Injury",
member_of(to_snomed_coding(col("diagnosis_code")), to_ecl_value_set(musculoskeletal_injury_ecl))
).withColumn(
"Mental Health Problem",
member_of(to_snomed_coding(col("diagnosis_code")), to_ecl_value_set(mental_health_ecl))
)
mutually_exclusive_df = categorised_df.withColumn(
"Category",
when(col("Viral Infection"), "Viral Infection")
.when(~col("Viral Infection") & col("Musculoskeletal Injury"), "Musculoskeletal Injury")
.when(~col("Viral Infection") & ~col("Musculoskeletal Injury") & col("Mental Health Problem"), "Mental Health Problem")
.otherwise("Other")
)
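Spark evaluates a chain of `when` clauses in order and returns the value of the first condition that holds, so the precedence of the categories follows from the order of the clauses. The plain-Python function below (a hypothetical helper, not part of Pathling or Spark) mirrors that first-match logic for a single row with non-null flags, which can be handy for checking the expected category by hand.

```python
def categorise(viral: bool, musculoskeletal: bool, mental_health: bool) -> str:
    """Return the first matching category, in the same order as the when() chain."""
    if viral:
        return "Viral Infection"
    if musculoskeletal:
        return "Musculoskeletal Injury"
    if mental_health:
        return "Mental Health Problem"
    return "Other"


# A row that matches two categories is assigned the earlier one.
print(categorise(viral=True, musculoskeletal=True, mental_health=False))
# A row that matches none falls through to the default.
print(categorise(viral=False, musculoskeletal=False, mental_health=False))
```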
Enriching data with terminology information
from pathling import display, property_of, to_snomed_coding, PropertyType
enriched_df = df.withColumn(
"diagnosis_display",
display(to_snomed_coding(df.diagnosis_code))
)
with_parents = enriched_df.withColumn(
"parent_codes",
property_of(to_snomed_coding(df.diagnosis_code), "parent", PropertyType.CODE)
)
Converting Spark DataFrames to Pandas
After creating a view or running terminology functions, you can convert the result to a Pandas DataFrame for use in Python data science tools.
import plotly.express as px
pandas_df = result.toPandas()
# Assumes the result has "category" and "count" columns; adjust to your data.
fig = px.bar(pandas_df, x="category", y="count")
fig.show()
Configuration options
PathlingContext.create() parameters
terminology_server_url - URL of the FHIR terminology server.
token_endpoint - OAuth2 token endpoint for authentication.
client_id - OAuth2 client ID.
client_secret - OAuth2 client secret.
terminology_verbose_request_logging - Enable verbose logging of terminology requests.
cache_override_expiry - Cache expiry time in seconds.
cache_storage_type - Cache storage type ("memory" or "disk").
cache_storage_path - Path for disk-based cache.
Spark configuration
When running your own Spark cluster, configure Pathling as a Spark package:
spark.jars.packages au.csiro.pathling:library-api:[version]
MimeType and Version enums
from pathling import MimeType, Version
MimeType.FHIR_JSON
MimeType.FHIR_XML
Version.R4
Databricks installation
Install both packages:
pathling PyPI package
au.csiro.pathling:library-api Maven package
Enable Java 21 support in Advanced Options > Spark > Environment Variables:
JNAME=zulu21-ca-amd64
Common gotchas
- Always create Coding structs when using terminology functions - Use to_coding(), to_snomed_coding(), or the Coding class to convert code columns into the proper struct format.
- ECL expressions need to be converted to value set URIs - Use to_ecl_value_set() to convert ECL expressions before passing them to member_of().
- Terminology functions return new columns - Use .withColumn() or .select() to add terminology function results to your DataFrame.
- Resource encoding requires the correct resource type - Make sure to specify the correct FHIR resource type when using encode() or encode_bundle().
- SQL on FHIR views use FHIRPath syntax - The path elements in view definitions use FHIRPath expressions, not SQL or Python syntax.
- PathlingContext manages the Spark session - Access the Spark session via pc.spark rather than creating a separate one.
Best practices
- Use the DataSource API for reading data - Prefer pc.read.ndjson() or pc.read.tables() over manual encoding.
- Cache terminology results - Configure terminology caching to avoid repeated requests to the terminology server.
- Use an appropriate terminology server - For Australian FHIR content, use the Australian terminology server.
- Batch terminology operations - Process data in batches to improve performance of terminology operations.
- Use SQL on FHIR views for complex projections - Views provide a declarative way to flatten and transform FHIR data.
- Profile your Spark jobs - Use Spark's monitoring tools to identify performance bottlenecks.
- Set appropriate log levels - Use pc.spark.sparkContext.setLogLevel() to control logging verbosity.