Understanding Content Policy Violations
Each provider has specific safety guidelines. See OpenAI's safety practices, Anthropic's safety overview, and Google's safety settings. Each provider also signals a violation in its own way:

| Provider | Typical violation signal | Safety approach |
|---|---|---|
| OpenAI | "Your request was rejected as a result of our safety system" | Automatic content filtering |
| Anthropic | stop_reason: "CONTENT_FILTER" | Constitutional AI safety |
| Google Gemini | "INVALID_ARGUMENT" with blockedContent details | Configurable safety thresholds |
| Azure OpenAI | "content_policy_violation" | Enterprise content filtering |
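If you route requests to more than one provider, it helps to normalize these signals early so the rest of your pipeline only has to handle a single "policy violation" case. Below is a minimal sketch of that idea; the marker strings mirror the table above and are assumptions to verify against the SDK versions you actually run.

from typing import Any, Optional

# Marker strings per provider, mirroring the table above; treat these as
# assumptions and confirm them against your provider's current error format.
VIOLATION_MARKERS = {
    "openai": ["content_policy_violation", "rejected as a result of our safety system"],
    "anthropic": ["CONTENT_FILTER"],
    "google": ["blockedContent"],
    "azure_openai": ["content_filter", "content_policy_violation"],
}

def detect_policy_violation(provider: str, raw: Any) -> Optional[str]:
    """Return the matched marker if the raw error/response looks like a
    content policy violation for the given provider, otherwise None."""
    text = raw if isinstance(raw, str) else str(raw)
    for marker in VIOLATION_MARKERS.get(provider, []):
        if marker.lower() in text.lower():
            return marker
    return None

# Example: classify a raw OpenAI error string
err = 'Error code: 400 - {"error": {"code": "content_policy_violation"}}'
print(detect_policy_violation("openai", err))  # content_policy_violation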
Common Content Policy Triggers
Understanding what triggers violations helps prevent them. Reference Azure's content filter documentation for detailed categories; a quick pre-check sketch using OpenAI's moderation endpoint follows the list below.
Violence & Harm
- Physical violence
- Self-harm content
- Graphic descriptions
- Weapons & threats
Hate & Harassment
- Discriminatory content
- Targeted harassment
- Hate speech
- Stereotyping
Sexual Content
- Adult content
- Suggestive material
- Sexual services
- Explicit descriptions
Illegal Activity
- Criminal planning
- Fraud & scams
- Hacking instructions
- Drug manufacturing
Sensitive Info
- Personal data (PII)
- Medical advice
- Legal counsel
- Financial advice
Jailbreaking
- Bypass attempts
- Role-play tricks
- System prompt extraction
- DAN prompts
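Before building a custom screener, you can also run prompts through OpenAI's moderation endpoint, whose categories map closely onto the groups above. A minimal sketch, assuming the omni-moderation-latest model and an arbitrary 0.5 threshold:

from openai import OpenAI

client = OpenAI()

def precheck(text: str, threshold: float = 0.5) -> dict:
    # Query the moderation endpoint and collect any category whose score
    # crosses the (assumed) threshold.
    resp = client.moderations.create(
        model="omni-moderation-latest",
        input=text,
    )
    result = resp.results[0]
    scores = result.category_scores.model_dump()
    high_risk = {cat: score for cat, score in scores.items() if score and score >= threshold}
    return {"flagged": result.flagged, "high_risk_categories": high_risk}

print(precheck("How do I terminate a stuck Python process?"))

This check is coarse by design; it is a cheap first gate before the more detailed screening shown in the next section.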
Implementing Content Pre-Screening
Prevent violations before they occur using tools like Detoxify or Cohere Guardrails.
Python: Content Screening Implementation
from typing import Dict, List, Tuple, Optional
import re
from detoxify import Detoxify
import logging
class ContentScreener:
"""Pre-screen content for policy violations"""
def __init__(self, threshold: float = 0.7):
self.threshold = threshold
self.detox = Detoxify('original')
self.logger = logging.getLogger(__name__)
# Basic keyword filters
self.blocked_keywords = [
# Violence
"kill", "murder", "suicide", "harm", "attack",
# Illegal
"hack", "crack", "bypass", "jailbreak",
# PII patterns
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b\d{16}\b", # Credit card
# Medical
"diagnose", "prescribe", "medical advice"
]
# Context-aware replacements
self.safe_replacements = {
"kill": "stop",
"hack": "access",
"bypass": "work with",
"jailbreak": "customize"
}
def screen_content(self, text: str) -> Tuple[bool, Optional[str], Dict]:
"""
Screen content for policy violations
Returns: (is_safe, violation_reason, scores)
"""
# Step 1: Check for blocked keywords
keyword_check = self.check_keywords(text)
if not keyword_check[0]:
return False, f"Blocked keyword: {keyword_check[1]}", {}
# Step 2: Use Detoxify for toxicity analysis
try:
scores = self.detox.predict(text)
# Check each category
violations = []
for category, score in scores.items():
if score > self.threshold:
violations.append(f"{category}: {score:.2f}")
if violations:
return False, f"High risk categories: {', '.join(violations)}", scores
return True, None, scores
except Exception as e:
self.logger.error(f"Detoxify error: {e}")
# Fallback to keyword check only
return keyword_check[0], keyword_check[1], {}
def check_keywords(self, text: str) -> Tuple[bool, Optional[str]]:
"""Check for blocked keywords and patterns"""
text_lower = text.lower()
for keyword in self.blocked_keywords:
if keyword.startswith(r"\b"): # Regex pattern
if re.search(keyword, text):
return False, "PII pattern detected"
elif keyword in text_lower:
# Check if it's in a safe context
if not self.is_safe_context(text_lower, keyword):
return False, keyword
return True, None
def is_safe_context(self, text: str, keyword: str) -> bool:
"""Check if keyword appears in safe context"""
safe_contexts = {
"kill": ["kill process", "kill -9", "kill command"],
"hack": ["life hack", "hack solution", "hackathon"],
"bypass": ["bypass cache", "bypass proxy"],
}
if keyword in safe_contexts:
for safe_phrase in safe_contexts[keyword]:
if safe_phrase in text:
return True
return False
def suggest_rephrase(self, text: str, scores: Dict) -> str:
"""Suggest safer rephrasing"""
# Replace problematic keywords
rephrased = text
for bad, good in self.safe_replacements.items():
rephrased = re.sub(
rf"\b{bad}\b",
good,
rephrased,
flags=re.IGNORECASE
)
# Add clarifying context
if any(score > 0.5 for score in scores.values()):
rephrased = f"For educational purposes only: {rephrased}"
return rephrased
def validate_and_clean(self, text: str) -> Tuple[str, bool, List[str]]:
"""Validate and clean content, return cleaned text and warnings"""
warnings = []
cleaned = text
# Remove potential PII
cleaned = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REMOVED]", cleaned)
cleaned = re.sub(r"\b\d{16}\b", "[CC_REMOVED]", cleaned)
cleaned = re.sub(
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"[EMAIL_REMOVED]",
cleaned
)
if cleaned != text:
warnings.append("Removed potential PII")
# Screen the cleaned content
is_safe, reason, scores = self.screen_content(cleaned)
if not is_safe:
# Try rephrasing
rephrased = self.suggest_rephrase(cleaned, scores)
re_screened = self.screen_content(rephrased)
if re_screened[0]:
warnings.append(f"Content rephrased to avoid: {reason}")
return rephrased, True, warnings
else:
warnings.append(f"Content may violate policy: {reason}")
return cleaned, False, warnings
return cleaned, True, warnings
# Usage example
screener = ContentScreener(threshold=0.7)
def safe_api_call(prompt: str) -> Dict:
"""Make API call with content screening"""
# Pre-screen
cleaned_prompt, is_safe, warnings = screener.validate_and_clean(prompt)
if warnings:
print(f"⚠️ Warnings: {', '.join(warnings)}")
if not is_safe:
return {
"error": "Content policy violation",
"warnings": warnings,
"suggestion": "Please rephrase your request"
}
    # Make the actual API call with the cleaned prompt; make_llm_api_call and
    # ContentPolicyError below are placeholders for your own client wrapper
try:
response = make_llm_api_call(cleaned_prompt)
return response
except ContentPolicyError as e:
# Handle if still rejected
return {
"error": "Content rejected by API",
"details": str(e),
"suggestion": screener.suggest_rephrase(prompt, {})
}
# Test the screener
test_prompts = [
"How can I kill a Python process?", # Safe context
"Write code to hack into a system", # Unsafe
"My SSN is 123-45-6789", # PII
"Explain how to bypass security", # Needs context
]
for prompt in test_prompts:
result = safe_api_call(prompt)
print(f"\nPrompt: {prompt}")
print(f"Result: {result}")
TypeScript: Content Validation Middleware
interface ContentPolicy {
maxToxicity: number;
blockedPatterns: RegExp[];
sensitiveCategories: string[];
}
class ContentValidator {
private policy: ContentPolicy = {
maxToxicity: 0.7,
blockedPatterns: [
/\b\d{3}-\d{2}-\d{4}\b/g, // SSN
/\b\d{16}\b/g, // Credit card
      /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, // Email
],
sensitiveCategories: [
'violence', 'self-harm', 'sexual', 'hate',
'harassment', 'illegal', 'deception'
]
};
private blockedKeywords = new Set([
'kill', 'murder', 'suicide', 'hack', 'crack',
'bypass', 'jailbreak', 'exploit', 'malware'
]);
async validateContent(text: string): Promise<{
isValid: boolean;
violations: string[];
cleaned: string;
}> {
const violations: string[] = [];
let cleaned = text;
// Check for PII
for (const pattern of this.policy.blockedPatterns) {
if (pattern.test(text)) {
violations.push('Contains potentially sensitive information');
cleaned = cleaned.replace(pattern, '[REDACTED]');
}
}
// Check keywords
const words = text.toLowerCase().split(/\s+/);
for (const word of words) {
if (this.blockedKeywords.has(word)) {
// Check context
if (!this.isSafeContext(text, word)) {
violations.push(`Problematic keyword: ${word}`);
}
}
}
// Use ML-based classification if available
const toxicityScore = await this.checkToxicity(cleaned);
if (toxicityScore > this.policy.maxToxicity) {
violations.push(`High toxicity score: ${toxicityScore.toFixed(2)}`);
}
return {
isValid: violations.length === 0,
violations,
cleaned
};
}
private isSafeContext(text: string, keyword: string): boolean {
const safeContexts: Record<string, string[]> = {
'kill': ['kill process', 'kill signal', 'kill command'],
'hack': ['hackathon', 'life hack', 'hack day'],
'bypass': ['bypass cache', 'bypass filter', 'bypass route']
};
const contexts = safeContexts[keyword] || [];
return contexts.some(ctx => text.toLowerCase().includes(ctx));
}
private async checkToxicity(text: string): Promise<number> {
// Implement with your preferred toxicity API
// This is a placeholder
return 0.0;
}
rephraseContent(text: string): string {
const replacements: Record<string, string> = {
'kill': 'terminate',
'hack': 'access',
'bypass': 'work around',
'exploit': 'utilize'
};
let rephrased = text;
for (const [bad, good] of Object.entries(replacements)) {
const regex = new RegExp(`\\b${bad}\\b`, 'gi');
rephrased = rephrased.replace(regex, good);
}
return rephrased;
}
}
// Express middleware
import { Request, Response, NextFunction } from 'express';
const contentValidator = new ContentValidator();
export const contentPolicyMiddleware = async (
req: Request,
res: Response,
next: NextFunction
) => {
const { prompt } = req.body;
if (!prompt) {
return next();
}
const validation = await contentValidator.validateContent(prompt);
if (!validation.isValid) {
// Log violation
console.warn('Content policy violation:', {
violations: validation.violations,
timestamp: new Date().toISOString()
});
// Try rephrasing
const rephrased = contentValidator.rephraseContent(prompt);
const revalidation = await contentValidator.validateContent(rephrased);
if (revalidation.isValid) {
req.body.prompt = rephrased;
req.body.wasRephrased = true;
return next();
}
return res.status(400).json({
error: 'Content policy violation',
violations: validation.violations,
suggestion: 'Please rephrase your request to comply with content policies'
});
}
// Use cleaned content
req.body.prompt = validation.cleaned;
next();
};
Handling Content Policy Errors
When violations occur, handle them gracefully. Implementation patterns from PromptLayer show effective approaches.
Comprehensive Error Handler
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List
class SafetyErrorHandler:
"""Handle content policy errors across providers"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.violation_cache = {} # Cache violations to avoid repeated attempts
async def handle_openai_error(self, error: Exception, prompt: str) -> Dict:
"""Handle OpenAI content policy errors"""
error_str = str(error).lower()
if "content_policy_violation" in error_str:
self.logger.warning(f"OpenAI content policy violation: {error}")
# Cache the violation
prompt_hash = hash(prompt)
self.violation_cache[prompt_hash] = {
"provider": "openai",
"error": error_str,
"timestamp": datetime.now()
}
return {
"error": "content_policy_violation",
"message": "Your request contains content that violates OpenAI's usage policies",
"suggestions": [
"Remove any harmful, illegal, or adult content",
"Clarify legitimate use cases (e.g., 'for educational purposes')",
"Avoid requests for personal information",
"Rephrase to be more specific about your intent"
]
}
raise error
async def handle_anthropic_error(self, response: Dict) -> Optional[Dict]:
"""Handle Anthropic content filtering"""
if response.get("stop_reason") == "CONTENT_FILTER":
self.logger.warning("Anthropic content filter triggered")
return {
"error": "content_filtered",
"message": "Claude's safety system filtered this content",
"suggestions": [
"Ensure your request doesn't involve harmful content",
"Be more specific about legitimate use cases",
"Break complex requests into smaller, clearer parts"
]
}
return None
async def handle_google_error(self, response: Dict) -> Optional[Dict]:
"""Handle Google Gemini safety blocks"""
if "blockedContent" in response:
categories = response.get("blockedContent", {}).get("categories", [])
return {
"error": "safety_block",
"message": "Content blocked by Google's safety filters",
"categories": categories,
"suggestions": [
"Review Google's prohibited content categories",
"Adjust safety settings if appropriate",
"Rephrase to avoid ambiguous content"
]
}
return None
def suggest_alternatives(self, prompt: str, error_type: str) -> List[str]:
"""Suggest alternative phrasings"""
suggestions = []
# Add context for legitimate use
if any(word in prompt.lower() for word in ["hack", "exploit", "vulnerability"]):
suggestions.append(
f"For security research: {prompt}"
)
suggestions.append(
f"To understand security concepts: {prompt}"
)
# Medical/legal disclaimers
if any(word in prompt.lower() for word in ["diagnose", "prescribe", "legal advice"]):
suggestions.append(
"For informational purposes only, not professional advice: " + prompt
)
# Educational context
suggestions.append(f"For educational discussion: {prompt}")
return suggestions
# Usage with fallback
async def safe_llm_call(
prompt: str,
providers: List[str] = ["openai", "anthropic", "google"]
) -> Dict:
"""Try multiple providers with safety handling"""
handler = SafetyErrorHandler()
for provider in providers:
try:
if provider == "openai":
                client = AsyncOpenAI()
response = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return {"provider": provider, "response": response}
elif provider == "anthropic":
                client = AsyncAnthropic()
                response = await client.messages.create(
                    model="claude-3-opus-20240229",
                    max_tokens=1024,
                    messages=[{"role": "user", "content": prompt}]
                )
# Check for content filtering
error = await handler.handle_anthropic_error(response)
if error:
continue
return {"provider": provider, "response": response}
except Exception as e:
if provider == "openai":
error_response = await handler.handle_openai_error(e, prompt)
if error_response.get("error") == "content_policy_violation":
# Try alternatives
alternatives = handler.suggest_alternatives(prompt, "openai")
for alt in alternatives:
try:
response = await safe_llm_call(alt, [provider])
if not response.get("error"):
return response
except:
continue
continue
return {
"error": "all_providers_failed",
"message": "Content was rejected by all providers",
"suggestion": "Please significantly rephrase your request"
}
Content Policy Monitoring
import json
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List
import pandas as pd
class ViolationMonitor:
"""Monitor and analyze content policy violations"""
def __init__(self, log_file: str = "violations.jsonl"):
self.log_file = log_file
self.violations = defaultdict(list)
def log_violation(
self,
user_id: str,
prompt: str,
provider: str,
error_type: str,
severity: str = "medium"
):
"""Log a content policy violation"""
violation = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"prompt_hash": hash(prompt), # Don't log full prompt
"prompt_length": len(prompt),
"provider": provider,
"error_type": error_type,
"severity": severity
}
# Write to file
with open(self.log_file, 'a') as f:
f.write(json.dumps(violation) + '\n')
# Keep in memory for analysis
self.violations[user_id].append(violation)
def get_user_violations(self, user_id: str, days: int = 30) -> List[Dict]:
"""Get recent violations for a user"""
cutoff = datetime.now() - timedelta(days=days)
return [
v for v in self.violations[user_id]
if datetime.fromisoformat(v["timestamp"]) > cutoff
]
def analyze_patterns(self) -> Dict:
"""Analyze violation patterns"""
# Load all violations
all_violations = []
with open(self.log_file, 'r') as f:
for line in f:
all_violations.append(json.loads(line))
df = pd.DataFrame(all_violations)
if df.empty:
return {}
# Convert timestamp
df['timestamp'] = pd.to_datetime(df['timestamp'])
analysis = {
"total_violations": len(df),
"unique_users": df['user_id'].nunique(),
"violations_by_provider": df['provider'].value_counts().to_dict(),
"violations_by_type": df['error_type'].value_counts().to_dict(),
"violations_by_hour": df.set_index('timestamp').resample('H').size().to_dict(),
"repeat_offenders": df['user_id'].value_counts().head(10).to_dict()
}
return analysis
def should_flag_user(self, user_id: str) -> bool:
"""Check if user should be flagged for violations"""
recent_violations = self.get_user_violations(user_id, days=7)
# Flag if more than 5 violations in 7 days
if len(recent_violations) > 5:
return True
# Flag if severe violations
severe_count = sum(
1 for v in recent_violations
if v.get('severity') == 'high'
)
return severe_count > 0
# Usage
monitor = ViolationMonitor()
# Log a violation
monitor.log_violation(
user_id="user123",
prompt="[redacted]",
provider="openai",
error_type="content_policy_violation",
severity="medium"
)
# Check user status
if monitor.should_flag_user("user123"):
print("User flagged for multiple violations")
# Analyze patterns
patterns = monitor.analyze_patterns()
print(f"Total violations: {patterns.get('total_violations', 0)}")
Best Practices
Do's
- Pre-screen content before API calls
- Provide clear context for legitimate uses
- Implement content filtering middleware
- Log violations for pattern analysis
- Educate users on content policies
- Use rephrasing strategies
- Handle errors gracefully
Don'ts
- Don't try to bypass safety systems
- Don't log full prompts with PII
- Don't ignore repeated violations
- Don't allow unlimited retries (see the bounded-retry sketch after this list)
- Don't process clearly harmful content
- Don't share violation details publicly
- Don't implement weak filtering
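To combine "use rephrasing strategies" with "don't allow unlimited retries", cap how many automatic rephrase attempts any single request gets. A minimal sketch, reusing the ContentScreener from earlier; call_llm and ContentPolicyError stand in for your own client wrapper, and the cap of two attempts is an arbitrary choice.

# Bounded retry-with-rephrase loop. ContentScreener comes from the screening
# section above; call_llm and ContentPolicyError are placeholders for your
# own client wrapper and its rejection error type.
MAX_REPHRASE_ATTEMPTS = 2

def call_with_rephrase_budget(prompt: str) -> dict:
    screener = ContentScreener()
    current_prompt = prompt
    for attempt in range(MAX_REPHRASE_ATTEMPTS + 1):
        try:
            return {"response": call_llm(current_prompt), "attempts": attempt + 1}
        except ContentPolicyError:
            if attempt == MAX_REPHRASE_ATTEMPTS:
                break
            # Rephrase once per failure instead of resubmitting the same prompt
            current_prompt = screener.suggest_rephrase(current_prompt, {})
    return {
        "error": "content_policy_violation",
        "message": "Request was rejected and bounded rephrasing did not help",
        "attempts": MAX_REPHRASE_ATTEMPTS + 1,
    }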
Effective Rephrasing Strategies
How to Rephrase Problematic Requests
1. Add Educational Context
❌ "How to hack a website"
✅ "Explain common web vulnerabilities for security education"
2. Specify Legitimate Use
❌ "Generate fake ID information"
✅ "Generate example data for testing user registration forms"
3. Use Technical Terms
❌ "Kill the process"
✅ "Terminate the process with PID 1234"
4. Focus on Understanding
❌ "Write malware code"
✅ "Explain how antivirus software detects malicious patterns"