CNB 云原生构建是否支持全部通过 API 调用?如何打造一条可以在自己网站上触发、监控构建流程并下载产物的流水线(GitHub Actions 已实现)#8
最终实现的效果如图:
GitHub Actions 版本相关代码
"""Self-service build portal backed by GitHub Actions.

A PyWebIO app: the visitor enters an activation key, binds a domain, picks a
commit, and the app sends a ``repository_dispatch`` event, renders the job and
step progress of the resulting workflow run, and finally offers the uploaded
artifact for download.
"""
import secrets
from pywebio.input import *
from pywebio.output import *
from pywebio import start_server, session
from datetime import datetime, timedelta, timezone
from db import sqlPool, r  # connection pool and Redis client
import requests
import json
import time
import os
import html
import re
from contextlib import contextmanager

GITHUB_TOKEN = ""  # Replace with your GitHub token.
REPO_OWNER = ""
REPO_NAME = ""

CST = timezone(timedelta(hours=8))   # every timestamp in this app is handled in UTC+8
REQUEST_TIMEOUT = 30                 # seconds, GitHub API calls
DOWNLOAD_TIMEOUT = 300               # seconds, artifact archive download
ARTIFACT_TTL = timedelta(days=3)     # downloads are refused after this age
DAILY_BUILD_LIMIT = 6                # builds per key per UTC+8 day
CLOCK_SKEW = timedelta(seconds=30)   # tolerated drift between this host and GitHub

BUILD_HINT = '''
### 构建提示
- 若上传解压后访问网站出现卡在首屏的情况, 重新构建即可解决
'''

STATUS_COLORS = {
    '成功': '#28a745',
    '进行中': '#007bff',
    '未开始': '#ffc107',
    '跳过': '#6c757d',
    '失败': '#dc3545',
}

# Columns read for the history view; the names come from the INSERT in
# _record_build, so the view no longer depends on the table's column order.
_BUILD_LOG_COLUMNS = (
    'build_domain', 'commit_hash', 'commit_msg', 'status',
    'download_url', 'create_time', 'job_id', 'artifact_name',
)

_DOMAIN_FORBIDDEN = re.compile(r'[\s/\\:@?#\'"<>]')

# Process-wide lock flag. It must live at module level: resetting it inside
# main() (as before) let every new session cancel another session's lock.
build_in_progress = False


# --------------------------------------------------------------------------
# Small helpers
# --------------------------------------------------------------------------

def _now_cst():
    """Return the current time as an aware datetime in UTC+8."""
    return datetime.now(CST)


@contextmanager
def _db_cursor(commit=False):
    """Yield a cursor from the pool and always return the connection to it."""
    conn = sqlPool.connection()
    try:
        yield conn.cursor()
        if commit:
            conn.commit()
    finally:
        conn.close()


def _repo_url(path):
    """Build a GitHub REST URL below ``/repos/{owner}/{repo}/``."""
    return f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/{path}"


def _github_headers():
    """Return the authentication and media-type headers sent with every call."""
    return {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }


def _github_get_json(path, params=None):
    """GET a repository endpoint and return the decoded JSON, or None on any failure."""
    try:
        response = requests.get(_repo_url(path), headers=_github_headers(),
                                params=params, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        print(f"GitHub request failed for {path}: {exc}")
        return None
    if response.status_code != 200:
        print(f"GitHub request failed for {path}: {response.status_code} - {response.text}")
        return None
    try:
        return response.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        print(f"Error decoding JSON: {exc}")
        return None


def _parse_github_time(time_str):
    """Parse an ISO-8601 timestamp from the API into an aware datetime, or None."""
    if not time_str:
        return None
    try:
        parsed = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _status_badge(label):
    """Return a coloured PyWebIO HTML badge for a status label."""
    color = STATUS_COLORS.get(label, STATUS_COLORS['失败'])
    return put_html(
        f'<span style="background-color: {color}; color: white; '
        f'padding: 2px 6px; border-radius: 3px;">{label}</span>')


def _tooltip_html(element_id, short_text, full_text):
    """Return HTML showing ``short_text`` with a tippy tooltip holding ``full_text``.

    The text is HTML-escaped and handed to JavaScript as a JSON string, so
    backticks, quotes or markup in a commit message cannot break the script.
    """
    content = html.escape(full_text).replace('\n', '<br>')
    js_selector = json.dumps(f"#{element_id}")
    js_content = json.dumps(content).replace('</', '<\\/')
    return (
        f'<span id="{html.escape(element_id, quote=True)}">{html.escape(short_text)}</span>'
        '<script>'
        # tippyReady is set by main(); waiting on it avoids calling tippy()
        # before the library has finished loading.
        '(window.tippyReady || Promise.resolve()).then(function () {'
        f'tippy({js_selector}, {{content: {js_content}, allowHTML: true}});'
        '}).catch(function (error) { console.error(error); });'
        '</script>'
    )


def _validate_domain(value):
    """PyWebIO validator: return an error message for an unusable domain, else None."""
    domain = (value or '').strip()
    if not domain or '.' not in domain.strip('.') or _DOMAIN_FORBIDDEN.search(domain):
        return "请输入有效的域名,例如 example.com"
    return None


def _artifact_filename(artifact_name, domain, job_id):
    """Build the download file name, neutralising path separators in its parts."""
    raw = f"{artifact_name}-{domain}-{job_id}.zip"
    return re.sub(r'[^\w.\-]', '_', raw)


def _fetch_artifact_zip(url):
    """Download an artifact archive and return its bytes, or None on failure."""
    try:
        response = requests.get(url, headers=_github_headers(), timeout=DOWNLOAD_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Artifact download failed: {exc}")
        return None
    if response.status_code != 200:
        print(f"Artifact download failed: {response.status_code}")
        return None
    return response.content


def _offer_download(content, filename):
    """Keep a copy under ``download/`` and show a download link in ``build_info``."""
    os.makedirs('download', exist_ok=True)
    with open(os.path.join('download', filename), 'wb') as f:
        f.write(content)
    put_file(filename, content, scope='build_info')


def _artifact_expired(build_time):
    """Return True when ``build_time`` is older than ARTIFACT_TTL."""
    if not isinstance(build_time, datetime):
        return False
    if build_time.tzinfo is None:
        # NOTE(review): build() writes UTC+8 timestamps; this assumes the
        # driver hands them back as naive UTC+8 wall-clock values - confirm.
        build_time = build_time.replace(tzinfo=CST)
    return _now_cst() - build_time > ARTIFACT_TTL


def _jobs_succeeded(jobs):
    """Return True only if there are jobs and every one completed successfully."""
    return bool(jobs) and all(
        job.get('status') == 'completed' and job.get('conclusion') == 'success'
        for job in jobs)


# --------------------------------------------------------------------------
# Database access
# --------------------------------------------------------------------------

def check_key(key):
    """Return ``(id, bind_domain, status, expiration_time)`` for a key, or None."""
    with _db_cursor() as cursor:
        cursor.execute(
            'SELECT id, bind_domain, status, expiration_time FROM activation WHERE `key` = %s',
            (key,))
        return cursor.fetchone()


def update_domain(key, domain):
    """Bind ``domain`` to the activation ``key`` and stamp the update time (UTC+8)."""
    with _db_cursor(commit=True) as cursor:
        cursor.execute(
            'UPDATE activation SET bind_domain = %s, update_time = %s WHERE `key` = %s',
            (domain, _now_cst(), key))


def get_build_logs(key_id):
    """Return the 12 most recent ``build_logs`` rows for ``key_id`` (all columns)."""
    with _db_cursor() as cursor:
        cursor.execute(
            'SELECT * FROM build_logs WHERE key_id = %s ORDER BY create_time DESC LIMIT 12',
            (key_id,))
        return cursor.fetchall()


def _get_build_log_records(key_id):
    """Return the 12 most recent builds for ``key_id`` as dicts keyed by column name."""
    sql = ('SELECT ' + ', '.join(_BUILD_LOG_COLUMNS) +
           ' FROM build_logs WHERE key_id = %s ORDER BY create_time DESC LIMIT 12')
    with _db_cursor() as cursor:
        cursor.execute(sql, (key_id,))
        return [dict(zip(_BUILD_LOG_COLUMNS, row)) for row in cursor.fetchall()]


def _record_build(key_id, domain, commit, succeeded, artifact, jobs, start_time):
    """Insert one row describing a finished build into ``build_logs``."""
    with _db_cursor(commit=True) as cursor:
        cursor.execute('''
            INSERT INTO build_logs (key_id, build_domain, commit_hash, commit_msg, status,
                                    download_url, log, create_time, update_time, job_id, artifact_name)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            key_id,
            domain,
            commit['sha'],
            commit['commit']['message'],
            succeeded,
            artifact['archive_download_url'] if artifact else None,
            json.dumps(jobs) if jobs else None,
            start_time,
            _now_cst(),
            jobs[0]['id'] if jobs else None,
            artifact['name'] if artifact else None
        ))


# --------------------------------------------------------------------------
# GitHub API
# --------------------------------------------------------------------------

def fetch_commits(page=1):
    """Return one page of ``main`` commits whose message looks like a release note.

    A message qualifies when it contains an opening ``[`` or ``(``, a closing
    ``]`` or ``)``, and a ``:``. Any API failure yields an empty list.
    """
    commits = _github_get_json("commits", params={
        "sha": "main",
        "since": "2024-10-11T08:00:00Z",
        "page": page,
        "per_page": 24
    })
    if commits is None:
        return []
    if not isinstance(commits, list):
        print(f"Unexpected response format: {commits}")
        return []

    def is_release_message(message):
        has_open = '[' in message or '(' in message
        has_close = ']' in message or ')' in message
        return has_open and has_close and ':' in message

    return [commit for commit in commits if is_release_message(commit['commit']['message'])]


def trigger_build(domain, commit_hash):
    """Send the ``buildArtifacts`` repository_dispatch event; True when accepted (204)."""
    payload = {
        "event_type": "buildArtifacts",
        "client_payload": {
            "bind_domain": domain,
            "commit_hash": commit_hash
        }
    }
    try:
        response = requests.post(_repo_url("dispatches"), headers=_github_headers(),
                                 data=json.dumps(payload), timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Failed to trigger build: {exc}")
        return False
    print(response.text)
    return response.status_code == 204


def get_latest_workflow_run(created_after=None):
    """Return the newest repository_dispatch workflow run, or None.

    Args:
        created_after: optional aware datetime; runs created earlier than this
            (minus CLOCK_SKEW) are ignored so a previous build is not picked up.
    """
    data = _github_get_json("actions/runs",
                            params={"event": "repository_dispatch", "per_page": 10})
    runs = data.get('workflow_runs', []) if isinstance(data, dict) else []
    for run in runs:
        if created_after is None:
            return run
        created_at = _parse_github_time(run.get('created_at'))
        if created_at is not None and created_at >= created_after - CLOCK_SKEW:
            return run
    return None


def get_run_jobs(run_id):
    """Return the job list of a workflow run (empty on failure)."""
    data = _github_get_json(f"actions/runs/{run_id}/jobs")
    return data.get('jobs', []) if isinstance(data, dict) else []


def get_artifacts(run_id):
    """Return the artifact list of a workflow run (empty on failure)."""
    data = _github_get_json(f"actions/runs/{run_id}/artifacts")
    return data.get('artifacts', []) if isinstance(data, dict) else []


def format_time(time_str):
    """Format an API timestamp as ``YYYY-MM-DD HH:MM:SS`` in UTC+8, or ``'N/A'``."""
    if not time_str or time_str == 'N/A':
        return 'N/A'
    parsed = _parse_github_time(time_str)
    if parsed is None:
        return time_str  # unknown format: show it unchanged rather than crash
    return parsed.astimezone(CST).strftime('%Y-%m-%d %H:%M:%S')


# --------------------------------------------------------------------------
# Build monitoring and downloads
# --------------------------------------------------------------------------

def _step_status_label(step):
    """Map a step's status/conclusion to the label shown in the progress table."""
    status = step.get('status')
    if status == 'completed':
        conclusion = step.get('conclusion')
        if conclusion == 'success':
            return '成功'
        return '跳过' if conclusion == 'skipped' else '失败'
    if status == 'in_progress':
        return '进行中'
    if status in ('pending', 'queued'):  # 'queued' used to be shown as failed
        return '未开始'
    return '失败'


def _render_jobs(jobs):
    """Redraw the ``jobs`` scope with one step table per job."""
    with use_scope('jobs', clear=True):
        for job in jobs:
            put_markdown(f"### 任务: {job['name']} (ID: {job['id']})")
            steps_table = [['步骤序号', '步骤名称', '状态', '开始时间', '结束时间']]
            for step in job.get('steps', []):
                if "Post" in step['name']:  # hide the automatic "Post ..." cleanup steps
                    continue
                steps_table.append([
                    step['number'],
                    step['name'],
                    _status_badge(_step_status_label(step)),
                    format_time(step.get('started_at', 'N/A')),
                    format_time(step.get('completed_at', 'N/A'))
                ])
            put_table(steps_table)


def monitor_workflow(run_id, domain, commit, timeout=1800):
    """Poll a workflow run every 5 s, rendering progress, until all jobs complete.

    Args:
        run_id: workflow run to watch.
        domain, commit: unused; kept for call compatibility.
        timeout: give up after this many seconds instead of polling forever.

    Returns:
        The last job list fetched (possibly incomplete after a timeout).
    """
    put_markdown("## 正在监控构建进度...")
    deadline = time.monotonic() + timeout
    while True:
        jobs = get_run_jobs(run_id)
        if jobs:
            _render_jobs(jobs)
            if all(job['status'] == 'completed' for job in jobs):
                break
        if time.monotonic() >= deadline:
            put_error("构建监控超时,请稍后重试")
            return jobs
        time.sleep(5)
    if _jobs_succeeded(jobs):
        put_success("构建完成!")
    else:
        put_error("构建失败,请查看上方步骤状态")
    return jobs


def download_artifact(artifact, domain, job_id):
    """Fetch ``artifact`` from GitHub and offer it for download; True on success."""
    content = _fetch_artifact_zip(artifact['archive_download_url'])
    if content is None:
        put_error('下载失败,可能是下载链接已过期。', scope='build_info')
        return False
    _offer_download(content, _artifact_filename(artifact['name'], domain, job_id))
    return True


def download_artifact_by_url(download_url, build_time, domain, job_id, artifact_name):
    """Re-download an artifact of a past build, refusing builds older than 3 days."""
    toast('正在取回构建文件,请滑至底部查看!', color='success')
    if _artifact_expired(build_time):
        put_error('构建产物已失效,请重新构建。', scope='build_info')
        return
    put_text('正在从服务器取回构建...', scope='build_info')
    content = _fetch_artifact_zip(download_url)
    if content is None:
        put_error('下载失败,可能是下载链接已过期。', scope='build_info')
        return
    _offer_download(content, _artifact_filename(artifact_name, domain, job_id))
    put_success('从服务器取回构建成功,请点击文件名下载。', scope='build_info')
    put_markdown(BUILD_HINT, scope='build_info')


def is_second_level_domain(domain):
    """Return True when ``domain`` has exactly two dot-separated parts."""
    return len(domain.split('.')) == 2


def view_build_history(key_id):
    """Render the recent build history of ``key_id`` into the ``table`` scope."""
    records = _get_build_log_records(key_id)
    if not records:
        with use_scope('table', clear=True):
            put_markdown("## 历史生成记录")
            put_text('暂无历史记录')
        return

    table = [['时间', '域名', 'Commit 信息', '状态', '操作']]
    for index, record in enumerate(records):
        build_time = record['create_time']
        if isinstance(build_time, datetime):
            build_time_str = build_time.strftime('%Y-%m-%d %H:%M:%S')
        else:
            build_time_str = str(build_time)

        commit_msg = record['commit_msg'] or ''
        if len(commit_msg) > 20:
            # The index keeps element ids unique when a commit was built twice.
            element_id = f"hist_{index}_{str(record['commit_hash'])[:7]}"
            commit_msg_display = put_html(
                _tooltip_html(element_id, commit_msg[:20] + "...", commit_msg))
        else:
            commit_msg_display = commit_msg

        status_tag = _status_badge('成功' if record['status'] else '失败')

        if record['download_url']:
            # Bind this row's values as defaults; a bare closure would see the
            # last row's values by the time the button is clicked.
            action = put_button(
                '下载',
                onclick=lambda url=record['download_url'], built=build_time,
                dom=record['build_domain'], job=record['job_id'],
                name=record['artifact_name']: download_artifact_by_url(url, built, dom, job, name),
                small=True)
        else:
            action = '无'
        table.append([build_time_str, record['build_domain'], commit_msg_display,
                      status_tag, action])

    with use_scope('table', clear=True):
        put_markdown("## 历史生成记录")
        put_table(table)
    with use_scope('build_info', clear=True):
        pass


# --------------------------------------------------------------------------
# Build orchestration
# --------------------------------------------------------------------------

def _wait_for_run(created_after, attempts=10, delay=3):
    """Poll until the run created by our dispatch shows up; None if it never does."""
    for _ in range(attempts):
        time.sleep(delay)
        run = get_latest_workflow_run(created_after=created_after)
        if run:
            return run
    return None


def _wait_for_artifact(run_id, attempts=20, delay=3):
    """Poll for the run's first artifact; None if none appears (was an endless loop)."""
    for _ in range(attempts):
        artifacts = get_artifacts(run_id)
        if artifacts:
            return artifacts[0]
        put_text('未获取构建产物, 重新获取中...', scope='build_info')
        time.sleep(delay)
    return None


def _run_build(commit, domain, key_id, start_time):
    """Trigger, monitor and record one build; output goes to the current scope."""
    put_text(f"选择构建: {commit['sha'][:7]}: {commit['commit']['message']}")
    put_text(f"构建域名: {domain}")
    put_text("提交构建中...")

    if not trigger_build(domain, commit['sha']):
        put_error("触发构建失败,请稍后重试")
        return
    put_success("构建任务已触发")

    run = _wait_for_run(start_time)
    if not run:
        put_error("无法获取构建运行信息,请稍后重试")
        return
    put_text(f"构建 jobs ID: {run['id']}")

    jobs = monitor_workflow(run['id'], domain, commit)
    succeeded = _jobs_succeeded(jobs)
    # A failed build uploads nothing, so only look for artifacts after success.
    artifact = _wait_for_artifact(run['id']) if succeeded else None

    if artifact:
        def download_artifact_with_check():
            if _now_cst() - start_time > ARTIFACT_TTL:
                put_error('构建产物已失效,请重新构建。', scope='build_info')
                return
            put_text('正在从服务器取回构建...', scope='build_info')
            job_id = jobs[0]['id'] if jobs else 'unknown'
            if download_artifact(artifact, domain, job_id):
                put_success('从服务器取回构建成功,请点击文件名下载。', scope='build_info')
                put_markdown(BUILD_HINT, scope='build_info')

        put_button("下载安装包", onclick=download_artifact_with_check, scope='build_info')
    elif succeeded:
        put_error('未获取到构建产物,请重新构建。', scope='build_info')

    # Record the real outcome; previously every build was stored as successful.
    _record_build(key_id, domain, commit, succeeded and artifact is not None,
                  artifact, jobs, start_time)

    put_markdown("## 构建完成" if succeeded else "## 构建失败")
    put_text(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    put_text(f"结束时间:{_now_cst().strftime('%Y-%m-%d %H:%M:%S')}")


def build(commit, domain, key_id, key):
    """Button handler: enforce the daily quota and global lock, then run a build.

    Args:
        commit: commit object from the GitHub commits API.
        domain: domain bound to the key, passed to the workflow as ``bind_domain``.
        key_id: database id of the activation key (stored in ``build_logs``).
        key: the activation key itself (used for the per-day Redis counter).
    """
    global build_in_progress
    if build_in_progress:
        toast("有项目正在构建中, 请构建完成后再执行新构建")
        return

    start_time = _now_cst()
    redis_key = f"build_count:{key}:{start_time.strftime('%Y%m%d')}"
    count = r.get(redis_key)
    if count and int(count) >= DAILY_BUILD_LIMIT:
        toast("您今天的构建次数已达上限,请明天再试。", color='warning')
        put_error("您今天的构建次数已达上限,请明天再试。")
        return
    r.incr(redis_key)
    # Expire at the end of the UTC+8 day, the same day the key name is based
    # on; a Unix timestamp avoids depending on the server's local time zone.
    end_of_day = start_time.replace(hour=23, minute=59, second=59, microsecond=0)
    r.expireat(redis_key, int(end_of_day.timestamp()))

    build_in_progress = True
    toast('代码正在自动构建,请滑至底部查看,构建完成之后即可下载!', color='success')
    try:
        with use_scope('build_info', clear=True):
            _run_build(commit, domain, key_id, start_time)
    finally:
        build_in_progress = False


# --------------------------------------------------------------------------
# Session entry point
# --------------------------------------------------------------------------

def _load_tooltip_scripts():
    """Inject popper.js then tippy.js, exposing the load promise as ``window.tippyReady``."""
    js_urls = [
        "https://npm-private.onmicrosoft.cn/popper.js@1",
        "https://npm-private.onmicrosoft.cn/tippy.js@5"
    ]
    put_html("""
    <script>
    function loadScript(url) {
        return new Promise((resolve, reject) => {
            const script = document.createElement('script');
            script.src = url;
            script.onload = resolve;
            script.onerror = reject;
            document.head.appendChild(script);
        });
    }
    // tippy needs Popper to exist when it is evaluated, so load in order.
    window.tippyReady = %s.reduce(
        (chain, url) => chain.then(() => loadScript(url)), Promise.resolve());
    window.tippyReady.catch(error => console.error('Script loading error:', error));
    </script>
    """ % json.dumps(js_urls))


def _prompt_for_key():
    """Ask for an activation key until a valid one is given.

    Binds a domain on first use. Returns ``(key, key_id, bind_domain)``.
    """
    while True:
        key = input("请输入您的密钥:", type=TEXT)
        key_info = check_key(key)
        if not key_info:
            put_error("无效的密钥,请重新输入。")
            continue
        key_id, bind_domain, status, expiration_time = key_info
        if not status:
            put_error("此密钥已被禁用。")
            continue
        if expiration_time:
            # Make the stored value timezone-aware (it is treated as UTC).
            expiration_time = expiration_time.replace(tzinfo=timezone.utc).astimezone(CST)
            if expiration_time < _now_cst():
                put_error("此密钥已过期。")
                continue

        if not bind_domain:
            domain = input("请输入要绑定的域名:", type=TEXT, validate=_validate_domain).strip()
            update_domain(key, domain)
            put_success(f"域名 {domain} 已成功绑定到您的密钥。")
            bind_domain = domain
        else:
            put_success(f"您的密钥已绑定域名: {bind_domain}")
        if is_second_level_domain(bind_domain):
            put_info(f"您还可以使用 www.{bind_domain}")
        return key, key_id, bind_domain


def _commit_table(commits, bind_domain, key_id, key):
    """Build the table rows (header included) for one page of commits."""
    table = [['版本号', '更新信息', '上线时间', '操作']]
    for commit in commits:
        sha_short = commit['sha'][:7]
        message = commit['commit']['message']
        if len(message) > 26:
            message_display = put_html(
                _tooltip_html(f"hash_{sha_short}", message[:26] + "...", message))
        else:
            message_display = message
        table.append([
            sha_short,
            message_display,
            format_time(commit['commit']['author']['date']),
            # Bind the commit as a default so each button builds its own row.
            put_button('下载', onclick=lambda c=commit: build(c, bind_domain, key_id, key),
                       small=True)
        ])
    return table


def main():
    """PyWebIO session: authenticate the key, then page through commits or history."""
    _load_tooltip_scripts()

    put_markdown("# 系统最新版本前端代码安装包下载平台")
    put_markdown("""
## 构建说明:
- 一个密钥只能绑定一个域名
- 绑定根域名(例如 example.com)时,赠送一个 www.example.com 域名
""")

    key, key_id, bind_domain = _prompt_for_key()

    page = 1
    viewing_history = False
    while True:
        clear('table')
        clear('jobs')
        clear('build_info')

        with use_scope('table', clear=True):
            if viewing_history:
                view_build_history(key_id)
            else:
                put_markdown("## 版本列表:")
                commits = fetch_commits(page)
                if not commits:
                    if page > 1:
                        # The scope is redrawn at once, so use a toast here.
                        toast("没有更多的 commits 了")
                        page -= 1
                        continue
                    put_info("没有更多的 commits 了")
                    break
                put_table(_commit_table(commits, bind_domain, key_id, key))

        if build_in_progress and viewing_history:
            toast("构建中,请勿切换模式!", duration=3, color='warning')
            viewing_history = False
            page = 1
            continue

        action_buttons = []
        if not viewing_history:
            if page > 1:
                action_buttons.append(('查看上一页', 'prev'))
            action_buttons.append(('查看下一页', 'next'))
        action_buttons.append(
            ('查看历史生成记录' if not viewing_history else '选择 Commit 构建', 'toggle_view'))

        action = actions('请选择操作:', [label for label, _ in action_buttons])
        chosen = dict(action_buttons).get(action)
        if chosen == 'next':
            page += 1
        elif chosen == 'prev':
            page = max(1, page - 1)
        elif chosen == 'toggle_view':
            viewing_history = not viewing_history
            page = 1
        else:
            break

    put_markdown("感谢使用 在线构建平台!")


if __name__ == '__main__':
    start_server(main, port=9003, host='', debug=False, cdn=False, static_dir=None,
                 allowed_origins=None, check_origin=None, auto_open_webbrowser=False,
                 session_expire_seconds=None, session_cleanup_interval=None,
                 max_payload_size='200M', protocol='http',)
相关用到的 API
以及相关的流水线 Action 代码
# Builds the front-end for one customer domain and uploads dist/ as an artifact.
# Started by the portal through a repository_dispatch event of type
# "buildArtifacts" (client_payload: bind_domain, commit_hash), or manually.
name: 构建并上传制品

on:
  workflow_dispatch:
  repository_dispatch:
    types:
      - buildArtifacts

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: 从仓库检出代码(切换至指定分支)
        uses: actions/checkout@v3
        with:
          # Manual runs carry no client_payload, so fall back to the current ref.
          ref: ${{ github.event.client_payload.commit_hash || github.ref }}

      - name: 设置 Node.js 环境
        uses: actions/setup-node@v3
        with:
          node-version: '20.14.0'

      - name: 安装 pnpm
        run: npm install -g pnpm

      - name: 安装依赖项
        run: pnpm install

      - name: 构建项目 (生产模式)
        env:
          BIND_DOMAIN: ${{ github.event.client_payload.bind_domain }}
        run: pnpm build

      - name: 上传构建产物
        uses: actions/upload-artifact@v4
        with:
          name: build-artifacts
          path: |
            dist/**
          # Fail the job instead of finishing "successfully" without an artifact.
          if-no-files-found: error
          # The portal refuses downloads older than 3 days; keep storage in step.
          retention-days: 3
如果可以实现的话,希望可以提供相关用到的 API 以及文档(希望可以一一对应)。另外,迁移到 CNB 之后,YAML 又应该如何编写呢?
haorwen/cnb-build-platform · Cloud Native Build:按照你原本的代码写了一份,直接参考里面的 API 就可以了;cnb.yml 文件的编写也可以参考该仓库。
最终实现的效果如图:
GitHub Actions 版本相关代码
import secrets from pywebio.input import * from pywebio.output import * from pywebio import start_server, session from datetime import datetime, timedelta, timezone from db import sqlPool, r # 引入 redis 连接 import requests import json import time import os GITHUB_TOKEN = "" # 请替换为您的 GitHub Token REPO_OWNER = "" REPO_NAME = "" def check_key(key): conn = sqlPool.connection() c = conn.cursor() c.execute('SELECT id, bind_domain, status, expiration_time FROM activation WHERE `key` = %s', (key,)) result = c.fetchone() conn.close() return result def update_domain(key, domain): conn = sqlPool.connection() c = conn.cursor() c.execute('UPDATE activation SET bind_domain = %s, update_time = %s WHERE `key` = %s', (domain, datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))), key)) conn.commit() conn.close() def get_build_logs(key_id): conn = sqlPool.connection() c = conn.cursor() c.execute('SELECT * FROM build_logs WHERE key_id = %s ORDER BY create_time DESC LIMIT 12', (key_id,)) logs = c.fetchall() conn.close() return logs def fetch_commits(page=1): url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/commits" headers = { "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" } params = { "sha": "main", "since": "2024-10-11T08:00:00Z", "page": page, "per_page": 24 } response = requests.get(url, headers=headers, params=params) if response.status_code != 200: print(f"Failed to fetch commits: {response.status_code} - {response.text}") return [] try: commits = response.json() except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") return [] if not isinstance(commits, list): print(f"Unexpected response format: {commits}") return [] filtered_commits = [commit for commit in commits if ('[' in commit['commit']['message'] or '(' in commit['commit']['message']) and (']' in commit['commit']['message'] or ')' in commit['commit']['message']) and ':' in commit['commit']['message']] return filtered_commits def 
trigger_build(domain, commit_hash): url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/dispatches" headers = { "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" } data = { "event_type": "buildArtifacts", "client_payload": { "bind_domain": domain, "commit_hash": commit_hash } } response = requests.post(url, headers=headers, data=json.dumps(data)) print(response.text) return response.status_code == 204 def get_latest_workflow_run(): url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs" headers = { "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" } response = requests.get(url, headers=headers) runs = response.json().get('workflow_runs', []) if runs: return runs[0] return None def get_run_jobs(run_id): url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs/{run_id}/jobs" headers = { "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" } response = requests.get(url, headers=headers) return response.json().get('jobs', []) def format_time(time_str): if not time_str or time_str == 'N/A': return 'N/A' dt = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%SZ') dt = dt.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8))) return dt.strftime('%Y-%m-%d %H:%M:%S') def monitor_workflow(run_id, domain, commit): put_markdown("## 正在监控构建进度...") while True: jobs = get_run_jobs(run_id) if jobs: with use_scope('jobs', clear=True): for job in jobs: put_markdown(f"### 任务: {job['name']} (ID: {job['id']})") steps_table = [['步骤序号', '步骤名称', '状态', '开始时间', '结束时间']] for step in job.get('steps', []): if "Post" in step['name']: continue if step['status'] == 'completed': status = '成功' if step['conclusion'] == 'success' else '失败' elif step['status'] == 'in_progress': status = '进行中' elif step['status'] == 'pending': status = '未开始' else: status = '失败' status_tag = put_html( f'<span style="background-color: {"#28a745" if status == "成功" 
else ("#007bff" if status == "进行中" else ("#ffc107" if status == "未开始" else "#dc3545"))}; color: white; padding: 2px 6px; border-radius: 3px;">{status}</span>') steps_table.append([ step['number'], step['name'], status_tag, format_time(step.get('started_at', 'N/A')), format_time(step.get('completed_at', 'N/A')) ]) put_table(steps_table) if all(job['status'] == 'completed' for job in jobs): break else: time.sleep(5) else: time.sleep(5) put_success("构建完成!") return jobs def get_artifacts(run_id): url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs/{run_id}/artifacts" headers = { "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" } response = requests.get(url, headers=headers) return response.json().get('artifacts', []) def download_artifact(artifact, domain, job_id): artifact_url = artifact['archive_download_url'] response = requests.get(artifact_url, headers={ "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" }) if response.status_code == 200: os.makedirs('download', exist_ok=True) filename = f"{artifact['name']}-{domain}-{job_id}.zip" filepath = os.path.join('download', filename) with open(filepath, 'wb') as f: f.write(response.content) put_file(filename, open(filepath, 'rb').read(), scope='build_info') return True else: put_error('下载失败,可能是下载链接已过期。') return False def download_artifact_by_url(download_url, build_time, domain, job_id, artifact_name): toast('正在取回构建文件,请滑至底部查看!', color='success') if datetime.now() - build_time > timedelta(days=3): put_error('构建产物已失效,请重新构建。', scope='build_info') return put_text('正在从服务器取回构建...', scope='build_info') response = requests.get(download_url, headers={ "Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json" }) if response.status_code == 200: os.makedirs('download', exist_ok=True) filename = f"{artifact_name}-{domain}-{job_id}.zip" filepath = os.path.join('download', filename) with open(filepath, 'wb') as f: 
f.write(response.content) put_file(filename, open(filepath, 'rb').read(), scope='build_info') put_success('从服务器取回构建成功,请点击文件名下载。', scope='build_info') put_markdown(''' ### 构建提示 - 若上传解压后访问网站出现卡在首屏的情况, 重新构建即可解决 ''', scope='build_info') else: put_error('下载失败,可能是下载链接已过期。', scope='build_info') def is_second_level_domain(domain): parts = domain.split('.') return len(parts) == 2 def view_build_history(key_id): logs = get_build_logs(key_id) if logs: table = [['时间', '域名', 'Commit 信息', '状态', '操作']] for log in logs: build_time = log[10] build_time_str = build_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(build_time, datetime) else str(build_time) domain = log[2] commit_hash = log[3] commit_msg = log[4] if len(commit_msg) > 20: commit_msg_display = put_html( '<span id="hash_{}">{}</span>' '<script>' 'tippy("#hash_{}", {{' ' content: `{}`,'.format( commit_hash, commit_msg[:20] + "...", commit_hash, commit_msg.replace('\n', '<br>') if '\n' in commit_msg else commit_msg ) + ' allowHTML: true' '}});' '</script>' ) else: commit_msg_display = commit_msg status = log[5] if status: status_tag = put_html( '<span style="background-color: #28a745; color: white; padding: 2px 6px; border-radius: 3px;">成功</span>') else: status_tag = put_html( '<span style="background-color: #dc3545; color: white; padding: 2px 6px; border-radius: 3px;">失败</span>') download_url = log[8] job_id = log[5] artifact_name = log[9] if download_url: def make_download_handler(url, build_time, domain, job_id, artifact_name): return lambda: download_artifact_by_url(url, build_time, domain, job_id, artifact_name) action = put_button('下载', onclick=make_download_handler(download_url, build_time, domain, job_id, artifact_name), small=True) else: action = '无' table.append([build_time_str, domain, commit_msg_display, status_tag, action]) with use_scope('table', clear=True): put_markdown("## 历史生成记录") put_table(table) with use_scope('build_info', clear=True): pass else: with use_scope('table', clear=True): put_markdown("## 
历史生成记录") put_text('暂无历史记录') def build(commit, domain, key_id, key): global build_in_progress if build_in_progress: toast("有项目正在构建中, 请构建完成后再执行新构建") return start_time = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))) today = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))).strftime('%Y%m%d') redis_key = f"build_count:{key}:{today}" count = r.get(redis_key) if count and int(count) >= 6: toast("您今天的构建次数已达上限,请明天再试。", color='warning') put_error("您今天的构建次数已达上限,请明天再试。") return r.incr(redis_key) r.expireat(redis_key, datetime.now().replace(hour=23, minute=59, second=59, microsecond=0)) build_in_progress = True # 添加绿色提示 toast('代码正在自动构建,请滑至底部查看,构建完成之后即可下载!', color='success') try: with use_scope('build_info', clear=True): put_text(f"选择构建: {commit['sha'][:7]}: {commit['commit']['message']}") put_text(f"构建域名: {domain}") put_text("提交构建中...") if trigger_build(domain, commit['sha']): put_success("构建任务已触发") time.sleep(3) run = get_latest_workflow_run() if run: put_text(f"构建 jobs ID: {run['id']}") jobs = monitor_workflow(run['id'], domain, commit) artifacts = get_artifacts(run['id']) while not artifacts: time.sleep(3) put_text('未获取构建产物, 重新获取中...', scope='build_info') artifacts = get_artifacts(run['id']) artifact = artifacts[0] artifact_name = artifact['name'] def download_artifact_with_check(): if datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))) - start_time > timedelta( days=3): put_error('构建产物已失效,请重新构建。', scope='build_info') return put_text('正在从服务器取回构建...', scope='build_info') job_id = jobs[0]['id'] if jobs else 'unknown' success = download_artifact(artifact, domain, job_id) if success: put_success('从服务器取回构建成功,请点击文件名下载。', scope='build_info') put_markdown(''' ### 构建提示 - 若上传解压后访问网站出现卡在首屏的情况, 重新构建即可解决 ''', scope='build_info') put_button(f"下载安装包", onclick=download_artifact_with_check, scope='build_info') conn = sqlPool.connection() c = conn.cursor() c.execute(''' INSERT INTO build_logs (key_id, build_domain, commit_hash, 
commit_msg, status, download_url, log, create_time, update_time, job_id, artifact_name) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ''', ( key_id, domain, commit['sha'], commit['commit']['message'], True, artifact['archive_download_url'] if artifacts else None, json.dumps(jobs) if jobs else None, start_time, datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))), jobs[0]['id'] if jobs else None, artifact_name )) conn.commit() conn.close() put_markdown("## 构建完成") put_text(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}") put_text( f"结束时间:{datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')}") else: put_error("无法获取构建运行信息,请稍后重试") else: put_error("触发构建失败,请稍后重试") finally: build_in_progress = False def main(): global build_in_progress build_in_progress = False js_urls = [ "https://npm-private.onmicrosoft.cn/popper.js@1", "https://npm-private.onmicrosoft.cn/tippy.js@5" ] script_tags = """ <script> function loadScript(url) { return new Promise((resolve, reject) => { const script = document.createElement('script'); script.src = url; script.onload = resolve; script.onerror = reject; document.head.appendChild(script); }); } Promise.all([ %s ]).catch(error => console.error('Script loading error:', error)); </script> """ % ',\n '.join(f"loadScript('{url}')" for url in js_urls) put_html(script_tags) put_markdown("#系统最新版本前端代码安装包下载平台") put_markdown(""" ## 构建说明: - 一个密钥只能绑定一个域名 - 绑定根域名(例如 example.com)时,赠送一个 www.example.com 域名 """) while True: key = input("请输入您的密钥:", type=TEXT) key_info = check_key(key) if not key_info: put_error("无效的密钥,请重新输入。") continue key_id, bind_domain, status, expiration_time = key_info if not status: put_error("此密钥已被禁用。") continue # 将 expiration_time 转换为时区感知的 datetime 对象 if expiration_time: expiration_time = expiration_time.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8))) if expiration_time and expiration_time < 
datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))): put_error("此密钥已过期。") continue if not bind_domain: domain = input("请输入要绑定的域名:", type=TEXT) update_domain(key, domain) put_success(f"域名 {domain} 已成功绑定到您的密钥。") if is_second_level_domain(domain): put_info(f"您还可以使用 www.{domain}") bind_domain = domain else: put_success(f"您的密钥已绑定域名: {bind_domain}") if is_second_level_domain(bind_domain): put_info(f"您还可以使用 www.{bind_domain}") break page = 1 viewing_history = False while True: clear('table') clear('jobs') clear('build_info') with use_scope('table', clear=True): if viewing_history: view_build_history(key_id) else: put_markdown("## 版本列表:") commits = fetch_commits(page) if not commits: put_info("没有更多的 commits 了") if page > 1: page -= 1 else: break continue table = [['版本号', '更新信息', '上线时间', '操作']] for commit in commits: sha_short = commit['sha'][:7] message = commit['commit']['message'] if len(message) > 26: message_display = put_html( '<span id="hash_{}">{}</span>' '<script>' 'tippy("#hash_{}", {{' ' content: `{}`,'.format( sha_short, message[:26] + "...", sha_short, message.replace('\n', '<br>') if '\n' in message else message ) + ' allowHTML: true' '});' '</script>' ) else: message_display = message commit_time = datetime.strptime(commit['commit']['author']['date'], '%Y-%m-%dT%H:%M:%SZ') commit_time = commit_time.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8))) commit_time_str = commit_time.strftime('%Y-%m-%d %H:%M:%S') def make_build_handler(c): return lambda c=c: build(c, bind_domain, key_id, key) action_button = put_button('下载', onclick=make_build_handler(commit), small=True) table.append([ sha_short, message_display, commit_time_str, action_button ]) put_table(table) if build_in_progress and viewing_history: toast("构建中,请勿切换模式!", duration=3, color='warning') viewing_history = False page = 1 continue else: action_buttons = [] if not viewing_history: if page > 1: action_buttons.append(('查看上一页', 'prev')) action_buttons.append(('查看下一页', 
'next')) action_buttons.append(('查看历史生成记录' if not viewing_history else '选择 Commit 构建', 'toggle_view')) if action_buttons: action = actions('请选择操作:', [btn[0] for btn in action_buttons]) action_map = {btn[0]: btn[1] for btn in action_buttons} if action_map[action] == 'next': page += 1 elif action_map[action] == 'prev': if page > 1: page -= 1 elif action_map[action] == 'toggle_view': viewing_history = not viewing_history page = 1 else: break else: time.sleep(1) put_markdown("感谢使用 在线构建平台!") if __name__ == '__main__': start_server(main, port=9003, host='', debug=False, cdn=False, static_dir=None, allowed_origins=None, check_origin=None, auto_open_webbrowser=False, session_expire_seconds=None, session_cleanup_interval=None, max_payload_size='200M', protocol='http',)相关用到的 API
以及相关的流水线 Action 代码
# Builds the front-end for one customer domain and uploads dist/ as an artifact.
# Started by the portal through a repository_dispatch event of type
# "buildArtifacts" (client_payload: bind_domain, commit_hash), or manually.
name: 构建并上传制品

on:
  workflow_dispatch:
  repository_dispatch:
    types:
      - buildArtifacts

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: 从仓库检出代码(切换至指定分支)
        uses: actions/checkout@v3
        with:
          # Manual runs carry no client_payload, so fall back to the current ref.
          ref: ${{ github.event.client_payload.commit_hash || github.ref }}

      - name: 设置 Node.js 环境
        uses: actions/setup-node@v3
        with:
          node-version: '20.14.0'

      - name: 安装 pnpm
        run: npm install -g pnpm

      - name: 安装依赖项
        run: pnpm install

      - name: 构建项目 (生产模式)
        env:
          BIND_DOMAIN: ${{ github.event.client_payload.bind_domain }}
        run: pnpm build

      - name: 上传构建产物
        uses: actions/upload-artifact@v4
        with:
          name: build-artifacts
          path: |
            dist/**
          # Fail the job instead of finishing "successfully" without an artifact.
          if-no-files-found: error
          # The portal refuses downloads older than 3 days; keep storage in step.
          retention-days: 3

如果可以实现的话,希望可以提供相关用到的 API 以及文档(希望可以一一对应)。另外,迁移到 CNB 之后,YAML 又应该如何编写呢?