alist.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. import json
  2. import logging
  3. import os
  4. from enum import unique, Enum
  5. import requests
  6. class ActionType(Enum):
  7. COPY = 1
  8. MOVE = 2
  9. @unique
  10. class TaskType(Enum):
  11. UPLOAD = 'upload'
  12. COPY = 'copy'
  13. ARIA2_DOWNLOAD = 'aria2_down'
  14. ARIA2_TRANSFER = 'aria2_transfer'
  15. QBITTORRENT_DOWNLOAD = 'qbit_down'
  16. QBITTORRENT_TRANSFER = 'qbit_transfer'
  17. class AlistAPI:
  18. def __init__(self, url, username, password):
  19. self.url = url
  20. self.headers = {
  21. 'UserAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
  22. 'Chrome/87.0.4280.88 Safari/537.36',
  23. 'Content-Type': 'application/json'
  24. }
  25. # self.aria2_client = Client(rpc_url, secret=rpc_secret)
  26. # self.aria2_api = API(self.aria2_client)
  27. def login(self, username, password):
  28. data = {
  29. 'username': username,
  30. 'password': password
  31. }
  32. response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers)
  33. if response.status_code == 200:
  34. token = response.json()
  35. self.headers['Authorization'] = token['data']['token']
  36. else:
  37. raise Exception('Login failed')
  38. def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
  39. payload = {
  40. "path": path,
  41. "password": password,
  42. "page": page,
  43. "per_page": per_page,
  44. "refresh": refresh
  45. }
  46. response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers)
  47. return response.json()
  48. def copy_file(self, src_dir, dst_dir, names):
  49. payload = {
  50. "src_dir": src_dir,
  51. "dst_dir": dst_dir,
  52. "names": names
  53. }
  54. response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
  55. return response.json()
  56. def get_completed_tasks(self, task_type):
  57. """获取指定任务类型的已完成任务列表"""
  58. if task_type == TaskType.UPLOAD:
  59. api_endpoint = '/admin/task/upload/done'
  60. elif task_type == TaskType.COPY:
  61. api_endpoint = '/admin/task/copy/done'
  62. elif task_type == TaskType.ARIA2_DOWNLOAD:
  63. api_endpoint = '/admin/task/aria2_down/done'
  64. elif task_type == TaskType.ARIA2_TRANSFER:
  65. api_endpoint = '/admin/task/aria2_transfer/done'
  66. elif task_type == TaskType.QBITTORRENT_DOWNLOAD:
  67. api_endpoint = '/admin/task/qbit_down/done'
  68. elif task_type == TaskType.QBITTORRENT_TRANSFER:
  69. api_endpoint = '/admin/task/qbit_transfer/done'
  70. else:
  71. raise ValueError("Invalid task type")
  72. response = requests.get(
  73. f'{self.url}{api_endpoint}',
  74. headers=self.headers
  75. )
  76. return response.json()
  77. def copy_directory(self, src_path, dst_path):
  78. file_list = self.list_directory(src_path)
  79. if not file_list:
  80. return
  81. for file_info in file_list['data']['content']:
  82. if file_info['is_dir']:
  83. new_src_path = src_path + "/" + file_info['name']
  84. new_dst_path = dst_path + "/" + file_info['name']
  85. # new_src_path = os.path.join(src_path, file_info['name'])
  86. # new_dst_path = os.path.join(dst_path, file_info['name'])
  87. print(f"Copying directory: {new_src_path} to {new_dst_path}")
  88. self.copy_directory(new_src_path, new_dst_path)
  89. else:
  90. file_name = file_info['name']
  91. print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}")
  92. # 这里原本是调用 self.copy_file,现在改为仅打印信息
  93. self.copy_file(src_path, dst_path, [file_name])
  94. def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
  95. payload = {
  96. "path": path,
  97. "password": password,
  98. "page": page,
  99. "per_page": per_page,
  100. "refresh": refresh
  101. }
  102. response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers)
  103. if response.status_code == 200:
  104. return response.json()
  105. else:
  106. logging.error(f"Failed to list directory: {response.text}")
  107. return None # 或者 return {} 以保持返回类型的一致性
  108. def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
  109. payload = {
  110. "path": path,
  111. "password": password,
  112. "page": page,
  113. "per_page": per_page,
  114. "refresh": refresh
  115. }
  116. response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers)
  117. if response.status_code == 200:
  118. return response.json()
  119. return None
  120. def move_file(self, src_dir, dst_dir, names):
  121. payload = json.dumps({
  122. "src_dir": src_dir,
  123. "dst_dir": dst_dir,
  124. "names": names
  125. })
  126. response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
  127. return response.json()
  128. def remove_files_or_folders(self, dir_path, names):
  129. """删除指定目录下的文件或文件夹"""
  130. payload = {
  131. "dir": dir_path,
  132. "names": names
  133. }
  134. response = requests.post(
  135. f'{self.url}/fs/remove',
  136. headers=self.headers,
  137. json=payload
  138. )
  139. return response.json()
  140. def remove_empty_directory(self, src_dir):
  141. """删除空文件夹"""
  142. payload = {
  143. "src_dir": src_dir
  144. }
  145. response = requests.post(
  146. f'{self.url}/fs/remove_empty_director',
  147. headers=self.headers,
  148. json=payload
  149. )
  150. return response.json()
  151. def recursive_collect_contents(self,
  152. remote_download_path,
  153. home_download_path,
  154. src_dir=None,
  155. dest_path=None,
  156. current_sub_path='',
  157. parent_folder_name=''):
  158. contents = []
  159. file_list = self.list_directory(remote_download_path)
  160. # file_info_json = json.dumps(file_list, indent=4)
  161. # logging.info(f'file_info_json: {file_info_json}')
  162. if file_list is None or 'data' not in file_list or 'total' not in file_list['data']:
  163. logging.error("Directory listing failed or returned unexpected data")
  164. return []
  165. if file_list is None:
  166. return []
  167. if file_list['data']['total'] == 0:
  168. return []
  169. for file_info in file_list['data']['content']:
  170. # 拼接完整的远程路径
  171. full_path = os.path.join(remote_download_path, file_info['name']).replace('\\', '/')
  172. # 初始化本地下载路径和复制/移动目的地路径
  173. local_download_path = ''
  174. new_dest_path = ''
  175. new_src_dir = ''
  176. # 根据条件构建本地下载路径和复制/移动目的地路径
  177. if home_download_path is not None:
  178. local_download_path = os.path.join(home_download_path, current_sub_path, file_info['name']).replace(
  179. '\\',
  180. '/')
  181. if dest_path is not None:
  182. new_dest_path = os.path.join(dest_path, current_sub_path).replace('\\', '/')
  183. if src_dir is not None:
  184. new_src_dir = os.path.join(src_dir, current_sub_path).replace('\\', '/')
  185. parent_path = os.path.dirname(full_path)
  186. folder_name = os.path.basename(parent_path)
  187. item = {
  188. 'name': file_info['name'],
  189. 'folder_name': folder_name,
  190. 'parent_folder_name': parent_folder_name,
  191. 'is_dir': file_info['is_dir'],
  192. 'path': full_path, # 存储完整的远程路径
  193. 'downloads_path': local_download_path,
  194. 'src_dir': new_src_dir,
  195. 'dst_dir': new_dest_path
  196. }
  197. contents.append(item)
  198. if file_info['is_dir']:
  199. # 更新子路径为当前文件夹的路径
  200. new_sub_path = os.path.join(current_sub_path, file_info['name'])
  201. sub_contents = self.recursive_collect_contents(full_path,
  202. home_download_path,
  203. src_dir,
  204. dest_path,
  205. new_sub_path,
  206. parent_folder_name=folder_name)
  207. contents.extend(sub_contents)
  208. return contents
  209. def copy_files(self, local_json_path, is_debug=False):
  210. """执行拷贝文件"""
  211. # 读取本地 JSON 文件
  212. with open(local_json_path, 'r', encoding='utf-8') as f:
  213. directory_contents = json.load(f)
  214. for item in directory_contents:
  215. if not item['is_dir']:
  216. file_name = item['name']
  217. original_path = item['src_dir'] # 获取原始文件路径
  218. des_path = item['dst_dir'] # 获取原始文件路径
  219. if is_debug:
  220. logging.info(f"Debug mode: Copy {file_name}")
  221. else:
  222. # 复制文件
  223. self.copy_file(original_path, des_path, [file_name])
  224. logging.info(
  225. f"Copied: {file_name} from {original_path} to {des_path}")
  226. def save_directory_contents(self,
  227. remote_download_path,
  228. local_download_path,
  229. scy_copy_path,
  230. des_copy_path,
  231. parent_dir):
  232. """获取远程和本地对应的目录结构,并保存为 JSON 文件"""
  233. # 获取远程目录结构
  234. remote_data = self.recursive_collect_contents(
  235. remote_download_path,
  236. local_download_path
  237. )
  238. remote_json = os.path.join(parent_dir, 'data', 'remote_data.json')
  239. with open(remote_json, 'w', encoding='utf-8') as f:
  240. json.dump(remote_data, f, indent=4)
  241. # # 获取本地目录结构
  242. # home_data = self.home_alist_api.recursive_collect_contents(
  243. # scy_copy_path, des_copy_path, scy_copy_path, des_copy_path
  244. # )
  245. # home_json_path = os.path.join(parent_dir, 'data', self.home_data)
  246. # with open(home_json_path, 'w', encoding='utf-8') as f:
  247. # json.dump(home_data, f, indent=4)
  248. return remote_data