#!/usr/bin/env python3
"""
Phase 2: daily blogger-list verifier.

Reads the next BATCH_SIZE rows from phase1_survivors.csv (starting at the
saved cursor), verifies email + website for each, and writes a day-stamped
results file. Designed to be invoked once per day from cron.

State files:
  phase2_state/cursor.txt          — next row index to process (0-based)
Outputs:
  phase2_results/YYYY-MM-DD.csv    — that day's verified rows (UTC date; appended to on re-runs)
  phase2.log                       — per-run summary

Output columns:
  category, website, email,
  mx_ok, dns_ok, http_status, final_url,
  blog_status, email_status, final_status, reason

final_status:
  send  — verified email domain + live blog signals
  risky — email OK but blog has no clear blog signal (still a real site)
  skip  — dead site, parked, blocked, dead email domain, etc.
"""
import csv
import os
import re
import socket
import ssl
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from urllib.parse import urlparse, urlsplit

import dns.resolver
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Process-wide default timeout (seconds) for sockets created without an
# explicit timeout of their own. Intended as a last-resort guard against
# stragglers that would otherwise hang a worker thread indefinitely (e.g.
# servers that trickle bytes during iter_content).
# NOTE(review): requests/urllib3 apply the `timeout=` passed to session.get()
# to their own sockets, so this default presumably only matters for sockets
# opened elsewhere — confirm it provides the protection described above.
socket.setdefaulttimeout(15)

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Filesystem layout: everything lives under ROOT.
ROOT       = "/home2/writeup/public_html/outreach"
SURVIVORS  = f"{ROOT}/phase1_survivors.csv"   # input list (header + data rows)
STATE_DIR  = f"{ROOT}/phase2_state"           # holds the cursor file
RESULTS    = f"{ROOT}/phase2_results"         # one CSV per UTC day
LOG_FILE   = f"{ROOT}/phase2.log"             # one summary line per run
CURSOR     = f"{STATE_DIR}/cursor.txt"        # next 0-based row index to process

BATCH_SIZE   = 10_000      # rows per run unless overridden on the command line
WORKERS      = 30          # threads in the verification pool
HTTP_TIMEOUT = 12          # seconds; read timeout and total body-download budget
MAX_BYTES    = 250_000     # don't download more than ~250 KB of HTML
USER_AGENT   = (
    "WriteUpCafe-DirectoryBot/1.0 "
    "(+https://writeupcafe.com/contact; verifying directory submission candidates)"
)

# Any match in the downloaded HTML classifies the site as "parked".
PARKING_PATTERNS = re.compile(
    r"(domain (is )?for sale|buy this domain|this domain has expired|"
    r"this site can'?t be reached|website coming soon|"
    r"is parked free|godaddy|sedo\.com|hugedomains|dan\.com|"
    r"namecheap parking|expired domain|under construction)",
    re.IGNORECASE,
)
# A site counts as a live blog when at least one of these matches its HTML:
# feed links, /blog links, <article> tags, byline phrases, or markers of
# common blogging platforms.
BLOG_SIGNAL_PATTERNS = (
    re.compile(r'<link[^>]+type=["\']application/(rss|atom)\+xml["\']', re.I),
    re.compile(r'href=["\'][^"\']*/(feed|rss|atom)/?["\']', re.I),
    re.compile(r'href=["\'][^"\']*/blog/?["\']', re.I),
    re.compile(r'<article[\s>]', re.I),
    re.compile(r'(posted on|published (on|by)|by\s+<a[^>]+rel=["\']author)', re.I),
    re.compile(r'(wp-content/|/wp-includes/)', re.I),    # WordPress
    re.compile(r'ghost\.io|ghost\.org', re.I),
    re.compile(r'name=["\']generator["\'][^>]+(WordPress|Ghost|Hugo|Jekyll|Substack|Wix Blog)', re.I),
)
# Hosts where a redirect offsite is normal (not a parking page).
# Matched exactly or as a parent domain of the final host.
SAFE_REDIRECT_HOSTS = {
    "wordpress.com", "blogger.com", "blogspot.com", "medium.com", "substack.com",
    "ghost.io", "wixsite.com", "weebly.com", "squarespace.com", "tumblr.com",
}

# ---------------------------------------------------------------------------
# DNS / HTTP plumbing
# ---------------------------------------------------------------------------
# Single dnspython resolver reused by has_mx() for every lookup.
_resolver = dns.resolver.Resolver()
_resolver.timeout  = 4   # seconds to wait for any one nameserver
_resolver.lifetime = 6   # total seconds allowed per query

# Per-process memo tables (never evicted): name -> bool result.
_mx_cache  = {}   # email domain -> has_mx() verdict
_dns_cache = {}   # website host -> resolves() verdict


def email_domain(addr):
    """Return the lower-cased domain part of an e-mail address.

    Args:
        addr: Address string such as ``"Jane@Example.com "``. ``None`` is
            treated like an empty string.

    Returns:
        The text after the last ``@``, stripped and lower-cased. Returns
        ``""`` when the value contains no ``@`` at all, so that a bare word
        or hostname is never mistaken for a mail domain.
    """
    _local, sep, domain = (addr or "").rpartition("@")
    if not sep:
        # No "@" present: this is not an address, so there is no domain.
        return ""
    return domain.strip().lower()


def host_of(url):
    """Extract the bare hostname from a URL or scheme-less website string.

    Args:
        url: Something like ``"https://www.Example.com:8080/path"`` or
            ``"example.com/blog"``. ``None`` is treated as empty.

    Returns:
        The lower-cased host without a leading ``www.``, port, or
        ``user:password@`` prefix; IPv6 literals are returned without
        brackets. Returns ``""`` when no host can be parsed.
    """
    s = (url or "").strip()
    if "://" not in s:
        # urlsplit only fills in the network location when a scheme is present.
        s = "http://" + s
    try:
        # .hostname strips userinfo and port, handles bracketed IPv6 and
        # lower-cases the result; slicing netloc by hand gets those wrong.
        host = urlsplit(s).hostname or ""
    except ValueError:
        # Raised for malformed input such as an unterminated "[" literal.
        return ""
    if host.startswith("www."):
        host = host[4:]
    return host


def has_mx(domain):
    """Report whether *domain* looks able to receive e-mail.

    Args:
        domain: Mail domain to check (the part after ``@``).

    Returns:
        ``True`` when the domain publishes at least one usable MX record, or
        when it has no MX answer but does resolve to an A record. ``False``
        for an empty domain, for a "null MX" (a single MX whose target is
        ``.``, which declares that the domain accepts no mail), and on any
        other lookup failure. Results are memoised in ``_mx_cache``.
    """
    if not domain:
        return False
    if domain in _mx_cache:
        return _mx_cache[domain]
    # dnspython 2.x renamed Resolver.query() to resolve() and deprecated the
    # old name; keep working on 1.x installs that only have query().
    lookup = getattr(_resolver, "resolve", None) or _resolver.query
    try:
        answers = lookup(domain, "MX")
        # A Name object is always truthy, so test the target explicitly:
        # "." is the null-MX marker and must not count as a mail exchanger.
        ok = any(r.exchange.to_text() != "." for r in answers)
    except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN,
            dns.resolver.NoNameservers, dns.exception.Timeout):
        # Some domains accept mail at A record (legal per RFC 5321)
        try:
            lookup(domain, "A")
            ok = True
        except Exception:
            ok = False
    except Exception:
        ok = False
    _mx_cache[domain] = ok
    return ok


def resolves(host):
    """Return True when *host* resolves via ``socket.gethostbyname``.

    An empty host is reported as unresolvable without a lookup. Every other
    verdict (success or any failure) is remembered in ``_dns_cache`` so each
    host is looked up at most once per process.
    """
    if not host:
        return False
    cached = _dns_cache.get(host)
    if cached is not None:
        return cached
    try:
        socket.gethostbyname(host)
    except Exception:
        verdict = False
    else:
        verdict = True
    _dns_cache[host] = verdict
    return verdict


def make_session():
    """Build the ``requests.Session`` used for all website fetches.

    The session carries the bot's User-Agent and Accept headers and retries
    a request up to twice (with backoff) on 500/502/503/504 responses.

    One session is shared by all ``WORKERS`` threads, each usually talking to
    a different host. The adapters are therefore sized to ``WORKERS``;
    requests' default keeps only 10 per-host connection pools, which lets
    concurrent workers evict each other's pools.

    Returns:
        A configured ``requests.Session``.
    """
    session = requests.Session()
    session.headers.update({
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    })
    retry = Retry(total=2, backoff_factor=0.4,
                  status_forcelist=(500, 502, 503, 504))
    for scheme in ("http://", "https://"):
        session.mount(scheme, HTTPAdapter(max_retries=retry,
                                          pool_connections=WORKERS,
                                          pool_maxsize=WORKERS))
    return session


# ---------------------------------------------------------------------------
# Per-row verification
# ---------------------------------------------------------------------------
def _fetch_failed(out, http_status, blog_status, reason):
    """Record a failed fetch on *out* and return it (final_status stays "skip")."""
    out["http_status"] = http_status
    out["blog_status"] = blog_status
    out["reason"] = reason
    return out


def _decode_body(raw, encoding):
    """Decode downloaded bytes to text without failing on a bogus charset label."""
    try:
        return raw.decode(encoding or "utf-8", errors="replace")
    except LookupError:
        # The server advertised a charset Python does not know; a live site
        # should not be written off as an error because of that.
        return raw.decode("utf-8", errors="replace")


def verify_row(row, session):
    """Verify one survivor row and classify it as send / risky / skip.

    Steps, stopping at the first failure: MX check on the e-mail domain, parse
    and DNS-resolve the website host, fetch the page (following redirects,
    reading at most about MAX_BYTES within HTTP_TIMEOUT seconds), reject HTTP
    errors, off-site redirects and parking pages, then look for blog signals.

    Args:
        row: Sequence whose first three items are category, website, email.
        session: ``requests.Session`` used for the HTTP fetch.

    Returns:
        A dict with the keys category, website, email, mx_ok, dns_ok,
        http_status, final_url, blog_status, email_status, final_status and
        reason. ``final_status`` is "send" (MX ok + blog signals), "risky"
        (MX ok, live site, no blog signal) or "skip" (everything else).
    """
    category, website, email = row[0], row[1], row[2]
    src_host = host_of(website)
    out = {
        "category": category, "website": website, "email": email,
        "mx_ok": 0, "dns_ok": 0, "http_status": "",
        "final_url": "", "blog_status": "", "email_status": "",
        "final_status": "skip", "reason": "",
    }

    out["mx_ok"] = 1 if has_mx(email_domain(email)) else 0
    out["email_status"] = "ok" if out["mx_ok"] else "no_mx"

    if not src_host:
        out["blog_status"] = "bad_url"
        out["reason"] = "could not parse website URL"
        return out

    if not resolves(src_host):
        out["blog_status"] = "dead"
        out["reason"] = "DNS does not resolve"
        return out
    out["dns_ok"] = 1

    url = website.strip()
    if "://" not in url:
        url = "http://" + url

    try:
        r = session.get(url, timeout=(8, HTTP_TIMEOUT), allow_redirects=True, stream=True)
        out["http_status"] = str(r.status_code)
        out["final_url"] = r.url
        # Collect chunks and join once instead of re-copying a growing bytes
        # object on every iteration.
        chunks = []
        received = 0
        body_start = time.monotonic()
        try:
            for chunk in r.iter_content(chunk_size=16_384, decode_unicode=False):
                chunks.append(chunk)
                received += len(chunk)
                if received >= MAX_BYTES:
                    break
                # Overall budget: the read timeout applies per socket read,
                # so a server that drips bytes could otherwise hold a worker
                # far longer than HTTP_TIMEOUT. A monotonic clock keeps the
                # budget immune to system clock adjustments.
                if time.monotonic() - body_start > HTTP_TIMEOUT:
                    break
        finally:
            r.close()
        body = _decode_body(b"".join(chunks), r.encoding)
    # Order matters: SSLError and ConnectTimeout are ConnectionError
    # subclasses, so the specific handlers must come first.
    except requests.exceptions.SSLError:
        return _fetch_failed(out, "ssl_error", "blocked", "SSL error")
    except requests.exceptions.ConnectTimeout:
        return _fetch_failed(out, "timeout", "timeout", "connect timeout")
    except requests.exceptions.ReadTimeout:
        return _fetch_failed(out, "timeout", "timeout", "read timeout")
    except requests.exceptions.TooManyRedirects:
        return _fetch_failed(out, "loop", "blocked", "redirect loop")
    except requests.exceptions.ConnectionError as e:
        return _fetch_failed(out, "conn_error", "dead",
                             f"connection failed: {type(e).__name__}")
    except Exception as e:
        return _fetch_failed(out, "error", "error", f"{type(e).__name__}")

    final_host = host_of(out["final_url"])
    if r.status_code >= 400:
        out["blog_status"] = "dead"
        out["reason"] = f"HTTP {r.status_code}"
        return out

    # Off-site redirect → likely parked or expired (unless landed on a blog
    # platform where the source domain is just an alias for a hosted blog)
    if final_host and final_host != src_host and not final_host.endswith("." + src_host):
        if final_host not in SAFE_REDIRECT_HOSTS and \
           not any(final_host.endswith("." + s) for s in SAFE_REDIRECT_HOSTS):
            out["blog_status"] = "redirect_offsite"
            out["reason"] = f"redirected to {final_host}"
            return out

    if PARKING_PATTERNS.search(body):
        out["blog_status"] = "parked"
        out["reason"] = "parking-page marker found"
        return out

    has_signal = any(p.search(body) for p in BLOG_SIGNAL_PATTERNS)
    if has_signal:
        out["blog_status"] = "live_blog"
    else:
        out["blog_status"] = "live_no_blog_signal"

    # Final classification
    if out["mx_ok"] and out["blog_status"] == "live_blog":
        out["final_status"] = "send"
        out["reason"] = "verified blog + email domain"
    elif out["mx_ok"] and out["blog_status"] == "live_no_blog_signal":
        out["final_status"] = "risky"
        out["reason"] = "live site but no clear blog signals"
    else:
        out["final_status"] = "skip"
        if not out["mx_ok"]:
            out["reason"] = "email domain has no MX"
    return out


# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def read_cursor():
    """Return the saved 0-based index of the next survivor row to process.

    Returns:
        The integer stored in the CURSOR file, never less than 0. Falls back
        to 0 when the file is missing, unreadable, empty, or does not contain
        an integer.
    """
    try:
        # Context manager so the handle is closed promptly rather than left
        # to the garbage collector.
        with open(CURSOR, encoding="utf-8") as fh:
            return max(0, int(fh.read().strip() or "0"))
    except (OSError, ValueError):
        # Missing/unreadable file or non-numeric content: start from the top.
        return 0


def write_cursor(n):
    """Persist *n* as the next row index to process.

    The value is written to a sibling ``.tmp`` file first and then moved over
    the CURSOR file with ``os.replace``, so readers never see a half-written
    cursor. STATE_DIR is created if it does not exist yet.
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    staging = f"{CURSOR}.tmp"
    with open(staging, "w") as handle:
        handle.write(f"{n}")
    os.replace(staging, CURSOR)


def slice_batch(start, size):
    """Read up to *size* data rows from SURVIVORS, starting at row *start*.

    Row indices are 0-based and exclude the header line.

    Rows with fewer than three columns are padded with empty strings rather
    than dropped. The caller advances the cursor by the number of rows
    returned, so dropping a row here would make the cursor lag behind the
    rows actually consumed and cause the tail of each batch to be processed
    again on the next run. Padded rows fail verification and are recorded as
    skipped.

    Args:
        start: Index of the first data row to return.
        size: Maximum number of rows to return.

    Returns:
        A list of rows (each a list of at least three strings); empty when
        *start* is at or past the end of the file.
    """
    rows = []
    with open(SURVIVORS, newline="", encoding="utf-8", errors="replace") as fh:
        reader = csv.reader(fh)
        next(reader, None)  # skip the header row
        for i, row in enumerate(reader):
            if i < start:
                continue
            if i >= start + size:
                break
            if len(row) < 3:
                # Keep one output row per consumed input row (see docstring).
                row = row + [""] * (3 - len(row))
            rows.append(row)
    return rows


def total_survivors():
    """Count the data rows in SURVIVORS (header excluded).

    Rows are counted with ``csv.reader``, the same way ``slice_batch``
    indexes them, so a quoted field containing a newline counts as one row
    rather than two physical lines.

    Returns:
        Number of CSV records after the header; 0 for an empty file.
    """
    with open(SURVIVORS, newline="", encoding="utf-8", errors="replace") as fh:
        reader = csv.reader(fh)
        next(reader, None)  # skip the header row
        return sum(1 for _ in reader)


def run_batch(batch_size: int = BATCH_SIZE) -> None:
    """Verify the next batch of survivor rows and record the results.

    Reads the saved cursor, loads up to *batch_size* rows from that position,
    verifies them on a pool of WORKERS threads, and appends one CSV line per
    row to today's (UTC) results file, writing the header first if the file
    is new. The cursor is advanced, and a summary line logged, only after the
    whole batch has been written.

    Args:
        batch_size: Maximum number of rows to process in this run.
    """
    os.makedirs(RESULTS, exist_ok=True)
    os.makedirs(STATE_DIR, exist_ok=True)

    start = read_cursor()
    rows = slice_batch(start, batch_size)
    if not rows:
        log(f"cursor={start:,}: nothing left to process — list complete")
        return

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    out_path = f"{RESULTS}/{today}.csv"
    new_file = not os.path.exists(out_path)

    # Single source of truth for the output column order.
    columns = (
        "category", "website", "email",
        "mx_ok", "dns_ok", "http_status", "final_url",
        "blog_status", "email_status", "final_status", "reason",
    )
    counts = {"send": 0, "risky": 0, "skip": 0, "total": 0}
    blog_counts: dict[str, int] = {}
    t0 = time.monotonic()

    with open(out_path, "a", newline="", encoding="utf-8") as fout:
        w = csv.writer(fout)
        if new_file:
            w.writerow(columns)
        # The executor exits (waits for its workers) before the session is
        # closed, so no thread uses the session after its sockets are freed.
        with make_session() as session, \
                ThreadPoolExecutor(max_workers=WORKERS) as pool:
            futures = {pool.submit(verify_row, row, session): row for row in rows}
            for fut in as_completed(futures):
                try:
                    r = fut.result()
                except Exception as e:
                    # A crashed worker still yields a row so that output
                    # lines stay one-to-one with input rows.
                    src = futures[fut]
                    r = {"category": src[0], "website": src[1], "email": src[2],
                         "mx_ok": 0, "dns_ok": 0, "http_status": "exc",
                         "final_url": "", "blog_status": "error",
                         "email_status": "", "final_status": "skip",
                         "reason": f"worker exception: {type(e).__name__}"}
                w.writerow([r[col] for col in columns])
                counts[r["final_status"]] += 1
                counts["total"] += 1
                blog_counts[r["blog_status"]] = blog_counts.get(r["blog_status"], 0) + 1
                if counts["total"] % 500 == 0:
                    # Periodic flush bounds how much output a crash can lose.
                    fout.flush()

    write_cursor(start + len(rows))
    elapsed = time.monotonic() - t0
    total = total_survivors()
    pct = 100 * (start + len(rows)) / total if total else 0
    log(
        f"batch start={start:,} size={len(rows):,} elapsed={elapsed:.0f}s "
        f"send={counts['send']:,} risky={counts['risky']:,} skip={counts['skip']:,} "
        f"blog_breakdown={blog_counts} progress={pct:.1f}% file={out_path}"
    )


def log(msg: str) -> None:
    """Emit *msg* with a UTC timestamp prefix to stderr and append it to LOG_FILE."""
    stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    entry = f"[{stamp}Z] {msg}"
    print(entry, file=sys.stderr)
    with open(LOG_FILE, "a", encoding="utf-8") as logfile:
        logfile.write(entry + "\n")


if __name__ == "__main__":
    # Optional first CLI argument overrides the batch size for this run.
    size = BATCH_SIZE
    if len(sys.argv) > 1:
        try:
            size = int(sys.argv[1])
        except ValueError:
            # Keep the best-effort fallback, but say so instead of silently
            # running a different batch size than the operator asked for.
            print(
                f"ignoring invalid batch size {sys.argv[1]!r}; "
                f"using default {BATCH_SIZE}",
                file=sys.stderr,
            )
    run_batch(size)
