import eventlet
import socketio
import sys,os , math, struct, argparse, logging
from distutils.util import strtobool
from datetime import datetime
from OpenSSL import SSL, crypto
import torch, torchaudio
import numpy as np
from scipy.io.wavfile import write, read
from hubert import hubert_discrete, hubert_soft, kmeans100
from acoustic import hubert_discrete, hubert_soft
from hifigan import hifigan
hubert_model = torch.load("/models/bshall_hubert_main.pt").cuda()
acoustic_model = torch.load("/models/bshall_acoustic-model_main.pt").cuda()
hifigan_model = torch.load("/models/bshall_hifigan_main.pt").cuda()
def applyVol(i, chunk, vols):
curVol = vols[i] / 2
if curVol < 0.0001:
line = torch.zeros(chunk.size())
line = torch.ones(chunk.size())
volApplied = torch.mul(line, chunk)
volApplied = volApplied.unsqueeze(0)
return volApplied
class MyCustomNamespace(socketio.Namespace):
def __init__(self, namespace):
def on_connect(self, sid, environ):
print('[{}] connet sid : {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S') , sid))
def on_request_message(self, sid, msg):
# print("Processing Request...")
gpu = int(msg[0])
srcId = int(msg[1])
dstId = int(msg[2])
timestamp = int(msg[3])
data = msg[4]
# print(srcId, dstId, timestamp)
unpackedData = np.array(struct.unpack('<%sh'%(len(data) // struct.calcsize('<h') ), data))
write("logs/received_data.wav", 24000, unpackedData.astype(np.int16))
source, sr = torchaudio.load("logs/received_data.wav") # デフォルトでnormalize=Trueがついており、float32に変換して読んでくれるらしいのでこれを使う。https://pytorch.org/audio/stable/backend.html
source_16k = torchaudio.functional.resample(source, 24000, 16000)
source_16k = source_16k.unsqueeze(0).cuda()
with torch.inference_mode():
units = hubert_model.units(source_16k)
mel = acoustic_model.generate(units).transpose(1, 2)
target = hifigan_model(mel)
dest = torchaudio.functional.resample(target, 16000,24000)
dest = dest.squeeze().cpu()
# ソースの音量取得
source = source.cpu()
specgram = torchaudio.transforms.MelSpectrogram(sample_rate=24000)(source)
vol_apply_window_size = math.ceil(len(source[0]) / specgram.size()[2])
specgram = specgram.transpose(1,2)
vols = [ torch.max(i) for i in specgram[0]]
chunks = torch.split(dest, vol_apply_window_size,0)
chunks = [applyVol(i,c,vols) for i, c in enumerate(chunks)]
dest = torch.cat(chunks,1)
arr = np.array(dest.squeeze())
int_size = 2**(16 - 1) - 1
arr = (arr * int_size).astype(np.int16)
bin = struct.pack('<%sh'%len(arr), *arr)
self.emit('response',[timestamp, bin])
def on_disconnect(self, sid):
def setupArgParser():
parser = argparse.ArgumentParser()
parser.add_argument("-p", type=int, default=8080, help="port")
parser.add_argument("--https", type=strtobool, default=False, help="use https")
parser.add_argument("--httpsKey", type=str, default="ssl.key", help="path for the key of https")
parser.add_argument("--httpsCert", type=str, default="ssl.cert", help="path for the cert of https")
parser.add_argument("--httpsSelfSigned", type=strtobool, default=True, help="generate self-signed certificate")
return parser
def create_self_signed_cert(certfile, keyfile, certargs, cert_dir="."):
C_F = os.path.join(cert_dir, certfile)
K_F = os.path.join(cert_dir, keyfile)
if not os.path.exists(C_F) or not os.path.exists(K_F):
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.get_subject().C = certargs["Country"]
cert.get_subject().ST = certargs["State"]
cert.get_subject().L = certargs["City"]
cert.get_subject().O = certargs["Organization"]
cert.get_subject().OU = certargs["Org. Unit"]
cert.get_subject().CN = 'Example'
cert.sign(k, 'sha1')
open(C_F, "wb").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
open(K_F, "wb").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
def printMessage(message, level=0):
if level == 0:
elif level == 1:
print(f"\033[34m {message}\033[0m")
elif level == 2:
print(f"\033[32m {message}\033[0m")
print(f"\033[47m {message}\033[0m")
if __name__ == '__main__':
parser = setupArgParser()
args = parser.parse_args()
PORT = args.p
printMessage(f"Start SoftVC SocketIO Server", level=0)
if os.environ["EX_PORT"]:
EX_PORT = os.environ["EX_PORT"]
printMessage(f"External_Port:{EX_PORT} Internal_Port:{PORT}", level=1)
printMessage(f"Internal_Port:{PORT}", level=1)
if os.environ["EX_IP"]:
EX_IP = os.environ["EX_IP"]
printMessage(f"External_IP:{EX_IP}", level=1)
if args.https == 1 and args.httpsSelfSigned == 1:
# HTTPS(おれおれ証明書生成)
os.makedirs("./key", exist_ok=True)
key_base_name = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}"
keyname = f"{key_base_name}.key"
certname = f"{key_base_name}.cert"
create_self_signed_cert(certname, keyname, certargs=
{"Country": "JP",
"State": "Tokyo",
"City": "Chuo-ku",
"Organization": "F",
"Org. Unit": "F"}, cert_dir="./key")
key_path = os.path.join("./key", keyname)
cert_path = os.path.join("./key", certname)
printMessage(f"protocol: HTTPS(self-signed), key:{key_path}, cert:{cert_path}", level=1)
elif args.https == 1 and args.httpsSelfSigned == 0:
key_path = args.httpsKey
cert_path = args.httpsCert
printMessage(f"protocol: HTTPS, key:{key_path}, cert:{cert_path}", level=1)
printMessage(f"protocol: HTTP", level=1)
# アドレス表示
if args.https == 1:
printMessage(f"open https://<IP>:<PORT>/ with your browser.", level=0)
printMessage(f"open http://<IP>:<PORT>/ with your browser.", level=0)
if EX_PORT and EX_IP and args.https == 1:
printMessage(f"In many cases it is one of the following", level=1)
printMessage(f"https://localhost:{EX_PORT}/", level=1)
for ip in EX_IP.strip().split(" "):
printMessage(f"https://{ip}:{EX_PORT}/", level=1)
elif EX_PORT and EX_IP and args.https == 0:
printMessage(f"In many cases it is one of the following", level=1)
printMessage(f"http://localhost:{EX_PORT}/", level=1)
# for ip in EX_IP.strip().split(" "):
# print(f" http://{ip}:{EX_PORT}/")
# SocketIOセットアップ
sio = socketio.Server(cors_allowed_origins='*')
app = socketio.WSGIApp(sio,static_files={
'': '../frontend/dist',
'/': '../frontend/dist/index.html',
### log を設定すると通常出力されないログが取得できるようだ。(ログ出力抑制には役立たない?
# logger = logging.getLogger("logger")
# logger.propagate=False
# handler = logging.FileHandler(filename="logger.log")
# logger.addHandler(handler)
if args.https:
# HTTPS サーバ起動
sslWrapper = eventlet.wrap_ssl(
# server_side=True
### log を設定すると通常出力されないログが取得できるようだ。(ログ出力抑制には役立たない?
# eventlet.wsgi.server(sslWrapper, app, log=logger)
eventlet.wsgi.server(sslWrapper, app)
# HTTP サーバ起動
### log を設定すると通常出力されないログが取得できるようだ。(ログ出力抑制には役立たない?
# eventlet.wsgi.server(eventlet.listen(('',int(PORT))), app, log=logger)
eventlet.wsgi.server(eventlet.listen(('',int(PORT))), app)