voice-changer/server/voice_changer/RVC/RVC.py
2023-04-28 07:36:08 +09:00

554 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import json
import resampy
from voice_changer.RVC.ModelWrapper import ModelWrapper
from Exceptions import NoModeLoadedException
from voice_changer.RVC.RVCSettings import RVCSettings
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
from dataclasses import asdict
import numpy as np
import torch
from fairseq import checkpoint_utils
from const import TMP_DIR
# avoiding parse arg error in RVC
sys.argv = ["MMVCServerSIO.py"]
if sys.platform.startswith("darwin"):
baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
if len(baseDir) != 1:
print("baseDir should be only one ", baseDir)
sys.exit()
modulePath = os.path.join(baseDir[0], "RVC")
sys.path.append(modulePath)
else:
sys.path.append("RVC")
from .models import SynthesizerTrnMsNSFsid as SynthesizerTrnMsNSFsid_webui
from .models import SynthesizerTrnMsNSFsidNono as SynthesizerTrnMsNSFsidNono_webui
from .const import RVC_MODEL_TYPE_RVC, RVC_MODEL_TYPE_WEBUI
from voice_changer.RVC.custom_vc_infer_pipeline import VC
from infer_pack.models import SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono
providers = [
"OpenVINOExecutionProvider",
"CUDAExecutionProvider",
"DmlExecutionProvider",
"CPUExecutionProvider",
]
class RVC:
def __init__(self, params: VoiceChangerParams):
self.initialLoad = True
self.settings = RVCSettings()
self.net_g = None
self.onnx_session = None
self.feature_file = None
self.index_file = None
self.gpu_num = torch.cuda.device_count()
self.prevVol = 0
self.params = params
self.mps_enabled: bool = (
getattr(torch.backends, "mps", None) is not None
and torch.backends.mps.is_available()
)
self.currentSlot = -1
print("RVC initialization: ", params)
print("mps: ", self.mps_enabled)
def loadModel(self, props: LoadModelParams):
"""
loadModelはスロットへのエントリ推論向けにはロードしない
例外的に、まだ一つも推論向けにロードされていない場合は、ロードする。
"""
self.is_half = props.isHalf
tmp_slot = props.slot
params_str = props.params
params = json.loads(params_str)
self.settings.modelSlots[
tmp_slot
].pyTorchModelFile = props.files.pyTorchModelFilename
self.settings.modelSlots[tmp_slot].onnxModelFile = props.files.onnxModelFilename
self.settings.modelSlots[tmp_slot].featureFile = props.files.featureFilename
self.settings.modelSlots[tmp_slot].indexFile = props.files.indexFilename
self.settings.modelSlots[tmp_slot].defaultTrans = params["trans"]
isONNX = (
True
if self.settings.modelSlots[tmp_slot].onnxModelFile is not None
else False
)
# メタデータ設定
if isONNX:
self._setInfoByONNX(
tmp_slot, self.settings.modelSlots[tmp_slot].onnxModelFile
)
else:
self._setInfoByPytorch(
tmp_slot, self.settings.modelSlots[tmp_slot].pyTorchModelFile
)
print(
f"[Voice Changer] RVC loading... slot:{tmp_slot}",
asdict(self.settings.modelSlots[tmp_slot]),
)
# hubertロード
try:
hubert_path = self.params.hubert_base
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
[hubert_path],
suffix="",
)
model = models[0]
model.eval()
if self.is_half:
model = model.half()
self.hubert_model = model
except Exception as e:
print("EXCEPTION during loading hubert/contentvec model", e)
# 初回のみロード
if self.initialLoad or tmp_slot == self.currentSlot:
self.prepareModel(tmp_slot)
self.settings.modelSlotIndex = tmp_slot
self.currentSlot = self.settings.modelSlotIndex
self.switchModel()
self.initialLoad = False
return self.get_info()
def _setInfoByPytorch(self, slot, file):
cpt = torch.load(file, map_location="cpu")
config_len = len(cpt["config"])
if config_len == 18:
self.settings.modelSlots[slot].modelType = RVC_MODEL_TYPE_RVC
self.settings.modelSlots[slot].embChannels = 256
self.settings.modelSlots[slot].embedder = "hubert_base"
else:
self.settings.modelSlots[slot].modelType = RVC_MODEL_TYPE_WEBUI
self.settings.modelSlots[slot].embChannels = cpt["config"][17]
self.settings.modelSlots[slot].embedder = cpt["embedder_name"]
if self.settings.modelSlots[slot].embedder.endswith("768"):
self.settings.modelSlots[slot].embedder = self.settings.modelSlots[
slot
].embedder[:-3]
self.settings.modelSlots[slot].f0 = True if cpt["f0"] == 1 else False
self.settings.modelSlots[slot].samplingRate = cpt["config"][-1]
# self.settings.modelSamplingRate = cpt["config"][-1]
def _setInfoByONNX(self, slot, file):
tmp_onnx_session = ModelWrapper(file)
self.settings.modelSlots[slot].modelType = tmp_onnx_session.getModelType()
self.settings.modelSlots[slot].embChannels = tmp_onnx_session.getEmbChannels()
self.settings.modelSlots[slot].embedder = tmp_onnx_session.getEmbedder()
self.settings.modelSlots[slot].f0 = tmp_onnx_session.getF0()
self.settings.modelSlots[slot].samplingRate = tmp_onnx_session.getSamplingRate()
self.settings.modelSlots[slot].deprecated = tmp_onnx_session.getDeprecated()
def prepareModel(self, slot: int):
print("[Voice Changer] Prepare Model of slot:", slot)
onnxModelFile = self.settings.modelSlots[slot].onnxModelFile
isONNX = (
True if self.settings.modelSlots[slot].onnxModelFile is not None else False
)
if isONNX:
print("[Voice Changer] Loading ONNX Model...")
self.next_onnx_session = ModelWrapper(onnxModelFile)
self.next_net_g = None
else:
print("[Voice Changer] Loading Pytorch Model...")
torchModelSlot = self.settings.modelSlots[slot]
cpt = torch.load(torchModelSlot.pyTorchModelFile, map_location="cpu")
if (
torchModelSlot.modelType == RVC_MODEL_TYPE_RVC
and torchModelSlot.f0 is True
):
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=self.is_half)
elif (
torchModelSlot.modelType == RVC_MODEL_TYPE_RVC
and torchModelSlot.f0 is False
):
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif (
torchModelSlot.modelType == RVC_MODEL_TYPE_WEBUI
and torchModelSlot.f0 is True
):
net_g = SynthesizerTrnMsNSFsid_webui(
**cpt["params"], is_half=self.is_half
)
else:
net_g = SynthesizerTrnMsNSFsidNono_webui(
**cpt["params"], is_half=self.is_half
)
net_g.eval()
net_g.load_state_dict(cpt["weight"], strict=False)
if self.is_half:
net_g = net_g.half()
self.next_net_g = net_g
self.next_onnx_session = None
self.next_feature_file = self.settings.modelSlots[slot].featureFile
self.next_index_file = self.settings.modelSlots[slot].indexFile
self.next_trans = self.settings.modelSlots[slot].defaultTrans
self.next_samplingRate = self.settings.modelSlots[slot].samplingRate
self.next_framework = (
"ONNX" if self.next_onnx_session is not None else "PyTorch"
)
print("[Voice Changer] Prepare done.")
return self.get_info()
def switchModel(self):
print("[Voice Changer] Switching model..")
# del self.net_g
# del self.onnx_session
self.net_g = self.next_net_g
self.onnx_session = self.next_onnx_session
self.feature_file = self.next_feature_file
self.index_file = self.next_index_file
self.settings.tran = self.next_trans
self.settings.framework = self.next_framework
self.settings.modelSamplingRate = self.next_samplingRate
self.next_net_g = None
self.next_onnx_session = None
print(
"[Voice Changer] Switching model..done",
)
def update_settings(self, key: str, val: any):
if key == "onnxExecutionProvider" and self.onnx_session is not None:
if val == "CUDAExecutionProvider":
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
self.settings.gpu = 0
provider_options = [{"device_id": self.settings.gpu}]
self.onnx_session.set_providers(
providers=[val], provider_options=provider_options
)
if hasattr(self, "hubert_onnx"):
self.hubert_onnx.set_providers(
providers=[val], provider_options=provider_options
)
else:
self.onnx_session.set_providers(providers=[val])
if hasattr(self, "hubert_onnx"):
self.hubert_onnx.set_providers(providers=[val])
elif key == "onnxExecutionProvider" and self.onnx_session == None:
print("Onnx is not enabled. Please load model.")
return False
elif key in self.settings.intData:
if (
key == "gpu"
and val >= 0
and val < self.gpu_num
and self.onnx_session is not None
):
providers = self.onnx_session.get_providers()
print("Providers:", providers)
if "CUDAExecutionProvider" in providers:
provider_options = [{"device_id": self.settings.gpu}]
self.onnx_session.set_providers(
providers=["CUDAExecutionProvider"],
provider_options=provider_options,
)
if key == "modelSlotIndex":
# self.switchModel(int(val))
val = int(val) % 1000 # Quick hack for same slot is selected
self.prepareModel(val)
self.currentSlot = -1
setattr(self.settings, key, int(val))
elif key in self.settings.floatData:
setattr(self.settings, key, float(val))
elif key in self.settings.strData:
setattr(self.settings, key, str(val))
else:
return False
return True
def get_info(self):
data = asdict(self.settings)
data["onnxExecutionProviders"] = (
self.onnx_session.get_providers() if self.onnx_session is not None else []
)
files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
for f in files:
if data[f] is not None and os.path.exists(data[f]):
data[f] = os.path.basename(data[f])
else:
data[f] = ""
return data
def get_processing_sampling_rate(self):
return self.settings.modelSamplingRate
def generate_input(
self, newData: any, inputSize: int, crossfadeSize: int, solaSearchFrame: int = 0
):
newData = newData.astype(np.float32) / 32768.0
if hasattr(self, "audio_buffer"):
self.audio_buffer = np.concatenate(
[self.audio_buffer, newData], 0
) # 過去のデータに連結
else:
self.audio_buffer = newData
convertSize = (
inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
)
if convertSize % 128 != 0: # モデルの出力のホップサイズで切り捨てが発生するので補う。
convertSize = convertSize + (128 - (convertSize % 128))
self.audio_buffer = self.audio_buffer[-1 * convertSize :] # 変換対象の部分だけ抽出
crop = self.audio_buffer[
-1 * (inputSize + crossfadeSize) : -1 * (crossfadeSize)
] # 出力部分だけ切り出して音量を確認。(solaとの関係性について、現状は無考慮)
rms = np.sqrt(np.square(crop).mean(axis=0))
vol = max(rms, self.prevVol * 0.0)
self.prevVol = vol
return (self.audio_buffer, convertSize, vol)
def _onnx_inference(self, data):
if hasattr(self, "onnx_session") == False or self.onnx_session == None:
print("[Voice Changer] No onnx session.")
raise NoModeLoadedException("ONNX")
if self.settings.gpu < 0 or self.gpu_num == 0:
dev = torch.device("cpu")
else:
dev = torch.device("cuda", index=self.settings.gpu)
self.hubert_model = self.hubert_model.to(dev)
audio = data[0]
convertSize = data[1]
vol = data[2]
audio = resampy.resample(audio, self.settings.modelSamplingRate, 16000)
if vol < self.settings.silentThreshold:
return np.zeros(convertSize).astype(np.int16)
with torch.no_grad():
repeat = 3 if self.is_half else 1
repeat *= self.settings.rvcQuality # 0 or 3
vc = VC(self.settings.modelSamplingRate, dev, self.is_half, repeat)
sid = 0
times = [0, 0, 0]
f0_up_key = self.settings.tran
f0_method = self.settings.f0Detector
file_index = self.index_file if self.index_file != None else ""
file_big_npy = self.feature_file if self.feature_file != None else ""
index_rate = self.settings.indexRatio
if_f0 = 1 if self.settings.modelSlots[self.currentSlot].f0 else 0
f0_file = None
f0 = self.settings.modelSlots[self.currentSlot].f0
embChannels = self.settings.modelSlots[self.currentSlot].embChannels
audio_out = vc.pipeline(
self.hubert_model,
self.onnx_session,
sid,
audio,
times,
f0_up_key,
f0_method,
file_index,
file_big_npy,
index_rate,
if_f0,
f0_file=f0_file,
silence_front=self.settings.extraConvertSize
/ self.settings.modelSamplingRate,
embChannels=embChannels,
)
result = audio_out * np.sqrt(vol)
return result
def _pyTorch_inference(self, data):
if hasattr(self, "net_g") is False or self.net_g is None:
print(
"[Voice Changer] No pyTorch session.",
hasattr(self, "net_g"),
self.net_g,
)
raise NoModeLoadedException("pytorch")
if self.settings.gpu < 0 or (self.gpu_num == 0 and self.mps_enabled is False):
dev = torch.device("cpu")
elif self.mps_enabled:
dev = torch.device("mps")
else:
dev = torch.device("cuda", index=self.settings.gpu)
# print("device:", dev)
self.hubert_model = self.hubert_model.to(dev)
self.net_g = self.net_g.to(dev)
audio = data[0]
convertSize = data[1]
vol = data[2]
audio = resampy.resample(audio, self.settings.modelSamplingRate, 16000)
if vol < self.settings.silentThreshold:
return np.zeros(convertSize).astype(np.int16)
with torch.no_grad():
repeat = 3 if self.is_half else 1
repeat *= self.settings.rvcQuality # 0 or 3
vc = VC(self.settings.modelSamplingRate, dev, self.is_half, repeat)
sid = 0
times = [0, 0, 0]
f0_up_key = self.settings.tran
f0_method = self.settings.f0Detector
file_index = self.index_file if self.index_file != None else ""
file_big_npy = self.feature_file if self.feature_file != None else ""
index_rate = self.settings.indexRatio
if_f0 = 1 if self.settings.modelSlots[self.currentSlot].f0 else 0
f0_file = None
embChannels = self.settings.modelSlots[self.currentSlot].embChannels
audio_out = vc.pipeline(
self.hubert_model,
self.net_g,
sid,
audio,
times,
f0_up_key,
f0_method,
file_index,
file_big_npy,
index_rate,
if_f0,
f0_file=f0_file,
silence_front=self.settings.extraConvertSize
/ self.settings.modelSamplingRate,
embChannels=embChannels,
)
result = audio_out * np.sqrt(vol)
return result
def inference(self, data):
if self.settings.modelSlotIndex < 0:
print(
"[Voice Changer] wait for loading model...",
self.settings.modelSlotIndex,
self.currentSlot,
)
raise NoModeLoadedException("model_common")
if self.currentSlot != self.settings.modelSlotIndex:
print(f"Switch model {self.currentSlot} -> {self.settings.modelSlotIndex}")
self.currentSlot = self.settings.modelSlotIndex
self.switchModel()
if self.settings.framework == "ONNX":
audio = self._onnx_inference(data)
else:
audio = self._pyTorch_inference(data)
return audio
def __del__(self):
del self.net_g
del self.onnx_session
remove_path = os.path.join("RVC")
sys.path = [x for x in sys.path if x.endswith(remove_path) == False]
for key in list(sys.modules):
val = sys.modules.get(key)
try:
file_path = val.__file__
if file_path.find("RVC" + os.path.sep) >= 0:
print("remove", key, file_path)
sys.modules.pop(key)
except Exception as e:
pass
def export2onnx(self):
if hasattr(self, "net_g") == False or self.net_g == None:
print("[Voice Changer] export2onnx, No pyTorch session.")
return {"status": "ng", "path": f""}
pyTorchModelFile = self.settings.modelSlots[
self.settings.modelSlotIndex
].pyTorchModelFile # inference前にexportできるようにcurrentSlotではなくslot
if pyTorchModelFile == None:
print("[Voice Changer] export2onnx, No pyTorch filepath.")
return {"status": "ng", "path": f""}
import voice_changer.RVC.export2onnx as onnxExporter
output_file = os.path.splitext(os.path.basename(pyTorchModelFile))[0] + ".onnx"
output_file_simple = (
os.path.splitext(os.path.basename(pyTorchModelFile))[0] + "_simple.onnx"
)
output_path = os.path.join(TMP_DIR, output_file)
output_path_simple = os.path.join(TMP_DIR, output_file_simple)
print(
"embChannels",
self.settings.modelSlots[self.settings.modelSlotIndex].embChannels,
)
metadata = {
"application": "VC_CLIENT",
"version": "1",
"modelType": self.settings.modelSlots[
self.settings.modelSlotIndex
].modelType,
"samplingRate": self.settings.modelSlots[
self.settings.modelSlotIndex
].samplingRate,
"f0": self.settings.modelSlots[self.settings.modelSlotIndex].f0,
"embChannels": self.settings.modelSlots[
self.settings.modelSlotIndex
].embChannels,
"embedder": self.settings.modelSlots[self.settings.modelSlotIndex].embedder,
}
if torch.cuda.device_count() > 0:
onnxExporter.export2onnx(
pyTorchModelFile, output_path, output_path_simple, True, metadata
)
else:
print(
"[Voice Changer] Warning!!! onnx export with float32. maybe size is doubled."
)
onnxExporter.export2onnx(
pyTorchModelFile, output_path, output_path_simple, False, metadata
)
return {
"status": "ok",
"path": f"/tmp/{output_file_simple}",
"filename": output_file_simple,
}