|
@@ -0,0 +1,390 @@
|
|
|
+import json
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import time
|
|
|
+
|
|
|
+import requests
|
|
|
+from aria2p import API, Client
|
|
|
+
|
|
|
+from src.models.task_type import TaskType, ActionType
|
|
|
+
|
|
|
+
|
|
|
+def construct_path(base_path, sub_path, file_name):
|
|
|
+ # 确保路径部分不包含反斜杠
|
|
|
+ base_path = base_path.replace('\\', '/')
|
|
|
+ sub_path = sub_path.replace('\\', '/')
|
|
|
+ file_name = file_name.replace('\\', '/')
|
|
|
+
|
|
|
+ # 使用 os.path.join 构建整个路径
|
|
|
+ return os.path.join(base_path, sub_path, file_name)
|
|
|
+
|
|
|
+
|
|
|
+class AlistAPI:
|
|
|
+ def __init__(self, url, username, password, rpc_url, rpc_secret):
|
|
|
+ self.url = url
|
|
|
+ self.headers = {
|
|
|
+ 'UserAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
|
|
|
+ 'Chrome/87.0.4280.88 Safari/537.36',
|
|
|
+ 'Content-Type': 'application/json'
|
|
|
+ }
|
|
|
+ self.aria2_client = Client(rpc_url, secret=rpc_secret)
|
|
|
+ self.aria2_api = API(self.aria2_client)
|
|
|
+ self.login(username, password)
|
|
|
+
|
|
|
+ def login(self, username, password):
|
|
|
+ data = {
|
|
|
+ 'username': username,
|
|
|
+ 'password': password
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers)
|
|
|
+ if response.status_code == 200:
|
|
|
+ token = response.json()
|
|
|
+ self.headers['Authorization'] = token['data']['token']
|
|
|
+ else:
|
|
|
+ raise Exception('Login failed')
|
|
|
+
|
|
|
+ def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def copy_file(self, src_dir, dst_dir, names):
|
|
|
+ payload = {
|
|
|
+ "src_dir": src_dir,
|
|
|
+ "dst_dir": dst_dir,
|
|
|
+ "names": names
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def get_completed_tasks(self, task_type):
|
|
|
+ """获取指定任务类型的已完成任务列表"""
|
|
|
+ if task_type == TaskType.UPLOAD:
|
|
|
+ api_endpoint = '/admin/task/upload/done'
|
|
|
+ elif task_type == TaskType.COPY:
|
|
|
+ api_endpoint = '/admin/task/copy/done'
|
|
|
+ elif task_type == TaskType.ARIA2_DOWNLOAD:
|
|
|
+ api_endpoint = '/admin/task/aria2_down/done'
|
|
|
+ elif task_type == TaskType.ARIA2_TRANSFER:
|
|
|
+ api_endpoint = '/admin/task/aria2_transfer/done'
|
|
|
+ elif task_type == TaskType.QBITTORRENT_DOWNLOAD:
|
|
|
+ api_endpoint = '/admin/task/qbit_down/done'
|
|
|
+ elif task_type == TaskType.QBITTORRENT_TRANSFER:
|
|
|
+ api_endpoint = '/admin/task/qbit_transfer/done'
|
|
|
+ else:
|
|
|
+ raise ValueError("Invalid task type")
|
|
|
+
|
|
|
+ response = requests.get(
|
|
|
+ f'{self.url}{api_endpoint}',
|
|
|
+ headers=self.headers
|
|
|
+ )
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def copy_directory(self, src_path, dst_path):
|
|
|
+ file_list = self.list_directory(src_path)
|
|
|
+ if not file_list:
|
|
|
+ return
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ if file_info['is_dir']:
|
|
|
+ new_src_path = src_path + "/" + file_info['name']
|
|
|
+ new_dst_path = dst_path + "/" + file_info['name']
|
|
|
+ # new_src_path = os.path.join(src_path, file_info['name'])
|
|
|
+ # new_dst_path = os.path.join(dst_path, file_info['name'])
|
|
|
+ print(f"Copying directory: {new_src_path} to {new_dst_path}")
|
|
|
+ self.copy_directory(new_src_path, new_dst_path)
|
|
|
+ else:
|
|
|
+ file_name = file_info['name']
|
|
|
+ print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}")
|
|
|
+ # 这里原本是调用 self.copy_file,现在改为仅打印信息
|
|
|
+ self.copy_file(src_path, dst_path, [file_name])
|
|
|
+
|
|
|
+ def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def move_file(self, src_dir, dst_dir, names):
|
|
|
+ payload = json.dumps({
|
|
|
+ "src_dir": src_dir,
|
|
|
+ "dst_dir": dst_dir,
|
|
|
+ "names": names
|
|
|
+ })
|
|
|
+ response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def remove_files_or_folders(self, dir_path, names):
|
|
|
+ """删除指定目录下的文件或文件夹"""
|
|
|
+ payload = {
|
|
|
+ "dir": dir_path,
|
|
|
+ "names": names
|
|
|
+ }
|
|
|
+ response = requests.post(
|
|
|
+ f'{self.url}/fs/remove',
|
|
|
+ headers=self.headers,
|
|
|
+ json=payload
|
|
|
+ )
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def remove_empty_directory(self, src_dir):
|
|
|
+ """删除空文件夹"""
|
|
|
+ payload = {
|
|
|
+ "src_dir": src_dir
|
|
|
+ }
|
|
|
+ response = requests.post(
|
|
|
+ f'{self.url}/fs/remove_empty_director',
|
|
|
+ headers=self.headers,
|
|
|
+ json=payload
|
|
|
+ )
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def save_directory_contents_to_json(self, remote_path, local_base_path=None, copy_or_move_destination_path=None,
|
|
|
+ json_file_path=''):
|
|
|
+ file_list = self.list_directory(remote_path)
|
|
|
+ if not file_list:
|
|
|
+ return
|
|
|
+
|
|
|
+ directory_contents = self._recursive_collect_contents(remote_path, local_base_path,
|
|
|
+ copy_or_move_destination_path)
|
|
|
+
|
|
|
+ with open(json_file_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(directory_contents, f, indent=4, ensure_ascii=False)
|
|
|
+
|
|
|
+ def _recursive_collect_contents(self, path, local_base_path, copy_or_move_destination_path, current_sub_path=''):
|
|
|
+ contents = []
|
|
|
+ file_list = self.list_directory(path)
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ # 拼接完整的远程路径
|
|
|
+ full_path = os.path.join(path, file_info['name']).replace('\\', '/')
|
|
|
+
|
|
|
+ # 初始化本地下载路径和复制/移动目的地路径
|
|
|
+ local_download_path = ''
|
|
|
+ copy_move_dest_path = ''
|
|
|
+
|
|
|
+ # 根据条件构建本地下载路径和复制/移动目的地路径
|
|
|
+ if local_base_path is not None:
|
|
|
+ local_download_path = os.path.join(local_base_path, current_sub_path, file_info['name']).replace('\\',
|
|
|
+ '/')
|
|
|
+ if copy_or_move_destination_path is not None:
|
|
|
+ copy_move_dest_path = os.path.join(copy_or_move_destination_path, current_sub_path,
|
|
|
+ file_info['name']).replace('\\', '/')
|
|
|
+
|
|
|
+ item = {
|
|
|
+ 'name': file_info['name'],
|
|
|
+ 'is_dir': file_info['is_dir'],
|
|
|
+ 'path': full_path, # 存储完整的远程路径
|
|
|
+ 'downloads_path': local_download_path,
|
|
|
+ 'copy_des_path': copy_move_dest_path
|
|
|
+ }
|
|
|
+ contents.append(item)
|
|
|
+
|
|
|
+ if file_info['is_dir']:
|
|
|
+ # 更新子路径为当前文件夹的路径
|
|
|
+ new_sub_path = os.path.join(current_sub_path, file_info['name'])
|
|
|
+ sub_contents = self._recursive_collect_contents(full_path, local_base_path,
|
|
|
+ copy_or_move_destination_path, new_sub_path)
|
|
|
+ contents.extend(sub_contents)
|
|
|
+
|
|
|
+ return contents
|
|
|
+
|
|
|
+ def download_directory(self, is_debug=False, json_file_path='directory_contents.json'):
|
|
|
+ # 读取 JSON 文件中的目录内容
|
|
|
+ with open(json_file_path, 'r', encoding='utf-8') as f:
|
|
|
+ directory_contents = json.load(f)
|
|
|
+
|
|
|
+ # 获取 Aria2 的所有下载记录
|
|
|
+ downloads = self.aria2_api.get_downloads()
|
|
|
+
|
|
|
+ # 获取已完成下载的文件名列表
|
|
|
+ completed_files = []
|
|
|
+ for download in downloads:
|
|
|
+ if download.is_complete:
|
|
|
+ for file in download.files:
|
|
|
+ if file.selected:
|
|
|
+ file_name = os.path.basename(file.path)
|
|
|
+ completed_files.append(file_name)
|
|
|
+
|
|
|
+ for item in directory_contents:
|
|
|
+ if not item['is_dir']:
|
|
|
+ # 构建完整的本地路径
|
|
|
+ # full_local_path = os.path.join(os.path.relpath(item['path'], start="/")).replace('\\', '/')
|
|
|
+ # local_file_path = os.path.join(local_base_path, item['name']).replace('\\', '/')
|
|
|
+ full_local_path = item['downloads_path']
|
|
|
+ local_file = item['name']
|
|
|
+ if local_file not in completed_files:
|
|
|
+ file_detail = self.get_file_or_directory_info(item['path'])
|
|
|
+ if file_detail and file_detail['code'] == 200:
|
|
|
+ raw_url = file_detail['data']['raw_url']
|
|
|
+ if not is_debug:
|
|
|
+ # 添加到 Aria2 下载队列
|
|
|
+ self.aria2_api.add_uris([raw_url], options={"dir": full_local_path})
|
|
|
+ else:
|
|
|
+ logging.error(f"Failed to get detailed information for {local_file}")
|
|
|
+ else:
|
|
|
+ logging.info(f"File already downloaded, skipping: {local_file}")
|
|
|
+
|
|
|
+ def monitor_and_copy(self, local_json_path, check_interval=10, is_debug=False, is_running=True):
|
|
|
+ """监控 Aria2 下载完成后,执行拷贝操作"""
|
|
|
+ try:
|
|
|
+ while is_running:
|
|
|
+ # 读取本地 JSON 文件
|
|
|
+ with open(local_json_path, 'r', encoding='utf-8') as f:
|
|
|
+ directory_contents = json.load(f)
|
|
|
+
|
|
|
+ downloads = self.aria2_api.get_downloads()
|
|
|
+ for download in downloads:
|
|
|
+ if download.is_complete:
|
|
|
+ for file in download.files:
|
|
|
+ if file.selected:
|
|
|
+ file_name = os.path.basename(file.path)
|
|
|
+
|
|
|
+ for item in directory_contents:
|
|
|
+ if item['name'] == file_name and not item['is_dir']:
|
|
|
+ original_path = item['path'] # 获取原始文件路径
|
|
|
+ des_path = item['copy_des_path'] # 获取原始文件路径
|
|
|
+
|
|
|
+ if is_debug:
|
|
|
+ logging.info(f"Debug mode: Copy {file_name}")
|
|
|
+ else:
|
|
|
+ # 复制文件
|
|
|
+ self.copy_file(original_path, des_path, [file_name])
|
|
|
+ logging.info(
|
|
|
+ f"Copied: {file_name} from {original_path} to {des_path}")
|
|
|
+
|
|
|
+ time.sleep(check_interval)
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(f"Error occurred: {e}")
|
|
|
+ finally:
|
|
|
+ logging.info("Monitoring and copying completed")
|
|
|
+
|
|
|
+ def monitor_and_move_or_copy(self, local_json_path, check_interval=10,
|
|
|
+ is_debug=False, is_running=True, timeout=3600,
|
|
|
+ condition=None, action_type=ActionType.COPY):
|
|
|
+ """监控 Aria2 下载完成后,执行移动操作并检查任务完成状态"""
|
|
|
+ start_time = time.time()
|
|
|
+ try:
|
|
|
+ while is_running:
|
|
|
+ current_time = time.time()
|
|
|
+ if current_time - start_time > timeout:
|
|
|
+ logging.info(f"Timeout: {timeout} seconds")
|
|
|
+ break
|
|
|
+
|
|
|
+ # 读取本地 JSON 文件
|
|
|
+ with open(local_json_path, 'r', encoding='utf-8') as f:
|
|
|
+ directory_contents = json.load(f)
|
|
|
+
|
|
|
+ for item in directory_contents:
|
|
|
+ if not item['is_dir']:
|
|
|
+ local_file_path = item['downloads_path']
|
|
|
+ remote_file_name = item['remote_file_name']
|
|
|
+ scy_des_path = item['path']
|
|
|
+ des_des_path = item['copy_des_path']
|
|
|
+ # 检查 Aria2 是否已完成该文件的下载
|
|
|
+ downloads = self.aria2_api.get_downloads()
|
|
|
+ for download in downloads:
|
|
|
+ if download.is_complete:
|
|
|
+ for file in download.files:
|
|
|
+ if file.selected and os.path.basename(file.path) == remote_file_name:
|
|
|
+ if is_debug:
|
|
|
+ logging.info(f"Debug mode: {action_type} {remote_file_name}")
|
|
|
+ else:
|
|
|
+ if action_type == ActionType.MOVE:
|
|
|
+ self.move_file(scy_des_path, des_des_path)
|
|
|
+ logging.info(f"Moved: {scy_des_path} to {des_des_path}")
|
|
|
+ elif action_type == ActionType.COPY:
|
|
|
+ self.copy_file(scy_des_path, des_des_path)
|
|
|
+ logging.info(f"Copied: {scy_des_path} to {des_des_path}")
|
|
|
+ # self.remove_files_or_folders([scy_des_path], item['path'])
|
|
|
+ # logging.info(
|
|
|
+ # f"Removed original: {scy_des_path} from {item['path']}")
|
|
|
+
|
|
|
+ if condition is not None and condition():
|
|
|
+ logging.info("Condition met, stopping monitoring")
|
|
|
+ break
|
|
|
+
|
|
|
+ time.sleep(check_interval)
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(f"Error occurred: {e}")
|
|
|
+ finally:
|
|
|
+ logging.info("Monitoring stopped")
|
|
|
+ # def monitor_and_move_or_copy(self, download_path, destination_path, check_interval=10,
|
|
|
+ # is_debug=False, is_running=True, timeout=3600,
|
|
|
+ # condition=None, action_type=ActionType.COPY):
|
|
|
+ # """监控 Aria2 下载完成后,执行移动操作并检查任务完成状态"""
|
|
|
+ # # 记录开始监控的时间
|
|
|
+ # start_time = time.time()
|
|
|
+ # try:
|
|
|
+ # while is_running:
|
|
|
+ # # 检查是否超时
|
|
|
+ # current_time = time.time()
|
|
|
+ # if current_time - start_time > timeout:
|
|
|
+ # logging.info(f"Timeout: {timeout} seconds")
|
|
|
+ # break
|
|
|
+ # downloads = self.aria2_api.get_downloads()
|
|
|
+ # for download in downloads:
|
|
|
+ # if download.is_complete:
|
|
|
+ # all_tasks_completed = True
|
|
|
+ # for file in download.files:
|
|
|
+ # if file.selected:
|
|
|
+ # file_name = os.path.basename(file.path)
|
|
|
+ # names = [file_name]
|
|
|
+ #
|
|
|
+ # if is_debug is False:
|
|
|
+ # if action_type == ActionType.MOVE:
|
|
|
+ # # 移动文件
|
|
|
+ # self.move_file(download_path, destination_path, names)
|
|
|
+ # logging.info(f"Moved: {file_name} from {download_path} to {destination_path}")
|
|
|
+ # if action_type == ActionType.COPY:
|
|
|
+ # # 复制文件
|
|
|
+ # self.copy_file(download_path, destination_path, names)
|
|
|
+ # logging.info(f"Copied: {file_name} from {download_path} to {destination_path}")
|
|
|
+ #
|
|
|
+ # # 检查任务完成状态
|
|
|
+
|
|
|
+ # else:
|
|
|
+ # """调试信息不真到进行操作"""
|
|
|
+ # if action_type == ActionType.MOVE:
|
|
|
+ # # 移动文件
|
|
|
+ # logging.info(f"Moved: {file_name} from {download_path} to {destination_path}")
|
|
|
+ # if action_type == ActionType.COPY:
|
|
|
+ # # 复制文件
|
|
|
+ # logging.info(f"Copied: {file_name} from {download_path} to {destination_path}")
|
|
|
+ #
|
|
|
+ # # 检查是否满足条件
|
|
|
+ # if condition is not None and condition():
|
|
|
+ # logging.info(f"Condition met: {condition}")
|
|
|
+ # break
|
|
|
+ #
|
|
|
+ # time.sleep(check_interval)
|
|
|
+ # except Exception as e:
|
|
|
+ # # 捕获异常,并记录日志
|
|
|
+ # logging.error(f"An error occurred: {e}")
|
|
|
+ # finally:
|
|
|
+ # # 做一些清理工作,比如关闭 Aria2 的连接,删除临时文件等
|
|
|
+ # # self.aria2_api.close()
|
|
|
+ # logging.info(f"Closed Aria2 connection")
|
|
|
+ # # ...其他清理工作
|
|
|
+ # logging.info(f"monitor_and_move_or_copy is run {is_running}")
|