ワンクリックで
community-feed
// Social feed with batch queries, cursor pagination, trending algorithms, and engagement tracking. Efficient database queries for infinite scroll feeds.
// Social feed with batch queries, cursor pagination, trending algorithms, and engagement tracking. Efficient database queries for infinite scroll feeds.
Two-phase commit matchmaking that verifies both player connections before creating a match. Handles disconnections gracefully with automatic re-queue of healthy players.
Implement robust background job processing with dead letter queues, retries, and state machines. Use when building async workflows, scheduled tasks, or any work that shouldn't block the request/response cycle.
Manage data flow when producers outpace consumers. Bounded buffers, adaptive flushing, and graceful degradation prevent OOM crashes and data loss.
Collect-then-batch pattern for database operations achieving 30-40% throughput improvement. Includes graceful fallback to sequential processing when batch operations fail.
Implement multi-layer caching with Redis, in-memory, and HTTP caching. Covers cache invalidation, stampede prevention, and cache-aside patterns.
Exactly-once processing semantics with distributed coordination for file-based data pipelines. Atomic file claiming, status tracking, and automatic retry with in-memory fallback.
| name | community-feed |
| description | Social feed with batch queries, cursor pagination, trending algorithms, and engagement tracking. Efficient database queries for infinite scroll feeds. |
| license | MIT |
| compatibility | TypeScript/JavaScript, Python |
| metadata | {"category":"data-processing","time":"6h","source":"drift-masterguide"} |
Social feed with trending algorithms, cursor pagination, and engagement tracking.
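Cursor pagination beats offset pagination for large datasets. Batch-load relationships to avoid N+1 queries. Store trending scores in an indexed column, refreshed periodically because the score depends on the current time, so that sorting stays efficient.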
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List, Dict
import base64
import json
@dataclass
class PaginatedPosts:
    """One page of feed results plus the metadata needed to fetch the next page."""

    # Post rows for this page, as returned by the database client.
    posts: List[Dict]
    # Count reported for the feed query as a whole, not just this page.
    total_count: int
    # True when at least one more post exists beyond this page.
    has_more: bool
    # Opaque cursor to pass back for the next page; None on the last page.
    next_cursor: Optional[str] = None
class CommunityFeedService:
    """Service for community feed with cursor pagination.

    ``db`` is expected to expose a query-builder interface with
    ``table(...).select(...).execute()`` and ``rpc(...).execute()`` calls
    (NOTE(review): this looks like a Supabase/PostgREST-style client --
    confirm against the caller).
    """

    def __init__(self, db):
        self.db = db

    async def get_feed(
        self,
        feed_type: str = "trending",
        viewer_id: Optional[str] = None,
        cursor: Optional[str] = None,
        limit: int = 20,
        tags: Optional[List[str]] = None,
    ) -> "PaginatedPosts":
        """Return one page of the feed.

        Args:
            feed_type: ``"trending"`` orders by ``trending_score``; any other
                value orders by ``created_at``. ``"following"`` additionally
                restricts posts to authors the viewer follows.
            viewer_id: When given, each post gets an ``is_liked_by_viewer`` flag.
            cursor: Opaque cursor from a previous page's ``next_cursor``.
            limit: Maximum number of posts to return.
            tags: When given, only posts containing these tags are returned.

        Returns:
            A ``PaginatedPosts`` with the page, the total count, and the cursor
            for the next page (``None`` when there is no further page).
        """
        # An unreadable cursor decodes to {} and is treated as "first page".
        cursor_data = self._parse_cursor(cursor) if cursor else None

        query = self.db.table("community_posts").select(
            "*",
            "assets!inner(url, asset_type)",
            "users!inner(id, display_name, avatar_url)",
        )
        if tags:
            query = query.contains("tags", tags)

        # Feed-specific filtering
        following: Optional[List[str]] = None
        if feed_type == "following" and viewer_id:
            following = await self._get_following_ids(viewer_id)
            if not following:
                return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
            query = query.in_("user_id", following)

        # Feed-specific ordering
        if feed_type == "trending":
            sort_column, cursor_key = "trending_score", "score"
        else:
            sort_column, cursor_key = "created_at", "created_at"
        query = query.order(sort_column, desc=True)

        # Use .get() so a cursor minted for the other feed type is ignored
        # instead of raising KeyError; compare against None because a
        # trending score of 0 is a legitimate boundary.
        boundary = cursor_data.get(cursor_key) if cursor_data else None
        if boundary is not None:
            query = query.lt(sort_column, boundary)

        # Fetch one extra to check has_more
        query = query.limit(limit + 1)
        result = query.execute()
        posts = result.data or []
        has_more = len(posts) > limit
        if has_more:
            posts = posts[:limit]

        # Batch load viewer's likes (one query for the whole page, not one per post)
        if viewer_id and posts:
            liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
            for post in posts:
                post["is_liked_by_viewer"] = post["id"] in liked_ids

        next_cursor = None
        if has_more and posts:
            next_cursor = self._generate_cursor(posts[-1], feed_type)

        return PaginatedPosts(
            posts=posts,
            total_count=await self._get_total_count(tags, following),
            has_more=has_more,
            next_cursor=next_cursor,
        )

    async def _get_total_count(
        self,
        tags: Optional[List[str]] = None,
        user_ids: Optional[List[str]] = None,
    ) -> int:
        """Count posts matching the same tag / author filters as the feed query.

        NOTE(review): assumes the client accepts ``select(..., count="exact")``
        and exposes the total on ``result.count`` (as supabase-py does) --
        confirm for the client in use.
        """
        query = self.db.table("community_posts").select("id", count="exact")
        if tags:
            query = query.contains("tags", tags)
        if user_ids:
            query = query.in_("user_id", user_ids)
        # Only the count is needed, so keep the returned row set minimal.
        result = query.limit(1).execute()
        return getattr(result, "count", None) or 0

    async def _get_following_ids(self, user_id: str) -> List[str]:
        """Return the ids of the users that ``user_id`` follows."""
        result = (
            self.db.table("user_follows")
            .select("following_id")
            .eq("follower_id", user_id)
            .execute()
        )
        return [r["following_id"] for r in (result.data or [])]

    async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
        """Return the subset of ``post_ids`` that ``user_id`` has liked."""
        result = (
            self.db.table("post_likes")
            .select("post_id")
            .eq("user_id", user_id)
            .in_("post_id", post_ids)
            .execute()
        )
        return {r["post_id"] for r in (result.data or [])}

    def _parse_cursor(self, cursor: str) -> dict:
        """Decode a base64/JSON cursor; return ``{}`` when it is malformed."""
        try:
            data = json.loads(base64.b64decode(cursor).decode())
        except (ValueError, TypeError):
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
            # ValueError subclasses; TypeError covers non-string input.
            return {}
        # Cursors are client-supplied: reject valid JSON that is not an object.
        return data if isinstance(data, dict) else {}

    def _generate_cursor(self, post: dict, feed_type: str) -> str:
        """Encode the sort key of ``post`` as an opaque base64/JSON cursor."""
        if feed_type == "trending":
            data = {"score": post.get("trending_score", 0)}
        else:
            data = {"created_at": post["created_at"]}
        return base64.b64encode(json.dumps(data).encode()).decode()

    # Engagement operations
    async def like_post(self, post_id: str, user_id: str) -> bool:
        """Record a like; return False when the user had already liked the post.

        The existence check and the insert are separate calls, so two
        concurrent requests can both pass the check.
        NOTE(review): a unique constraint on (post_id, user_id) would close
        that gap -- confirm whether the schema has one.
        """
        existing = (
            self.db.table("post_likes")
            .select("id")
            .eq("post_id", post_id)
            .eq("user_id", user_id)
            .execute()
        )
        if existing.data:
            return False  # Already liked

        self.db.table("post_likes").insert({
            "post_id": post_id,
            "user_id": user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }).execute()

        # Atomic increment
        self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
        return True

    async def unlike_post(self, post_id: str, user_id: str) -> bool:
        """Remove a like; return False when there was no like to remove."""
        result = (
            self.db.table("post_likes")
            .delete()
            .eq("post_id", post_id)
            .eq("user_id", user_id)
            .execute()
        )
        if not result.data:
            return False

        self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
        return True
# Trending algorithm
def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    is_featured: bool = False,
    *,
    now: Optional[datetime] = None,
) -> float:
    """
    Trending score = engagement / age^decay
    Higher engagement + newer = higher score

    Engagement weights likes at 1.0, comments at 2.0 and views at 0.1.
    Age is measured in hours, floored at 0.1 so brand-new (or future-dated)
    posts do not divide by zero, and decays with exponent 1.5. Featured
    posts get a 1.5x boost.

    Args:
        like_count: Number of likes on the post.
        comment_count: Number of comments on the post.
        view_count: Number of views of the post.
        created_at: Creation time. A naive datetime is interpreted as UTC.
        is_featured: Whether to apply the featured boost.
        now: Reference time for the age calculation; defaults to the current
            UTC time. A naive datetime is interpreted as UTC.

    Returns:
        The trending score as a float.
    """
    # Mixing naive and aware datetimes raises TypeError on subtraction, so
    # normalise both operands to timezone-aware UTC first.
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)

    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    age_hours = max((now - created_at).total_seconds() / 3600, 0.1)
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score
CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Trending score. This is a plain column rather than a generated one:
    -- PostgreSQL only allows immutable expressions in GENERATED ... STORED
    -- columns, and the score depends on NOW(). It is kept up to date by
    -- refresh_trending_scores() below.
    trending_score FLOAT NOT NULL DEFAULT 0
);
CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);
-- Recompute trending scores: engagement / age_hours^1.5, with a 1.5x boost
-- for featured posts. Run this periodically (e.g. from a scheduled job).
CREATE FUNCTION refresh_trending_scores() RETURNS VOID AS $$
BEGIN
    UPDATE community_posts
    SET trending_score =
        (like_count + comment_count * 2 + view_count * 0.1) /
        POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5) *
        CASE WHEN is_featured THEN 1.5 ELSE 1 END;
END;
$$ LANGUAGE plpgsql;
-- Atomic increment/decrement functions. Each is a single UPDATE, so
-- concurrent calls do not lose updates. The parameter is named post_id
-- because the RPC callers pass it by that name.
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;
-- Counterpart used when a like is removed; clamped so the count never goes negative.
CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET like_count = GREATEST(like_count - 1, 0) WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;
async def example_usage(db) -> None:
    """Usage example: read the trending feed, page through it, and like a post.

    Wrapped in a coroutine because ``await`` is not valid at module top level;
    run it from an async context with a configured database client.
    """
    feed_service = CommunityFeedService(db)

    # Get trending feed
    result = await feed_service.get_feed(feed_type="trending", viewer_id="user_123", limit=20)
    for post in result.posts:
        print(f"{post['title']} - {post['like_count']} likes")

    # Load next page. Pass the same feed_type the cursor was issued for, and
    # the same viewer_id so the page also carries is_liked_by_viewer flags.
    if result.has_more:
        next_page = await feed_service.get_feed(
            feed_type="trending",
            viewer_id="user_123",
            cursor=result.next_cursor,
        )

    # Like a post
    await feed_service.like_post("post_456", "user_123")