voice-changer/server/voice_changer/RVC/RVC.py
2023-04-25 16:15:13 +09:00

512 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import json
import resampy
from voice_changer.RVC.ModelWrapper import ModelWrapper
from Exceptions import NoModeLoadedException
# avoiding parse arg error in RVC
sys.argv = ["MMVCServerSIO.py"]
if sys.platform.startswith('darwin'):
baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
if len(baseDir) != 1:
print("baseDir should be only one ", baseDir)
sys.exit()
modulePath = os.path.join(baseDir[0], "RVC")
sys.path.append(modulePath)
else:
sys.path.append("RVC")
import io
from dataclasses import dataclass, asdict, field
from functools import reduce
import numpy as np
import torch
import onnxruntime
# onnxruntime.set_default_logger_severity(3)
from const import HUBERT_ONNX_MODEL_PATH, TMP_DIR
import pyworld as pw
from voice_changer.RVC.custom_vc_infer_pipeline import VC
from infer_pack.models import SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono
from .models import SynthesizerTrnMsNSFsid as SynthesizerTrnMsNSFsid_webui
from .models import SynthesizerTrnMsNSFsidNono as SynthesizerTrnMsNSFsidNono_webui
from .const import RVC_MODEL_TYPE_RVC, RVC_MODEL_TYPE_WEBUI
from fairseq import checkpoint_utils
providers = ['OpenVINOExecutionProvider', "CUDAExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider"]
@dataclass
class ModelSlot():
pyTorchModelFile: str = ""
onnxModelFile: str = ""
featureFile: str = ""
indexFile: str = ""
defaultTrans: int = ""
modelType: int = RVC_MODEL_TYPE_RVC
samplingRate: int = -1
f0: bool = True
embChannels: int = 256
deprecated: bool = False
# samplingRateOnnx: int = -1
# f0Onnx: bool = True
# embChannelsOnnx: int = 256
@dataclass
class RVCSettings():
gpu: int = 0
dstId: int = 0
f0Detector: str = "pm" # pm or harvest
tran: int = 20
silentThreshold: float = 0.00001
extraConvertSize: int = 1024 * 32
clusterInferRatio: float = 0.1
framework: str = "PyTorch" # PyTorch or ONNX
pyTorchModelFile: str = ""
onnxModelFile: str = ""
configFile: str = ""
modelSlots: list[ModelSlot] = field(
default_factory=lambda: [
ModelSlot(), ModelSlot(), ModelSlot()
]
)
indexRatio: float = 0
rvcQuality: int = 0
silenceFront: int = 1 # 0:off, 1:on
modelSamplingRate: int = 48000
modelSlotIndex: int = -1
speakers: dict[str, int] = field(
default_factory=lambda: {}
)
# ↓mutableな物だけ列挙
intData = ["gpu", "dstId", "tran", "extraConvertSize", "rvcQuality", "modelSamplingRate", "silenceFront", "modelSlotIndex"]
floatData = ["silentThreshold", "indexRatio"]
strData = ["framework", "f0Detector"]
class RVC:
def __init__(self, params):
self.initialLoad = True
self.settings = RVCSettings()
self.inferenceing: bool = False
self.net_g = None
self.onnx_session = None
self.feature_file = None
self.index_file = None
self.gpu_num = torch.cuda.device_count()
self.prevVol = 0
self.params = params
self.mps_enabled: bool = getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available()
self.currentSlot = -1
print("RVC initialization: ", params)
print("mps: ", self.mps_enabled)
def loadModel(self, props):
self.is_half = props["isHalf"]
tmp_slot = props["slot"]
params_str = props["params"]
params = json.loads(params_str)
newSlot = asdict(self.settings.modelSlots[tmp_slot])
newSlot.update({
"pyTorchModelFile": props["files"]["pyTorchModelFilename"],
"onnxModelFile": props["files"]["onnxModelFilename"],
"featureFile": props["files"]["featureFilename"],
"indexFile": props["files"]["indexFilename"],
"defaultTrans": params["trans"]
})
self.settings.modelSlots[tmp_slot] = ModelSlot(**newSlot)
print("[Voice Changer] RVC loading... slot:", tmp_slot)
# Load metadata
if self.settings.modelSlots[tmp_slot].pyTorchModelFile != None and self.settings.modelSlots[tmp_slot].pyTorchModelFile != "":
self._setInfoByPytorch(tmp_slot, self.settings.modelSlots[tmp_slot].pyTorchModelFile)
if self.settings.modelSlots[tmp_slot].onnxModelFile != None and self.settings.modelSlots[tmp_slot].onnxModelFile != "":
self._setInfoByONNX(tmp_slot, self.settings.modelSlots[tmp_slot].onnxModelFile)
try:
hubert_path = self.params["hubert_base"]
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task([hubert_path], suffix="",)
model = models[0]
model.eval()
if self.is_half:
model = model.half()
self.hubert_model = model
except Exception as e:
print("EXCEPTION during loading hubert/contentvec model", e)
if self.initialLoad:
self.prepareModel(tmp_slot)
self.settings.modelSlotIndex = tmp_slot
self.currentSlot = self.settings.modelSlotIndex
self.switchModel()
self.initialLoad = False
return self.get_info()
def _setInfoByPytorch(self, slot, file):
cpt = torch.load(file, map_location="cpu")
config_len = len(cpt["config"])
if config_len == 18:
self.settings.modelSlots[slot].modelType = RVC_MODEL_TYPE_RVC
self.settings.modelSlots[slot].embChannels = 256
else:
self.settings.modelSlots[slot].modelType = RVC_MODEL_TYPE_WEBUI
self.settings.modelSlots[slot].embChannels = cpt["config"][17]
self.settings.modelSlots[slot].f0 = True if cpt["f0"] == 1 else False
self.settings.modelSlots[slot].samplingRate = cpt["config"][-1]
self.settings.modelSamplingRate = cpt["config"][-1]
def _setInfoByONNX(self, slot, file):
tmp_onnx_session = ModelWrapper(file)
self.settings.modelSlots[slot].modelType = tmp_onnx_session.getModelType()
self.settings.modelSlots[slot].embChannelsOnnx = tmp_onnx_session.getEmbChannels()
self.settings.modelSlots[slot].f0 = tmp_onnx_session.getF0()
self.settings.modelSlots[slot].samplingRate = tmp_onnx_session.getSamplingRate()
self.settings.modelSlots[slot].deprecated = tmp_onnx_session.getDeprecated()
def prepareModel(self, slot: int):
print("[Voice Changer] Prepare Model of slot:", slot)
pyTorchModelFile = self.settings.modelSlots[slot].pyTorchModelFile
onnxModelFile = self.settings.modelSlots[slot].onnxModelFile
# PyTorchモデル生成
if pyTorchModelFile != None and pyTorchModelFile != "":
print("[Voice Changer] Loading Pytorch Model...")
cpt = torch.load(pyTorchModelFile, map_location="cpu")
'''
(1) オリジナルとrvc-webuiのモデル判定 ⇒ config全体の形状
ーマル256
[1025, 32, 192, 192, 768, 2, 6, 3, 0, '1', [3, 7, 11], [[1, 3, 5], [1, 3, 5], [1, 3, 5]], [10, 6, 2, 2, 2], 512, [16, 16, 4, 4, 4], 109, 256, 48000]
■ ノーマル 768対応
[1025, 32, 192, 192, 768, 2, 6, 3, 0, '1', [3, 7, 11], [[1, 3, 5], [1, 3, 5], [1, 3, 5]], [10, 6, 2, 2, 2], 512, [16, 16, 4, 4, 4], 109, 256, 768, 48000]
⇒ 18: オリジナル, 19: rvc-webui
(2-1) オリジナルのーマルorPitchレス判定 ⇒ ckp["f0"]で判定
0: ピッチレス, 1:ノーマル
(2-2) rvc-webuiの、(256 or 768) x (ーマルor pitchレス)判定 ⇒ 256, or 768 は17番目の要素で判定。, ーマルor pitchレスはckp["f0"]で判定
'''
# config_len = len(cpt["config"])
# if config_len == 18:
# self.settings.modelSlots[slot].modelType = RVC_MODEL_TYPE_RVC
# self.settings.modelSlots[slot].embChannels = 256
# else:
# self.settings.modelSlots[slot].modelType = RVC_MODEL_TYPE_WEBUI
# self.settings.modelSlots[slot].embChannels = cpt["config"][17]
# self.settings.modelSlots[slot].f0 = True if cpt["f0"] == 1 else False
# self.settings.modelSlots[slot].samplingRate = cpt["config"][-1]
# self.settings.modelSamplingRate = cpt["config"][-1]
if self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_RVC and self.settings.modelSlots[slot].f0 == True:
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=self.is_half)
elif self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_RVC and self.settings.modelSlots[slot].f0 == False:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_WEBUI and self.settings.modelSlots[slot].f0 == True:
net_g = SynthesizerTrnMsNSFsid_webui(**cpt["params"], is_half=self.is_half)
elif self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_WEBUI and self.settings.modelSlots[slot].f0 == False:
######################
# TBD
######################
print("webui non-f0 is not supported yet")
net_g = SynthesizerTrnMsNSFsidNono_webui(**cpt["params"], is_half=self.is_half)
else:
print("unknwon")
net_g.eval()
net_g.load_state_dict(cpt["weight"], strict=False)
if self.is_half:
net_g = net_g.half()
self.next_net_g = net_g
else:
print("[Voice Changer] Skip Loading Pytorch Model...")
self.next_net_g = None
# ONNXモデル生成
if onnxModelFile != None and onnxModelFile != "":
print("[Voice Changer] Loading ONNX Model...")
self.next_onnx_session = ModelWrapper(onnxModelFile)
# self.settings.modelSlots[slot].samplingRateOnnx = self.next_onnx_session.getSamplingRate()
# self.settings.modelSlots[slot].f0Onnx = self.next_onnx_session.getF0()
# self.settings.modelSlots[slot].embChannelsOnnx = self.next_onnx_session.getEmbChannels()
# # ONNXがある場合は、ONNXの設定を優先
# self.settings.modelSlots[slot].samplingRate = self.settings.modelSlots[slot].samplingRateOnnx
# self.settings.modelSlots[slot].f0 = self.settings.modelSlots[slot].f0Onnx
# self.settings.modelSlots[slot].embChannels = self.settings.modelSlots[slot].embChannelsOnnx
else:
print("[Voice Changer] Skip Loading ONNX Model...")
self.next_onnx_session = None
self.next_feature_file = self.settings.modelSlots[slot].featureFile
self.next_index_file = self.settings.modelSlots[slot].indexFile
self.next_trans = self.settings.modelSlots[slot].defaultTrans
self.next_samplingRate = self.settings.modelSlots[slot].samplingRate
self.next_framework = "ONNX" if self.next_onnx_session != None else "PyTorch"
print("[Voice Changer] Prepare done.",)
return self.get_info()
def switchModel(self):
print("[Voice Changer] Switching model..",)
# del self.net_g
# del self.onnx_session
self.net_g = self.next_net_g
self.onnx_session = self.next_onnx_session
self.feature_file = self.next_feature_file
self.index_file = self.next_index_file
self.settings.tran = self.next_trans
self.settings.framework = self.next_framework
self.settings.modelSamplingRate = self.next_samplingRate
self.next_net_g = None
self.next_onnx_session = None
print("[Voice Changer] Switching model..done",)
def update_settings(self, key: str, val: any):
if key == "onnxExecutionProvider" and self.onnx_session != None:
if val == "CUDAExecutionProvider":
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
self.settings.gpu = 0
provider_options = [{'device_id': self.settings.gpu}]
self.onnx_session.set_providers(providers=[val], provider_options=provider_options)
if hasattr(self, "hubert_onnx"):
self.hubert_onnx.set_providers(providers=[val], provider_options=provider_options)
else:
self.onnx_session.set_providers(providers=[val])
if hasattr(self, "hubert_onnx"):
self.hubert_onnx.set_providers(providers=[val])
elif key == "onnxExecutionProvider" and self.onnx_session == None:
print("Onnx is not enabled. Please load model.")
return False
elif key in self.settings.intData:
if key == "gpu" and val >= 0 and val < self.gpu_num and self.onnx_session != None:
providers = self.onnx_session.get_providers()
print("Providers:", providers)
if "CUDAExecutionProvider" in providers:
provider_options = [{'device_id': self.settings.gpu}]
self.onnx_session.set_providers(providers=["CUDAExecutionProvider"], provider_options=provider_options)
if key == "modelSlotIndex":
# self.switchModel(int(val))
val = int(val) % 1000 # Quick hack for same slot is selected
self.prepareModel(val)
self.currentSlot = -1
setattr(self.settings, key, int(val))
elif key in self.settings.floatData:
setattr(self.settings, key, float(val))
elif key in self.settings.strData:
setattr(self.settings, key, str(val))
else:
return False
return True
def get_info(self):
data = asdict(self.settings)
data["onnxExecutionProviders"] = self.onnx_session.get_providers() if self.onnx_session != None else []
files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
for f in files:
if data[f] != None and os.path.exists(data[f]):
data[f] = os.path.basename(data[f])
else:
data[f] = ""
return data
def get_processing_sampling_rate(self):
return self.settings.modelSamplingRate
def generate_input(self, newData: any, inputSize: int, crossfadeSize: int, solaSearchFrame: int = 0):
newData = newData.astype(np.float32) / 32768.0
if hasattr(self, "audio_buffer"):
self.audio_buffer = np.concatenate([self.audio_buffer, newData], 0) # 過去のデータに連結
else:
self.audio_buffer = newData
convertSize = inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
if convertSize % 128 != 0: # モデルの出力のホップサイズで切り捨てが発生するので補う。
convertSize = convertSize + (128 - (convertSize % 128))
self.audio_buffer = self.audio_buffer[-1 * convertSize:] # 変換対象の部分だけ抽出
crop = self.audio_buffer[-1 * (inputSize + crossfadeSize):-1 * (crossfadeSize)] # 出力部分だけ切り出して音量を確認。(solaとの関係性について、現状は無考慮)
rms = np.sqrt(np.square(crop).mean(axis=0))
vol = max(rms, self.prevVol * 0.0)
self.prevVol = vol
return (self.audio_buffer, convertSize, vol)
def _onnx_inference(self, data):
if hasattr(self, "onnx_session") == False or self.onnx_session == None:
print("[Voice Changer] No onnx session.")
raise NoModeLoadedException("ONNX")
if self.settings.gpu < 0 or self.gpu_num == 0:
dev = torch.device("cpu")
else:
dev = torch.device("cuda", index=self.settings.gpu)
self.hubert_model = self.hubert_model.to(dev)
audio = data[0]
convertSize = data[1]
vol = data[2]
audio = resampy.resample(audio, self.settings.modelSamplingRate, 16000)
if vol < self.settings.silentThreshold:
return np.zeros(convertSize).astype(np.int16)
with torch.no_grad():
repeat = 3 if self.is_half else 1
repeat *= self.settings.rvcQuality # 0 or 3
vc = VC(self.settings.modelSamplingRate, dev, self.is_half, repeat)
sid = 0
times = [0, 0, 0]
f0_up_key = self.settings.tran
f0_method = self.settings.f0Detector
file_index = self.index_file if self.index_file != None else ""
file_big_npy = self.feature_file if self.feature_file != None else ""
index_rate = self.settings.indexRatio
if_f0 = 1 if self.settings.modelSlots[self.currentSlot].f0 else 0
f0_file = None
f0 = self.settings.modelSlots[self.currentSlot].f0
embChannels = self.settings.modelSlots[self.currentSlot].embChannels
audio_out = vc.pipeline(self.hubert_model, self.onnx_session, sid, audio, times, f0_up_key, f0_method,
file_index, file_big_npy, index_rate, if_f0, f0_file=f0_file, silence_front=self.settings.extraConvertSize / self.settings.modelSamplingRate, embChannels=embChannels)
result = audio_out * np.sqrt(vol)
return result
def _pyTorch_inference(self, data):
if hasattr(self, "net_g") == False or self.net_g == None:
print("[Voice Changer] No pyTorch session.", hasattr(self, "net_g"), self.net_g)
raise NoModeLoadedException("pytorch")
if self.settings.gpu < 0 or (self.gpu_num == 0 and self.mps_enabled == False):
dev = torch.device("cpu")
elif self.mps_enabled:
dev = torch.device("mps")
else:
dev = torch.device("cuda", index=self.settings.gpu)
# print("device:", dev)
self.hubert_model = self.hubert_model.to(dev)
self.net_g = self.net_g.to(dev)
audio = data[0]
convertSize = data[1]
vol = data[2]
audio = resampy.resample(audio, self.settings.modelSamplingRate, 16000)
if vol < self.settings.silentThreshold:
return np.zeros(convertSize).astype(np.int16)
with torch.no_grad():
repeat = 3 if self.is_half else 1
repeat *= self.settings.rvcQuality # 0 or 3
vc = VC(self.settings.modelSamplingRate, dev, self.is_half, repeat)
sid = 0
times = [0, 0, 0]
f0_up_key = self.settings.tran
f0_method = self.settings.f0Detector
file_index = self.index_file if self.index_file != None else ""
file_big_npy = self.feature_file if self.feature_file != None else ""
index_rate = self.settings.indexRatio
if_f0 = 1 if self.settings.modelSlots[self.currentSlot].f0 else 0
f0_file = None
embChannels = self.settings.modelSlots[self.currentSlot].embChannels
audio_out = vc.pipeline(self.hubert_model, self.net_g, sid, audio, times, f0_up_key, f0_method,
file_index, file_big_npy, index_rate, if_f0, f0_file=f0_file, silence_front=self.settings.extraConvertSize / self.settings.modelSamplingRate, embChannels=embChannels)
result = audio_out * np.sqrt(vol)
return result
def inference(self, data):
if self.settings.modelSlotIndex < 0:
print("[Voice Changer] wait for loading model...", self.settings.modelSlotIndex, self.currentSlot)
raise NoModeLoadedException("model_common")
if self.currentSlot != self.settings.modelSlotIndex:
print(f"Switch model {self.currentSlot} -> {self.settings.modelSlotIndex}")
self.currentSlot = self.settings.modelSlotIndex
self.switchModel()
if self.settings.framework == "ONNX":
audio = self._onnx_inference(data)
else:
audio = self._pyTorch_inference(data)
return audio
def __del__(self):
del self.net_g
del self.onnx_session
remove_path = os.path.join("RVC")
sys.path = [x for x in sys.path if x.endswith(remove_path) == False]
for key in list(sys.modules):
val = sys.modules.get(key)
try:
file_path = val.__file__
if file_path.find("RVC" + os.path.sep) >= 0:
print("remove", key, file_path)
sys.modules.pop(key)
except Exception as e:
pass
def export2onnx(self):
if hasattr(self, "net_g") == False or self.net_g == None:
print("[Voice Changer] export2onnx, No pyTorch session.")
return {"status": "ng", "path": f""}
pyTorchModelFile = self.settings.modelSlots[self.settings.modelSlotIndex].pyTorchModelFile # inference前にexportできるようにcurrentSlotではなくslot
if pyTorchModelFile == None:
print("[Voice Changer] export2onnx, No pyTorch filepath.")
return {"status": "ng", "path": f""}
import voice_changer.RVC.export2onnx as onnxExporter
output_file = os.path.splitext(os.path.basename(pyTorchModelFile))[0] + ".onnx"
output_file_simple = os.path.splitext(os.path.basename(pyTorchModelFile))[0] + "_simple.onnx"
output_path = os.path.join(TMP_DIR, output_file)
output_path_simple = os.path.join(TMP_DIR, output_file_simple)
metadata = {
"application": "VC_CLIENT",
"version": "1",
"modelType": self.settings.modelSlots[self.settings.modelSlotIndex].modelType,
"samplingRate": self.settings.modelSlots[self.settings.modelSlotIndex].samplingRate,
"f0": self.settings.modelSlots[self.settings.modelSlotIndex].f0,
"embChannels": self.settings.modelSlots[self.settings.modelSlotIndex].embChannels,
}
if torch.cuda.device_count() > 0:
onnxExporter.export2onnx(pyTorchModelFile, output_path, output_path_simple, True, metadata)
else:
print("[Voice Changer] Warning!!! onnx export with float32. maybe size is doubled.")
onnxExporter.export2onnx(pyTorchModelFile, output_path, output_path_simple, False, metadata)
return {"status": "ok", "path": f"/tmp/{output_file_simple}", "filename": output_file_simple}