bark-voice-cloning / hubert /customtokenizer.py
Mylo
Initial commit
9449f27
raw
history blame
No virus
6.31 kB
import json
import os.path
from zipfile import ZipFile
import numpy
import torch
from torch import nn, optim
from torch.serialization import MAP_LOCATION
class CustomTokenizer(nn.Module):
def __init__(self, hidden_size=1024, input_size=768, output_size=10000, version=0):
super(CustomTokenizer, self).__init__()
next_size = input_size
if version == 0:
self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
next_size = hidden_size
if version == 1:
self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
self.intermediate = nn.Linear(hidden_size, 4096)
next_size = 4096
self.fc = nn.Linear(next_size, output_size)
self.softmax = nn.LogSoftmax(dim=1)
self.optimizer: optim.Optimizer = None
self.lossfunc = nn.CrossEntropyLoss()
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
self.version = version
def forward(self, x):
x, _ = self.lstm(x)
if self.version == 1:
x = self.intermediate(x)
x = self.fc(x)
x = self.softmax(x)
return x
@torch.no_grad()
def get_token(self, x):
"""
Used to get the token for the first
:param x: An array with shape (N, input_size) where N is a whole number greater or equal to 1, and input_size is the input size used when creating the model.
:return: An array with shape (N,) where N is the same as N from the input. Every number in the array is a whole number in range 0...output_size - 1 where output_size is the output size used when creating the model.
"""
return torch.argmax(self(x), dim=1)
def prepare_training(self):
self.optimizer = optim.Adam(self.parameters(), 0.001)
def train_step(self, x_train, y_train, log_loss=False):
# y_train = y_train[:-1]
# y_train = y_train[1:]
optimizer = self.optimizer
lossfunc = self.lossfunc
# Zero the gradients
self.zero_grad()
# Forward pass
y_pred = self(x_train)
y_train_len = len(y_train)
y_pred_len = y_pred.shape[0]
if y_train_len > y_pred_len:
diff = y_train_len - y_pred_len
y_train = y_train[diff:]
elif y_train_len < y_pred_len:
diff = y_pred_len - y_train_len
y_pred = y_pred[:-diff, :]
y_train_hot = torch.zeros(len(y_train), self.output_size)
y_train_hot[range(len(y_train)), y_train] = 1
y_train_hot = y_train_hot.to('cuda')
# Calculate the loss
loss = lossfunc(y_pred, y_train_hot)
# Print loss
if log_loss:
print('Loss', loss.item())
# Backward pass
loss.backward()
# Update the weights
optimizer.step()
def save(self, path):
info_path = os.path.basename(path) + '/.info'
torch.save(self.state_dict(), path)
data_from_model = Data(self.input_size, self.hidden_size, self.output_size, self.version)
with ZipFile(path, 'a') as model_zip:
model_zip.writestr(info_path, data_from_model.save())
model_zip.close()
@staticmethod
def load_from_checkpoint(path, map_location: MAP_LOCATION = None):
old = True
with ZipFile(path) as model_zip:
filesMatch = [file for file in model_zip.namelist() if file.endswith('/.info')]
file = filesMatch[0] if filesMatch else None
if file:
old = False
data_from_model = Data.load(model_zip.read(file).decode('utf-8'))
model_zip.close()
if old:
model = CustomTokenizer()
else:
model = CustomTokenizer(data_from_model.hidden_size, data_from_model.input_size, data_from_model.output_size, data_from_model.version)
model.load_state_dict(torch.load(path, map_location))
return model
class Data:
input_size: int
hidden_size: int
output_size: int
version: int
def __init__(self, input_size=768, hidden_size=1024, output_size=10000, version=0):
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
self.version = version
@staticmethod
def load(string):
data = json.loads(string)
return Data(data['input_size'], data['hidden_size'], data['output_size'], data['version'])
def save(self):
data = {
'input_size': self.input_size,
'hidden_size': self.hidden_size,
'output_size': self.output_size,
'version': self.version,
}
return json.dumps(data)
def auto_train(data_path, save_path='model.pth', load_model: str | None = None, save_epochs=1):
data_x, data_y = [], []
if load_model and os.path.isfile(load_model):
print('Loading model from', load_model)
model_training = CustomTokenizer.load_from_checkpoint(load_model, 'cuda')
else:
print('Creating new model.')
model_training = CustomTokenizer(version=1).to('cuda') # Settings for the model to run without lstm
save_path = os.path.join(data_path, save_path)
base_save_path = '.'.join(save_path.split('.')[:-1])
sem_string = '_semantic.npy'
feat_string = '_semantic_features.npy'
ready = os.path.join(data_path, 'ready')
for input_file in os.listdir(ready):
full_path = os.path.join(ready, input_file)
if input_file.endswith(sem_string):
data_y.append(numpy.load(full_path))
elif input_file.endswith(feat_string):
data_x.append(numpy.load(full_path))
model_training.prepare_training()
epoch = 1
while 1:
for i in range(save_epochs):
j = 0
for x, y in zip(data_x, data_y):
model_training.train_step(torch.tensor(x).to('cuda'), torch.tensor(y).to('cuda'), j % 50 == 0) # Print loss every 50 steps
j += 1
save_p = save_path
save_p_2 = f'{base_save_path}_epoch_{epoch}.pth'
model_training.save(save_p)
model_training.save(save_p_2)
print(f'Epoch {epoch} completed')
epoch += 1