Compare commits
1 Commits
master
...
51108bb518
| Author | SHA1 | Date | |
|---|---|---|---|
|
51108bb518
|
@@ -1,10 +1,10 @@
|
||||
from pathlib import Path
|
||||
|
||||
import tiktoken
|
||||
import torch
|
||||
from llmfs.gpt import DummyGPT, GPTConfig, TransformerBlock
|
||||
from llmfs.tokenizers import BPETokenizer, Tokenizer
|
||||
|
||||
from llmfs.attn import MultiHeadAttention
|
||||
from llmfs.datasets.v1 import GPTDataSetV1
|
||||
from llmfs.gpt import GPTConfig
|
||||
from llmfs.tokenizers import BPETokenizer
|
||||
|
||||
DATA_DIR = Path(__file__).parent.parent / "data"
|
||||
|
||||
@@ -20,63 +20,23 @@ GPT_CONFIG_124M = GPTConfig(
|
||||
)
|
||||
|
||||
|
||||
def generate_text_simple(
|
||||
model: DummyGPT, idx: torch.Tensor, max_new_tokens: int, context_size: int
|
||||
) -> torch.Tensor:
|
||||
for _ in range(max_new_tokens):
|
||||
idx_cond = idx[:, -context_size:]
|
||||
with torch.no_grad():
|
||||
logits: torch.Tensor = model(idx_cond)
|
||||
logits = logits[:, -1, :]
|
||||
probs = logits.softmax(dim=-1)
|
||||
idx_next = probs.argmax(dim=-1, keepdim=True)
|
||||
idx = torch.cat((idx, idx_next), dim=1)
|
||||
return idx
|
||||
|
||||
|
||||
def txt_to_tokens(tokenizer: Tokenizer, text: str) -> torch.Tensor:
|
||||
encoded = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
|
||||
return torch.tensor(encoded).unsqueeze(0)
|
||||
|
||||
|
||||
def tokens_to_txt(tokenizer: Tokenizer, tokens: torch.Tensor) -> str:
|
||||
return tokenizer.decode(tokens.squeeze(0).tolist())
|
||||
|
||||
|
||||
def process_text(text: str):
|
||||
print("Buiding tokenizer")
|
||||
# tokenizer = BPETokenizer.build(text)
|
||||
tokenizer = tiktoken.encoding_for_model("gpt2")
|
||||
tokenizer = BPETokenizer.build(text)
|
||||
vocab_size = tokenizer.max_token_value + 1
|
||||
print(f"Tokenizer is ready. Vocab size: {vocab_size}")
|
||||
|
||||
cfg = GPTConfig(
|
||||
vocab_size=vocab_size,
|
||||
context_length=256,
|
||||
embedding_dim=768,
|
||||
n_heads=12,
|
||||
n_layers=12,
|
||||
dropout=0.1,
|
||||
qkv_bias=False,
|
||||
max_len = 4
|
||||
ctx_len = max_len
|
||||
output_dim = 256
|
||||
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
|
||||
pos_embedding_layer = torch.nn.Embedding(ctx_len, output_dim)
|
||||
pos_embeddings = pos_embedding_layer(torch.arange(ctx_len))
|
||||
dataset = GPTDataSetV1.data_loader(
|
||||
text,
|
||||
tokenizer,
|
||||
batch_size=8,
|
||||
max_len=4,
|
||||
stride=1,
|
||||
shuffle=False,
|
||||
)
|
||||
gpt = DummyGPT(cfg)
|
||||
gpt.eval()
|
||||
text = "Every effort moves you"
|
||||
encoded = txt_to_tokens(tokenizer, text)
|
||||
out = generate_text_simple(gpt, encoded, 6, cfg.context_length)
|
||||
decoded = tokens_to_txt(tokenizer, out)
|
||||
print(decoded)
|
||||
# logits = gpt(batch)
|
||||
# print(logits)
|
||||
# print(logits.shape)
|
||||
# dataset = GPTDataSetV1.data_loader(
|
||||
# text,
|
||||
# tokenizer,
|
||||
# batch_size=8,
|
||||
# max_len=4,
|
||||
# stride=1,
|
||||
# shuffle=False,
|
||||
# )
|
||||
# for inps, targs in iter(dataset):
|
||||
# embeds = token_embedding_layer(inps)
|
||||
# print(embeds.shape)
|
||||
@@ -86,9 +46,32 @@ def process_text(text: str):
|
||||
# tokenizer = BPETokenizer.build(text)
|
||||
|
||||
|
||||
def attn_test():
|
||||
inps = torch.Tensor(
|
||||
[
|
||||
[0.43, 0.15, 0.89],
|
||||
[0.55, 0.87, 0.66],
|
||||
[0.57, 0.85, 0.64],
|
||||
[0.22, 0.58, 0.43],
|
||||
[0.77, 0.25, 0.10],
|
||||
[0.05, 0.80, 0.55],
|
||||
]
|
||||
)
|
||||
batch = torch.stack((inps, inps), dim=0)
|
||||
attn = MultiHeadAttention(
|
||||
inps.shape[1],
|
||||
8,
|
||||
inps.shape[0],
|
||||
dropout=True,
|
||||
num_heads=2,
|
||||
)
|
||||
print(attn(batch))
|
||||
|
||||
|
||||
def main():
|
||||
raw_text = (DATA_DIR / "the-verdict.txt").read_text()
|
||||
process_text(raw_text)
|
||||
# process_text(raw_text)
|
||||
attn_test()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
108
llmfs/gpt.py
108
llmfs/gpt.py
@@ -2,8 +2,6 @@ from dataclasses import dataclass
|
||||
|
||||
import torch
|
||||
|
||||
from llmfs.attn import MultiHeadAttention
|
||||
|
||||
|
||||
@dataclass
|
||||
class GPTConfig:
|
||||
@@ -16,91 +14,8 @@ class GPTConfig:
|
||||
qkv_bias: bool
|
||||
|
||||
|
||||
class DummyTransformerBlock(torch.nn.Module):
|
||||
class DummyGPT:
|
||||
def __init__(self, config: GPTConfig):
|
||||
super().__init__()
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
return x
|
||||
|
||||
|
||||
class GELU(torch.nn.Module):
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
return (
|
||||
0.5
|
||||
* x
|
||||
* (
|
||||
1
|
||||
+ torch.tanh(
|
||||
torch.sqrt(torch.tensor(2.0 / torch.pi))
|
||||
* (x + 0.44715 * torch.pow(x, 3))
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class FeedForward(torch.nn.Module):
|
||||
def __init__(self, cfg: GPTConfig) -> None:
|
||||
super().__init__()
|
||||
self.layers = torch.nn.Sequential(
|
||||
torch.nn.Linear(cfg.embedding_dim, 4 * cfg.embedding_dim),
|
||||
GELU(),
|
||||
torch.nn.Linear(cfg.embedding_dim * 4, cfg.embedding_dim),
|
||||
)
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
return self.layers(x)
|
||||
|
||||
|
||||
class TransformerBlock(torch.nn.Module):
|
||||
def __init__(self, cfg: GPTConfig) -> None:
|
||||
super().__init__()
|
||||
self.att = MultiHeadAttention(
|
||||
cfg.embedding_dim,
|
||||
cfg.embedding_dim,
|
||||
cfg.context_length,
|
||||
cfg.dropout,
|
||||
cfg.qkv_bias,
|
||||
)
|
||||
self.ff = FeedForward(cfg)
|
||||
self.norm1 = NormLayer(cfg.embedding_dim)
|
||||
self.norm2 = NormLayer(cfg.embedding_dim)
|
||||
self.dropout = torch.nn.Dropout(cfg.dropout)
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
shortcut = x
|
||||
x = self.norm1(x)
|
||||
x = self.att(x)
|
||||
x = self.dropout(x)
|
||||
x = x + shortcut
|
||||
|
||||
shortcut = x
|
||||
x = self.norm2(x)
|
||||
x = self.ff(x)
|
||||
x = self.dropout(x)
|
||||
x = x + shortcut
|
||||
return x
|
||||
|
||||
|
||||
class NormLayer(torch.nn.Module):
|
||||
def __init__(self, dim: int, eps: float = 1e-5):
|
||||
super().__init__()
|
||||
self.dim = dim
|
||||
self.eps = eps
|
||||
self.scale = torch.nn.Parameter(torch.ones(dim))
|
||||
self.shift = torch.nn.Parameter(torch.zeros(dim))
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
mean = x.mean(-1, keepdim=True)
|
||||
var = x.var(-1, keepdim=True, unbiased=True)
|
||||
# Makes mean = 0 and variance = 1
|
||||
norm_x = (x - mean) / torch.sqrt(var + self.eps)
|
||||
return self.scale * norm_x + self.shift
|
||||
|
||||
|
||||
class DummyGPT(torch.nn.Module):
|
||||
def __init__(self, config: GPTConfig):
|
||||
super().__init__()
|
||||
self.tok_embedding = torch.nn.Embedding(
|
||||
config.vocab_size,
|
||||
config.embedding_dim,
|
||||
@@ -109,23 +24,4 @@ class DummyGPT(torch.nn.Module):
|
||||
config.context_length,
|
||||
config.embedding_dim,
|
||||
)
|
||||
self.drop_emb = torch.nn.Dropout(config.dropout)
|
||||
self.trf_blocks = torch.nn.Sequential(
|
||||
*[TransformerBlock(config) for _ in range(config.n_layers)]
|
||||
)
|
||||
self.final_norm = NormLayer(config.embedding_dim)
|
||||
self.out_head = torch.nn.Linear(
|
||||
config.embedding_dim,
|
||||
config.vocab_size,
|
||||
bias=False,
|
||||
)
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
_, seq_len = x.shape
|
||||
res = self.tok_embedding(x) + self.pos_embedding(
|
||||
torch.arange(seq_len, device=x.device)
|
||||
)
|
||||
res = self.drop_emb(res)
|
||||
res = self.trf_blocks(res)
|
||||
res = self.final_norm(res)
|
||||
return self.out_head(res)
|
||||
self.dropout = torch.nn.Dropout(config.dropout)
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
from .stoopid import StoopidTokenizer
|
||||
from .bpe import BPETokenizer
|
||||
from typing import AbstractSet, Iterable, Protocol
|
||||
from typing import Protocol
|
||||
|
||||
__all__ = ["BPETokenizer", "StoopidTokenizer", "Tokenizer"]
|
||||
|
||||
|
||||
class Tokenizer(Protocol):
|
||||
def encode(
|
||||
self, text: str, allowed_special: AbstractSet[str] = set()
|
||||
) -> list[int]: ...
|
||||
def encode(self, text: str) -> list[int]: ...
|
||||
def decode(self, tokens: list[int]) -> str: ...
|
||||
@property
|
||||
def max_token_value(self) -> int: ...
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from collections import Counter
|
||||
from collections.abc import Iterable
|
||||
import re
|
||||
from typing import AbstractSet, Self
|
||||
from typing import Self
|
||||
|
||||
|
||||
class BPETokenizer:
|
||||
@@ -9,11 +8,9 @@ class BPETokenizer:
|
||||
UNKNOWN_TOKEN: str = "<|unknowntoken|>"
|
||||
END_OF_TEXT: str = "<|endoftext|>"
|
||||
|
||||
def __init__(self, vocabulary: dict[str, int], specials: dict[str, int]) -> None:
|
||||
def __init__(self, vocabulary: dict[str, int]) -> None:
|
||||
self.forward: dict[str, int] = vocabulary
|
||||
self.reverse: dict[int, str] = {idx: token for token, idx in vocabulary.items()}
|
||||
self.specials = specials
|
||||
self.special_values = set(specials.values())
|
||||
self.unk_token: int = self.forward[self.UNKNOWN_TOKEN]
|
||||
|
||||
@property
|
||||
@@ -21,16 +18,11 @@ class BPETokenizer:
|
||||
return len(self.forward)
|
||||
|
||||
@classmethod
|
||||
def build(
|
||||
cls,
|
||||
text: str,
|
||||
target_vocab_size: int = -1,
|
||||
specials: set[str] = {END_OF_TEXT, UNKNOWN_TOKEN},
|
||||
) -> Self:
|
||||
def build(cls, text: str, target_vocab_size: int = -1) -> Self:
|
||||
preprocessed = list(
|
||||
filter(bool, map(lambda x: x.lower().strip(), cls.SPLIT_PAT.split(text)))
|
||||
)
|
||||
pre_vocab: set[str] = specials
|
||||
pre_vocab: set[str] = set()
|
||||
for word in preprocessed:
|
||||
pre_vocab |= set(word)
|
||||
vocab: list[str] = sorted(pre_vocab)
|
||||
@@ -71,27 +63,18 @@ class BPETokenizer:
|
||||
|
||||
vocab.extend([" ", cls.UNKNOWN_TOKEN, cls.END_OF_TEXT])
|
||||
vocab_dict = {token: i for i, token in enumerate(vocab)}
|
||||
specials_dict = {special: vocab_dict[special] for special in specials}
|
||||
return cls(vocab_dict, specials_dict)
|
||||
return cls(vocab_dict)
|
||||
|
||||
def _encode_word(
|
||||
self,
|
||||
word: str,
|
||||
allowed_specials: set[int],
|
||||
) -> list[int]:
|
||||
encoded: list[int] = []
|
||||
def _encode_word(self, word: str) -> list[int]:
|
||||
parts = list(word.strip())
|
||||
start_part_idx = 0
|
||||
encoded: list[int] = []
|
||||
while start_part_idx < len(parts):
|
||||
found = False
|
||||
for i in range(len(parts), start_part_idx, -1):
|
||||
token = self.forward.get("".join(parts[start_part_idx:i]))
|
||||
if token is not None:
|
||||
found = True
|
||||
if token in self.special_values and token not in allowed_specials:
|
||||
raise ValueError(
|
||||
f"The token '{self.reverse[token]}' is not allowed."
|
||||
)
|
||||
encoded.append(token)
|
||||
start_part_idx = i
|
||||
break
|
||||
@@ -103,11 +86,7 @@ class BPETokenizer:
|
||||
start_part_idx += 1
|
||||
return encoded
|
||||
|
||||
def encode(
|
||||
self,
|
||||
text: str | list[str],
|
||||
allowed_special: AbstractSet[str] = set(),
|
||||
) -> list[int]:
|
||||
def encode(self, text: str | list[str]) -> list[int]:
|
||||
if isinstance(text, list):
|
||||
text = f" {self.END_OF_TEXT} ".join(text)
|
||||
|
||||
@@ -115,15 +94,10 @@ class BPETokenizer:
|
||||
filter(bool, map(lambda x: x.lower().strip(), self.SPLIT_PAT.split(text)))
|
||||
)
|
||||
tokens: list[int] = []
|
||||
allowed_specials_tokens = {
|
||||
self.forward[token] for token in allowed_special or []
|
||||
}
|
||||
for word in preprocessed:
|
||||
if tokens:
|
||||
tokens.append(self.forward[" "])
|
||||
tokens.extend(
|
||||
self._encode_word(word, allowed_specials=allowed_specials_tokens)
|
||||
)
|
||||
tokens.extend(self._encode_word(word))
|
||||
return tokens
|
||||
|
||||
def decode(self, tokens: list[int]) -> str:
|
||||
|
||||
Reference in New Issue
Block a user