123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- import json
- import logging
- import os
- from enum import unique, Enum
- import requests
@unique
class ActionType(Enum):
    """Kind of file operation: copy or move.

    Decorated with ``@unique`` (like ``TaskType``) so that two members can
    never silently become aliases of the same value.
    """

    COPY = 1
    MOVE = 2
@unique
class TaskType(Enum):
    """Task categories that can be queried for completed tasks.

    Each member's value is the short task identifier string; ``@unique``
    guarantees no two members share a value.
    """

    # Plain file operations.
    UPLOAD = 'upload'
    COPY = 'copy'

    # aria2-backed offline download and the follow-up transfer.
    ARIA2_DOWNLOAD = 'aria2_down'
    ARIA2_TRANSFER = 'aria2_transfer'

    # qBittorrent-backed offline download and the follow-up transfer.
    QBITTORRENT_DOWNLOAD = 'qbit_down'
    QBITTORRENT_TRANSFER = 'qbit_transfer'
class AlistAPI:
    """Small client for an AList-style HTTP file API.

    Every request goes to ``<url><endpoint>`` and carries ``self.headers``.
    After a successful :meth:`login` those headers also contain the
    ``Authorization`` token returned by the server.
    """

    def __init__(self, url, username, password, timeout=None):
        """Create a client for the API rooted at *url*.

        Args:
            url: Base URL that endpoint paths such as ``/fs/list`` are
                appended to.
            username: Accepted for interface compatibility; not used here.
            password: Accepted for interface compatibility; not used here.
            timeout: Optional ``requests`` timeout applied to every call.
                ``None`` (the default) keeps the previous behaviour of
                sending requests without a timeout.

        Note:
            The constructor does not authenticate. Call :meth:`login`
            explicitly before using endpoints that require a token.
        """
        self.url = url
        self.timeout = timeout
        self.headers = {
            # The standard header name is "User-Agent"; the previous
            # "UserAgent" key was sent as an unrelated custom header.
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/87.0.4280.88 Safari/537.36',
            'Content-Type': 'application/json',
        }

    def _post(self, endpoint, payload):
        """POST *payload* as a JSON body to *endpoint*; return the response."""
        return requests.post(
            f'{self.url}{endpoint}',
            json=payload,
            headers=self.headers,
            timeout=self.timeout,
        )

    def login(self, username, password):
        """Authenticate and store the returned token in ``self.headers``.

        Args:
            username: Account name sent to ``/auth/login``.
            password: Account password sent to ``/auth/login``.

        Raises:
            Exception: With message ``'Login failed'`` when the HTTP status
                is not 200, or when a 200 response carries no
                ``data.token`` (for example an error envelope whose
                ``data`` is ``null``).
        """
        response = self._post('/auth/login', {
            'username': username,
            'password': password,
        })
        if response.status_code != 200:
            raise Exception('Login failed')

        body = response.json()
        data = body.get('data') if isinstance(body, dict) else None
        token = data.get('token') if isinstance(data, dict) else None
        if not token:
            # Without this guard a token-less 200 response surfaced as an
            # opaque TypeError/KeyError instead of a login failure.
            raise Exception('Login failed')
        self.headers['Authorization'] = token

    def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
        """Query ``/fs/dirs`` for *path* and return the decoded JSON reply."""
        payload = {
            "path": path,
            "password": password,
            "page": page,
            "per_page": per_page,
            "refresh": refresh,
        }
        return self._post('/fs/dirs', payload).json()

    def copy_file(self, src_dir, dst_dir, names):
        """Ask the server to copy *names* from *src_dir* into *dst_dir*.

        Args:
            src_dir: Directory that currently holds the entries.
            dst_dir: Directory to copy the entries into.
            names: List of entry names inside *src_dir*.

        Returns:
            The decoded JSON reply of ``/fs/copy``.
        """
        payload = {
            "src_dir": src_dir,
            "dst_dir": dst_dir,
            "names": names,
        }
        return self._post('/fs/copy', payload).json()

    def get_completed_tasks(self, task_type):
        """Return the list of completed tasks for the given task type.

        Args:
            task_type: A ``TaskType`` member.

        Returns:
            The decoded JSON reply of ``/admin/task/<type>/done``.

        Raises:
            ValueError: If *task_type* is not a ``TaskType`` member.
        """
        if not isinstance(task_type, TaskType):
            raise ValueError("Invalid task type")
        # Each TaskType value is exactly the path segment of its endpoint
        # (e.g. TaskType.ARIA2_DOWNLOAD -> /admin/task/aria2_down/done).
        response = requests.get(
            f'{self.url}/admin/task/{task_type.value}/done',
            headers=self.headers,
            timeout=self.timeout,
        )
        return response.json()

    def copy_directory(self, src_path, dst_path):
        """Recursively copy every file below *src_path* to *dst_path*.

        Directories are walked depth-first; each file is copied with
        :meth:`copy_file` into the destination directory that mirrors its
        position under *src_path*. Progress is printed to stdout.

        A failed listing, or a listing with no ``content`` (``null`` or
        missing), is treated as an empty directory.
        """
        listing = self.list_directory(src_path)
        if not listing:
            return

        entries = (listing.get('data') or {}).get('content') or []
        for entry in entries:
            name = entry['name']
            if entry['is_dir']:
                # Remote paths always use "/" regardless of the local OS.
                child_src = src_path + "/" + name
                child_dst = dst_path + "/" + name
                print(f"Copying directory: {child_src} to {child_dst}")
                self.copy_directory(child_src, child_dst)
            else:
                print(f"Copying file: {src_path}/{name} to {dst_path}/{name}")
                self.copy_file(src_path, dst_path, [name])

    def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
        """List *path* through ``/fs/list``.

        Returns:
            The decoded JSON reply on HTTP 200; otherwise ``None`` after
            logging the response text as an error.
        """
        payload = {
            "path": path,
            "password": password,
            "page": page,
            "per_page": per_page,
            "refresh": refresh,
        }
        response = self._post('/fs/list', payload)
        if response.status_code == 200:
            return response.json()
        logging.error(f"Failed to list directory: {response.text}")
        return None

    def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
        """Fetch information about *path* through ``/fs/get``.

        Returns:
            The decoded JSON reply on HTTP 200, otherwise ``None``.
        """
        payload = {
            "path": path,
            "password": password,
            "page": page,
            "per_page": per_page,
            "refresh": refresh,
        }
        response = self._post('/fs/get', payload)
        if response.status_code == 200:
            return response.json()
        return None

    def move_file(self, src_dir, dst_dir, names):
        """Ask the server to move *names* from *src_dir* into *dst_dir*.

        Returns:
            The decoded JSON reply of ``/fs/move``.
        """
        payload = {
            "src_dir": src_dir,
            "dst_dir": dst_dir,
            "names": names,
        }
        return self._post('/fs/move', payload).json()

    def remove_files_or_folders(self, dir_path, names):
        """Delete the files or folders *names* located in *dir_path*.

        Returns:
            The decoded JSON reply of ``/fs/remove``.
        """
        payload = {
            "dir": dir_path,
            "names": names,
        }
        return self._post('/fs/remove', payload).json()

    def remove_empty_directory(self, src_dir):
        """Delete empty folders under *src_dir*.

        Returns:
            The decoded JSON reply of ``/fs/remove_empty_directory``.
        """
        payload = {
            "src_dir": src_dir,
        }
        # The endpoint was previously misspelled "remove_empty_director".
        return self._post('/fs/remove_empty_directory', payload).json()

    def recursive_collect_contents(self,
                                   remote_download_path,
                                   home_download_path,
                                   src_dir=None,
                                   dest_path=None,
                                   current_sub_path='',
                                   parent_folder_name=''):
        """Walk *remote_download_path* and describe every entry found.

        Args:
            remote_download_path: Remote directory to list.
            home_download_path: Local root used to build each entry's
                ``downloads_path``; ``None`` leaves that field empty.
            src_dir: Root used to build ``src_dir``; ``None`` leaves the
                field empty.
            dest_path: Root used to build ``dst_dir``; ``None`` leaves the
                field empty.
            current_sub_path: Path of the directory being listed, relative
                to the root of the walk (used by the recursion).
            parent_folder_name: Name recorded as ``parent_folder_name`` on
                every entry of this directory (used by the recursion).

        Returns:
            A flat list of dicts with the keys ``name``, ``folder_name``,
            ``parent_folder_name``, ``is_dir``, ``path``,
            ``downloads_path``, ``src_dir`` and ``dst_dir``. Each directory
            entry is immediately followed by the entries below it. An empty
            list is returned when the listing fails, is malformed, or
            reports ``total == 0``.
        """
        listing = self.list_directory(remote_download_path)
        data = listing.get('data') if isinstance(listing, dict) else None
        # ``data`` may be null in an error reply; treat that like a failure
        # instead of crashing on the membership test.
        if not isinstance(data, dict) or 'total' not in data:
            logging.error("Directory listing failed or returned unexpected data")
            return []
        if data['total'] == 0:
            return []

        # These two depend only on the directory being listed, not on the
        # individual entry, so compute them once.
        new_dest_path = ''
        new_src_dir = ''
        if dest_path is not None:
            new_dest_path = os.path.join(dest_path, current_sub_path).replace('\\', '/')
        if src_dir is not None:
            new_src_dir = os.path.join(src_dir, current_sub_path).replace('\\', '/')

        contents = []
        for file_info in data.get('content') or []:
            name = file_info['name']
            # Full remote path, normalised to forward slashes.
            full_path = os.path.join(remote_download_path, name).replace('\\', '/')

            local_download_path = ''
            if home_download_path is not None:
                local_download_path = os.path.join(
                    home_download_path, current_sub_path, name
                ).replace('\\', '/')

            # Name of the directory that directly contains this entry.
            folder_name = os.path.basename(os.path.dirname(full_path))

            contents.append({
                'name': name,
                'folder_name': folder_name,
                'parent_folder_name': parent_folder_name,
                'is_dir': file_info['is_dir'],
                'path': full_path,
                'downloads_path': local_download_path,
                'src_dir': new_src_dir,
                'dst_dir': new_dest_path,
            })

            if file_info['is_dir']:
                # Descend, extending the relative sub-path by this folder.
                new_sub_path = os.path.join(current_sub_path, name)
                contents.extend(self.recursive_collect_contents(
                    full_path,
                    home_download_path,
                    src_dir,
                    dest_path,
                    new_sub_path,
                    parent_folder_name=folder_name,
                ))
        return contents

    def copy_files(self, local_json_path, is_debug=False):
        """Copy every file described in a local JSON listing.

        Args:
            local_json_path: Path of a JSON file holding a list of entry
                dicts with at least ``is_dir``, ``name``, ``src_dir`` and
                ``dst_dir`` (the shape produced by
                :meth:`recursive_collect_contents`).
            is_debug: When true, only log what would be copied.
        """
        with open(local_json_path, 'r', encoding='utf-8') as f:
            directory_contents = json.load(f)

        for item in directory_contents:
            if item['is_dir']:
                continue
            file_name = item['name']
            original_path = item['src_dir']  # directory holding the file
            des_path = item['dst_dir']  # directory to copy the file into
            if is_debug:
                logging.info(f"Debug mode: Copy {file_name}")
            else:
                self.copy_file(original_path, des_path, [file_name])
                logging.info(
                    f"Copied: {file_name} from {original_path} to {des_path}")

    def save_directory_contents(self,
                                remote_download_path,
                                local_download_path,
                                scy_copy_path,
                                des_copy_path,
                                parent_dir):
        """Collect the remote directory structure and save it as JSON.

        The structure is written to ``<parent_dir>/data/remote_data.json``;
        the ``data`` directory is created when it does not exist yet.

        Args:
            remote_download_path: Remote directory to walk.
            local_download_path: Local root used for each entry's
                ``downloads_path``.
            scy_copy_path: Currently unused; kept for interface
                compatibility.
            des_copy_path: Currently unused; kept for interface
                compatibility.
            parent_dir: Directory that contains (or will contain) ``data``.

        Returns:
            The list produced by :meth:`recursive_collect_contents`.
        """
        remote_data = self.recursive_collect_contents(
            remote_download_path,
            local_download_path
        )
        remote_json = os.path.join(parent_dir, 'data', 'remote_data.json')
        # Opening the file for writing fails if the folder is missing.
        os.makedirs(os.path.dirname(remote_json), exist_ok=True)
        with open(remote_json, 'w', encoding='utf-8') as f:
            json.dump(remote_data, f, indent=4)
        return remote_data
|