"""Build an EPUB file from a dumped book directory.

The input directory is expected to contain ``info.json``, ``chapters.json``
and one zip archive per chapter (each holding a ``data.txt`` plus any
embedded images). Remote images referenced from chapter markup can
optionally be downloaded (directly or through Tor), cached on disk and
embedded into the book.

NOTE(review): the HTML tags inside the string literals of this module (inline
marks, chapter heading, paragraph/heading/img/hr markup, translators list) and
the leading part of ``IMAGE_REMOVE_REGEX`` were restored from context; confirm
that they match the intended output markup.
"""

from argparse import ArgumentParser
import copy
import hashlib
from io import BytesIO
import json
from pathlib import Path
import re
from typing import Any, Tuple
import zipfile

from ebooklib import epub
from PIL import Image
from pydantic import BaseModel, TypeAdapter
import requests
from requests_tor import RequestsTor
from yarl import URL

# Matches an <img> tag whose ``src`` is an http(s) URL; the URL is captured in
# the ``src`` group. Bounded character classes keep a single match from
# spanning several tags that happen to share a line.
IMAGE_REMOVE_REGEX = re.compile(r'<img[^>]*?src="(?P<src>https?[^"]*)"[^>]*>')

# Upper bounds (in pixels) that embedded images are shrunk to fit.
KINDLE_MAX_HEIGHT = 1024
KINDLE_MAX_WIDTH = 768

# Text mark type -> HTML tag used to render it.
_MARK_TAGS = {"italic": "i", "bold": "b", "underline": "u"}


class Author(BaseModel):
    """Book author entry from ``info.json``."""

    id: int
    name: str


class BookMedia(BaseModel):
    """Book metadata stored under the ``media`` key of ``info.json``."""

    name: str
    rusName: str
    engName: str
    otherNames: list[str]
    authors: list[Author]
    summary: str


class BranchTeam(BaseModel):
    """Translator team attached to a chapter branch."""

    name: str


class Branch(BaseModel):
    """One translation branch of a chapter."""

    id: int
    branchId: int
    teams: list[BranchTeam]


# MD5 digest of downloaded image bytes -> path of the first image stored with
# that digest (relative to the cache directory / EPUB root).
IMG_HASHES: dict[bytes, str] = {}


def process_text(text_obj: dict) -> str:
    """Render a text node as HTML, wrapping it in a tag for every mark.

    Args:
        text_obj: Node with a ``text`` key and an optional ``marks`` list whose
            items carry a ``type`` of ``italic``, ``bold`` or ``underline``.

    Returns:
        The text wrapped in the tags for its marks, in the order given.

    Raises:
        ValueError: If a mark has an unsupported type.
    """
    content = text_obj["text"]
    for mark in text_obj.get("marks", []):
        tag = _MARK_TAGS.get(mark["type"])
        if tag is None:
            raise ValueError(f"unknown mark for text {text_obj}")
        content = f"<{tag}>{content}</{tag}>"
    return content


def fit_image(data: bytes) -> bytes:
    """Re-encode an image as JPEG, shrunk to fit the Kindle size limits.

    Args:
        data: Encoded image bytes in any format Pillow can open.

    Returns:
        JPEG bytes of the image converted to RGB and resized (keeping aspect
        ratio) to at most ``KINDLE_MAX_WIDTH`` x ``KINDLE_MAX_HEIGHT``.
    """
    out_buff = BytesIO()
    with Image.open(BytesIO(data)) as img:
        fitted = img.convert("RGB")
        fitted.thumbnail((KINDLE_MAX_WIDTH, KINDLE_MAX_HEIGHT))
        fitted.save(out_buff, format="JPEG")
    return out_buff.getvalue()


class Chapter(BaseModel):
    """Chapter entry from ``chapters.json`` plus the logic to load it."""

    id: int
    volume: int
    number: str
    name: str
    branches: list[Branch]
    withBranches: bool

    def _archive_stem(self) -> str:
        """Return the archive file name prefix shared by all branches."""
        return f"v{self.volume}-n{self.number}-{self.id}"

    def _branch_zip(self, base_dir: Path, branch: Branch) -> Path:
        """Return the expected archive path for a specific branch."""
        return base_dir / f"{self._archive_stem()}-b{branch.branchId}.zip"

    def get_translators(self, base_dir: Path) -> list[str]:
        """Return team names of the branch this chapter was dumped from.

        With a single branch its teams are returned directly; otherwise the
        first branch whose archive exists in ``base_dir`` is used. Returns an
        empty list when no branch archive is found.
        """
        if len(self.branches) == 1:
            return [team.name for team in self.branches[0].teams]
        for branch in self.branches:
            if self._branch_zip(base_dir, branch).exists():
                return [team.name for team in branch.teams]
        return []

    def get_zip_path(self, base_dir: Path) -> Path | None:
        """Locate the chapter archive in ``base_dir``.

        The branch-less file name is tried first, then every branch-specific
        one. Returns ``None`` when no archive exists.
        """
        nobranch_zip = base_dir / f"{self._archive_stem()}.zip"
        if nobranch_zip.exists():
            return nobranch_zip
        for branch in self.branches:
            branch_zip = self._branch_zip(base_dir, branch)
            if branch_zip.exists():
                return branch_zip
        return None

    def get_image(self, image_id: str, zip: zipfile.ZipFile) -> Tuple[str, bytes]:
        """Return ``(member name, bytes)`` of the archive member for an image.

        The first member whose name starts with ``image_id`` is used.

        Raises:
            FileNotFoundError: If no member name starts with ``image_id``.
        """
        for file in zip.namelist():
            if file.startswith(image_id):
                return (file, zip.read(file))
        raise FileNotFoundError(f"Image {image_id} is not found in the archive")

    def _render_inline(self, item: dict) -> str:
        """Render the inline children of a paragraph/heading node as HTML."""
        inner = []
        for sub_item in item.get("content", []):
            if sub_item["type"] == "text":
                inner.append(process_text(sub_item))
            elif sub_item["type"] == "hardBreak":
                inner.append("<br/>")
            else:
                raise ValueError(f"{self} - Unknown sub-item")
        return "\n".join(inner)

    def load(self, base_dir: Path) -> Tuple[epub.EpubHtml, list[Any]]:
        """Load the chapter archive and convert it into an EPUB page.

        ``data.txt`` inside the archive is either a JSON document of type
        ``doc`` (rendered node by node) or anything else, in which case its
        text is attached verbatim below the chapter heading.

        Args:
            base_dir: Directory holding the chapter archives.

        Returns:
            The chapter page and the list of image items embedded from the
            archive that must be added to the book alongside it.

        Raises:
            FileNotFoundError: If the archive or a referenced image is missing.
            ValueError: If the document contains an unsupported node.
        """
        title = f"Глава {self.number}"
        book_item = epub.EpubHtml(
            title=title,
            file_name="Text/ch{}.xhtml".format(self.number),
            lang="ru",
        )
        zip_path = self.get_zip_path(base_dir)
        if zip_path is None:
            raise FileNotFoundError(f"Chapter for `{self}` not found")

        # The archive stays open while images are read and is always closed.
        with zipfile.ZipFile(zip_path) as archive:
            print(f"Loaded {archive.filename}")
            data = archive.read("data.txt")
            try:
                content = json.loads(data)
            except ValueError:
                # Covers both JSONDecodeError and UnicodeDecodeError.
                content = None
            if not isinstance(content, dict):
                # It's not a json document, so we just attach content and return.
                text = data.decode("utf-8")
                book_item.content = f"<h1>Глава {self.number}</h1>\n{text}"
                return (book_item, [])

            if content["type"] != "doc":
                raise ValueError(f"{self} contains unknown document format")

            output = [f"<h1>Глава {self.number}</h1>"]
            extras = []
            for item in content["content"]:
                item_type = item["type"]
                if item_type == "heading":
                    inner_content = self._render_inline(item)
                    attrs = item.get("attrs", {})
                    level = attrs.get("level", 3)
                    align = attrs.get("textAlign", "center")
                    output.append(
                        f'<h{level} style="text-align: {align}">'
                        f"{inner_content}</h{level}>"
                    )
                elif item_type == "paragraph":
                    output.append(f"<p>{self._render_inline(item)}</p>")
                elif item_type == "image":
                    for image in item["attrs"]["images"]:
                        image_path, image_bytes = self.get_image(
                            image["image"], archive
                        )
                        extras.append(
                            epub.EpubImage(
                                file_name=image_path,
                                media_type="image/jpeg",
                                content=fit_image(image_bytes),
                            )
                        )
                        # Pages live in "Text/", hence the "../" prefix.
                        output.append(f'<img src="../{image_path}"/>')
                elif item_type == "horizontalRule":
                    output.append("<hr/>")
                else:
                    raise ValueError(f"{self} - unknown content type")

        # Connect all items
        book_item.content = "\n".join(output)
        return (book_item, extras)

    def request_with_id_update(
        self,
        client: requests.Session | RequestsTor,
        url: str,
    ) -> bytes | None:
        """Fetch ``url``, retrying once after a 429 response.

        When the client is a ``RequestsTor`` instance a new identity is
        requested before the retry.

        Returns:
            The response body, or ``None`` if the final response is not OK.
        """
        print("Fetching", url)
        resp = client.get(url)
        if resp.status_code == 429:
            print("Ratelimit hit.")
            if isinstance(client, RequestsTor):
                print("Updating client's IP")
                client.new_id()
            resp = client.get(url)
        if not resp.ok:
            return None
        return resp.content

    def replace_images(
        self,
        page: epub.EpubHtml,
        client: requests.Session | RequestsTor,
        cache: Path,
    ) -> list[epub.EpubImage]:
        """Replace remote images on a page with embedded copies.

        Every remote ``<img>`` is downloaded (or read from ``cache``), stored
        in the cache and re-pointed at a local file. Images that cannot be
        fetched are removed from the page. Byte-identical images already
        registered in ``IMG_HASHES`` are reused instead of embedded again.

        Args:
            page: Page whose ``content`` is rewritten in place.
            client: HTTP client used for downloads.
            cache: Directory used as the on-disk image cache.

        Returns:
            Image items that must be added to the book.
        """
        original: str = page.content  # type: ignore
        new_content = original
        replaces = []
        for match in IMAGE_REMOVE_REGEX.finditer(original):
            target_str = match.group(0)
            src: str = match.group("src")
            strip_path = URL(src).path.lstrip("/")

            resp: bytes | None = None
            # An empty path would make the cache entry the cache dir itself.
            if strip_path:
                cached_path = cache / strip_path
                if cached_path.exists():
                    print(f"Using cached image from {cached_path}")
                    resp = cached_path.read_bytes()
                else:
                    resp = self.request_with_id_update(client, src)
                    if resp:
                        cached_path.parent.mkdir(parents=True, exist_ok=True)
                        cached_path.write_bytes(resp)
            if resp is None:
                new_content = new_content.replace(target_str, "")
                continue

            md5 = hashlib.md5(resp).digest()
            final_path = None
            if md5 in IMG_HASHES:
                similar_path = IMG_HASHES[md5]
                # Guard against digest collisions by comparing actual bytes.
                if (cache / similar_path).read_bytes() == resp:
                    print("Found identical image in cache")
                    final_path = similar_path
            else:
                IMG_HASHES[md5] = strip_path
            if final_path is None:
                replaces.append(
                    epub.EpubImage(
                        file_name=strip_path,
                        media_type="image/jpeg",
                        content=fit_image(resp),
                    )
                )
                final_path = strip_path

            # Pages live in "Text/", hence the "../" prefix.
            new_image = f'<img src="../{final_path}"/>'
            print(target_str, new_image)
            new_content = new_content.replace(target_str, new_image)
        page.content = new_content
        return replaces

    def add_to_book(
        self,
        book: epub.EpubBook,
        base_dir: Path,
        fetch_images: bool,
        client: requests.Session | RequestsTor,
        cache: Path,
    ) -> epub.EpubHtml:
        """Load the chapter and add its page and images to ``book``.

        Remote images are either embedded (``fetch_images``) or stripped from
        the page.

        Returns:
            The chapter page that was added to the book.
        """
        (item, extras) = self.load(base_dir)
        for extra in extras:
            book.add_item(extra)
        if fetch_images:
            for image in self.replace_images(item, client, cache):
                book.add_item(image)
        else:
            item.content = IMAGE_REMOVE_REGEX.sub("", item.content)  # type: ignore
        book.add_item(item)
        return item


def parse_args():
    """Parse the command-line arguments of the converter."""
    parser = ArgumentParser()
    parser.add_argument(
        "--input-dir",
        "-i",
        dest="input",
        type=Path,
        help="Dumped book directory",
        required=True,
    )
    parser.add_argument(
        "--output-dir",
        "-o",
        dest="output",
        type=Path,
        default=Path("output"),
        help="Where to put output EPUB files",
    )
    parser.add_argument(
        "--volume",
        type=int,
        required=False,
    )
    parser.add_argument(
        "--cover",
        "-c",
        type=Path,
        required=False,
        help="Path to cover image",
    )
    parser.add_argument(
        "--fetch-images",
        action="store_true",
    )
    parser.add_argument(
        "--cache",
        type=Path,
        default=Path("cache"),
        help="Images cache directory",
    )
    parser.add_argument(
        "--tor-ports",
        # Comma-separated list of ports; empty means "do not use Tor".
        type=lambda x: tuple(int(p) for p in x.split(",") if p),
        default=(),
    )
    parser.add_argument("--tor-controller-port", type=int, default=9051)
    parser.add_argument("--tor-password", type=str, default=None)
    return parser.parse_args()


def main():
    """Assemble the EPUB described by the command-line arguments."""
    args = parse_args()
    cover: bytes | None = None
    if args.cover is not None:
        cover = args.cover.read_bytes()

    info_raw = json.loads((args.input / "info.json").read_text(encoding="utf-8"))
    info: BookMedia = BookMedia.model_validate(info_raw["media"])
    chapters_raw = json.loads(
        (args.input / "chapters.json").read_text(encoding="utf-8")
    )
    chapters: list[Chapter] = sorted(
        TypeAdapter(list[Chapter]).validate_python(chapters_raw),
        key=lambda c: float(c.number),
    )
    if args.volume is not None:
        chapters = [c for c in chapters if c.volume == args.volume]

    book = epub.EpubBook()
    for author in info.authors:
        book.add_author(author.name)
    book.set_language("ru")
    book.set_title(info.rusName)
    book.add_metadata(namespace="DC", name="description", value=info.summary)

    book.spine = []
    if cover:
        book.spine.append("cover")
        book.set_cover(args.cover.name, cover)

    if args.tor_ports:
        print(f"Using tor ports: {args.tor_ports}")
        client = RequestsTor(
            tor_ports=args.tor_ports,
            tor_cport=args.tor_controller_port,
            password=args.tor_password,
        )
    else:
        print("Using default client")
        client = requests.Session()

    translators_page = epub.EpubHtml(
        file_name="translators.xhtml",
        title="Translators",
        lang="ru",
    )
    book.add_item(translators_page)
    # The page is referenced by object exactly once.
    # NOTE(review): a bare "translators" id is avoided because the page is
    # created without an explicit uid - confirm against ebooklib id handling.
    book.spine.append(translators_page)  # type: ignore

    css = epub.EpubItem(
        uid="doc_style",
        file_name="styles/default.css",
        media_type="text/css",
        content=Path("./style.css").read_text(),
    )
    # Same stylesheet as seen from the chapter pages, which live in "Text/".
    css_ref = epub.EpubItem(
        uid="doc_style",
        file_name="../styles/default.css",
        media_type="text/css",
        content="",
    )
    book.add_item(css)

    translators = set()
    for chapter in chapters:
        item = chapter.add_to_book(
            book,
            args.input,
            args.fetch_images,
            client,
            args.cache,
        )
        item.add_item(css_ref)
        book.spine.append(item)  # type: ignore
        book.toc.append(item)
        translators.update(chapter.get_translators(args.input))

    # Sorted so that the generated page does not depend on set ordering.
    translator_items = "\n".join(f"<li>{name}</li>" for name in sorted(translators))
    translators_content = (
        "<h1>Переводчики:</h1>\n<ul>\n" + translator_items + "\n</ul>\n\n"
    )
    translators_page.content = translators_content

    book.add_item(epub.EpubNcx())

    # ``is not None`` keeps the suffix for volume 0, matching the filter above.
    suffix = f"-{args.volume}" if args.volume is not None else ""
    output_path: Path = args.output / f"{info.rusName}{suffix}.epub"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    epub.write_epub(str(output_path), book)


if __name__ == "__main__":
    main()