Compare commits
1 Commits
master
...
e51df51b46
| Author | SHA1 | Date | |
|---|---|---|---|
|
e51df51b46
|
@@ -1,10 +1,10 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import tiktoken
|
|
||||||
import torch
|
import torch
|
||||||
from llmfs.gpt import DummyGPT, GPTConfig, TransformerBlock
|
from llmfs.attn import MultiHeadAttention
|
||||||
from llmfs.tokenizers import BPETokenizer, Tokenizer
|
from llmfs.datasets.v1 import GPTDataSetV1
|
||||||
|
from llmfs.gpt import GPTConfig
|
||||||
|
from llmfs.tokenizers import BPETokenizer
|
||||||
|
|
||||||
DATA_DIR = Path(__file__).parent.parent / "data"
|
DATA_DIR = Path(__file__).parent.parent / "data"
|
||||||
|
|
||||||
@@ -20,63 +20,23 @@ GPT_CONFIG_124M = GPTConfig(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def generate_text_simple(
    model: torch.nn.Module, idx: torch.Tensor, max_new_tokens: int, context_size: int
) -> torch.Tensor:
    """Greedily extend ``idx`` by ``max_new_tokens`` token ids.

    On every step the last ``context_size`` columns of ``idx`` are fed to
    ``model``, the highest-scoring entry at the final position of the
    returned logits is selected, and that id is appended as a new column.

    Args:
        model: Callable mapping a 2-D tensor of token ids to a 3-D tensor of
            logits whose last axis is scored with ``argmax``.
        idx: 2-D tensor of token ids; new ids are appended along dim 1.
        max_new_tokens: Number of ids to append. ``0`` returns ``idx`` as is.
        context_size: Maximum number of trailing columns passed to ``model``.

    Returns:
        ``idx`` with ``max_new_tokens`` additional columns.

    Raises:
        ValueError: If ``context_size`` is smaller than 1.
    """
    # ``idx[:, -0:]`` selects the *whole* sequence, so a zero context size
    # would silently disable cropping instead of limiting the window.
    if context_size < 1:
        raise ValueError(f"context_size must be >= 1, got {context_size}")

    # Inference only: no autograd graph is needed for any step of the loop.
    with torch.no_grad():
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -context_size:]
            logits: torch.Tensor = model(idx_cond)
            # Softmax is monotonic, so the argmax of the raw logits is the
            # same token the softmax probabilities would select.
            idx_next = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            idx = torch.cat((idx, idx_next), dim=1)
    return idx
|
|
||||||
|
|
||||||
|
|
||||||
def txt_to_tokens(tokenizer: Tokenizer, text: str) -> torch.Tensor:
    """Encode ``text`` and return the ids as a tensor with a leading axis of size 1.

    The ``<|endoftext|>`` marker is passed to the tokenizer as an allowed
    special token.
    """
    token_ids = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
    as_tensor = torch.tensor(token_ids)
    # Indexing with ``None`` prepends an axis, same as ``unsqueeze(0)``.
    return as_tensor[None]
|
|
||||||
|
|
||||||
|
|
||||||
def tokens_to_txt(tokenizer: Tokenizer, tokens: torch.Tensor) -> str:
    """Decode a tensor of token ids back to text.

    The leading axis is squeezed away (when it has size 1) before the ids
    are converted to a plain list and handed to ``tokenizer.decode``.
    """
    flat_ids = tokens.squeeze(0).tolist()
    return tokenizer.decode(flat_ids)
|
|
||||||
|
|
||||||
|
|
||||||
def process_text(text: str):
|
def process_text(text: str):
|
||||||
print("Buiding tokenizer")
|
tokenizer = BPETokenizer.build(text)
|
||||||
# tokenizer = BPETokenizer.build(text)
|
|
||||||
tokenizer = tiktoken.encoding_for_model("gpt2")
|
|
||||||
vocab_size = tokenizer.max_token_value + 1
|
vocab_size = tokenizer.max_token_value + 1
|
||||||
print(f"Tokenizer is ready. Vocab size: {vocab_size}")
|
max_len = 4
|
||||||
|
ctx_len = max_len
|
||||||
cfg = GPTConfig(
|
output_dim = 256
|
||||||
vocab_size=vocab_size,
|
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
|
||||||
context_length=256,
|
pos_embedding_layer = torch.nn.Embedding(ctx_len, output_dim)
|
||||||
embedding_dim=768,
|
pos_embeddings = pos_embedding_layer(torch.arange(ctx_len))
|
||||||
n_heads=12,
|
dataset = GPTDataSetV1.data_loader(
|
||||||
n_layers=12,
|
text,
|
||||||
dropout=0.1,
|
tokenizer,
|
||||||
qkv_bias=False,
|
batch_size=8,
|
||||||
|
max_len=4,
|
||||||
|
stride=1,
|
||||||
|
shuffle=False,
|
||||||
)
|
)
|
||||||
gpt = DummyGPT(cfg)
|
|
||||||
gpt.eval()
|
|
||||||
text = "Every effort moves you"
|
|
||||||
encoded = txt_to_tokens(tokenizer, text)
|
|
||||||
out = generate_text_simple(gpt, encoded, 6, cfg.context_length)
|
|
||||||
decoded = tokens_to_txt(tokenizer, out)
|
|
||||||
print(decoded)
|
|
||||||
# logits = gpt(batch)
|
|
||||||
# print(logits)
|
|
||||||
# print(logits.shape)
|
|
||||||
# dataset = GPTDataSetV1.data_loader(
|
|
||||||
# text,
|
|
||||||
# tokenizer,
|
|
||||||
# batch_size=8,
|
|
||||||
# max_len=4,
|
|
||||||
# stride=1,
|
|
||||||
# shuffle=False,
|
|
||||||
# )
|
|
||||||
# for inps, targs in iter(dataset):
|
# for inps, targs in iter(dataset):
|
||||||
# embeds = token_embedding_layer(inps)
|
# embeds = token_embedding_layer(inps)
|
||||||
# print(embeds.shape)
|
# print(embeds.shape)
|
||||||
@@ -86,9 +46,32 @@ def process_text(text: str):
|
|||||||
# tokenizer = BPETokenizer.build(text)
|
# tokenizer = BPETokenizer.build(text)
|
||||||
|
|
||||||
|
|
||||||
|
def attn_test():
    """Run ``MultiHeadAttention`` on a small hard-coded batch and print the output."""
    rows = [
        [0.43, 0.15, 0.89],
        [0.55, 0.87, 0.66],
        [0.57, 0.85, 0.64],
        [0.22, 0.58, 0.43],
        [0.77, 0.25, 0.10],
        [0.05, 0.80, 0.55],
    ]
    inps = torch.Tensor(rows)
    n_rows, n_cols = inps.shape

    # Two identical copies of the same matrix stacked along a new first axis.
    batch = torch.stack((inps, inps), dim=0)

    # NOTE(review): ``dropout=True`` is forwarded unchanged; if the layer
    # treats this argument as a probability, ``True`` behaves like 1.0.
    # Confirm against ``llmfs.attn.MultiHeadAttention``.
    attn = MultiHeadAttention(
        n_cols,
        8,
        n_rows,
        dropout=True,
        num_heads=2,
    )
    result = attn(batch)
    print(result)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
raw_text = (DATA_DIR / "the-verdict.txt").read_text()
|
raw_text = (DATA_DIR / "the-verdict.txt").read_text()
|
||||||
process_text(raw_text)
|
# process_text(raw_text)
|
||||||
|
attn_test()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
108
llmfs/gpt.py
108
llmfs/gpt.py
@@ -2,8 +2,6 @@ from dataclasses import dataclass
|
|||||||
|
|
||||||
import torch
|
import torch
|
||||||
|
|
||||||
from llmfs.attn import MultiHeadAttention
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class GPTConfig:
|
class GPTConfig:
|
||||||
@@ -16,91 +14,8 @@ class GPTConfig:
|
|||||||
qkv_bias: bool
|
qkv_bias: bool
|
||||||
|
|
||||||
|
|
||||||
class DummyTransformerBlock(torch.nn.Module):
|
class DummyGPT:
|
||||||
def __init__(self, config: GPTConfig):
|
def __init__(self, config: GPTConfig):
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
|
||||||
return x
|
|
||||||
|
|
||||||
|
|
||||||
class GELU(torch.nn.Module):
    """GELU activation using the tanh approximation.

    Computes ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise.
    """

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the activation element-wise and return a tensor shaped like ``x``."""
        # The cubic coefficient of the tanh approximation is 0.044715; the
        # previous value 0.44715 was off by a factor of ten.
        inner = (2.0 / torch.pi) ** 0.5 * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))
|
|
||||||
|
|
||||||
|
|
||||||
class FeedForward(torch.nn.Module):
    """Two linear layers with a GELU in between.

    The first layer widens the input from ``cfg.embedding_dim`` to four
    times that size; the second maps it back to ``cfg.embedding_dim``.
    """

    def __init__(self, cfg: GPTConfig) -> None:
        super().__init__()
        width = cfg.embedding_dim
        hidden = 4 * width
        # Built in the same order as they are applied so parameter
        # initialisation order is unchanged.
        widen = torch.nn.Linear(width, hidden)
        activation = GELU()
        narrow = torch.nn.Linear(hidden, width)
        self.layers = torch.nn.Sequential(widen, activation, narrow)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Pass ``x`` through the layer stack."""
        out = self.layers(x)
        return out
|
|
||||||
|
|
||||||
|
|
||||||
class TransformerBlock(torch.nn.Module):
    """Attention sub-layer followed by a feed-forward sub-layer.

    Each sub-layer is applied as: normalise, transform, dropout, then add
    the sub-layer's input back (residual connection).
    """

    def __init__(self, cfg: GPTConfig) -> None:
        super().__init__()
        # NOTE(review): ``cfg.n_heads`` is not forwarded here, and the last
        # two arguments are positional. Confirm against the signature of
        # ``llmfs.attn.MultiHeadAttention`` that dropout and qkv_bias land
        # in the intended parameters.
        attention_args = (
            cfg.embedding_dim,
            cfg.embedding_dim,
            cfg.context_length,
            cfg.dropout,
            cfg.qkv_bias,
        )
        self.att = MultiHeadAttention(*attention_args)
        self.ff = FeedForward(cfg)
        self.norm1 = NormLayer(cfg.embedding_dim)
        self.norm2 = NormLayer(cfg.embedding_dim)
        # A single dropout module is shared by both sub-layers.
        self.dropout = torch.nn.Dropout(cfg.dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply both sub-layers with residual connections."""
        # Attention sub-layer.
        attended = self.att(self.norm1(x))
        x = self.dropout(attended) + x

        # Feed-forward sub-layer.
        transformed = self.ff(self.norm2(x))
        x = self.dropout(transformed) + x
        return x
|
|
||||||
|
|
||||||
|
|
||||||
class NormLayer(torch.nn.Module):
    """Layer normalisation over the last dimension with learnable scale and shift.

    Args:
        dim: Size of the last dimension of the inputs.
        eps: Constant added to the variance before the square root so the
            division stays finite when the variance is zero.
    """

    def __init__(self, dim: int, eps: float = 1e-5):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.scale = torch.nn.Parameter(torch.ones(dim))
        self.shift = torch.nn.Parameter(torch.zeros(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise ``x`` along its last dimension, then scale and shift."""
        mean = x.mean(-1, keepdim=True)
        # Use the biased (population) variance, as standard layer
        # normalisation does. The unbiased estimator divides by ``n - 1``
        # and therefore yields NaN when the last dimension has size 1.
        var = x.var(-1, keepdim=True, unbiased=False)
        # Makes mean = 0 and variance = 1 (up to eps)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift
|
|
||||||
|
|
||||||
|
|
||||||
class DummyGPT(torch.nn.Module):
|
|
||||||
def __init__(self, config: GPTConfig):
|
|
||||||
super().__init__()
|
|
||||||
self.tok_embedding = torch.nn.Embedding(
|
self.tok_embedding = torch.nn.Embedding(
|
||||||
config.vocab_size,
|
config.vocab_size,
|
||||||
config.embedding_dim,
|
config.embedding_dim,
|
||||||
@@ -109,23 +24,4 @@ class DummyGPT(torch.nn.Module):
|
|||||||
config.context_length,
|
config.context_length,
|
||||||
config.embedding_dim,
|
config.embedding_dim,
|
||||||
)
|
)
|
||||||
self.drop_emb = torch.nn.Dropout(config.dropout)
|
self.dropout = torch.nn.Dropout(config.dropout)
|
||||||
self.trf_blocks = torch.nn.Sequential(
|
|
||||||
*[TransformerBlock(config) for _ in range(config.n_layers)]
|
|
||||||
)
|
|
||||||
self.final_norm = NormLayer(config.embedding_dim)
|
|
||||||
self.out_head = torch.nn.Linear(
|
|
||||||
config.embedding_dim,
|
|
||||||
config.vocab_size,
|
|
||||||
bias=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
|
||||||
_, seq_len = x.shape
|
|
||||||
res = self.tok_embedding(x) + self.pos_embedding(
|
|
||||||
torch.arange(seq_len, device=x.device)
|
|
||||||
)
|
|
||||||
res = self.drop_emb(res)
|
|
||||||
res = self.trf_blocks(res)
|
|
||||||
res = self.final_norm(res)
|
|
||||||
return self.out_head(res)
|
|
||||||
|
|||||||
@@ -1,14 +1,12 @@
|
|||||||
from .stoopid import StoopidTokenizer
|
from .stoopid import StoopidTokenizer
|
||||||
from .bpe import BPETokenizer
|
from .bpe import BPETokenizer
|
||||||
from typing import AbstractSet, Iterable, Protocol
|
from typing import Protocol
|
||||||
|
|
||||||
__all__ = ["BPETokenizer", "StoopidTokenizer", "Tokenizer"]
|
__all__ = ["BPETokenizer", "StoopidTokenizer", "Tokenizer"]
|
||||||
|
|
||||||
|
|
||||||
class Tokenizer(Protocol):
|
class Tokenizer(Protocol):
|
||||||
def encode(
|
def encode(self, text: str) -> list[int]: ...
|
||||||
self, text: str, allowed_special: AbstractSet[str] = set()
|
|
||||||
) -> list[int]: ...
|
|
||||||
def decode(self, tokens: list[int]) -> str: ...
|
def decode(self, tokens: list[int]) -> str: ...
|
||||||
@property
|
@property
|
||||||
def max_token_value(self) -> int: ...
|
def max_token_value(self) -> int: ...
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
from collections import Counter
|
from collections import Counter
|
||||||
from collections.abc import Iterable
|
|
||||||
import re
|
import re
|
||||||
from typing import AbstractSet, Self
|
from typing import Self
|
||||||
|
|
||||||
|
|
||||||
class BPETokenizer:
|
class BPETokenizer:
|
||||||
@@ -9,11 +8,9 @@ class BPETokenizer:
|
|||||||
UNKNOWN_TOKEN: str = "<|unknowntoken|>"
|
UNKNOWN_TOKEN: str = "<|unknowntoken|>"
|
||||||
END_OF_TEXT: str = "<|endoftext|>"
|
END_OF_TEXT: str = "<|endoftext|>"
|
||||||
|
|
||||||
def __init__(self, vocabulary: dict[str, int], specials: dict[str, int]) -> None:
|
def __init__(self, vocabulary: dict[str, int]) -> None:
|
||||||
self.forward: dict[str, int] = vocabulary
|
self.forward: dict[str, int] = vocabulary
|
||||||
self.reverse: dict[int, str] = {idx: token for token, idx in vocabulary.items()}
|
self.reverse: dict[int, str] = {idx: token for token, idx in vocabulary.items()}
|
||||||
self.specials = specials
|
|
||||||
self.special_values = set(specials.values())
|
|
||||||
self.unk_token: int = self.forward[self.UNKNOWN_TOKEN]
|
self.unk_token: int = self.forward[self.UNKNOWN_TOKEN]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -21,16 +18,11 @@ class BPETokenizer:
|
|||||||
return len(self.forward)
|
return len(self.forward)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def build(
|
def build(cls, text: str, target_vocab_size: int = -1) -> Self:
|
||||||
cls,
|
|
||||||
text: str,
|
|
||||||
target_vocab_size: int = -1,
|
|
||||||
specials: set[str] = {END_OF_TEXT, UNKNOWN_TOKEN},
|
|
||||||
) -> Self:
|
|
||||||
preprocessed = list(
|
preprocessed = list(
|
||||||
filter(bool, map(lambda x: x.lower().strip(), cls.SPLIT_PAT.split(text)))
|
filter(bool, map(lambda x: x.lower().strip(), cls.SPLIT_PAT.split(text)))
|
||||||
)
|
)
|
||||||
pre_vocab: set[str] = specials
|
pre_vocab: set[str] = set()
|
||||||
for word in preprocessed:
|
for word in preprocessed:
|
||||||
pre_vocab |= set(word)
|
pre_vocab |= set(word)
|
||||||
vocab: list[str] = sorted(pre_vocab)
|
vocab: list[str] = sorted(pre_vocab)
|
||||||
@@ -71,27 +63,18 @@ class BPETokenizer:
|
|||||||
|
|
||||||
vocab.extend([" ", cls.UNKNOWN_TOKEN, cls.END_OF_TEXT])
|
vocab.extend([" ", cls.UNKNOWN_TOKEN, cls.END_OF_TEXT])
|
||||||
vocab_dict = {token: i for i, token in enumerate(vocab)}
|
vocab_dict = {token: i for i, token in enumerate(vocab)}
|
||||||
specials_dict = {special: vocab_dict[special] for special in specials}
|
return cls(vocab_dict)
|
||||||
return cls(vocab_dict, specials_dict)
|
|
||||||
|
|
||||||
def _encode_word(
|
def _encode_word(self, word: str) -> list[int]:
|
||||||
self,
|
|
||||||
word: str,
|
|
||||||
allowed_specials: set[int],
|
|
||||||
) -> list[int]:
|
|
||||||
encoded: list[int] = []
|
|
||||||
parts = list(word.strip())
|
parts = list(word.strip())
|
||||||
start_part_idx = 0
|
start_part_idx = 0
|
||||||
|
encoded: list[int] = []
|
||||||
while start_part_idx < len(parts):
|
while start_part_idx < len(parts):
|
||||||
found = False
|
found = False
|
||||||
for i in range(len(parts), start_part_idx, -1):
|
for i in range(len(parts), start_part_idx, -1):
|
||||||
token = self.forward.get("".join(parts[start_part_idx:i]))
|
token = self.forward.get("".join(parts[start_part_idx:i]))
|
||||||
if token is not None:
|
if token is not None:
|
||||||
found = True
|
found = True
|
||||||
if token in self.special_values and token not in allowed_specials:
|
|
||||||
raise ValueError(
|
|
||||||
f"The token '{self.reverse[token]}' is not allowed."
|
|
||||||
)
|
|
||||||
encoded.append(token)
|
encoded.append(token)
|
||||||
start_part_idx = i
|
start_part_idx = i
|
||||||
break
|
break
|
||||||
@@ -103,11 +86,7 @@ class BPETokenizer:
|
|||||||
start_part_idx += 1
|
start_part_idx += 1
|
||||||
return encoded
|
return encoded
|
||||||
|
|
||||||
def encode(
|
def encode(self, text: str | list[str]) -> list[int]:
|
||||||
self,
|
|
||||||
text: str | list[str],
|
|
||||||
allowed_special: AbstractSet[str] = set(),
|
|
||||||
) -> list[int]:
|
|
||||||
if isinstance(text, list):
|
if isinstance(text, list):
|
||||||
text = f" {self.END_OF_TEXT} ".join(text)
|
text = f" {self.END_OF_TEXT} ".join(text)
|
||||||
|
|
||||||
@@ -115,15 +94,10 @@ class BPETokenizer:
|
|||||||
filter(bool, map(lambda x: x.lower().strip(), self.SPLIT_PAT.split(text)))
|
filter(bool, map(lambda x: x.lower().strip(), self.SPLIT_PAT.split(text)))
|
||||||
)
|
)
|
||||||
tokens: list[int] = []
|
tokens: list[int] = []
|
||||||
allowed_specials_tokens = {
|
|
||||||
self.forward[token] for token in allowed_special or []
|
|
||||||
}
|
|
||||||
for word in preprocessed:
|
for word in preprocessed:
|
||||||
if tokens:
|
if tokens:
|
||||||
tokens.append(self.forward[" "])
|
tokens.append(self.forward[" "])
|
||||||
tokens.extend(
|
tokens.extend(self._encode_word(word))
|
||||||
self._encode_word(word, allowed_specials=allowed_specials_tokens)
|
|
||||||
)
|
|
||||||
return tokens
|
return tokens
|
||||||
|
|
||||||
def decode(self, tokens: list[int]) -> str:
|
def decode(self, tokens: list[int]) -> str:
|
||||||
|
|||||||
Reference in New Issue
Block a user