from typing import Any, Tuple from yarl import URL import copy import zipfile from ebooklib import epub from pathlib import Path from pydantic import BaseModel, TypeAdapter from argparse import ArgumentParser import json import re from requests_tor import RequestsTor import requests IMAGE_REMOVE_REGEX = re.compile(r"""https?.*)\".*/>""") class Author(BaseModel): id: int name: str class BookMedia(BaseModel): name: str rusName: str engName: str otherNames: list[str] authors: list[Author] summary: str class Branch(BaseModel): id: int branchId: int def process_text(text_obj: dict): content = text_obj["text"] for mark in text_obj.get("marks", []): if mark["type"] == "italic": content = f"{content}" elif mark["type"] == "bold": content = f"{content}" else: raise ValueError(f"unknown mark for text {text_obj}") return content class Chapter(BaseModel): id: int volume: int number: str name: str branches: list[Branch] withBranches: bool def get_zip_path(self, base_dir: Path) -> Path | None: nobranch_zip = base_dir / f"v{self.volume}-n{self.number}-{self.id}.zip" if nobranch_zip.exists(): return nobranch_zip for branch in self.branches: branch_zip = ( base_dir / f"v{self.volume}-n{self.number}-{self.id}-b{branch.branchId}.zip" ) if branch_zip.exists(): return branch_zip def get_image(self, image_id: str, zip: zipfile.ZipFile) -> Tuple[str, bytes]: for file in zip.namelist(): if file.startswith(image_id): return (file, zip.read(file)) raise FileNotFoundError(f"Image {image_id} is not found in the archive") def load(self, base_dir: Path) -> Tuple[epub.EpubHtml, list[Any]]: title = f"Глава {self.number}" book_item = epub.EpubHtml( title=title, file_name="Text/ch{}.xhtml".format(self.number), lang="ru", ) zip_path = self.get_zip_path(base_dir) if zip_path is None: raise FileNotFoundError(f"Chapter for `{self}` not found") zip = zipfile.ZipFile(zip_path) data = zip.read("data.txt") try: content = json.loads(data) except Exception: # It's not a json, so we just attach content and return. data = data.decode("utf-8") content = f"

Глава {self.number}

\n{data}" book_item.content = content return (book_item, []) if content["type"] != "doc": raise ValueError(f"{self} contains unknown document format") output = [] extras = [] print(f"parsing {zip.filename}") output.append(f"

Глава {self.number}

") for item in content["content"]: if item["type"] in {"paragraph", "heading"}: inner = [] for sub_item in item.get("content", []): if sub_item["type"] == "text": inner.append(process_text(sub_item)) elif sub_item["type"] == "hardBreak": inner.append("
") else: raise ValueError(f"{self} - Unknown sub-item") inner_content = "\n".join(inner) if item["type"] == "heading": attrs = item.get("attrs", {}) level = attrs.get("level", 3) align = attrs.get("textAlign", "center") output.append( f'{inner_content}' ) if item["type"] == "paragraph": output.append(f"

{inner_content}

") elif item["type"] == "image": for image in item["attrs"]["images"]: image_name = image["image"] image_path, image_bytes = self.get_image(image_name, zip) image_item = epub.EpubImage( uid=image_name, file_name=image_path, content=image_bytes, ) extras.append(image_item) output.append(f'') elif item["type"] == "horizontalRule": output.append("
") else: raise ValueError(f"{self} - unknown content type") # Connect all items book_item.content = "\n".join(output) return (book_item, extras) def request_with_id_update( self, client: requests.Session | RequestsTor, url: str, ) -> bytes | None: print("Fetching", url) resp = client.get(url) if resp.status_code == 429: print("Ratelimit hit.") if isinstance(client, RequestsTor): print("Updating client's IP") client.new_id() resp = client.get(url) if not resp.ok: return None return resp.content def replace_images( self, page: epub.EpubHtml, client: requests.Session | RequestsTor, cache: Path, ) -> list[epub.EpubImage]: new_content = copy.copy(page.content) replaces = [] for match in IMAGE_REMOVE_REGEX.finditer(page.content): # type: ignore target_str: str = page.content[match.start() : match.end()] # type: ignore src: str = match.group("src") url = URL(src) strip_path = url.path.lstrip("/") cached_path = cache / strip_path if (cached_path).exists(): print(f"Using cached image from {cached_path}") resp = cached_path.read_bytes() else: resp = self.request_with_id_update(client, src) if resp: cached_path.parent.mkdir(parents=True, exist_ok=True) cached_path.write_bytes(resp) if resp is None: new_content = new_content.replace(target_str, "") # type: ignore continue img = epub.EpubImage( file_name=strip_path, content=resp, ) replaces.append(img) newiimage = f'' new_content = new_content.replace(target_str, newiimage) # type: ignore page.content = new_content return replaces def add_to_book( self, book: epub.EpubBook, base_dir: Path, fetch_images: bool, client: requests.Session | RequestsTor, cache: Path, ): (item, extras) = self.load(base_dir) for extra in extras: book.add_item(extra) if not fetch_images: item.content = IMAGE_REMOVE_REGEX.sub("", item.content) else: for image in self.replace_images(item, client, cache): # type: ignore book.add_item(image) book.add_item(item) book.spine.append(item) book.toc.append(item) def parse_args(): parser = ArgumentParser() parser.add_argument( "--input-dir", "-i", dest="input", type=Path, help="Dumped book directory", required=True, ) parser.add_argument( "--output-dir", "-o", dest="output", type=Path, default="output", help="Where to put output EPUB files", ) parser.add_argument( "--volume", type=int, required=False, ) parser.add_argument( "--cover", "-c", type=Path, required=False, help="Path to cover image", ) parser.add_argument( "--fetch-images", action="store_true", ) parser.add_argument( "--cache", type=Path, default="cache", help="Images cache directory", ) parser.add_argument( "--tor-ports", type=lambda x: tuple(int(p) for p in x.split(",") if p), default="", ) parser.add_argument("--tor-controller-port", type=int, default=9051) parser.add_argument("--tor-password", type=str, default=None) return parser.parse_args() def main(): args = parse_args() cover: bytes | None = None if args.cover is not None: cover = args.cover.read_bytes() info: BookMedia = BookMedia.model_validate( json.load((args.input / "info.json").open())["media"] ) chapters: list[Chapter] = sorted( TypeAdapter(list[Chapter]).validate_python( json.load((args.input / "chapters.json").open()) ), key=lambda c: float(c.number), ) if args.volume is not None: chapters = [c for c in chapters if c.volume == args.volume] book = epub.EpubBook() for author in info.authors: book.add_author(author.name) book.set_language("ru") book.set_title(info.rusName) book.add_metadata(namespace="DC", name="description", value=info.summary) book.spine = [] if cover: book.spine.insert(0, "cover") book.set_cover(args.cover.name, cover) if args.tor_ports: print(f"Using tor ports: {args.tor_ports}") client = RequestsTor( tor_ports=args.tor_ports, tor_cport=args.tor_controller_port, password=args.tor_password, ) else: print("Using default client") client = requests.Session() for chapter in chapters: chapter.add_to_book(book, args.input, args.fetch_images, client, args.cache) book.add_item(epub.EpubNcx()) output_path: Path = args.output / f"{info.rusName}-{args.volume or 'full'}.epub" output_path.parent.mkdir(parents=True, exist_ok=True) epub.write_epub(str(output_path), book) if __name__ == "__main__": main()