WIP: refactoring

wataru 2023-04-28 14:20:54 +09:00
parent b3d7946592
commit 6fcbd07065


@@ -1,6 +1,11 @@
import sys
import os
if sys.platform.startswith('darwin'):
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.VoiceChangerModel import AudioInOut
from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
if sys.platform.startswith("darwin"):
baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
if len(baseDir) != 1:
print("baseDir should be only one ", baseDir)
@@ -12,25 +17,29 @@ else:
import io
from dataclasses import dataclass, asdict, field
from functools import reduce
import numpy as np
import torch
import onnxruntime
import pyworld as pw
from models import SynthesizerTrn
import cluster
from models import SynthesizerTrn # type:ignore
import cluster # type:ignore
import utils
from fairseq import checkpoint_utils
import librosa
from Exceptions import NoModeLoadedException
providers = ['OpenVINOExecutionProvider', "CUDAExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider"]
providers = [
"OpenVINOExecutionProvider",
"CUDAExecutionProvider",
"DmlExecutionProvider",
"CPUExecutionProvider",
]
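# A note on ordering, as an illustration (not part of this commit):
# onnxruntime walks this list and uses the first provider that the installed
# build actually supports, so it encodes a preference from fastest to most
# portable. The usable subset can be inspected up front:
#
#     usable = [p for p in providers if p in onnxruntime.get_available_providers()]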
@dataclass
class SoVitsSvc40v2Settings():
class SoVitsSvc40v2Settings:
gpu: int = 0
dstId: int = 0
@@ -47,9 +56,7 @@ class SoVitsSvc40v2Settings():
onnxModelFile: str = ""
configFile: str = ""
speakers: dict[str, int] = field(
default_factory=lambda: {}
)
speakers: dict[str, int] = field(default_factory=lambda: {})
# ↓ enumerate only the mutable fields
intData = ["gpu", "dstId", "tran", "predictF0", "extraConvertSize"]
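# Aside on the speakers field above (illustration only): dataclasses reject
# mutable class-level defaults such as `speakers: dict = {}`, which is why a
# factory is needed; `field(default_factory=dict)` is the idiomatic spelling
# of the lambda used here:
#
#     speakers: dict[str, int] = field(default_factory=dict)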
@@ -58,7 +65,9 @@ class SoVitsSvc40v2Settings():
class SoVitsSvc40v2:
def __init__(self, params):
audio_buffer: AudioInOut | None = None
def __init__(self, params: VoiceChangerParams):
self.settings = SoVitsSvc40v2Settings()
self.net_g = None
self.onnx_session = None
@@ -69,23 +78,21 @@ class SoVitsSvc40v2:
self.params = params
print("so-vits-svc 40v2 initialization:", params)
def loadModel(self, props):
self.settings.configFile = props["files"]["configFilename"]
def loadModel(self, props: LoadModelParams):
self.settings.configFile = props.files.configFilename
self.hps = utils.get_hparams_from_file(self.settings.configFile)
self.settings.speakers = self.hps.spk
self.settings.pyTorchModelFile = props["files"]["pyTorchModelFilename"]
self.settings.onnxModelFile = props["files"]["onnxModelFilename"]
clusterTorchModel = props["files"]["clusterTorchModelFilename"]
self.settings.pyTorchModelFile = props.files.pyTorchModelFilename
self.settings.onnxModelFile = props.files.onnxModelFilename
clusterTorchModel = props.files.clusterTorchModelFilename
content_vec_path = self.params["content_vec_500"]
# content_vec_hubert_onnx_path = self.params["content_vec_500_onnx"]
# content_vec_hubert_onnx_on = self.params["content_vec_500_onnx_on"]
hubert_base_path = self.params["hubert_base"]
content_vec_path = self.params.content_vec_500
hubert_base_path = self.params.hubert_base
# hubert model
try:
if os.path.exists(content_vec_path) == False:
if os.path.exists(content_vec_path) is False:
content_vec_path = hubert_base_path
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
@@ -100,7 +107,7 @@ class SoVitsSvc40v2:
# cluster
try:
if clusterTorchModel != None and os.path.exists(clusterTorchModel):
if clusterTorchModel is not None and os.path.exists(clusterTorchModel):
self.cluster_model = cluster.get_cluster_model(clusterTorchModel)
else:
self.cluster_model = None
@@ -108,41 +115,50 @@ class SoVitsSvc40v2:
print("EXCEPTION during loading cluster model ", e)
# create the PyTorch model
if self.settings.pyTorchModelFile != None:
self.net_g = SynthesizerTrn(
self.hps
)
self.net_g.eval()
if self.settings.pyTorchModelFile is not None:
net_g = SynthesizerTrn(self.hps)
net_g.eval()
self.net_g = net_g
utils.load_checkpoint(self.settings.pyTorchModelFile, self.net_g, None)
# create the ONNX model
if self.settings.onnxModelFile != None:
if self.settings.onnxModelFile is not None:
ort_options = onnxruntime.SessionOptions()
ort_options.intra_op_num_threads = 8
self.onnx_session = onnxruntime.InferenceSession(
self.settings.onnxModelFile,
providers=providers
self.settings.onnxModelFile, providers=providers
)
input_info = self.onnx_session.get_inputs()
# input_info = self.onnx_session.get_inputs()
return self.get_info()
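# Sketch of the session construction above, with assumed values (modelPath is
# a placeholder): SessionOptions.intra_op_num_threads bounds the threads a
# single operator may use, and for real-time audio a machine-derived value is
# a common alternative to a fixed 8:
#
#     opts = onnxruntime.SessionOptions()
#     opts.intra_op_num_threads = os.cpu_count() or 1
#     sess = onnxruntime.InferenceSession(modelPath, sess_options=opts, providers=providers)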
def update_settings(self, key: str, val: any):
if key == "onnxExecutionProvider" and self.onnx_session != None:
def update_settings(self, key: str, val: int | float | str):
if key == "onnxExecutionProvider" and self.onnx_session is not None:
if val == "CUDAExecutionProvider":
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
self.settings.gpu = 0
provider_options = [{'device_id': self.settings.gpu}]
self.onnx_session.set_providers(providers=[val], provider_options=provider_options)
provider_options = [{"device_id": self.settings.gpu}]
self.onnx_session.set_providers(
providers=[val], provider_options=provider_options
)
else:
self.onnx_session.set_providers(providers=[val])
elif key in self.settings.intData:
setattr(self.settings, key, int(val))
if key == "gpu" and val >= 0 and val < self.gpu_num and self.onnx_session != None:
val = int(val)
setattr(self.settings, key, val)
if (
key == "gpu"
and val >= 0
and val < self.gpu_num
and self.onnx_session is not None
):
providers = self.onnx_session.get_providers()
print("Providers:", providers)
if "CUDAExecutionProvider" in providers:
provider_options = [{'device_id': self.settings.gpu}]
self.onnx_session.set_providers(providers=["CUDAExecutionProvider"], provider_options=provider_options)
provider_options = [{"device_id": self.settings.gpu}]
self.onnx_session.set_providers(
providers=["CUDAExecutionProvider"],
provider_options=provider_options,
)
elif key in self.settings.floatData:
setattr(self.settings, key, float(val))
elif key in self.settings.strData:
@@ -155,10 +171,12 @@ class SoVitsSvc40v2:
def get_info(self):
data = asdict(self.settings)
data["onnxExecutionProviders"] = self.onnx_session.get_providers() if self.onnx_session != None else []
data["onnxExecutionProviders"] = (
self.onnx_session.get_providers() if self.onnx_session is not None else []
)
files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
for f in files:
if data[f] != None and os.path.exists(data[f]):
if data[f] is not None and os.path.exists(data[f]):
data[f] = os.path.basename(data[f])
else:
data[f] = ""
@@ -166,7 +184,7 @@ class SoVitsSvc40v2:
return data
def get_processing_sampling_rate(self):
if hasattr(self, "hps") == False:
if hasattr(self, "hps") is False:
raise NoModeLoadedException("config")
return self.hps.data.sampling_rate
@@ -175,12 +193,22 @@ class SoVitsSvc40v2:
# f0 = utils.compute_f0_parselmouth(wav, sampling_rate=self.target_sample, hop_length=self.hop_size)
# f0 = utils.compute_f0_dio(wav_44k, sampling_rate=self.hps.data.sampling_rate, hop_length=self.hps.data.hop_length)
if self.settings.f0Detector == "dio":
f0 = compute_f0_dio(wav_44k, sampling_rate=self.hps.data.sampling_rate, hop_length=self.hps.data.hop_length)
f0 = compute_f0_dio(
wav_44k,
sampling_rate=self.hps.data.sampling_rate,
hop_length=self.hps.data.hop_length,
)
else:
f0 = compute_f0_harvest(wav_44k, sampling_rate=self.hps.data.sampling_rate, hop_length=self.hps.data.hop_length)
f0 = compute_f0_harvest(
wav_44k,
sampling_rate=self.hps.data.sampling_rate,
hop_length=self.hps.data.hop_length,
)
if wav_44k.shape[0] % self.hps.data.hop_length != 0:
print(f" !!! !!! !!! wav size not multiple of hopsize: {wav_44k.shape[0] / self.hps.data.hop_length}")
print(
f" !!! !!! !!! wav size not multiple of hopsize: {wav_44k.shape[0] / self.hps.data.hop_length}"
)
f0, uv = utils.interpolate_f0(f0)
f0 = torch.FloatTensor(f0)
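# Background for the branch above, hedged where the helpers are not shown in
# this diff: "dio" is pyworld's fast F0 estimator (typically refined with
# stonemask), while "harvest" is slower but more robust on low-pitched
# voices. Both yield one F0 value per hop, and interpolate_f0 appears to fill
# unvoiced (f0 == 0) frames by interpolation, returning the filled curve and
# a voiced/unvoiced mask:
#
#     f0 = compute_f0_dio(wav_44k, sampling_rate=44100, hop_length=512)
#     f0, uv = utils.interpolate_f0(f0)  # uv marks frames that were unvoiced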
@@ -190,10 +218,14 @@ class SoVitsSvc40v2:
uv = uv.unsqueeze(0)
# wav16k = librosa.resample(audio_buffer, orig_sr=24000, target_sr=16000)
wav16k = librosa.resample(audio_buffer, orig_sr=self.hps.data.sampling_rate, target_sr=16000)
wav16k = librosa.resample(
audio_buffer, orig_sr=self.hps.data.sampling_rate, target_sr=16000
)
wav16k = torch.from_numpy(wav16k)
if (self.settings.gpu < 0 or self.gpu_num == 0) or self.settings.framework == "ONNX":
if (
self.settings.gpu < 0 or self.gpu_num == 0
) or self.settings.framework == "ONNX":
dev = torch.device("cpu")
else:
dev = torch.device("cuda", index=self.settings.gpu)
@@ -206,37 +238,64 @@ class SoVitsSvc40v2:
c = utils.get_hubert_content(self.hubert_model, wav_16k_tensor=wav16k)
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1])
if self.settings.clusterInferRatio != 0 and hasattr(self, "cluster_model") and self.cluster_model != None:
speaker = [key for key, value in self.settings.speakers.items() if value == self.settings.dstId]
if (
self.settings.clusterInferRatio != 0
and hasattr(self, "cluster_model")
and self.cluster_model is not None
):
speaker = [
key
for key, value in self.settings.speakers.items()
if value == self.settings.dstId
]
if len(speaker) != 1:
pass
# print("not only one speaker found.", speaker)
else:
cluster_c = cluster.get_cluster_center_result(self.cluster_model, c.cpu().numpy().T, speaker[0]).T
cluster_c = cluster.get_cluster_center_result(
self.cluster_model, c.cpu().numpy().T, speaker[0]
).T
# cluster_c = cluster.get_cluster_center_result(self.cluster_model, c.cpu().numpy().T, self.settings.dstId).T
cluster_c = torch.FloatTensor(cluster_c).to(dev)
# print("cluster DEVICE", cluster_c.device, c.device)
c = self.settings.clusterInferRatio * cluster_c + (1 - self.settings.clusterInferRatio) * c
c = (
self.settings.clusterInferRatio * cluster_c
+ (1 - self.settings.clusterInferRatio) * c
)
c = c.unsqueeze(0)
return c, f0, uv
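# The blend above is a plain linear interpolation between the raw HuBERT
# content features and their nearest cluster centers; a sketch with an
# illustrative ratio:
#
#     ratio = 0.5  # clusterInferRatio
#     c_mixed = ratio * cluster_c + (1.0 - ratio) * c
#
# ratio == 0 leaves the content features untouched, while ratio == 1 snaps
# every frame to the target speaker's cluster center, trading articulation
# for speaker similarity.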
def generate_input(self, newData: any, inputSize: int, crossfadeSize: int, solaSearchFrame: int = 0):
def generate_input(
self,
newData: AudioInOut,
inputSize: int,
crossfadeSize: int,
solaSearchFrame: int = 0,
):
newData = newData.astype(np.float32) / self.hps.data.max_wav_value
if hasattr(self, "audio_buffer"):
self.audio_buffer = np.concatenate([self.audio_buffer, newData], 0) # concatenate with the previous data
if self.audio_buffer is not None:
self.audio_buffer = np.concatenate(
[self.audio_buffer, newData], 0
) # concatenate with the previous data
else:
self.audio_buffer = newData
convertSize = inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
convertSize = (
inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
)
if convertSize % self.hps.data.hop_length != 0: # the model output is truncated at its hop size, so pad to compensate
convertSize = convertSize + (self.hps.data.hop_length - (convertSize % self.hps.data.hop_length))
convertSize = convertSize + (
self.hps.data.hop_length - (convertSize % self.hps.data.hop_length)
)
convertOffset = -1 * convertSize
self.audio_buffer = self.audio_buffer[convertOffset:] # extract only the segment to be converted
self.audio_buffer = self.audio_buffer[-1 * convertSize:] # extract only the segment to be converted
crop = self.audio_buffer[-1 * (inputSize + crossfadeSize):-1 * (crossfadeSize)]
cropOffset = -1 * (inputSize + crossfadeSize)
cropEnd = -1 * (crossfadeSize)
crop = self.audio_buffer[cropOffset:cropEnd]
rms = np.sqrt(np.square(crop).mean(axis=0))
vol = max(rms, self.prevVol * 0.0)
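# Note on the two lines above: rms is the root-mean-square level of the
# cropped frames, and since prevVol is multiplied by 0.0 the max() reduces
# to max(rms, 0.0). A smoothed alternative would retain part of the previous
# volume (0.5 below is purely illustrative):
#
#     vol = max(rms, self.prevVol * 0.5)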
@@ -246,30 +305,36 @@ class SoVitsSvc40v2:
return (c, f0, uv, convertSize, vol)
def _onnx_inference(self, data):
if hasattr(self, "onnx_session") == False or self.onnx_session == None:
if hasattr(self, "onnx_session") is False or self.onnx_session is None:
print("[Voice Changer] No onnx session.")
raise NoModeLoadedException("ONNX")
convertSize = data[3]
vol = data[4]
data = (data[0], data[1], data[2],)
data = (
data[0],
data[1],
data[2],
)
if vol < self.settings.silentThreshold:
return np.zeros(convertSize).astype(np.int16)
c, f0, uv = [x.numpy() for x in data]
audio1 = self.onnx_session.run(
["audio"],
{
"c": c,
"f0": f0,
"g": np.array([self.settings.dstId]).astype(np.int64),
"uv": np.array([self.settings.dstId]).astype(np.int64),
"predict_f0": np.array([self.settings.dstId]).astype(np.int64),
"noice_scale": np.array([self.settings.dstId]).astype(np.int64),
})[0][0, 0] * self.hps.data.max_wav_value
audio1 = (
self.onnx_session.run(
["audio"],
{
"c": c,
"f0": f0,
"g": np.array([self.settings.dstId]).astype(np.int64),
"uv": np.array([self.settings.dstId]).astype(np.int64),
"predict_f0": np.array([self.settings.dstId]).astype(np.int64),
"noice_scale": np.array([self.settings.dstId]).astype(np.int64),
},
)[0][0, 0]
* self.hps.data.max_wav_value
)
audio1 = audio1 * vol
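# Sketch of the scaling above: the model emits float audio roughly in
# [-1.0, 1.0], and max_wav_value (commonly 32768.0 for 16-bit PCM) maps it
# onto the int16 range before the caller casts the dtype, e.g.:
#
#     pcm = np.clip(audio_float * 32768.0, -32768, 32767).astype(np.int16)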
@@ -278,7 +343,7 @@ class SoVitsSvc40v2:
return result
def _pyTorch_inference(self, data):
if hasattr(self, "net_g") == False or self.net_g == None:
if hasattr(self, "net_g") is False or self.net_g is None:
print("[Voice Changer] No pyTorch session.")
raise NoModeLoadedException("pytorch")
@@ -289,19 +354,29 @@ class SoVitsSvc40v2:
convertSize = data[3]
vol = data[4]
data = (data[0], data[1], data[2],)
data = (
data[0],
data[1],
data[2],
)
if vol < self.settings.silentThreshold:
return np.zeros(convertSize).astype(np.int16)
with torch.no_grad():
c, f0, uv = [x.to(dev)for x in data]
c, f0, uv = [x.to(dev) for x in data]
sid_target = torch.LongTensor([self.settings.dstId]).to(dev)
self.net_g.to(dev)
# audio1 = self.net_g.infer(c, f0=f0, g=sid_target, uv=uv, predict_f0=True, noice_scale=0.1)[0][0, 0].data.float()
predict_f0_flag = True if self.settings.predictF0 == 1 else False
audio1 = self.net_g.infer(c, f0=f0, g=sid_target, uv=uv, predict_f0=predict_f0_flag,
noice_scale=self.settings.noiseScale)[0][0, 0].data.float()
audio1 = self.net_g.infer(
c,
f0=f0,
g=sid_target,
uv=uv,
predict_f0=predict_f0_flag,
noice_scale=self.settings.noiseScale,
)[0][0, 0].data.float()
audio1 = audio1 * self.hps.data.max_wav_value
audio1 = audio1 * vol
@@ -323,7 +398,7 @@ class SoVitsSvc40v2:
del self.onnx_session
remove_path = os.path.join("so-vits-svc-40v2")
sys.path = [x for x in sys.path if x.endswith(remove_path) == False]
sys.path = [x for x in sys.path if x.endswith(remove_path) is False]
for key in list(sys.modules):
val = sys.modules.get(key)
@@ -332,14 +407,18 @@ class SoVitsSvc40v2:
if file_path.find("so-vits-svc-40v2" + os.path.sep) >= 0:
print("remove", key, file_path)
sys.modules.pop(key)
except Exception as e:
except: # type:ignore
pass
def resize_f0(x, target_len):
source = np.array(x)
source[source < 0.001] = np.nan
target = np.interp(np.arange(0, len(source) * target_len, len(source)) / target_len, np.arange(0, len(source)), source)
target = np.interp(
np.arange(0, len(source) * target_len, len(source)) / target_len,
np.arange(0, len(source)),
source,
)
res = np.nan_to_num(target)
return res
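# resize_f0 stretches an F0 contour to target_len frames via np.interp;
# values below the threshold are masked to NaN first so silence is not
# averaged into voiced frames, and NaNs are zeroed again afterwards.
# Illustrative use:
#
#     f0_100 = resize_f0([100.0, 110.0, 0.0, 120.0], 100)  # len(f0_100) == 100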
@@ -362,7 +441,13 @@ def compute_f0_dio(wav_numpy, p_len=None, sampling_rate=44100, hop_length=512):
def compute_f0_harvest(wav_numpy, p_len=None, sampling_rate=44100, hop_length=512):
if p_len is None:
p_len = wav_numpy.shape[0] // hop_length
f0, t = pw.harvest(wav_numpy.astype(np.double), fs=sampling_rate, frame_period=5.5, f0_floor=71.0, f0_ceil=1000.0)
f0, t = pw.harvest(
wav_numpy.astype(np.double),
fs=sampling_rate,
frame_period=5.5,
f0_floor=71.0,
f0_ceil=1000.0,
)
for index, pitch in enumerate(f0):
f0[index] = round(pitch, 1)
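# Illustrative call, not part of this commit: pw.harvest returns one F0
# estimate per frame_period milliseconds together with the time axis t; the
# wrapper rounds each value and presumably resamples to p_len frames via
# resize_f0 above:
#
#     f0 = compute_f0_harvest(wav_44k, sampling_rate=44100, hop_length=512)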