voice-changer/server/voice_changer/VoiceChanger.py

423 lines
19 KiB
Python
Raw Normal View History

2023-04-10 03:28:00 +03:00
from typing import Any, Callable, Optional, Protocol, TypeAlias, Union, cast
2023-04-10 18:21:17 +03:00
from const import TMP_DIR, ModelType
2022-12-31 10:08:14 +03:00
import torch
2023-01-28 09:56:56 +03:00
import os
import traceback
2022-12-31 10:08:14 +03:00
import numpy as np
2023-04-10 18:21:17 +03:00
from dataclasses import dataclass, asdict, field
2023-02-18 14:53:15 +03:00
import resampy
2023-02-10 18:59:44 +03:00
from voice_changer.IORecorder import IORecorder
2023-03-20 00:21:00 +03:00
# from voice_changer.IOAnalyzer import IOAnalyzer
from voice_changer.utils.Timer import Timer
from voice_changer.utils.VoiceChangerModel import VoiceChangerModel, AudioInOut
2023-02-20 22:07:43 +03:00
import time
2023-04-10 03:28:00 +03:00
2023-01-28 09:56:56 +03:00
# Execution-provider names (presumably handed to ONNX Runtime by the model
# implementations -- not used in this part of the file; confirm against callers).
providers = [
    "OpenVINOExecutionProvider",
    "CUDAExecutionProvider",
    "DmlExecutionProvider",
    "CPUExecutionProvider",
]

# Files under TMP_DIR that the I/O recorder writes the raw input / converted
# output streams to when the "recordIO" setting is switched on.
STREAM_INPUT_FILE = os.path.join(TMP_DIR, "in.wav")
STREAM_OUTPUT_FILE = os.path.join(TMP_DIR, "out.wav")

# Target images for the (currently disabled) input analysis step.
STREAM_ANALYZE_FILE_DIO = os.path.join(TMP_DIR, "analyze-dio.png")
STREAM_ANALYZE_FILE_HARVEST = os.path.join(TMP_DIR, "analyze-harvest.png")
2023-02-12 06:25:57 +03:00
2023-01-08 10:18:20 +03:00
@dataclass
class VoiceChangerSettings:
    """Settings owned by the VoiceChanger front-end itself.

    The three ``*Data`` lists name the fields that may be changed at runtime,
    grouped by the type the incoming value is coerced to.
    """

    inputSampleRate: int = 24000  # 48000 or 24000

    crossFadeOffsetRate: float = 0.1
    crossFadeEndRate: float = 0.9
    crossFadeOverlapSize: int = 4096

    solaEnabled: int = 1  # 0:off, 1:on
    recordIO: int = 0  # 0:off, 1:on

    # Only the mutable settings are listed below.
    intData: list[str] = field(
        default_factory=lambda: [
            "inputSampleRate",
            "crossFadeOverlapSize",
            "recordIO",
            "solaEnabled",
        ]
    )
    floatData: list[str] = field(
        default_factory=lambda: [
            "crossFadeOffsetRate",
            "crossFadeEndRate",
        ]
    )
    strData: list[str] = field(default_factory=list)
2023-01-08 10:18:20 +03:00
2023-01-28 09:56:56 +03:00
2022-12-31 10:08:14 +03:00
class VoiceChanger():
    """Front-end that owns one voice-conversion model and feeds audio chunks through it.

    The concrete model is chosen with ``switchModelType`` and loaded with
    ``loadModel``. ``on_request`` converts one chunk of audio and stitches it
    to the previous chunk, either with a SOLA-aligned crossfade or with the
    legacy fixed-position crossfade.
    """

    settings: VoiceChangerSettings
    voiceChanger: VoiceChangerModel

    def __init__(self, params):
        """Create an instance with default settings and no model selected.

        Args:
            params: Opaque parameter object forwarded to the model classes that
                accept it (see ``switchModelType``).
        """
        # Initialisation
        self.settings = VoiceChangerSettings()
        self.onnx_session = None

        # Values the current crossfade curves were generated from; compared
        # against the live settings in ``_generate_strength``.
        self.currentCrossFadeOffsetRate = 0
        self.currentCrossFadeEndRate = 0
        self.currentCrossFadeOverlapSize = 0  # setting
        self.crossfadeSize = 0  # calculated

        self.voiceChanger = None
        self.modelType = None
        self.params = params
        self.gpu_num = torch.cuda.device_count()
        self.prev_audio = np.zeros(4096)
        self.mps_enabled: bool = getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available()

        print(f"VoiceChanger Initialized (GPU_NUM:{self.gpu_num}, mps_enabled:{self.mps_enabled})")

    def switchModelType(self, modelType: ModelType):
        """Drop the current model (if any) and instantiate the one named by ``modelType``.

        Model modules are imported lazily so only the selected implementation
        is loaded. Unknown names fall back to MMVCv13.

        Returns:
            A status dict ``{"status": "OK", "msg": ...}``.
        """
        if getattr(self, "voiceChanger", None) is not None:
            # Re-selection is allowed: release the previous model first.
            del self.voiceChanger
            self.voiceChanger = None

        self.modelType = modelType
        if self.modelType == "MMVCv15":
            from voice_changer.MMVCv15.MMVCv15 import MMVCv15
            self.voiceChanger = MMVCv15()  # type: ignore
        elif self.modelType == "MMVCv13":
            from voice_changer.MMVCv13.MMVCv13 import MMVCv13
            self.voiceChanger = MMVCv13()
        elif self.modelType == "so-vits-svc-40v2":
            from voice_changer.SoVitsSvc40v2.SoVitsSvc40v2 import SoVitsSvc40v2
            self.voiceChanger = SoVitsSvc40v2(self.params)
        elif self.modelType == "so-vits-svc-40" or self.modelType == "so-vits-svc-40_c":
            from voice_changer.SoVitsSvc40.SoVitsSvc40 import SoVitsSvc40
            self.voiceChanger = SoVitsSvc40(self.params)
        elif self.modelType == "DDSP-SVC":
            from voice_changer.DDSP_SVC.DDSP_SVC import DDSP_SVC
            self.voiceChanger = DDSP_SVC(self.params)
        elif self.modelType == "RVC":
            from voice_changer.RVC.RVC import RVC
            self.voiceChanger = RVC(self.params)
        else:
            # Unknown model type: fall back to MMVCv13.
            from voice_changer.MMVCv13.MMVCv13 import MMVCv13
            self.voiceChanger = MMVCv13()

        return {"status": "OK", "msg": "vc is switched."}

    def getModelType(self):
        """Return ``{"status": "OK", "vc": <model type>}``; ``"none"`` when nothing is selected."""
        if self.modelType is not None:
            return {"status": "OK", "vc": self.modelType}
        return {"status": "OK", "vc": "none"}

    def loadModel(
        self,
        config: str,
        pyTorch_model_file: Optional[str] = None,
        onnx_model_file: Optional[str] = None,
        clusterTorchModel: Optional[str] = None,
        feature_file: Optional[str] = None,
        index_file: Optional[str] = None,
        is_half: bool = True,
    ):
        """Forward the model files to the selected model's ``loadModel``.

        Which arguments are forwarded depends on ``self.modelType``:
        MMVC models take config/pyTorch/onnx; RVC additionally takes
        ``feature_file``, ``index_file`` and ``is_half``; every other type
        takes config/pyTorch/onnx plus ``clusterTorchModel``.

        Returns:
            Whatever the model's ``loadModel`` returns, or ``{"status": "NG"}``
            if anything raised (including no model being selected).
        """
        try:
            if self.modelType == "MMVCv15" or self.modelType == "MMVCv13":
                return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file)
            elif self.modelType == "RVC":
                return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file, feature_file, index_file, is_half)
            else:
                # so-vits-svc variants and every remaining type share this signature.
                return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file, clusterTorchModel)
        except Exception as e:
            print("[Voice Changer] Model Load Error! Check your model is valid.", e)
            return {"status": "NG"}

    def get_info(self):
        """Return the front-end settings merged with the selected model's info.

        When no model is selected only the front-end settings are returned.
        """
        data = asdict(self.settings)
        # ``voiceChanger`` always exists after __init__ but is None until a
        # model is selected, so test the value rather than the attribute.
        model = getattr(self, "voiceChanger", None)
        if model is not None:
            data.update(model.get_info())
        return data

    def update_settings(self, key: str, val: Any):
        """Update one setting, here or on the selected model.

        Keys listed in ``settings.intData`` / ``floatData`` / ``strData`` are
        coerced to that type and stored on ``self.settings``. Any other key is
        forwarded to the model's ``update_settings``.

        Setting ``recordIO`` also manages the I/O recorder: 0, 1 and 2 close an
        existing recorder, and 1 then opens a fresh one.

        Returns:
            The result of ``get_info()`` after the update.
        """
        if key in self.settings.intData:
            int_val = int(val)
            setattr(self.settings, key, int_val)
            # Compare the coerced value: ``val`` may arrive as a string, and the
            # recorder must exist whenever ``settings.recordIO`` is 1.
            if key == "recordIO" and int_val in (0, 1, 2):
                if hasattr(self, "ioRecorder"):
                    self.ioRecorder.close()
                if int_val == 1:
                    self.ioRecorder = IORecorder(STREAM_INPUT_FILE, STREAM_OUTPUT_FILE, self.settings.inputSampleRate)
                # int_val == 2: the analysis step that used to run here is disabled.
        elif key in self.settings.floatData:
            # Crossfade rate changes are picked up by ``_generate_strength``,
            # which compares the live settings with the ones it last used.
            setattr(self.settings, key, float(val))
        elif key in self.settings.strData:
            setattr(self.settings, key, str(val))
        else:
            model = getattr(self, "voiceChanger", None)
            if model is not None:
                ret = model.update_settings(key, val)
                if ret == False:  # noqa: E712 - keep the original equality check
                    print(f"{key} is not mutable variable or unknown variable!")
            else:
                print("voice changer is not initialized!")

        return self.get_info()

    def _generate_strength(self, crossfadeSize: int):
        """(Re)build the crossfade weight curves when size or settings changed.

        Produces ``self.np_prev_strength`` (weight for the previous chunk) and
        ``self.np_cur_strength`` (weight for the current chunk), each of length
        ``crossfadeSize``: flat up to the offset, a cos^2 ramp between offset
        and end, flat afterwards. Does nothing when the curves are up to date.
        """
        if self.crossfadeSize != crossfadeSize or \
                self.currentCrossFadeOffsetRate != self.settings.crossFadeOffsetRate or \
                self.currentCrossFadeEndRate != self.settings.crossFadeEndRate or \
                self.currentCrossFadeOverlapSize != self.settings.crossFadeOverlapSize:

            self.crossfadeSize = crossfadeSize
            self.currentCrossFadeOffsetRate = self.settings.crossFadeOffsetRate
            self.currentCrossFadeEndRate = self.settings.crossFadeEndRate
            self.currentCrossFadeOverlapSize = self.settings.crossFadeOverlapSize

            cf_offset = int(crossfadeSize * self.settings.crossFadeOffsetRate)
            cf_end = int(crossfadeSize * self.settings.crossFadeEndRate)
            cf_range = cf_end - cf_offset
            percent = np.arange(cf_range) / cf_range

            np_prev_strength = np.cos(percent * 0.5 * np.pi) ** 2
            np_cur_strength = np.cos((1 - percent) * 0.5 * np.pi) ** 2

            self.np_prev_strength = np.concatenate([np.ones(cf_offset), np_prev_strength,
                                                    np.zeros(crossfadeSize - cf_offset - len(np_prev_strength))])
            self.np_cur_strength = np.concatenate([np.zeros(cf_offset), np_cur_strength, np.ones(crossfadeSize - cf_offset - len(np_cur_strength))])

            print(f"Generated Strengths: for prev:{self.np_prev_strength.shape}, for cur:{self.np_cur_strength.shape}")

            # The size differs from the previous result, so discard what was
            # remembered from the previous chunk.
            if hasattr(self, 'np_prev_audio1'):
                delattr(self, "np_prev_audio1")
            if hasattr(self, "sola_buffer"):
                del self.sola_buffer

    def _to_processing_rate(self, receivedData: AudioInOut, processing_sampling_rate) -> AudioInOut:
        """Return the input at the model's sampling rate, resampling only when the rates differ."""
        if self.settings.inputSampleRate != processing_sampling_rate:
            return cast(AudioInOut, resampy.resample(receivedData, self.settings.inputSampleRate, processing_sampling_rate))
        return receivedData

    def _finalize_output(self, result, receivedData: AudioInOut, processing_sampling_rate) -> AudioInOut:
        """Convert the result to int16 at the client rate and record I/O if enabled."""
        result = result.astype(np.int16)
        if self.settings.inputSampleRate != processing_sampling_rate:
            outputData = cast(AudioInOut, resampy.resample(result, processing_sampling_rate, self.settings.inputSampleRate).astype(np.int16))
        else:
            outputData = result

        print_convert_processing(
            f" Output data size of {result.shape[0]}/{processing_sampling_rate}hz {outputData.shape[0]}/{self.settings.inputSampleRate}hz")

        if self.settings.recordIO == 1:
            self.ioRecorder.writeInput(receivedData)
            self.ioRecorder.writeOutput(outputData.tobytes())
        return outputData

    def on_request(self, receivedData: AudioInOut) -> tuple[AudioInOut, list[Union[int, float]]]:
        """Convert one chunk of audio.

        Uses the SOLA path when ``solaEnabled`` is set and the model type is
        RVC, MMVCv13 or MMVCv15; otherwise the legacy crossfade path.

        Args:
            receivedData: Audio samples (the original note says "tuple of
                short"; the code treats it as an array with ``.shape``).

        Returns:
            ``(outputData, [pre, main, post])`` where the list holds the
            timings read from the three ``Timer`` blocks. On a processing
            error: a one-sample zero array and ``[0, 0, 0]``.
        """
        if self.settings.solaEnabled and self.modelType in ("RVC", "MMVCv13", "MMVCv15"):
            return self.on_request_sola(receivedData)
        return self.on_request_legacy(receivedData)

    def on_request_sola(self, receivedData: AudioInOut) -> tuple[AudioInOut, list[Union[int, float]]]:
        """Convert one chunk and join it to the previous one with SOLA alignment.

        The model output is searched for the offset that best correlates with
        the tail kept from the previous chunk (``self.sola_buffer``), then the
        two are crossfaded there. The first chunk after (re)generation of the
        crossfade curves has no buffer yet and yields 4096 zero samples.
        """
        print("processing with sola")
        processing_sampling_rate = self.voiceChanger.get_processing_sampling_rate()

        # Pre-processing
        with Timer("pre-process") as t:
            newData = self._to_processing_rate(receivedData, processing_sampling_rate)

            sola_search_frame = int(0.012 * processing_sampling_rate)
            block_frame = newData.shape[0]
            crossfade_frame = min(self.settings.crossFadeOverlapSize, block_frame)
            self._generate_strength(crossfade_frame)

            data = self.voiceChanger.generate_input(newData, block_frame, crossfade_frame, True, sola_search_frame)
        preprocess_time = t.secs

        # Conversion
        with Timer("main-process") as t:
            try:
                # Inference
                audio = self.voiceChanger.inference(data)

                has_buffer = hasattr(self, 'sola_buffer')
                if has_buffer:
                    np.set_printoptions(threshold=10000)
                    audio = audio[-sola_search_frame - crossfade_frame - block_frame:]
                    # SOLA algorithm from https://github.com/yxlllc/DDSP-SVC, https://github.com/liujing04/Retrieval-based-Voice-Conversion-WebUI
                    cor_nom = np.convolve(audio[: crossfade_frame + sola_search_frame], np.flip(self.sola_buffer), 'valid')
                    cor_den = np.sqrt(np.convolve(audio[: crossfade_frame + sola_search_frame] ** 2, np.ones(crossfade_frame), 'valid') + 1e-3)
                    sola_offset = np.argmax(cor_nom / cor_den)

                    output_wav = audio[sola_offset: sola_offset + block_frame].astype(np.float64)
                    output_wav[:crossfade_frame] *= self.np_cur_strength
                    output_wav[:crossfade_frame] += self.sola_buffer[:]

                    result = output_wav
                else:
                    print("no sola buffer")
                    result = np.zeros(4096).astype(np.int16)

                # Keep the faded tail for the next chunk. When the offset equals
                # the search length the slice end would be index 0, so fall back
                # to the plain tail in that case (and on the first chunk).
                if has_buffer and sola_offset < sola_search_frame:
                    sola_buf_org = audio[- sola_search_frame - crossfade_frame + sola_offset: -sola_search_frame + sola_offset]
                    self.sola_buffer = sola_buf_org * self.np_prev_strength
                else:
                    self.sola_buffer = audio[- crossfade_frame:] * self.np_prev_strength

            except Exception as e:
                print("VC PROCESSING!!!! EXCEPTION!!!", e)
                print(traceback.format_exc())
                return np.zeros(1).astype(np.int16), [0, 0, 0]
        mainprocess_time = t.secs

        # Post-processing
        with Timer("post-process") as t:
            outputData = self._finalize_output(result, receivedData, processing_sampling_rate)
        postprocess_time = t.secs

        print_convert_processing(f" [fin] Input/Output size:{receivedData.shape[0]},{outputData.shape[0]}")
        perf = [preprocess_time, mainprocess_time, postprocess_time]
        return outputData, perf

    def on_request_legacy(self, receivedData: AudioInOut) -> tuple[AudioInOut, list[Union[int, float]]]:
        """Convert one chunk and join it to the previous one with a fixed-position crossfade.

        The overlap region of the current model output is blended with the
        tail of the previous output (``self.np_prev_audio1``). The first chunk
        after (re)generation of the crossfade curves has no previous output
        and yields 4096 zero samples.
        """
        processing_sampling_rate = self.voiceChanger.get_processing_sampling_rate()

        print_convert_processing(f"------------ Convert processing.... ------------")
        # Pre-processing
        with Timer("pre-process") as t:
            # The nested timers are kept from the original code; their readings
            # are not used here.
            with Timer("pre-process"):
                newData = self._to_processing_rate(receivedData, processing_sampling_rate)

            inputSize = newData.shape[0]
            crossfadeSize = min(self.settings.crossFadeOverlapSize, inputSize)

            print_convert_processing(
                f" Input data size: {receivedData.shape[0]}/{self.settings.inputSampleRate}hz {inputSize}/{processing_sampling_rate}hz")
            print_convert_processing(
                f" Crossfade data size: crossfade:{crossfadeSize}, crossfade setting:{self.settings.crossFadeOverlapSize}, input size:{inputSize}")

            print_convert_processing(f" Convert data size of {inputSize + crossfadeSize} (+ extra size)")
            print_convert_processing(f" will be cropped:{-1 * (inputSize + crossfadeSize)}, {-1 * (crossfadeSize)}")

            self._generate_strength(crossfadeSize)
            with Timer("pre-process"):
                data = self.voiceChanger.generate_input(newData, inputSize, crossfadeSize)
        preprocess_time = t.secs

        # Conversion
        with Timer("main-process") as t:
            try:
                # Inference
                audio = self.voiceChanger.inference(data)

                if hasattr(self, 'np_prev_audio1'):
                    np.set_printoptions(threshold=10000)
                    prev_overlap_start = -1 * crossfadeSize
                    prev_overlap = self.np_prev_audio1[prev_overlap_start:]
                    cur_overlap_start = -1 * (inputSize + crossfadeSize)
                    cur_overlap_end = -1 * inputSize
                    cur_overlap = audio[cur_overlap_start:cur_overlap_end]
                    print_convert_processing(
                        f" audio:{audio.shape}, prev_overlap:{prev_overlap.shape}, self.np_prev_strength:{self.np_prev_strength.shape}")
                    powered_prev = prev_overlap * self.np_prev_strength
                    print_convert_processing(
                        f" audio:{audio.shape}, cur_overlap:{cur_overlap.shape}, self.np_cur_strength:{self.np_cur_strength.shape}")
                    print_convert_processing(f" cur_overlap_strt:{cur_overlap_start}, cur_overlap_end{cur_overlap_end}")

                    powered_cur = cur_overlap * self.np_cur_strength
                    powered_result = powered_prev + powered_cur

                    # Blended overlap followed by the untouched middle of the
                    # current chunk; the last crossfadeSize samples are held
                    # back and blended into the next chunk.
                    cur = audio[-1 * inputSize:-1 * crossfadeSize]
                    result = np.concatenate([powered_result, cur], axis=0)
                    print_convert_processing(
                        f" overlap:{crossfadeSize}, current:{cur.shape[0]}, result:{result.shape[0]}... result should be same as input")
                    if cur.shape[0] != result.shape[0]:
                        print_convert_processing(f" current and result should be same as input")

                else:
                    result = np.zeros(4096).astype(np.int16)
                self.np_prev_audio1 = audio

            except Exception as e:
                print("VC PROCESSING!!!! EXCEPTION!!!", e)
                print(traceback.format_exc())
                # Forget the previous output so the next chunk starts clean.
                if hasattr(self, "np_prev_audio1"):
                    del self.np_prev_audio1
                return np.zeros(1).astype(np.int16), [0, 0, 0]
        mainprocess_time = t.secs

        # Post-processing
        with Timer("post-process") as t:
            outputData = self._finalize_output(result, receivedData, processing_sampling_rate)
        postprocess_time = t.secs

        print_convert_processing(f" [fin] Input/Output size:{receivedData.shape[0]},{outputData.shape[0]}")
        perf = [preprocess_time, mainprocess_time, postprocess_time]
        return outputData, perf

    def export2onnx(self):
        """Delegate ONNX export to the selected model and return its result."""
        return self.voiceChanger.export2onnx()
2023-02-20 22:07:43 +03:00
2023-04-14 22:36:41 +03:00
2023-04-13 02:00:28 +03:00
##############
2023-04-10 03:28:00 +03:00
# Switch for the verbose per-chunk conversion trace; set to True to enable it.
PRINT_CONVERT_PROCESSING: bool = False


def print_convert_processing(mess: str):
    """Print ``mess`` only while the PRINT_CONVERT_PROCESSING flag is on."""
    tracing = PRINT_CONVERT_PROCESSING == True  # noqa: E712 - same equality test as before
    if not tracing:
        return
    print(mess)
def pad_array(arr: AudioInOut, target_length: int):
    """Zero-pad ``arr`` on both sides until it is ``target_length`` long.

    Length is measured along the first axis. When an odd number of samples is
    missing, the extra one goes on the right. An array that is already at
    least ``target_length`` long is returned unchanged (it is never trimmed).
    """
    missing = target_length - arr.shape[0]
    if missing <= 0:
        return arr

    left = missing // 2
    right = missing - left
    return np.pad(arr, (left, right), 'constant', constant_values=(0, 0))