Let's start with f-AnoGAN's full title: f-AnoGAN: Fast unsupervised anomaly detection with generative adversarial networks.
If you are already familiar with the three articles I mentioned above, you will find this paper genuinely simple. 🌼🌼🌼 Let's walk through the f-AnoGAN network architecture together. First, the training process: it proceeds in two steps. Step one trains a generative adversarial network; step two reuses the weights from step one to train an encoder. Let's look at the figure below:
In step ①, we train a WGAN; I will also touch on this in the code walkthrough later. If you are not familiar with WGAN, don't worry too much: you could train a vanilla GAN here instead. The results may not be as good as with a WGAN, but it makes no difference for understanding the f-AnoGAN procedure. Once the WGAN has finished training, the weights of the generator G and the discriminator D are frozen, so G and D do not change during step ②. In step ②, the goal is to train an encoder E. The paper presents three structures for training E: the ziz structure, the izi structure, and the izif structure. Let's look at them one by one:
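In terms of the code we will walk through below, the whole pipeline boils down to two calls (a rough sketch; the repo actually drives them from separate scripts):

```python
# Step ①: train the WGAN (G and D) on normal data only.
train_wgangp(opt, generator, discriminator, train_dataloader, device)

# Step ②: train the encoder E. Internally this loads the saved G/D weights,
# puts them in eval mode, and optimizes only E's parameters.
train_encoder_izif(opt, generator, discriminator, encoder, train_dataloader, device)
```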
The ziz structure
Let's go straight to the figure:
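In code terms, ziz goes latent → image → latent: sample z, decode it with the frozen generator, re-encode it with E, and penalize the residual in latent space. A minimal sketch (variable names are mine, not the repo's):

```python
# ziz: z -> G(z) -> E(G(z)); train E to invert G in latent space (sketch).
z = torch.randn(batch_size, opt.latent_dim, device=device)
fake_imgs = generator(z)           # frozen G
z_hat = encoder(fake_imgs)         # E(G(z))
loss_ziz = nn.MSELoss()(z_hat, z)  # latent residual ||E(G(z)) - z||^2
```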
The izi structure
Again, straight to the figure:
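izi is the mirror image: image → latent → image. Encode a real image, reconstruct it with the frozen generator, and penalize the residual in image space. A minimal sketch (again, names are illustrative):

```python
# izi: x -> E(x) -> G(E(x)); train E so reconstructions match the input (sketch).
z = encoder(real_imgs)                          # E(x)
recon_imgs = generator(z)                       # G(E(x)), frozen G
loss_izi = nn.MSELoss()(recon_imgs, real_imgs)  # image residual ||x - G(E(x))||^2
```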
The izif structure
Compared with izi, the izif structure appends a discriminator D, as shown in the figure below. [This is the structure the paper ultimately chose for training the encoder E.]
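In loss terms, izif keeps the izi image residual and adds a residual between the frozen discriminator's intermediate features of x and of G(E(x)). A minimal sketch that mirrors the train_encoder_izif code shown later (kappa weights the feature term):

```python
# izif: izi image loss plus a feature residual from the frozen D (sketch).
z = encoder(real_imgs)
recon_imgs = generator(z)
loss_imgs = nn.MSELoss()(recon_imgs, real_imgs)
loss_features = nn.MSELoss()(discriminator.forward_features(recon_imgs),
                             discriminator.forward_features(real_imgs))
loss_izif = loss_imgs + kappa * loss_features
```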
That wraps up the f-AnoGAN training process. Pretty simple, isn't it? [If you find it difficult, I suggest reading the three blog posts mentioned in my preface, or working through the code below.]
After training, we save the weights of the generator G, the discriminator D, and the encoder E, and then use them for defect detection. Detection is even simpler: the anomaly score function is just the loss function of the izif structure described above, as shown in the figure below:
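For reference, the score in the paper has the form A(x) = (1/n)·‖x − G(E(x))‖² + κ·(1/n_d)·‖f(x) − f(G(E(x)))‖², where f(·) denotes the discriminator's intermediate features and n, n_d are the numbers of pixels and features respectively: it is exactly the izif training loss evaluated on a test image.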
For this part, I found a PyTorch implementation of f-AnoGAN on paperswithcode: f-AnoGAN source code. 🍵🍵🍵 The logic of this code is very clear, so I will use it to walk you through the f-AnoGAN implementation. 🍖🍖🍖
First, let's look at the overall structure of the code, shown below:
Note that mnist, mvtec_ad, and your_own_dataset target experiments on different datasets. Since most readers are familiar with the MNIST dataset, this post uses MNIST as the example. [In other words, the files under the mvtec_ad and your_own_dataset folders will not be used; just keep that in mind.]
This part is defined in tools.py under the mnist folder. First we fetch the MNIST dataset, downloading it directly via the datasets package in torchvision:
train = datasets.MNIST(path, train=True, download=download)
test = datasets.MNIST(path, train=False, download=download)
As we know, the MNIST train split has 60,000 samples and the test split has 10,000, with targets ranging from 0 to 9. First we take the training samples whose target is 0:
_x_train = train.data[train.targets == training_label]  # the training_label passed in is 0
Debugging shows that _x_train has shape (5923, 28, 28), i.e., there are 5923 samples with target 0.
Next we split _x_train 8:2 into the training set and one part of the test set:
x_train, x_test_normal = _x_train.split((int(len(_x_train) * split_rate)), dim=0)  # the split_rate passed in is 0.8
After this, x_train has 4738 samples and x_test_normal has 1185.
As mentioned above, x_test_normal is only part of the test set. The full test set consists of x_test_normal, every train sample whose target is not 0, and the entire MNIST test split:
x_test = torch.cat([x_test_normal,
                    train.data[train.targets != training_label],
                    test.data], dim=0)
The final test set therefore contains 1185 + (60000 − 5923) + 10000 = 65262 samples.
We now have the training and test data; we also need the corresponding labels:
_y_train = train.targets[train.targets == training_label]
y_train, y_test_normal = _y_train.split((int(len(_y_train) * split_rate)), dim=0)
y_test = torch.cat([y_test_normal,
                    train.targets[train.targets != training_label],
                    test.targets], dim=0)
Likewise, y_train holds 4738 training labels and y_test holds 65262 test labels.
With the data in hand, we apply some preprocessing and load it with a DataLoader:
train_mnist = SimpleDataset(x_train, y_train,
                            transform=transforms.Compose(
                                [transforms.ToPILImage(),
                                 transforms.ToTensor(),
                                 transforms.Normalize([0.5], [0.5])])
                            )
train_dataloader = DataLoader(train_mnist, batch_size=opt.batch_size,
                              shuffle=True)
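SimpleDataset also lives in tools.py. I won't reproduce the repo's exact version here; roughly, it is just a thin Dataset wrapper along these lines (a sketch, details may differ from the repo):

```python
from torch.utils.data import Dataset

class SimpleDataset(Dataset):
    """Thin wrapper over image/label tensors (sketch; see tools.py)."""

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image, label = self.images[index], self.labels[index]
        if self.transform is not None:
            image = self.transform(image)
        return image, label
```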
class Generator(nn.Module):
    def __init__(self, opt):
        super().__init__()
        self.img_shape = (opt.channels, opt.img_size, opt.img_size)

        def block(in_feat, out_feat, normalize=True):
            layers = [nn.Linear(in_feat, out_feat)]
            if normalize:
                # NB: the second positional argument of BatchNorm1d is eps
                layers.append(nn.BatchNorm1d(out_feat, 0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.model = nn.Sequential(
            *block(opt.latent_dim, 128, normalize=False),
            *block(128, 256),
            *block(256, 512),
            *block(512, 1024),
            nn.Linear(1024, int(np.prod(self.img_shape))),
            nn.Tanh()
        )

    def forward(self, z):
        img = self.model(z)
        img = img.view(img.shape[0], *self.img_shape)
        return img
class Discriminator(nn.Module):
    def __init__(self, opt):
        super().__init__()
        img_shape = (opt.channels, opt.img_size, opt.img_size)
        self.features = nn.Sequential(
            nn.Linear(int(np.prod(img_shape)), 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True)
        )
        self.last_layer = nn.Sequential(
            nn.Linear(256, 1)
        )

    def forward(self, img):
        features = self.forward_features(img)
        validity = self.last_layer(features)
        return validity

    def forward_features(self, img):
        # intermediate features f(x), used by the izif loss and the anomaly score
        img_flat = img.view(img.shape[0], -1)
        features = self.features(img_flat)
        return features
class Encoder(nn.Module):
    def __init__(self, opt):
        super().__init__()
        img_shape = (opt.channels, opt.img_size, opt.img_size)
        self.model = nn.Sequential(
            nn.Linear(int(np.prod(img_shape)), 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, opt.latent_dim),
            nn.Tanh()  # bounds the latent code to (-1, 1)
        )

    def forward(self, img):
        img_flat = img.view(img.shape[0], -1)
        validity = self.model(img_flat)
        return validity
Since this is for teaching, the models are very simple: no convolutions at all, just fully connected layers. You'll understand them at a glance. 🌸🌸🌸
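If you want to sanity-check the shapes, here is a quick hedged example (the opt fields mirror what the code above reads; the values are my assumptions, the repo takes them from argparse):

```python
from types import SimpleNamespace

import torch

# Assumed hyperparameters for illustration only.
opt = SimpleNamespace(latent_dim=100, img_size=28, channels=1)

G, D, E = Generator(opt), Discriminator(opt), Encoder(opt)
z = torch.randn(4, opt.latent_dim)
imgs = G(z)
print(imgs.shape)     # torch.Size([4, 1, 28, 28])
print(D(imgs).shape)  # torch.Size([4, 1]): one critic score per image
print(E(imgs).shape)  # torch.Size([4, 100]): one latent code per image
```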
Now let's look at the WGAN training code:
def train_wgangp(opt, generator, discriminator,
                 dataloader, device, lambda_gp=10):
    generator.to(device)
    discriminator.to(device)

    optimizer_G = torch.optim.Adam(generator.parameters(),
                                   lr=opt.lr, betas=(opt.b1, opt.b2))
    optimizer_D = torch.optim.Adam(discriminator.parameters(),
                                   lr=opt.lr, betas=(opt.b1, opt.b2))

    os.makedirs("results/images", exist_ok=True)

    padding_epoch = len(str(opt.n_epochs))
    padding_i = len(str(len(dataloader)))

    batches_done = 0
    for epoch in range(opt.n_epochs):
        for i, (imgs, _) in enumerate(dataloader):

            # Configure input
            real_imgs = imgs.to(device)

            # ---------------------
            #  Train Discriminator
            # ---------------------

            optimizer_D.zero_grad()

            # Sample noise as generator input
            z = torch.randn(imgs.shape[0], opt.latent_dim, device=device)

            # Generate a batch of images
            fake_imgs = generator(z)

            # Real images
            real_validity = discriminator(real_imgs)
            # Fake images
            fake_validity = discriminator(fake_imgs.detach())  # .detach() keeps the generator out of this backward pass
            # Gradient penalty
            gradient_penalty = compute_gradient_penalty(discriminator,
                                                        real_imgs.data,
                                                        fake_imgs.data,
                                                        device)
            # Adversarial loss
            d_loss = (-torch.mean(real_validity) + torch.mean(fake_validity)
                      + lambda_gp * gradient_penalty)

            d_loss.backward()
            optimizer_D.step()

            optimizer_G.zero_grad()

            # Train the generator and output log every n_critic steps
            if i % opt.n_critic == 0:

                # -----------------
                #  Train Generator
                # -----------------

                # Generate a batch of images
                fake_imgs = generator(z)
                # Loss measures generator's ability to fool the discriminator
                # Train on fake images
                fake_validity = discriminator(fake_imgs)
                g_loss = -torch.mean(fake_validity)

                g_loss.backward()
                optimizer_G.step()

                print(f"[Epoch {epoch:{padding_epoch}}/{opt.n_epochs}] "
                      f"[Batch {i:{padding_i}}/{len(dataloader)}] "
                      f"[D loss: {d_loss.item():3f}] "
                      f"[G loss: {g_loss.item():3f}]")

                if batches_done % opt.sample_interval == 0:
                    save_image(fake_imgs.data[:25],
                               f"results/images/{batches_done:06}.png",
                               nrow=5, normalize=True)

                batches_done += opt.n_critic

    torch.save(generator.state_dict(), "results/generator")
    torch.save(discriminator.state_dict(), "results/discriminator")
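In the repo this function is driven by a small training script; here is a hedged sketch of the call, reusing the train_dataloader from earlier (the hyperparameter values are assumptions on my part):

```python
from types import SimpleNamespace

import torch

# Assumed defaults; the repo's script reads these from argparse.
opt = SimpleNamespace(n_epochs=200, batch_size=64, lr=0.0002, b1=0.5, b2=0.999,
                      latent_dim=100, img_size=28, channels=1,
                      n_critic=5, sample_interval=400)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

generator = Generator(opt)
discriminator = Discriminator(opt)
train_wgangp(opt, generator, discriminator, train_dataloader, device)
```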
The heart of the code above is the compute_gradient_penalty function, which computes the gradient penalty, the defining ingredient of WGAN-GP. Here is the code:
def compute_gradient_penalty(D, real_samples, fake_samples, device):
    """Calculates the gradient penalty loss for WGAN GP"""
    # Random weight term for interpolation between real and fake samples
    alpha = torch.rand(*real_samples.shape[:2], 1, 1, device=device)
    # Get random interpolation between real and fake samples
    interpolates = (alpha * real_samples + (1 - alpha) * fake_samples)
    # Tensors can be operated on directly; PyTorch has deprecated autograd.Variable
    interpolates.requires_grad_(requires_grad=True)
    # interpolates = autograd.Variable(interpolates, requires_grad=True)
    d_interpolates = D(interpolates)
    fake = torch.ones(*d_interpolates.shape, device=device)
    # Get gradient w.r.t. interpolates
    # https://zhuanlan.zhihu.com/p/83172023
    gradients = autograd.grad(outputs=d_interpolates, inputs=interpolates,
                              grad_outputs=fake, create_graph=True,
                              retain_graph=True, only_inputs=True)[0]
    gradients = gradients.view(gradients.shape[0], -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty
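For reference, this implements the WGAN-GP penalty λ·E[(‖∇ D(x̂)‖₂ − 1)²] with x̂ = α·x_real + (1 − α)·x_fake and α ~ U(0, 1). The tensor of ones passed as grad_outputs simply makes autograd.grad return the gradient of the sum of critic outputs with respect to each interpolated sample.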
Note: to understand this code, you need to understand PyTorch's autograd package.
After training, we save the generator and discriminator weights, along with some generated images; a few are shown below. Not bad, right? 🍄🍄🍄 [Because every image in our training set is a 0, all the generated images are 0s!!! 🍀🍀🍀]
Without further ado, straight to the code!!! 🍭🍭🍭
def train_encoder_izif(opt, generator, discriminator, encoder,
                       dataloader, device, kappa=1.0):
    generator.load_state_dict(torch.load("results/generator"))
    discriminator.load_state_dict(torch.load("results/discriminator"))

    generator.to(device).eval()
    discriminator.to(device).eval()
    encoder.to(device)

    criterion = nn.MSELoss()

    optimizer_E = torch.optim.Adam(encoder.parameters(),
                                   lr=opt.lr, betas=(opt.b1, opt.b2))

    os.makedirs("results/images_e", exist_ok=True)

    padding_epoch = len(str(opt.n_epochs))
    padding_i = len(str(len(dataloader)))

    batches_done = 0
    for epoch in range(opt.n_epochs):
        for i, (imgs, _) in enumerate(dataloader):

            # Configure input
            real_imgs = imgs.to(device)

            # ----------------
            #  Train Encoder
            # ----------------

            optimizer_E.zero_grad()

            # Generate a batch of latent variables
            z = encoder(real_imgs)

            # Generate a batch of images
            fake_imgs = generator(z)

            # Real features
            real_features = discriminator.forward_features(real_imgs)
            # Fake features
            fake_features = discriminator.forward_features(fake_imgs)

            # izif architecture
            loss_imgs = criterion(fake_imgs, real_imgs)
            loss_features = criterion(fake_features, real_features)
            e_loss = loss_imgs + kappa * loss_features

            e_loss.backward()
            optimizer_E.step()

            # Output training log every n_critic steps
            if i % opt.n_critic == 0:
                print(f"[Epoch {epoch:{padding_epoch}}/{opt.n_epochs}] "
                      f"[Batch {i:{padding_i}}/{len(dataloader)}] "
                      f"[E loss: {e_loss.item():3f}]")

                if batches_done % opt.sample_interval == 0:
                    fake_z = encoder(fake_imgs)
                    reconfiguration_imgs = generator(fake_z)
                    save_image(reconfiguration_imgs.data[:25],
                               f"results/images_e/{batches_done:06}.png",
                               nrow=5, normalize=True)

                batches_done += opt.n_critic

    torch.save(encoder.state_dict(), "results/encoder")
You'll see this code really is simple. After training, we save the encoder E's weights and some reconstructed images; the reconstructions look quite good too.
We save the anomaly scores to a score.csv file with four columns: label, img_distance, anomaly_score, and z_distance.
def test_anomaly_detection(opt, generator, discriminator, encoder,
                           dataloader, device, kappa=1.0):
    generator.load_state_dict(torch.load("results/generator"))
    discriminator.load_state_dict(torch.load("results/discriminator"))
    encoder.load_state_dict(torch.load("results/encoder"))

    generator.to(device).eval()
    discriminator.to(device).eval()
    encoder.to(device).eval()

    criterion = nn.MSELoss()

    with open("results/score.csv", "w") as f:
        f.write("label,img_distance,anomaly_score,z_distance\n")

    for (img, label) in tqdm(dataloader):
        real_img = img.to(device)

        real_z = encoder(real_img)
        fake_img = generator(real_z)
        fake_z = encoder(fake_img)

        real_feature = discriminator.forward_features(real_img)
        fake_feature = discriminator.forward_features(fake_img)

        # Scores for anomaly detection
        img_distance = criterion(fake_img, real_img)
        loss_feature = criterion(fake_feature, real_feature)
        anomaly_score = img_distance + kappa * loss_feature

        z_distance = criterion(fake_z, real_z)

        with open("results/score.csv", "a") as f:
            f.write(f"{label.item()},{img_distance},"
                    f"{anomaly_score},{z_distance}\n")
With score.csv in hand, we can read it and plot accuracy curves. First, import the necessary packages:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import roc_curve, precision_recall_curve, auc
Then read the score.csv file we just produced:
df = pd.read_csv("./results/score.csv")
The contents of df look like this: 65262 rows in total, which matches the test-set size from the data-loading step. 🥝🥝🥝
Next we read each column and binarize the labels: label 0 becomes 0, and everything else becomes 1.
training_label = 0
labels = np.where(df["label"].values == training_label, 0, 1)
anomaly_score = df["anomaly_score"].values
img_distance = df["img_distance"].values
z_distance = df["z_distance"].values
From these values we can then compute everything needed for plotting:
fpr, tpr, _ = roc_curve(labels, img_distance)
precision, recall, _ = precision_recall_curve(labels, img_distance)
roc_auc = auc(fpr, tpr)
pr_auc = auc(recall, precision)
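Note that the curves here are computed from img_distance; you could just as well pass anomaly_score or z_distance to roc_curve and precision_recall_curve to compare the three scores.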
Now we can plot:
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:3f}")
plt.plot([0, 1], [0, 1], linestyle="--")
plt.title("ROC-AUC")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend()
plt.show()
plt.plot(recall, precision, label=f"PR = {pr_auc:3f}")
plt.title("PR-AUC")
plt.xlabel("Recall")
plt.ylabel("Pecision")
plt.legend()
plt.show()
plt.hist([anomaly_score[labels == 0], anomaly_score[labels == 1]],
bins=100, density=True, stacked=True,
label=["Normal", "Abnormal"])
plt.title("Discrete distributions of anomaly scores")
plt.xlabel("Anomaly scores A(x)")
plt.ylabel("h")
plt.legend()
plt.show()
The code also defines a function that saves difference images, i.e., it subtracts the generated image from the real one so we can inspect where they differ. The code is straightforward; let's take a look:
def save_compared_images(opt, generator, encoder, dataloader, device):
    generator.load_state_dict(torch.load("results/generator"))
    encoder.load_state_dict(torch.load("results/encoder"))

    generator.to(device).eval()
    encoder.to(device).eval()

    os.makedirs("results/images_diff", exist_ok=True)

    for i, (img, label) in enumerate(dataloader):
        real_img = img.to(device)

        real_z = encoder(real_img)
        fake_img = generator(real_z)

        compared_images = torch.empty(real_img.shape[0] * 3,
                                      *real_img.shape[1:])
        compared_images[0::3] = real_img
        compared_images[1::3] = fake_img
        compared_images[2::3] = real_img - fake_img

        save_image(compared_images.data,
                   f"results/images_diff/{opt.n_grid_lines*(i+1):06}.png",
                   nrow=3, normalize=True)

        if opt.n_iters is not None and opt.n_iters == i:
            break
Here is one of the saved images so you can see the result:
As the figure shows, whatever the original input is, the generated image is always a 0, so the difference between the original and the generated image varies accordingly. 🍊🍊🍊
That wraps up f-AnoGAN. If you work through it carefully, you'll find it is really quite simple. For the code, get hands-on and debug it yourself; you'll gain a lot from that. 🌾🌾🌾