""" att_uncontrol9_adam以及之前的都是用这个 """ import numpy as np import torch import torch.nn as nn import math class SubPixelConv(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride=1, scale_factor=2): super(SubPixelConv, self).__init__() self.conv = nn.Conv2d(in_channels, out_channels * scale_factor ** 2, kernel_size, stride, padding=kernel_size // 2) self.pixel_shuffle = nn.PixelShuffle(scale_factor) def forward(self, x): x = self.conv(x) x = self.pixel_shuffle(x) return x class Swish(nn.Module): def __init__(self): super(Swish, self).__init__() def forward(self, x): # swish return x * torch.sigmoid(x) def zero_module(module): """ Zero out the parameters of a module and return it. """ for p in module.parameters(): p.detach().zero_() return module class AttentionBlock(nn.Module): """ An attention block that allows spatial positions to attend to each other. Originally ported from here, but adapted to the N-d case. https://github.com/hojonathanho/diffusion/blob/1e0dceb3b3495bbe19116a5e1b3596cd0706c543/diffusion_tf/models/unet.py#L66. """ def __init__(self, channels, num_heads=-1, use_checkpoint=False): super().__init__() self.channels = channels self.num_heads = num_heads if num_heads != -1 else min(channels // 32, 8) self.use_checkpoint = use_checkpoint self.norm = nn.GroupNorm(16, channels, eps=1e-6) self.qkv = nn.Conv1d(channels, channels * 3, 1) self.attention = QKVAttention() self.proj_out = zero_module(nn.Conv1d(channels, channels, 1)) def forward(self, x): b, c, *spatial = x.shape x = x.reshape(b, c, -1) qkv = self.qkv(self.norm(x)) qkv = qkv.reshape(b * self.num_heads, -1, qkv.shape[2]) h = self.attention(qkv) h = h.reshape(b, -1, h.shape[-1]) h = self.proj_out(h) return (x + h).reshape(b, c, *spatial) class QKVAttention(nn.Module): """ A module which performs QKV attention. """ def forward(self, qkv): """ Apply QKV attention. :param qkv: an [N x (C * 3) x T] tensor of Qs, Ks, and Vs. :return: an [N x C x T] tensor after attention. """ ch = qkv.shape[1] // 3 q, k, v = torch.split(qkv, ch, dim=1) scale = 1 / math.sqrt(math.sqrt(ch)) weight = torch.einsum( "bct,bcs->bts", q * scale, k * scale ) # More stable with f16 than dividing afterwards weight = torch.softmax(weight.float(), dim=-1).type(weight.dtype) return torch.einsum("bts,bcs->bct", weight, v) @staticmethod def count_flops(model, _x, y): """ A counter for the `thop` package to count the operations in an attention operation. Meant to be used like: macs, params = thop.profile( model, inputs=(inputs, timestamps), custom_ops={QKVAttention: QKVAttention.count_flops}, ) """ b, c, *spatial = y[0].shape num_spatial = int(np.prod(spatial)) # We perform two matmuls with the same number of ops. # The first computes the weight matrix, the second computes # the combination of the value vectors. matmul_ops = 2 * b * (num_spatial ** 2) * c model.total_ops += torch.DoubleTensor([matmul_ops]) # ==================================================================== class TEncoder(nn.Module): def __init__(self, out_c=256, scale=30.): super(TEncoder, self).__init__() # 随机映射 self.out_c = out_c self.W = nn.Parameter(torch.randn(out_c // 2) * scale, requires_grad=False) self.linear = nn.Sequential(nn.Linear(out_c, out_c), Swish(), nn.Linear(out_c, out_c), ) def timestep_embedding(self, timesteps, max_period=10000): """ Create sinusoidal timestep embeddings. :param timesteps: a 1-D Tensor of N indices, one per batch element. These may be fractional. :param dim: the dimension of the output. 
# ====================================================================
class TEncoder(nn.Module):
    """Encode a (normalized) diffusion timestep into an `out_c`-dim conditioning vector."""

    def __init__(self, out_c=256, scale=30.):
        super(TEncoder, self).__init__()
        self.out_c = out_c
        # Random Fourier projection; only used by the commented-out alternative in forward().
        self.W = nn.Parameter(torch.randn(out_c // 2) * scale, requires_grad=False)
        self.linear = nn.Sequential(nn.Linear(out_c, out_c),
                                    Swish(),
                                    nn.Linear(out_c, out_c), )

    def timestep_embedding(self, timesteps, max_period=10000):
        """
        Create sinusoidal timestep embeddings of dimension `self.out_c`.
        :param timesteps: a Tensor of N timesteps, one per batch element. These may be fractional.
        :param max_period: controls the minimum frequency of the embeddings.
        :return: an [N x out_c] Tensor of positional embeddings.
        """
        half = self.out_c // 2
        freqs = torch.exp(
            -math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half
        ).to(device=timesteps.device)
        args = timesteps[:, None].float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if self.out_c % 2:
            embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
        return embedding

    def forward(self, t):
        # t_proj = t * self.W[None, :] * 2 * np.pi
        # t_proj = torch.cat((torch.sin(t_proj), torch.cos(t_proj)), dim=-1)
        t_proj = self.timestep_embedding(t)[:, 0, :]  # t arrives as [N, 1]
        encoded_t = self.linear(t_proj)
        return encoded_t
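# Illustrative sketch (not part of the original training code): TEncoder maps a batch
# of normalized timesteps of shape [N, 1] to conditioning vectors of shape [N, out_c].
# The batch size and out_c below are assumptions chosen only for this demonstration.
def _t_encoder_shape_demo():
    t_encoder = TEncoder(out_c=256)
    t = torch.rand(4, 1)          # normalized timesteps in [0, 1], shape [N, 1]
    encoded_t = t_encoder(t)      # sinusoidal embedding followed by a small MLP
    assert encoded_t.shape == (4, 256)
    return encoded_t.shape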
class EncoderBlock(nn.Module):
    def __init__(self, in_c, out_c, kernel_size, stride, t_in_c, att_num_head=-1, block_deep=4):
        super(EncoderBlock, self).__init__()
        self.in_c = in_c
        self.out_c = out_c
        self.stride = stride
        self.model_list_len = block_deep  # number of conv layers in one block
        padding = kernel_size // 2
        self.model_list = nn.ModuleList()
        self.model_list.append(nn.Sequential(
            nn.Conv2d(in_c, out_c, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.GroupNorm(16, out_c, eps=1e-6),
            Swish()))
        if att_num_head != 0:  # 0 disables attention (in practice used on the stride-2 blocks)
            self.att_block = AttentionBlock(out_c, num_heads=att_num_head)
        else:
            self.att_block = nn.Identity()
        for _ in range(self.model_list_len - 2):  # -2: the first and last convs are added separately
            self.model_list.append(
                nn.Sequential(
                    nn.Conv2d(out_c, out_c, kernel_size=kernel_size, stride=1, padding=padding),
                    nn.GroupNorm(16, out_c, eps=1e-6),
                    Swish(),
                ))
        self.model_list.append(
            nn.Sequential(
                nn.Conv2d(out_c, out_c, kernel_size=kernel_size, stride=1, padding=padding),
                nn.GroupNorm(16, out_c, eps=1e-6),
            ))
        # project the timestep embedding to this block's width (one projection per conv except the last)
        self.encode_t = nn.ModuleList(
            [nn.Linear(t_in_c, out_c) for _ in range(len(self.model_list) - 1)])
        if self.in_c != self.out_c or self.stride != 1:
            self.conv_skip = nn.Conv2d(in_c, out_c, kernel_size=1, stride=stride, padding=0)
        else:
            self.conv_skip = nn.Identity()
        self.act_skip = Swish()

    def forward(self, x, t):
        skip = self.conv_skip(x)
        for i, layer in enumerate(self.model_list):
            x = layer(x)
            if i == 0:
                x = self.att_block(x)
            if i < self.model_list_len - 1:
                t_ = self.encode_t[i](t)
                # t_ = torch.tile(t[:, :, None, None], dims=[1, 1, x.shape[2], x.shape[3]])
                t_ = t_[:, :, None, None]
                x = x + t_
        return self.act_skip(x + skip)


class DecoderBlock(nn.Module):
    def __init__(self, in_c, out_c, kernel_size, upsample="none", t_in_c=256, att_num_head=-1, block_deep=4):
        super(DecoderBlock, self).__init__()
        self.in_c = in_c
        self.out_c = out_c
        self.model_list_len = block_deep  # number of conv layers in one block
        self.model_list = nn.ModuleList()
        if upsample == "subpix":
            self.model_list.append(nn.Sequential(
                SubPixelConv(in_c, out_c, kernel_size=3),
                nn.GroupNorm(16, out_c, eps=1e-6),
                Swish()
            ))
            self.upsample = SubPixelConv(in_c, in_c, kernel_size=3)
        elif upsample == "convt":
            self.model_list.append(nn.Sequential(
                nn.ConvTranspose2d(in_c, out_c, kernel_size=4, stride=2, padding=1),
                nn.GroupNorm(16, out_c, eps=1e-6),
                Swish()
            ))
            self.upsample = nn.ConvTranspose2d(in_c, in_c, kernel_size=4, stride=2, padding=1)
        else:
            self.model_list.append(nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=kernel_size, stride=1, padding=kernel_size // 2),
                nn.GroupNorm(16, out_c, eps=1e-6),
                Swish()
            ))
            self.upsample = nn.Identity()
        if att_num_head != 0:  # 0 disables attention for this block
            self.att_block = AttentionBlock(out_c, num_heads=att_num_head)
        else:
            self.att_block = nn.Identity()
        for _ in range(self.model_list_len - 2):
            self.model_list.append(nn.Sequential(
                nn.Conv2d(out_c, out_c, kernel_size=kernel_size, stride=1, padding=kernel_size // 2),
                nn.GroupNorm(16, out_c, eps=1e-6),
                Swish()))
        self.model_list.append(nn.Sequential(
            nn.Conv2d(out_c, out_c, kernel_size=kernel_size, stride=1, padding=kernel_size // 2),
            nn.GroupNorm(16, out_c, eps=1e-6)))
        # project the timestep embedding to this block's width (one projection per conv except the last)
        self.encode_t = nn.ModuleList([nn.Linear(t_in_c, out_c) for _ in range(len(self.model_list) - 1)])
        self.conv_skip = nn.Conv2d(in_c, out_c, kernel_size=1, stride=1, padding=0)
        self.act_skip = Swish()

    def forward(self, x, t):
        skip = self.upsample(x)
        skip = self.conv_skip(skip)
        for i, layer in enumerate(self.model_list):
            x = layer(x)
            if i == 0:
                x = self.att_block(x)
            if i < self.model_list_len - 1:
                t_ = self.encode_t[i](t)
                # t_ = torch.tile(t[:, :, None, None], dims=[1, 1, x.shape[2], x.shape[3]])
                t_ = t_[:, :, None, None]
                x = x + t_
        return self.act_skip(x + skip)
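# Illustrative sketch (not part of the original training code): with upsample="convt"
# a DecoderBlock doubles the spatial resolution and maps in_c -> out_c channels, while
# the timestep embedding is injected after every conv except the last. All sizes below
# are assumptions chosen only for this demonstration.
def _decoder_block_upsample_demo():
    block = DecoderBlock(in_c=64, out_c=32, kernel_size=3, upsample="convt",
                         t_in_c=256, att_num_head=0, block_deep=3)
    x = torch.randn(2, 64, 8, 8)         # low-resolution feature map
    t = torch.randn(2, 256)              # timestep embedding from TEncoder
    out = block(x, t)
    assert out.shape == (2, 32, 16, 16)  # 2x upsampled, out_c channels
    return out.shape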
class Encoder(nn.Module):
    def __init__(self, model_in_c=8,
                 out_cs=(64, 64, 128, 128, 256, 256, 512, 512),
                 down_sample=(0, 0, 1, 0, 1, 0, 1, 0),
                 skip_out=(0, 1, 0, 1, 0, 1, 0, 1),
                 att_num_heads=(-1, -1, -1, -1, -1, -1, -1, -1),
                 t_in_c=256, block_deep=4):
        """
        :param out_cs: output channels of each block
        :param down_sample: whether each block downsamples by 2 (1) or keeps the resolution (0)
        :param skip_out: which block outputs are kept as U-Net skip connections
        """
        super(Encoder, self).__init__()
        self.skip_out = skip_out
        self.model_list = nn.ModuleList()
        for i, (out_c, down, att_num_head) in enumerate(zip(out_cs, down_sample, att_num_heads)):
            in_c = model_in_c if i == 0 else out_cs[i - 1]
            self.model_list.append(
                EncoderBlock(in_c, out_cs[i], kernel_size=3, stride=down + 1, t_in_c=t_in_c,
                             att_num_head=att_num_head, block_deep=block_deep))

    def forward(self, x, t):
        res_x = []
        for i, layer in enumerate(self.model_list):
            x = layer(x, t)
            if self.skip_out[i] == 1:
                res_x.append(x)
        return res_x


class Decoder(nn.Module):
    def __init__(self, in_c, model_out_c=8,
                 out_cs=(512, 256, 256, 128, 128, 64, 64, 32),
                 up_sample=("none", "convt", "none", "subpix", "none", "subpix", "none", "none"),
                 skip_out=(1, 0, 1, 0, 1, 0, 1, 0),
                 att_num_heads=(-1, -1, -1, -1, -1, -1, -1, -1),
                 t_in_c=256, block_deep=4):
        """
        :param out_cs: output channels of each block
        :param up_sample: upsampling method per block; "none" means no upsampling
        :param skip_out: which blocks consume a U-Net skip connection
        """
        super(Decoder, self).__init__()
        self.skip_out = skip_out
        self.model_list = nn.ModuleList()
        for i, (out_c, up, att_num_head) in enumerate(zip(out_cs, up_sample, att_num_heads)):
            if self.skip_out[i] == 1 and i > 0:
                in_c *= 2  # the skip connection is concatenated along the channel axis
            self.model_list.append(
                DecoderBlock(in_c, out_cs[i], kernel_size=3, upsample=up, t_in_c=t_in_c,
                             att_num_head=att_num_head, block_deep=block_deep))
            in_c = out_cs[i]
        self.Conv1 = nn.Conv2d(out_cs[-1], model_out_c, kernel_size=1, stride=1, padding=0)

    def forward(self, x, t):
        x_list = x  # skip features from the encoder, consumed from the deepest to the shallowest
        # print([xx.shape for xx in x_list])
        x = None
        for i, layer in enumerate(self.model_list):
            if self.skip_out[i] == 1:
                # print("skip_x:", x_list[-1].shape)
                if i == 0:
                    x = x_list.pop()
                else:
                    x = torch.cat([x, x_list.pop()], dim=1)
                # print("x:", x.shape)
            x = layer(x, t)
        x = self.Conv1(x)
        return x


class UNet(nn.Module):
    def __init__(self, en_out_c, en_down, en_skip, en_att_heads,
                 de_out_c, de_up, de_skip, de_att_heads,
                 t_out_c, vae_c=8, block_deep=4):
        """
        :param en_out_c: encoder settings (see Encoder)
        :param en_down:
        :param en_skip:
        :param en_att_heads:
        :param de_out_c: decoder settings (see Decoder)
        :param de_up:
        :param de_skip:
        :param de_att_heads:
        :param t_out_c: width of the timestep embedding
        :param vae_c: number of latent channels at the input and output
        """
        super(UNet, self).__init__()
        self.encoder = Encoder(model_in_c=vae_c, out_cs=en_out_c, down_sample=en_down, skip_out=en_skip,
                               att_num_heads=en_att_heads, t_in_c=t_out_c, block_deep=block_deep)
        self.decoder = Decoder(in_c=en_out_c[-1], model_out_c=vae_c, out_cs=de_out_c, up_sample=de_up,
                               skip_out=de_skip, att_num_heads=de_att_heads, t_in_c=t_out_c, block_deep=block_deep)
        self.t_encoder = TEncoder(t_out_c)

    def forward(self, x, t):
        t = self.t_encoder(t)
        # print("encoded_t:", torch.mean(t), torch.std(t))
        # print("t:", t.shape)
        encoder_out = self.encoder(x, t)
        # print("encode:")
        # for e in encoder_out:
        #     print(e.shape)
        decoder_out = self.decoder(encoder_out, t)
        # print("decoder:")
        # print(decoder_out.shape)
        return decoder_out


if __name__ == '__main__':
    import os

    import cv2


    def modelSave(model, save_path, save_name):
        if not os.path.exists(save_path):
            os.mkdir(save_path)
        torch.save(model.state_dict(), os.path.join(save_path, save_name))


    def merge_images(images: np.ndarray):
        """
        Tile a batch of images into one square grid.
        :param images: image array of shape [N, H, W, C]
        :return: the merged grid image
        """
        n, h, w, c = images.shape
        nn = int(np.ceil(n ** 0.5))
        merged_image = np.zeros((h * nn, w * nn, 3), dtype=images.dtype)
        for i in range(n):
            row = i // nn
            col = i % nn
            merged_image[row * h:(row + 1) * h, col * w:(col + 1) * w, :] = images[i]
        merged_image = np.clip(merged_image, 0, 255)
        merged_image = np.array(merged_image, dtype=np.uint8)
        return merged_image


    # 320,448,576,832
    config = {
        # model architecture
        "en_out_c": (256, 256, 256, 320, 320, 320, 576, 576, 576, 704, 704, 704),
        "en_down": (0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0),
        "en_skip": (0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1),
        "en_att_heads": (8, 8, 8, 0, 8, 8, 0, 8, 8, 0, 8, 8),

        "de_out_c": (704, 576, 576, 576, 320, 320, 320, 256, 256, 256, 256),
        "de_up": ("none", "subpix", "none", "none", "subpix", "none", "none", "subpix", "none", "none", "none"),
        "de_skip": (1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0),
        "de_att_heads": (8, 8, 0, 8, 8, 0, 8, 8, 0, 8, 8),  # no self-attention where a skip connection is consumed

        "t_out_c": 256,
        "vae_c": 4,
        "block_deep": 3,
    }

    device = "cuda"
    total_step = 1000

    unet = UNet(config["en_out_c"], config["en_down"], config["en_skip"], config["en_att_heads"],
                config["de_out_c"], config["de_up"], config["de_skip"], config["de_att_heads"],
                config["t_out_c"], config["vae_c"], config["block_deep"]).to(device)
    print("total params:", sum(i.numel() for i in unet.parameters()) / 10000, "x10k")
    print("encoder:", sum(i.numel() for i in unet.encoder.parameters()) / 10000, "x10k")
    print("decoder:", sum(i.numel() for i in unet.decoder.parameters()) / 10000, "x10k")
    print("t_encoder:", sum(i.numel() for i in unet.t_encoder.parameters()) / 10000, "x10k")

    batch_size = 2
    x = np.random.random((batch_size, config["vae_c"], 32, 32))
    t = np.random.uniform(1, total_step + 0.9999, size=(batch_size, 1))
    t = np.array(t, dtype=np.int16)
    t = t / total_step

    with torch.no_grad():
        x = torch.Tensor(x).to(device)
        t = torch.Tensor(t).to(device)
        y = unet(x, t)
    print(y.shape)

    z = y[0].cpu().numpy()
    # z = (z - np.mean(z)) / (np.max(z) - np.min(z))
    z = np.clip(np.asarray((z + 1) * 127.5), 0, 255)
    z = np.asarray(z, dtype=np.uint8)
    z = [np.tile(z[ii, :, :, np.newaxis], (1, 1, 3)) for ii in range(z.shape[0])]
    noise = merge_images(np.array(z))
    noise = cv2.resize(noise, None, fx=2, fy=2)
    cv2.imshow("noise", noise)
    cv2.waitKey(0)

    # modelSave(unet, "./", "test.pth")

    # export to ONNX
    torch.onnx.export(
        unet,
        (x, t),
        'unet.onnx',
        export_params=True,
        opset_version=12,
    )

    import onnx

    # add inferred shape information to the exported graph
    model_file = 'unet.onnx'
    onnx_model = onnx.load(model_file)
    onnx.save(onnx.shape_inference.infer_shapes(onnx_model), model_file)
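    # Hedged sketch (not the author's training code): how a latent UNet like this is
    # typically driven during DDPM training -- sample a timestep, mix the clean latent
    # with Gaussian noise, and regress the noise with an MSE loss. The linear beta
    # schedule and the (t + 1) / num_steps normalization are assumptions chosen to
    # match the [N, 1] timestep input used in the demo above.
    def ddpm_train_step_sketch(model, x0, optimizer, num_steps=1000):
        betas = torch.linspace(1e-4, 0.02, num_steps, device=x0.device)  # assumed linear schedule
        alphas_bar = torch.cumprod(1.0 - betas, dim=0)
        t_int = torch.randint(0, num_steps, (x0.shape[0],), device=x0.device)
        a_bar = alphas_bar[t_int].view(-1, 1, 1, 1)
        noise = torch.randn_like(x0)
        x_t = torch.sqrt(a_bar) * x0 + torch.sqrt(1.0 - a_bar) * noise   # forward diffusion q(x_t | x_0)
        t_norm = (t_int.float()[:, None] + 1) / num_steps                # normalized timestep, shape [N, 1]
        loss = nn.functional.mse_loss(model(x_t, t_norm), noise)         # predict the added noise
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        return loss.item()

    # Example (commented out, as with modelSave above):
    # optimizer = torch.optim.Adam(unet.parameters(), lr=1e-4)
    # print(ddpm_train_step_sketch(unet, torch.randn(batch_size, config["vae_c"], 32, 32, device=device), optimizer))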