Compare commits

..

1 Commits

Author SHA1 Message Date
e51df51b46 Added DummyGPT. 2026-05-01 12:44:17 +02:00
4 changed files with 55 additions and 204 deletions

View File

@@ -1,10 +1,10 @@
from pathlib import Path from pathlib import Path
import tiktoken
import torch import torch
from llmfs.gpt import DummyGPT, GPTConfig, TransformerBlock from llmfs.attn import MultiHeadAttention
from llmfs.tokenizers import BPETokenizer, Tokenizer from llmfs.datasets.v1 import GPTDataSetV1
from llmfs.gpt import GPTConfig
from llmfs.tokenizers import BPETokenizer
DATA_DIR = Path(__file__).parent.parent / "data" DATA_DIR = Path(__file__).parent.parent / "data"
@@ -20,63 +20,23 @@ GPT_CONFIG_124M = GPTConfig(
) )
def generate_text_simple(
    model: DummyGPT, idx: torch.Tensor, max_new_tokens: int, context_size: int
) -> torch.Tensor:
    """Greedily extend ``idx`` by ``max_new_tokens`` tokens.

    On every step the last ``context_size`` columns of the running sequence
    are passed to ``model``; the logits at the final position are turned
    into probabilities and the most probable token id is appended.

    Args:
        model: Callable invoked as ``model(tokens)``; its result is indexed
            as ``[:, -1, :]``, so it must be a 3-D tensor.
        idx: Token ids; indexed as ``[:, -context_size:]`` and concatenated
            along dimension 1, so it must be 2-D.
        max_new_tokens: Number of tokens to append.
        context_size: How many trailing tokens the model is shown per step.

    Returns:
        ``idx`` with ``max_new_tokens`` extra columns appended.
    """
    sequence = idx
    for _step in range(max_new_tokens):
        window = sequence[:, -context_size:]
        # Inference only: no autograd graph is needed for the forward pass.
        with torch.no_grad():
            all_logits: torch.Tensor = model(window)
        last_logits = all_logits[:, -1, :]
        probabilities = torch.softmax(last_logits, dim=-1)
        next_token = torch.argmax(probabilities, dim=-1, keepdim=True)
        sequence = torch.cat((sequence, next_token), dim=1)
    return sequence
def txt_to_tokens(tokenizer: Tokenizer, text: str) -> torch.Tensor:
    """Encode ``text`` and return the token ids as a ``(1, n_tokens)`` tensor.

    The ``<|endoftext|>`` marker is explicitly allowed during encoding.

    Args:
        tokenizer: Object whose ``encode(text, allowed_special=...)`` returns
            a list of integer token ids.
        text: The string to encode.

    Returns:
        An integer (``torch.long``) tensor with a leading batch dimension
        of size 1.
    """
    token_ids = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
    # Pin the dtype: torch.tensor([]) would otherwise default to float32,
    # which cannot be used as embedding indices when the encoding is empty.
    return torch.tensor(token_ids, dtype=torch.long).unsqueeze(0)
def tokens_to_txt(tokenizer: Tokenizer, tokens: torch.Tensor) -> str:
    """Decode a batched tensor of token ids back into text.

    Dimension 0 is squeezed away (if it has size 1) before the ids are
    converted to a plain Python list and handed to ``tokenizer.decode``.

    Args:
        tokenizer: Object exposing ``decode(list_of_ids) -> str``.
        tokens: Tensor of token ids.

    Returns:
        Whatever ``tokenizer.decode`` returns for the flattened ids.
    """
    unbatched = tokens.squeeze(0)
    id_list = unbatched.tolist()
    return tokenizer.decode(id_list)
def process_text(text: str): def process_text(text: str):
print("Buiding tokenizer") tokenizer = BPETokenizer.build(text)
# tokenizer = BPETokenizer.build(text)
tokenizer = tiktoken.encoding_for_model("gpt2")
vocab_size = tokenizer.max_token_value + 1 vocab_size = tokenizer.max_token_value + 1
print(f"Tokenizer is ready. Vocab size: {vocab_size}") max_len = 4
ctx_len = max_len
cfg = GPTConfig( output_dim = 256
vocab_size=vocab_size, token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
context_length=256, pos_embedding_layer = torch.nn.Embedding(ctx_len, output_dim)
embedding_dim=768, pos_embeddings = pos_embedding_layer(torch.arange(ctx_len))
n_heads=12, dataset = GPTDataSetV1.data_loader(
n_layers=12, text,
dropout=0.1, tokenizer,
qkv_bias=False, batch_size=8,
max_len=4,
stride=1,
shuffle=False,
) )
gpt = DummyGPT(cfg)
gpt.eval()
text = "Every effort moves you"
encoded = txt_to_tokens(tokenizer, text)
out = generate_text_simple(gpt, encoded, 6, cfg.context_length)
decoded = tokens_to_txt(tokenizer, out)
print(decoded)
# logits = gpt(batch)
# print(logits)
# print(logits.shape)
# dataset = GPTDataSetV1.data_loader(
# text,
# tokenizer,
# batch_size=8,
# max_len=4,
# stride=1,
# shuffle=False,
# )
# for inps, targs in iter(dataset): # for inps, targs in iter(dataset):
# embeds = token_embedding_layer(inps) # embeds = token_embedding_layer(inps)
# print(embeds.shape) # print(embeds.shape)
@@ -86,9 +46,32 @@ def process_text(text: str):
# tokenizer = BPETokenizer.build(text) # tokenizer = BPETokenizer.build(text)
def attn_test():
    """Smoke-test ``MultiHeadAttention`` on a tiny hand-written batch.

    Builds six 3-dimensional vectors, stacks the same sample twice to form
    a batch of two, runs it through the attention module and prints the
    result.
    """
    sample = torch.Tensor(
        [
            [0.43, 0.15, 0.89],
            [0.55, 0.87, 0.66],
            [0.57, 0.85, 0.64],
            [0.22, 0.58, 0.43],
            [0.77, 0.25, 0.10],
            [0.05, 0.80, 0.55],
        ]
    )
    n_rows, n_cols = sample.shape[0], sample.shape[1]
    # Same sample repeated twice along a new leading dimension.
    batch = torch.stack([sample, sample], dim=0)
    # NOTE(review): `dropout=True` is a bool; if the parameter is a drop
    # probability this behaves like 1.0 — confirm that is intended.
    attention = MultiHeadAttention(
        n_cols,
        8,
        n_rows,
        dropout=True,
        num_heads=2,
    )
    result = attention(batch)
    print(result)
def main(): def main():
raw_text = (DATA_DIR / "the-verdict.txt").read_text() raw_text = (DATA_DIR / "the-verdict.txt").read_text()
process_text(raw_text) # process_text(raw_text)
attn_test()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -2,8 +2,6 @@ from dataclasses import dataclass
import torch import torch
from llmfs.attn import MultiHeadAttention
@dataclass @dataclass
class GPTConfig: class GPTConfig:
@@ -16,91 +14,8 @@ class GPTConfig:
qkv_bias: bool qkv_bias: bool
class DummyTransformerBlock(torch.nn.Module): class DummyGPT:
def __init__(self, config: GPTConfig): def __init__(self, config: GPTConfig):
super().__init__()
def forward(self, x: torch.Tensor) -> torch.Tensor:
return x
class GELU(torch.nn.Module):
    """Gaussian Error Linear Unit using the tanh approximation.

    Computes ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise.
    """

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the activation element-wise and return a same-shaped tensor."""
        # Plain Python float: avoids allocating a tensor on every call and
        # keeps the computation on whatever device/dtype ``x`` uses.
        scale = (2.0 / torch.pi) ** 0.5
        # The cubic coefficient of the tanh approximation is 0.044715
        # (the previous 0.44715 was off by a factor of ten).
        inner = scale * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))
class FeedForward(torch.nn.Module):
    """Two-layer position-wise MLP: expand 4x, apply GELU, project back."""

    def __init__(self, cfg: GPTConfig) -> None:
        """Build the layers from ``cfg.embedding_dim``."""
        super().__init__()
        width = cfg.embedding_dim
        expanded = 4 * width
        expand = torch.nn.Linear(width, expanded)
        activation = GELU()
        contract = torch.nn.Linear(expanded, width)
        self.layers = torch.nn.Sequential(expand, activation, contract)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the expand / activate / contract stack."""
        out = self.layers(x)
        return out
class TransformerBlock(torch.nn.Module):
    """Pre-norm block: attention and feed-forward, each with a residual add.

    Each sub-layer is applied as ``x + dropout(sublayer(norm(x)))``.
    """

    def __init__(self, cfg: GPTConfig) -> None:
        """Create the attention, feed-forward, norm and dropout sub-modules."""
        super().__init__()
        dim = cfg.embedding_dim
        # NOTE(review): cfg.n_heads is not forwarded to MultiHeadAttention
        # here — confirm the positional arguments match its signature.
        self.att = MultiHeadAttention(
            dim,
            dim,
            cfg.context_length,
            cfg.dropout,
            cfg.qkv_bias,
        )
        self.ff = FeedForward(cfg)
        self.norm1 = NormLayer(dim)
        self.norm2 = NormLayer(dim)
        self.dropout = torch.nn.Dropout(cfg.dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the attention sub-layer, then the feed-forward sub-layer."""
        # Attention sub-layer with residual connection.
        residual = x
        attended = self.dropout(self.att(self.norm1(x)))
        x = attended + residual

        # Feed-forward sub-layer with residual connection.
        residual = x
        transformed = self.dropout(self.ff(self.norm2(x)))
        return transformed + residual
class NormLayer(torch.nn.Module):
    """Layer normalisation over the last dimension with learnable scale/shift.

    Args:
        dim: Size of the last (normalised) dimension.
        eps: Small constant added to the variance for numerical stability.
    """

    def __init__(self, dim: int, eps: float = 1e-5):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.scale = torch.nn.Parameter(torch.ones(dim))
        self.shift = torch.nn.Parameter(torch.zeros(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise ``x`` along its last dimension, then scale and shift."""
        mean = x.mean(dim=-1, keepdim=True)
        # Layer normalisation uses the biased (population) variance. The
        # unbiased estimator divides by n - 1, which yields NaN when the
        # last dimension has size 1 and disagrees with torch.nn.LayerNorm.
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        # Makes mean = 0 and variance = 1
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift
class DummyGPT(torch.nn.Module):
def __init__(self, config: GPTConfig):
super().__init__()
self.tok_embedding = torch.nn.Embedding( self.tok_embedding = torch.nn.Embedding(
config.vocab_size, config.vocab_size,
config.embedding_dim, config.embedding_dim,
@@ -109,23 +24,4 @@ class DummyGPT(torch.nn.Module):
config.context_length, config.context_length,
config.embedding_dim, config.embedding_dim,
) )
self.drop_emb = torch.nn.Dropout(config.dropout) self.dropout = torch.nn.Dropout(config.dropout)
self.trf_blocks = torch.nn.Sequential(
*[TransformerBlock(config) for _ in range(config.n_layers)]
)
self.final_norm = NormLayer(config.embedding_dim)
self.out_head = torch.nn.Linear(
config.embedding_dim,
config.vocab_size,
bias=False,
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
_, seq_len = x.shape
res = self.tok_embedding(x) + self.pos_embedding(
torch.arange(seq_len, device=x.device)
)
res = self.drop_emb(res)
res = self.trf_blocks(res)
res = self.final_norm(res)
return self.out_head(res)

View File

@@ -1,14 +1,12 @@
from .stoopid import StoopidTokenizer from .stoopid import StoopidTokenizer
from .bpe import BPETokenizer from .bpe import BPETokenizer
from typing import AbstractSet, Iterable, Protocol from typing import Protocol
__all__ = ["BPETokenizer", "StoopidTokenizer", "Tokenizer"] __all__ = ["BPETokenizer", "StoopidTokenizer", "Tokenizer"]
class Tokenizer(Protocol): class Tokenizer(Protocol):
def encode( def encode(self, text: str) -> list[int]: ...
self, text: str, allowed_special: AbstractSet[str] = set()
) -> list[int]: ...
def decode(self, tokens: list[int]) -> str: ... def decode(self, tokens: list[int]) -> str: ...
@property @property
def max_token_value(self) -> int: ... def max_token_value(self) -> int: ...

View File

@@ -1,7 +1,6 @@
from collections import Counter from collections import Counter
from collections.abc import Iterable
import re import re
from typing import AbstractSet, Self from typing import Self
class BPETokenizer: class BPETokenizer:
@@ -9,11 +8,9 @@ class BPETokenizer:
UNKNOWN_TOKEN: str = "<|unknowntoken|>" UNKNOWN_TOKEN: str = "<|unknowntoken|>"
END_OF_TEXT: str = "<|endoftext|>" END_OF_TEXT: str = "<|endoftext|>"
def __init__(self, vocabulary: dict[str, int], specials: dict[str, int]) -> None: def __init__(self, vocabulary: dict[str, int]) -> None:
self.forward: dict[str, int] = vocabulary self.forward: dict[str, int] = vocabulary
self.reverse: dict[int, str] = {idx: token for token, idx in vocabulary.items()} self.reverse: dict[int, str] = {idx: token for token, idx in vocabulary.items()}
self.specials = specials
self.special_values = set(specials.values())
self.unk_token: int = self.forward[self.UNKNOWN_TOKEN] self.unk_token: int = self.forward[self.UNKNOWN_TOKEN]
@property @property
@@ -21,16 +18,11 @@ class BPETokenizer:
return len(self.forward) return len(self.forward)
@classmethod @classmethod
def build( def build(cls, text: str, target_vocab_size: int = -1) -> Self:
cls,
text: str,
target_vocab_size: int = -1,
specials: set[str] = {END_OF_TEXT, UNKNOWN_TOKEN},
) -> Self:
preprocessed = list( preprocessed = list(
filter(bool, map(lambda x: x.lower().strip(), cls.SPLIT_PAT.split(text))) filter(bool, map(lambda x: x.lower().strip(), cls.SPLIT_PAT.split(text)))
) )
pre_vocab: set[str] = specials pre_vocab: set[str] = set()
for word in preprocessed: for word in preprocessed:
pre_vocab |= set(word) pre_vocab |= set(word)
vocab: list[str] = sorted(pre_vocab) vocab: list[str] = sorted(pre_vocab)
@@ -71,27 +63,18 @@ class BPETokenizer:
vocab.extend([" ", cls.UNKNOWN_TOKEN, cls.END_OF_TEXT]) vocab.extend([" ", cls.UNKNOWN_TOKEN, cls.END_OF_TEXT])
vocab_dict = {token: i for i, token in enumerate(vocab)} vocab_dict = {token: i for i, token in enumerate(vocab)}
specials_dict = {special: vocab_dict[special] for special in specials} return cls(vocab_dict)
return cls(vocab_dict, specials_dict)
def _encode_word( def _encode_word(self, word: str) -> list[int]:
self,
word: str,
allowed_specials: set[int],
) -> list[int]:
encoded: list[int] = []
parts = list(word.strip()) parts = list(word.strip())
start_part_idx = 0 start_part_idx = 0
encoded: list[int] = []
while start_part_idx < len(parts): while start_part_idx < len(parts):
found = False found = False
for i in range(len(parts), start_part_idx, -1): for i in range(len(parts), start_part_idx, -1):
token = self.forward.get("".join(parts[start_part_idx:i])) token = self.forward.get("".join(parts[start_part_idx:i]))
if token is not None: if token is not None:
found = True found = True
if token in self.special_values and token not in allowed_specials:
raise ValueError(
f"The token '{self.reverse[token]}' is not allowed."
)
encoded.append(token) encoded.append(token)
start_part_idx = i start_part_idx = i
break break
@@ -103,11 +86,7 @@ class BPETokenizer:
start_part_idx += 1 start_part_idx += 1
return encoded return encoded
def encode( def encode(self, text: str | list[str]) -> list[int]:
self,
text: str | list[str],
allowed_special: AbstractSet[str] = set(),
) -> list[int]:
if isinstance(text, list): if isinstance(text, list):
text = f" {self.END_OF_TEXT} ".join(text) text = f" {self.END_OF_TEXT} ".join(text)
@@ -115,15 +94,10 @@ class BPETokenizer:
filter(bool, map(lambda x: x.lower().strip(), self.SPLIT_PAT.split(text))) filter(bool, map(lambda x: x.lower().strip(), self.SPLIT_PAT.split(text)))
) )
tokens: list[int] = [] tokens: list[int] = []
allowed_specials_tokens = {
self.forward[token] for token in allowed_special or []
}
for word in preprocessed: for word in preprocessed:
if tokens: if tokens:
tokens.append(self.forward[" "]) tokens.append(self.forward[" "])
tokens.extend( tokens.extend(self._encode_word(word))
self._encode_word(word, allowed_specials=allowed_specials_tokens)
)
return tokens return tokens
def decode(self, tokens: list[int]) -> str: def decode(self, tokens: list[int]) -> str: