-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
89 lines (74 loc) · 2.7 KB
/
main.py
File metadata and controls
89 lines (74 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import asyncio
import os
import tempfile
import uuid
from ast import Not
from typing import Union

import requests
from fastapi import FastAPI, Query
from fastapi import HTTPException
from fastapi.responses import FileResponse

from uvr import my_uvr
# Application instance; the route decorators in this module register their handlers on it.
app = FastAPI()
@app.get("/")
async def read_root():
    """Return a fixed greeting payload for the root path."""
    greeting = {"Hello": "World"}
    return greeting
@app.get("/voice/items/{item_id}")
async def read_item(item_id: int, q: Union[str, None] = None):
    """Echo back the ``item_id`` path parameter and the optional ``q`` query value."""
    return dict(item_id=item_id, q=q)
async def download_file(url: str, temp_file_path: str):
    """Download ``url`` and write the response body to ``temp_file_path``.

    The blocking ``requests`` call runs in a worker thread so the event loop
    stays responsive while the transfer is in progress.

    Args:
        url: Location of the file to fetch.
        temp_file_path: Destination path; the file is created or overwritten.

    Raises:
        Exception: If the HTTP request fails (connection error, timeout,
            non-2xx status, or a broken stream). The original
            ``requests`` exception is attached as ``__cause__``.
    """
    def _fetch() -> None:
        # (connect, read) timeouts: without them a stalled server would
        # block this request indefinitely.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            response.raise_for_status()  # Raise an exception for HTTP errors
            with open(temp_file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

    try:
        await asyncio.to_thread(_fetch)
    except requests.exceptions.RequestException as e:
        raise Exception(f"Error downloading file: {e}") from e
    print(f"File downloaded successfully to {temp_file_path}")
def get_file(keyword: str, endswith: str, in_dir: str) -> str | None:
# 获取所有子文件
subfiles = os.listdir(in_dir)
for file_name in subfiles:
if file_name.endswith(endswith) and keyword in file_name:
return os.path.join(in_dir, file_name)
return None
@app.post("/voice/vocal_process", response_class=FileResponse)
async def process_vocal(
    ossFilePath: str = Query(
        None,
        title="aliyun OSS file path",
        description="The path to the audio file in aliyun OSS",
        example="https://bucket-name/path/to/file.mp3"
    ),
):
    """Download an audio file, run vocal separation on it, and return the vocal track.

    Args:
        ossFilePath: URL of the source audio file, passed as a query parameter.

    Returns:
        A ``FileResponse`` streaming the generated MP3 as ``audio/mpeg``.

    Raises:
        HTTPException: 400 when ``ossFilePath`` is missing or empty; 500 when
            the download, the separation step, or the output lookup fails.
    """
    if not ossFilePath:
        raise HTTPException(
            status_code=400,
            detail="ossFilePath query parameter is required",
        )

    # NOTE(review): the URL is fetched server-side exactly as supplied by the
    # caller; consider restricting it to trusted hosts to avoid SSRF.
    output_dir = './vocal_voice/'
    tempdir = tempfile.gettempdir()
    random_uuid = str(uuid.uuid4())
    temp_file_path = os.path.join(tempdir, f"{random_uuid}.mp3")
    try:
        await download_file(ossFilePath, temp_file_path)

        # Clean the ./vocal_voice/ folder before producing new output.
        # NOTE(review): the folder is shared by all requests, so concurrent
        # requests can delete each other's results; a per-request output
        # directory would avoid that.
        if os.path.exists(output_dir):
            for file in os.listdir(output_dir):
                file_path = os.path.join(output_dir, file)
                if os.path.isfile(file_path):
                    print(f"Deleting file: {file_path}")
                    os.remove(file_path)

        my_uvr(
            model_name='HP5_only_main_vocal',
            save_root_vocal=output_dir,
            input_path=temp_file_path,
            audio_format='mp3'
        )

        # Presumably my_uvr embeds the input file's base name (the UUID) in
        # the output file name -- confirm against the uvr module.
        saved_vocal_path = get_file(random_uuid, '.mp3', output_dir)
        if saved_vocal_path is None:
            raise Exception("Vocal file not found")
        return FileResponse(saved_vocal_path, media_type="audio/mpeg")
    except Exception as e:
        print(f"Error processing vocal: {e}")
        # Surface the failure to the client instead of silently returning None.
        raise HTTPException(
            status_code=500,
            detail=f"Error processing vocal: {e}",
        ) from e
    finally:
        # Remove the downloaded input on both the success and failure paths.
        if os.path.exists(temp_file_path):
            print(f"Deleting temp file: {temp_file_path}")
            try:
                os.remove(temp_file_path)
            except OSError as cleanup_error:
                # Best-effort cleanup: never mask the real outcome of the request.
                print(f"Failed to delete temp file: {cleanup_error}")