import requests
from bs4 import BeautifulSoup as bs
import re
import unicodedata
import sys
# Define your headers once, at the top
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
}
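# Optional (sketch): reuse a single Session so repeated requests share one connection pool.
# The requests.get(...) calls below could then be swapped for SESSION.get(...) with the same arguments.
# SESSION = requests.Session()
# SESSION.headers.update(HEADERS)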
# --- Helper function for text normalization ---
def normalize_text(text):
"""
Normalize text for more robust duplicate checking.
Converts to lowercase, standardizes internal whitespace.
You can add more normalization steps here if needed.
"""
if not isinstance(text, str):
return "" # Handle non-string input gracefully
# 1. Convert to lowercase
text = text.lower()
# 2. Replace various whitespace characters with a single space
# This handles spaces, tabs, newlines, non-breaking spaces (\xa0), etc.
text = re.sub(r'\s+', ' ', text).strip()
# 3. Optional: Remove or replace problematic characters (e.g., zero-width space \u200b)
text = text.replace('\u200b', '') # Example: remove zero width space
# Optional: More advanced Unicode normalization if character representation is an issue
# text = unicodedata.normalize('NFC', text)
return text
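# Quick illustration of the normalization above (not executed):
#   normalize_text("  Hello\xa0World\u200b ")  ->  "hello world"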
# --- Core Scraping Functions ---
def get_total_pages(url):
"""
Look inside the .entry container for any <a> tags whose text is a digit.
Returns the highest digit found, or 1 if none or error.
"""
print(f"Attempting to find total pages from: {url}")
try:
response = requests.get(url, headers=HEADERS, timeout=30)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
soup = bs(response.text, 'html.parser')
content_div = soup.select_one('.entry')
if not content_div:
print("Warning: Could not find .entry container for pagination.")
return 1
page_numbers = []
# Look for pagination links within or near the content area
# Common selectors for pagination: div.page-links, div.pagination, .entry a
# Sticking to original scope (.entry a) but mentioning alternatives
pagination_candidates = content_div.select('a') # Look at all links in .entry
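        # Broader alternative (sketch; these selector names are site-dependent and may not exist on your page):
        # pagination_candidates = soup.select('div.page-links a, div.pagination a') or content_div.select('a')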
for a in pagination_candidates:
try:
# Get text including text from nested elements like <span>
text = a.get_text(strip=True)
if text.isdigit():
page_numbers.append(int(text))
            except ValueError:
                # isdigit() can accept Unicode digits that int() rejects; ignore those links
                pass
print(f"Found potential page numbers: {page_numbers}")
return max(page_numbers) if page_numbers else 1
except requests.exceptions.RequestException as e:
print(f"Error fetching total pages from {url}: {e}")
return 1 # Assume 1 page on error
print(f"An unexpected error occurred while getting total pages from {url}: {e}")
return 1 # Assume 1 page on unexpected error
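# Illustration: if .entry contains pagination links "2", "3" and "4", get_total_pages() returns 4;
# with no numeric links, a missing .entry container, or a request error it falls back to 1.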
def parse_page_content(url, seen_texts, debug=False, page_number=1, normalize=True):
"""
Return only _new_ paragraphs (compared to seen_texts).
Includes optional text normalization for more robust duplicate detection.
"""
print(f"Scraping content from page {page_number} ({url})")
try:
response = requests.get(url, headers=HEADERS, timeout=30)
response.raise_for_status() # Raise an exception for bad status codes
soup = bs(response.text, 'html.parser')
# Remove non-content elements - ADDED header/footer as common areas for repetition
selectors_to_decompose = ['form', 'div.page-links', 'aside', 'script', 'style', 'header', 'footer']
for sel in selectors_to_decompose:
for tag in soup.select(sel):
tag.decompose()
content_div = soup.select_one('.entry')
if not content_div:
if debug:
print(f"Warning: Could not find .entry container on page {page_number}.")
return [] # Return empty list if main content div is not found
out = []
for p in content_div.find_all('p'):
original_text = p.get_text(strip=True)
# Skip empty paragraphs after stripping
if not original_text:
if debug:
print(f"Skipping empty paragraph on page {page_number}.")
continue
# Apply normalization for the check
text_to_check = normalize_text(original_text) if normalize else original_text
# --- Duplicate Check ---
# Use the normalized/checked text for the set lookup and add
if text_to_check not in seen_texts:
seen_texts.add(text_to_check)
# Add the *original* text to the output list
out.append(original_text)
if debug:
# Print start of checked text when adding new
print(f"Added new paragraph (checked text): {text_to_check[:100]}...")
elif debug:
# This message confirms the duplicate detection is working for the *checked* text
# Print start of checked text when skipping duplicate
print(f"Duplicate (or similar) text found on page {page_number}, skipping (checked text): {text_to_check[:100]}...")
if debug:
print(f"Page {page_number}: Found {len(out)} new paragraphs added to output.")
print(f"Seen texts set size: {len(seen_texts)}")
# print a sample of seen texts (the checked version)
seen_sample = list(seen_texts)[:min(5, len(seen_texts))]
print(f"Seen texts sample (checked version): {seen_sample}")
return out
except requests.exceptions.RequestException as e:
print(f"Error fetching content from page {page_number} ({url}): {e}")
return [] # Return empty list on error
print(f"An unexpected error occurred while parsing page {page_number} ({url}): {e}")
return [] # Return empty list on unexpected error
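# Minimal usage sketch (hypothetical URL):
#   seen = set()
#   first_page_paras = parse_page_content('https://www.example.com/article/', seen, debug=True, page_number=1)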
def get_next_page_url(base_url, page_number):
"""Append '/2/', '/3/' etc., but leave Page 1 as-is."""
if page_number <= 1:
return base_url
# Ensure base_url doesn't end with a slash already before adding page number
next_url = base_url.rstrip('/') + f'/{page_number}/'
# print(f"Generated URL for page {page_number}: {next_url}") # Optional debug
return next_url
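# Illustration of the URL scheme this assumes:
#   get_next_page_url('https://www.example.com/article/', 1) -> 'https://www.example.com/article/'
#   get_next_page_url('https://www.example.com/article/', 3) -> 'https://www.example.com/article/3/'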
def parse_all_pages(start_url, debug=False, normalize=True):
"""Orchestrates the scraping of all pages."""
if not start_url:
print("Error: start_url is empty.")
return {
'page_text': '',
'pages_scraped': 0,
'total_pages_detected': 0,
'total_unique_comparison_strings': 0,
            'error': 'Start URL was empty'
        }
total_pages = get_total_pages(start_url)
print(f"Total pages detected (or assumed): {total_pages}")
seen_texts = set() # This set tracks all unique paragraph strings *used for comparison*
chunks = []
pages_scraped_count = 0
# Loop through potential pages - attempt fetching a few pages beyond detected total
# This helps if total_pages is slightly off or detection fails.
    # If parse_page_content returns nothing for a page after the first, that is logged below and the
    # loop simply runs out after the extra buffer pages (uncomment the break below to stop sooner).
for i in range(1, total_pages + 5):
url = get_next_page_url(start_url, i)
if debug:
print(f"\n--- Attempting to scrape Page {i} ---")
# Pass the normalize flag down
new_paras = parse_page_content(url, seen_texts, debug=debug, page_number=i, normalize=normalize)
        # End-of-article check: if we get *no* new paragraphs on any page *after* the first, assume the end.
        # This is more robust than relying solely on the total_pages prediction.
if not new_paras and i > 1:
print(f"No *new* content added from page {i} and it's not the first page. Assuming end of article.")
# Decide if you want to *stop* completely or just note it and continue for a few more pages
# The current range +5 will attempt a few more pages before the loop ends naturally.
# To stop immediately: break
if new_paras:
# Append the original text paragraphs to the chunks
chunks.append(f"Page {i}\n\n" + "\n\n".join(new_paras))
pages_scraped_count += 1
print(f"Successfully scraped page {i}. Added {len(new_paras)} new paragraphs. Total unique comparison strings seen: {len(seen_texts)}")
elif debug and i > 1: # Print this warning only for pages after the first if no new content was found
print(f"Page {i} returned no paragraphs (either empty, content div not found, or all paragraphs were duplicates based on comparison text).")
        elif i == 1:
            # Even the first page returned no content: warn and return an error result early
            print("Warning: Page 1 returned no paragraphs. Check URL and selectors.")
            return {
'page_text': '',
'pages_scraped': 0,
'total_pages_detected': total_pages,
'total_unique_comparison_strings': len(seen_texts),
'error': 'No content found on the first page'
}
full_text = "\n\n".join(chunks).strip()
# Add a final check or summary for debugging
if debug:
print(f"\n--- Scraping Complete ---")
        # After the loop, i holds the last page number attempted (or the page where a break occurred, if enabled)
        print(f"Total pages attempted (approx): {i}")  # Report the last page number tried
print(f"Pages successfully scraped (with new content): {pages_scraped_count}")
print(f"Total unique comparison strings collected: {len(seen_texts)}")
return {
'page_text': full_text,
'pages_scraped': pages_scraped_count,
'total_pages_detected': total_pages,
'total_unique_comparison_strings': len(seen_texts)
}
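# The returned dict has this shape (values are illustrative):
#   {'page_text': 'Page 1\n\n...', 'pages_scraped': 3,
#    'total_pages_detected': 3, 'total_unique_comparison_strings': 42}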
# --- Example usage: ---
# Assuming 'input_data' is provided from your environment.
# If running standalone, you need to define 'input_data' manually.
# Check if input_data is defined (e.g., by an external system)
if 'input_data' not in locals():
print("Warning: 'input_data' dictionary not found. Defining a placeholder for standalone execution.")
# Define a placeholder/default input_data for local testing
# *** REPLACE THIS WITH YOUR ACTUAL START URL ***
input_data = {'initial_url': 'YOUR_URL_HERE'}
initial_url_to_scrape = input_data.get('initial_url', '').strip() # Use strip() in case of whitespace
if not initial_url_to_scrape or initial_url_to_scrape == 'YOUR_URL_HERE':
print("Please provide a valid
URL to scrape via
'input_data'.
") print("Example: input_data = {'initial_url': 'https://w...content-available-to-author-only...e.com/article/page/1/'}")
# Depending on your environment, you might exit or raise an error here
# sys.exit(1) # Uncomment this line if you want the script to stop when no URL is given
else:
print(f"Starting scrape for: {initial_url_to_scrape}")
# Set debug=True to see logging
# Set normalize=True to use text normalization for duplicate checking (recommended)
# Set normalize=False to revert to exact string matching only
output = parse_all_pages(initial_url_to_scrape, debug=True, normalize=True)
# Print the results
print("\n--- Final Output Summary ---")
print(f"Pages scraped with new content: {output.get('pages_scraped')}")
print(f"Total unique comparison strings collected: {output.get('total_unique_comparison_strings')}")
print(f"Initial total pages detected: {output.get('total_pages_detected')}")
print("\n--- Extracted Text ---")
# Use repr() for a small part to see hidden characters if needed for debugging
# print(repr(output.get('page_text', '')[:500]))
print(output.get('page_text')) # This prints the full combined text
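    # Optional (sketch): persist the combined text instead of only printing it.
    # with open('scraped_article.txt', 'w', encoding='utf-8') as f:
    #     f.write(output.get('page_text', ''))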