# 3. Data Integrity & Anomaly Detection

## Why Data Integrity Matters
Streaks are a trust contract with users. If a user loses a streak due to a system bug, they lose engagement and trust. If a user exploits a bug to inflate their streak, the gamification loses meaning. Both failure modes are unacceptable.
## Deduplication Strategy

### The Problem
Duplicate events arise from:
- Client retries — mobile apps retry on network timeouts, sending the same event twice
- Lambda retries — AWS Lambda retries on failure (at-least-once delivery)
- DynamoDB Streams replay — stream events can be delivered more than once
### Multi-Layer Dedup
```mermaid
flowchart TD
    A[Incoming Event] --> B{Layer 1:<br/>Idempotency key<br/>in request?}
    B -->|No| C[Reject — 400]
    B -->|Yes| D{Layer 2:<br/>Check IdempotencyIndex<br/>GSI}
    D -->|Exists| E[Return cached<br/>response — 200]
    D -->|New| F{Layer 3:<br/>Conditional PutItem<br/>attribute_not_exists}
    F -->|Succeeded| G[Process event]
    F -->|ConditionalCheckFailed| E
```
#### Layer 1 — Client-Side Idempotency Key
The mobile app generates a deterministic key per event:
Same user + same event type + same timestamp = same key. Retries produce the same key and are caught.
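A minimal sketch of such a key, hashing the three fields named above (the function name and field separator are illustrative, not the app's actual implementation):

```python
import hashlib

def idempotency_key(user_id: str, event_type: str, client_timestamp: str) -> str:
    """Deterministic key: identical retries hash to the same value."""
    raw = f"{user_id}#{event_type}#{client_timestamp}"
    return hashlib.sha256(raw.encode()).hexdigest()
```

Because the key is derived rather than random, a retried request cannot accidentally mint a fresh key.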
#### Layer 2 — GSI Lookup
Before writing, the Lambda queries the IdempotencyIndex GSI on the BPEvents table. If the key exists, the event is a duplicate — return the original response.
#### Layer 3 — Conditional Write

Even if the GSI check passes (within the eventual-consistency window), the PutItem includes a `ConditionExpression` of `attribute_not_exists` on the table's primary key. This check is strongly consistent and prevents the race condition where two Lambda invocations both pass the GSI check.
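A minimal sketch of this write pattern, assuming the partition key is named `event_id` (the helper name and key name are assumptions; the exception path follows boto3's resource API):

```python
def put_event_once(table, item: dict) -> bool:
    """Write the event only if no item with this event_id exists yet.

    Returns True if this invocation won the write, False on a duplicate.
    `table` is a boto3 DynamoDB Table resource (or compatible stub).
    """
    try:
        table.put_item(
            Item=item,
            ConditionExpression="attribute_not_exists(event_id)",
        )
        return True
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        # Another invocation already wrote this event — treat as duplicate
        return False
```

On a duplicate, the handler would return the cached response from Layer 2 rather than processing the event again.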
### Stream Processing Dedup
The streak processor Lambda stores processed event IDs in a short-lived Redis set:
```python
def handle_stream_record(redis_client, user_id: str, event_id: str) -> None:
    key = f"processed:{user_id}"
    if redis_client.sismember(key, event_id):
        return  # Already processed
    # ... process streak logic ...
    redis_client.sadd(key, event_id)
    redis_client.expire(key, 3600)  # 1-hour TTL
```
## Consistency Validation

### Nightly Reconciliation Job
A scheduled Glue job runs nightly to detect and fix inconsistencies between the operational and analytical layers.
```mermaid
flowchart TD
    A[Nightly Trigger:<br/>EventBridge] --> B[Read DailyActivity<br/>from DynamoDB]
    B --> C[Read streak summaries<br/>from Redshift]
    C --> D{Compare:<br/>streak counts match?}
    D -->|Yes| E[Log: all consistent]
    D -->|No| F[Generate<br/>discrepancy report]
    F --> G{Auto-fixable?}
    G -->|Yes — minor drift| H[Apply correction<br/>to DynamoDB]
    G -->|No — large gap| I[Alert on-call<br/>engineer via SNS]
    H --> J[Log correction<br/>to audit trail]
```
What gets reconciled:
| Check | Source of Truth | Action |
|---|---|---|
| `current_streak` vs `DailyActivity` records | `DailyActivity` | Recompute streak from activity log |
| `total_bestie_points` vs sum of `BPEvents` | `BPEvents` | Recalculate and update |
| Missing `DailyActivity` records despite events | `BPEvents` | Backfill `DailyActivity` |
| Orphan `DailyActivity` without events | `BPEvents` | Flag for investigation |
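The first check, recomputing `current_streak` from the activity log, can be sketched as a pure function. This is a minimal sketch assuming `DailyActivity` projects to a set of local activity dates, and that a streak is still alive if the user was active today or yesterday (the grace rule is an assumption, not confirmed by this doc):

```python
from datetime import date, timedelta

def recompute_current_streak(activity_dates: set[date], today: date) -> int:
    """Walk backwards from today, counting consecutive active days."""
    # If the user hasn't been active today yet, the streak may still be
    # alive as of yesterday; start the walk there.
    day = today if today in activity_dates else today - timedelta(days=1)
    streak = 0
    while day in activity_dates:
        streak += 1
        day -= timedelta(days=1)
    return streak
```

The reconciliation job would compare this recomputed value against the stored `current_streak` and apply the correction path from the flowchart above when they disagree.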
### Cross-Layer Consistency
```sql
-- Redshift: find users where analytical data disagrees with operational
SELECT
    r.user_id,
    r.current_streak AS redshift_streak,
    o.current_streak AS dynamo_streak,
    ABS(r.current_streak - o.current_streak) AS drift
FROM analytics.user_streaks_snapshot r
JOIN staging.dynamo_user_streaks o
    ON r.user_id = o.user_id
WHERE r.current_streak != o.current_streak
ORDER BY drift DESC;
```
## Anomaly Detection

### Pattern-Based Detection
```mermaid
flowchart LR
    A[Raw Events] --> B[Anomaly<br/>Detection<br/>Lambda]
    B --> C{Check rules}
    C --> D[Rate limit<br/>exceeded?]
    C --> E[Impossible<br/>timestamp?]
    C --> F[Duplicate<br/>pattern?]
    C --> G[Streak jump<br/>> 1 per day?]
    D --> H[Flag & throttle]
    E --> H
    F --> H
    G --> H
    H --> I[DynamoDB:<br/>AnomalyLog]
    H --> J[SNS Alert]
```
### Detection Rules
| Rule | Condition | Severity | Action |
|---|---|---|---|
| Rate limiting | > 100 events/user/hour | Medium | Throttle, don't award BP |
| Impossible timestamps | `client_timestamp` > 24h in the future or > 7 days in the past | High | Reject event |
| Rapid timezone changes | User timezone changes > 2x in 24h | High | Flag for review |
| Streak jump | `current_streak` increases by > 1 in a single update | Critical | Block update, alert |
| Midnight exploit | Activity at 23:59 in TZ1 then 00:01 in TZ2 | High | Use server-side TZ resolution |
| BP spike | Points earned in 1 hour > 10x user's daily average | Medium | Queue for review |
### Implementation: Rate Limiting with a Fixed-Window Counter
```python
def check_rate_limit(user_id: str, redis_client, limit: int = 100) -> bool:
    """Fixed-window rate limiter using Redis (default 100 events/user/hour)."""
    key = f"rate:{user_id}"
    count = redis_client.incr(key)
    if count == 1:
        # Only set the TTL when the window opens; re-setting it on every
        # event would slide the window and never let the counter reset.
        redis_client.expire(key, 3600)
    return count <= limit  # False means rate limited
```
### Implementation: Timezone Exploit Detection
```python
from datetime import datetime, timedelta, timezone

def detect_timezone_abuse(user_id: str, new_tz: str, dynamodb_table) -> bool:
    """Detect if a user is changing timezones to game the streak system."""
    response = dynamodb_table.get_item(Key={"user_id": user_id})
    user = response.get("Item", {})
    old_tz = user.get("timezone", new_tz)
    if old_tz == new_tz:
        return False
    # Count timezone changes in the last 24h (stored as a list on the item)
    tz_changes = user.get("recent_tz_changes", [])
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    recent = [
        c for c in tz_changes
        if datetime.fromisoformat(c["timestamp"]) > cutoff
    ]
    return len(recent) >= 2  # Suspicious — flag for review
```
## Security Considerations

### Data Protection
| Layer | Control |
|---|---|
| In transit | TLS 1.3 on API Gateway; VPC endpoints for DynamoDB/S3 |
| At rest | AES-256 encryption (AWS-managed KMS keys) on DynamoDB, S3, and Redshift |
| Access control | IAM roles with least-privilege; no long-lived credentials |
| API authentication | Cognito JWT tokens validated at API Gateway |
| PII handling | user_id is an opaque UUID — no PII in the gamification tables |
### Audit Trail
Every mutation to streak data is logged:
```python
from datetime import datetime, timedelta, timezone
from uuid import uuid4

audit_table.put_item(Item={
    "audit_id": str(uuid4()),
    "user_id": user_id,
    "action": "STREAK_UPDATE",
    "old_value": {"current_streak": old_streak},
    "new_value": {"current_streak": new_streak},
    "trigger": "streak_processor_lambda",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "ttl": int((datetime.now(timezone.utc) + timedelta(days=365)).timestamp()),
})
```
### Preventing Exploitation
- Server-side authority — the server calculates dates and streaks; the client only sends raw events
- Conditional writes — DynamoDB's `ConditionExpression` prevents concurrent overwrites
- Rate limiting — prevents automated/bot interactions from inflating BP
- Anomaly alerts — unusual patterns trigger investigation before rewards are granted
## Code Samples

- `code-samples/lambda/event_ingestion.py` — Includes validation and dedup logic
- `code-samples/glue/etl_job.py` — Includes batch deduplication