Best Practices

Professional web scraping techniques and ethical guidelines for using PepeProxy.

Ethical Scraping

Respect robots.txt

Always check and respect the robots.txt file:

from urllib.robotparser import RobotFileParser

def can_scrape(url):
    """Return True if robots.txt permits fetching *url* (a parsed URL)."""
    robots_url = f"{url.scheme}://{url.netloc}/robots.txt"
    parser = RobotFileParser()
    parser.set_url(robots_url)
    parser.read()  # fetch and parse the site's robots.txt
    return parser.can_fetch("*", url.geturl())

# Usage
from urllib.parse import urlparse
url = urlparse('https://example.com/page')
if can_scrape(url):
    # Proceed with scraping
    pass
else:
    print("Scraping not allowed by robots.txt")

Add Appropriate Delays

Never hammer servers with rapid-fire requests:

import time
import random

# Fixed delay
time.sleep(3)  # 3 seconds between requests

# Random delay (more human-like)
time.sleep(random.uniform(2, 5))  # 2-5 seconds

# Exponential backoff on errors
def fetch_with_backoff(url, max_retries=5):
    """GET *url* through the configured proxies, retrying failed
    requests with exponential backoff plus random jitter."""
    attempt = 0
    while True:
        try:
            response = requests.get(url, proxies=proxies)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException:
            attempt += 1
            if attempt >= max_retries:
                raise  # retries exhausted — surface the last error
            # 2^n seconds plus jitter to avoid synchronized retries
            time.sleep((2 ** (attempt - 1)) + random.random())

Identify Your Bot

Use a descriptive User-Agent when appropriate:

headers = {
    'User-Agent': 'MyCompanyBot/1.0 (+https://example.com/bot-info)'
}

For stealth scraping, use realistic browser user agents:

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}

Honor Rate Limits

Implement rate limiting to avoid overwhelming servers:

from datetime import datetime, timedelta
from collections import deque

class RateLimiter:
    """Sliding-window rate limiter.

    Allows at most *max_requests* calls to acquire() within any
    *time_window_seconds* span; acquire() blocks (sleeps) when the
    budget is exhausted.
    """

    def __init__(self, max_requests, time_window_seconds):
        self.max_requests = max_requests
        self.time_window = timedelta(seconds=time_window_seconds)
        self.requests = deque()  # timestamps of requests still in the window

    def acquire(self):
        """Block until a request slot is free, record it, return True.

        Implemented as a loop (the original recursed after sleeping,
        and could also append past the limit when the oldest entry
        expired exactly at the window boundary).
        """
        while True:
            now = datetime.now()

            # Drop timestamps that have aged out of the window
            while self.requests and now - self.requests[0] > self.time_window:
                self.requests.popleft()

            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True

            # Window full: sleep until the oldest entry expires, then re-check
            sleep_time = (self.requests[0] + self.time_window - now).total_seconds()
            if sleep_time > 0:
                time.sleep(sleep_time)

# Usage: 10 requests per minute
limiter = RateLimiter(max_requests=10, time_window_seconds=60)

for url in urls:
    limiter.acquire()
    response = requests.get(url, proxies=proxies)

Proxy Management

Choose the Right Session Type

Rotating Proxies - Best for:

  • High-volume scraping
  • Data aggregation
  • Price monitoring
  • When you need fresh IPs

Sticky Proxies - Best for:

  • Login flows
  • Shopping carts
  • Multi-step forms
  • Account management

# Rotating: New IP each request
for url in large_list_of_urls:
    response = requests.get(url, proxies=rotating_proxies)

# Sticky: Same IP for 10 minutes
session = requests.Session()
session.proxies = sticky_proxies
response1 = session.get(login_url)
response2 = session.post(login_url, data=credentials)  # Same IP

Select Optimal Locations

Choose proxy locations strategically:

# Scraping US e-commerce sites
proxies = generate_proxies(country='United States', city='New York')

# Multi-region scraping
regions = ['United States', 'United Kingdom', 'Germany']
for region in regions:
    regional_proxies = generate_proxies(country=region)
    scrape_region(regional_proxies)

Implement Proxy Rotation

For large-scale scraping, rotate through multiple proxies:

import itertools

class ProxyPool:
    """Round-robin pool that hands out proxies in a repeating cycle."""

    def __init__(self, proxy_list):
        # cycle() repeats the list endlessly, preserving order
        self.proxies = itertools.cycle(proxy_list)
        self.current = None

    def get_next(self):
        """Advance to the next proxy and return a requests-style dict."""
        self.current = next(self.proxies)
        return {scheme: self.current for scheme in ('http', 'https')}

# Usage
proxy_list = [
    'http://user1:pass1@us-01.pepeproxy.com:2333',
    'http://user2:pass2@us-01.pepeproxy.com:2333',
    'http://user3:pass3@us-01.pepeproxy.com:2333',
]

pool = ProxyPool(proxy_list)

for url in urls:
    proxies = pool.get_next()
    response = requests.get(url, proxies=proxies)

Performance Optimization

Minimize Traffic Usage

1. Block Unnecessary Resources

// Puppeteer example
// (fixed: JavaScript uses `true` not Python's `True`, and `//` comments)
await page.setRequestInterception(true)
page.on('request', request => {
    const resourceType = request.resourceType()
    if (['image', 'stylesheet', 'font', 'media'].includes(resourceType)) {
        request.abort()  // block images, CSS, fonts and media
    } else {
        request.continue()
    }
})

2. Use HEAD Requests When Possible

# Check if resource exists without downloading
response = requests.head(url, proxies=proxies)
if response.status_code == 200:
    print(f"Resource exists: {response.headers.get('Content-Length')} bytes")

3. Compress Responses

headers = {
    'Accept-Encoding': 'gzip, deflate, br'
}
response = requests.get(url, proxies=proxies, headers=headers)

4. Implement Caching

import requests_cache

# Cache responses for 1 hour
requests_cache.install_cache('scraper_cache', expire_after=3600)

# Now requests are cached automatically
response = requests.get(url, proxies=proxies)  # Downloads
response = requests.get(url, proxies=proxies)  # From cache (no traffic)

Concurrent Requests

Use async/concurrent requests for faster scraping:

import asyncio
import aiohttp

async def fetch(session, url, proxy):
    """Fetch *url* through *proxy* using the shared aiohttp session.

    Returns the response body as text, or None on failure — the error
    is printed rather than raised so one bad URL does not abort a
    whole concurrent batch.
    """
    # NOTE(review): newer aiohttp versions expect a ClientTimeout
    # object rather than a bare int for timeout= — confirm version.
    try:
        async with session.get(url, proxy=proxy, timeout=30) as response:
            return await response.text()
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

async def scrape_concurrent(urls, proxy):
    """Fetch all *urls* concurrently through *proxy*.

    Results come back in the same order as *urls*; a failed fetch
    appears as None (fetch() swallows its own errors).
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, proxy) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
urls = ['https://example.com/page1', 'https://example.com/page2']
proxy = 'http://username:password@us-01.pepeproxy.com:2333'

results = asyncio.run(scrape_concurrent(urls, proxy))

Connection Pooling

Reuse connections to reduce overhead:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()

# Configure retries
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)

adapter = HTTPAdapter(
    max_retries=retry_strategy,
    pool_connections=10,
    pool_maxsize=20
)

session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = proxies

# Reuse session for all requests
for url in urls:
    response = session.get(url)

Error Handling

Robust Retry Logic

import time
import requests
from requests.exceptions import RequestException

def fetch_with_retry(url, proxies, max_retries=5):
    """Fetch *url* through *proxies* with exponential backoff.

    Retries on timeouts, proxy errors and other request failures,
    honours 429 Retry-After, and re-raises the last error once
    *max_retries* attempts are exhausted.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(
                url,
                proxies=proxies,
                timeout=(10, 30),  # (connect, read) seconds
                headers={'User-Agent': 'Mozilla/5.0'}
            )

            # 429: the server asked us to slow down
            if response.status_code == 429:
                # Retry-After may be an HTTP-date rather than a number
                # of seconds (RFC 7231) — fall back to 60s in that case
                try:
                    retry_after = int(response.headers.get('Retry-After', 60))
                except ValueError:
                    retry_after = 60
                print(f"Rate limited. Waiting {retry_after}s...")
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response

        except requests.exceptions.Timeout:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

        except requests.exceptions.ProxyError:
            print(f"Proxy error on attempt {attempt + 1}")
            # Consider regenerating the proxy or switching to a backup here
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

        except RequestException as e:
            print(f"Request failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

    # Only reachable if every attempt ended in a handled 429
    raise Exception(f"Failed to fetch {url} after {max_retries} attempts")

Handle Different Response Types

def parse_response(response):
    """Return the response body parsed according to its Content-Type.

    JSON becomes a Python object, HTML a BeautifulSoup tree, XML an
    ElementTree element; any other type is returned as plain text.
    """
    declared_type = response.headers.get('Content-Type', '')

    if 'application/json' in declared_type:
        return response.json()

    if 'text/html' in declared_type:
        from bs4 import BeautifulSoup
        return BeautifulSoup(response.content, 'html.parser')

    if 'application/xml' in declared_type:
        import xml.etree.ElementTree as ET
        return ET.fromstring(response.content)

    return response.text

Logging and Monitoring

import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'scraper_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def scrape_with_logging(url, proxies):
    """Scrape *url* through *proxies*, logging timing and outcome.

    Relies on the module-level `logger` and `time` import. Failures
    are logged and then re-raised so callers can still handle them.
    """
    logger.info(f"Starting scrape: {url}")

    try:
        start_time = time.time()
        response = requests.get(url, proxies=proxies, timeout=30)

        elapsed = time.time() - start_time
        logger.info(f"Success: {url} ({response.status_code}) in {elapsed:.2f}s")

        return response

    except requests.exceptions.Timeout:
        logger.error(f"Timeout: {url}")
        raise

    except requests.exceptions.RequestException as e:
        logger.error(f"Error scraping {url}: {e}")
        raise

Data Extraction

Use Proper Selectors

BeautifulSoup:

from bs4 import BeautifulSoup

soup = BeautifulSoup(html, 'html.parser')

# CSS selectors (preferred)
title = soup.select_one('h1.product-title').text.strip()
price = soup.select_one('span.price').text.strip()

# XPath-like navigation
products = soup.find_all('div', class_='product')
for product in products:
    name = product.find('h2').text
    price = product.find('span', class_='price').text

lxml (faster):

from lxml import html

tree = html.fromstring(response.content)

# XPath selectors
title = tree.xpath('//h1[@class="product-title"]/text()')[0]
prices = tree.xpath('//span[@class="price"]/text()')

Clean and Validate Data

import re
from decimal import Decimal

def clean_price(price_text):
    """Extract a Decimal price from text like "$1,234.56".

    Returns None when no valid number can be parsed.
    """
    # Keep only digits and the decimal point. Note the backslash:
    # r'[^d.]' would strip the letter 'd' instead of non-digits.
    clean = re.sub(r'[^\d.]', '', price_text)
    try:
        return Decimal(clean)
    except ArithmeticError:
        # Decimal raises InvalidOperation (an ArithmeticError) for
        # empty or malformed input such as "N/A" or "1.2.3"
        return None

def clean_text(text):
    """Collapse whitespace runs to single spaces and trim the ends.

    Returns '' for None or empty input.
    """
    if not text:
        return ''
    # r'\s+' matches whitespace runs; the unescaped r's+' would
    # delete runs of the letter 's' instead
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# Usage
raw_price = "$1,234.56"
price = clean_price(raw_price)  # Decimal('1234.56')

Storage Best Practices

Structured Data Storage

CSV:

import csv

with open('products.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=['name', 'price', 'url'])
    writer.writeheader()

    for product in products:
        writer.writerow({
            'name': product['name'],
            'price': product['price'],
            'url': product['url']
        })

JSON:

import json

data = {
    'scraped_at': datetime.now().isoformat(),
    'source': 'example.com',
    'products': products
}

with open('products.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

Database (SQLite):

import sqlite3

conn = sqlite3.connect('scraper.db')
cursor = conn.cursor()

# Create table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS products (
        id INTEGER PRIMARY KEY,
        name TEXT,
        price REAL,
        url TEXT UNIQUE,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
''')

# Insert data
cursor.execute(
    'INSERT OR REPLACE INTO products (name, price, url) VALUES (?, ?, ?)',
    (product['name'], product['price'], product['url'])
)

conn.commit()
conn.close()

Security Best Practices

Protect Credentials

Never hardcode credentials:

# ❌ BAD
proxy = 'http://myuser:mypass123@us-01.pepeproxy.com:2333'

# ✅ GOOD
import os
from dotenv import load_dotenv

load_dotenv()

proxy_user = os.getenv('PROXY_USER')
proxy_pass = os.getenv('PROXY_PASS')
proxy = f'http://{proxy_user}:{proxy_pass}@us-01.pepeproxy.com:2333'

Sanitize User Input

from urllib.parse import quote

def build_url(base, params):
    """Safely build a URL, percent-encoding both keys and values.

    Args:
        base: URL without a query string, e.g. 'https://example.com/search'.
        params: mapping of query parameters; values are str()-converted.
    """
    # Encode keys as well as values — an unescaped key containing
    # '&' or '=' would corrupt the query string.
    query = '&'.join(
        f'{quote(str(k))}={quote(str(v))}' for k, v in params.items()
    )
    return f'{base}?{query}'

# Usage
url = build_url('https://example.com/search', {
    'q': user_input,  # Safely quoted
    'page': 1
})

Validate SSL Certificates

# ✅ Always verify SSL (default)
response = requests.get(url, proxies=proxies, verify=True)

# Only disable for testing (never in production)
import urllib3
urllib3.disable_warnings()
response = requests.get(url, proxies=proxies, verify=False)

Monitoring and Maintenance

Track Success Rates

class ScraperStats:
    """Accumulates request counters and traffic usage for a scrape run."""

    def __init__(self):
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.traffic_used_kb = 0

    def record_success(self, response):
        """Count one successful request and the size of its body."""
        self.total_requests += 1
        self.successful_requests += 1
        # Body size in kilobytes
        self.traffic_used_kb += len(response.content) / 1024

    def record_failure(self):
        """Count one failed request."""
        self.total_requests += 1
        self.failed_requests += 1

    def get_success_rate(self):
        """Percentage of requests that succeeded (0 when none made)."""
        if not self.total_requests:
            return 0
        return (self.successful_requests / self.total_requests) * 100

    def print_stats(self):
        """Print a summary of all counters to stdout."""
        print(f"Total Requests: {self.total_requests}")
        print(f"Successful: {self.successful_requests}")
        print(f"Failed: {self.failed_requests}")
        print(f"Success Rate: {self.get_success_rate():.2f}%")
        print(f"Traffic Used: {self.traffic_used_kb:.2f} KB")

# Usage
stats = ScraperStats()

for url in urls:
    try:
        response = fetch(url)
        stats.record_success(response)
    except Exception:
        # Catch Exception, not bare `except:` — a bare except would
        # also swallow KeyboardInterrupt and SystemExit
        stats.record_failure()

stats.print_stats()

Set Up Alerts

import smtplib
from email.message import EmailMessage

def send_alert(subject, body):
    """Email an alert about a critical scraper failure."""
    message = EmailMessage()
    message.set_content(body)
    message['Subject'] = subject
    message['From'] = 'scraper@example.com'
    message['To'] = 'admin@example.com'

    # Upgrade to TLS before authenticating
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.login('user', 'password')
        smtp.send_message(message)

# Usage
if stats.get_success_rate() < 50:
    send_alert(
        'Scraper Alert: Low Success Rate',
        f'Success rate dropped to {stats.get_success_rate():.2f}%'
    )

Legal and Compliance

Stay Compliant

  • Read Terms of Service: Always review target website’s ToS
  • Respect Copyright: Don’t scrape and republish copyrighted content
  • Personal Data: Follow GDPR/CCPA when scraping personal information
  • Attribution: Credit sources when required
  • Commercial Use: Ensure you have rights for commercial scraping

Ethical Guidelines

  1. Don’t cause harm: Never disrupt services or overload servers
  2. Be transparent: Identify your bot when appropriate
  3. Respect privacy: Don’t scrape sensitive personal information
  4. Follow the law: Comply with local and international laws
  5. Be responsible: Use scraped data ethically and legally

Ready to implement these practices? Start with our Integration Guides →