WIP: refactoring

This commit is contained in:
wataru 2023-04-28 14:49:17 +09:00
parent 6fcbd07065
commit 96d7141976

View File

@ -1,6 +1,11 @@
import sys
import os
if sys.platform.startswith('darwin'):
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.VoiceChangerModel import AudioInOut
from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
if sys.platform.startswith("darwin"):
baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
if len(baseDir) != 1:
print("baseDir should be only one ", baseDir)
@ -10,24 +15,25 @@ if sys.platform.startswith('darwin'):
else:
sys.path.append("DDSP-SVC")
import io
from dataclasses import dataclass, asdict, field
from functools import reduce
import numpy as np
import torch
import onnxruntime
import pyworld as pw
import ddsp.vocoder as vo
from ddsp.core import upsample
from enhancer import Enhancer
import ddsp.vocoder as vo # type:ignore
from ddsp.core import upsample # type:ignore
from enhancer import Enhancer # type:ignore
from Exceptions import NoModeLoadedException
providers = ['OpenVINOExecutionProvider', "CUDAExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider"]
providers = [
"OpenVINOExecutionProvider",
"CUDAExecutionProvider",
"DmlExecutionProvider",
"CPUExecutionProvider",
]
@dataclass
class DDSP_SVCSettings():
class DDSP_SVCSettings:
gpu: int = 0
dstId: int = 0
@ -45,18 +51,26 @@ class DDSP_SVCSettings():
onnxModelFile: str = ""
configFile: str = ""
speakers: dict[str, int] = field(
default_factory=lambda: {}
)
speakers: dict[str, int] = field(default_factory=lambda: {})
# ↓mutableな物だけ列挙
intData = ["gpu", "dstId", "tran", "predictF0", "extraConvertSize", "enableEnhancer", "enhancerTune"]
intData = [
"gpu",
"dstId",
"tran",
"predictF0",
"extraConvertSize",
"enableEnhancer",
"enhancerTune",
]
floatData = ["silentThreshold", "clusterInferRatio"]
strData = ["framework", "f0Detector"]
class DDSP_SVC:
def __init__(self, params):
audio_buffer: AudioInOut | None = None
def __init__(self, params: VoiceChangerParams):
self.settings = DDSP_SVCSettings()
self.net_g = None
self.onnx_session = None
@ -72,24 +86,30 @@ class DDSP_SVC:
else:
return torch.device("cpu")
def loadModel(self, props):
# self.settings.configFile = props["files"]["configFilename"] # 同じフォルダにあるyamlを使う
self.settings.pyTorchModelFile = props["files"]["pyTorchModelFilename"]
def loadModel(self, props: LoadModelParams):
self.settings.pyTorchModelFile = props.files.pyTorchModelFilename
# model
model, args = vo.load_model(self.settings.pyTorchModelFile, device=self.useDevice())
model, args = vo.load_model(
self.settings.pyTorchModelFile, device=self.useDevice()
)
self.model = model
self.args = args
self.sampling_rate = args.data.sampling_rate
self.hop_size = int(self.args.data.block_size * self.sampling_rate / self.args.data.sampling_rate)
self.hop_size = int(
self.args.data.block_size
* self.sampling_rate
/ self.args.data.sampling_rate
)
# hubert
self.vec_path = self.params["hubert_soft"]
self.vec_path = self.params.hubert_soft
self.encoder = vo.Units_Encoder(
self.args.data.encoder,
self.vec_path,
self.args.data.encoder_sample_rate,
self.args.data.encoder_hop_size,
device=self.useDevice())
device=self.useDevice(),
)
# ort_options = onnxruntime.SessionOptions()
# ort_options.intra_op_num_threads = 8
@ -111,36 +131,59 @@ class DDSP_SVC:
self.sampling_rate,
self.hop_size,
float(50),
float(1100))
float(1100),
)
self.volume_extractor = vo.Volume_Extractor(self.hop_size)
self.enhancer_path = self.params["nsf_hifigan"]
self.enhancer = Enhancer(self.args.enhancer.type, self.enhancer_path, device=self.useDevice())
self.enhancer_path = self.params.nsf_hifigan
self.enhancer = Enhancer(
self.args.enhancer.type, self.enhancer_path, device=self.useDevice()
)
return self.get_info()
def update_settings(self, key: str, val: any):
if key == "onnxExecutionProvider" and self.onnx_session != None:
def update_settings(self, key: str, val: int | float | str):
if key == "onnxExecutionProvider" and self.onnx_session is not None:
if val == "CUDAExecutionProvider":
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
self.settings.gpu = 0
provider_options = [{'device_id': self.settings.gpu}]
self.onnx_session.set_providers(providers=[val], provider_options=provider_options)
provider_options = [{"device_id": self.settings.gpu}]
self.onnx_session.set_providers(
providers=[val], provider_options=provider_options
)
else:
self.onnx_session.set_providers(providers=[val])
elif key in self.settings.intData:
setattr(self.settings, key, int(val))
if key == "gpu" and val >= 0 and val < self.gpu_num and self.onnx_session != None:
val = int(val)
setattr(self.settings, key, val)
if (
key == "gpu"
and val >= 0
and val < self.gpu_num
and self.onnx_session is not None
):
providers = self.onnx_session.get_providers()
print("Providers:", providers)
if "CUDAExecutionProvider" in providers:
provider_options = [{'device_id': self.settings.gpu}]
self.onnx_session.set_providers(providers=["CUDAExecutionProvider"], provider_options=provider_options)
provider_options = [{"device_id": self.settings.gpu}]
self.onnx_session.set_providers(
providers=["CUDAExecutionProvider"],
provider_options=provider_options,
)
if key == "gpu" and len(self.settings.pyTorchModelFile) > 0:
model, _args = vo.load_model(self.settings.pyTorchModelFile, device=self.useDevice())
model, _args = vo.load_model(
self.settings.pyTorchModelFile, device=self.useDevice()
)
self.model = model
self.enhancer = Enhancer(self.args.enhancer.type, self.enhancer_path, device=self.useDevice())
self.encoder = vo.Units_Encoder(self.args.data.encoder, self.vec_path, self.args.data.encoder_sample_rate,
self.args.data.encoder_hop_size, device=self.useDevice())
self.enhancer = Enhancer(
self.args.enhancer.type, self.enhancer_path, device=self.useDevice()
)
self.encoder = vo.Units_Encoder(
self.args.data.encoder,
self.vec_path,
self.args.data.encoder_sample_rate,
self.args.data.encoder_hop_size,
device=self.useDevice(),
)
elif key in self.settings.floatData:
setattr(self.settings, key, float(val))
@ -151,16 +194,13 @@ class DDSP_SVC:
if val == "dio":
val = "parselmouth"
if hasattr(self, "sampling_rate") == False:
if hasattr(self, "sampling_rate") is False:
self.sampling_rate = 44100
self.hop_size = 512
self.f0_detector = vo.F0_Extractor(
val,
self.sampling_rate,
self.hop_size,
float(50),
float(1100))
val, self.sampling_rate, self.hop_size, float(50), float(1100)
)
else:
return False
@ -169,10 +209,12 @@ class DDSP_SVC:
def get_info(self):
data = asdict(self.settings)
data["onnxExecutionProviders"] = self.onnx_session.get_providers() if self.onnx_session != None else []
data["onnxExecutionProviders"] = (
self.onnx_session.get_providers() if self.onnx_session is not None else []
)
files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
for f in files:
if data[f] != None and os.path.exists(data[f]):
if data[f] is not None and os.path.exists(data[f]):
data[f] = os.path.basename(data[f])
else:
data[f] = ""
@ -182,41 +224,64 @@ class DDSP_SVC:
def get_processing_sampling_rate(self):
return self.sampling_rate
def generate_input(self, newData: any, inputSize: int, crossfadeSize: int, solaSearchFrame: int = 0):
def generate_input(
self,
newData: AudioInOut,
inputSize: int,
crossfadeSize: int,
solaSearchFrame: int = 0,
):
newData = newData.astype(np.float32) / 32768.0
if hasattr(self, "audio_buffer"):
self.audio_buffer = np.concatenate([self.audio_buffer, newData], 0) # 過去のデータに連結
if self.audio_buffer is not None:
self.audio_buffer = np.concatenate(
[self.audio_buffer, newData], 0
) # 過去のデータに連結
else:
self.audio_buffer = newData
convertSize = inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
convertSize = (
inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
)
if convertSize % self.hop_size != 0: # モデルの出力のホップサイズで切り捨てが発生するので補う。
convertSize = convertSize + (self.hop_size - (convertSize % self.hop_size))
self.audio_buffer = self.audio_buffer[-1 * convertSize:] # 変換対象の部分だけ抽出
convertOffset = -1 * convertSize
self.audio_buffer = self.audio_buffer[convertOffset:] # 変換対象の部分だけ抽出
# f0
f0 = self.f0_detector.extract(self.audio_buffer * 32768.0, uv_interp=True,
silence_front=self.settings.extraConvertSize / self.sampling_rate)
f0 = self.f0_detector.extract(
self.audio_buffer * 32768.0,
uv_interp=True,
silence_front=self.settings.extraConvertSize / self.sampling_rate,
)
f0 = torch.from_numpy(f0).float().unsqueeze(-1).unsqueeze(0)
f0 = f0 * 2 ** (float(self.settings.tran) / 12)
# volume, mask
volume = self.volume_extractor.extract(self.audio_buffer)
mask = (volume > 10 ** (float(-60) / 20)).astype('float')
mask = (volume > 10 ** (float(-60) / 20)).astype("float")
mask = np.pad(mask, (4, 4), constant_values=(mask[0], mask[-1]))
mask = np.array([np.max(mask[n: n + 9]) for n in range(len(mask) - 8)])
mask = np.array(
[np.max(mask[n : n + 9]) for n in range(len(mask) - 8)] # noqa: E203
)
mask = torch.from_numpy(mask).float().unsqueeze(-1).unsqueeze(0)
mask = upsample(mask, self.args.data.block_size).squeeze(-1)
volume = torch.from_numpy(volume).float().unsqueeze(-1).unsqueeze(0)
# embed
audio = torch.from_numpy(self.audio_buffer).float().to(self.useDevice()).unsqueeze(0)
audio = (
torch.from_numpy(self.audio_buffer)
.float()
.to(self.useDevice())
.unsqueeze(0)
)
seg_units = self.encoder.encode(audio, self.sampling_rate, self.hop_size)
crop = self.audio_buffer[-1 * (inputSize + crossfadeSize):-1 * (crossfadeSize)]
cropOffset = -1 * (inputSize + crossfadeSize)
cropEnd = -1 * (crossfadeSize)
crop = self.audio_buffer[cropOffset:cropEnd]
rms = np.sqrt(np.square(crop).mean(axis=0))
vol = max(rms, self.prevVol * 0.0)
@ -225,15 +290,14 @@ class DDSP_SVC:
return (seg_units, f0, volume, mask, convertSize, vol)
def _onnx_inference(self, data):
if hasattr(self, "onnx_session") == False or self.onnx_session == None:
if hasattr(self, "onnx_session") is False or self.onnx_session is None:
print("[Voice Changer] No onnx session.")
raise NoModeLoadedException("ONNX")
raise NoModeLoadedException("ONNX")
def _pyTorch_inference(self, data):
if hasattr(self, "model") == False or self.model == None:
if hasattr(self, "model") is False or self.model is None:
print("[Voice Changer] No pyTorch session.")
raise NoModeLoadedException("pytorch")
@ -242,15 +306,19 @@ class DDSP_SVC:
volume = data[2].to(self.useDevice())
mask = data[3].to(self.useDevice())
convertSize = data[4]
vol = data[5]
# convertSize = data[4]
# vol = data[5]
# if vol < self.settings.silentThreshold:
# print("threshold")
# return np.zeros(convertSize).astype(np.int16)
with torch.no_grad():
spk_id = torch.LongTensor(np.array([[self.settings.dstId]])).to(self.useDevice())
seg_output, _, (s_h, s_n) = self.model(c, f0, volume, spk_id=spk_id, spk_mix_dict=None)
spk_id = torch.LongTensor(np.array([[self.settings.dstId]])).to(
self.useDevice()
)
seg_output, _, (s_h, s_n) = self.model(
c, f0, volume, spk_id=spk_id, spk_mix_dict=None
)
seg_output *= mask
if self.settings.enableEnhancer:
@ -260,8 +328,9 @@ class DDSP_SVC:
f0,
self.args.data.block_size,
# adaptive_key=float(self.settings.enhancerTune),
adaptive_key='auto',
silence_front=self.settings.extraConvertSize / self.sampling_rate)
adaptive_key="auto",
silence_front=self.settings.extraConvertSize / self.sampling_rate,
)
result = seg_output.squeeze().cpu().numpy() * 32768.0
return np.array(result).astype(np.int16)
@ -282,7 +351,7 @@ class DDSP_SVC:
del self.onnx_session
remove_path = os.path.join("DDSP-SVC")
sys.path = [x for x in sys.path if x.endswith(remove_path) == False]
sys.path = [x for x in sys.path if x.endswith(remove_path) is False]
for key in list(sys.modules):
val = sys.modules.get(key)
@ -291,5 +360,5 @@ class DDSP_SVC:
if file_path.find("DDSP-SVC" + os.path.sep) >= 0:
print("remove", key, file_path)
sys.modules.pop(key)
except Exception as e:
except: # type:ignore
pass