voice-changer/server/restapi/mods/FileUploader.py

49 lines
1.7 KiB
Python
Raw Normal View History

2023-02-16 21:03:21 +03:00
import os
import shutil
2022-12-31 14:25:28 +03:00
from fastapi import UploadFile
def sanitize_filename(filename: str) -> str:
safe_filename = os.path.basename(filename)
max_length = 255
if len(safe_filename) > max_length:
file_root, file_ext = os.path.splitext(safe_filename)
safe_filename = file_root[: max_length - len(file_ext)] + file_ext
return safe_filename
2022-12-31 14:25:28 +03:00
2023-02-16 21:03:21 +03:00
def upload_file(upload_dirname: str, file: UploadFile, filename: str):
2022-12-31 14:25:28 +03:00
if file and filename:
fileobj = file.file
filename = sanitize_filename(filename)
2023-05-07 23:51:24 +03:00
target_path = os.path.join(upload_dirname, filename)
target_dir = os.path.dirname(target_path)
os.makedirs(target_dir, exist_ok=True)
upload_dir = open(target_path, "wb+")
2022-12-31 14:25:28 +03:00
shutil.copyfileobj(fileobj, upload_dir)
upload_dir.close()
2023-01-10 16:49:16 +03:00
2023-02-16 21:03:21 +03:00
return {"status": "OK", "msg": f"uploaded files {filename} "}
return {"status": "ERROR", "msg": "uploaded file is not found."}
2022-12-31 14:25:28 +03:00
def concat_file_chunks(upload_dirname: str, filename: str, chunkNum: int, dest_dirname: str):
filename = sanitize_filename(filename)
2023-05-07 23:51:24 +03:00
target_path = os.path.join(upload_dirname, filename)
target_dir = os.path.dirname(target_path)
2023-04-14 03:18:34 +03:00
os.makedirs(target_dir, exist_ok=True)
2023-05-07 23:51:24 +03:00
if os.path.exists(target_path):
os.remove(target_path)
with open(target_path, "ab") as out:
2022-12-31 14:25:28 +03:00
for i in range(chunkNum):
chunkName = f"{filename}_{i}"
chunk_file_path = os.path.join(upload_dirname, chunkName)
2023-05-07 23:51:24 +03:00
stored_chunk_file = open(chunk_file_path, "rb")
out.write(stored_chunk_file.read())
2022-12-31 14:25:28 +03:00
stored_chunk_file.close()
2023-02-16 21:03:21 +03:00
os.remove(chunk_file_path)
2023-05-07 23:51:24 +03:00
out.close()
return {"status": "OK", "msg": f"concat files {out} "}