123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686 |
- import configparser
- import json
- import logging
- import os
- import shutil
- import signal
- import threading
- import time
- import requests
- from pyarr import RadarrAPI
- from src.models.task_type import TaskType, ActionType
- from aria2p import API, Client
- class NasToolsClient:
- def __init__(self, base_url, api_key=None):
- self.base_url = base_url
- self.api_key = api_key
- self.token = None
- # def login(self, username, password):
- # """
- # 登录以获取Token
- # """
- # url = f'{self.base_url}/api/v1/user/login'
- # data = {'username': username, 'password': password}
- # headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- # response = requests.post(url, data=data, headers=headers)
- # if response.status_code == 200:
- # self.token = response.json().get('token')
- # return self.token
- # else:
- # raise Exception("Login failed with status code: " + str(response.status_code))
- def login(self, username, password):
- """登录并获取令牌"""
- url = f'{self.base_url}/api/v1/user/login'
- data = {
- "username": username,
- "password": password
- }
- headers = {'Content-Type': 'application/x-www-form-urlencoded'}
- response = requests.post(url, data=data, headers=headers)
- if response.status_code == 200:
- self.token = response.json().get("data", {}).get("token")
- logging.info(self.token)
- logging.info(self.api_key)
- return self.token
- else:
- raise Exception("Failed to login:", response.text)
- def get(self, endpoint):
- """执行带有令牌的 GET 请求"""
- url = f"{self.base_url}{endpoint}"
- headers = {
- "Authorization": f"Bearer {self.token}" if self.token else f"ApiKey {self.api_key}"
- }
- response = requests.get(url, headers=headers)
- if response.status_code == 200:
- return response.json()
- else:
- raise Exception("Failed to get data:", response.text)
- def sync_list(self):
- """
- 同步目录
- """
- url = f'{self.base_url}/api/v1/sync/directory/list'
- # {"Bearer Auth": {"type": "apiKey", "name": "Authorization", "in": "header"}}
- headers = {
- 'accept': 'application/json',
- 'Content-Type': 'application/x-www-form-urlencoded',
- "Authorization": f"{self.token}",
- }
- logging.info(f"headers: {headers}")
- params = {
- 'apikey': self.api_key
- }
- response = requests.post(url, params=params, headers=headers)
- logging.info(response.json())
- return response.json()
- def sync(self, sid):
- """
- 同步目录 {cmd: "run_directory_sync", data: {sid: []}}
- """
- url = f'{self.base_url}/api/v1/sync/directory/run'
- # {"Bearer Auth": {"type": "apiKey", "name": "Authorization", "in": "header"}}
- headers = {
- 'accept': 'application/json',
- 'Content-Type': 'application/x-www-form-urlencoded',
- "Authorization": f"{self.token}",
- }
- logging.info(f"headers: {headers}")
- params = {
- 'apikey': self.api_key,
- 'sid': sid
- }
- response = requests.get(url, params=params, headers=headers)
- logging.info(response.json())
- return response.json()
- def run_service(self, service_name):
- """
- 运行指定的服务
- """
- url = f'{self.base_url}/api/v1/service/run'
- # {"Bearer Auth": {"type": "apiKey", "name": "Authorization", "in": "header"}}
- headers = {
- 'accept': 'application/json',
- 'Content-Type': 'application/x-www-form-urlencoded',
- 'security': self.api_key,
- "Authorization": f"{self.token}",
- }
- logging.info(f"headers: {headers}")
- payload = {'item': service_name}
- response = requests.post(url, data=payload, headers=headers)
- logging.info(response.json())
- return response.json()
- class Aria2API:
- def __init__(self, url, secret):
- self.aria2_client = Client(url, secret=secret)
- self.aria2_api = API(self.aria2_client)
- self.is_running = True
- self.check_interval = 10
- self.monitor_thread = None
- def remove(self, download):
- return self.aria2_api.remove(download)
- def get_downloads(self):
- # 返回所有下载信息的列表
- return self.aria2_api.get_downloads()
- def add_url(self, url, options=None):
- """
- 将下载链接添加到 Aria2。
- :param url: 下载链接
- :param options: Aria2下载选项,例如下载目录
- """
- try:
- # 添加下载链接到 Aria2
- download = self.aria2_api.add_uris([url], options=options if options else {})
- return download
- except Exception as e:
- print(f"Error adding URL to Aria2: {e}")
- return None
- def monitor_aria2_and_delete(self):
- logging.info('Start remote Aria2 download is_complete monitoring')
- while self.is_running:
- # 获取 Aria2 当前的下载列表
- downloads = self.aria2_api.get_downloads()
- for download in downloads:
- if download.is_complete:
- pass
- # movie_name = self.radar_client.get_all_movie_names()
- # for file_name in movie_name:
- # # logging.info(f"Download completed: {file_name}")
- # # 处理每个下载完成的文件
- # # 调用方法并获取返回的电影 ID
- # movie_id = self.radar_client.find_movie_id_by_filename(file_name)
- #
- # # 打印结果
- # if movie_id is not None:
- # print(f"Found movie ID for '{file_name}': {movie_id}")
- # else:
- # print(f"No movie found for '{file_name}'")
- time.sleep(self.check_interval)
- def start_monitoring(self):
- if not self.is_running:
- self.is_running = True
- self.monitor_thread = threading.Thread(target=self.monitor_aria2_and_delete)
- self.monitor_thread.start()
- def stop_monitoring(self):
- self.is_running = False
- if self.monitor_thread:
- self.monitor_thread.join()
- def construct_path(base_path, sub_path, file_name):
- # 确保路径部分不包含反斜杠
- base_path = base_path.replace('\\', '/')
- sub_path = sub_path.replace('\\', '/')
- file_name = file_name.replace('\\', '/')
- # 使用 os.path.join 构建整个路径
- return os.path.join(base_path, sub_path, file_name)
- class RadarClient:
- def __init__(self, url, key):
- self.url = url
- self.api_key = key
- self.client = RadarrAPI(self.url, self.api_key)
- def get_all_movies(self):
- return self.client.get_movie()
- def delete_movie(self, movie_id):
- return self.client.del_movie(movie_id, delete_files=True)
- def save_movies_to_json(self, file_name='data/movies.json'):
- movies = self.get_all_movies()
- with open(file_name, 'w') as f:
- json.dump(movies, f, indent=4)
- print(f"Movies saved to {file_name}")
- def get_already_movies(self):
- """获取全部已经跟踪已经处理完成的电影"""
- already_processed = set() # 用于跟踪已处理的电影
- movies = self.get_all_movies()
- for movie in movies:
- if 'movieFile' in movie and movie['movieFile'] and movie['id'] not in already_processed:
- already_processed.add(movie['id'])
- return already_processed
- def continuous_monitoring(self, check_interval=60, custom_action=None):
- """
- Continuously monitor movies and perform a custom action if movieFile exists.
- check_interval: Time in seconds between checks
- custom_action: Function to be called for each downloaded movie
- """
- already_processed = set() # 用于跟踪已处理的电影
- while True:
- movies = self.get_all_movies()
- for movie in movies:
- if 'movieFile' in movie and movie['movieFile'] and movie['id'] not in already_processed:
- custom_action(movie)
- already_processed.add(movie['id'])
- time.sleep(check_interval)
- def find_movie_id_by_filename(self, filename):
- """根据文件名搜索并返回已完成下载的电影的 ID"""
- try:
- movies = self.get_all_movies()
- for movie in movies:
- # 检查电影是否已下载
- if 'movieFile' in movie and movie['movieFile']:
- movie_file_name = movie['movieFile']['relativePath']
- # 检查文件名是否匹配
- if filename in movie_file_name:
- return movie['id']
- return None
- except Exception as e:
- return None
- def get_all_movie_names(self):
- """获取所有电影名称的集合"""
- movie_names = set()
- movies = self.get_all_movies()
- for movie in movies:
- if 'movieFile' in movie and movie['movieFile']:
- movie_file_name = movie['movieFile']['relativePath']
- movie_names.add(movie_file_name)
- return movie_names
- class AlistAPI:
- def __init__(self, url, username, password):
- self.url = url
- self.headers = {
- 'UserAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
- 'Chrome/87.0.4280.88 Safari/537.36',
- 'Content-Type': 'application/json'
- }
- # self.aria2_client = Client(rpc_url, secret=rpc_secret)
- # self.aria2_api = API(self.aria2_client)
- self.login(username, password)
- def login(self, username, password):
- data = {
- 'username': username,
- 'password': password
- }
- response = requests.post(f'{self.url}/auth/login', data=json.dumps(data), headers=self.headers)
- if response.status_code == 200:
- token = response.json()
- self.headers['Authorization'] = token['data']['token']
- else:
- raise Exception('Login failed')
- def get_directory(self, path="", password="", page=1, per_page=0, refresh=False):
- payload = {
- "path": path,
- "password": password,
- "page": page,
- "per_page": per_page,
- "refresh": refresh
- }
- response = requests.post(f'{self.url}/fs/dirs', data=json.dumps(payload), headers=self.headers)
- return response.json()
- def copy_file(self, src_dir, dst_dir, names):
- payload = {
- "src_dir": src_dir,
- "dst_dir": dst_dir,
- "names": names
- }
- response = requests.post(f'{self.url}/fs/copy', data=json.dumps(payload), headers=self.headers)
- return response.json()
- def get_completed_tasks(self, task_type):
- """获取指定任务类型的已完成任务列表"""
- if task_type == TaskType.UPLOAD:
- api_endpoint = '/admin/task/upload/done'
- elif task_type == TaskType.COPY:
- api_endpoint = '/admin/task/copy/done'
- elif task_type == TaskType.ARIA2_DOWNLOAD:
- api_endpoint = '/admin/task/aria2_down/done'
- elif task_type == TaskType.ARIA2_TRANSFER:
- api_endpoint = '/admin/task/aria2_transfer/done'
- elif task_type == TaskType.QBITTORRENT_DOWNLOAD:
- api_endpoint = '/admin/task/qbit_down/done'
- elif task_type == TaskType.QBITTORRENT_TRANSFER:
- api_endpoint = '/admin/task/qbit_transfer/done'
- else:
- raise ValueError("Invalid task type")
- response = requests.get(
- f'{self.url}{api_endpoint}',
- headers=self.headers
- )
- return response.json()
- def copy_directory(self, src_path, dst_path):
- file_list = self.list_directory(src_path)
- if not file_list:
- return
- for file_info in file_list['data']['content']:
- if file_info['is_dir']:
- new_src_path = src_path + "/" + file_info['name']
- new_dst_path = dst_path + "/" + file_info['name']
- # new_src_path = os.path.join(src_path, file_info['name'])
- # new_dst_path = os.path.join(dst_path, file_info['name'])
- print(f"Copying directory: {new_src_path} to {new_dst_path}")
- self.copy_directory(new_src_path, new_dst_path)
- else:
- file_name = file_info['name']
- print(f"Copying file: {src_path}/{file_name} to {dst_path}/{file_name}")
- # 这里原本是调用 self.copy_file,现在改为仅打印信息
- self.copy_file(src_path, dst_path, [file_name])
- def list_directory(self, path, password="", page=1, per_page=0, refresh=False):
- payload = {
- "path": path,
- "password": password,
- "page": page,
- "per_page": per_page,
- "refresh": refresh
- }
- response = requests.post(f'{self.url}/fs/list', data=json.dumps(payload), headers=self.headers)
- return response.json()
- def get_file_or_directory_info(self, path, password="", page=1, per_page=0, refresh=False):
- payload = {
- "path": path,
- "password": password,
- "page": page,
- "per_page": per_page,
- "refresh": refresh
- }
- response = requests.post(f"{self.url}/fs/get", data=json.dumps(payload), headers=self.headers)
- if response.status_code == 200:
- return response.json()
- return None
- def move_file(self, src_dir, dst_dir, names):
- payload = json.dumps({
- "src_dir": src_dir,
- "dst_dir": dst_dir,
- "names": names
- })
- response = requests.post(f"{self.url}/fs/move", data=payload, headers=self.headers)
- return response.json()
- def remove_files_or_folders(self, dir_path, names):
- """删除指定目录下的文件或文件夹"""
- payload = {
- "dir": dir_path,
- "names": names
- }
- response = requests.post(
- f'{self.url}/fs/remove',
- headers=self.headers,
- json=payload
- )
- return response.json()
- def remove_empty_directory(self, src_dir):
- """删除空文件夹"""
- payload = {
- "src_dir": src_dir
- }
- response = requests.post(
- f'{self.url}/fs/remove_empty_director',
- headers=self.headers,
- json=payload
- )
- return response.json()
- def recursive_collect_contents(self,
- remote_download_path,
- home_download_path,
- src_dir=None,
- dest_path=None,
- current_sub_path=''):
- contents = []
- file_list = self.list_directory(remote_download_path)
- # file_info_json = json.dumps(file_list, indent=4)
- # logging.info(f'file_info_json: {file_info_json}')
- if file_list['data']['total'] == 0:
- return []
- for file_info in file_list['data']['content']:
- # 拼接完整的远程路径
- full_path = os.path.join(remote_download_path, file_info['name']).replace('\\', '/')
- # 初始化本地下载路径和复制/移动目的地路径
- local_download_path = ''
- new_dest_path = ''
- new_src_dir = ''
- # 根据条件构建本地下载路径和复制/移动目的地路径
- if home_download_path is not None:
- local_download_path = os.path.join(home_download_path, current_sub_path, file_info['name']).replace(
- '\\',
- '/')
- if dest_path is not None:
- new_dest_path = os.path.join(dest_path, current_sub_path).replace('\\', '/')
- if src_dir is not None:
- new_src_dir = os.path.join(src_dir, current_sub_path).replace('\\', '/')
- item = {
- 'name': file_info['name'],
- 'is_dir': file_info['is_dir'],
- 'path': full_path, # 存储完整的远程路径
- 'downloads_path': local_download_path,
- 'src_dir': new_src_dir,
- 'dst_dir': new_dest_path
- }
- contents.append(item)
- if file_info['is_dir']:
- # 更新子路径为当前文件夹的路径
- new_sub_path = os.path.join(current_sub_path, file_info['name'])
- sub_contents = self.recursive_collect_contents(full_path,
- home_download_path,
- src_dir,
- dest_path,
- new_sub_path)
- contents.extend(sub_contents)
- return contents
- def copy_files(self, local_json_path, is_debug=False):
- """执行拷贝文件"""
- # 读取本地 JSON 文件
- with open(local_json_path, 'r', encoding='utf-8') as f:
- directory_contents = json.load(f)
- for item in directory_contents:
- if not item['is_dir']:
- file_name = item['name']
- original_path = item['src_dir'] # 获取原始文件路径
- des_path = item['dst_dir'] # 获取原始文件路径
- if is_debug:
- logging.info(f"Debug mode: Copy {file_name}")
- else:
- # 复制文件
- self.copy_file(original_path, des_path, [file_name])
- logging.info(
- f"Copied: {file_name} from {original_path} to {des_path}")
- def save_directory_contents(self,
- remote_download_path,
- local_download_path,
- scy_copy_path,
- des_copy_path,
- parent_dir):
- """获取远程和本地对应的目录结构,并保存为 JSON 文件"""
- # 获取远程目录结构
- remote_data = self.recursive_collect_contents(
- remote_download_path,
- local_download_path
- )
- remote_json = os.path.join(parent_dir, 'data', 'remote_data.json')
- with open(remote_json, 'w', encoding='utf-8') as f:
- json.dump(remote_data, f, indent=4)
- # # 获取本地目录结构
- # home_data = self.home_alist_api.recursive_collect_contents(
- # scy_copy_path, des_copy_path, scy_copy_path, des_copy_path
- # )
- # home_json_path = os.path.join(parent_dir, 'data', self.home_data)
- # with open(home_json_path, 'w', encoding='utf-8') as f:
- # json.dump(home_data, f, indent=4)
- return remote_data
- is_running = True
- def move_new_files(src_dir, dest_dir, nas_tools_api):
- """移动新文件和目录,如果它们在目标目录中不存在"""
- if not os.path.exists(dest_dir):
- os.makedirs(dest_dir)
- for item in os.listdir(src_dir):
- src_item = os.path.join(src_dir, item)
- dest_item = os.path.join(dest_dir, item)
- # 如果是目录
- if os.path.isdir(src_item):
- # 检查目标目录是否存在
- if not os.path.exists(dest_item):
- # 直接移动整个目录
- shutil.move(src_item, dest_item)
- logging.info(f"Moved directory from {src_item} to {dest_item}")
- else:
- # 如果目标目录已存在,递归处理子目录内容
- move_new_files(src_item, dest_item)
- elif not item.endswith('.aria2'): # 检查是否为 .aria2 文件
- # 如果是文件,且不是 .aria2 文件,则进行移动
- if not os.path.exists(dest_item):
- shutil.move(src_item, dest_item)
- logging.info(f"Moved file from {src_item} to {dest_item}")
- # 添加移动完成的提示
- logging.info(f"move complete {dest_item} ")
- logging.info("Notify nas tools synchronization")
- nas_tools_api.login('admin', 'password')
- # self.nas_tools_api.run_service('sync')
- sjson = nas_tools_api.sync_list()
- directory_ids = [sjson['data']['result']['2']['id'], sjson['data']['result']['3']['id']]
- for directory_id in directory_ids:
- nas_tools_api.sync(directory_id)
- logging.info("Completed checking for new files to move.")
- else:
- logging.info(f"File already exists, skipping: {dest_item}")
- def handle_signal(signum, frame):
- """处理中断信号,优雅地退出循环"""
- global is_running
- is_running = False
- print("Exiting...")
- def has_files_or_directories(directory_path):
- """
- 检查指定目录下是否存在文件或文件夹。
- :param directory_path: 目录的路径
- :return: 两个布尔值,第一个表示是否存在文件,第二个表示是否存在文件夹。
- """
- # 检查路径是否存在
- if not os.path.exists(directory_path):
- print(f"路径不存在: {directory_path}")
- return False, False # 路径不存在时返回两个False
- # 获取目录下的所有内容
- items = os.listdir(directory_path)
- # 检查是否有文件或文件夹
- has_files = any(os.path.isfile(os.path.join(directory_path, item)) for item in items)
- has_directories = any(os.path.isdir(os.path.join(directory_path, item)) for item in items)
- return has_files, has_directories
- if __name__ == '__main__':
- # 注册信号处理函数
- signal.signal(signal.SIGINT, handle_signal)
- # 初始化日志和配置
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
- # 获取当前脚本的绝对路径
- current_directory = os.path.dirname(os.path.abspath(__file__))
- # 获取当前脚本所在目录的上级目录
- # parent_directory = os.path.dirname(current_directory)
- # 构建 config.ini 文件的路径
- config_path = os.path.join(current_directory, 'config', 'config.ini')
- # 读取配置文件
- config = configparser.ConfigParser()
- config.read(config_path)
- base_url = config['NSTOOLS']['URL']
- api_key = config['NSTOOLS']['API_KEY']
- nas_tools_api = NasToolsClient(base_url, api_key)
- radar_client = RadarClient(config['RADAR']['URL'],
- config['RADAR']['API_KEY'])
- aria2_api = Aria2API(config['ARIA2']['RPC_URL'], config['ARIA2']['RPC_SECRET'])
- # aria2_api.start_monitoring()
- remote_alist_api = AlistAPI(config['REMOTE_ALIST']['API_URL'],
- config['REMOTE_ALIST']['USERNAME'],
- config['REMOTE_ALIST']['PASSWORD'])
- download_path = config['ARIA2']['DESTINATION_PATH']
- destination_path = config['ARIA2']['DES_COPY_PATH']
- # 获取远程下载目录
- remote_alist_download_path = config['REMOTE_ALIST']['DOWNLOAD_PATH']
- home_alist_download_path = config['HOME_ALIST']['DOWNLOAD_PATH']
- home_scy_alist_copy_path = config['HOME_ALIST']['SCY_COPY_PATH']
- home_des_alist_copy_path = config['HOME_ALIST']['DES_COPY_PATH']
- # 获取已完成电影的文件名集合
- already_movie_names = radar_client.get_all_movie_names()
- # 更新当前下载队列集合
- current_downloads = {os.path.basename(file.path) for download in aria2_api.get_downloads() for
- file in download.files if file.selected}
- download_url = None
- filename = None
- while is_running:
- # 获取远程目录结构
- remote_json_path = remote_alist_api.save_directory_contents(
- remote_alist_download_path,
- home_alist_download_path,
- home_scy_alist_copy_path,
- home_des_alist_copy_path,
- current_directory)
- for home_item in remote_json_path:
- # 如果当前下载集合中已经包含了这个文件名,跳过这次循环
- if home_item['name'] in current_downloads:
- continue
- if not home_item['is_dir']:
- cent_data = remote_alist_api.get_file_or_directory_info(home_item['path'])
- if cent_data:
- logging.info(f"name {home_item['name']}")
- download_url = aria2_api.add_url(cent_data['data']['raw_url'])
- logging.info(f"Started to download {cent_data['data']['raw_url']}")
- current_downloads.add(home_item['name'])
- else:
- logging.info("File not found")
- if download_url is not None:
- # 检查下载是否完成
- while not download_url.is_complete:
- download_url.update() # 更新下载状态
- print("下载中...")
- time.sleep(10) # 等待10秒再次检查
- # 下载完成后移动文件
- if download_url.is_complete:
- print("下载完成")
- for file in download_url.files:
- filePath = file.path
- print(filePath)
- filename = os.path.basename(file.path)
- print(filename)
- already_movie_names = radar_client.get_all_movie_names()
- for movie_name in already_movie_names:
- movie_id = radar_client.find_movie_id_by_filename(movie_name)
- if movie_id:
- radar_client.delete_movie(movie_id)
- files_exist, directories_exist = has_files_or_directories(download_path)
- if files_exist or directories_exist:
- move_new_files(download_path, destination_path + "/movie", nas_tools_api)
- print("移动文件完成")
- else:
- print("没有文件或者目录")
- time.sleep(10) # 模拟任务执行
- # aria2_api.stop_monitoring()
- # print("停止监控")
|