Compare commits

..

1 Commits

Author SHA1 Message Date
e51df51b46 Added DummyGPT. 2026-05-01 12:44:17 +02:00
4 changed files with 55 additions and 204 deletions

View File

@@ -1,10 +1,10 @@
from pathlib import Path
import tiktoken
import torch
from llmfs.gpt import DummyGPT, GPTConfig, TransformerBlock
from llmfs.tokenizers import BPETokenizer, Tokenizer
from llmfs.attn import MultiHeadAttention
from llmfs.datasets.v1 import GPTDataSetV1
from llmfs.gpt import GPTConfig
from llmfs.tokenizers import BPETokenizer
DATA_DIR = Path(__file__).parent.parent / "data"
@@ -20,63 +20,23 @@ GPT_CONFIG_124M = GPTConfig(
)
def generate_text_simple(
    model: DummyGPT, idx: torch.Tensor, max_new_tokens: int, context_size: int
) -> torch.Tensor:
    """Greedily extend ``idx`` by ``max_new_tokens`` tokens.

    On every step the last ``context_size`` columns of ``idx`` are fed to
    ``model``, the logits of the final position are turned into
    probabilities, and the most probable token id is appended as a new
    column.

    Args:
        model: Callable mapping a token-id tensor to logits; the result is
            indexed as ``[:, -1, :]``, so it must be 3-dimensional.
        idx: Token ids, indexed as ``[:, -context_size:]`` and concatenated
            along ``dim=1``.
        max_new_tokens: Number of tokens to append.
        context_size: Maximum number of trailing tokens shown to the model.

    Returns:
        ``idx`` with ``max_new_tokens`` extra columns.
    """
    step = 0
    while step < max_new_tokens:
        # Only the most recent tokens fit into the model's context window.
        window = idx[:, -context_size:]
        with torch.no_grad():
            all_logits: torch.Tensor = model(window)
        last_logits = all_logits[:, -1, :]
        probabilities = torch.softmax(last_logits, dim=-1)
        next_token = torch.argmax(probabilities, dim=-1, keepdim=True)
        idx = torch.cat((idx, next_token), dim=1)
        step += 1
    return idx
def txt_to_tokens(tokenizer: Tokenizer, text: str) -> torch.Tensor:
    """Encode ``text`` and return the ids as a tensor with a leading axis of size 1.

    The ``<|endoftext|>`` marker is passed to the tokenizer as an allowed
    special token.
    """
    token_ids = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
    flat = torch.tensor(token_ids)
    # Prepend an axis of size one in front of the id sequence.
    return torch.unsqueeze(flat, dim=0)
def tokens_to_txt(tokenizer: Tokenizer, tokens: torch.Tensor) -> str:
    """Decode a token tensor back to text.

    Axis 0 is squeezed away (when it has size 1) before the ids are handed
    to ``tokenizer.decode`` as a plain Python list.
    """
    flat_ids = torch.squeeze(tokens, dim=0).tolist()
    return tokenizer.decode(flat_ids)
def process_text(text: str):
print("Buiding tokenizer")
# tokenizer = BPETokenizer.build(text)
tokenizer = tiktoken.encoding_for_model("gpt2")
tokenizer = BPETokenizer.build(text)
vocab_size = tokenizer.max_token_value + 1
print(f"Tokenizer is ready. Vocab size: {vocab_size}")
cfg = GPTConfig(
vocab_size=vocab_size,
context_length=256,
embedding_dim=768,
n_heads=12,
n_layers=12,
dropout=0.1,
qkv_bias=False,
max_len = 4
ctx_len = max_len
output_dim = 256
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
pos_embedding_layer = torch.nn.Embedding(ctx_len, output_dim)
pos_embeddings = pos_embedding_layer(torch.arange(ctx_len))
dataset = GPTDataSetV1.data_loader(
text,
tokenizer,
batch_size=8,
max_len=4,
stride=1,
shuffle=False,
)
gpt = DummyGPT(cfg)
gpt.eval()
text = "Every effort moves you"
encoded = txt_to_tokens(tokenizer, text)
out = generate_text_simple(gpt, encoded, 6, cfg.context_length)
decoded = tokens_to_txt(tokenizer, out)
print(decoded)
# logits = gpt(batch)
# print(logits)
# print(logits.shape)
# dataset = GPTDataSetV1.data_loader(
# text,
# tokenizer,
# batch_size=8,
# max_len=4,
# stride=1,
# shuffle=False,
# )
# for inps, targs in iter(dataset):
# embeds = token_embedding_layer(inps)
# print(embeds.shape)
@@ -86,9 +46,32 @@ def process_text(text: str):
# tokenizer = BPETokenizer.build(text)
def attn_test():
    """Run ``MultiHeadAttention`` on a batch of two identical 6x3 inputs and print the output."""
    rows = [
        [0.43, 0.15, 0.89],
        [0.55, 0.87, 0.66],
        [0.57, 0.85, 0.64],
        [0.22, 0.58, 0.43],
        [0.77, 0.25, 0.10],
        [0.05, 0.80, 0.55],
    ]
    inps = torch.Tensor(rows)
    n_rows, n_cols = inps.shape
    # Duplicate the sample along a new leading axis to form a batch of two.
    batch = torch.stack([inps, inps], dim=0)
    # NOTE(review): dropout is passed as a bool here; confirm against
    # MultiHeadAttention whether it expects a flag or a probability.
    attn = MultiHeadAttention(
        n_cols,
        8,
        n_rows,
        dropout=True,
        num_heads=2,
    )
    result = attn(batch)
    print(result)
def main():
raw_text = (DATA_DIR / "the-verdict.txt").read_text()
process_text(raw_text)
# process_text(raw_text)
attn_test()
if __name__ == "__main__":

View File

@@ -2,8 +2,6 @@ from dataclasses import dataclass
import torch
from llmfs.attn import MultiHeadAttention
@dataclass
class GPTConfig:
@@ -16,91 +14,8 @@ class GPTConfig:
qkv_bias: bool
class DummyTransformerBlock(torch.nn.Module):
class DummyGPT:
def __init__(self, config: GPTConfig):
super().__init__()
def forward(self, x: torch.Tensor) -> torch.Tensor:
return x
class GELU(torch.nn.Module):
    """Gaussian Error Linear Unit, computed with the tanh approximation.

    Evaluates ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise.
    """

    # Coefficient of the cubic term in the tanh approximation. The previous
    # value (0.44715) was off by a factor of ten.
    _CUBIC_COEFF = 0.044715

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the activation element-wise.

        Args:
            x: Input tensor.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # A plain Python float avoids allocating a tensor on every call.
        scale = (2.0 / torch.pi) ** 0.5
        inner = scale * (x + self._CUBIC_COEFF * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))
class FeedForward(torch.nn.Module):
    """Two linear layers with a GELU in between.

    The first layer widens the input from ``cfg.embedding_dim`` to four
    times that size; the second maps it back to ``cfg.embedding_dim``.
    """

    def __init__(self, cfg: GPTConfig) -> None:
        super().__init__()
        width = cfg.embedding_dim
        hidden = 4 * width
        # Build the layers in application order: expand, activate, project.
        expand = torch.nn.Linear(width, hidden)
        activation = GELU()
        project = torch.nn.Linear(hidden, width)
        self.layers = torch.nn.Sequential(expand, activation, project)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Pass ``x`` through the expand / GELU / project stack."""
        out = self.layers(x)
        return out
class TransformerBlock(torch.nn.Module):
    """Attention and feed-forward sub-layers, each with a residual connection.

    Each sub-layer is applied as ``x = dropout(sublayer(norm(x))) + x``:
    the input is normalised first, and the same dropout module is used after
    both sub-layers.
    """

    def __init__(self, cfg: GPTConfig) -> None:
        super().__init__()
        emb_dim = cfg.embedding_dim
        # NOTE(review): cfg.n_heads is never forwarded to MultiHeadAttention,
        # and the last two arguments are positional; confirm they line up with
        # its signature (elsewhere it is called with dropout=/num_heads=
        # keywords).
        self.att = MultiHeadAttention(
            emb_dim,
            emb_dim,
            cfg.context_length,
            cfg.dropout,
            cfg.qkv_bias,
        )
        self.ff = FeedForward(cfg)
        self.norm1 = NormLayer(emb_dim)
        self.norm2 = NormLayer(emb_dim)
        self.dropout = torch.nn.Dropout(cfg.dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the attention sub-layer, then the feed-forward sub-layer."""
        # Attention sub-layer with its residual connection.
        attended = self.dropout(self.att(self.norm1(x)))
        x = attended + x

        # Feed-forward sub-layer with its residual connection.
        transformed = self.dropout(self.ff(self.norm2(x)))
        return transformed + x
class NormLayer(torch.nn.Module):
    """Layer normalisation over the last axis with a learnable scale and shift.

    The input is centred and divided by the square root of its variance
    (plus ``eps``) along the last axis, then multiplied by ``scale`` and
    offset by ``shift``.

    Args:
        dim: Size of the last axis of the inputs; also the length of the
            ``scale`` and ``shift`` parameters.
        eps: Small constant added to the variance before the square root to
            avoid division by zero.
    """

    def __init__(self, dim: int, eps: float = 1e-5):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.scale = torch.nn.Parameter(torch.ones(dim))
        self.shift = torch.nn.Parameter(torch.zeros(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise ``x`` along its last axis and apply scale and shift."""
        mean = x.mean(-1, keepdim=True)
        # Use the biased (population) variance: it matches torch.nn.LayerNorm
        # and stays finite when the last axis has a single element, where the
        # Bessel-corrected estimator divides by zero and yields NaN.
        var = x.var(-1, keepdim=True, unbiased=False)
        # Makes mean = 0 and variance = 1
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift
class DummyGPT(torch.nn.Module):
def __init__(self, config: GPTConfig):
super().__init__()
self.tok_embedding = torch.nn.Embedding(
config.vocab_size,
config.embedding_dim,
@@ -109,23 +24,4 @@ class DummyGPT(torch.nn.Module):
config.context_length,
config.embedding_dim,
)
self.drop_emb = torch.nn.Dropout(config.dropout)
self.trf_blocks = torch.nn.Sequential(
*[TransformerBlock(config) for _ in range(config.n_layers)]
)
self.final_norm = NormLayer(config.embedding_dim)
self.out_head = torch.nn.Linear(
config.embedding_dim,
config.vocab_size,
bias=False,
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
_, seq_len = x.shape
res = self.tok_embedding(x) + self.pos_embedding(
torch.arange(seq_len, device=x.device)
)
res = self.drop_emb(res)
res = self.trf_blocks(res)
res = self.final_norm(res)
return self.out_head(res)
self.dropout = torch.nn.Dropout(config.dropout)

View File

@@ -1,14 +1,12 @@
from .stoopid import StoopidTokenizer
from .bpe import BPETokenizer
from typing import AbstractSet, Iterable, Protocol
from typing import Protocol
__all__ = ["BPETokenizer", "StoopidTokenizer", "Tokenizer"]
class Tokenizer(Protocol):
def encode(
self, text: str, allowed_special: AbstractSet[str] = set()
) -> list[int]: ...
def encode(self, text: str) -> list[int]: ...
def decode(self, tokens: list[int]) -> str: ...
@property
def max_token_value(self) -> int: ...

View File

@@ -1,7 +1,6 @@
from collections import Counter
from collections.abc import Iterable
import re
from typing import AbstractSet, Self
from typing import Self
class BPETokenizer:
@@ -9,11 +8,9 @@ class BPETokenizer:
UNKNOWN_TOKEN: str = "<|unknowntoken|>"
END_OF_TEXT: str = "<|endoftext|>"
def __init__(self, vocabulary: dict[str, int], specials: dict[str, int]) -> None:
def __init__(self, vocabulary: dict[str, int]) -> None:
self.forward: dict[str, int] = vocabulary
self.reverse: dict[int, str] = {idx: token for token, idx in vocabulary.items()}
self.specials = specials
self.special_values = set(specials.values())
self.unk_token: int = self.forward[self.UNKNOWN_TOKEN]
@property
@@ -21,16 +18,11 @@ class BPETokenizer:
return len(self.forward)
@classmethod
def build(
cls,
text: str,
target_vocab_size: int = -1,
specials: set[str] = {END_OF_TEXT, UNKNOWN_TOKEN},
) -> Self:
def build(cls, text: str, target_vocab_size: int = -1) -> Self:
preprocessed = list(
filter(bool, map(lambda x: x.lower().strip(), cls.SPLIT_PAT.split(text)))
)
pre_vocab: set[str] = specials
pre_vocab: set[str] = set()
for word in preprocessed:
pre_vocab |= set(word)
vocab: list[str] = sorted(pre_vocab)
@@ -71,27 +63,18 @@ class BPETokenizer:
vocab.extend([" ", cls.UNKNOWN_TOKEN, cls.END_OF_TEXT])
vocab_dict = {token: i for i, token in enumerate(vocab)}
specials_dict = {special: vocab_dict[special] for special in specials}
return cls(vocab_dict, specials_dict)
return cls(vocab_dict)
def _encode_word(
self,
word: str,
allowed_specials: set[int],
) -> list[int]:
encoded: list[int] = []
def _encode_word(self, word: str) -> list[int]:
parts = list(word.strip())
start_part_idx = 0
encoded: list[int] = []
while start_part_idx < len(parts):
found = False
for i in range(len(parts), start_part_idx, -1):
token = self.forward.get("".join(parts[start_part_idx:i]))
if token is not None:
found = True
if token in self.special_values and token not in allowed_specials:
raise ValueError(
f"The token '{self.reverse[token]}' is not allowed."
)
encoded.append(token)
start_part_idx = i
break
@@ -103,11 +86,7 @@ class BPETokenizer:
start_part_idx += 1
return encoded
def encode(
self,
text: str | list[str],
allowed_special: AbstractSet[str] = set(),
) -> list[int]:
def encode(self, text: str | list[str]) -> list[int]:
if isinstance(text, list):
text = f" {self.END_OF_TEXT} ".join(text)
@@ -115,15 +94,10 @@ class BPETokenizer:
filter(bool, map(lambda x: x.lower().strip(), self.SPLIT_PAT.split(text)))
)
tokens: list[int] = []
allowed_specials_tokens = {
self.forward[token] for token in allowed_special or []
}
for word in preprocessed:
if tokens:
tokens.append(self.forward[" "])
tokens.extend(
self._encode_word(word, allowed_specials=allowed_specials_tokens)
)
tokens.extend(self._encode_word(word))
return tokens
def decode(self, tokens: list[int]) -> str: