|
@@ -7,6 +7,16 @@ import requests
|
|
|
from aria2p import API, Client
|
|
|
|
|
|
|
|
|
+def construct_path(base_path, sub_path, file_name):
|
|
|
+ # 确保路径部分不包含反斜杠
|
|
|
+ base_path = base_path.replace('\\', '/')
|
|
|
+ sub_path = sub_path.replace('\\', '/')
|
|
|
+ file_name = file_name.replace('\\', '/')
|
|
|
+
|
|
|
+ # 使用 os.path.join 构建整个路径
|
|
|
+ return os.path.join(base_path, sub_path, file_name)
|
|
|
+
|
|
|
+
|
|
|
class AlistAPI:
|
|
|
def __init__(self, url, aria2_rpc_url=None, aria2_rpc_secret=None, username=None, password=None):
|
|
|
self.url = url
|
|
@@ -53,6 +63,24 @@ class AlistAPI:
|
|
|
response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
|
|
|
return response.json()
|
|
|
|
|
|
+ def copy_directory(self, src_path, dst_path):
|
|
|
+ file_list = self.list_directory(src_path)
|
|
|
+ if not file_list:
|
|
|
+ return
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ if file_info['is_dir']:
|
|
|
+ new_src_path = src_path + "/" + file_info['name']
|
|
|
+ new_dst_path = dst_path + "/" + file_info['name']
|
|
|
+ # new_src_path = os.path.join(src_path, file_info['name'])
|
|
|
+ # new_dst_path = os.path.join(dst_path, file_info['name'])
|
|
|
+ print(f"Copying directory: {new_src_path} to {new_dst_path}")
|
|
|
+ self.copy_directory(new_src_path, new_dst_path)
|
|
|
+ else:
|
|
|
+ file_name = file_info['name']
|
|
|
+ print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}")
|
|
|
+ # 这里原本是调用 self.copy_file,现在改为仅打印信息
|
|
|
+ self.copy_file(src_path, dst_path, [file_name])
|
|
|
+
|
|
|
def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
payload = {
|
|
|
"path": path,
|
|
@@ -84,20 +112,6 @@ class AlistAPI:
|
|
|
response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
|
|
|
return response.json()
|
|
|
|
|
|
- # def download_directory(self, path, download_path):
|
|
|
- # file_list = self.list_directory(path)
|
|
|
- # if not file_list:
|
|
|
- # return
|
|
|
- # for file_info in file_list['data']['content']:
|
|
|
- # if file_info['is_dir']:
|
|
|
- # self.download_directory(path + "/" + file_info['name'], download_path)
|
|
|
- # else:
|
|
|
- # file_download_url = self.url + "/d" + path + "/" + file_info['name']
|
|
|
- # sign = file_info.get('sign')
|
|
|
- # if sign:
|
|
|
- # file_download_url += "?sign=" + sign
|
|
|
- # self.aria2_api.add_uris([file_download_url], options={"dir": download_path})
|
|
|
-
|
|
|
def download_directory(self, remote_path, local_base_path, current_sub_path=''):
|
|
|
file_list = self.list_directory(remote_path)
|
|
|
if not file_list:
|
|
@@ -114,8 +128,13 @@ class AlistAPI:
|
|
|
file_detail = self.get_file_or_directory_info(remote_path + "/" + file_info['name'])
|
|
|
if file_detail and file_detail['code'] == 200:
|
|
|
raw_url = file_detail['data']['raw_url']
|
|
|
+ # 下载完整路径
|
|
|
+ full_download_path = local_base_path + "/" + current_sub_path
|
|
|
+ print(f"aria2 File: {file_info['name']}")
|
|
|
+ print(f"aria2 Download URL: {raw_url}")
|
|
|
+ print(f"aria2 Local Path: {full_download_path}")
|
|
|
# 将真实 URL 添加到 Aria2 下载队列
|
|
|
- self.aria2_api.add_uris([raw_url], options={"dir": current_sub_path})
|
|
|
+ self.aria2_api.add_uris([raw_url], options={"dir": full_download_path})
|
|
|
else:
|
|
|
print(f"Failed to get detailed information for {file_info['name']}")
|
|
|
|
|
@@ -124,37 +143,23 @@ class AlistAPI:
|
|
|
if not file_list:
|
|
|
return
|
|
|
for file_info in file_list['data']['content']:
|
|
|
- # 检查是否是目录
|
|
|
if file_info['is_dir']:
|
|
|
- # 构建新的子路径
|
|
|
new_sub_path = os.path.join(current_sub_path, file_info['name'])
|
|
|
- print(f"Directory: {remote_path}/{file_info['name']} -> Local: {local_base_path}/{new_sub_path}")
|
|
|
- # 递归遍历子目录
|
|
|
+ print(f"Directory: {remote_path}/{file_info['name']}")
|
|
|
+ print(f" Local Path: {local_base_path}/{new_sub_path}")
|
|
|
self.debug_directory(remote_path + "/" + file_info['name'], local_base_path, new_sub_path)
|
|
|
else:
|
|
|
- # 获取文件的详细信息
|
|
|
+ # print(f"remote_path: {remote_path} + / + {file_info['name']}")
|
|
|
file_detail = self.get_file_or_directory_info(remote_path + "/" + file_info['name'])
|
|
|
if file_detail and file_detail['code'] == 200:
|
|
|
raw_url = file_detail['data']['raw_url']
|
|
|
- print(f"File: {file_info['name']} -> Download URL: {raw_url}")
|
|
|
+ full_local_path = local_base_path + "/" + current_sub_path
|
|
|
+ print(f"File: {file_info['name']}")
|
|
|
+ print(f" Download URL: {raw_url}")
|
|
|
+ print(f" Local Path: {full_local_path}")
|
|
|
else:
|
|
|
print(f"Failed to get detailed information for {file_info['name']}")
|
|
|
|
|
|
- # def debug_directory(self, path, download_path):
|
|
|
- # file_list = self.list_directory(path)
|
|
|
- # if not file_list:
|
|
|
- # return
|
|
|
- # for file_info in file_list['data']['content']:
|
|
|
- # if file_info['is_dir']:
|
|
|
- # print(f"Directory: {path}/{file_info['name']}")
|
|
|
- # self.debug_directory(path + "/" + file_info['name'], download_path)
|
|
|
- # else:
|
|
|
- # file_download_url = self.url + "/d" + path + "/" + file_info['name']
|
|
|
- # sign = file_info.get('sign')
|
|
|
- # if sign:
|
|
|
- # file_download_url += "?sign=" + sign
|
|
|
- # print(f"File: {file_download_url}, Save to: {download_path}")
|
|
|
-
|
|
|
# alist 下载完成复制
|
|
|
def monitor_and_copy(self, download_path, destination_path, check_interval=10):
|
|
|
while True:
|
|
@@ -191,6 +196,6 @@ class AlistAPI:
|
|
|
print(f"Moved: {file_name} from {download_path} to {destination_path}")
|
|
|
|
|
|
# 从 Aria2 中移除已完成的下载记录
|
|
|
- download.remove()
|
|
|
+ # download.remove()
|
|
|
|
|
|
time.sleep(check_interval)
|