|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
import asyncio
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
from typing import List, Dict
|
|
from urllib.parse import urljoin, urlparse
|
|
from xml.dom import minidom
|
|
from playwright.async_api import async_playwright
|
|
from bs4 import BeautifulSoup
|
|
import requests
|
|
import trafilatura
|
|
import xml.etree.ElementTree as ET
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_page_title(url: str) -> str:
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status()
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
title_tag = soup.find('title')
|
|
return title_tag.string.strip() if title_tag else "Untitled"
|
|
except requests.RequestException as e:
|
|
logging.error(f"Error fetching page title: {e}")
|
|
return "Untitled"
|
|
|
|
|
|
def scrape_article(url):
|
|
async def fetch_html(url: str) -> str:
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(headless=True)
|
|
context = await browser.new_context(
|
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3")
|
|
page = await context.new_page()
|
|
await page.goto(url)
|
|
await page.wait_for_load_state("networkidle")
|
|
content = await page.content()
|
|
await browser.close()
|
|
return content
|
|
|
|
|
|
def extract_article_data(html: str, url: str) -> dict:
|
|
downloaded = trafilatura.extract(html, include_comments=False, include_tables=False, include_images=False)
|
|
metadata = trafilatura.extract_metadata(html)
|
|
|
|
result = {
|
|
'title': 'N/A',
|
|
'author': 'N/A',
|
|
'content': '',
|
|
'date': 'N/A',
|
|
'url': url,
|
|
'extraction_successful': False
|
|
}
|
|
|
|
if downloaded:
|
|
result['content'] = downloaded
|
|
result['extraction_successful'] = True
|
|
|
|
if metadata:
|
|
result.update({
|
|
'title': metadata.title if metadata.title else 'N/A',
|
|
'author': metadata.author if metadata.author else 'N/A',
|
|
'date': metadata.date if metadata.date else 'N/A'
|
|
})
|
|
else:
|
|
logging.warning("Metadata extraction failed.")
|
|
|
|
if not downloaded:
|
|
logging.warning("Content extraction failed.")
|
|
|
|
return result
|
|
|
|
def convert_html_to_markdown(html: str) -> str:
|
|
soup = BeautifulSoup(html, 'html.parser')
|
|
for para in soup.find_all('p'):
|
|
|
|
para.append('\n')
|
|
|
|
return soup.get_text(separator='\n\n')
|
|
|
|
async def fetch_and_extract_article(url: str):
|
|
html = await fetch_html(url)
|
|
article_data = extract_article_data(html, url)
|
|
if article_data['extraction_successful']:
|
|
article_data['content'] = convert_html_to_markdown(article_data['content'])
|
|
return article_data
|
|
|
|
return asyncio.run(fetch_and_extract_article(url))
|
|
|
|
|
|
def collect_internal_links(base_url: str) -> set:
|
|
visited = set()
|
|
to_visit = {base_url}
|
|
|
|
while to_visit:
|
|
current_url = to_visit.pop()
|
|
if current_url in visited:
|
|
continue
|
|
|
|
try:
|
|
response = requests.get(current_url)
|
|
response.raise_for_status()
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
|
|
|
|
for link in soup.find_all('a', href=True):
|
|
full_url = urljoin(base_url, link['href'])
|
|
|
|
if urlparse(full_url).netloc == urlparse(base_url).netloc:
|
|
if full_url not in visited:
|
|
to_visit.add(full_url)
|
|
|
|
visited.add(current_url)
|
|
except requests.RequestException as e:
|
|
logging.error(f"Error visiting {current_url}: {e}")
|
|
continue
|
|
|
|
return visited
|
|
|
|
|
|
def generate_temp_sitemap_from_links(links: set) -> str:
|
|
"""
|
|
Generate a temporary sitemap file from collected links and return its path.
|
|
|
|
:param links: A set of URLs to include in the sitemap
|
|
:return: Path to the temporary sitemap file
|
|
"""
|
|
|
|
urlset = ET.Element("urlset")
|
|
urlset.set("xmlns", "http://www.sitemaps.org/schemas/sitemap/0.9")
|
|
|
|
|
|
for link in links:
|
|
url = ET.SubElement(urlset, "url")
|
|
loc = ET.SubElement(url, "loc")
|
|
loc.text = link
|
|
lastmod = ET.SubElement(url, "lastmod")
|
|
lastmod.text = datetime.now().strftime("%Y-%m-%d")
|
|
changefreq = ET.SubElement(url, "changefreq")
|
|
changefreq.text = "daily"
|
|
priority = ET.SubElement(url, "priority")
|
|
priority.text = "0.5"
|
|
|
|
|
|
xml_string = ET.tostring(urlset, 'utf-8')
|
|
|
|
|
|
pretty_xml = minidom.parseString(xml_string).toprettyxml(indent=" ")
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".xml", delete=False) as temp_file:
|
|
temp_file.write(pretty_xml)
|
|
temp_file_path = temp_file.name
|
|
|
|
logging.info(f"Temporary sitemap created at: {temp_file_path}")
|
|
return temp_file_path
|
|
|
|
|
|
def generate_sitemap_for_url(url: str) -> List[Dict[str, str]]:
|
|
"""
|
|
Generate a sitemap for the given URL using the create_filtered_sitemap function.
|
|
|
|
Args:
|
|
url (str): The base URL to generate the sitemap for
|
|
|
|
Returns:
|
|
List[Dict[str, str]]: A list of dictionaries, each containing 'url' and 'title' keys
|
|
"""
|
|
with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as temp_file:
|
|
create_filtered_sitemap(url, temp_file.name, is_content_page)
|
|
temp_file.seek(0)
|
|
tree = ET.parse(temp_file.name)
|
|
root = tree.getroot()
|
|
|
|
sitemap = []
|
|
for url_elem in root.findall(".//{http://www.sitemaps.org/schemas/sitemap/0.9}url"):
|
|
loc = url_elem.find("{http://www.sitemaps.org/schemas/sitemap/0.9}loc").text
|
|
sitemap.append({"url": loc, "title": loc.split("/")[-1] or url})
|
|
|
|
return sitemap
|
|
|
|
def scrape_entire_site(base_url: str) -> List[Dict]:
|
|
"""
|
|
Scrape the entire site by generating a temporary sitemap and extracting content from each page.
|
|
|
|
:param base_url: The base URL of the site to scrape
|
|
:return: A list of dictionaries containing scraped article data
|
|
"""
|
|
|
|
links = collect_internal_links(base_url)
|
|
logging.info(f"Collected {len(links)} internal links.")
|
|
|
|
|
|
temp_sitemap_path = generate_temp_sitemap_from_links(links)
|
|
|
|
|
|
scraped_articles = []
|
|
try:
|
|
for link in links:
|
|
logging.info(f"Scraping {link} ...")
|
|
article_data = scrape_article(link)
|
|
|
|
if article_data:
|
|
logging.info(f"Title: {article_data['title']}")
|
|
logging.info(f"Author: {article_data['author']}")
|
|
logging.info(f"Date: {article_data['date']}")
|
|
logging.info(f"Content: {article_data['content'][:500]}...")
|
|
|
|
scraped_articles.append(article_data)
|
|
finally:
|
|
|
|
os.unlink(temp_sitemap_path)
|
|
logging.info("Temporary sitemap file deleted")
|
|
|
|
return scraped_articles
|
|
|
|
|
|
def scrape_by_url_level(base_url: str, level: int) -> list:
|
|
"""Scrape articles from URLs up to a certain level under the base URL."""
|
|
|
|
def get_url_level(url: str) -> int:
|
|
return len(urlparse(url).path.strip('/').split('/'))
|
|
|
|
links = collect_internal_links(base_url)
|
|
filtered_links = [link for link in links if get_url_level(link) <= level]
|
|
|
|
return [article for link in filtered_links if (article := scrape_article(link))]
|
|
|
|
|
|
def scrape_from_sitemap(sitemap_url: str) -> list:
|
|
"""Scrape articles from a sitemap URL."""
|
|
try:
|
|
response = requests.get(sitemap_url)
|
|
response.raise_for_status()
|
|
root = ET.fromstring(response.content)
|
|
|
|
return [article for url in root.findall('.//{http://www.sitemaps.org/schemas/sitemap/0.9}loc')
|
|
if (article := scrape_article(url.text))]
|
|
except requests.RequestException as e:
|
|
logging.error(f"Error fetching sitemap: {e}")
|
|
return []
|
|
|
|
|
|
def convert_to_markdown(articles: list) -> str:
|
|
"""Convert a list of article data into a single markdown document."""
|
|
markdown = ""
|
|
for article in articles:
|
|
markdown += f"# {article['title']}\n\n"
|
|
markdown += f"Author: {article['author']}\n"
|
|
markdown += f"Date: {article['date']}\n\n"
|
|
markdown += f"{article['content']}\n\n"
|
|
markdown += "---\n\n"
|
|
return markdown
|
|
|
|
|
|
def is_content_page(url: str) -> bool:
|
|
"""
|
|
Determine if a URL is likely to be a content page.
|
|
This is a basic implementation and may need to be adjusted based on the specific website structure.
|
|
|
|
:param url: The URL to check
|
|
:return: True if the URL is likely a content page, False otherwise
|
|
"""
|
|
|
|
|
|
exclude_patterns = [
|
|
'/tag/', '/category/', '/author/', '/search/', '/page/',
|
|
'wp-content', 'wp-includes', 'wp-json', 'wp-admin',
|
|
'login', 'register', 'cart', 'checkout', 'account',
|
|
'.jpg', '.png', '.gif', '.pdf', '.zip'
|
|
]
|
|
return not any(pattern in url.lower() for pattern in exclude_patterns)
|
|
|
|
|
|
def create_filtered_sitemap(base_url: str, output_file: str, filter_function):
|
|
"""
|
|
Create a sitemap from internal links and filter them based on a custom function.
|
|
|
|
:param base_url: The base URL of the website
|
|
:param output_file: The file to save the sitemap to
|
|
:param filter_function: A function that takes a URL and returns True if it should be included
|
|
"""
|
|
links = collect_internal_links(base_url)
|
|
filtered_links = set(filter(filter_function, links))
|
|
|
|
root = ET.Element("urlset")
|
|
root.set("xmlns", "http://www.sitemaps.org/schemas/sitemap/0.9")
|
|
|
|
for link in filtered_links:
|
|
url = ET.SubElement(root, "url")
|
|
loc = ET.SubElement(url, "loc")
|
|
loc.text = link
|
|
|
|
tree = ET.ElementTree(root)
|
|
tree.write(output_file, encoding='utf-8', xml_declaration=True)
|
|
print(f"Filtered sitemap saved to {output_file}")
|
|
|
|
|
|
def scrape_from_filtered_sitemap(sitemap_file: str, filter_function) -> list:
|
|
"""
|
|
Scrape articles from a sitemap file, applying an additional filter function.
|
|
|
|
:param sitemap_file: Path to the sitemap file
|
|
:param filter_function: A function that takes a URL and returns True if it should be scraped
|
|
:return: List of scraped articles
|
|
"""
|
|
try:
|
|
tree = ET.parse(sitemap_file)
|
|
root = tree.getroot()
|
|
|
|
articles = []
|
|
for url in root.findall('.//{http://www.sitemaps.org/schemas/sitemap/0.9}loc'):
|
|
if filter_function(url.text):
|
|
article_data = scrape_article(url.text)
|
|
if article_data:
|
|
articles.append(article_data)
|
|
|
|
return articles
|
|
except ET.ParseError as e:
|
|
logging.error(f"Error parsing sitemap: {e}")
|
|
return []
|
|
|
|
|
|
def scrape_and_convert_with_filter(source: str, output_file: str, filter_function=is_content_page, level: int = None):
|
|
"""
|
|
Scrape articles from a sitemap or by URL level, apply filtering, and convert to a single markdown file.
|
|
|
|
:param source: URL of the sitemap, base URL for level-based scraping, or path to a local sitemap file
|
|
:param output_file: Path to save the output markdown file
|
|
:param filter_function: Function to filter URLs (default is is_content_page)
|
|
:param level: URL level for scraping (None if using sitemap)
|
|
"""
|
|
if level is not None:
|
|
|
|
articles = scrape_by_url_level(source, level)
|
|
articles = [article for article in articles if filter_function(article['url'])]
|
|
elif source.startswith('http'):
|
|
|
|
articles = scrape_from_sitemap(source)
|
|
articles = [article for article in articles if filter_function(article['url'])]
|
|
else:
|
|
|
|
articles = scrape_from_filtered_sitemap(source, filter_function)
|
|
|
|
articles = [article for article in articles if filter_function(article['url'])]
|
|
markdown_content = convert_to_markdown(articles)
|
|
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
f.write(markdown_content)
|
|
|
|
logging.info(f"Scraped and filtered content saved to {output_file}")
|
|
|
|
|
|
|
|
|
|
|