diff --git a/server/voice_changer/DDSP_SVC/DDSP_SVC.py b/server/voice_changer/DDSP_SVC/DDSP_SVC.py
index b44aa191..6cd75bd2 100644
--- a/server/voice_changer/DDSP_SVC/DDSP_SVC.py
+++ b/server/voice_changer/DDSP_SVC/DDSP_SVC.py
@@ -1,6 +1,11 @@
 import sys
 import os
-if sys.platform.startswith('darwin'):
+from voice_changer.utils.LoadModelParams import LoadModelParams
+
+from voice_changer.utils.VoiceChangerModel import AudioInOut
+from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
+
+if sys.platform.startswith("darwin"):
     baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
     if len(baseDir) != 1:
         print("baseDir should be only one ", baseDir)
@@ -10,24 +15,25 @@ if sys.platform.startswith('darwin'):
 else:
     sys.path.append("DDSP-SVC")
 
-import io
 from dataclasses import dataclass, asdict, field
-from functools import reduce
 import numpy as np
 import torch
-import onnxruntime
-import pyworld as pw
-import ddsp.vocoder as vo
-from ddsp.core import upsample
-from enhancer import Enhancer
+import ddsp.vocoder as vo  # type:ignore
+from ddsp.core import upsample  # type:ignore
+from enhancer import Enhancer  # type:ignore
 from Exceptions import NoModeLoadedException
 
-providers = ['OpenVINOExecutionProvider', "CUDAExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider"]
+providers = [
+    "OpenVINOExecutionProvider",
+    "CUDAExecutionProvider",
+    "DmlExecutionProvider",
+    "CPUExecutionProvider",
+]
 
 
 @dataclass
-class DDSP_SVCSettings():
+class DDSP_SVCSettings:
     gpu: int = 0
     dstId: int = 0
 
@@ -45,18 +51,26 @@ class DDSP_SVCSettings():
     onnxModelFile: str = ""
     configFile: str = ""
 
-    speakers: dict[str, int] = field(
-        default_factory=lambda: {}
-    )
+    speakers: dict[str, int] = field(default_factory=lambda: {})
 
     # ↓ enumerate only the mutable fields
-    intData = ["gpu", "dstId", "tran", "predictF0", "extraConvertSize", "enableEnhancer", "enhancerTune"]
+    intData = [
+        "gpu",
+        "dstId",
+        "tran",
+        "predictF0",
+        "extraConvertSize",
+        "enableEnhancer",
+        "enhancerTune",
+    ]
     floatData = ["silentThreshold", "clusterInferRatio"]
     strData = ["framework", "f0Detector"]
 
 
 class DDSP_SVC:
-    def __init__(self, params):
+    audio_buffer: AudioInOut | None = None
+
+    def __init__(self, params: VoiceChangerParams):
         self.settings = DDSP_SVCSettings()
         self.net_g = None
         self.onnx_session = None
@@ -72,24 +86,30 @@ class DDSP_SVC:
         else:
             return torch.device("cpu")
 
-    def loadModel(self, props):
-        # self.settings.configFile = props["files"]["configFilename"]  # use the yaml in the same folder
-        self.settings.pyTorchModelFile = props["files"]["pyTorchModelFilename"]
+    def loadModel(self, props: LoadModelParams):
+        self.settings.pyTorchModelFile = props.files.pyTorchModelFilename
 
         # model
-        model, args = vo.load_model(self.settings.pyTorchModelFile, device=self.useDevice())
+        model, args = vo.load_model(
+            self.settings.pyTorchModelFile, device=self.useDevice()
+        )
         self.model = model
         self.args = args
         self.sampling_rate = args.data.sampling_rate
-        self.hop_size = int(self.args.data.block_size * self.sampling_rate / self.args.data.sampling_rate)
+        self.hop_size = int(
+            self.args.data.block_size
+            * self.sampling_rate
+            / self.args.data.sampling_rate
+        )
 
         # hubert
-        self.vec_path = self.params["hubert_soft"]
+        self.vec_path = self.params.hubert_soft
         self.encoder = vo.Units_Encoder(
             self.args.data.encoder,
             self.vec_path,
             self.args.data.encoder_sample_rate,
             self.args.data.encoder_hop_size,
-            device=self.useDevice())
+            device=self.useDevice(),
+        )
 
         # ort_options = onnxruntime.SessionOptions()
         # ort_options.intra_op_num_threads = 8
@@ -111,36 +131,59 @@ class DDSP_SVC:
             self.sampling_rate,
             self.hop_size,
             float(50),
-            float(1100))
+            float(1100),
+        )
         self.volume_extractor = vo.Volume_Extractor(self.hop_size)
 
-        self.enhancer_path = self.params["nsf_hifigan"]
-        self.enhancer = Enhancer(self.args.enhancer.type, self.enhancer_path, device=self.useDevice())
+        self.enhancer_path = self.params.nsf_hifigan
+        self.enhancer = Enhancer(
+            self.args.enhancer.type, self.enhancer_path, device=self.useDevice()
+        )
         return self.get_info()
 
-    def update_settings(self, key: str, val: any):
-        if key == "onnxExecutionProvider" and self.onnx_session != None:
+    def update_settings(self, key: str, val: int | float | str):
+        if key == "onnxExecutionProvider" and self.onnx_session is not None:
             if val == "CUDAExecutionProvider":
                 if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
                     self.settings.gpu = 0
-                provider_options = [{'device_id': self.settings.gpu}]
-                self.onnx_session.set_providers(providers=[val], provider_options=provider_options)
+                provider_options = [{"device_id": self.settings.gpu}]
+                self.onnx_session.set_providers(
+                    providers=[val], provider_options=provider_options
+                )
             else:
                 self.onnx_session.set_providers(providers=[val])
         elif key in self.settings.intData:
-            setattr(self.settings, key, int(val))
-            if key == "gpu" and val >= 0 and val < self.gpu_num and self.onnx_session != None:
+            val = int(val)
+            setattr(self.settings, key, val)
+            if (
+                key == "gpu"
+                and val >= 0
+                and val < self.gpu_num
+                and self.onnx_session is not None
+            ):
                 providers = self.onnx_session.get_providers()
                 print("Providers:", providers)
                 if "CUDAExecutionProvider" in providers:
-                    provider_options = [{'device_id': self.settings.gpu}]
-                    self.onnx_session.set_providers(providers=["CUDAExecutionProvider"], provider_options=provider_options)
+                    provider_options = [{"device_id": self.settings.gpu}]
+                    self.onnx_session.set_providers(
+                        providers=["CUDAExecutionProvider"],
+                        provider_options=provider_options,
+                    )
             if key == "gpu" and len(self.settings.pyTorchModelFile) > 0:
-                model, _args = vo.load_model(self.settings.pyTorchModelFile, device=self.useDevice())
+                model, _args = vo.load_model(
+                    self.settings.pyTorchModelFile, device=self.useDevice()
+                )
                 self.model = model
-                self.enhancer = Enhancer(self.args.enhancer.type, self.enhancer_path, device=self.useDevice())
-                self.encoder = vo.Units_Encoder(self.args.data.encoder, self.vec_path, self.args.data.encoder_sample_rate,
-                                                self.args.data.encoder_hop_size, device=self.useDevice())
+                self.enhancer = Enhancer(
+                    self.args.enhancer.type, self.enhancer_path, device=self.useDevice()
+                )
+                self.encoder = vo.Units_Encoder(
+                    self.args.data.encoder,
+                    self.vec_path,
+                    self.args.data.encoder_sample_rate,
+                    self.args.data.encoder_hop_size,
+                    device=self.useDevice(),
+                )
         elif key in self.settings.floatData:
             setattr(self.settings, key, float(val))
@@ -151,16 +194,13 @@ class DDSP_SVC:
             if val == "dio":
                 val = "parselmouth"
 
-            if hasattr(self, "sampling_rate") == False:
+            if hasattr(self, "sampling_rate") is False:
                 self.sampling_rate = 44100
                 self.hop_size = 512
 
             self.f0_detector = vo.F0_Extractor(
-                val,
-                self.sampling_rate,
-                self.hop_size,
-                float(50),
-                float(1100))
+                val, self.sampling_rate, self.hop_size, float(50), float(1100)
+            )
         else:
             return False
 
@@ -169,10 +209,12 @@ class DDSP_SVC:
 
     def get_info(self):
         data = asdict(self.settings)
-        data["onnxExecutionProviders"] = self.onnx_session.get_providers() if self.onnx_session != None else []
+        data["onnxExecutionProviders"] = (
+            self.onnx_session.get_providers() if self.onnx_session is not None else []
+        )
         files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
         for f in files:
-            if data[f] != None and os.path.exists(data[f]):
+            if data[f] is not None and os.path.exists(data[f]):
                 data[f] = os.path.basename(data[f])
             else:
                 data[f] = ""
@@ -182,41 +224,64 @@ class DDSP_SVC:
     def get_processing_sampling_rate(self):
         return self.sampling_rate
 
-    def generate_input(self, newData: any, inputSize: int, crossfadeSize: int, solaSearchFrame: int = 0):
+    def generate_input(
+        self,
+        newData: AudioInOut,
+        inputSize: int,
+        crossfadeSize: int,
+        solaSearchFrame: int = 0,
+    ):
         newData = newData.astype(np.float32) / 32768.0
 
-        if hasattr(self, "audio_buffer"):
-            self.audio_buffer = np.concatenate([self.audio_buffer, newData], 0)  # concatenate with past data
+        if self.audio_buffer is not None:
+            self.audio_buffer = np.concatenate(
+                [self.audio_buffer, newData], 0
+            )  # concatenate with past data
         else:
            self.audio_buffer = newData
 
-        convertSize = inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
+        convertSize = (
+            inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
+        )
 
         if convertSize % self.hop_size != 0:  # pad, because truncation occurs at the model output's hop size
             convertSize = convertSize + (self.hop_size - (convertSize % self.hop_size))
 
-        self.audio_buffer = self.audio_buffer[-1 * convertSize:]  # extract only the conversion target
+        convertOffset = -1 * convertSize
+        self.audio_buffer = self.audio_buffer[convertOffset:]  # extract only the conversion target
 
         # f0
-        f0 = self.f0_detector.extract(self.audio_buffer * 32768.0, uv_interp=True,
-                                      silence_front=self.settings.extraConvertSize / self.sampling_rate)
+        f0 = self.f0_detector.extract(
+            self.audio_buffer * 32768.0,
+            uv_interp=True,
+            silence_front=self.settings.extraConvertSize / self.sampling_rate,
+        )
         f0 = torch.from_numpy(f0).float().unsqueeze(-1).unsqueeze(0)
         f0 = f0 * 2 ** (float(self.settings.tran) / 12)
 
         # volume, mask
         volume = self.volume_extractor.extract(self.audio_buffer)
-        mask = (volume > 10 ** (float(-60) / 20)).astype('float')
+        mask = (volume > 10 ** (float(-60) / 20)).astype("float")
         mask = np.pad(mask, (4, 4), constant_values=(mask[0], mask[-1]))
-        mask = np.array([np.max(mask[n: n + 9]) for n in range(len(mask) - 8)])
+        mask = np.array(
+            [np.max(mask[n : n + 9]) for n in range(len(mask) - 8)]  # noqa: E203
+        )
         mask = torch.from_numpy(mask).float().unsqueeze(-1).unsqueeze(0)
         mask = upsample(mask, self.args.data.block_size).squeeze(-1)
         volume = torch.from_numpy(volume).float().unsqueeze(-1).unsqueeze(0)
 
         # embed
-        audio = torch.from_numpy(self.audio_buffer).float().to(self.useDevice()).unsqueeze(0)
+        audio = (
+            torch.from_numpy(self.audio_buffer)
+            .float()
+            .to(self.useDevice())
+            .unsqueeze(0)
+        )
         seg_units = self.encoder.encode(audio, self.sampling_rate, self.hop_size)
 
-        crop = self.audio_buffer[-1 * (inputSize + crossfadeSize):-1 * (crossfadeSize)]
+        cropOffset = -1 * (inputSize + crossfadeSize)
+        cropEnd = -1 * (crossfadeSize)
+        crop = self.audio_buffer[cropOffset:cropEnd]
         rms = np.sqrt(np.square(crop).mean(axis=0))
         vol = max(rms, self.prevVol * 0.0)
@@ -225,15 +290,14 @@ class DDSP_SVC:
         return (seg_units, f0, volume, mask, convertSize, vol)
 
     def _onnx_inference(self, data):
-        if hasattr(self, "onnx_session") == False or self.onnx_session == None:
+        if hasattr(self, "onnx_session") is False or self.onnx_session is None:
             print("[Voice Changer] No onnx session.")
             raise NoModeLoadedException("ONNX")
 
         raise NoModeLoadedException("ONNX")
 
     def _pyTorch_inference(self, data):
-
-        if hasattr(self, "model") == False or self.model == None:
"model") is False or self.model is None: print("[Voice Changer] No pyTorch session.") raise NoModeLoadedException("pytorch") @@ -242,15 +306,19 @@ class DDSP_SVC: volume = data[2].to(self.useDevice()) mask = data[3].to(self.useDevice()) - convertSize = data[4] - vol = data[5] + # convertSize = data[4] + # vol = data[5] # if vol < self.settings.silentThreshold: # print("threshold") # return np.zeros(convertSize).astype(np.int16) with torch.no_grad(): - spk_id = torch.LongTensor(np.array([[self.settings.dstId]])).to(self.useDevice()) - seg_output, _, (s_h, s_n) = self.model(c, f0, volume, spk_id=spk_id, spk_mix_dict=None) + spk_id = torch.LongTensor(np.array([[self.settings.dstId]])).to( + self.useDevice() + ) + seg_output, _, (s_h, s_n) = self.model( + c, f0, volume, spk_id=spk_id, spk_mix_dict=None + ) seg_output *= mask if self.settings.enableEnhancer: @@ -260,8 +328,9 @@ class DDSP_SVC: f0, self.args.data.block_size, # adaptive_key=float(self.settings.enhancerTune), - adaptive_key='auto', - silence_front=self.settings.extraConvertSize / self.sampling_rate) + adaptive_key="auto", + silence_front=self.settings.extraConvertSize / self.sampling_rate, + ) result = seg_output.squeeze().cpu().numpy() * 32768.0 return np.array(result).astype(np.int16) @@ -282,7 +351,7 @@ class DDSP_SVC: del self.onnx_session remove_path = os.path.join("DDSP-SVC") - sys.path = [x for x in sys.path if x.endswith(remove_path) == False] + sys.path = [x for x in sys.path if x.endswith(remove_path) is False] for key in list(sys.modules): val = sys.modules.get(key) @@ -291,5 +360,5 @@ class DDSP_SVC: if file_path.find("DDSP-SVC" + os.path.sep) >= 0: print("remove", key, file_path) sys.modules.pop(key) - except Exception as e: + except: # type:ignore pass