WIP: refactoring

This commit is contained in:
wataru 2023-04-28 06:39:51 +09:00
parent 55118815b4
commit 308fd190f3
13 changed files with 182 additions and 178 deletions

View File

@ -9,7 +9,7 @@
"editor.formatOnSave": true // "editor.formatOnSave": true //
}, },
"flake8.args": [ "flake8.args": [
"--ignore=E501" "--ignore=E501,E402,W503"
// "--max-line-length=150", // "--max-line-length=150",
// "--max-complexity=20" // "--max-complexity=20"
] ]

View File

@ -1,12 +1,13 @@
class NoModeLoadedException(Exception): class NoModeLoadedException(Exception):
def __init__(self, framework): def __init__(self, framework):
self.framework = framework self.framework = framework
def __str__(self): def __str__(self):
return repr(f"No model for {self.framework} loaded. Please confirm the model uploaded.") return repr(
f"No model for {self.framework} loaded. Please confirm the model uploaded."
)
class ONNXInputArgumentException(Exception): class ONNXInputArgumentException(Exception):
def __str__(self): def __str__(self):
return repr(f"ONNX received invalid argument.") return repr("ONNX received invalid argument.")

View File

@ -4,7 +4,15 @@ import tempfile
from typing import Literal, TypeAlias from typing import Literal, TypeAlias
ModelType: TypeAlias = Literal['MMVCv15', 'MMVCv13', 'so-vits-svc-40v2', 'so-vits-svc-40', 'so-vits-svc-40_c', 'DDSP-SVC', 'RVC'] ModelType: TypeAlias = Literal[
"MMVCv15",
"MMVCv13",
"so-vits-svc-40v2",
"so-vits-svc-40",
"so-vits-svc-40_c",
"DDSP-SVC",
"RVC",
]
ERROR_NO_ONNX_SESSION = "ERROR_NO_ONNX_SESSION" ERROR_NO_ONNX_SESSION = "ERROR_NO_ONNX_SESSION"
@ -13,19 +21,45 @@ tmpdir = tempfile.TemporaryDirectory()
# print("generate tmpdir:::",tmpdir) # print("generate tmpdir:::",tmpdir)
SSL_KEY_DIR = os.path.join(tmpdir.name, "keys") if hasattr(sys, "_MEIPASS") else "keys" SSL_KEY_DIR = os.path.join(tmpdir.name, "keys") if hasattr(sys, "_MEIPASS") else "keys"
MODEL_DIR = os.path.join(tmpdir.name, "logs") if hasattr(sys, "_MEIPASS") else "logs" MODEL_DIR = os.path.join(tmpdir.name, "logs") if hasattr(sys, "_MEIPASS") else "logs"
UPLOAD_DIR = os.path.join(tmpdir.name, "upload_dir") if hasattr(sys, "_MEIPASS") else "upload_dir" UPLOAD_DIR = (
NATIVE_CLIENT_FILE_WIN = os.path.join(sys._MEIPASS, "voice-changer-native-client.exe") if hasattr(sys, "_MEIPASS") else "voice-changer-native-client" os.path.join(tmpdir.name, "upload_dir")
NATIVE_CLIENT_FILE_MAC = os.path.join(sys._MEIPASS, "voice-changer-native-client.app", "Contents", "MacOS", if hasattr(sys, "_MEIPASS")
"voice-changer-native-client") if hasattr(sys, "_MEIPASS") else "voice-changer-native-client" else "upload_dir"
)
NATIVE_CLIENT_FILE_WIN = (
os.path.join(sys._MEIPASS, "voice-changer-native-client.exe") # type: ignore
if hasattr(sys, "_MEIPASS")
else "voice-changer-native-client"
)
NATIVE_CLIENT_FILE_MAC = (
os.path.join(
sys._MEIPASS, # type: ignore
"voice-changer-native-client.app",
"Contents",
"MacOS",
"voice-changer-native-client",
)
if hasattr(sys, "_MEIPASS")
else "voice-changer-native-client"
)
HUBERT_ONNX_MODEL_PATH = os.path.join(sys._MEIPASS, "model_hubert/hubert_simple.onnx") if hasattr(sys, HUBERT_ONNX_MODEL_PATH = (
"_MEIPASS") else "model_hubert/hubert_simple.onnx" os.path.join(sys._MEIPASS, "model_hubert/hubert_simple.onnx") # type: ignore
if hasattr(sys, "_MEIPASS")
else "model_hubert/hubert_simple.onnx"
)
TMP_DIR = os.path.join(tmpdir.name, "tmp_dir") if hasattr(sys, "_MEIPASS") else "tmp_dir" TMP_DIR = (
os.path.join(tmpdir.name, "tmp_dir") if hasattr(sys, "_MEIPASS") else "tmp_dir"
)
os.makedirs(TMP_DIR, exist_ok=True) os.makedirs(TMP_DIR, exist_ok=True)
def getFrontendPath(): def getFrontendPath():
frontend_path = os.path.join(sys._MEIPASS, "dist") if hasattr(sys, "_MEIPASS") else "../client/demo/dist" frontend_path = (
os.path.join(sys._MEIPASS, "dist")
if hasattr(sys, "_MEIPASS")
else "../client/demo/dist"
)
return frontend_path return frontend_path

View File

@ -10,6 +10,9 @@ from restapi.mods.FileUploader import upload_file, concat_file_chunks
from voice_changer.VoiceChangerManager import VoiceChangerManager from voice_changer.VoiceChangerManager import VoiceChangerManager
from const import MODEL_DIR, UPLOAD_DIR, ModelType from const import MODEL_DIR, UPLOAD_DIR, ModelType
from voice_changer.utils.LoadModelParams import FilePaths, LoadModelParams
from dataclasses import fields
os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True) os.makedirs(MODEL_DIR, exist_ok=True)
@ -30,12 +33,6 @@ class MMVC_Rest_Fileuploader:
"/update_settings", self.post_update_settings, methods=["POST"] "/update_settings", self.post_update_settings, methods=["POST"]
) )
self.router.add_api_route("/load_model", self.post_load_model, methods=["POST"]) self.router.add_api_route("/load_model", self.post_load_model, methods=["POST"])
self.router.add_api_route(
"/load_model_for_train", self.post_load_model_for_train, methods=["POST"]
)
self.router.add_api_route(
"/extract_voices", self.post_extract_voices, methods=["POST"]
)
self.router.add_api_route("/model_type", self.post_model_type, methods=["POST"]) self.router.add_api_route("/model_type", self.post_model_type, methods=["POST"])
self.router.add_api_route("/model_type", self.get_model_type, methods=["GET"]) self.router.add_api_route("/model_type", self.get_model_type, methods=["GET"])
self.router.add_api_route("/onnx", self.get_onnx, methods=["GET"]) self.router.add_api_route("/onnx", self.get_onnx, methods=["GET"])
@ -80,74 +77,42 @@ class MMVC_Rest_Fileuploader:
isHalf: bool = Form(...), isHalf: bool = Form(...),
params: str = Form(...), params: str = Form(...),
): ):
props = { files = FilePaths(
"slot": slot, configFilename=configFilename,
"isHalf": isHalf, pyTorchModelFilename=pyTorchModelFilename,
"files": { onnxModelFilename=onnxModelFilename,
"configFilename": configFilename, clusterTorchModelFilename=clusterTorchModelFilename,
"pyTorchModelFilename": pyTorchModelFilename, featureFilename=featureFilename,
"onnxModelFilename": onnxModelFilename, indexFilename=indexFilename,
"clusterTorchModelFilename": clusterTorchModelFilename, )
"featureFilename": featureFilename, props: LoadModelParams = LoadModelParams(
"indexFilename": indexFilename, slot=slot, isHalf=isHalf, params=params, files=files
}, )
"params": params,
}
# Change Filepath # Change Filepath
for key, val in props["files"].items(): for field in fields(props.files):
key = field.name
val = getattr(props.files, key)
if val != "-": if val != "-":
uploadPath = os.path.join(UPLOAD_DIR, val) uploadPath = os.path.join(UPLOAD_DIR, val)
storeDir = os.path.join(UPLOAD_DIR, f"{slot}") storeDir = os.path.join(UPLOAD_DIR, f"{slot}")
os.makedirs(storeDir, exist_ok=True) os.makedirs(storeDir, exist_ok=True)
storePath = os.path.join(storeDir, val) storePath = os.path.join(storeDir, val)
shutil.move(uploadPath, storePath) shutil.move(uploadPath, storePath)
props["files"][key] = storePath setattr(props.files, key, storePath)
else: else:
props["files"][key] = None setattr(props.files, key, None)
# print("---------------------------------------------------2>", props)
info = self.voiceChangerManager.loadModel(props) info = self.voiceChangerManager.loadModel(props)
json_compatible_item_data = jsonable_encoder(info) json_compatible_item_data = jsonable_encoder(info)
return JSONResponse(content=json_compatible_item_data) return JSONResponse(content=json_compatible_item_data)
# return {"load": f"{configFilePath}, {pyTorchModelFilePath}, {onnxModelFilePath}"}
def post_load_model_for_train( def post_model_type(self, modelType: ModelType = Form(...)):
self,
modelGFilename: str = Form(...),
modelGFilenameChunkNum: int = Form(...),
modelDFilename: str = Form(...),
modelDFilenameChunkNum: int = Form(...),
):
modelGFilePath = concat_file_chunks(
UPLOAD_DIR, modelGFilename, modelGFilenameChunkNum, MODEL_DIR
)
modelDFilePath = concat_file_chunks(
UPLOAD_DIR, modelDFilename, modelDFilenameChunkNum, MODEL_DIR
)
return {"File saved": f"{modelGFilePath}, {modelDFilePath}"}
def post_extract_voices(
self,
zipFilename: str = Form(...),
zipFileChunkNum: int = Form(...),
):
zipFilePath = concat_file_chunks(
UPLOAD_DIR, zipFilename, zipFileChunkNum, UPLOAD_DIR
)
shutil.unpack_archive(zipFilePath, "MMVC_Trainer/dataset/textful/")
return {"Zip file unpacked": f"{zipFilePath}"}
def post_model_type(
self,
modelType: ModelType = Form(...),
):
info = self.voiceChangerManager.switchModelType(modelType) info = self.voiceChangerManager.switchModelType(modelType)
json_compatible_item_data = jsonable_encoder(info) json_compatible_item_data = jsonable_encoder(info)
return JSONResponse(content=json_compatible_item_data) return JSONResponse(content=json_compatible_item_data)
def get_model_type( def get_model_type(self):
self,
):
info = self.voiceChangerManager.getModelType() info = self.voiceChangerManager.getModelType()
json_compatible_item_data = jsonable_encoder(info) json_compatible_item_data = jsonable_encoder(info)
return JSONResponse(content=json_compatible_item_data) return JSONResponse(content=json_compatible_item_data)

View File

@ -30,7 +30,6 @@ class MMVC_Namespace(socketio.AsyncNamespace):
else: else:
unpackedData = np.array(struct.unpack('<%sh' % (len(data) // struct.calcsize('<h')), data)).astype(np.int16) unpackedData = np.array(struct.unpack('<%sh' % (len(data) // struct.calcsize('<h')), data)).astype(np.int16)
# audio1, perf = self.voiceChangerManager.changeVoice(unpackedData)
res = self.voiceChangerManager.changeVoice(unpackedData) res = self.voiceChangerManager.changeVoice(unpackedData)
audio1 = res[0] audio1 = res[0]
perf = res[1] if len(res) == 2 else [0, 0, 0] perf = res[1] if len(res) == 2 else [0, 0, 0]

Binary file not shown.

Binary file not shown.

View File

@ -4,8 +4,18 @@ import json
import resampy import resampy
from voice_changer.RVC.ModelWrapper import ModelWrapper from voice_changer.RVC.ModelWrapper import ModelWrapper
from Exceptions import NoModeLoadedException from Exceptions import NoModeLoadedException
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.VoiceChangerParams import VoiceChangerParams from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
from dataclasses import dataclass, asdict, field
import numpy as np
import torch
from fairseq import checkpoint_utils
from const import TMP_DIR
# avoiding parse arg error in RVC # avoiding parse arg error in RVC
sys.argv = ["MMVCServerSIO.py"] sys.argv = ["MMVCServerSIO.py"]
@ -19,25 +29,12 @@ if sys.platform.startswith("darwin"):
else: else:
sys.path.append("RVC") sys.path.append("RVC")
import io
from dataclasses import dataclass, asdict, field
from functools import reduce
import numpy as np
import torch
import onnxruntime
# onnxruntime.set_default_logger_severity(3)
from const import HUBERT_ONNX_MODEL_PATH, TMP_DIR
import pyworld as pw
from voice_changer.RVC.custom_vc_infer_pipeline import VC
from infer_pack.models import SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono
from .models import SynthesizerTrnMsNSFsid as SynthesizerTrnMsNSFsid_webui from .models import SynthesizerTrnMsNSFsid as SynthesizerTrnMsNSFsid_webui
from .models import SynthesizerTrnMsNSFsidNono as SynthesizerTrnMsNSFsidNono_webui from .models import SynthesizerTrnMsNSFsidNono as SynthesizerTrnMsNSFsidNono_webui
from .const import RVC_MODEL_TYPE_RVC, RVC_MODEL_TYPE_WEBUI from .const import RVC_MODEL_TYPE_RVC, RVC_MODEL_TYPE_WEBUI
from fairseq import checkpoint_utils from voice_changer.RVC.custom_vc_infer_pipeline import VC
from infer_pack.models import SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono
providers = [ providers = [
"OpenVINOExecutionProvider", "OpenVINOExecutionProvider",
@ -53,7 +50,7 @@ class ModelSlot:
onnxModelFile: str = "" onnxModelFile: str = ""
featureFile: str = "" featureFile: str = ""
indexFile: str = "" indexFile: str = ""
defaultTrans: int = "" defaultTrans: int = 0
modelType: int = RVC_MODEL_TYPE_RVC modelType: int = RVC_MODEL_TYPE_RVC
samplingRate: int = -1 samplingRate: int = -1
f0: bool = True f0: bool = True
@ -125,19 +122,19 @@ class RVC:
print("RVC initialization: ", params) print("RVC initialization: ", params)
print("mps: ", self.mps_enabled) print("mps: ", self.mps_enabled)
def loadModel(self, props): def loadModel(self, props: LoadModelParams):
self.is_half = props["isHalf"] self.is_half = props.isHalf
tmp_slot = props["slot"] tmp_slot = props.slot
params_str = props["params"] params_str = props.params
params = json.loads(params_str) params = json.loads(params_str)
newSlot = asdict(self.settings.modelSlots[tmp_slot]) newSlot = asdict(self.settings.modelSlots[tmp_slot])
newSlot.update( newSlot.update(
{ {
"pyTorchModelFile": props["files"]["pyTorchModelFilename"], "pyTorchModelFile": props.files.pyTorchModelFilename,
"onnxModelFile": props["files"]["onnxModelFilename"], "onnxModelFile": props.files.onnxModelFilename,
"featureFile": props["files"]["featureFilename"], "featureFile": props.files.featureFilename,
"indexFile": props["files"]["indexFilename"], "indexFile": props.files.indexFilename,
"defaultTrans": params["trans"], "defaultTrans": params["trans"],
} }
) )
@ -147,14 +144,14 @@ class RVC:
# Load metadata # Load metadata
if ( if (
self.settings.modelSlots[tmp_slot].pyTorchModelFile != None self.settings.modelSlots[tmp_slot].pyTorchModelFile is not None
and self.settings.modelSlots[tmp_slot].pyTorchModelFile != "" and self.settings.modelSlots[tmp_slot].pyTorchModelFile != ""
): ):
self._setInfoByPytorch( self._setInfoByPytorch(
tmp_slot, self.settings.modelSlots[tmp_slot].pyTorchModelFile tmp_slot, self.settings.modelSlots[tmp_slot].pyTorchModelFile
) )
if ( if (
self.settings.modelSlots[tmp_slot].onnxModelFile != None self.settings.modelSlots[tmp_slot].onnxModelFile is not None
and self.settings.modelSlots[tmp_slot].onnxModelFile != "" and self.settings.modelSlots[tmp_slot].onnxModelFile != ""
): ):
self._setInfoByONNX( self._setInfoByONNX(
@ -241,24 +238,24 @@ class RVC:
if ( if (
self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_RVC self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_RVC
and self.settings.modelSlots[slot].f0 == True and self.settings.modelSlots[slot].f0 is True
): ):
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=self.is_half) net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=self.is_half)
elif ( elif (
self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_RVC self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_RVC
and self.settings.modelSlots[slot].f0 == False and self.settings.modelSlots[slot].f0 is False
): ):
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif ( elif (
self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_WEBUI self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_WEBUI
and self.settings.modelSlots[slot].f0 == True and self.settings.modelSlots[slot].f0 is True
): ):
net_g = SynthesizerTrnMsNSFsid_webui( net_g = SynthesizerTrnMsNSFsid_webui(
**cpt["params"], is_half=self.is_half **cpt["params"], is_half=self.is_half
) )
elif ( elif (
self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_WEBUI self.settings.modelSlots[slot].modelType == RVC_MODEL_TYPE_WEBUI
and self.settings.modelSlots[slot].f0 == False and self.settings.modelSlots[slot].f0 is False
): ):
net_g = SynthesizerTrnMsNSFsidNono_webui( net_g = SynthesizerTrnMsNSFsidNono_webui(
**cpt["params"], is_half=self.is_half **cpt["params"], is_half=self.is_half
@ -295,7 +292,9 @@ class RVC:
self.next_index_file = self.settings.modelSlots[slot].indexFile self.next_index_file = self.settings.modelSlots[slot].indexFile
self.next_trans = self.settings.modelSlots[slot].defaultTrans self.next_trans = self.settings.modelSlots[slot].defaultTrans
self.next_samplingRate = self.settings.modelSlots[slot].samplingRate self.next_samplingRate = self.settings.modelSlots[slot].samplingRate
self.next_framework = "ONNX" if self.next_onnx_session != None else "PyTorch" self.next_framework = (
"ONNX" if self.next_onnx_session is not None else "PyTorch"
)
print( print(
"[Voice Changer] Prepare done.", "[Voice Changer] Prepare done.",
) )
@ -321,7 +320,7 @@ class RVC:
) )
def update_settings(self, key: str, val: any): def update_settings(self, key: str, val: any):
if key == "onnxExecutionProvider" and self.onnx_session != None: if key == "onnxExecutionProvider" and self.onnx_session is not None:
if val == "CUDAExecutionProvider": if val == "CUDAExecutionProvider":
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num: if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
self.settings.gpu = 0 self.settings.gpu = 0
@ -345,7 +344,7 @@ class RVC:
key == "gpu" key == "gpu"
and val >= 0 and val >= 0
and val < self.gpu_num and val < self.gpu_num
and self.onnx_session != None and self.onnx_session is not None
): ):
providers = self.onnx_session.get_providers() providers = self.onnx_session.get_providers()
print("Providers:", providers) print("Providers:", providers)
@ -374,11 +373,11 @@ class RVC:
data = asdict(self.settings) data = asdict(self.settings)
data["onnxExecutionProviders"] = ( data["onnxExecutionProviders"] = (
self.onnx_session.get_providers() if self.onnx_session != None else [] self.onnx_session.get_providers() if self.onnx_session is not None else []
) )
files = ["configFile", "pyTorchModelFile", "onnxModelFile"] files = ["configFile", "pyTorchModelFile", "onnxModelFile"]
for f in files: for f in files:
if data[f] != None and os.path.exists(data[f]): if data[f] is not None and os.path.exists(data[f]):
data[f] = os.path.basename(data[f]) data[f] = os.path.basename(data[f])
else: else:
data[f] = "" data[f] = ""
@ -477,7 +476,7 @@ class RVC:
return result return result
def _pyTorch_inference(self, data): def _pyTorch_inference(self, data):
if hasattr(self, "net_g") == False or self.net_g == None: if hasattr(self, "net_g") is False or self.net_g is None:
print( print(
"[Voice Changer] No pyTorch session.", "[Voice Changer] No pyTorch session.",
hasattr(self, "net_g"), hasattr(self, "net_g"),
@ -485,7 +484,7 @@ class RVC:
) )
raise NoModeLoadedException("pytorch") raise NoModeLoadedException("pytorch")
if self.settings.gpu < 0 or (self.gpu_num == 0 and self.mps_enabled == False): if self.settings.gpu < 0 or (self.gpu_num == 0 and self.mps_enabled is False):
dev = torch.device("cpu") dev = torch.device("cpu")
elif self.mps_enabled: elif self.mps_enabled:
dev = torch.device("mps") dev = torch.device("mps")

View File

@ -1,10 +1,2 @@
# RVC_MODEL_TYPE_NORMAL = 0
# RVC_MODEL_TYPE_PITCHLESS = 1
# RVC_MODEL_TYPE_WEBUI_256_NORMAL = 2
# RVC_MODEL_TYPE_WEBUI_256_PITCHLESS = 3
# RVC_MODEL_TYPE_WEBUI_768_NORMAL = 4
# RVC_MODEL_TYPE_WEBUI_768_PITCHLESS = 5
# RVC_MODEL_TYPE_UNKNOWN = 99
RVC_MODEL_TYPE_RVC = 0 RVC_MODEL_TYPE_RVC = 0
RVC_MODEL_TYPE_WEBUI = 1 RVC_MODEL_TYPE_WEBUI = 1

View File

@ -1,4 +1,4 @@
from typing import Any, Callable, Optional, Protocol, TypeAlias, Union, cast from typing import Any, Union, cast
from const import TMP_DIR, ModelType from const import TMP_DIR, ModelType
import torch import torch
import os import os
@ -9,6 +9,7 @@ import resampy
from voice_changer.IORecorder import IORecorder from voice_changer.IORecorder import IORecorder
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.Timer import Timer from voice_changer.utils.Timer import Timer
from voice_changer.utils.VoiceChangerModel import VoiceChangerModel, AudioInOut from voice_changer.utils.VoiceChangerModel import VoiceChangerModel, AudioInOut
@ -24,8 +25,6 @@ providers = [
STREAM_INPUT_FILE = os.path.join(TMP_DIR, "in.wav") STREAM_INPUT_FILE = os.path.join(TMP_DIR, "in.wav")
STREAM_OUTPUT_FILE = os.path.join(TMP_DIR, "out.wav") STREAM_OUTPUT_FILE = os.path.join(TMP_DIR, "out.wav")
STREAM_ANALYZE_FILE_DIO = os.path.join(TMP_DIR, "analyze-dio.png")
STREAM_ANALYZE_FILE_HARVEST = os.path.join(TMP_DIR, "analyze-harvest.png")
@dataclass @dataclass
@ -51,18 +50,20 @@ class VoiceChangerSettings:
class VoiceChanger: class VoiceChanger:
settings: VoiceChangerSettings settings: VoiceChangerSettings
voiceChanger: VoiceChangerModel voiceChanger: VoiceChangerModel
ioRecorder: IORecorder
sola_buffer: AudioInOut
def __init__(self, params: VoiceChangerParams): def __init__(self, params: VoiceChangerParams):
# 初期化 # 初期化
self.settings = VoiceChangerSettings() self.settings = VoiceChangerSettings()
self.onnx_session = None self.onnx_session = None
self.currentCrossFadeOffsetRate = 0 self.currentCrossFadeOffsetRate = 0.0
self.currentCrossFadeEndRate = 0 self.currentCrossFadeEndRate = 0.0
self.currentCrossFadeOverlapSize = 0 # setting self.currentCrossFadeOverlapSize = 0 # setting
self.crossfadeSize = 0 # calculated self.crossfadeSize = 0 # calculated
self.voiceChanger = None self.voiceChanger = None
self.modelType = None self.modelType: ModelType | None = None
self.params = params self.params = params
self.gpu_num = torch.cuda.device_count() self.gpu_num = torch.cuda.device_count()
self.prev_audio = np.zeros(4096) self.prev_audio = np.zeros(4096)
@ -76,7 +77,7 @@ class VoiceChanger:
) )
def switchModelType(self, modelType: ModelType): def switchModelType(self, modelType: ModelType):
if hasattr(self, "voiceChanger") and self.voiceChanger != None: if hasattr(self, "voiceChanger") and self.voiceChanger is not None:
# return {"status": "ERROR", "msg": "vc is already selected. currently re-select is not implemented"} # return {"status": "ERROR", "msg": "vc is already selected. currently re-select is not implemented"}
del self.voiceChanger del self.voiceChanger
self.voiceChanger = None self.voiceChanger = None
@ -114,34 +115,18 @@ class VoiceChanger:
return {"status": "OK", "msg": "vc is switched."} return {"status": "OK", "msg": "vc is switched."}
def getModelType(self): def getModelType(self):
if self.modelType != None: if self.modelType is not None:
return {"status": "OK", "vc": self.modelType} return {"status": "OK", "vc": self.modelType}
else: else:
return {"status": "OK", "vc": "none"} return {"status": "OK", "vc": "none"}
def loadModel( def loadModel(self, props: LoadModelParams):
self,
props,
):
try: try:
return self.voiceChanger.loadModel(props) return self.voiceChanger.loadModel(props)
except Exception as e: except Exception as e:
print("[Voice Changer] Model Load Error! Check your model is valid.", e) print("[Voice Changer] Model Load Error! Check your model is valid.", e)
return {"status": "NG"} return {"status": "NG"}
# try:
# if self.modelType == "MMVCv15" or self.modelType == "MMVCv13":
# return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file)
# elif self.modelType == "so-vits-svc-40" or self.modelType == "so-vits-svc-40_c" or self.modelType == "so-vits-svc-40v2":
# return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file, clusterTorchModel)
# elif self.modelType == "RVC":
# return self.voiceChanger.loadModel(slot, config, pyTorch_model_file, onnx_model_file, feature_file, index_file, is_half)
# else:
# return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file, clusterTorchModel)
# except Exception as e:
# print("[Voice Changer] Model Load Error! Check your model is valid.", e)
# return {"status": "NG"}
def get_info(self): def get_info(self):
data = asdict(self.settings) data = asdict(self.settings)
if hasattr(self, "voiceChanger"): if hasattr(self, "voiceChanger"):
@ -167,14 +152,6 @@ class VoiceChanger:
if hasattr(self, "ioRecorder"): if hasattr(self, "ioRecorder"):
self.ioRecorder.close() self.ioRecorder.close()
# if hasattr(self, "ioAnalyzer") == False:
# self.ioAnalyzer = IOAnalyzer()
# try:
# self.ioAnalyzer.analyze(STREAM_INPUT_FILE, STREAM_ANALYZE_FILE_DIO, STREAM_ANALYZE_FILE_HARVEST, self.settings.inputSampleRate)
# except Exception as e:
# print("recordIO exception", e)
elif key in self.settings.floatData: elif key in self.settings.floatData:
setattr(self.settings, key, float(val)) setattr(self.settings, key, float(val))
elif key in self.settings.strData: elif key in self.settings.strData:
@ -182,10 +159,10 @@ class VoiceChanger:
else: else:
if hasattr(self, "voiceChanger"): if hasattr(self, "voiceChanger"):
ret = self.voiceChanger.update_settings(key, val) ret = self.voiceChanger.update_settings(key, val)
if ret == False: if ret is False:
print(f"{key} is not mutable variable or unknown variable!") print(f"{key} is not mutable variable or unknown variable!")
else: else:
print(f"voice changer is not initialized!") print("voice changer is not initialized!")
return self.get_info() return self.get_info()
def _generate_strength(self, crossfadeSize: int): def _generate_strength(self, crossfadeSize: int):
@ -228,9 +205,9 @@ class VoiceChanger:
) )
# ひとつ前の結果とサイズが変わるため、記録は消去する。 # ひとつ前の結果とサイズが変わるため、記録は消去する。
if hasattr(self, "np_prev_audio1") == True: if hasattr(self, "np_prev_audio1") is True:
delattr(self, "np_prev_audio1") delattr(self, "np_prev_audio1")
if hasattr(self, "sola_buffer"): if hasattr(self, "sola_buffer") is True:
del self.sola_buffer del self.sola_buffer
# receivedData: tuple of short # receivedData: tuple of short
@ -275,9 +252,14 @@ class VoiceChanger:
# Inference # Inference
audio = self.voiceChanger.inference(data) audio = self.voiceChanger.inference(data)
if hasattr(self, "sola_buffer") == True: if hasattr(self, "sola_buffer") is True:
np.set_printoptions(threshold=10000) np.set_printoptions(threshold=10000)
audio = audio[-sola_search_frame - crossfade_frame - block_frame :] audio_offset = -1 * (
sola_search_frame + crossfade_frame + block_frame
)
audio = audio[audio_offset:]
a = 0
audio = audio[a:]
# SOLA algorithm from https://github.com/yxlllc/DDSP-SVC, https://github.com/liujing04/Retrieval-based-Voice-Conversion-WebUI # SOLA algorithm from https://github.com/yxlllc/DDSP-SVC, https://github.com/liujing04/Retrieval-based-Voice-Conversion-WebUI
cor_nom = np.convolve( cor_nom = np.convolve(
audio[: crossfade_frame + sola_search_frame], audio[: crossfade_frame + sola_search_frame],
@ -292,11 +274,9 @@ class VoiceChanger:
) )
+ 1e-3 + 1e-3
) )
sola_offset = np.argmax(cor_nom / cor_den) sola_offset = int(np.argmax(cor_nom / cor_den))
sola_end = sola_offset + block_frame
output_wav = audio[sola_offset : sola_offset + block_frame].astype( output_wav = audio[sola_offset:sola_end].astype(np.float64)
np.float64
)
output_wav[:crossfade_frame] *= self.np_cur_strength output_wav[:crossfade_frame] *= self.np_cur_strength
output_wav[:crossfade_frame] += self.sola_buffer[:] output_wav[:crossfade_frame] += self.sola_buffer[:]
@ -306,15 +286,12 @@ class VoiceChanger:
result = np.zeros(4096).astype(np.int16) result = np.zeros(4096).astype(np.int16)
if ( if (
hasattr(self, "sola_buffer") == True hasattr(self, "sola_buffer") is True
and sola_offset < sola_search_frame and sola_offset < sola_search_frame
): ):
sola_buf_org = audio[ offset = -1 * (sola_search_frame + crossfade_frame - sola_offset)
-sola_search_frame end = -1 * (sola_search_frame - sola_offset)
- crossfade_frame sola_buf_org = audio[offset:end]
+ sola_offset : -sola_search_frame
+ sola_offset
]
self.sola_buffer = sola_buf_org * self.np_prev_strength self.sola_buffer = sola_buf_org * self.np_prev_strength
else: else:
self.sola_buffer = audio[-crossfade_frame:] * self.np_prev_strength self.sola_buffer = audio[-crossfade_frame:] * self.np_prev_strength
@ -379,7 +356,7 @@ PRINT_CONVERT_PROCESSING: bool = False
def print_convert_processing(mess: str): def print_convert_processing(mess: str):
if PRINT_CONVERT_PROCESSING == True: if PRINT_CONVERT_PROCESSING is True:
print(mess) print(mess)

View File

@ -1,6 +1,8 @@
import numpy as np import numpy as np
from voice_changer.VoiceChanger import VoiceChanger from voice_changer.VoiceChanger import VoiceChanger
from const import ModelType from const import ModelType
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.VoiceChangerModel import AudioInOut
from voice_changer.utils.VoiceChangerParams import VoiceChangerParams from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
@ -15,7 +17,7 @@ class VoiceChangerManager(object):
cls._instance.voiceChanger = VoiceChanger(params) cls._instance.voiceChanger = VoiceChanger(params)
return cls._instance return cls._instance
def loadModel(self, props): def loadModel(self, props: LoadModelParams):
info = self.voiceChanger.loadModel(props) info = self.voiceChanger.loadModel(props)
if hasattr(info, "status") and info["status"] == "NG": if hasattr(info, "status") and info["status"] == "NG":
return info return info
@ -31,7 +33,7 @@ class VoiceChangerManager(object):
else: else:
return {"status": "ERROR", "msg": "no model loaded"} return {"status": "ERROR", "msg": "no model loaded"}
def update_settings(self, key: str, val: any): def update_settings(self, key: str, val: str | int | float):
if hasattr(self, "voiceChanger"): if hasattr(self, "voiceChanger"):
info = self.voiceChanger.update_settings(key, val) info = self.voiceChanger.update_settings(key, val)
info["status"] = "OK" info["status"] = "OK"
@ -39,7 +41,7 @@ class VoiceChangerManager(object):
else: else:
return {"status": "ERROR", "msg": "no model loaded"} return {"status": "ERROR", "msg": "no model loaded"}
def changeVoice(self, receivedData: any): def changeVoice(self, receivedData: AudioInOut):
if hasattr(self, "voiceChanger") is True: if hasattr(self, "voiceChanger") is True:
return self.voiceChanger.on_request(receivedData) return self.voiceChanger.on_request(receivedData)
else: else:

View File

@ -0,0 +1,19 @@
from dataclasses import dataclass
@dataclass
class FilePaths:
configFilename: str
pyTorchModelFilename: str
onnxModelFilename: str
clusterTorchModelFilename: str
featureFilename: str
indexFilename: str
@dataclass
class LoadModelParams:
slot: int
isHalf: bool
files: FilePaths
params: str

View File

@ -1,14 +1,30 @@
from typing import Any, Callable, Protocol, TypeAlias from typing import Any, Protocol, TypeAlias
import numpy as np import numpy as np
from voice_changer.utils.LoadModelParams import LoadModelParams
AudioInOut: TypeAlias = np.ndarray[Any, np.dtype[np.int16]] AudioInOut: TypeAlias = np.ndarray[Any, np.dtype[np.int16]]
class VoiceChangerModel(Protocol): class VoiceChangerModel(Protocol):
loadModel: Callable[..., dict[str, Any]] # loadModel: Callable[..., dict[str, Any]]
def get_processing_sampling_rate(self) -> int: ... def loadModel(self, params: LoadModelParams):
def get_info(self) -> dict[str, Any]: ... ...
def inference(self, data: tuple[Any, ...]) -> Any: ...
def generate_input(self, newData: AudioInOut, inputSize: int, crossfadeSize: int) -> tuple[Any, ...]: ... def get_processing_sampling_rate(self) -> int:
def update_settings(self, key: str, val: Any) -> bool: ... ...
def get_info(self) -> dict[str, Any]:
...
def inference(self, data: tuple[Any, ...]) -> Any:
...
def generate_input(
self, newData: AudioInOut, inputSize: int, crossfadeSize: int
) -> tuple[Any, ...]:
...
def update_settings(self, key: str, val: Any) -> bool:
...