| name | sql-query-generator |
| description | Generate secure SQL queries with validation, pagination helpers, risk analysis, and audit-focused safeguards. |
| version | 0.3.0 |
SQL Query Generator Skill
Overview
This skill enables AI agents to generate accurate, optimized SQL queries from natural language descriptions. It supports multiple database systems and follows best practices for query construction, security, and performance.
Installation
Method 1: Direct Download
git clone https://github.com/yourusername/sql-query-generator.git
cd sql-query-generator
python sql_query_generator.py
Method 2: Using as a Module
cp sql_query_generator.py /path/to/your/project/
from sql_query_generator import SQLQueryGenerator, DatabaseType
Method 3: AI Agent Integration
For AI agents using this skill:
- Read this SKILL.md file completely before generating queries
- Follow all security guidelines strictly
- Always use parameterized queries
- Validate all inputs before query generation
- Include security warnings in responses
Optional Database Drivers
Install only the drivers you need:
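pip install psycopg2-binary  # PostgreSQL
pip install mysql-connector-python  # MySQL
pip install pyodbc  # Microsoft SQL Server (via ODBC)
pip install cx_Oracle  # Oracle Database
pip install pytest pytest-cov  # development and testing only, not a database driver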
System Requirements
- Python 3.7 or higher
- No external dependencies for core query generation
- Database drivers only needed for actual query execution
Supported Database Systems
- PostgreSQL
- MySQL
- SQLite
- Microsoft SQL Server
- Oracle Database
- MariaDB
Core Capabilities
1. Query Generation
- SELECT Queries: Simple and complex data retrieval
- JOIN Operations: INNER, LEFT, RIGHT, FULL OUTER, CROSS
- Aggregations: GROUP BY, HAVING, aggregate functions
- Subqueries: Correlated and non-correlated
- CTEs: Common Table Expressions (WITH clause)
- Window Functions: OVER, PARTITION BY, ROW_NUMBER, RANK
- INSERT/UPDATE/DELETE: Data manipulation queries
- DDL: CREATE, ALTER, DROP statements
2. Query Optimization
- Index usage recommendations
- Query execution plan analysis
- Performance optimization suggestions
- Avoiding N+1 query problems
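As an illustration of the N+1 problem named above, here is a minimal sketch using Python's built-in sqlite3 module. The `customers` and `orders` tables and the two helper functions are hypothetical and exist only for this example; the point is that the per-customer loop issues one query per row, while the JOIN version returns the same totals in a single query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, customer_name TEXT);
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER, total_amount REAL);
    INSERT INTO customers VALUES (1, 'Ada'), (2, 'Grace'), (3, 'Linus');
    INSERT INTO orders VALUES (10, 1, 20.0), (11, 1, 5.0), (12, 2, 7.5);
""")


def totals_n_plus_one(conn):
    """One query for the customer list, then one extra query per customer."""
    queries = 0
    totals = {}
    customers = conn.execute(
        "SELECT customer_id, customer_name FROM customers ORDER BY customer_id"
    ).fetchall()
    queries += 1
    for customer_id, name in customers:
        row = conn.execute(
            "SELECT COALESCE(SUM(total_amount), 0) FROM orders WHERE customer_id = ?",
            (customer_id,),
        ).fetchone()
        queries += 1
        totals[name] = row[0]
    return totals, queries


def totals_single_join(conn):
    """The same totals computed by the database in a single query."""
    rows = conn.execute("""
        SELECT c.customer_name, COALESCE(SUM(o.total_amount), 0)
        FROM customers c
        LEFT JOIN orders o ON o.customer_id = c.customer_id
        GROUP BY c.customer_id, c.customer_name
    """).fetchall()
    return dict(rows), 1


slow_totals, slow_queries = totals_n_plus_one(conn)
fast_totals, fast_queries = totals_single_join(conn)
```

With three customers the loop version runs four queries and the JOIN version runs one; the gap grows linearly with the number of customers.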
3. Security Features
- SQL injection prevention
- Parameterized query generation
- Input validation patterns
- Role-based access control patterns
Usage Instructions
Basic Query Generation
When generating SQL queries, follow these steps:
- Understand the Request
  - Parse natural language input
  - Identify required tables
  - Determine join conditions
  - Extract filter criteria
- Generate Base Query
SELECT
column1,
column2,
aggregate_function(column3) AS alias
FROM
table1
JOIN
table2 ON table1.id = table2.foreign_id
WHERE
condition1 = value1
AND condition2 > value2
GROUP BY
column1, column2
HAVING
aggregate_condition
ORDER BY
column1 DESC
LIMIT 100;
- Apply Security Measures
  - Use parameterized queries
  - Validate all inputs
  - Escape special characters
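The base query template above caps results with a fixed LIMIT. When a caller needs to walk through a larger result set, one common option is keyset (cursor-based) pagination. The sketch below is only an illustration under stated assumptions: the `users` table, the `fetch_page` helper, and the `MAX_PAGE_SIZE` cap are hypothetical names, and sqlite3 is used so the example runs without a database server.

```python
import sqlite3
from typing import List, Optional, Tuple

MAX_PAGE_SIZE = 100  # hypothetical cap, mirroring the LIMIT 100 in the template


def fetch_page(conn: sqlite3.Connection, after_id: Optional[int],
               page_size: int = 20) -> Tuple[List[tuple], Optional[int]]:
    """Return one page of users plus the cursor for the next page (or None)."""
    if not isinstance(page_size, int) or not 1 <= page_size <= MAX_PAGE_SIZE:
        raise ValueError(f"page_size must be between 1 and {MAX_PAGE_SIZE}")
    if after_id is None:
        # First page: no lower bound on id
        sql = "SELECT id, username FROM users ORDER BY id LIMIT ?"
        params: tuple = (page_size,)
    else:
        # Later pages: continue strictly after the last id already seen
        sql = "SELECT id, username FROM users WHERE id > ? ORDER BY id LIMIT ?"
        params = (after_id, page_size)
    rows = conn.execute(sql, params).fetchall()
    # A short page means there is nothing left to fetch
    next_cursor = rows[-1][0] if len(rows) == page_size else None
    return rows, next_cursor


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)",
    [(f"u{i}",) for i in range(1, 6)],
)

page1, cursor1 = fetch_page(conn, None, 2)
page2, cursor2 = fetch_page(conn, cursor1, 2)
page3, cursor3 = fetch_page(conn, cursor2, 2)
```

Unlike OFFSET-based paging, the `id > ?` predicate lets the database seek directly to the start of each page through the primary key, and both the cursor and the page size travel as parameters.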
Query Patterns
Pattern 1: Simple SELECT
SELECT
id,
username,
email,
registration_date
FROM
users
WHERE
registration_date > $1
ORDER BY
registration_date DESC;
Pattern 2: JOIN with Aggregation
SELECT
c.customer_name,
c.email,
COUNT(o.order_id) AS total_orders,
SUM(o.total_amount) AS total_spent
FROM
customers c
INNER JOIN
orders o ON c.customer_id = o.customer_id
WHERE
EXTRACT(YEAR FROM o.order_date) = $1
GROUP BY
c.customer_id,
c.customer_name,
c.email
HAVING
COUNT(o.order_id) > 5
ORDER BY
total_spent DESC;
Pattern 3: Subquery
SELECT
product_name,
price,
category
FROM
products
WHERE
price > (
SELECT AVG(price)
FROM products
)
ORDER BY
price DESC;
Pattern 4: CTE (Common Table Expression)
WITH product_sales AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
c.category_name,
SUM(oi.quantity * oi.unit_price) AS total_sales,
ROW_NUMBER() OVER (
PARTITION BY p.category_id
ORDER BY SUM(oi.quantity * oi.unit_price) DESC
) AS rank_in_category
FROM
products p
JOIN
order_items oi ON p.product_id = oi.product_id
JOIN
categories c ON p.category_id = c.category_id
GROUP BY
p.product_id,
p.product_name,
p.category_id,
c.category_name
)
SELECT
category_name,
product_name,
total_sales,
rank_in_category
FROM
product_sales
WHERE
rank_in_category <= 3
ORDER BY
category_name,
rank_in_category;
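Pattern 4 uses a non-recursive CTE. The same WITH syntax also supports recursion, which is useful for hierarchical data. The following sketch is an assumption-laden illustration rather than part of the skill itself: the `employees` table and the `reports_of` helper are hypothetical, and sqlite3 stands in for whichever database you target.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, manager_id INTEGER);
    INSERT INTO employees VALUES
        (1, 'Root', NULL), (2, 'Alice', 1), (3, 'Bob', 1), (4, 'Carol', 2);
""")


def reports_of(conn, manager_id):
    """Return (name, depth) for every direct and indirect report of a manager."""
    return conn.execute("""
        WITH RECURSIVE subordinates(id, name, depth) AS (
            -- Anchor member: direct reports
            SELECT id, name, 1 FROM employees WHERE manager_id = ?
            UNION ALL
            -- Recursive member: reports of the rows found so far
            SELECT e.id, e.name, s.depth + 1
            FROM employees e
            JOIN subordinates s ON e.manager_id = s.id
        )
        SELECT name, depth FROM subordinates ORDER BY depth, name
    """, (manager_id,)).fetchall()


all_reports = reports_of(conn, 1)
```

Here `all_reports` contains Alice and Bob at depth 1 and Carol at depth 2. On data that may contain cycles, a depth limit in the recursive member is a sensible safeguard.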
Pattern 5: Window Functions
SELECT
sale_date,
daily_total,
SUM(daily_total) OVER (
ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total,
AVG(daily_total) OVER (
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_average_7days
FROM (
SELECT
DATE(order_date) AS sale_date,
SUM(total_amount) AS daily_total
FROM
orders
GROUP BY
DATE(order_date)
) daily_sales
ORDER BY
sale_date;
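To sanity-check a window query like Pattern 5, it helps to run it on a tiny dataset where the expected numbers can be worked out by hand. The sketch below assumes SQLite 3.25 or later (the first release with window functions) and a hypothetical `orders` table; the moving average uses a 2-row window instead of 7 so the arithmetic stays readable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, order_date TEXT, total_amount REAL);
    INSERT INTO orders (order_date, total_amount) VALUES
        ('2024-01-01 09:00:00', 10.0),
        ('2024-01-01 17:30:00', 5.0),
        ('2024-01-02 12:00:00', 20.0),
        ('2024-01-03 08:15:00', 30.0);
""")

rows = conn.execute("""
    SELECT
        sale_date,
        daily_total,
        SUM(daily_total) OVER (
            ORDER BY sale_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,
        AVG(daily_total) OVER (
            ORDER BY sale_date
            ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
        ) AS moving_average_2rows
    FROM (
        SELECT DATE(order_date) AS sale_date, SUM(total_amount) AS daily_total
        FROM orders
        GROUP BY DATE(order_date)
    ) daily_sales
    ORDER BY sale_date
""").fetchall()
```

The daily totals are 15, 20 and 30, so the running total is 15, 35, 65 and the 2-row average is 15, 17.5, 25. Note that a ROWS frame counts rows, not calendar days: if a day has no orders, the window reaches further back in time than its name suggests.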
Best Practices
1. Query Structure
- Always use explicit column names (avoid SELECT *)
- Use meaningful table aliases
- Indent for readability
- Comment complex logic
2. Performance
- Create appropriate indexes
- Avoid SELECT DISTINCT as a patch for duplicate rows (fix the join that produces them, or aggregate with GROUP BY)
- Use EXISTS instead of IN for large datasets
- Limit result sets when appropriate
- Use EXPLAIN to analyze query plans
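As a small illustration of the last point, the sketch below compares a query plan before and after adding an index. It uses SQLite's `EXPLAIN QUERY PLAN` because it runs without a server; PostgreSQL and MySQL have their own `EXPLAIN` output formats. The `orders` table, the index name, and the `plan` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER, total_amount REAL)"
)


def plan(conn, sql, params=()):
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    return [row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


query = "SELECT order_id, total_amount FROM orders WHERE customer_id = ?"

before = plan(conn, query, (42,))  # no index yet: full table scan
conn.execute("CREATE INDEX idx_orders_customer_id ON orders (customer_id)")
after = plan(conn, query, (42,))   # the planner can now search the index
```

The exact wording of the plan text varies between SQLite versions, so it is safer to look for the index name in the output than to compare whole strings.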
3. Security (CRITICAL)
3.1 MANDATORY Security Rules
THESE RULES ARE NON-NEGOTIABLE AND MUST ALWAYS BE FOLLOWED:
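- NEVER CONCATENATE USER INPUT INTO SQL
    query = f"SELECT * FROM users WHERE username = '{user_input}'"  # WRONG: user input becomes part of the SQL text
    query = "SELECT * FROM users WHERE username = %s"  # CORRECT: placeholder only
    cursor.execute(query, (user_input,))  # CORRECT: the value is passed separately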
- ALL VALUES MUST BE PARAMETERIZED
  - Even seemingly "safe" values like numbers
  - Even values from "trusted" sources
  - Even internal application values
  - NO EXCEPTIONS
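To see why this rule has no exceptions, the sketch below runs the same lookup twice against an in-memory SQLite database: once with the input interpolated into the SQL text and once with it bound as a parameter. The `users` table and the payload are hypothetical, and the interpolated query is shown only as a counter-example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    INSERT INTO users (username) VALUES ('alice'), ('bob');
""")

payload = "nobody' OR '1'='1"

# WRONG: the payload is pasted into the SQL text and rewrites the WHERE clause
unsafe_sql = f"SELECT username FROM users WHERE username = '{payload}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# CORRECT: the payload is bound as data and compared as a plain string
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?",
    (payload,),
).fetchall()
```

The interpolated query returns every row in the table, while the parameterized query returns none because no user has that literal username.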
- VALIDATE AND SANITIZE ALL INPUTS
    VALID_STATUSES = ['active', 'inactive', 'pending']
    if status not in VALID_STATUSES:
        raise ValueError("Invalid status")
    if not isinstance(user_id, int):
        raise TypeError("user_id must be integer")
    if len(username) > 50:
        raise ValueError("username too long")
- ESCAPE DYNAMIC IDENTIFIERS PROPERLY
    from psycopg2 import sql
    query = sql.SQL("SELECT * FROM {} WHERE id = %s").format(
        sql.Identifier(table_name)
    )
    cursor.execute(query, (user_id,))
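Identifiers such as column names and sort directions cannot be sent as parameters. A driver-independent complement to quoting them is to accept only names from a fixed allowlist. The sketch below is one possible shape for this, not the skill's own API: `SORTABLE_COLUMNS`, `SORT_DIRECTIONS`, and `build_user_listing` are hypothetical names.

```python
import sqlite3

# Hypothetical allowlists for a user listing
SORTABLE_COLUMNS = {"username", "registration_date"}
SORT_DIRECTIONS = {"ASC", "DESC"}


def build_user_listing(sort_by: str, direction: str = "ASC") -> str:
    """Return a SELECT whose ORDER BY uses only allowlisted identifiers."""
    if sort_by not in SORTABLE_COLUMNS:
        raise ValueError(f"Unsupported sort column: {sort_by!r}")
    direction = direction.upper()
    if direction not in SORT_DIRECTIONS:
        raise ValueError(f"Unsupported sort direction: {direction!r}")
    # Both pieces now come from fixed sets, so interpolating them is safe
    return f'SELECT id, username FROM users ORDER BY "{sort_by}" {direction}'


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, registration_date TEXT);
    INSERT INTO users (username, registration_date) VALUES
        ('bob', '2024-02-01'), ('alice', '2024-03-01');
""")
rows = conn.execute(build_user_listing("username", "desc")).fetchall()
```

A request to sort by anything outside the allowlist, such as `"username; DROP TABLE users"`, is rejected with a `ValueError` before any SQL text is built.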
3.2 Input Validation Framework
import re
from datetime import datetime
from typing import Any, List, Optional

class SQLInputValidator:
    """Comprehensive input validation for SQL queries"""

    @staticmethod
    def validate_identifier(identifier: str, max_length: int = 63) -> str:
        """Validate table/column names"""
        if len(identifier) > max_length:
            raise ValueError(f"Identifier too long: {len(identifier)} > {max_length}")
        # fullmatch, unlike match with a trailing $, also rejects a trailing newline
        if not re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', identifier):
            raise ValueError(f"Invalid identifier: {identifier}")
        SQL_KEYWORDS = {
            'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE',
            'ALTER', 'TRUNCATE', 'UNION', 'JOIN', 'WHERE', 'FROM'
        }
        if identifier.upper() in SQL_KEYWORDS:
            raise ValueError(f"SQL keyword not allowed: {identifier}")
        return identifier

    @staticmethod
    def validate_integer(value: Any, min_val: Optional[int] = None,
                         max_val: Optional[int] = None) -> int:
        """Validate integer values"""
        try:
            int_value = int(value)
        except (ValueError, TypeError):
            raise ValueError(f"Invalid integer: {value}")
        if min_val is not None and int_value < min_val:
            raise ValueError(f"Value {int_value} below minimum {min_val}")
        if max_val is not None and int_value > max_val:
            raise ValueError(f"Value {int_value} above maximum {max_val}")
        return int_value

    @staticmethod
    def validate_string(value: str, max_length: int = 255,
                        allow_empty: bool = False) -> str:
        """Validate string values"""
        if not isinstance(value, str):
            raise TypeError("Value must be string")
        if not allow_empty and len(value) == 0:
            raise ValueError("Empty string not allowed")
        if len(value) > max_length:
            raise ValueError(f"String too long: {len(value)} > {max_length}")
        if '\x00' in value:
            raise ValueError("Null bytes not allowed in string")
        return value

    @staticmethod
    def validate_email(email: str) -> str:
        """Validate email format"""
        email = SQLInputValidator.validate_string(email, max_length=254)
        if not re.fullmatch(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', email):
            raise ValueError(f"Invalid email format: {email}")
        return email

    @staticmethod
    def validate_date(date_str: str) -> str:
        """Validate date format (YYYY-MM-DD) and that the date exists"""
        if not re.fullmatch(r'\d{4}-\d{2}-\d{2}', date_str):
            raise ValueError(f"Invalid date format: {date_str}")
        try:
            # The regex checks only the shape; strptime rejects dates like 2024-13-45
            datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError:
            raise ValueError(f"Invalid calendar date: {date_str}")
        return date_str

    @staticmethod
    def validate_enum(value: str, allowed_values: List[str]) -> str:
        """Validate value against whitelist"""
        if value not in allowed_values:
            raise ValueError(f"Invalid value: {value}. Allowed: {allowed_values}")
        return value
3.3 SQL Injection Attack Patterns to Prevent
import re

INJECTION_PATTERNS = [
    r"('|(\\')|(--)|(\#)|(%23)|(;))",
    r"((\%27)|(\'))",
    r"(union.*select)",
    r"(insert.*into)",
    r"(update.*set)",
    r"(delete.*from)",
    r"(drop.*table)",
    r"(exec(\s|\+)+(s|x)p\w+)",
    r"(script.*>)",
]

def detect_injection_attempt(value: str) -> bool:
    """Detect potential SQL injection attempts.

    Heuristic only: it flags legitimate input such as "O'Brien" and can be
    bypassed, so use it for logging and alerting, never as a substitute for
    parameterized queries.
    """
    value_lower = value.lower()
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, value_lower):
            return True
    return False
3.4 Secure Query Builder
from typing import List
from sql_query_generator import DatabaseType

class SecureQueryBuilder:
    """Build SQL queries with mandatory security checks.

    Uses the SQLInputValidator class from section 3.2. The %s placeholder
    matches psycopg2 and mysql-connector; sqlite3 and pyodbc expect ? instead.
    """

    def __init__(self, db_type: DatabaseType):
        self.db_type = db_type
        self.validator = SQLInputValidator()

    def build_select(self, table: str, columns: List[str],
                     conditions: dict) -> tuple:
        """Build SELECT query with validation"""
        table = self.validator.validate_identifier(table)
        if not columns:
            raise ValueError("At least one column is required")
        validated_columns = [
            self.validator.validate_identifier(col)
            for col in columns
        ]
        query = f"SELECT {', '.join(validated_columns)} FROM {table}"
        # Collect parameters per call so values from an earlier
        # build_select() call never leak into this query
        params = []
        if conditions:
            where_parts = []
            for key, value in conditions.items():
                key = self.validator.validate_identifier(key)
                where_parts.append(f"{key} = %s")
                params.append(value)
            query += " WHERE " + " AND ".join(where_parts)
        return query, tuple(params)
3.5 Database Connection Security
from typing import Any

class SecureConnection:
    """Secure database connection configuration"""

    @staticmethod
    def get_postgresql_ssl_config() -> dict:
        """PostgreSQL SSL configuration"""
        return {
            # verify-full checks the server certificate and hostname;
            # 'require' alone only encrypts the connection
            'sslmode': 'verify-full',
            'sslrootcert': '/path/to/ca-cert.pem',
            'sslcert': '/path/to/client-cert.pem',
            'sslkey': '/path/to/client-key.pem'
        }

    @staticmethod
    def get_connection_timeout() -> dict:
        """Connection timeout settings (libpq parameters)"""
        # libpq has no command_timeout option; the per-statement limit is
        # set with statement_timeout in create_secure_connection()
        return {
            'connect_timeout': 10,
            'keepalives': 1,
            'keepalives_idle': 30,
            'keepalives_interval': 10,
            'keepalives_count': 5
        }

    @staticmethod
    def create_secure_connection(database_url: str) -> Any:
        """Create connection with security settings"""
        import psycopg2
        conn = psycopg2.connect(
            database_url,
            **SecureConnection.get_postgresql_ssl_config(),
            **SecureConnection.get_connection_timeout()
        )
        cursor = conn.cursor()
        cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        cursor.execute("SET statement_timeout = 30000")
        cursor.close()
        # Commit so the session settings survive a later rollback
        conn.commit()
        return conn
3.6 Rate Limiting
import time
from collections import defaultdict
from threading import Lock

class RateLimiter:
    """Prevent query flooding attacks"""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)
        self.lock = Lock()

    def is_allowed(self, identifier: str) -> bool:
        """Check if request is allowed"""
        with self.lock:
            now = time.time()
            window_start = now - self.window_seconds
            self.requests[identifier] = [
                req_time for req_time in self.requests[identifier]
                if req_time > window_start
            ]
            if len(self.requests[identifier]) >= self.max_requests:
                return False
            self.requests[identifier].append(now)
            return True
3.7 Audit Logging
import logging
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional

class SecurityAuditLogger:
    """Log all database operations for security auditing"""

    def __init__(self, log_file: str = '/var/log/sql_audit.log'):
        self.logger = logging.getLogger('sql_audit')
        # getLogger returns the same logger on every call, so attach the
        # handler only once to avoid duplicated log entries
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_query(self, query: str, params: tuple, user_id: str,
                  ip_address: str, result_count: Optional[int] = None):
        """Log query execution"""
        log_entry = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'user_id': user_id,
            'ip_address': ip_address,
            'query': query,
            # Only the number of parameters is logged, never their values
            'param_count': len(params),
            'result_count': result_count
        }
        self.logger.info(json.dumps(log_entry))

    def log_security_event(self, event_type: str, details: Dict[str, Any],
                           severity: str = 'WARNING'):
        """Log security events"""
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': event_type,
            'severity': severity,
            'details': details
        }
        if severity == 'CRITICAL':
            self.logger.critical(json.dumps(log_entry))
        elif severity == 'ERROR':
            self.logger.error(json.dumps(log_entry))
        else:
            self.logger.warning(json.dumps(log_entry))
3.8 Prepared Statement Pool
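from typing import Dict, Any
import hashlib

class PreparedStatementPool:
    """Reuse PostgreSQL prepared statements for better performance and security.

    Queries must use PostgreSQL's native $1, $2, ... placeholders, because
    PREPARE is parsed by the server and does not understand the driver's %s style.
    """

    def __init__(self, connection):
        self.connection = connection
        self.statements: Dict[str, Any] = {}

    def get_statement(self, query: str):
        """Get or create prepared statement"""
        query_hash = hashlib.sha256(query.encode()).hexdigest()[:16]
        if query_hash not in self.statements:
            statement_name = f"stmt_{query_hash}"
            cursor = self.connection.cursor()
            try:
                # The name is derived from a hex digest, so it is a safe identifier.
                # query must be application-defined SQL, never user input.
                cursor.execute(f"PREPARE {statement_name} AS {query}")
            finally:
                cursor.close()
            self.statements[query_hash] = statement_name
        return self.statements[query_hash]

    def execute(self, query: str, params: tuple = ()):
        """Execute using prepared statement"""
        stmt_name = self.get_statement(query)
        cursor = self.connection.cursor()
        if params:
            param_list = ', '.join(['%s'] * len(params))
            cursor.execute(f"EXECUTE {stmt_name}({param_list})", params)
        else:
            # EXECUTE takes no parentheses when the statement has no parameters
            cursor.execute(f"EXECUTE {stmt_name}")
        return cursor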
4. Parameterization Examples
PostgreSQL/Python (psycopg2)
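cursor.execute(
    "SELECT * FROM users WHERE email = %s AND status = %s",  # CORRECT: values are passed as parameters
    (user_email, status)
)
cursor.execute(
    f"SELECT * FROM users WHERE email = '{user_email}'"  # WRONG: string interpolation, vulnerable to SQL injection
)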
MySQL/Python (mysql-connector)
cursor.execute(
"SELECT * FROM products WHERE price > %s",
(min_price,)
)
SQLite/Python
cursor.execute(
"SELECT * FROM orders WHERE order_date > ?",
(start_date,)
)
Node.js (PostgreSQL)
const result = await client.query(
'SELECT * FROM users WHERE id = $1',
[userId]
);
5. Database-Specific Syntax
PostgreSQL
- Use `$1, $2, $3` for parameters (native protocol and node-postgres; psycopg2 uses `%s`)
- Supports advanced features: JSONB, arrays, full-text search
- Use the `RETURNING` clause for INSERT/UPDATE/DELETE
- Case-insensitive text search with ILIKE
MySQL
- Use `?` for parameters in prepared statements (mysql-connector-python uses `%s`)
- LIMIT syntax: `LIMIT offset, count`
- Use backticks for identifiers with spaces
- Date functions: DATE_FORMAT, CURDATE()
SQL Server
- Use `@param1, @param2` for parameters
- TOP instead of LIMIT
- Use square brackets for identifiers
- Date functions: GETDATE(), DATEADD()
SQLite
- Use `?` for parameters
- Limited ALTER TABLE support
- RIGHT JOIN and FULL OUTER JOIN require SQLite 3.39.0 or later
- Date functions as strings
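Because placeholder syntax differs between drivers, generated code often needs a small helper that emits the right markers. The sketch below is one way to do that; `PLACEHOLDER_STYLE` and `placeholders` are hypothetical names, and the mapping covers only the drivers mentioned in this document (pyodbc, commonly used for SQL Server from Python, takes `?` rather than the `@param` names used in T-SQL itself).

```python
import sqlite3
from typing import List

# Placeholder style per driver (assumed driver keys, for illustration only)
PLACEHOLDER_STYLE = {
    "postgresql_native": "dollar",  # $1, $2, ... (PREPARE, node-postgres)
    "psycopg2": "format",           # %s
    "mysql_connector": "format",    # %s
    "sqlite3": "qmark",             # ?
    "pyodbc": "qmark",              # ?
}


def placeholders(driver: str, count: int) -> List[str]:
    """Return `count` placeholder markers in the style the driver expects."""
    style = PLACEHOLDER_STYLE[driver]
    if style == "dollar":
        return [f"${i}" for i in range(1, count + 1)]
    if style == "format":
        return ["%s"] * count
    return ["?"] * count


# Usage: build an IN list for sqlite3 without interpolating any values
ids = [1, 3]
marks = ", ".join(placeholders("sqlite3", len(ids)))

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO products (name) VALUES ('pen'), ('ink'), ('pad');
""")
rows = conn.execute(
    f"SELECT name FROM products WHERE id IN ({marks}) ORDER BY id", ids
).fetchall()
```

Only the markers are interpolated into the SQL text; the values themselves still travel as parameters. Callers should handle an empty list separately, since an empty `IN ()` is rejected by most databases.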
Error Handling
When generating queries, include error handling recommendations:
import psycopg2
from psycopg2 import sql

try:
    cursor.execute(
        sql.SQL("SELECT * FROM {} WHERE id = %s").format(
            sql.Identifier('users')
        ),
        (user_id,)
    )
    results = cursor.fetchall()
except psycopg2.Error as e:
    print(f"Database error: {e}")
finally:
    cursor.close()
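Error handling for write operations usually also involves rolling back the failed transaction. The sketch below shows that idea with sqlite3, whose connection object acts as a context manager that commits on success and rolls back on an exception (psycopg2 connections behave similarly). The `accounts` table and the `transfer` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "id INTEGER PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO accounts VALUES (1, 100), (2, 0)")
conn.commit()


def transfer(conn, source: int, target: int, amount: int) -> bool:
    """Move `amount` between accounts; undo both updates if either fails."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (amount, target),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE id = ?",
                (amount, source),
            )
        return True
    except sqlite3.IntegrityError:
        # The CHECK constraint rejected a negative balance
        return False


def balances(conn):
    return conn.execute("SELECT id, balance FROM accounts ORDER BY id").fetchall()


overdraft_ok = transfer(conn, 1, 2, 150)   # fails: would leave account 1 negative
after_overdraft = balances(conn)
normal_ok = transfer(conn, 1, 2, 40)       # succeeds
after_normal = balances(conn)
```

After the failed transfer both balances are unchanged, even though the first UPDATE had already run, because the whole transaction was rolled back.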
Query Validation Checklist
Before providing a query, verify:
Response Format
When responding to a query request, provide:
- The SQL Query (properly formatted and commented)
- Explanation of what the query does
- Parameters that need to be passed
- Expected Result structure
- Performance Notes (if applicable)
- Security Warnings (if applicable)
- Implementation Example in the requested language
Example Response Structure
### SQL Query
```sql
-- Get active users with their order counts
SELECT
    u.user_id,
    u.username,
    u.email,
    COUNT(o.order_id) AS order_count,
    COALESCE(SUM(o.total_amount), 0) AS lifetime_value
FROM
    users u
LEFT JOIN
    orders o ON u.user_id = o.user_id
WHERE
    u.status = $1
    AND u.created_at >= $2
GROUP BY
    u.user_id,
    u.username,
    u.email
HAVING
    COUNT(o.order_id) >= $3
ORDER BY
    lifetime_value DESC
LIMIT $4;
```
### Parameters
- $1: status (string, e.g., 'active')
- $2: created_at (date, e.g., '2024-01-01')
- $3: min_orders (integer, e.g., 5)
- $4: limit (integer, e.g., 100)
### Explanation
This query retrieves users with the given status who joined on or after a specified date and have placed at least a minimum number of orders. It calculates each user's order count and lifetime value, sorts the highest-spending users first, and returns at most the requested number of rows.
### Expected Result
| user_id | username | email | order_count | lifetime_value |
|---|---|---|---|---|
| 123 | john_doe | john@example.com | 15 | 2500.00 |
### Performance Notes
- Ensure indexes on `users.status` and `users.created_at`
- Ensure an index on `orders.user_id`
- For large datasets, consider pagination
### Implementation Example (Python/psycopg2)
cursor.execute(query, ('active', '2024-01-01', 5, 100))  # psycopg2 expects %s placeholders, so write $1..$4 as %s in query
results = cursor.fetchall()
## Advanced Topics
### 1. Query Optimization Techniques
- Use EXPLAIN ANALYZE to understand query plans
- Create covering indexes
- Partition large tables
- Use materialized views for complex aggregations
- Implement query result caching
### 2. Complex Scenarios
- Recursive CTEs for hierarchical data
- Pivot/Unpivot operations
- Full-text search
- Geospatial queries
- Time-series analysis
### 3. Migration Support
- Generate queries for data migration
- Schema comparison queries
- Data validation queries
- Backup and restore scripts
## Testing Recommendations
Always suggest testing generated queries with:
1. Small dataset first
2. EXPLAIN or EXPLAIN ANALYZE
3. Various edge cases (NULL values, empty sets)
4. Performance benchmarks
5. Security scanning tools
## Common Pitfalls to Avoid
1. **N+1 Query Problem**: Use JOINs instead of multiple queries
2. **SELECT \***: Specify needed columns explicitly
3. **Missing Indexes**: Recommend indexes on filter/join columns
4. **Cartesian Products**: Ensure proper JOIN conditions
5. **Implicit Type Conversions**: Cast explicitly when needed
6. **Timezone Issues**: Always use timezone-aware timestamps
## Integration Examples
### REST API
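```python
from flask import Flask, jsonify, request
import psycopg2

app = Flask(__name__)
# Assumption: placeholder DSN for illustration; supply your own connection settings
DATABASE_URL = "postgresql://localhost/appdb"

@app.route('/api/users', methods=['GET'])
def get_users():
    status = request.args.get('status', 'active')
    # Validate input against an allowlist
    if status not in ['active', 'inactive', 'suspended']:
        return jsonify({'error': 'Invalid status'}), 400
    try:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            # "with conn" commits or rolls back; it does not close the connection
            with conn, conn.cursor() as cursor:
                cursor.execute(
                    "SELECT id, username, email FROM users WHERE status = %s",
                    (status,)
                )
                users = cursor.fetchall()
        finally:
            conn.close()
        return jsonify(users)
    except psycopg2.Error:
        # Log the details server-side; do not leak them to the client
        app.logger.exception("Database error in /api/users")
        return jsonify({'error': 'Internal server error'}), 500
```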
### GraphQL Resolver
const resolvers = {
  Query: {
    users: async (_, { status, limit }, { db }) => {
      const result = await db.query(
        'SELECT id, username, email FROM users WHERE status = $1 LIMIT $2',
        [status, limit]
      );
      return result.rows;
    }
  }
};
## Conclusion
This skill provides comprehensive SQL query generation capabilities with a focus on security, performance, and best practices. Always prioritize parameterized queries and provide clear documentation with generated SQL.