
[AI Fundamentals] Transformer, The Annotated Transformer, and Scaling Laws for Neural Language Models: Paper Analysis and Code Implementation

Haru_29 2024. 11. 16. 10:53

Detailed Analysis of the Transformer Architecture and Implementation

1. Overall Transformer Structure

1.1 Basic Configuration

  • 6 encoder layers
  • 6 decoder layers
  • Model dimension d_model = 512
  • Inner feed-forward network dimension = 2048
  • Number of heads in multi-head attention = 8
  • Layer normalization and residual connections throughout

1.2 Overall Data Flow

[Input sequence]
    ↓
[Input embedding (d_model=512)]
    ↓
[Add positional encoding]
    ↓
[Encoder stack (x6)]
    ↓
[Encoder output]
    ↓     ↘
    ↓     [Decoder input embedding]
    ↓         ↓
    ↓     [Add positional encoding]
    ↓         ↓
    ↓     [Decoder stack (x6)]
    ↓         ↓
    ↓→→→→→→→→↓
        ↓
    [Linear layer]
        ↓
    [Softmax]
        ↓
    [Output probabilities]

2. Detailed Implementation of Core Components

2.1 Scaled Dot-Product Attention

Formula

Attention(Q, K, V) = softmax(QK^T/√d_k)V

Implementation

def scaled_dot_product_attention(query, key, value, mask=None):
    """
    query, key, value shape: (..., seq_len, d_k)
    mask shape: (..., seq_len, seq_len)
    """
    matmul_qk = tf.matmul(query, key, transpose_b=True)  # (..., seq_len, seq_len)
    
    # Scale matmul_qk
    depth = tf.cast(tf.shape(key)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(depth)
    
    # Add mask if provided
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)
    
    # Apply softmax
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    
    # Compute the final attention output
    output = tf.matmul(attention_weights, value)  # (..., seq_len, d_v)
    
    return output, attention_weights
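
As a quick shape check (a minimal usage sketch, not from the paper; the tensor sizes are arbitrary), the helper can be run on random tensors:

# Minimal usage sketch: batch=2, seq_len=4, d_k=64 are arbitrary example sizes
import tensorflow as tf

q = tf.random.normal((2, 4, 64))
k = tf.random.normal((2, 4, 64))
v = tf.random.normal((2, 4, 64))

out, weights = scaled_dot_product_attention(q, k, v)
print(out.shape)      # (2, 4, 64)  -> one d_v-dimensional vector per query position
print(weights.shape)  # (2, 4, 4)   -> one attention distribution per query position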

2.2 Multi-Head Attention

Formula

MultiHead(Q, K, V) = Concat(head_1, ..., head_h)W^O
where head_i = Attention(QW^Q_i, KW^K_i, VW^V_i)

Implementation

class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        
        assert d_model % self.num_heads == 0
        self.depth = d_model // self.num_heads
        
        # Weight matrices for the linear projections
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        
        self.dense = tf.keras.layers.Dense(d_model)
        
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])
        
    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        
        # Apply the linear projections, then split into heads
        q = self.split_heads(self.wq(q), batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(self.wk(k), batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(self.wv(v), batch_size)  # (batch_size, num_heads, seq_len_v, depth)
        
        # Apply scaled dot-product attention
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)
        
        # Concatenate the heads
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, 
                                    (batch_size, -1, self.d_model))
        
        # Final linear layer
        output = self.dense(concat_attention)
        
        return output, attention_weights
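
A small usage sketch with the paper's settings (d_model=512, 8 heads); the batch and sequence sizes are just illustrative:

# Self-attention usage sketch: v, k, q are all the same tensor
mha = MultiHeadAttention(d_model=512, num_heads=8)
x = tf.random.normal((2, 10, 512))   # (batch_size, seq_len, d_model)
out, attn = mha(x, x, x, None)
print(out.shape)   # (2, 10, 512)
print(attn.shape)  # (2, 8, 10, 10) -> per-head attention weights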

2.3 Position-wise Feed-Forward Network

Formula

FFN(x) = max(0, xW_1 + b_1)W_2 + b_2

Implementation

class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model, dff, dropout_rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.dff = dff
        
        self.dense1 = tf.keras.layers.Dense(dff, activation='relu')
        self.dense2 = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, training):
        x = self.dense1(x)  # (batch_size, seq_len, dff)
        x = self.dropout(x, training=training)
        x = self.dense2(x)  # (batch_size, seq_len, d_model)
        return x

2.4 Positional Encoding

Formula

PE(pos,2i) = sin(pos/10000^(2i/d_model))
PE(pos,2i+1) = cos(pos/10000^(2i/d_model))

Implementation

def get_positional_encoding(position, d_model):
    def get_angles(pos, i, d_model):
        angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
        return pos * angle_rates

    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                           np.arange(d_model)[np.newaxis, :],
                           d_model)
    
    # Apply sin to even indices
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    
    # Apply cos to odd indices
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    
    pos_encoding = angle_rads[np.newaxis, ...]
    
    return tf.cast(pos_encoding, dtype=tf.float32)
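
A brief check of how the encoding is used (illustrative sizes): the table is computed once and simply added to the token embeddings.

# Illustrative usage: 50 positions at d_model=512
pos_encoding = get_positional_encoding(50, 512)
print(pos_encoding.shape)  # (1, 50, 512)

x = tf.random.normal((2, 50, 512))   # token embeddings
x = x + pos_encoding[:, :50, :]      # broadcasts over the batch dimension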

3. Encoder-Decoder Implementation

3.1 Encoder Layer

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, dff)
        
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, training, mask):
        # Multi-head attention
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        
        # Feed-forward network
        ffn_output = self.ffn(out1, training=training)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        
        return out2

3.2 Decoder Layer

class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, dff)
        
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # Self-attention
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)
        
        # Encoder-decoder attention
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)
        
        # Feed-forward network
        ffn_output = self.ffn(out2, training=training)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)
        
        return out3, attn_weights_block1, attn_weights_block2

4. Training Configuration

4.1 Optimizer Setup

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps
        
    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# Learning rate schedule and optimizer setup (d_model = 512, as configured above)
d_model = 512
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(
    learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9
)
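
To see the warmup-then-decay behavior of this schedule, a few sample steps can be evaluated (a small sketch, assuming d_model = 512 and warmup_steps = 4000):

# The rate grows roughly linearly up to warmup_steps, then decays as step^-0.5
schedule = CustomSchedule(d_model=512)
for step in [1, 1000, 4000, 10000, 100000]:
    print(step, float(schedule(step)))
# The peak lands near step == warmup_steps (4000), at about d_model^-0.5 * warmup_steps^-0.5 ≈ 7e-4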

4.2 Loss Function and Masking

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred, mask):
    loss_ = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    
    return tf.reduce_sum(loss_)/tf.reduce_sum(mask)

def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)
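
A quick example of what the two masks look like (positions marked 1 are the ones blocked by the -1e9 added to the attention logits); the token ids are arbitrary:

# Padding mask: token id 0 is treated as padding
seq = tf.constant([[7, 6, 0, 0]])
print(create_padding_mask(seq))   # shape (1, 1, 1, 4), values [0. 0. 1. 1.]

# Look-ahead mask: position i may only attend to positions <= i
print(create_look_ahead_mask(3))
# [[0. 1. 1.]
#  [0. 0. 1.]
#  [0. 0. 0.]]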


The Annotated Transformer

1. Basic Architecture

1.1 Encoder-Decoder Base Framework

class EncoderDecoder(nn.Module):
    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        super(EncoderDecoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.generator = generator
        
    def forward(self, src, tgt, src_mask, tgt_mask):
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)

    def encode(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

1.2 Key Components

  1. Encoder stack (6 layers)
  2. Decoder stack (6 layers)
  3. Multi-Head Attention
  4. Position-wise Feed-Forward Networks
  5. Embeddings and Positional Encoding
  6. Final Linear and Softmax layers

2. Core Component Implementations

2.1 Multi-Head Attention

class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0
        self.d_k = d_model // h
        self.h = h
        self.linear_layers = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        
        # 1) Linear projections and head splitting
        query, key, value = [
            lin(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
            for lin, x in zip(self.linear_layers, (query, key, value))
        ]
        
        # 2) Apply scaled dot-product attention
        x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
        
        # 3) Concatenate heads and apply the final linear projection
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)
        
        return self.linear_layers[-1](x)

2.2 Scaled Dot-Product Attention

def attention(query, key, value, mask=None, dropout=None):
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    
    p_attn = scores.softmax(dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
        
    return torch.matmul(p_attn, value), p_attn

2.3 Position-wise Feed-Forward Networks

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(self.w_1(x).relu()))

2.4 Positional Encoding

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # Precompute the positional encoding matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)].requires_grad_(False)
        return self.dropout(x)

3. Detailed Encoder-Decoder Implementation

3.1 Encoder

class Encoder(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)
        
    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

class EncoderLayer(nn.Module):
    def __init__(self, size, self_attn, feed_forward, dropout):
        super().__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 2)
        self.size = size

    def forward(self, x, mask):
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        return self.sublayer[1](x, self.feed_forward)

3.2 Decoder

class Decoder(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)
        
    def forward(self, x, memory, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)

class DecoderLayer(nn.Module):
    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super().__init__()
        self.size = size
        self.self_attn = self_attn
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 3)

    def forward(self, x, memory, src_mask, tgt_mask):
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
        x = self.sublayer[1](x, lambda x: self.src_attn(x, memory, memory, src_mask))
        return self.sublayer[2](x, self.feed_forward)

4. Model Configuration Parameters

  1. Basic model parameters:
    • d_model = 512 (base model dimension)
    • h = 8 (number of attention heads)
    • d_ff = 2048 (inner feed-forward dimension)
    • N = 6 (number of encoder/decoder layers)
    • dropout = 0.1 (dropout rate)
  2. Attention-related parameters (a full model assembled from these values is sketched below):
    • d_k = d_v = d_model/h = 64 (dimension of each head)
    • Scaling factor: 1/√d_k
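
The Annotated Transformer ties all of these parameters together in a make_model helper; the sketch below follows the original notebook (including Xavier/Glorot initialization of the weight matrices):

import copy
import torch.nn as nn

def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
    """Assemble a full encoder-decoder model from the hyperparameters above."""
    c = copy.deepcopy
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = EncoderDecoder(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
        nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
        nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
        Generator(d_model, tgt_vocab),
    )
    # Initialize parameters with Glorot / Xavier uniform, as in the original notebook
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    return model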

5. Subcomponents

5.1 LayerNorm and Residual Connection

class LayerNorm(nn.Module):
    def __init__(self, features, eps=1e-6):
        super().__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

5.2 Embeddings and Generator

class Embeddings(nn.Module):
    def __init__(self, d_model, vocab):
        super().__init__()
        self.lut = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.lut(x) * math.sqrt(self.d_model)

class Generator(nn.Module):
    def __init__(self, d_model, vocab):
        super().__init__()
        self.proj = nn.Linear(d_model, vocab)

    def forward(self, x):
        return F.log_softmax(self.proj(x), dim=-1)


Scaling Laws for Language Models: Detailed Implementation Guide

1. Core Scaling Parameters

1.1 Key Parameter Definitions

  • N: number of model parameters, excluding embeddings
  • D: dataset size (number of tokens)
  • C: total compute used for training (in PF-days)
  • B: batch size
  • L: loss value (cross-entropy)

1.2 Parameter Scaling Relations

L(N) = (N_c/N)^α_N   # parameter scaling
L(D) = (D_c/D)^α_D   # data scaling
L(C) = (C_c/C)^α_C   # compute scaling

where the exponents are approximately (a numerical sketch follows below):

  • α_N ≈ 0.076
  • α_D ≈ 0.095
  • α_C ≈ 0.050
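
Plugging in the fitted constants reported in the paper (roughly N_c ≈ 8.8×10^13 and D_c ≈ 5.4×10^13; treat the exact numbers as approximations), the relations can be evaluated directly:

# Numerical sketch of L(N) and L(D) using approximate constants from the paper
N_c, alpha_N = 8.8e13, 0.076
D_c, alpha_D = 5.4e13, 0.095

def loss_from_params(N):
    return (N_c / N) ** alpha_N

def loss_from_data(D):
    return (D_c / D) ** alpha_D

for N in [1e6, 1e8, 1e10]:
    print(f"N={N:.0e}  L(N)={loss_from_params(N):.2f}")
# Each 100x increase in parameter count lowers the predicted loss by a constant factor (a power law)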

2. Model Architecture Implementation

2.1 Basic Transformer Structure

class Transformer(nn.Module):
    def __init__(self, src_vocab, tgt_vocab, d_model, nlayers, nheads, d_ff, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(
            EncoderLayer(d_model, nheads, d_ff, dropout),
            nlayers
        )
        self.decoder = Decoder(
            DecoderLayer(d_model, nheads, d_ff, dropout), 
            nlayers
        )
        self.src_embed = Embeddings(d_model, src_vocab)
        self.tgt_embed = Embeddings(d_model, tgt_vocab)
        self.pos_enc = PositionalEncoding(d_model, dropout)

2.2 Computing the Optimal Model Size

def compute_optimal_size(compute_budget):
    """컴퓨팅 예산에 따른 최적 모델 크기 계산"""
    N = (1.3 * 10**9) * compute_budget**0.73
    return N

def compute_optimal_batch(loss):
    """손실값에 따른 최적 배치 크기 계산"""
    B_crit = (2.1 * 10**8) * loss**4.8
    return B_crit
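
A brief usage sketch (compute_budget is in PF-days, per the definition of C above; the budgets picked here are arbitrary):

# The optimal parameter count grows roughly as C^0.73
for budget in [1e-3, 1.0, 100.0]:
    print(f"C={budget:g} PF-days -> N_opt ≈ {compute_optimal_size(budget):.3g} parameters")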

3. Training Optimization Implementation

3.1 Learning Rate Scheduler

class CustomScheduler:
    def __init__(self, optimizer, d_model, warmup_steps=4000):
        self.optimizer = optimizer
        self.d_model = d_model
        self.warmup_steps = warmup_steps
        self.current_step = 0

    def step(self):
        self.current_step += 1
        rate = self.compute_rate()
        for p in self.optimizer.param_groups:
            p['lr'] = rate
        
    def compute_rate(self):
        step = self.current_step
        return self.d_model**(-0.5) * min(
            step**(-0.5), 
            step * self.warmup_steps**(-1.5)
        )

3.2 Batch Size Optimization

def optimize_batch_size(model, loss):
    """Critical batch size 계산"""
    B_crit = 2e8 * (loss ** (-1/0.21))
    return int(B_crit)

def adjust_training_params(batch_size, B_crit):
    """B_crit에 따른 학습 파라미터 조정"""
    steps_adjustment = 1 / (1 + B_crit/batch_size)
    compute_adjustment = 1 / (1 + batch_size/B_crit)
    return steps_adjustment, compute_adjustment
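
Illustrative usage (the target loss and batch size are arbitrary example values):

B_crit = optimize_batch_size(model=None, loss=3.0)   # model is not used inside this helper
steps_adj, compute_adj = adjust_training_params(batch_size=2**19, B_crit=B_crit)
print(B_crit, steps_adj, compute_adj)
# Training with B >> B_crit wastes compute, while B << B_crit wastes steps (serial training time)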

4. Implementation Details

4.1 Model Scaling Utilities

class ModelScaler:
    def __init__(self, base_config):
        self.base_config = base_config
        
    def scale_model(self, compute_budget):
        """컴퓨팅 예산에 따른 모델 스케일링"""
        optimal_params = compute_optimal_size(compute_budget)
        
        # Derive the model width and layer count
        d_model = int((optimal_params/12)**0.5)
        n_layers = int(optimal_params/(12 * d_model**2))
        
        return {
            'd_model': d_model,
            'n_layers': n_layers,
            'd_ff': d_model * 4,
            'n_heads': d_model // 64
        }

    def compute_expected_loss(self, config):
        """주어진 설정에 대한 예상 손실 계산"""
        N = 12 * config['n_layers'] * config['d_model']**2
        return (8.8e13/N)**0.076
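
A usage sketch (the compute budget of 10 PF-days is arbitrary; BASE_CONFIG is the dictionary defined in section 5.1 below):

scaler = ModelScaler(BASE_CONFIG)              # BASE_CONFIG from section 5.1 below
config = scaler.scale_model(compute_budget=10)
print(config)
print(f"expected loss ≈ {scaler.compute_expected_loss(config):.2f}")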

4.2 Dataset Requirement Calculations

class DatasetRequirements:
    def calculate_min_tokens(self, params):
        """최소 필요 데이터셋 크기 계산"""
        return 5000 * (params**0.74)
        
    def calculate_optimal_tokens(self, compute):
        """최적 데이터셋 크기 계산"""
        return int((4e10) * (compute**0.26))

    def validate_dataset_size(self, tokens, params):
        """데이터셋 크기 적절성 검증"""
        min_tokens = self.calculate_min_tokens(params)
        return tokens >= min_tokens

4.3 Compute Efficiency Optimization

class ComputeOptimizer:
    def __init__(self, model_size, target_loss):
        self.model_size = model_size
        self.target_loss = target_loss
        
    def compute_optimal_training(self):
        """최적 학습 파라미터 계산"""
        B_crit = optimize_batch_size(self.model_size, self.target_loss)
        steps = self.calculate_steps(B_crit)
        compute = self.calculate_compute(B_crit, steps)
        
        return {
            'batch_size': B_crit,
            'steps': steps,
            'compute': compute
        }
        
    def calculate_steps(self, batch_size):
        """최적 학습 스텝 수 계산"""
        base_steps = 5.4e3
        compute = self.model_size * batch_size * base_steps * 6
        return int(base_steps * (compute**0.03))

    def calculate_compute(self, batch_size, steps):
        """총 컴퓨팅 요구량 계산"""
        return 6 * self.model_size * batch_size * steps


5. Key Hyperparameter Guidelines

5.1 Base Configuration

BASE_CONFIG = {
    'd_model': 512,
    'n_layers': 6,
    'd_ff': 2048,
    'n_heads': 8,
    'dropout': 0.1,
    'max_seq_length': 1024,
    'learning_rate': 0.0001,
    'warmup_steps': 4000
}

5.2 Scaling Relationships

  1. Model size growth: N ∝ C^0.73
  2. Batch size growth: B ∝ C^0.24
  3. Training step growth: S ∝ C^0.03
  4. Dataset size: D ∝ N^0.74 (a quick numerical sketch follows below)
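
As a closing sketch (an assumed helper, not from the paper's code), these exponents translate into the following relative growth when the compute budget increases:

def scaling_ratios(c_ratio):
    """Relative growth of each quantity when compute grows by c_ratio, using the exponents above."""
    return {
        'model size N': c_ratio ** 0.73,
        'batch size B': c_ratio ** 0.24,
        'training steps S': c_ratio ** 0.03,
    }

# Growing the compute budget 2x, 10x, and 100x:
for r in [2, 10, 100]:
    print(r, scaling_ratios(r))
# Dataset size then follows the model: D grows roughly as N^0.74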