171 lines
5.2 KiB
Python
171 lines
5.2 KiB
Python
import asyncio
|
|
import datetime
|
|
import os
|
|
from typing import Dict, List, Literal
|
|
import uuid
|
|
import re
|
|
import subprocess
|
|
|
|
from scripts.log_parser import parse_log_line
|
|
from scripts.bin import bin_file_path
|
|
from scripts.logger import logger
|
|
from scripts.tool import is_dev
|
|
|
|
|
|
class BaiduyunTask:
|
|
def __init__(
|
|
self,
|
|
subprocess: asyncio.subprocess.Process,
|
|
type: Literal["upload", "download"],
|
|
send_dirs: List[str],
|
|
recv_dir: str,
|
|
):
|
|
self.subprocess = subprocess
|
|
self.id = str(uuid.uuid4())
|
|
self.start_time = datetime.datetime.now()
|
|
self.running = True
|
|
self.logs = []
|
|
self.raw_logs = []
|
|
self.files_state = {}
|
|
self.type = type
|
|
self.send_dirs = send_dirs
|
|
self.recv_dir = recv_dir
|
|
self.n_files = 0
|
|
self.n_success_files = 0
|
|
self.n_failed_files = 0
|
|
self.canceled = False
|
|
|
|
def start_time_human_readable(self):
|
|
return self.start_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
def update_state(self):
|
|
self.n_files = 0
|
|
self.n_success_files = 0
|
|
self.n_failed_files = 0
|
|
for key in self.files_state:
|
|
status = self.files_state[key]["status"]
|
|
self.n_files += 1
|
|
if status == "upload-success" or status == "file-skipped" or status == "download-success":
|
|
self.n_success_files += 1
|
|
elif status == "upload-failed":
|
|
self.n_failed_files += 1
|
|
self.running = False if self.canceled else not isinstance(self.subprocess.returncode, int)
|
|
|
|
def append_log(self, parsed_log, raw_log):
|
|
self.raw_logs.append(raw_log)
|
|
self.logs.append(parsed_log)
|
|
if isinstance(parsed_log, dict) and "id" in parsed_log:
|
|
self.files_state[parsed_log["id"]] = parsed_log
|
|
|
|
def get_summary(task):
|
|
return {
|
|
"type": task.type,
|
|
"id": task.id,
|
|
"running": task.running,
|
|
"start_time": task.start_time_human_readable(),
|
|
"recv_dir": task.recv_dir,
|
|
"send_dirs": task.send_dirs,
|
|
"n_files": task.n_files,
|
|
"n_failed_files": task.n_failed_files,
|
|
"n_success_files": task.n_success_files,
|
|
"canceled": task.canceled
|
|
}
|
|
|
|
#
|
|
async def get_tick(self):
|
|
tasks = []
|
|
while True:
|
|
try:
|
|
line = await asyncio.wait_for(
|
|
self.subprocess.stdout.readline(), timeout=0.1
|
|
)
|
|
line = line.decode()
|
|
logger.info(line)
|
|
if not line:
|
|
break
|
|
if line.isspace():
|
|
continue
|
|
info = parse_log_line(line, self.type == "upload")
|
|
if is_dev and line.find("md5") != -1:
|
|
line = ''
|
|
tasks.append({"info": info, "log": line})
|
|
self.append_log(info, line)
|
|
except asyncio.TimeoutError:
|
|
break
|
|
self.update_state()
|
|
return {"tasks": tasks, "task_summary": self.get_summary()}
|
|
|
|
async def cancel(self):
|
|
self.subprocess.terminate()
|
|
self.canceled = True
|
|
last_tick = await self.get_tick()
|
|
return last_tick
|
|
|
|
|
|
|
|
@staticmethod
|
|
async def create(
|
|
type: Literal["upload", "download"], send_dirs: List[str], recv_dir: str
|
|
):
|
|
if type == "upload" :
|
|
process = await asyncio.create_subprocess_exec(
|
|
bin_file_path,
|
|
type,
|
|
*process_path_arr(send_dirs),
|
|
parse_and_replace_time(recv_dir),
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
elif type == "download":
|
|
process = await asyncio.create_subprocess_exec(
|
|
bin_file_path,
|
|
type,
|
|
*process_path_arr(send_dirs),
|
|
"--saveto",
|
|
parse_and_replace_time(recv_dir),
|
|
"-p",
|
|
"4",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
else:
|
|
raise Exception("unknown task type")
|
|
task = BaiduyunTask(process, type, send_dirs, recv_dir)
|
|
task.update_state()
|
|
baiduyun_task_cache[task.id] = task
|
|
return task
|
|
|
|
@staticmethod
|
|
def get_by_id(id: str):
|
|
return baiduyun_task_cache.get(id)
|
|
|
|
@staticmethod
|
|
def get_cache():
|
|
return baiduyun_task_cache
|
|
|
|
|
|
baiduyun_task_cache: Dict[str, BaiduyunTask] = {}
|
|
|
|
def process_path_arr(path_arr):
|
|
"""
|
|
处理路径,顺便替换模板
|
|
如果是绝对路径直接返回,
|
|
如果是相对路径则与当前工作目录拼接返回。
|
|
"""
|
|
cwd = os.getcwd()
|
|
result = []
|
|
for path in path_arr:
|
|
if os.path.isabs(path):
|
|
result.append(path)
|
|
else:
|
|
result.append(os.path.join(cwd, path))
|
|
return list(map(parse_and_replace_time, result))
|
|
|
|
def parse_and_replace_time(s):
|
|
pattern = r'<#(.+?)#>'
|
|
matches = re.findall(pattern, s)
|
|
for match in matches:
|
|
formatted_time = datetime.datetime.now().strftime(match)
|
|
s = s.replace(f'<#{match}#>', formatted_time)
|
|
return s
|