from concurrent.futures import ThreadPoolExecutor
import sys
import os
import resampy
from dataclasses import asdict
from typing import cast
import numpy as np
import torch
from MMVCServerSIO import download
from ModelSample import RVCModelSample, getModelSamples

# avoiding parse arg error in RVC
sys.argv = ["MMVCServerSIO.py"]

# The RVC directory must be on sys.path before the voice_changer.RVC imports
# below, which is why those imports are not at the very top of the file.
if sys.platform.startswith("darwin"):
    baseDir = [x for x in sys.path if x.endswith("Contents/MacOS")]
    if len(baseDir) != 1:
        print("baseDir should be only one ", baseDir)
        sys.exit()
    modulePath = os.path.join(baseDir[0], "RVC")
    sys.path.append(modulePath)
else:
    sys.path.append("RVC")

from voice_changer.RVC.modelMerger.MergeModel import merge_model
from voice_changer.RVC.modelMerger.MergeModelRequest import MergeModelRequest
from voice_changer.RVC.ModelSlotGenerator import generateModelSlot
from voice_changer.RVC.RVCSettings import RVCSettings
from voice_changer.RVC.embedder.EmbedderManager import EmbedderManager
from voice_changer.utils.LoadModelParams import LoadModelParams
from voice_changer.utils.VoiceChangerModel import AudioInOut
from voice_changer.utils.VoiceChangerParams import VoiceChangerParams
from voice_changer.RVC.onnxExporter.export2onnx import export2onnx
from voice_changer.RVC.pitchExtractor.PitchExtractorManager import PitchExtractorManager
from voice_changer.RVC.pipeline.PipelineGenerator import createPipeline
from voice_changer.RVC.deviceManager.DeviceManager import DeviceManager
from voice_changer.RVC.pipeline.Pipeline import Pipeline

from Exceptions import NoModeLoadedException
from const import TMP_DIR, UPLOAD_DIR
import shutil
import json

providers = [
    "OpenVINOExecutionProvider",
    "CUDAExecutionProvider",
    "DmlExecutionProvider",
    "CPUExecutionProvider",
]

RVC_MODEL_DIRNAME = "rvc"
RVC_MAX_SLOT_NUM = 6


class RVC:
    """Voice-changer model wrapper that manages RVC model slots and inference.

    Models live in numbered slot directories under
    ``<params.model_dir>/rvc/<slot>``. A model is first *prepared*
    (``prepareModel`` builds the next pipeline) and later *switched in*
    (``switchModel``), either immediately or lazily on the next ``inference``
    call when ``needSwitch`` is set.
    """

    # NOTE(review): these are class-level defaults; ``settings`` in particular
    # is a single RVCSettings object shared by every RVC instance.
    initialLoad: bool = True
    settings: RVCSettings = RVCSettings()

    pipeline: Pipeline | None = None

    deviceManager = DeviceManager.get_instance()

    audio_buffer: AudioInOut | None = None
    prevVol: float = 0
    params: VoiceChangerParams
    currentSlot: int = -1
    needSwitch: bool = False

    def __init__(self, params: VoiceChangerParams):
        """Initialise managers, scan the slot directories and load the first model.

        Args:
            params: Server parameters; ``model_dir`` and ``samples`` are read here
                and in later calls.
        """
        self.pitchExtractor = PitchExtractorManager.getPitchExtractor(
            self.settings.f0Detector
        )
        self.params = params
        EmbedderManager.initialize(params)
        self.loadSlots()
        print("RVC initialization: ", params)

        sampleModels = getModelSamples(params.samples, "RVC")
        if sampleModels is not None:
            self.settings.sampleModels = sampleModels

        # If a slot already holds a model at startup, load the first such slot.
        for i, slot in enumerate(self.settings.modelSlots):
            # Truthiness covers both "" and None (update_settings treats
            # modelFile as possibly None, so len() would not be safe here).
            if slot.modelFile:
                self.prepareModel(i)
                self.settings.modelSlotIndex = i
                self.switchModel()
                self.initialLoad = False
                break

    def getSampleInfo(self, id: str):
        """Return the sample model whose ``id`` matches, or ``None`` if absent."""
        return next((x for x in self.settings.sampleModels if x.id == id), None)

    def downloadModelFiles(self, sampleInfo: RVCModelSample):
        """Download the files of a sample model into ``TMP_DIR``.

        Args:
            sampleInfo: Sample description; ``modelUrl`` is required, while
                ``indexUrl`` and ``featureUrl`` are optional and skipped when
                missing or empty.

        Returns:
            A ``(modelPath, indexPath, featurePath)`` tuple of local paths;
            the optional entries are ``None`` when not requested.
        """
        modelPath = os.path.join(TMP_DIR, os.path.basename(sampleInfo.modelUrl))
        downloadParams = [
            {
                "url": sampleInfo.modelUrl,
                "saveTo": modelPath,
                "position": 0,
            }
        ]

        indexPath = None
        indexUrl = getattr(sampleInfo, "indexUrl", "")
        if indexUrl:
            indexPath = os.path.join(TMP_DIR, os.path.basename(indexUrl))
            downloadParams.append(
                {
                    "url": indexUrl,
                    "saveTo": indexPath,
                    "position": 1,
                }
            )

        # The original condition used `or`, which raised AttributeError when the
        # attribute was missing and queued an empty URL when it was "".
        featurePath = None
        featureUrl = getattr(sampleInfo, "featureUrl", "")
        if featureUrl:
            featurePath = os.path.join(TMP_DIR, os.path.basename(featureUrl))
            downloadParams.append(
                {
                    "url": featureUrl,
                    "saveTo": featurePath,
                    "position": 2,
                }
            )

        # Leaving the `with` block waits for all downloads to finish.
        # NOTE(review): the map() results are never consumed, so an exception
        # raised inside `download` is not re-raised here.
        with ThreadPoolExecutor() as pool:
            pool.map(download, downloadParams)
        return modelPath, indexPath, featurePath

    def loadModel(self, props: LoadModelParams):
        """Install model files into a slot directory and refresh the slot list.

        When ``props.params`` carries a non-empty ``sampleId`` the sample's files
        are downloaded first and its metadata is copied into the params. The
        files are then moved into ``<model_dir>/rvc/<slot>`` and the params are
        written next to them as ``params.json``.

        Args:
            props: Load request; ``slot`` is the target slot index and ``params``
                the dict holding at least ``files["rvcModel"]``.

        Returns:
            The settings dict from ``get_info()``, or ``None`` when the
            requested sample id is unknown.
        """
        target_slot_idx = props.slot
        params = props.params

        print("loadModel", params)
        # Callers such as merge_models do not supply "sampleId"; treat a
        # missing or None value as "no sample requested".
        sampleId = params.get("sampleId") or ""
        if len(sampleId) > 0:
            sampleInfo = self.getSampleInfo(sampleId)
            if sampleInfo is None:
                print("[Voice Changer] sampleInfo is None")
                return
            modelPath, indexPath, featurePath = self.downloadModelFiles(sampleInfo)
            params["files"]["rvcModel"] = modelPath
            if indexPath is not None:
                params["files"]["rvcIndex"] = indexPath
            if featurePath is not None:
                params["files"]["rvcFeature"] = featurePath
            params["credit"] = sampleInfo.credit
            params["description"] = sampleInfo.description
            params["name"] = sampleInfo.name
            params["sampleId"] = sampleInfo.id
            params["termOfUseUrl"] = sampleInfo.termOfUseUrl

        slotDir = os.path.join(
            self.params.model_dir, RVC_MODEL_DIRNAME, str(target_slot_idx)
        )

        files = [params["files"]["rvcModel"]]
        if "rvcFeature" in params["files"]:
            files.append(params["files"]["rvcFeature"])
        if "rvcIndex" in params["files"]:
            files.append(params["files"]["rvcIndex"])

        os.makedirs(slotDir, exist_ok=True)
        for f in files:
            dst = os.path.join(slotDir, os.path.basename(f))
            if os.path.exists(dst):
                os.remove(dst)
            shutil.move(f, dst)

        # Use a context manager so the file is flushed and closed before the
        # slot directory is re-scanned below.
        with open(os.path.join(slotDir, "params.json"), "w") as fp:
            json.dump(params, fp)
        self.loadSlots()

        # Load only the first time (initialLoad is already False when a slot
        # held a model at startup).
        if self.initialLoad:
            self.prepareModel(target_slot_idx)
            self.settings.modelSlotIndex = target_slot_idx
            self.switchModel()
            self.initialLoad = False
        elif target_slot_idx == self.currentSlot:
            # The active slot was overwritten: rebuild its pipeline.
            self.prepareModel(target_slot_idx)

        return self.get_info()

    def loadSlots(self):
        """Rebuild ``settings.modelSlots`` from the slot directories on disk.

        Leaves the list empty when the ``rvc`` model directory does not exist;
        otherwise one entry is generated for each of the ``RVC_MAX_SLOT_NUM``
        slot indices.
        """
        dirname = os.path.join(self.params.model_dir, RVC_MODEL_DIRNAME)
        self.settings.modelSlots = []
        if not os.path.exists(dirname):
            return

        self.settings.modelSlots = [
            generateModelSlot(os.path.join(dirname, str(slot_idx)))
            for slot_idx in range(RVC_MAX_SLOT_NUM)
        ]

    def update_settings(self, key: str, val: int | float | str):
        """Apply one setting and run the side effects tied to that key.

        Args:
            key: Setting name; must be listed in the settings' ``intData``,
                ``floatData`` or ``strData``.
            val: New value.

        Returns:
            ``True`` when the key belongs to this model's settings (even if the
            value was rejected, e.g. an empty slot), ``False`` otherwise.
        """
        if key in self.settings.intData:
            # Pre-processing before the value is stored.
            val = cast(int, val)
            if key == "modelSlotIndex":
                if val < 0:
                    return True
                val = val % 1000  # Quick hack for same slot is selected
                slots = self.settings.modelSlots
                # An index past the loaded slots is handled like an empty slot
                # instead of raising IndexError.
                if val >= len(slots) or not slots[val].modelFile:
                    print("[Voice Changer] slot does not have model.")
                    return True
                self.prepareModel(val)

            # Store the value.
            setattr(self.settings, key, val)

            if key == "gpu":
                dev = self.deviceManager.getDevice(val)
                half = self.deviceManager.halfPrecisionAvailable(val)

                # Rebuild the pipeline when half-precision availability changes;
                # otherwise just move the existing one to the new device.
                if self.pipeline is not None and self.pipeline.isHalf == half:
                    print(
                        "USE EXSISTING PIPELINE",
                        half,
                    )
                    self.pipeline.setDevice(dev)
                else:
                    print("CHAGE TO NEW PIPELINE", half)
                    self.prepareModel(self.settings.modelSlotIndex)
            if key == "enableDirectML":
                if self.pipeline is not None and val == 0:
                    self.pipeline.setDirectMLEnable(False)
                elif self.pipeline is not None and val == 1:
                    self.pipeline.setDirectMLEnable(True)
        elif key in self.settings.floatData:
            setattr(self.settings, key, float(val))
        elif key in self.settings.strData:
            setattr(self.settings, key, str(val))
            if key == "f0Detector" and self.pipeline is not None:
                pitchExtractor = PitchExtractorManager.getPitchExtractor(
                    self.settings.f0Detector
                )
                self.pipeline.setPitchExtractor(pitchExtractor)
        else:
            return False
        return True

    def prepareModel(self, slot: int):
        """Build the pipeline for ``slot`` and stage it for the next switch.

        Nothing is staged for a negative slot. The staged values are applied by
        ``switchModel``; ``needSwitch`` is set so ``inference`` performs the
        switch on its next call.

        Returns:
            The settings dict from ``get_info()``.
        """
        if slot < 0:
            return self.get_info()
        modelSlot = self.settings.modelSlots[slot]

        print("[Voice Changer] Prepare Model of slot:", slot)

        # Build the pipeline.
        self.next_pipeline = createPipeline(
            modelSlot, self.settings.gpu, self.settings.f0Detector
        )

        # Other values taken from the slot.
        self.next_trans = modelSlot.defaultTrans
        self.next_samplingRate = modelSlot.samplingRate
        self.next_framework = "ONNX" if modelSlot.isONNX else "PyTorch"
        self.needSwitch = True
        print("[Voice Changer] Prepare done.")
        return self.get_info()

    def switchModel(self):
        """Activate the pipeline and settings staged by ``prepareModel``."""
        print("[Voice Changer] Switching model..")
        self.pipeline = self.next_pipeline
        self.settings.tran = self.next_trans
        self.settings.modelSamplingRate = self.next_samplingRate
        self.settings.framework = self.next_framework

        print(
            "[Voice Changer] Switching model..done",
        )

    def get_info(self):
        """Return the current settings as a plain dict (``dataclasses.asdict``)."""
        data = asdict(self.settings)
        return data

    def get_processing_sampling_rate(self):
        """Return the sampling rate of the currently active model."""
        return self.settings.modelSamplingRate

    def generate_input(
        self,
        newData: AudioInOut,
        inputSize: int,
        crossfadeSize: int,
        solaSearchFrame: int = 0,
    ):
        """Append new audio to the rolling buffer and compute the input volume.

        Args:
            newData: Incoming samples; they are scaled by 1/32768 (presumably
                int16 PCM — TODO confirm against the caller).
            inputSize: Number of newly received samples to be output.
            crossfadeSize: Number of trailing samples reserved for crossfading.
            solaSearchFrame: Extra samples added to the conversion window.

        Returns:
            ``(audio_buffer, convertSize, vol)`` where ``audio_buffer`` holds the
            last ``convertSize`` samples and ``vol`` is the RMS of the output
            part of the buffer.
        """
        newData = newData.astype(np.float32) / 32768.0

        if self.audio_buffer is not None:
            # Append to the previously buffered data.
            self.audio_buffer = np.concatenate([self.audio_buffer, newData], 0)
        else:
            self.audio_buffer = newData

        convertSize = (
            inputSize + crossfadeSize + solaSearchFrame + self.settings.extraConvertSize
        )

        if convertSize % 128 != 0:
            # Round up to a multiple of 128: the model output is truncated to
            # its hop size, so compensate for that here.
            convertSize = convertSize + (128 - (convertSize % 128))

        convertOffset = -1 * convertSize
        # Keep only the part that will be converted.
        self.audio_buffer = self.audio_buffer[convertOffset:]

        # Crop just the output part to measure its volume.
        # (TODO: make the muting gradual)
        cropOffset = -1 * (inputSize + crossfadeSize)
        # A slice ending at -0 would be empty and make the RMS NaN, so run to
        # the end of the buffer when there is no crossfade region.
        cropEnd = -1 * crossfadeSize if crossfadeSize > 0 else None
        crop = self.audio_buffer[cropOffset:cropEnd]
        rms = np.sqrt(np.square(crop).mean(axis=0))
        # prevVol is multiplied by 0.0, so the previous volume currently has no
        # influence; the factor is kept as the hook for the TODO above.
        vol = max(rms, self.prevVol * 0.0)
        self.prevVol = vol

        return (self.audio_buffer, convertSize, vol)

    def inference(self, data):
        """Convert one chunk of audio with the active pipeline.

        Args:
            data: Sequence ``(audio, convertSize, vol)`` in the layout returned
                by ``generate_input``.

        Returns:
            The converted audio scaled by ``sqrt(vol)``, or ``convertSize``
            int16 zeros when ``vol`` is below ``settings.silentThreshold``.

        Raises:
            NoModeLoadedException: If no model slot is selected.
        """
        if self.settings.modelSlotIndex < 0:
            print(
                "[Voice Changer] wait for loading model...",
                self.settings.modelSlotIndex,
                self.currentSlot,
            )
            raise NoModeLoadedException("model_common")

        if self.needSwitch:
            print(
                f"[Voice Changer] Switch model {self.currentSlot} -> {self.settings.modelSlotIndex}"
            )
            self.currentSlot = self.settings.modelSlotIndex
            self.switchModel()
            self.needSwitch = False

        half = self.deviceManager.halfPrecisionAvailable(self.settings.gpu)

        audio = data[0]
        convertSize = data[1]
        vol = data[2]

        # Check for silence before resampling so a muted chunk costs nothing.
        if vol < self.settings.silentThreshold:
            return np.zeros(convertSize).astype(np.int16)

        # Resample from the model's sampling rate to 16000.
        audio = resampy.resample(audio, self.settings.modelSamplingRate, 16000)

        repeat = 3 if half else 1
        repeat *= self.settings.rvcQuality  # 0 or 3
        sid = 0
        f0_up_key = self.settings.tran
        index_rate = self.settings.indexRatio
        if_f0 = 1 if self.settings.modelSlots[self.currentSlot].f0 else 0
        embChannels = self.settings.modelSlots[self.currentSlot].embChannels

        audio_out = self.pipeline.exec(
            sid,
            audio,
            f0_up_key,
            index_rate,
            if_f0,
            self.settings.extraConvertSize / self.settings.modelSamplingRate,
            embChannels,
            repeat,
        )

        result = audio_out * np.sqrt(vol)

        return result

    def __del__(self):
        """Drop the pipeline and unload modules that were imported from ``RVC``."""
        # `pipeline` may exist only as the class-level default, in which case
        # `del` raises AttributeError; that must not abort the cleanup below.
        try:
            del self.pipeline
        except AttributeError:
            pass

        print("---------- REMOVING ---------------")

        remove_path = os.path.join("RVC")
        sys.path = [x for x in sys.path if not x.endswith(remove_path)]

        for key in list(sys.modules):
            val = sys.modules.get(key)
            try:
                file_path = val.__file__
                if file_path.find("RVC" + os.path.sep) >= 0:
                    print("remove", key, file_path)
                    sys.modules.pop(key)
            except Exception:  # type:ignore
                # Deliberately best-effort: entries without a usable __file__
                # (built-ins, namespace packages, lazy modules) are skipped.
                pass

    def export2onnx(self):
        """Export the selected slot's PyTorch model to ONNX.

        Returns:
            ``{"status": "ok", "path": ..., "filename": ...}`` on success, or
            ``{"status": "ng", "path": ""}`` when no valid slot is selected or
            the slot already holds an ONNX model.
        """
        slotIndex = self.settings.modelSlotIndex
        # A negative index would silently pick a slot from the end of the list
        # and an index past the end would raise; report both as "ng".
        if slotIndex < 0 or slotIndex >= len(self.settings.modelSlots):
            print("[Voice Changer] export2onnx, no model slot is selected.")
            return {"status": "ng", "path": ""}

        modelSlot = self.settings.modelSlots[slotIndex]

        if modelSlot.isONNX:
            print("[Voice Changer] export2onnx, No pyTorch filepath.")
            return {"status": "ng", "path": ""}

        # Inside a method this name resolves to the module-level export2onnx
        # function, not to this method.
        output_file_simple = export2onnx(self.settings.gpu, modelSlot)
        return {
            "status": "ok",
            "path": f"/tmp/{output_file_simple}",
            "filename": output_file_simple,
        }

    def merge_models(self, request: str):
        """Merge models as described by ``request`` and load the result into a slot.

        Args:
            request: JSON string parsed into a ``MergeModelRequest``. A negative
                ``slot`` selects the last entry of ``settings.modelSlots``.
        """
        print("[Voice Changer] MergeRequest:", request)
        req: MergeModelRequest = MergeModelRequest.from_json(request)
        merged = merge_model(req)
        targetSlot = len(self.settings.modelSlots) - 1 if req.slot < 0 else req.slot

        # Store the merged model where loadModel can pick it up.
        storeDir = os.path.join(UPLOAD_DIR, f"{targetSlot}")
        print("[Voice Changer] store merged model to:", storeDir)
        os.makedirs(storeDir, exist_ok=True)
        storeFile = os.path.join(storeDir, "merged.pth")
        torch.save(merged, storeFile)

        # Install the stored file into the slot.
        params = {"trans": req.defaultTrans, "files": {"rvcModel": storeFile}}
        props: LoadModelParams = LoadModelParams(
            slot=targetSlot, isHalf=True, params=params
        )
        self.loadModel(props)
        self.prepareModel(targetSlot)
        self.settings.modelSlotIndex = targetSlot
        self.currentSlot = self.settings.modelSlotIndex