LinuxParty

Inicio activadoInicio activadoInicio activadoInicio activadoInicio desactivado
 

Un Web crawler es un programa que navega por Internet de una manera predeterminada, configurable y automática, realizando acciones programadas en el contenido crawler. Los motores de búsqueda como Google y Yahoo utilizan rastreadores como medio de proporcionar datos de búsqueda, es decir, estos rastreadores encuentran y almacenan lo que luego tu vas a buscar.

 El rastreador multi-hilo basado python es bastante simple y rápido. Es capaz de detectar y eliminar enlaces duplicados y guardar la fuente y el enlace de la que luego puede usarse en la búsqueda de enlaces entrantes y salientes para el cálculo de fila de la página. Es totalmente gratuito y el código se muestra a continuación:

(Puede que dependiendo de la seguridad de la página, no funcione en todas las webs)


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Script de rastreo web básico y multihilo en Python.
---------------------------------------------------
Este script toma una URL inicial como argumento, descarga su contenido HTML,
lo guarda en un archivo local con nombre basado en la hora actual, y analiza
los enlaces (<a href="/...">) para seguir rastreando nuevas páginas.
Funciona de forma recursiva utilizando una cola (Queue) y múltiples hilos 
(thread.start_new_thread) para procesar las URLs encontradas.

Características:
- Evita procesar la misma URL más de una vez usando un conjunto (dupcheck).
- Almacena los HTML descargados en ficheros individuales.
- Construye URLs absolutas a partir de enlaces relativos.
- Procesa hasta 100 URLs en cola de forma concurrente.
- Limita la memoria del conjunto de URLs vistas borrándolo si crece demasiado.
"""

import sys, thread, Queue, re, urllib, urlparse, time, os, sys

# Conjunto para evitar URLs duplicadas ya procesadas
dupcheck = set()

# Cola con capacidad para 100 URLs a procesar
q = Queue.Queue(100)

# Insertar la URL inicial desde los argumentos del script
q.put(sys.argv[1])

def queueURLs(html, origLink):
    """
    Busca enlaces en el HTML y los añade a la cola para procesar.
    Convierte enlaces relativos a absolutos usando la URL de origen.
    """
    # Buscar todos los href de enlaces <a> en el HTML
    for url in re.findall(r'''<a[^>]+href=["'](.[^"']+)["']''', html, re.I):
        # Construir URL absoluta si es relativa
        link = url.split("#", 1)[0] if url.startswith("http") \
               else '{uri.scheme}://{uri.netloc}'.format(uri=urlparse.urlparse(origLink)) + url.split("#", 1)[0]
        
        # Saltar si la URL ya fue procesada
        if link in dupcheck:
            continue
        
        # Añadir la URL al conjunto de procesadas
        dupcheck.add(link)

        # Si la lista de URLs procesadas es enorme, vaciarla para ahorrar memoria
        if len(dupcheck) > 99999:
            dupcheck.clear()
        
        # Añadir la nueva URL a la cola de trabajo
        q.put(link)

def getHTML(link):
    """
    Descarga el HTML de una URL y lo guarda en un archivo local.
    Luego extrae nuevos enlaces y los añade a la cola.
    """
    try:
        # Descargar el HTML de la URL
        html = urllib.urlopen(link).read()

        # Guardar el HTML en un archivo con nombre basado en la hora actual
        open(str(time.time()) + ".html", "w").write("" % link + "\n" + html)

        # Analizar y encolar nuevas URLs
        queueURLs(html, link)

    except (KeyboardInterrupt, SystemExit):
        # Si se interrumpe el script, relanzar la excepción para terminar
        raise
    except Exception:
        # Ignorar cualquier otro error y continuar
        pass

# Bucle infinito para procesar URLs de la cola en nuevos hilos
while True:
    # Sacar una URL de la cola y procesarla en un hilo nuevo
    thread.start_new_thread(getHTML, (q.get(),))
    
    # Pausa pequeña para evitar sobrecarga
    time.sleep(0.5)

Guarde el código como LinuxPartyCrawler.py

Para iniciar el rastreo escriba:

python LinuxPartyCrawler.py https://es.wikipedia.org

Esta versión está modernizada a Python3

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Crawler web básico y concurrente (Python 3, stdlib)
---------------------------------------------------
- Parte de una URL inicial y rastrea enlaces <a href="/..."> encontrados.
- Descarga páginas HTML y las guarda como archivos .html en el directorio actual.
- Evita procesar la misma URL dos veces mediante un conjunto compartido (thread-safe).
- Usa ThreadPoolExecutor para concurrencia y urllib para las peticiones HTTP.
- Convierte enlaces relativos a absolutos con urllib.parse.urljoin.
- Permite limitar el número total de páginas a descargar.

Uso:
    python crawler.py https://ejemplo.com --workers 8 --max-pages 500 --delay 0.2
"""

from __future__ import annotations
import argparse
import logging
import os
import re
import threading
import time
from queue import Queue, Empty
from urllib.parse import urljoin, urlsplit
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from concurrent.futures import ThreadPoolExecutor

# --- Configuración de logging (info por defecto) ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# --- Expresión para extraer href de anchors (simple y rápida) ---
HREF_RE = re.compile(r'''<a[^>]+href=["'](.*?)["']''', re.I)

def normalize_sleep(delay: float) -> None:
    """Pequeña espera opcional entre peticiones para ser más amables con el servidor."""
    if delay > 0:
        time.sleep(delay)

def safe_filename(url: str) -> str:
    """
    Genera un nombre de archivo seguro a partir de la URL.
    Incluye timestamp para evitar colisiones y elimina caracteres problemáticos.
    """
    ts = int(time.time() * 1000)
    parts = urlsplit(url)
    base = (parts.netloc + parts.path).strip("/")
    if not base:
        base = "index"
    # Sustituye cualquier cosa no alfanumérica por guiones bajos
    base = re.sub(r"[^a-zA-Z0-9._-]+", "_", base)
    # Limita longitud por seguridad
    base = base[:140]  # suficientemente largo
    return f"{ts}_{base or 'page'}.html"

def fetch_html(url: str, timeout: float = 15.0) -> tuple[str | None, str | None]:
    """
    Descarga una URL y devuelve (html, content_type).
    Si no es texto/html o falla la descarga, devuelve (None, None).
    """
    headers = {
        "User-Agent": "SimpleCrawler/1.0 (+https://example.invalid)",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    }
    try:
        req = Request(url, headers=headers)
        with urlopen(req, timeout=timeout) as resp:
            ctype = resp.headers.get("Content-Type", "")
            # Sólo seguimos con contenido HTML
            if "text/html" not in ctype.lower():
                return None, None
            raw = resp.read()
            # Intenta inferir charset del header, si no, usa utf-8
            charset = "utf-8"
            m = re.search(r"charset=([\w-]+)", ctype, re.I)
            if m:
                charset = m.group(1).strip()
            try:
                html = raw.decode(charset, errors="replace")
            except LookupError:
                html = raw.decode("utf-8", errors="replace")
            return html, ctype
    except (HTTPError, URLError, TimeoutError) as e:
        logging.debug("Error al descargar %s: %s", url, e)
    except Exception as e:  # robustez
        logging.debug("Excepción inesperada en %s: %s", url, e)
    return None, None

def extract_links(html: str, base_url: str) -> list[str]:
    """
    Extrae enlaces desde HTML y los convierte a absolutos con base_url.
    Filtra enlaces vacíos y fragmentos.
    """
    links = []
    for href in HREF_RE.findall(html):
        if not href or href.startswith("#") or href.lower().startswith("javascript:"):
            continue
        abs_url = urljoin(base_url, href.split("#", 1)[0])
        links.append(abs_url)
    return links

def save_html_to_file(url: str, html: str) -> str:
    """Guarda el HTML en un archivo .html y devuelve la ruta del archivo."""
    fname = safe_filename(url)
    with open(fname, "w", encoding="utf-8", errors="replace") as f:
        f.write(f"<!-- URL: {url} -->\n")
        f.write(html)
    return os.path.abspath(fname)

def worker_loop(
    name: str,
    q: Queue,
    seen: set[str],
    seen_lock: threading.Lock,
    max_pages: int,
    processed_counter: list[int],
    delay: float,
):
    """
    Bucle del trabajador:
    - Saca URL de la cola, la descarga, guarda, extrae enlaces y encola nuevos.
    - Se detiene cuando se alcanza max_pages o la cola se queda sin trabajo por un tiempo.
    """
    while True:
        if processed_counter[0] >= max_pages:
            return
        try:
            url = q.get(timeout=1.0)
        except Empty:
            # Si no hay trabajo, terminar el hilo
            return

        normalize_sleep(delay)

        html, ctype = fetch_html(url)
        if html is None:
            q.task_done()
            continue

        # Guarda la página
        try:
            path = save_html_to_file(url, html)
            with seen_lock:
                processed_counter[0] += 1
                idx = processed_counter[0]
            logging.info("[%s] (%d/%d) Guardado: %s", name, idx, max_pages, path)
        except Exception as e:
            logging.debug("No se pudo guardar %s: %s", url, e)

        # Extrae y encola enlaces nuevos
        for link in extract_links(html, url):
            with seen_lock:
                if link in seen or processed_counter[0] >= max_pages:
                    continue
                seen.add(link)
            try:
                q.put_nowait(link)
            except Exception:
                # Si la cola está llena, ignoramos y seguimos
                pass

        q.task_done()

def main():
    parser = argparse.ArgumentParser(description="Crawler web básico concurrente (Python 3).")
    parser.add_argument("start_url", help="URL inicial para comenzar el rastreo")
    parser.add_argument("--workers", type=int, default=8, help="Número de hilos (por defecto: 8)")
    parser.add_argument("--max-pages", type=int, default=200, help="Máximo de páginas a descargar (def: 200)")
    parser.add_argument("--delay", type=float, default=0.2, help="Retraso (segundos) entre peticiones por hilo (def: 0.2)")
    args = parser.parse_args()

    # Estructuras compartidas
    q: Queue[str] = Queue(maxsize=2000)
    seen: set[str] = set()
    seen_lock = threading.Lock()
    processed_counter = [0]  # lista para mutabilidad compartida

    # Semilla
    with seen_lock:
        seen.add(args.start_url)
    q.put(args.start_url)

    logging.info("Inicio: %s | workers=%d | max_pages=%d | delay=%.2fs",
                 args.start_url, args.workers, args.max_pages, args.delay)

    # Lanzamos el pool de hilos
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [
            executor.submit(
                worker_loop,
                f"worker-{i+1}",
                q,
                seen,
                seen_lock,
                args.max_pages,
                processed_counter,
                args.delay,
            )
            for i in range(args.workers)
        ]

        # Espera a que la cola se procese o se alcance el límite
        try:
            q.join()
        except KeyboardInterrupt:
            logging.warning("Interrumpido por el usuario.")

    logging.info("Finalizado. Total de páginas procesadas: %d", processed_counter[0])

if __name__ == "__main__":
    main()

Para que nuestro script respete "robots.txt" bastará añadir:

from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse

# Diccionario para cachear robots.txt por dominio
robots_cache: dict[str, RobotFileParser] = {}

def can_fetch(url: str, user_agent: str = "SimpleCrawler") -> bool:
    """
    Comprueba si el user_agent puede acceder a la URL según robots.txt.
    Descarga y cachea robots.txt por dominio.
    """
    parsed = urlparse(url)
    base_url = f"{parsed.scheme}://{parsed.netloc}"
    
    # Si no está en cache, lo descargamos
    if base_url not in robots_cache:
        rp = RobotFileParser()
        rp.set_url(f"{base_url}/robots.txt")
        try:
            rp.read()
        except Exception:
            # Si falla, permitimos por defecto
            return True
        robots_cache[base_url] = rp
    
    return robots_cache[base_url].can_fetch(user_agent, url)

Y luego, en el bucle del crawler, justo antes de llamar a fetch_html(url) harías algo como:

if not can_fetch(url):
    logging.info("Bloqueado por robots.txt: %s", url)
    q.task_done()
    continue

 

Disfrute!

¡¡Atención!!
 Algunas Webs, (incluida la nuestra) tienen herramientas que protegen de ciertos bots y crawlers, y pueden bloquearte el acceso a la web, incluso de forma permanente. Haz pruebas bajo tu responsabilidad.

En otros artículos intentaré mostrar otros códigos similares realizados en PHP.

Comentarios  

-1 # rafa 14-09-2017 03:24
Hola en la escuela para la clase de big data me pidieron un codigo de web crawler que funcione para descargar paginas, me pueden ayudar a pasarme su codigo o decirme como lo hago correr en mi computadora por favor?
# LinuxParty 20-09-2017 23:52
Hola Rafa:

Me sorprende que escribas eso, arriba ves el código y una explicación de cómo funciona. Evidentemente desde un Linux. Si no tienes Linux, deberás hacerlo correr desde la consola de tu sistema, y "python", deberá estar en el path para poder ejecutarse.

No estás registrado para postear comentarios



Redes:



   

 

Suscribete / Newsletter

Suscribete a nuestras Newsletter y periódicamente recibirás un resumen de las noticias publicadas.

Donar a LinuxParty

Probablemente te niegues, pero.. ¿Podrías ayudarnos con una donación?


Tutorial de Linux

Top 15 artículos por Fecha

Viendo artículos de: Julio de 2025

Filtro por Categorías