|
@@ -0,0 +1,154 @@
|
|
|
+import json
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import time
|
|
|
+
|
|
|
+import requests
|
|
|
+from aria2p import API, Client
|
|
|
+
|
|
|
+
|
|
|
+class AlistAPI:
|
|
|
+ def __init__(self, url, aria2_rpc_url=None, aria2_rpc_secret=None, username=None, password=None):
|
|
|
+ self.url = url
|
|
|
+ self.UserAgent = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
|
|
|
+ 'Chrome/87.0.4280.88 Safari/537.36')
|
|
|
+ self.headers = {
|
|
|
+ 'UserAgent': self.UserAgent,
|
|
|
+ 'Content-Type': 'application/json'
|
|
|
+ }
|
|
|
+ self.aria2_client = Client(aria2_rpc_url, secret=aria2_rpc_secret)
|
|
|
+ self.aria2_api = API(self.aria2_client)
|
|
|
+ if username and password:
|
|
|
+ self.login(username, password)
|
|
|
+
|
|
|
+ def login(self, username, password):
|
|
|
+ data = {
|
|
|
+ 'username': username,
|
|
|
+ 'password': password
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers)
|
|
|
+ if response.status_code == 200:
|
|
|
+ token = response.json()
|
|
|
+ self.headers['Authorization'] = token['data']['token']
|
|
|
+ else:
|
|
|
+ raise Exception('Login failed')
|
|
|
+
|
|
|
+ def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def copy_file(self, src_dir, dst_dir, names):
|
|
|
+ payload = {
|
|
|
+ "src_dir": src_dir,
|
|
|
+ "dst_dir": dst_dir,
|
|
|
+ "names": names
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
|
|
|
+ payload = {
|
|
|
+ "path": path,
|
|
|
+ "password": password,
|
|
|
+ "page": page,
|
|
|
+ "per_page": per_page,
|
|
|
+ "refresh": refresh
|
|
|
+ }
|
|
|
+ response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def move_file(self, src_dir, dst_dir, names):
|
|
|
+ payload = json.dumps({
|
|
|
+ "src_dir": src_dir,
|
|
|
+ "dst_dir": dst_dir,
|
|
|
+ "names": names
|
|
|
+ })
|
|
|
+ response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def download_directory(self, path, download_path):
|
|
|
+ file_list = self.list_directory(path)
|
|
|
+ if not file_list:
|
|
|
+ return
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ if file_info['is_dir']:
|
|
|
+ self.download_directory(path + "/" + file_info['name'], download_path)
|
|
|
+ else:
|
|
|
+ file_download_url = self.url + "/d" + path + "/" + file_info['name']
|
|
|
+ sign = file_info.get('sign')
|
|
|
+ if sign:
|
|
|
+ file_download_url += "?sign=" + sign
|
|
|
+ self.aria2_api.add_uris([file_download_url], options={"dir": download_path})
|
|
|
+
|
|
|
+ def debug_directory(self, path, download_path):
|
|
|
+ file_list = self.list_directory(path)
|
|
|
+ if not file_list:
|
|
|
+ return
|
|
|
+ for file_info in file_list['data']['content']:
|
|
|
+ if file_info['is_dir']:
|
|
|
+ print(f"Directory: {path}/{file_info['name']}")
|
|
|
+ self.debug_directory(path + "/" + file_info['name'], download_path)
|
|
|
+ else:
|
|
|
+ file_download_url = self.url + "/d" + path + "/" + file_info['name']
|
|
|
+ sign = file_info.get('sign')
|
|
|
+ if sign:
|
|
|
+ file_download_url += "?sign=" + sign
|
|
|
+ print(f"File: {file_download_url}, Save to: {download_path}")
|
|
|
+
|
|
|
+ # alist 下载完成复制
|
|
|
+ def monitor_and_copy(self, download_path, destination_path, check_interval=10):
|
|
|
+ while True:
|
|
|
+ downloads = self.aria2_api.get_downloads()
|
|
|
+ for download in downloads:
|
|
|
+ if download.is_complete:
|
|
|
+ for file in download.files:
|
|
|
+ if file.selected:
|
|
|
+ file_name = os.path.basename(file.path)
|
|
|
+ names = [file_name]
|
|
|
+
|
|
|
+ # 复制文件
|
|
|
+ self.copy_file(download_path, destination_path, names)
|
|
|
+ print(f"Copied: {file_name} from {download_path} to {destination_path}")
|
|
|
+
|
|
|
+ # 从 Aria2 中移除已完成的下载记录
|
|
|
+ download.remove()
|
|
|
+
|
|
|
+ time.sleep(check_interval)
|
|
|
+
|
|
|
+ # alist 下载完成移动
|
|
|
+ def monitor_and_move(self, download_path, destination_path, check_interval=10):
|
|
|
+ while True:
|
|
|
+ downloads = self.aria2_api.get_downloads()
|
|
|
+ for download in downloads:
|
|
|
+ if download.is_complete:
|
|
|
+ for file in download.files:
|
|
|
+ if file.selected:
|
|
|
+ file_name = os.path.basename(file.path)
|
|
|
+ names = [file_name]
|
|
|
+
|
|
|
+ # 移动文件
|
|
|
+ self.move_file(download_path, destination_path, names)
|
|
|
+ print(f"Moved: {file_name} from {download_path} to {destination_path}")
|
|
|
+
|
|
|
+ # 从 Aria2 中移除已完成的下载记录
|
|
|
+ download.remove()
|
|
|
+
|
|
|
+ time.sleep(check_interval)
|