137 lines
3.6 KiB
Python
137 lines
3.6 KiB
Python
import asyncio
|
|
import secrets
|
|
import logging
|
|
from pathlib import Path
|
|
import sys
|
|
import tempfile
|
|
|
|
from aiogram import Bot, Dispatcher, types
|
|
import shutil
|
|
import os
|
|
import dotenv
|
|
|
|
dotenv.load_dotenv()
|
|
|
|
pdftex_bin = shutil.which("pdflatex")
|
|
magick_bin = shutil.which("magick")
|
|
if pdftex_bin is None:
|
|
raise ValueError("PDFTex is not installed")
|
|
|
|
if magick_bin is None:
|
|
raise ValueError("ImageMagick is not installed")
|
|
|
|
dp = Dispatcher()
|
|
bot = Bot(token=os.environ["BOT_TOKEN"])
|
|
|
|
|
|
PREAMBULA = """\
|
|
\\documentclass[preview]{standalone}
|
|
\\usepackage[utf8]{inputenc}
|
|
\\usepackage[russian]{babel}
|
|
\\begin{document}
|
|
%%CONTENT%%
|
|
\\end{document}
|
|
"""
|
|
|
|
|
|
def has_block(text: str, start: str, end: str | None = None) -> bool:
|
|
start_pos = text.find(start)
|
|
if start_pos == -1:
|
|
return False
|
|
if end is None:
|
|
end = start
|
|
end_pos = text.find(end, start_pos + 1)
|
|
if end_pos == -1:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
async def send_latex(message: types.Message) -> None:
|
|
if message.text is None:
|
|
return
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
cwd = Path(tmpdir)
|
|
name = secrets.token_urlsafe(10)
|
|
input_file = cwd / f"{name}.tex"
|
|
with input_file.open("w") as istream:
|
|
istream.write(PREAMBULA.replace("%%CONTENT%%", message.text))
|
|
istream.flush()
|
|
|
|
process = await asyncio.subprocess.create_subprocess_exec(
|
|
pdftex_bin, # type: ignore
|
|
"-no-shell-escape",
|
|
"-interaction",
|
|
"batchmode",
|
|
str(input_file),
|
|
cwd=cwd,
|
|
)
|
|
code = await process.wait()
|
|
if code != 0:
|
|
await bot.send_document(
|
|
chat_id=message.chat.id,
|
|
document=types.FSInputFile(cwd / f"{name}.log", filename="error.log"),
|
|
caption="Found error while processing latex.",
|
|
)
|
|
return
|
|
magick_process = await asyncio.subprocess.create_subprocess_exec(
|
|
magick_bin, # type: ignore
|
|
"convert",
|
|
"-density",
|
|
"300",
|
|
f"{name}.pdf",
|
|
"-quality",
|
|
"100",
|
|
f"{name}.jpg",
|
|
cwd=cwd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
)
|
|
magic_code = await magick_process.wait()
|
|
out = await magick_process.stdout.read() # type: ignore
|
|
if magic_code != 0:
|
|
await message.reply(
|
|
text=f"ImageMagic unable to convert output to image: \n{out.decode()}",
|
|
)
|
|
await bot.send_photo(
|
|
photo=types.FSInputFile(cwd / f"{name}.jpg"),
|
|
chat_id=message.chat.id,
|
|
reply_to_message_id=message.message_id,
|
|
)
|
|
|
|
|
|
@dp.message()
|
|
async def message(message: types.Message) -> None:
|
|
if message.text is None:
|
|
return
|
|
if (
|
|
has_block(message.text, "$$")
|
|
or has_block(message.text, "$")
|
|
or has_block(message.text, r"\begin", r"\end")
|
|
):
|
|
logging.info("Found LaTeX. Converting.")
|
|
await send_latex(message)
|
|
|
|
|
|
@dp.update()
|
|
async def update_message(update: types.update.Update):
|
|
if update.message is None:
|
|
return
|
|
if update.message.text is None:
|
|
return
|
|
if (
|
|
has_block(update.message.text, "$$")
|
|
or has_block(update.message.text, "$")
|
|
or has_block(update.message.text, r"\begin", r"\end")
|
|
):
|
|
logging.info("Found LaTeX. Converting.")
|
|
await send_latex(update.message)
|
|
|
|
|
|
async def main():
|
|
await dp.start_polling(bot)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
|
asyncio.run(main())
|