|
@@ -0,0 +1,234 @@
|
|
|
+import json
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import requests
|
|
|
+from src.models.task_type import TaskType, ActionType
|
|
|
+
|
|
|
+
|
|
|
+def construct_path(base_path, sub_path, file_name):
|
|
|
+ # 确保路径部分不包含反斜杠
|
|
|
+ base_path = base_path.replace('\\', '/')
|
|
|
+ sub_path = sub_path.replace('\\', '/')
|
|
|
+ file_name = file_name.replace('\\', '/')
|
|
|
+
|
|
|
+ # 使用 os.path.join 构建整个路径
|
|
|
+ return os.path.join(base_path, sub_path, file_name)
|
|
|
+
|
|
|
+
|
|
|
+class AlistAPI:
|
|
|
+ def __init__(self, url, username, password):
|
|
|
+ self.url = url
|
|
|
+ self.headers = {
|
|
|
+ 'UserAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
|
|
|
+ 'Chrome/87.0.4280.88 Safari/537.36',
|
|
|
+ 'Content-Type': 'application/json'
|
|
|
+ }
|
|
|
+ # self.aria2_client = Client(rpc_url, secret=rpc_secret)
|
|
|
+ # self.aria2_api = API(self.aria2_client)
|
|
|
+ self.login(username, password)
|
|
|
+
|
|
|
+ def login(self, username, password):
|
|
|
+ data = {
|
|
|
+ 'username': username,
|
|
|
+ 'password': password
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers)
|
|
|
+ if response.status_code == 200:
|
|
|
+ token = response.json()
|
|
|
+ self.headers['Authorization'] = token['data']['token']
|
|
|
+ else:
|
|
|
+ raise Exception('Login failed')
|
|
|
+
|
|
|
+ def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def copy_file(self, src_dir, dst_dir, names):
|
|
|
+ payload = {
|
|
|
+ "src_dir": src_dir,
|
|
|
+ "dst_dir": dst_dir,
|
|
|
+ "names": names
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def get_completed_tasks(self, task_type):
|
|
|
+ """获取指定任务类型的已完成任务列表"""
|
|
|
+ if task_type == TaskType.UPLOAD:
|
|
|
+ api_endpoint = '/admin/task/upload/done'
|
|
|
+ elif task_type == TaskType.COPY:
|
|
|
+ api_endpoint = '/admin/task/copy/done'
|
|
|
+ elif task_type == TaskType.ARIA2_DOWNLOAD:
|
|
|
+ api_endpoint = '/admin/task/aria2_down/done'
|
|
|
+ elif task_type == TaskType.ARIA2_TRANSFER:
|
|
|
+ api_endpoint = '/admin/task/aria2_transfer/done'
|
|
|
+ elif task_type == TaskType.QBITTORRENT_DOWNLOAD:
|
|
|
+ api_endpoint = '/admin/task/qbit_down/done'
|
|
|
+ elif task_type == TaskType.QBITTORRENT_TRANSFER:
|
|
|
+ api_endpoint = '/admin/task/qbit_transfer/done'
|
|
|
+ else:
|
|
|
+ raise ValueError("Invalid task type")
|
|
|
+
|
|
|
+ response = requests.get(
|
|
|
+ f'{self.url}{api_endpoint}',
|
|
|
+ headers=self.headers
|
|
|
+ )
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def copy_directory(self, src_path, dst_path):
|
|
|
+ file_list = self.list_directory(src_path)
|
|
|
+ if not file_list:
|
|
|
+ return
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ if file_info['is_dir']:
|
|
|
+ new_src_path = src_path + "/" + file_info['name']
|
|
|
+ new_dst_path = dst_path + "/" + file_info['name']
|
|
|
+ # new_src_path = os.path.join(src_path, file_info['name'])
|
|
|
+ # new_dst_path = os.path.join(dst_path, file_info['name'])
|
|
|
+ print(f"Copying directory: {new_src_path} to {new_dst_path}")
|
|
|
+ self.copy_directory(new_src_path, new_dst_path)
|
|
|
+ else:
|
|
|
+ file_name = file_info['name']
|
|
|
+ print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}")
|
|
|
+ # 这里原本是调用 self.copy_file,现在改为仅打印信息
|
|
|
+ self.copy_file(src_path, dst_path, [file_name])
|
|
|
+
|
|
|
+ def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers)
|
|
|
+ if response.status_code == 200:
|
|
|
+ return response.json()
|
|
|
+ return None
|
|
|
+
|
|
|
+ def move_file(self, src_dir, dst_dir, names):
|
|
|
+ payload = json.dumps({
|
|
|
+ "src_dir": src_dir,
|
|
|
+ "dst_dir": dst_dir,
|
|
|
+ "names": names
|
|
|
+ })
|
|
|
+ response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def remove_files_or_folders(self, dir_path, names):
|
|
|
+ """删除指定目录下的文件或文件夹"""
|
|
|
+ payload = {
|
|
|
+ "dir": dir_path,
|
|
|
+ "names": names
|
|
|
+ }
|
|
|
+ response = requests.post(
|
|
|
+ f'{self.url}/fs/remove',
|
|
|
+ headers=self.headers,
|
|
|
+ json=payload
|
|
|
+ )
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def remove_empty_directory(self, src_dir):
|
|
|
+ """删除空文件夹"""
|
|
|
+ payload = {
|
|
|
+ "src_dir": src_dir
|
|
|
+ }
|
|
|
+ response = requests.post(
|
|
|
+ f'{self.url}/fs/remove_empty_director',
|
|
|
+ headers=self.headers,
|
|
|
+ json=payload
|
|
|
+ )
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def recursive_collect_contents(self,
|
|
|
+ remote_download_path,
|
|
|
+ home_download_path,
|
|
|
+ src_dir=None,
|
|
|
+ dest_path=None,
|
|
|
+ current_sub_path=''):
|
|
|
+ contents = []
|
|
|
+ file_list = self.list_directory(remote_download_path)
|
|
|
+ # file_info_json = json.dumps(file_list, indent=4)
|
|
|
+ # logging.info(f'file_info_json: {file_info_json}')
|
|
|
+ if file_list['data']['total'] == 0:
|
|
|
+ return []
|
|
|
+
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ # 拼接完整的远程路径
|
|
|
+ full_path = os.path.join(remote_download_path, file_info['name']).replace('\\', '/')
|
|
|
+
|
|
|
+ # 初始化本地下载路径和复制/移动目的地路径
|
|
|
+ local_download_path = ''
|
|
|
+ new_dest_path = ''
|
|
|
+ new_src_dir = ''
|
|
|
+ # 根据条件构建本地下载路径和复制/移动目的地路径
|
|
|
+ if home_download_path is not None:
|
|
|
+ local_download_path = os.path.join(home_download_path, current_sub_path, file_info['name']).replace(
|
|
|
+ '\\',
|
|
|
+ '/')
|
|
|
+ if dest_path is not None:
|
|
|
+ new_dest_path = os.path.join(dest_path, current_sub_path).replace('\\', '/')
|
|
|
+
|
|
|
+ if src_dir is not None:
|
|
|
+ new_src_dir = os.path.join(src_dir, current_sub_path).replace('\\', '/')
|
|
|
+
|
|
|
+ item = {
|
|
|
+ 'name': file_info['name'],
|
|
|
+ 'is_dir': file_info['is_dir'],
|
|
|
+ 'path': full_path, # 存储完整的远程路径
|
|
|
+ 'downloads_path': local_download_path,
|
|
|
+ 'src_dir': new_src_dir,
|
|
|
+ 'dst_dir': new_dest_path
|
|
|
+ }
|
|
|
+ contents.append(item)
|
|
|
+
|
|
|
+ if file_info['is_dir']:
|
|
|
+ # 更新子路径为当前文件夹的路径
|
|
|
+ new_sub_path = os.path.join(current_sub_path, file_info['name'])
|
|
|
+ sub_contents = self.recursive_collect_contents(full_path,
|
|
|
+ home_download_path,
|
|
|
+ src_dir,
|
|
|
+ dest_path,
|
|
|
+ new_sub_path)
|
|
|
+ contents.extend(sub_contents)
|
|
|
+
|
|
|
+ return contents
|
|
|
+
|
|
|
+ def copy_files(self, local_json_path, is_debug=False):
|
|
|
+ """执行拷贝文件"""
|
|
|
+ # 读取本地 JSON 文件
|
|
|
+ with open(local_json_path, 'r', encoding='utf-8') as f:
|
|
|
+ directory_contents = json.load(f)
|
|
|
+
|
|
|
+ for item in directory_contents:
|
|
|
+
|
|
|
+ if not item['is_dir']:
|
|
|
+ file_name = item['name']
|
|
|
+ original_path = item['src_dir'] # 获取原始文件路径
|
|
|
+ des_path = item['dst_dir'] # 获取原始文件路径
|
|
|
+
|
|
|
+ if is_debug:
|
|
|
+ logging.info(f"Debug mode: Copy {file_name}")
|
|
|
+ else:
|
|
|
+ # 复制文件
|
|
|
+ self.copy_file(original_path, des_path, [file_name])
|
|
|
+ logging.info(
|
|
|
+ f"Copied: {file_name} from {original_path} to {des_path}")
|