from .attention import Attention
from .tiler import TileWorker
from einops import repeat, rearrange
import math
import torch


class HunyuanDiTRotaryEmbedding(torch.nn.Module):
    """QKV preprocessor: LayerNorm on q/k, rotary embedding, optional K/V cache.

    Instances are passed to ``Attention`` as ``qkv_preprocessor`` (see
    ``HunyuanDiTBlock.forward``) and are called with the projected q, k, v.

    Args:
        q_norm_shape: Size of the last dimension of q that is layer-normalized.
        k_norm_shape: Size of the last dimension of k that is layer-normalized.
        rotary_emb_on_k: If False, the rotary embedding is applied to q only.
    """

    def __init__(self, q_norm_shape=88, k_norm_shape=88, rotary_emb_on_k=True):
        super().__init__()
        self.q_norm = torch.nn.LayerNorm((q_norm_shape,), elementwise_affine=True, eps=1e-06)
        self.k_norm = torch.nn.LayerNorm((k_norm_shape,), elementwise_affine=True, eps=1e-06)
        self.rotary_emb_on_k = rotary_emb_on_k
        # Plain Python lists (not buffers): filled when forward(to_cache=True),
        # consumed and cleared by the next forward(to_cache=False).
        self.k_cache, self.v_cache = [], []

    def reshape_for_broadcast(self, freqs_cis, x):
        """View the (cos, sin) pair so it broadcasts over all but the last two dims of ``x``."""
        ndim = x.ndim
        shape = [d if i == ndim - 2 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
        return freqs_cis[0].view(*shape), freqs_cis[1].view(*shape)

    def rotate_half(self, x):
        """Map every adjacent pair (a, b) along the last dim to (-b, a), computed in float32."""
        x_real, x_imag = x.float().reshape(*x.shape[:-1], -1, 2).unbind(-1)
        # flatten(-2) merges the (pairs, 2) dims back into the last dim for any
        # input rank; for 4-D inputs it is identical to the former flatten(3).
        return torch.stack([-x_imag, x_real], dim=-1).flatten(-2)

    def apply_rotary_emb(self, xq, xk, freqs_cis):
        """Apply the rotary embedding to ``xq`` and, if given, ``xk``.

        Returns:
            ``(xq_out, xk_out)`` in the input dtypes; ``xk_out`` is None when
            ``xk`` is None.
        """
        xk_out = None
        cos, sin = self.reshape_for_broadcast(freqs_cis, xq)
        cos, sin = cos.to(xq.device), sin.to(xq.device)
        xq_out = (xq.float() * cos + self.rotate_half(xq.float()) * sin).type_as(xq)
        if xk is not None:
            xk_out = (xk.float() * cos + self.rotate_half(xk.float()) * sin).type_as(xk)
        return xq_out, xk_out

    def forward(self, q, k, v, freqs_cis_img, to_cache=False):
        """Normalize q/k, apply RoPE, then either store or consume cached K/V.

        With ``to_cache=True`` the processed k and v are appended to the
        cache. Otherwise, if the cache is non-empty, the cached tensors are
        concatenated after the current k/v along dim 2 and the cache is reset.
        (dim 2 is presumably the sequence axis -- confirm against ``Attention``.)
        """
        # norm
        q = self.q_norm(q)
        k = self.k_norm(k)

        # RoPE
        if self.rotary_emb_on_k:
            q, k = self.apply_rotary_emb(q, k, freqs_cis_img)
        else:
            q, _ = self.apply_rotary_emb(q, None, freqs_cis_img)

        if to_cache:
            self.k_cache.append(k)
            self.v_cache.append(v)
        elif len(self.k_cache) > 0 and len(self.v_cache) > 0:
            k = torch.concat([k] + self.k_cache, dim=2)
            v = torch.concat([v] + self.v_cache, dim=2)
            self.k_cache, self.v_cache = [], []
        return q, k, v


class FP32_Layernorm(torch.nn.LayerNorm):
    """LayerNorm that computes in float32 and casts back to the input dtype."""

    def forward(self, inputs):
        origin_dtype = inputs.dtype
        # weight/bias are None when the layer is built without affine
        # parameters; F.layer_norm accepts None for both.
        weight = None if self.weight is None else self.weight.float()
        bias = None if self.bias is None else self.bias.float()
        return torch.nn.functional.layer_norm(
            inputs.float(), self.normalized_shape, weight, bias, self.eps
        ).to(origin_dtype)


class FP32_SiLU(torch.nn.SiLU):
    """SiLU that computes in float32 (never in place) and casts back to the input dtype."""

    def forward(self, inputs):
        origin_dtype = inputs.dtype
        return torch.nn.functional.silu(inputs.float(), inplace=False).to(origin_dtype)


class HunyuanDiTFinalLayer(torch.nn.Module):
    """Output head: non-affine LayerNorm, shift/scale modulation, linear projection.

    Args:
        final_hidden_size: Width of the incoming hidden states.
        condition_dim: Width of the conditioning embedding.
        patch_size: Patch edge length; the projection emits
            ``patch_size * patch_size * out_channels`` values per token.
        out_channels: Number of output channels per pixel.
    """

    def __init__(self, final_hidden_size=1408, condition_dim=1408, patch_size=2, out_channels=8):
        super().__init__()
        self.norm_final = torch.nn.LayerNorm(final_hidden_size, elementwise_affine=False, eps=1e-6)
        self.linear = torch.nn.Linear(final_hidden_size, patch_size * patch_size * out_channels, bias=True)
        self.adaLN_modulation = torch.nn.Sequential(
            FP32_SiLU(),
            torch.nn.Linear(condition_dim, 2 * final_hidden_size, bias=True)
        )

    def modulate(self, x, shift, scale):
        """Return ``x * (1 + scale) + shift`` with shift/scale broadcast over dim 1."""
        return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)

    def forward(self, hidden_states, condition_emb):
        # The modulation output is split in two halves along dim 1: shift, then scale.
        shift, scale = self.adaLN_modulation(condition_emb).chunk(2, dim=1)
        hidden_states = self.modulate(self.norm_final(hidden_states), shift, scale)
        hidden_states = self.linear(hidden_states)
        return hidden_states


class HunyuanDiTBlock(torch.nn.Module):
    """Transformer block: optional long skip, self-attention, cross-attention, MLP.

    Args:
        hidden_dim: Token width.
        condition_dim: Width of the conditioning embedding.
        num_heads: Number of attention heads; head width is ``hidden_dim // num_heads``.
        mlp_ratio: MLP hidden width is ``int(hidden_dim * mlp_ratio)``.
        text_dim: Width of the text embedding used as cross-attention key/value input.
        skip_connection: If True, the block fuses a residual from an earlier
            block via concat -> LayerNorm -> Linear before attention.
    """

    def __init__(
        self,
        hidden_dim=1408,
        condition_dim=1408,
        num_heads=16,
        mlp_ratio=4.3637,
        text_dim=1024,
        skip_connection=False
    ):
        super().__init__()
        head_dim = hidden_dim // num_heads
        mlp_hidden_dim = int(hidden_dim * mlp_ratio)

        self.norm1 = FP32_Layernorm((hidden_dim,), eps=1e-6, elementwise_affine=True)
        self.rota1 = HunyuanDiTRotaryEmbedding(head_dim, head_dim)
        self.attn1 = Attention(hidden_dim, num_heads, head_dim, bias_q=True, bias_kv=True, bias_out=True)
        self.norm2 = FP32_Layernorm((hidden_dim,), eps=1e-6, elementwise_affine=True)
        # Cross-attention keys come from text, so RoPE is applied to q only.
        self.rota2 = HunyuanDiTRotaryEmbedding(head_dim, head_dim, rotary_emb_on_k=False)
        self.attn2 = Attention(hidden_dim, num_heads, head_dim, kv_dim=text_dim, bias_q=True, bias_kv=True, bias_out=True)
        self.norm3 = FP32_Layernorm((hidden_dim,), eps=1e-6, elementwise_affine=True)
        self.modulation = torch.nn.Sequential(FP32_SiLU(), torch.nn.Linear(condition_dim, hidden_dim, bias=True))
        self.mlp = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, mlp_hidden_dim, bias=True),
            torch.nn.GELU(approximate="tanh"),
            torch.nn.Linear(mlp_hidden_dim, hidden_dim, bias=True)
        )
        if skip_connection:
            self.skip_norm = FP32_Layernorm((hidden_dim * 2,), eps=1e-6, elementwise_affine=True)
            self.skip_linear = torch.nn.Linear(hidden_dim * 2, hidden_dim, bias=True)
        else:
            self.skip_norm, self.skip_linear = None, None

    def forward(self, hidden_states, condition_emb, text_emb, freq_cis_img, residual=None, to_cache=False):
        """Run the block.

        Args:
            hidden_states: Token tensor; the last dim is ``hidden_dim``.
            condition_emb: Conditioning embedding fed to the shift modulation.
            text_emb: Text embedding used as cross-attention context.
            freq_cis_img: ``(cos, sin)`` pair consumed by the rotary embedding.
            residual: Earlier hidden states, required when the block was
                built with ``skip_connection=True``.
            to_cache: Forwarded to the self-attention rotary module's K/V cache.

        Raises:
            ValueError: If the block has a skip connection but ``residual`` is None.
        """
        # Long Skip Connection
        if self.skip_norm is not None and self.skip_linear is not None:
            if residual is None:
                raise ValueError(
                    "HunyuanDiTBlock was built with skip_connection=True but no residual was provided."
                )
            hidden_states = torch.cat([hidden_states, residual], dim=-1)
            hidden_states = self.skip_norm(hidden_states)
            hidden_states = self.skip_linear(hidden_states)

        # Self-Attention
        shift_msa = self.modulation(condition_emb).unsqueeze(dim=1)
        attn_input = self.norm1(hidden_states) + shift_msa
        hidden_states = hidden_states + self.attn1(
            attn_input,
            qkv_preprocessor=lambda q, k, v: self.rota1(q, k, v, freq_cis_img, to_cache=to_cache)
        )

        # Cross-Attention (uses norm3; norm2 is reserved for the FFN below)
        attn_input = self.norm3(hidden_states)
        hidden_states = hidden_states + self.attn2(
            attn_input,
            text_emb,
            qkv_preprocessor=lambda q, k, v: self.rota2(q, k, v, freq_cis_img)
        )

        # FFN Layer
        mlp_input = self.norm2(hidden_states)
        hidden_states = hidden_states + self.mlp(mlp_input)
        return hidden_states


class AttentionPool(torch.nn.Module):
    """Pool a token sequence into one vector by attending from the mean token.

    Args:
        spacial_dim: Sequence length L; the positional table has L + 1 rows.
        embed_dim: Token width.
        num_heads: Number of attention heads.
        output_dim: Output width; defaults to ``embed_dim`` when falsy.
    """

    def __init__(self, spacial_dim, embed_dim, num_heads, output_dim = None):
        super().__init__()
        self.positional_embedding = torch.nn.Parameter(torch.randn(spacial_dim + 1, embed_dim) / embed_dim ** 0.5)
        self.k_proj = torch.nn.Linear(embed_dim, embed_dim)
        self.q_proj = torch.nn.Linear(embed_dim, embed_dim)
        self.v_proj = torch.nn.Linear(embed_dim, embed_dim)
        self.c_proj = torch.nn.Linear(embed_dim, output_dim or embed_dim)
        self.num_heads = num_heads

    def forward(self, x):
        x = x.permute(1, 0, 2)  # NLC -> LNC
        x = torch.cat([x.mean(dim=0, keepdim=True), x], dim=0)  # (L+1)NC
        x = x + self.positional_embedding[:, None, :].to(x.dtype)  # (L+1)NC
        # Only the prepended mean token is used as the query.
        x, _ = torch.nn.functional.multi_head_attention_forward(
            query=x[:1], key=x, value=x,
            embed_dim_to_check=x.shape[-1],
            num_heads=self.num_heads,
            q_proj_weight=self.q_proj.weight,
            k_proj_weight=self.k_proj.weight,
            v_proj_weight=self.v_proj.weight,
            in_proj_weight=None,
            in_proj_bias=torch.cat([self.q_proj.bias, self.k_proj.bias, self.v_proj.bias]),
            bias_k=None,
            bias_v=None,
            add_zero_attn=False,
            dropout_p=0,
            out_proj_weight=self.c_proj.weight,
            out_proj_bias=self.c_proj.bias,
            use_separate_proj_weight=True,
            training=self.training,
            need_weights=False
        )
        return x.squeeze(0)


class PatchEmbed(torch.nn.Module):
    """Turn an image-like tensor into a token sequence with a strided Conv2d.

    Args:
        patch_size: Kernel size and stride of the convolution.
        in_chans: Number of input channels.
        embed_dim: Number of output channels (token width).
        bias: Whether the convolution has a bias.
    """

    def __init__(
        self,
        patch_size=(2, 2),
        in_chans=4,
        embed_dim=1408,
        bias=True,
    ):
        super().__init__()
        self.proj = torch.nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size, bias=bias)

    def forward(self, x):
        x = self.proj(x)
        x = x.flatten(2).transpose(1, 2)  # BCHW -> BNC
        return x


def timestep_embedding(t, dim, max_period=10000, repeat_only=False):
    """Build sinusoidal embeddings for a 1-D tensor of positions/timesteps.

    Args:
        t: 1-D tensor; one embedding row is produced per element.
        dim: Width of the embedding.
        max_period: Controls the lowest frequency of the sinusoids.
        repeat_only: If True, skip the sinusoids and tile ``t`` ``dim`` times.

    Returns:
        Tensor with one row per element of ``t`` and ``dim`` columns
        (cosines first, then sines, zero-padded by one column when ``dim`` is odd).
    """
    # https://github.com/openai/glide-text2im/blob/main/glide_text2im/nn.py
    if not repeat_only:
        half = dim // 2
        freqs = torch.exp(
            -math.log(max_period)
            * torch.arange(start=0, end=half, dtype=torch.float32)
            / half
        ).to(device=t.device)  # size: [dim/2], an exponentially decaying curve
        args = t[:, None].float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if dim % 2:
            embedding = torch.cat(
                [embedding, torch.zeros_like(embedding[:, :1])], dim=-1
            )
    else:
        embedding = repeat(t, "b -> b d", d=dim)
    return embedding


class TimestepEmbedder(torch.nn.Module):
    """Sinusoidal timestep embedding followed by a two-layer MLP.

    Args:
        hidden_size: Output width of the MLP.
        frequency_embedding_size: Width of the sinusoidal embedding.
    """

    def __init__(self, hidden_size=1408, frequency_embedding_size=256):
        super().__init__()
        self.mlp = torch.nn.Sequential(
            torch.nn.Linear(frequency_embedding_size, hidden_size, bias=True),
            torch.nn.SiLU(),
            torch.nn.Linear(hidden_size, hidden_size, bias=True),
        )
        self.frequency_embedding_size = frequency_embedding_size

    def forward(self, t):
        # Cast to the MLP's parameter dtype so half-precision weights work.
        t_freq = timestep_embedding(t, self.frequency_embedding_size).type(self.mlp[0].weight.dtype)
        t_emb = self.mlp(t_freq)
        return t_emb
class HunyuanDiT(torch.nn.Module):
    """HunyuanDiT backbone: embedders, a stack of down/up blocks with long skips, output head.

    Args:
        num_layers_down: Number of blocks without a skip connection.
        num_layers_up: Number of blocks with a skip connection.
        in_channels: Channels of the input latent.
        out_channels: Channels produced by the final layer (``forward`` keeps
            the first half along the channel dim).
        hidden_dim: Token width used by every sub-module.
        text_dim: Width of the text embedding fed to cross-attention.
        t5_dim: Width of the raw T5 embedding.
        text_length: Token count of the first text encoder's embedding.
        t5_length: Token count of the T5 embedding.
    """

    def __init__(self, num_layers_down=21, num_layers_up=19, in_channels=4, out_channels=8, hidden_dim=1408, text_dim=1024, t5_dim=2048, text_length=77, t5_length=256):
        super().__init__()

        # Embedders
        self.text_emb_padding = torch.nn.Parameter(torch.randn(text_length + t5_length, text_dim, dtype=torch.float32))
        self.t5_embedder = torch.nn.Sequential(
            torch.nn.Linear(t5_dim, t5_dim * 4, bias=True),
            FP32_SiLU(),
            torch.nn.Linear(t5_dim * 4, text_dim, bias=True),
        )
        self.t5_pooler = AttentionPool(t5_length, t5_dim, num_heads=8, output_dim=1024)
        self.style_embedder = torch.nn.Parameter(torch.randn(hidden_dim))
        # Sub-modules receive the constructor's sizes explicitly so that
        # non-default hidden_dim/text_dim/out_channels yield a consistent model.
        self.patch_embedder = PatchEmbed(in_chans=in_channels, embed_dim=hidden_dim)
        self.timestep_embedder = TimestepEmbedder(hidden_size=hidden_dim)
        # Input: 6 size values x 256 + pooled T5 vector (1024) + style vector.
        self.extra_embedder = torch.nn.Sequential(
            torch.nn.Linear(256 * 6 + 1024 + hidden_dim, hidden_dim * 4),
            FP32_SiLU(),
            torch.nn.Linear(hidden_dim * 4, hidden_dim),
        )

        # Transformer blocks
        self.num_layers_down = num_layers_down
        self.num_layers_up = num_layers_up
        self.blocks = torch.nn.ModuleList(
            [
                HunyuanDiTBlock(hidden_dim=hidden_dim, condition_dim=hidden_dim, text_dim=text_dim, skip_connection=False)
                for _ in range(num_layers_down)
            ]
            + [
                HunyuanDiTBlock(hidden_dim=hidden_dim, condition_dim=hidden_dim, text_dim=text_dim, skip_connection=True)
                for _ in range(num_layers_up)
            ]
        )

        # Output layers
        self.final_layer = HunyuanDiTFinalLayer(final_hidden_size=hidden_dim, condition_dim=hidden_dim, out_channels=out_channels)
        self.out_channels = out_channels

    def prepare_text_emb(self, text_emb, text_emb_t5, text_emb_mask, text_emb_mask_t5):
        """Project the T5 embedding, concatenate both text embeddings along dim 1,
        and replace masked-out positions with the learned padding embedding."""
        text_emb_mask = text_emb_mask.bool()
        text_emb_mask_t5 = text_emb_mask_t5.bool()

        text_emb_t5 = self.t5_embedder(text_emb_t5)
        text_emb = torch.cat([text_emb, text_emb_t5], dim=1)
        text_emb_mask = torch.cat([text_emb_mask, text_emb_mask_t5], dim=-1)
        text_emb = torch.where(text_emb_mask.unsqueeze(2), text_emb, self.text_emb_padding.to(text_emb))
        return text_emb

    def prepare_extra_emb(self, text_emb_t5, timestep, size_emb, dtype, batch_size):
        """Build the conditioning vector: timestep embedding plus an MLP over
        the pooled T5 embedding, the size embedding and the style embedding."""
        # Text embedding
        pooled_text_emb_t5 = self.t5_pooler(text_emb_t5)

        # Timestep embedding
        timestep_emb = self.timestep_embedder(timestep)

        # Size embedding: every scalar gets a 256-wide sinusoidal code,
        # then rows are regrouped six values at a time.
        size_emb = timestep_embedding(size_emb.view(-1), 256).to(dtype)
        size_emb = size_emb.view(-1, 6 * 256)

        # Style embedding
        style_emb = repeat(self.style_embedder, "D -> B D", B=batch_size)

        # Concatenate all extra vectors
        extra_emb = torch.cat([pooled_text_emb_t5, size_emb, style_emb], dim=1)
        condition_emb = timestep_emb + self.extra_embedder(extra_emb)

        return condition_emb

    def unpatchify(self, x, h, w):
        """Fold ``(B, h*w, 2*2*C)`` tokens back into a ``(B, C, 2h, 2w)`` map."""
        return rearrange(x, "B (H W) (P Q C) -> B C (H P) (W Q)", H=h, W=w, P=2, Q=2)

    def build_mask(self, data, is_bound):
        """Build a ``(1, H, W)`` blending mask for a tile.

        The mask ramps up linearly with the distance to each tile edge and
        saturates at 1; edges flagged in ``is_bound`` (top, bottom, left,
        right) get no ramp.
        """
        _, _, H, W = data.shape
        h = repeat(torch.arange(H), "H -> H W", H=H, W=W)
        w = repeat(torch.arange(W), "W -> H W", H=H, W=W)
        border_width = (H + W) // 4
        pad = torch.ones_like(h) * border_width
        mask = torch.stack([
            pad if is_bound[0] else h + 1,
            pad if is_bound[1] else H - h,
            pad if is_bound[2] else w + 1,
            pad if is_bound[3] else W - w
        ]).min(dim=0).values
        mask = mask.clip(1, border_width)
        mask = (mask / border_width).to(dtype=data.dtype, device=data.device)
        mask = rearrange(mask, "H W -> 1 H W")
        return mask

    def tiled_block_forward(self, block, hidden_states, condition_emb, text_emb, freq_cis_img, residual, torch_dtype, data_device, computation_device, tile_size, tile_stride):
        """Run ``block`` over overlapping spatial tiles and blend the results.

        Args:
            block: Module called on each tile as
                ``block(tokens, condition_emb, text_emb, freq_cis_img, residual_tokens)``.
            hidden_states: ``(B, C, H, W)`` feature map.
            residual: Optional ``(B, C, H, W)`` map tiled alongside ``hidden_states``.
            torch_dtype, data_device: dtype/device of the accumulation buffers.
            computation_device: Device the tiles are moved to before the block runs.
            tile_size, tile_stride: Tile edge length and step, in feature-map cells.

        Returns:
            ``(B, C, H, W)`` tensor: mask-weighted average of all tile outputs.
        """
        B, C, H, W = hidden_states.shape

        weight = torch.zeros((1, 1, H, W), dtype=torch_dtype, device=data_device)
        values = torch.zeros((B, C, H, W), dtype=torch_dtype, device=data_device)

        # Split tasks
        tasks = []
        for h in range(0, H, tile_stride):
            # Skip rows already fully covered by the previous tile row.
            if h - tile_stride >= 0 and h - tile_stride + tile_size >= H:
                continue
            hl, hr = h, h + tile_size
            if hr > H:
                # Shift the last tile back inside the map; clamp at 0 so a
                # tile larger than the map never yields a negative start.
                hl, hr = max(H - tile_size, 0), H
            for w in range(0, W, tile_stride):
                if w - tile_stride >= 0 and w - tile_stride + tile_size >= W:
                    continue
                wl, wr = w, w + tile_size
                if wr > W:
                    wl, wr = max(W - tile_size, 0), W
                tasks.append((hl, hr, wl, wr))

        # Run
        for hl, hr, wl, wr in tasks:
            hidden_states_batch = hidden_states[:, :, hl:hr, wl:wr].to(computation_device)
            hidden_states_batch = rearrange(hidden_states_batch, "B C H W -> B (H W) C")
            if residual is not None:
                residual_batch = residual[:, :, hl:hr, wl:wr].to(computation_device)
                residual_batch = rearrange(residual_batch, "B C H W -> B (H W) C")
            else:
                residual_batch = None

            # Forward
            hidden_states_batch = block(hidden_states_batch, condition_emb, text_emb, freq_cis_img, residual_batch).to(data_device)
            hidden_states_batch = rearrange(hidden_states_batch, "B (H W) C -> B C H W", H=hr-hl)

            mask = self.build_mask(hidden_states_batch, is_bound=(hl==0, hr>=H, wl==0, wr>=W))
            values[:, :, hl:hr, wl:wr] += hidden_states_batch * mask
            weight[:, :, hl:hr, wl:wr] += mask
        values /= weight
        return values

    def forward(
        self, hidden_states, text_emb, text_emb_t5, text_emb_mask, text_emb_mask_t5, timestep, size_emb, freq_cis_img,
        tiled=False, tile_size=64, tile_stride=32,
        to_cache=False,
        use_gradient_checkpointing=False,
    ):
        """Run the full model.

        Args:
            hidden_states: Input latent; its last two dims are height and width.
            text_emb, text_emb_t5: Text embeddings from the two text encoders.
            text_emb_mask, text_emb_mask_t5: Masks marking valid text tokens.
            timestep: Diffusion timestep tensor.
            size_emb: Size conditioning values (six per sample).
            freq_cis_img: ``(cos, sin)`` pair for the rotary embedding.
            tiled, tile_size, tile_stride: Enable and configure tiled block execution.
            to_cache: Forwarded to every block in the non-tiled, non-checkpointed path.
            use_gradient_checkpointing: Checkpoint each block while training
                (non-tiled path only).

        Returns:
            The first half (along dim 1) of the unpatchified final-layer output.
        """
        # Embeddings
        text_emb = self.prepare_text_emb(text_emb, text_emb_t5, text_emb_mask, text_emb_mask_t5)
        condition_emb = self.prepare_extra_emb(text_emb_t5, timestep, size_emb, hidden_states.dtype, hidden_states.shape[0])

        # Input
        height, width = hidden_states.shape[-2], hidden_states.shape[-1]
        hidden_states = self.patch_embedder(hidden_states)

        # Blocks
        def create_custom_forward(module):
            def custom_forward(*inputs):
                return module(*inputs)
            return custom_forward

        if tiled:
            hidden_states = rearrange(hidden_states, "B (H W) C -> B C H W", H=height//2)
            residuals = []
            for block_id, block in enumerate(self.blocks):
                # Up blocks consume the saved activations in reverse order.
                residual = residuals.pop() if block_id >= self.num_layers_down else None
                hidden_states = self.tiled_block_forward(
                    block, hidden_states, condition_emb, text_emb, freq_cis_img, residual,
                    torch_dtype=hidden_states.dtype, data_device=hidden_states.device, computation_device=hidden_states.device,
                    tile_size=tile_size, tile_stride=tile_stride
                )
                if block_id < self.num_layers_down - 2:
                    residuals.append(hidden_states)
            hidden_states = rearrange(hidden_states, "B C H W -> B (H W) C")
        else:
            residuals = []
            for block_id, block in enumerate(self.blocks):
                residual = residuals.pop() if block_id >= self.num_layers_down else None
                if self.training and use_gradient_checkpointing:
                    hidden_states = torch.utils.checkpoint.checkpoint(
                        create_custom_forward(block),
                        hidden_states, condition_emb, text_emb, freq_cis_img, residual,
                        use_reentrant=False,
                    )
                else:
                    hidden_states = block(hidden_states, condition_emb, text_emb, freq_cis_img, residual, to_cache=to_cache)
                if block_id < self.num_layers_down - 2:
                    residuals.append(hidden_states)

        # Output
        hidden_states = self.final_layer(hidden_states, condition_emb)
        hidden_states = self.unpatchify(hidden_states, height//2, width//2)
        hidden_states, _ = hidden_states.chunk(2, dim=1)
        return hidden_states

    def state_dict_converter(self):
        """Return the converter that maps external checkpoints to this module's key names."""
        return HunyuanDiTStateDictConverter()


class HunyuanDiTStateDictConverter():
    """Rename and split checkpoint entries so they match ``HunyuanDiT``'s parameter names."""

    # Substring renames, applied in this exact order to every key. Order
    # matters: e.g. ".q_proj." -> ".to_q." runs before "pooler." is prefixed,
    # and the pooler's query projection is then restored to "q_proj".
    _RENAMES = (
        (".default_modulation.", ".modulation."),
        (".mlp.fc1.", ".mlp.0."),
        (".mlp.fc2.", ".mlp.2."),
        (".attn1.q_norm.", ".rota1.q_norm."),
        (".attn2.q_norm.", ".rota2.q_norm."),
        (".attn1.k_norm.", ".rota1.k_norm."),
        (".attn2.k_norm.", ".rota2.k_norm."),
        (".q_proj.", ".to_q."),
        (".out_proj.", ".to_out."),
        ("text_embedding_padding", "text_emb_padding"),
        ("mlp_t5.0.", "t5_embedder.0."),
        ("mlp_t5.2.", "t5_embedder.2."),
        ("pooler.", "t5_pooler."),
        ("x_embedder.", "patch_embedder."),
        ("t_embedder.", "timestep_embedder."),
        ("t5_pooler.to_q.", "t5_pooler.q_proj."),
        ("style_embedder.weight", "style_embedder"),
    )

    def __init__(self):
        pass

    def from_diffusers(self, state_dict):
        """Convert a state dict to this module's naming.

        Fused ``.kv_proj.`` tensors are split in two along dim 0 (k, v), fused
        ``.Wqkv.`` tensors in three (q, k, v); the style embedding is squeezed.

        Args:
            state_dict: Mapping from parameter name to tensor.

        Returns:
            A new dict with converted names; the input is not modified.
        """
        state_dict_ = {}
        for name, param in state_dict.items():
            name_ = name
            for old, new in self._RENAMES:
                name_ = name_.replace(old, new)

            if ".kv_proj." in name_:
                half = param.shape[0] // 2
                state_dict_[name_.replace(".kv_proj.", ".to_k.")] = param[:half]
                state_dict_[name_.replace(".kv_proj.", ".to_v.")] = param[half:]
            elif ".Wqkv." in name_:
                third = param.shape[0] // 3
                state_dict_[name_.replace(".Wqkv.", ".to_q.")] = param[:third]
                state_dict_[name_.replace(".Wqkv.", ".to_k.")] = param[third:third * 2]
                state_dict_[name_.replace(".Wqkv.", ".to_v.")] = param[third * 2:]
            elif "style_embedder" in name_:
                state_dict_[name_] = param.squeeze()
            else:
                state_dict_[name_] = param
        return state_dict_

    def from_civitai(self, state_dict):
        """Same conversion as ``from_diffusers``."""
        return self.from_diffusers(state_dict)