@@ -0,0 +1,686 @@
+import configparser
+import json
+import logging
+import os
+import shutil
+import signal
+import threading
+import time
+import requests
+from pyarr import RadarrAPI
+from src.models.task_type import TaskType, ActionType
+from aria2p import API, Client
+class NasToolsClient:
+ def __init__(self, base_url, api_key=None):
+ self.base_url = base_url
+ self.api_key = api_key
+ self.token = None
+ def login(self, username, password):
+ """登录并获取令牌"""
+ url = f'{self.base_url}/api/v1/user/login'
+ data = {
+ "username": username,
+ "password": password
+ }
+ headers = {'Content-Type': 'application/x-www-form-urlencoded'}
+ response = requests.post(url, data=data, headers=headers)
+ if response.status_code == 200:
+ self.token = response.json().get("data", {}).get("token")
+ logging.info(self.token)
+ logging.info(self.api_key)
+ return self.token
+ else:
+ raise Exception("Failed to login:", response.text)
+ def get(self, endpoint):
+ """执行带有令牌的 GET 请求"""
+ url = f"{self.base_url}{endpoint}"
+ headers = {
+ "Authorization": f"Bearer {self.token}" if self.token else f"ApiKey {self.api_key}"
+ }
+ response = requests.get(url, headers=headers)
+ if response.status_code == 200:
+ return response.json()
+ else:
+ raise Exception("Failed to get data:", response.text)
+ def sync_list(self):
+ """
+ 同步目录
+ """
+ url = f'{self.base_url}/api/v1/sync/directory/list'
+ headers = {
+ 'accept': 'application/json',
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ "Authorization": f"{self.token}",
+ }
+ logging.info(f"headers: {headers}")
+ params = {
+ 'apikey': self.api_key
+ }
+ response = requests.post(url, params=params, headers=headers)
+ logging.info(response.json())
+ return response.json()
+ def sync(self, sid):
+ """
+ 同步目录 {cmd: "run_directory_sync", data: {sid: []}}
+ """
+ url = f'{self.base_url}/api/v1/sync/directory/run'
+ headers = {
+ 'accept': 'application/json',
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ "Authorization": f"{self.token}",
+ }
+ logging.info(f"headers: {headers}")
+ params = {
+ 'apikey': self.api_key,
+ 'sid': sid
+ }
+ response = requests.get(url, params=params, headers=headers)
+ logging.info(response.json())
+ return response.json()
+ def run_service(self, service_name):
+ """
+ 运行指定的服务
+ """
+ url = f'{self.base_url}/api/v1/service/run'
+ headers = {
+ 'accept': 'application/json',
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'security': self.api_key,
+ "Authorization": f"{self.token}",
+ }
+ logging.info(f"headers: {headers}")
+ payload = {'item': service_name}
+ response = requests.post(url, data=payload, headers=headers)
+ logging.info(response.json())
+ return response.json()
+class Aria2API:
+ def __init__(self, url, secret):
+ self.aria2_client = Client(url, secret=secret)
+ self.aria2_api = API(self.aria2_client)
+ self.is_running = True
+ self.check_interval = 10
+ self.monitor_thread = None
+ def remove(self, download):
+ return self.aria2_api.remove(download)
+ def get_downloads(self):
+ return self.aria2_api.get_downloads()
+ def add_url(self, url, options=None):
+ """
+ 将下载链接添加到 Aria2。
+ :param url: 下载链接
+ :param options: Aria2下载选项,例如下载目录
+ """
+ try:
+ download = self.aria2_api.add_uris([url], options=options if options else {})
+ return download
+ except Exception as e:
+ print(f"Error adding URL to Aria2: {e}")
+ return None
+ def monitor_aria2_and_delete(self):
+ logging.info('Start remote Aria2 download is_complete monitoring')
+ while self.is_running:
+ downloads = self.aria2_api.get_downloads()
+ for download in downloads:
+ if download.is_complete:
+ pass
+ time.sleep(self.check_interval)
+ def start_monitoring(self):
+ if not self.is_running:
+ self.is_running = True
+ self.monitor_thread = threading.Thread(target=self.monitor_aria2_and_delete)
+ self.monitor_thread.start()
+ def stop_monitoring(self):
+ self.is_running = False
+ if self.monitor_thread:
+ self.monitor_thread.join()
+def construct_path(base_path, sub_path, file_name):
+ base_path = base_path.replace('\\', '/')
+ sub_path = sub_path.replace('\\', '/')
+ file_name = file_name.replace('\\', '/')
+ return os.path.join(base_path, sub_path, file_name)
+class RadarClient:
+ def __init__(self, url, key):
+ self.url = url
+ self.api_key = key
+ self.client = RadarrAPI(self.url, self.api_key)
+ def get_all_movies(self):
+ return self.client.get_movie()
+ def delete_movie(self, movie_id):
+ return self.client.del_movie(movie_id, delete_files=True)
+ def save_movies_to_json(self, file_name='data/movies.json'):
+ movies = self.get_all_movies()
+ with open(file_name, 'w') as f:
+ json.dump(movies, f, indent=4)
+ print(f"Movies saved to {file_name}")
+ def get_already_movies(self):
+ """获取全部已经跟踪已经处理完成的电影"""
+ already_processed = set()
+ movies = self.get_all_movies()
+ for movie in movies:
+ if 'movieFile' in movie and movie['movieFile'] and movie['id'] not in already_processed:
+ already_processed.add(movie['id'])
+ return already_processed
+ def continuous_monitoring(self, check_interval=60, custom_action=None):
+ """
+ Continuously monitor movies and perform a custom action if movieFile exists.
+ check_interval: Time in seconds between checks
+ custom_action: Function to be called for each downloaded movie
+ """
+ already_processed = set()
+ while True:
+ movies = self.get_all_movies()
+ for movie in movies:
+ if 'movieFile' in movie and movie['movieFile'] and movie['id'] not in already_processed:
+ custom_action(movie)
+ already_processed.add(movie['id'])
+ time.sleep(check_interval)
+ def find_movie_id_by_filename(self, filename):
+ """根据文件名搜索并返回已完成下载的电影的 ID"""
+ try:
+ movies = self.get_all_movies()
+ for movie in movies:
+ if 'movieFile' in movie and movie['movieFile']:
+ movie_file_name = movie['movieFile']['relativePath']
+ if filename in movie_file_name:
+ return movie['id']
+ return None
+ except Exception as e:
+ return None
+ def get_all_movie_names(self):
+ """获取所有电影名称的集合"""
+ movie_names = set()
+ movies = self.get_all_movies()
+ for movie in movies:
+ if 'movieFile' in movie and movie['movieFile']:
+ movie_file_name = movie['movieFile']['relativePath']
+ movie_names.add(movie_file_name)
+ return movie_names
+class AlistAPI:
+ def __init__(self, url, username, password):
+ self.url = url
+ self.headers = {
+ 'UserAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
+ 'Chrome/87.0.4280.88 Safari/537.36',
+ 'Content-Type': 'application/json'
+ }
+ self.login(username, password)
+ def login(self, username, password):
+ data = {
+ 'username': username,
+ 'password': password
+ }
+ response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers)
+ if response.status_code == 200:
+ token = response.json()
+ self.headers['Authorization'] = token['data']['token']
+ else:
+ raise Exception('Login failed')
+ def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
+ payload = {
+ "path": path,
+ "password": password,
+ "page": page,
+ "per_page": per_page,
+ "refresh": refresh
+ }
+ response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers)
+ return response.json()
+ def copy_file(self, src_dir, dst_dir, names):
+ payload = {
+ "src_dir": src_dir,
+ "dst_dir": dst_dir,
+ "names": names
+ }
+ response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
+ return response.json()
+ def get_completed_tasks(self, task_type):
+ """获取指定任务类型的已完成任务列表"""
+ if task_type == TaskType.UPLOAD:
+ api_endpoint = '/admin/task/upload/done'
+ elif task_type == TaskType.COPY:
+ api_endpoint = '/admin/task/copy/done'
+ elif task_type == TaskType.ARIA2_DOWNLOAD:
+ api_endpoint = '/admin/task/aria2_down/done'
+ elif task_type == TaskType.ARIA2_TRANSFER:
+ api_endpoint = '/admin/task/aria2_transfer/done'
+ elif task_type == TaskType.QBITTORRENT_DOWNLOAD:
+ api_endpoint = '/admin/task/qbit_down/done'
+ elif task_type == TaskType.QBITTORRENT_TRANSFER:
+ api_endpoint = '/admin/task/qbit_transfer/done'
+ else:
+ raise ValueError("Invalid task type")
+ response = requests.get(
+ f'{self.url}{api_endpoint}',
+ headers=self.headers
+ )
+ return response.json()
+ def copy_directory(self, src_path, dst_path):
+ file_list = self.list_directory(src_path)
+ if not file_list:
+ return
+ for file_info in file_list['data']['content']:
+ if file_info['is_dir']:
+ new_src_path = src_path + "/" + file_info['name']
+ new_dst_path = dst_path + "/" + file_info['name']
+ print(f"Copying directory: {new_src_path} to {new_dst_path}")
+ self.copy_directory(new_src_path, new_dst_path)
+ else:
+ file_name = file_info['name']
+ print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}")
+ self.copy_file(src_path, dst_path, [file_name])
+ def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
+ payload = {
+ "path": path,
+ "password": password,
+ "page": page,
+ "per_page": per_page,
+ "refresh": refresh
+ }
+ response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers)
+ return response.json()
+ def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
+ payload = {
+ "path": path,
+ "password": password,
+ "page": page,
+ "per_page": per_page,
+ "refresh": refresh
+ }
+ response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers)
+ if response.status_code == 200:
+ return response.json()
+ return None
+ def move_file(self, src_dir, dst_dir, names):
+ payload = json.dumps({
+ "src_dir": src_dir,
+ "dst_dir": dst_dir,
+ "names": names
+ })
+ response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
+ return response.json()
+ def remove_files_or_folders(self, dir_path, names):
+ """删除指定目录下的文件或文件夹"""
+ payload = {
+ "dir": dir_path,
+ "names": names
+ }
+ response = requests.post(
+ f'{self.url}/fs/remove',
+ headers=self.headers,
+ json=payload
+ )
+ return response.json()
+ def remove_empty_directory(self, src_dir):
+ """删除空文件夹"""
+ payload = {
+ "src_dir": src_dir
+ }
+ response = requests.post(
+ f'{self.url}/fs/remove_empty_director',
+ headers=self.headers,
+ json=payload
+ )
+ return response.json()
+ def recursive_collect_contents(self,
+ remote_download_path,
+ home_download_path,
+ src_dir=None,
+ dest_path=None,
+ current_sub_path=''):
+ contents = []
+ file_list = self.list_directory(remote_download_path)
+ if file_list['data']['total'] == 0:
+ return []
+ for file_info in file_list['data']['content']:
+ full_path = os.path.join(remote_download_path, file_info['name']).replace('\\', '/')
+ local_download_path = ''
+ new_dest_path = ''
+ new_src_dir = ''
+ if home_download_path is not None:
+ local_download_path = os.path.join(home_download_path, current_sub_path, file_info['name']).replace(
+ '\\',
+ '/')
+ if dest_path is not None:
+ new_dest_path = os.path.join(dest_path, current_sub_path).replace('\\', '/')
+ if src_dir is not None:
+ new_src_dir = os.path.join(src_dir, current_sub_path).replace('\\', '/')
+ item = {
+ 'name': file_info['name'],
+ 'is_dir': file_info['is_dir'],
+ 'path': full_path,
+ 'downloads_path': local_download_path,
+ 'src_dir': new_src_dir,
+ 'dst_dir': new_dest_path
+ }
+ contents.append(item)
+ if file_info['is_dir']:
+ new_sub_path = os.path.join(current_sub_path, file_info['name'])
+ sub_contents = self.recursive_collect_contents(full_path,
+ home_download_path,
+ src_dir,
+ dest_path,
+ new_sub_path)
+ contents.extend(sub_contents)
+ return contents
+ def copy_files(self, local_json_path, is_debug=False):
+ """执行拷贝文件"""
+ with open(local_json_path, 'r', encoding='utf-8') as f:
+ directory_contents = json.load(f)
+ for item in directory_contents:
+ if not item['is_dir']:
+ file_name = item['name']
+ original_path = item['src_dir']
+ des_path = item['dst_dir']
+ if is_debug:
+ logging.info(f"Debug mode: Copy {file_name}")
+ else:
+ self.copy_file(original_path, des_path, [file_name])
+ logging.info(
+ f"Copied: {file_name} from {original_path} to {des_path}")
+ def save_directory_contents(self,
+ remote_download_path,
+ local_download_path,
+ scy_copy_path,
+ des_copy_path,
+ parent_dir):
+ """获取远程和本地对应的目录结构,并保存为 JSON 文件"""
+ remote_data = self.recursive_collect_contents(
+ remote_download_path,
+ local_download_path
+ )
+ remote_json = os.path.join(parent_dir, 'data', 'remote_data.json')
+ with open(remote_json, 'w', encoding='utf-8') as f:
+ json.dump(remote_data, f, indent=4)
+ return remote_data
+is_running = True
+def move_new_files(src_dir, dest_dir, nas_tools_api):
+ """移动新文件和目录,如果它们在目标目录中不存在"""
+ if not os.path.exists(dest_dir):
+ os.makedirs(dest_dir)
+ for item in os.listdir(src_dir):
+ src_item = os.path.join(src_dir, item)
+ dest_item = os.path.join(dest_dir, item)
+ if os.path.isdir(src_item):
+ if not os.path.exists(dest_item):
+ shutil.move(src_item, dest_item)
+ logging.info(f"Moved directory from {src_item} to {dest_item}")
+ else:
+ move_new_files(src_item, dest_item)
+ elif not item.endswith('.aria2'):
+ if not os.path.exists(dest_item):
+ shutil.move(src_item, dest_item)
+ logging.info(f"Moved file from {src_item} to {dest_item}")
+ logging.info(f"move complete {dest_item} ")
+ logging.info("Notify nas tools synchronization")
+ nas_tools_api.login('admin', 'password')
+ sjson = nas_tools_api.sync_list()
+ directory_ids = [sjson['data']['result']['2']['id'], sjson['data']['result']['3']['id']]
+ for directory_id in directory_ids:
+ nas_tools_api.sync(directory_id)
+ logging.info("Completed checking for new files to move.")
+ else:
+ logging.info(f"File already exists, skipping: {dest_item}")
+def handle_signal(signum, frame):
+ """处理中断信号,优雅地退出循环"""
+ global is_running
+ is_running = False
+ print("Exiting...")
+def has_files_or_directories(directory_path):
+ """
+ 检查指定目录下是否存在文件或文件夹。
+ :param directory_path: 目录的路径
+ :return: 两个布尔值,第一个表示是否存在文件,第二个表示是否存在文件夹。
+ """
+ if not os.path.exists(directory_path):
+ print(f"路径不存在: {directory_path}")
+ return False, False
+ items = os.listdir(directory_path)
+ has_files = any(os.path.isfile(os.path.join(directory_path, item)) for item in items)
+ has_directories = any(os.path.isdir(os.path.join(directory_path, item)) for item in items)
+ return has_files, has_directories
+if __name__ == '__main__':
+ signal.signal(signal.SIGINT, handle_signal)
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+ current_directory = os.path.dirname(os.path.abspath(__file__))
+ config_path = os.path.join(current_directory, 'config', 'config.ini')
+ config = configparser.ConfigParser()
+ config.read(config_path)
+ base_url = config['NSTOOLS']['URL']
+ api_key = config['NSTOOLS']['API_KEY']
+ nas_tools_api = NasToolsClient(base_url, api_key)
+ radar_client = RadarClient(config['RADAR']['URL'],
+ config['RADAR']['API_KEY'])
+ aria2_api = Aria2API(config['ARIA2']['RPC_URL'], config['ARIA2']['RPC_SECRET'])
+ remote_alist_api = AlistAPI(config['REMOTE_ALIST']['API_URL'],
+ download_path = config['ARIA2']['DESTINATION_PATH']
+ destination_path = config['ARIA2']['DES_COPY_PATH']
+ remote_alist_download_path = config['REMOTE_ALIST']['DOWNLOAD_PATH']
+ home_alist_download_path = config['HOME_ALIST']['DOWNLOAD_PATH']
+ home_scy_alist_copy_path = config['HOME_ALIST']['SCY_COPY_PATH']
+ home_des_alist_copy_path = config['HOME_ALIST']['DES_COPY_PATH']
+ already_movie_names = radar_client.get_all_movie_names()
+ current_downloads = {os.path.basename(file.path) for download in aria2_api.get_downloads() for
+ file in download.files if file.selected}
+ download_url = None
+ filename = None
+ while is_running:
+ remote_json_path = remote_alist_api.save_directory_contents(
+ remote_alist_download_path,
+ home_alist_download_path,
+ home_scy_alist_copy_path,
+ home_des_alist_copy_path,
+ current_directory)
+ for home_item in remote_json_path:
+ if home_item['name'] in current_downloads:
+ continue
+ if not home_item['is_dir']:
+ cent_data = remote_alist_api.get_file_or_directory_info(home_item['path'])
+ if cent_data:
+ logging.info(f"name {home_item['name']}")
+ download_url = aria2_api.add_url(cent_data['data']['raw_url'])
+ logging.info(f"Started to download {cent_data['data']['raw_url']}")
+ current_downloads.add(home_item['name'])
+ else:
+ logging.info("File not found")
+ if download_url is not None:
+ while not download_url.is_complete:
+ download_url.update()
+ print("下载中...")
+ time.sleep(10)
+ if download_url.is_complete:
+ print("下载完成")
+ for file in download_url.files:
+ filePath = file.path
+ print(filePath)
+ filename = os.path.basename(file.path)
+ print(filename)
+ already_movie_names = radar_client.get_all_movie_names()
+ for movie_name in already_movie_names:
+ movie_id = radar_client.find_movie_id_by_filename(movie_name)
+ if movie_id:
+ radar_client.delete_movie(movie_id)
+ files_exist, directories_exist = has_files_or_directories(download_path)
+ if files_exist or directories_exist:
+ move_new_files(download_path, destination_path + "/movie", nas_tools_api)
+ print("移动文件完成")
+ else:
+ print("没有文件或者目录")
+ time.sleep(10)