Best Practices
Professional web scraping techniques and ethical guidelines for using PepeProxy.
Ethical Scraping
Respect robots.txt
Always check and respect the robots.txt file:
from urllib.robotparser import RobotFileParser
def can_scrape(url):
    """Return True if robots.txt permits fetching *url* (a urlparse result)."""
    parser = RobotFileParser()
    robots_url = f"{url.scheme}://{url.netloc}/robots.txt"
    parser.set_url(robots_url)
    parser.read()  # fetches robots.txt over the network
    return parser.can_fetch("*", url.geturl())
# Usage
from urllib.parse import urlparse
url = urlparse('https://example.com/page')
if can_scrape(url):
# Proceed with scraping
pass
else:
print("Scraping not allowed by robots.txt") Add Appropriate Delays
Never hammer servers with rapid-fire requests:
import time
import random
# Fixed delay
time.sleep(3) # 3 seconds between requests
# Random delay (more human-like)
time.sleep(random.uniform(2, 5)) # 2-5 seconds
# Exponential backoff on errors
def fetch_with_backoff(url, max_retries=5):
for attempt in range(max_retries):
try:
response = requests.get(url, proxies=proxies)
response.raise_for_status()
return response
except requests.exceptions.RequestException:
if attempt == max_retries - 1:
raise
wait = (2 ** attempt) + random.random()
time.sleep(wait) Identify Your Bot
Use a descriptive User-Agent when appropriate:
headers = {
'User-Agent': 'MyCompanyBot/1.0 (+https://example.com/bot-info)'
} For stealth scraping, use realistic browser user agents:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
} Honor Rate Limits
Implement rate limiting to avoid overwhelming servers:
from datetime import datetime, timedelta
from collections import deque
class RateLimiter:
    """Sliding-window rate limiter: at most *max_requests* per window."""

    def __init__(self, max_requests, time_window_seconds):
        self.max_requests = max_requests
        self.time_window = timedelta(seconds=time_window_seconds)
        self.requests = deque()  # timestamps of requests still in the window

    def acquire(self):
        """Block until a request slot is free, claim it, and return True."""
        while True:
            now = datetime.now()
            # Drop timestamps that have aged out of the window.
            while self.requests and now - self.requests[0] > self.time_window:
                self.requests.popleft()
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            # Window is full: sleep until the oldest entry expires, then retry.
            wait_seconds = (self.requests[0] + self.time_window - now).total_seconds()
            if wait_seconds > 0:
                time.sleep(wait_seconds)
# Usage: 10 requests per minute
limiter = RateLimiter(max_requests=10, time_window_seconds=60)
for url in urls:
limiter.acquire()
response = requests.get(url, proxies=proxies) Proxy Management
Choose the Right Session Type
Rotating Proxies - Best for:
- High-volume scraping
- Data aggregation
- Price monitoring
- When you need fresh IPs
Sticky Proxies - Best for:
- Login flows
- Shopping carts
- Multi-step forms
- Account management
# Rotating: New IP each request
for url in large_list_of_urls:
response = requests.get(url, proxies=rotating_proxies)
# Sticky: Same IP for 10 minutes
session = requests.Session()
session.proxies = sticky_proxies
response1 = session.get(login_url)
response2 = session.post(login_url, data=credentials) # Same IP Select Optimal Locations
Choose proxy locations strategically:
# Scraping US e-commerce sites
proxies = generate_proxies(country='United States', city='New York')
# Multi-region scraping
regions = ['United States', 'United Kingdom', 'Germany']
for region in regions:
regional_proxies = generate_proxies(country=region)
scrape_region(regional_proxies) Implement Proxy Rotation
For large-scale scraping, rotate through multiple proxies:
import itertools
class ProxyPool:
    """Round-robin pool over a fixed list of proxy URLs."""

    def __init__(self, proxy_list):
        # cycle() yields the proxies endlessly, in order.
        self.proxies = itertools.cycle(proxy_list)
        self.current = None  # last proxy handed out

    def get_next(self):
        """Advance to the next proxy and return a requests-style mapping."""
        self.current = next(self.proxies)
        return {'http': self.current, 'https': self.current}
# Usage
proxy_list = [
'http://user1:pass1@us-01.pepeproxy.com:2333',
'http://user2:pass2@us-01.pepeproxy.com:2333',
'http://user3:pass3@us-01.pepeproxy.com:2333',
]
pool = ProxyPool(proxy_list)
for url in urls:
proxies = pool.get_next()
response = requests.get(url, proxies=proxies) Performance Optimization
Minimize Traffic Usage
1. Block Unnecessary Resources
# Puppeteer example
await page.setRequestInterception(True)
page.on('request', request => {
const resourceType = request.resourceType()
if (['image', 'stylesheet', 'font', 'media'].includes(resourceType)) {
request.abort() # Block images, CSS, fonts
} else {
request.continue()
}
}) 2. Use HEAD Requests When Possible
# Check if resource exists without downloading
response = requests.head(url, proxies=proxies)
if response.status_code == 200:
print(f"Resource exists: {response.headers.get('Content-Length')} bytes") 3. Compress Responses
headers = {
'Accept-Encoding': 'gzip, deflate, br'
}
response = requests.get(url, proxies=proxies, headers=headers) 4. Implement Caching
import requests_cache
# Cache responses for 1 hour
requests_cache.install_cache('scraper_cache', expire_after=3600)
# Now requests are cached automatically
response = requests.get(url, proxies=proxies) # Downloads
response = requests.get(url, proxies=proxies) # From cache (no traffic) Concurrent Requests
Use async/concurrent requests for faster scraping:
import asyncio
import aiohttp
async def fetch(session, url, proxy):
    """Fetch one URL through *proxy*; return the body text, or None on error."""
    try:
        async with session.get(url, proxy=proxy, timeout=30) as response:
            return await response.text()
    except Exception as e:
        # Swallow per-URL failures so one bad page doesn't kill the batch.
        print(f"Error fetching {url}: {e}")
        return None
async def scrape_concurrent(urls, proxy):
    """Fetch every URL in *urls* concurrently over a single aiohttp session."""
    async with aiohttp.ClientSession() as session:
        coros = [fetch(session, u, proxy) for u in urls]
        return await asyncio.gather(*coros)
# Usage
urls = ['https://example.com/page1', 'https://example.com/page2']
proxy = 'http://username:password@us-01.pepeproxy.com:2333'
results = asyncio.run(scrape_concurrent(urls, proxy)) Connection Pooling
Reuse connections to reduce overhead:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
# Configure retries
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=20
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = proxies
# Reuse session for all requests
for url in urls:
response = session.get(url) Error Handling
Robust Retry Logic
import time
import requests
from requests.exceptions import RequestException
def fetch_with_retry(url, proxies, max_retries=5):
"""Fetch URL with exponential backoff retry"""
for attempt in range(max_retries):
try:
response = requests.get(
url,
proxies=proxies,
timeout=(10, 30), # (connect, read)
headers={'User-Agent': 'Mozilla/5.0'}
)
# Check for specific status codes
if response.status_code == 429:
# Rate limited
retry_after = int(response.headers.get('Retry-After', 60))
print(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
continue
response.raise_for_status()
return response
except requests.exceptions.Timeout:
print(f"Timeout on attempt {attempt + 1}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
except requests.exceptions.ProxyError:
print(f"Proxy error on attempt {attempt + 1}")
# Try regenerating proxy or switching to backup
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
except RequestException as e:
print(f"Request failed on attempt {attempt + 1}: {e}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
raise Exception(f"Failed to fetch {url} after {max_retries} attempts") Handle Different Response Types
def parse_response(response):
"""Parse response based on content type"""
# Dispatch on the Content-Type header; unknown types fall back to raw text.
content_type = response.headers.get('Content-Type', '')
if 'application/json' in content_type:
return response.json()
elif 'text/html' in content_type:
# Imported lazily so bs4 is only required when HTML is actually parsed.
from bs4 import BeautifulSoup
return BeautifulSoup(response.content, 'html.parser')
elif 'application/xml' in content_type:
# NOTE(review): 'text/xml' responses fall through to the raw-text branch
# below — confirm whether they should be parsed here as well.
import xml.etree.ElementTree as ET
return ET.fromstring(response.content)
else:
return response.text Logging and Monitoring
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'scraper_{datetime.now().strftime("%Y%m%d")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def scrape_with_logging(url, proxies):
"""Scrape with detailed logging"""
# Log before the request so hung fetches are visible in the log.
logger.info(f"Starting scrape: {url}")
try:
# Wall-clock the request for the success log line.
start_time = time.time()
response = requests.get(url, proxies=proxies, timeout=30)
elapsed = time.time() - start_time
logger.info(f"Success: {url} ({response.status_code}) in {elapsed:.2f}s")
return response
except requests.exceptions.Timeout:
# Timeouts are logged separately from other request errors.
logger.error(f"Timeout: {url}")
raise
except requests.exceptions.RequestException as e:
# Log then re-raise: callers decide whether to retry or abort.
logger.error(f"Error scraping {url}: {e}")
raise Data Extraction
Use Proper Selectors
BeautifulSoup:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# CSS selectors (preferred)
title = soup.select_one('h1.product-title').text.strip()
price = soup.select_one('span.price').text.strip()
# XPath-like navigation
products = soup.find_all('div', class_='product')
for product in products:
name = product.find('h2').text
price = product.find('span', class_='price').text lxml (faster):
from lxml import html
tree = html.fromstring(response.content)
# XPath selectors
title = tree.xpath('//h1[@class="product-title"]/text()')[0]
prices = tree.xpath('//span[@class="price"]/text()') Clean and Validate Data
import re
from decimal import Decimal
def clean_price(price_text):
    """Extract a price like "$1,234.56" from text as a Decimal, or None.

    Currency symbols, thousands separators and any other non-numeric
    characters are stripped before parsing. Returns None when nothing
    parseable remains.
    """
    # Keep only digits and the decimal point. The original pattern had
    # lost its backslash ([^d.]), which kept only literal 'd' and '.'.
    cleaned = re.sub(r'[^\d.]', '', price_text)
    try:
        return Decimal(cleaned)
    except ArithmeticError:
        # decimal.InvalidOperation (a subclass) on empty/garbage input.
        return None
def clean_text(text):
    """Collapse runs of whitespace to single spaces and trim the ends.

    Falsy input (None or empty string) yields ''.
    """
    if not text:
        return ''
    # \s+ matches spaces, tabs and newlines alike. The original pattern
    # had lost its backslash ('s+'), which collapsed runs of the letter s.
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
# Usage
raw_price = "$1,234.56"
price = clean_price(raw_price) # Decimal('1234.56') Storage Best Practices
Structured Data Storage
CSV:
import csv
with open('products.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=['name', 'price', 'url'])
writer.writeheader()
for product in products:
writer.writerow({
'name': product['name'],
'price': product['price'],
'url': product['url']
}) JSON:
import json
data = {
'scraped_at': datetime.now().isoformat(),
'source': 'example.com',
'products': products
}
with open('products.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False) Database (SQLite):
import sqlite3
conn = sqlite3.connect('scraper.db')
cursor = conn.cursor()
# Create table
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY,
name TEXT,
price REAL,
url TEXT UNIQUE,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Insert data
cursor.execute(
'INSERT OR REPLACE INTO products (name, price, url) VALUES (?, ?, ?)',
(product['name'], product['price'], product['url'])
)
conn.commit()
conn.close() Security Best Practices
Protect Credentials
Never hardcode credentials:
# ❌ BAD
proxy = 'http://myuser:mypass123@us-01.pepeproxy.com:2333'
# ✅ GOOD
import os
from dotenv import load_dotenv
load_dotenv()
proxy_user = os.getenv('PROXY_USER')
proxy_pass = os.getenv('PROXY_PASS')
proxy = f'http://{proxy_user}:{proxy_pass}@us-01.pepeproxy.com:2333' Sanitize User Input
from urllib.parse import quote
def build_url(base, params):
    """Safely build a URL query string from a dict of parameters.

    Both keys and values are percent-encoded (the original quoted only
    values), so user input cannot inject extra parameters or separators.

    Args:
        base: URL without a query string.
        params: Mapping of parameter names to values (stringified).

    Returns:
        base followed by '?' and the encoded query string.
    """
    query = '&'.join(f'{quote(str(k))}={quote(str(v))}' for k, v in params.items())
    return f'{base}?{query}'
# Usage
url = build_url('https://example.com/search', {
'q': user_input, # Safely quoted
'page': 1
}) Validate SSL Certificates
# ✅ Always verify SSL (default)
response = requests.get(url, proxies=proxies, verify=True)
# Only disable for testing (never in production)
import urllib3
urllib3.disable_warnings()
response = requests.get(url, proxies=proxies, verify=False) Monitoring and Maintenance
Track Success Rates
class ScraperStats:
    """Track request outcomes and bandwidth for a scraping run."""

    def __init__(self):
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.traffic_used_kb = 0  # cumulative response payload, in KB

    def record_success(self, response):
        """Count a successful request and add its payload size."""
        self.total_requests += 1
        self.successful_requests += 1
        self.traffic_used_kb += len(response.content) / 1024

    def record_failure(self):
        """Count a failed request."""
        self.total_requests += 1
        self.failed_requests += 1

    def get_success_rate(self):
        """Percentage of requests that succeeded (0 when none were made)."""
        if not self.total_requests:
            return 0
        return (self.successful_requests / self.total_requests) * 100

    def print_stats(self):
        """Write a human-readable summary to stdout."""
        print(f"Total Requests: {self.total_requests}")
        print(f"Successful: {self.successful_requests}")
        print(f"Failed: {self.failed_requests}")
        print(f"Success Rate: {self.get_success_rate():.2f}%")
        print(f"Traffic Used: {self.traffic_used_kb:.2f} KB")
# Usage
stats = ScraperStats()
for url in urls:
try:
response = fetch(url)
stats.record_success(response)
except:
stats.record_failure()
stats.print_stats() Set Up Alerts
import smtplib
from email.message import EmailMessage
def send_alert(subject, body):
"""Send email alert for critical failures"""
# NOTE(review): SMTP host and login are hardcoded here; per the "Protect
# Credentials" section of this guide, move them to environment variables.
msg = EmailMessage()
msg.set_content(body)
# Standard message headers.
msg['Subject'] = subject
msg['From'] = 'scraper@example.com'
msg['To'] = 'admin@example.com'
# Port 587 with a STARTTLS upgrade before authenticating.
with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
smtp.starttls()
smtp.login('user', 'password')
smtp.send_message(msg)
# Usage
if stats.get_success_rate() < 50:
send_alert(
'Scraper Alert: Low Success Rate',
f'Success rate dropped to {stats.get_success_rate():.2f}%'
) Legal and Compliance
Stay Compliant
- Read Terms of Service: Always review target website’s ToS
- Respect Copyright: Don’t scrape and republish copyrighted content
- Personal Data: Follow GDPR/CCPA when scraping personal information
- Attribution: Credit sources when required
- Commercial Use: Ensure you have rights for commercial scraping
Ethical Guidelines
- Don’t cause harm: Never disrupt services or overload servers
- Be transparent: Identify your bot when appropriate
- Respect privacy: Don’t scrape sensitive personal information
- Follow the law: Comply with local and international laws
- Be responsible: Use scraped data ethically and legally
Ready to implement these practices? Start with our Integration Guides →