import json import logging import os from enum import unique, Enum import requests class ActionType(Enum): COPY = 1 MOVE = 2 @unique class TaskType(Enum): UPLOAD = 'upload' COPY = 'copy' ARIA2_DOWNLOAD = 'aria2_down' ARIA2_TRANSFER = 'aria2_transfer' QBITTORRENT_DOWNLOAD = 'qbit_down' QBITTORRENT_TRANSFER = 'qbit_transfer' class AlistAPI: def __init__(self, url, username, password): self.url = url self.headers = { 'UserAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/87.0.4280.88 Safari/537.36', 'Content-Type': 'application/json' } # self.aria2_client = Client(rpc_url, secret=rpc_secret) # self.aria2_api = API(self.aria2_client) def login(self, username, password): data = { 'username': username, 'password': password } response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers) if response.status_code == 200: token = response.json() self.headers['Authorization'] = token['data']['token'] else: raise Exception('Login failed') def get_directory(self, path="", password="", page=1, per_page=0, refresh=False): payload = { "path": path, "password": password, "page": page, "per_page": per_page, "refresh": refresh } response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers) return response.json() def copy_file(self, src_dir, dst_dir, names): payload = { "src_dir": src_dir, "dst_dir": dst_dir, "names": names } response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers) return response.json() def get_completed_tasks(self, task_type): """获取指定任务类型的已完成任务列表""" if task_type == TaskType.UPLOAD: api_endpoint = '/admin/task/upload/done' elif task_type == TaskType.COPY: api_endpoint = '/admin/task/copy/done' elif task_type == TaskType.ARIA2_DOWNLOAD: api_endpoint = '/admin/task/aria2_down/done' elif task_type == TaskType.ARIA2_TRANSFER: api_endpoint = '/admin/task/aria2_transfer/done' elif task_type == TaskType.QBITTORRENT_DOWNLOAD: api_endpoint = '/admin/task/qbit_down/done' elif task_type == TaskType.QBITTORRENT_TRANSFER: api_endpoint = '/admin/task/qbit_transfer/done' else: raise ValueError("Invalid task type") response = requests.get( f'{self.url}{api_endpoint}', headers=self.headers ) return response.json() def copy_directory(self, src_path, dst_path): file_list = self.list_directory(src_path) if not file_list: return for file_info in file_list['data']['content']: if file_info['is_dir']: new_src_path = src_path + "/" + file_info['name'] new_dst_path = dst_path + "/" + file_info['name'] # new_src_path = os.path.join(src_path, file_info['name']) # new_dst_path = os.path.join(dst_path, file_info['name']) print(f"Copying directory: {new_src_path} to {new_dst_path}") self.copy_directory(new_src_path, new_dst_path) else: file_name = file_info['name'] print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}") # 这里原本是调用 self.copy_file,现在改为仅打印信息 self.copy_file(src_path, dst_path, [file_name]) def list_directory(self, path, password="", page=1, per_page=0, refresh=False): payload = { "path": path, "password": password, "page": page, "per_page": per_page, "refresh": refresh } response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers) if response.status_code == 200: return response.json() else: logging.error(f"Failed to list directory: {response.text}") return None # 或者 return {} 以保持返回类型的一致性 def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False): payload = { "path": path, "password": password, "page": page, "per_page": per_page, "refresh": refresh } response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers) if response.status_code == 200: return response.json() return None def move_file(self, src_dir, dst_dir, names): payload = json.dumps({ "src_dir": src_dir, "dst_dir": dst_dir, "names": names }) response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers) return response.json() def remove_files_or_folders(self, dir_path, names): """删除指定目录下的文件或文件夹""" payload = { "dir": dir_path, "names": names } response = requests.post( f'{self.url}/fs/remove', headers=self.headers, json=payload ) return response.json() def remove_empty_directory(self, src_dir): """删除空文件夹""" payload = { "src_dir": src_dir } response = requests.post( f'{self.url}/fs/remove_empty_director', headers=self.headers, json=payload ) return response.json() def recursive_collect_contents(self, remote_download_path, home_download_path, src_dir=None, dest_path=None, current_sub_path='', parent_folder_name=''): contents = [] file_list = self.list_directory(remote_download_path) # file_info_json = json.dumps(file_list, indent=4) # logging.info(f'file_info_json: {file_info_json}') if file_list is None or 'data' not in file_list or 'total' not in file_list['data']: logging.error("Directory listing failed or returned unexpected data") return [] if file_list is None: return [] if file_list['data']['total'] == 0: return [] for file_info in file_list['data']['content']: # 拼接完整的远程路径 full_path = os.path.join(remote_download_path, file_info['name']).replace('\\', '/') # 初始化本地下载路径和复制/移动目的地路径 local_download_path = '' new_dest_path = '' new_src_dir = '' # 根据条件构建本地下载路径和复制/移动目的地路径 if home_download_path is not None: local_download_path = os.path.join(home_download_path, current_sub_path, file_info['name']).replace( '\\', '/') if dest_path is not None: new_dest_path = os.path.join(dest_path, current_sub_path).replace('\\', '/') if src_dir is not None: new_src_dir = os.path.join(src_dir, current_sub_path).replace('\\', '/') parent_path = os.path.dirname(full_path) folder_name = os.path.basename(parent_path) item = { 'name': file_info['name'], 'folder_name': folder_name, 'parent_folder_name': parent_folder_name, 'is_dir': file_info['is_dir'], 'path': full_path, # 存储完整的远程路径 'downloads_path': local_download_path, 'src_dir': new_src_dir, 'dst_dir': new_dest_path } contents.append(item) if file_info['is_dir']: # 更新子路径为当前文件夹的路径 new_sub_path = os.path.join(current_sub_path, file_info['name']) sub_contents = self.recursive_collect_contents(full_path, home_download_path, src_dir, dest_path, new_sub_path, parent_folder_name=folder_name) contents.extend(sub_contents) return contents def copy_files(self, local_json_path, is_debug=False): """执行拷贝文件""" # 读取本地 JSON 文件 with open(local_json_path, 'r', encoding='utf-8') as f: directory_contents = json.load(f) for item in directory_contents: if not item['is_dir']: file_name = item['name'] original_path = item['src_dir'] # 获取原始文件路径 des_path = item['dst_dir'] # 获取原始文件路径 if is_debug: logging.info(f"Debug mode: Copy {file_name}") else: # 复制文件 self.copy_file(original_path, des_path, [file_name]) logging.info( f"Copied: {file_name} from {original_path} to {des_path}") def save_directory_contents(self, remote_download_path, local_download_path, scy_copy_path, des_copy_path, parent_dir): """获取远程和本地对应的目录结构,并保存为 JSON 文件""" # 获取远程目录结构 remote_data = self.recursive_collect_contents( remote_download_path, local_download_path ) remote_json = os.path.join(parent_dir, 'data', 'remote_data.json') with open(remote_json, 'w', encoding='utf-8') as f: json.dump(remote_data, f, indent=4) # # 获取本地目录结构 # home_data = self.home_alist_api.recursive_collect_contents( # scy_copy_path, des_copy_path, scy_copy_path, des_copy_path # ) # home_json_path = os.path.join(parent_dir, 'data', self.home_data) # with open(home_json_path, 'w', encoding='utf-8') as f: # json.dump(home_data, f, indent=4) return remote_data