voice-changer/server/voice_changer/DDSP_SVC/DDSP_SVC.py

320 lines
12 KiB
Python
Raw Normal View History

2023-03-24 02:56:15 +03:00
import sys
import os

# The vendored DDSP-SVC checkout must be on sys.path BEFORE the
# `import ddsp.vocoder` below. On macOS the app is bundled, so locate the
# single "Contents/MacOS" base directory and resolve the checkout from there;
# on other platforms the relative "DDSP-SVC" directory is used as-is.
if sys.platform.startswith('darwin'):
    baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
    if len(baseDir) != 1:
        # Ambiguous bundle layout — refuse to guess which base dir to use.
        print("baseDir should be only one ", baseDir)
        sys.exit()
    modulePath = os.path.join(baseDir[0], "DDSP-SVC")
    sys.path.append(modulePath)
else:
    sys.path.append("DDSP-SVC")

import io
from dataclasses import dataclass, asdict, field
from functools import reduce
import numpy as np
import torch
import onnxruntime
import pyworld as pw
import ddsp.vocoder as vo          # from the vendored DDSP-SVC repo added above
from ddsp.core import upsample
from enhancer import Enhancer      # DDSP-SVC's NSF-HiFiGAN enhancer wrapper
from Exceptions import NoModeLoadedException

# Preference-ordered ONNX Runtime execution providers used when an ONNX
# session is created/reconfigured.
providers = ['OpenVINOExecutionProvider', "CUDAExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider"]
@dataclass
class DDSP_SVCSettings():
    """Runtime-tunable settings for the DDSP-SVC voice changer.

    Instances are mutated through DDSP_SVC.update_settings(); the class-level
    intData / floatData / strData lists tell update_settings() which keys are
    mutable and how to coerce the incoming value.
    """
    gpu: int = 0                 # CUDA device index; negative selects CPU
    dstId: int = 0               # target speaker id passed to the model
    f0Detector: str = "dio"      # dio or harvest # parselmouth
    tran: int = 20               # pitch shift in semitones
    predictF0: int = 0           # 0:False, 1:True
    silentThreshold: float = 0.00001   # RMS gate below which output is silenced
    extraConvertSize: int = 1024 * 32  # extra context samples prepended per conversion
    enableEnhancer: int = 0      # 0:False, 1:True — run the NSF-HiFiGAN enhancer
    enhancerTune: int = 0        # adaptive_key passed to the enhancer
    framework: str = "PyTorch"   # PyTorch or ONNX
    pyTorchModelFile: str = ""
    onnxModelFile: str = ""
    configFile: str = ""
    speakers: dict[str, int] = field(
        default_factory=lambda: {}
    )
    # ↓ only the mutable fields are listed (consumed by update_settings()).
    intData = ["gpu", "dstId", "tran", "predictF0", "extraConvertSize", "enableEnhancer", "enhancerTune"]
    # Fix: "noiceScale" and "clusterInferRatio" were stale entries copied from
    # another model's settings and are not fields of this dataclass; listing
    # them let update_settings() create phantom attributes that never appear
    # in asdict(). Only real float fields remain.
    floatData = ["silentThreshold"]
    strData = ["framework", "f0Detector"]
class DDSP_SVC:
    """Voice-changer backend wrapping the vendored DDSP-SVC project.

    Lifecycle: construct with resource paths, call loadModel() once, then
    repeatedly generate_input() + inference() per audio chunk.
    """

    def __init__(self, params):
        # params: dict of external resource paths; loadModel() reads
        # params["hubertSoftPt"] and params["enhancerPt"].
        self.settings = DDSP_SVCSettings()
        self.net_g = None
        self.onnx_session = None
        self.gpu_num = torch.cuda.device_count()
        self.prevVol = 0
        self.params = params
        print("DDSP-SVC initialization:", params)

    def useDevice(self):
        """Return the torch.device selected by settings.gpu (CPU fallback)."""
        if self.settings.gpu >= 0 and torch.cuda.is_available():
            return torch.device("cuda", index=self.settings.gpu)
        else:
            return torch.device("cpu")

    def loadModel(self, props):
        """Load the DDSP-SVC model and its helpers (units encoder, f0
        extractor, volume extractor, enhancer); returns get_info()."""
        # self.settings.configFile = props["files"]["configFilename"]  # use the yaml in the same folder
        self.settings.pyTorchModelFile = props["files"]["pyTorchModelFilename"]

        # model
        model, args = vo.load_model(self.settings.pyTorchModelFile, device=self.useDevice())
        self.model = model
        self.args = args
        self.sampling_rate = args.data.sampling_rate
        # NOTE(review): sampling_rate equals args.data.sampling_rate here, so
        # this expression reduces to hop_size == block_size — confirm whether
        # scaling to a different output rate was intended.
        self.hop_size = int(self.args.data.block_size * self.sampling_rate / self.args.data.sampling_rate)
        print("-------------------hopsize", self.hop_size)

        # hubert (units encoder producing the content embedding)
        self.vec_path = self.params["hubertSoftPt"]
        self.encoder = vo.Units_Encoder(
            self.args.data.encoder,
            self.vec_path,
            self.args.data.encoder_sample_rate,
            self.args.data.encoder_hop_size,
            device=self.useDevice())

        # (dead experiment: ONNX hubert session — kept for reference)
        # ort_options = onnxruntime.SessionOptions()
        # ort_options.intra_op_num_threads = 8
        # self.onnx_session = onnxruntime.InferenceSession(
        #     "model_DDSP-SVC/hubert4.0.onnx",
        #     providers=providers
        # )
        # inputs = self.onnx_session.get_inputs()
        # outputs = self.onnx_session.get_outputs()
        # for input in inputs:
        #     print("input::::", input)
        # for output in outputs:
        #     print("output::::", output)

        # f0dec — pitch extractor, search range fixed to 50–1100 Hz
        self.f0_detector = vo.F0_Extractor(
            # "crepe",
            self.settings.f0Detector,
            self.sampling_rate,
            self.hop_size,
            float(50),
            float(1100))

        self.volume_extractor = vo.Volume_Extractor(self.hop_size)

        self.enhancer_path = self.params["enhancerPt"]
        self.enhancer = Enhancer(self.args.enhancer.type, self.enhancer_path, device=self.useDevice())

        return self.get_info()

    def update_settings(self, key: str, val: any):
        """Apply one settings change; returns False for unknown keys.

        Side effects: changing "gpu" reloads model/encoder/enhancer onto the
        new device; changing "f0Detector" rebuilds the f0 extractor.
        """
        if key == "onnxExecutionProvider" and self.onnx_session != None:
            if val == "CUDAExecutionProvider":
                if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
                    self.settings.gpu = 0  # clamp invalid device index to 0
                provider_options = [{'device_id': self.settings.gpu}]
                self.onnx_session.set_providers(providers=[val], provider_options=provider_options)
            else:
                self.onnx_session.set_providers(providers=[val])
        elif key in self.settings.intData:
            setattr(self.settings, key, int(val))
            if key == "gpu" and val >= 0 and val < self.gpu_num and self.onnx_session != None:
                providers = self.onnx_session.get_providers()
                print("Providers:", providers)
                if "CUDAExecutionProvider" in providers:
                    provider_options = [{'device_id': self.settings.gpu}]
                    self.onnx_session.set_providers(providers=["CUDAExecutionProvider"], provider_options=provider_options)
            if key == "gpu" and len(self.settings.pyTorchModelFile) > 0:
                # Reload everything onto the newly selected device.
                model, _args = vo.load_model(self.settings.pyTorchModelFile, device=self.useDevice())
                self.model = model
                self.enhancer = Enhancer(self.args.enhancer.type, self.enhancer_path, device=self.useDevice())
                self.encoder = vo.Units_Encoder(self.args.data.encoder, self.vec_path, self.args.data.encoder_sample_rate,
                                                self.args.data.encoder_hop_size, device=self.useDevice())
        elif key in self.settings.floatData:
            setattr(self.settings, key, float(val))
        elif key in self.settings.strData:
            setattr(self.settings, key, str(val))
            if key == "f0Detector":
                print("f0Detector update", val)
                if val == "dio":
                    # DDSP-SVC's extractor has no "dio"; map it to parselmouth.
                    val = "parselmouth"
                if hasattr(self, "sampling_rate") == False:
                    # No model loaded yet — fall back to defaults so the
                    # extractor can still be constructed.
                    self.sampling_rate = 44100
                    self.hop_size = 512
                self.f0_detector = vo.F0_Extractor(
                    val,
                    self.sampling_rate,
                    self.hop_size,
                    float(50),
                    float(1100))
        else:
            return False
        return True

    def get_info(self):
        """Return current settings as a dict; model/config paths are reduced
        to basenames ("" when the file does not exist)."""
        data = asdict(self.settings)
        data["onnxExecutionProviders"] = self.onnx_session.get_providers() if self.onnx_session != None else []
        files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
        for f in files:
            if data[f] != None and os.path.exists(data[f]):
                data[f] = os.path.basename(data[f])
            else:
                data[f] = ""
        return data

    def get_processing_sampling_rate(self):
        # NOTE(review): raises AttributeError if called before loadModel().
        return self.sampling_rate

    def generate_input(self, newData: any, inputSize: int, crossfadeSize: int, solaSearchFrame: int = 0):
        """Build the model input tuple from a new chunk of int16 audio.

        Returns (seg_units, f0, volume, mask, convertSize, vol); the tuple is
        consumed by _pyTorch_inference() / _onnx_inference().
        """
        newData = newData.astype(np.float32) / 32768.0  # int16 -> [-1, 1] float

        if hasattr(self, "audio_buffer"):
            self.audio_buffer = np.concatenate([self.audio_buffer, newData], 0)  # append to the past data
        else:
            self.audio_buffer = newData

        convertSize = inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize

        if convertSize % self.hop_size != 0:  # pad up to a hop-size multiple; the model output would otherwise be truncated
            convertSize = convertSize + (self.hop_size - (convertSize % self.hop_size))

        self.audio_buffer = self.audio_buffer[-1 * convertSize:]  # keep only the region to convert

        # f0 — extracted from the raw-scale signal, then pitch-shifted by
        # settings.tran semitones
        f0 = self.f0_detector.extract(self.audio_buffer * 32768.0, uv_interp=True)
        f0 = torch.from_numpy(f0).float().unsqueeze(-1).unsqueeze(0)
        f0 = f0 * 2 ** (float(self.settings.tran) / 12)

        # volume, mask — activity mask from a -60 dB gate, dilated with a
        # 9-frame max window, then upsampled to sample rate
        volume = self.volume_extractor.extract(self.audio_buffer)
        mask = (volume > 10 ** (float(-60) / 20)).astype('float')
        mask = np.pad(mask, (4, 4), constant_values=(mask[0], mask[-1]))
        mask = np.array([np.max(mask[n: n + 9]) for n in range(len(mask) - 8)])
        mask = torch.from_numpy(mask).float().unsqueeze(-1).unsqueeze(0)
        mask = upsample(mask, self.args.data.block_size).squeeze(-1)
        volume = torch.from_numpy(volume).float().unsqueeze(-1).unsqueeze(0)

        # embed — hubert content units
        audio = torch.from_numpy(self.audio_buffer).float().to(self.useDevice()).unsqueeze(0)
        seg_units = self.encoder.encode(audio, self.sampling_rate, self.hop_size)

        crop = self.audio_buffer[-1 * (inputSize + crossfadeSize):-1 * (crossfadeSize)]

        rms = np.sqrt(np.square(crop).mean(axis=0))
        # NOTE(review): `self.prevVol * 0.0` makes this max() a no-op (vol is
        # always rms) — the smoothing term appears deliberately disabled;
        # confirm before removing prevVol.
        vol = max(rms, self.prevVol * 0.0)
        self.prevVol = vol

        return (seg_units, f0, volume, mask, convertSize, vol)

    def _onnx_inference(self, data):
        # NOTE(review): this path looks stale/broken: `vol` and `convertSize`
        # are read below but their assignments are commented out (NameError at
        # runtime), and `self.hps` is never defined on this class. It appears
        # copied from another backend; verify before enabling framework="ONNX".
        if hasattr(self, "onnx_session") == False or self.onnx_session == None:
            print("[Voice Changer] No onnx session.")
            raise NoModeLoadedException("ONNX")

        seg_units = data[0]
        # f0 = data[1]
        # convertSize = data[2]
        # vol = data[3]

        if vol < self.settings.silentThreshold:
            return np.zeros(convertSize).astype(np.int16)
        c, f0, uv = [x.numpy() for x in data]
        audio1 = self.onnx_session.run(
            ["audio"],
            {
                "c": c,
                "f0": f0,
                "g": np.array([self.settings.dstId]).astype(np.int64),
                "uv": np.array([self.settings.dstId]).astype(np.int64),
                "predict_f0": np.array([self.settings.dstId]).astype(np.int64),
                "noice_scale": np.array([self.settings.dstId]).astype(np.int64),
            })[0][0, 0] * self.hps.data.max_wav_value
        audio1 = audio1 * vol
        result = audio1
        return result

    def _pyTorch_inference(self, data):
        """Run the PyTorch DDSP-SVC model on a generate_input() tuple and
        return the converted waveform as int16 samples."""
        if hasattr(self, "model") == False or self.model == None:
            print("[Voice Changer] No pyTorch session.")
            raise NoModeLoadedException("pytorch")

        c = data[0].to(self.useDevice())
        f0 = data[1].to(self.useDevice())
        volume = data[2].to(self.useDevice())
        mask = data[3].to(self.useDevice())

        convertSize = data[4]
        vol = data[5]
        # if vol < self.settings.silentThreshold:
        #     print("threshold")
        #     return np.zeros(convertSize).astype(np.int16)

        with torch.no_grad():
            spk_id = torch.LongTensor(np.array([[self.settings.dstId]])).to(self.useDevice())
            seg_output, _, (s_h, s_n) = self.model(c, f0, volume, spk_id=spk_id, spk_mix_dict=None)
            seg_output *= mask  # zero out frames gated as silent

            if self.settings.enableEnhancer:
                # Optional NSF-HiFiGAN enhancement pass.
                seg_output, output_sample_rate = self.enhancer.enhance(
                    seg_output,
                    self.args.data.sampling_rate,
                    f0,
                    self.args.data.block_size,
                    adaptive_key=float(self.settings.enhancerTune))

            result = seg_output.squeeze().cpu().numpy() * 32768.0  # float [-1,1] -> int16 scale
            return np.array(result).astype(np.int16)

    def inference(self, data):
        """Dispatch to the ONNX or PyTorch path per settings.framework."""
        if self.settings.framework == "ONNX":
            audio = self._onnx_inference(data)
        else:
            audio = self._pyTorch_inference(data)
        return audio

    def destroy(self):
        del self.net_g
        del self.onnx_session

    def __del__(self):
        del self.net_g
        del self.onnx_session
        # Remove the vendored DDSP-SVC path and evict its modules from the
        # import cache so a later (re)load picks up a fresh copy.
        remove_path = os.path.join("DDSP-SVC")
        sys.path = [x for x in sys.path if x.endswith(remove_path) == False]
        for key in list(sys.modules):
            val = sys.modules.get(key)
            try:
                file_path = val.__file__
                if file_path.find("DDSP-SVC" + os.path.sep) >= 0:
                    print("remove", key, file_path)
                    sys.modules.pop(key)
            except Exception as e:
                # Some modules (e.g. built-ins) have no __file__; ignore them.
                pass