logo
2
10
WeChat Login

CNB 云原生构建是否支持全部由API调用,如何打造出一个在自己网站上触发并监控构建流程并下载产物的流水线(Github Actions已实现)#8

Resolved
created 2025-08-11
Edit

最终实现的效果如图:

3228601f-f78f-4580-b111-01006faf64eb.png

Github Action 版本 相关代码

import secrets
from pywebio.input import *
from pywebio.output import *
from pywebio import start_server, session
from datetime import datetime, timedelta, timezone
from db import sqlPool, r  # 引入 redis 连接
import requests
import json
import time
import os

GITHUB_TOKEN = ""  # 请替换为您的 GitHub Token
REPO_OWNER = ""
REPO_NAME = ""


def check_key(key):
    conn = sqlPool.connection()
    c = conn.cursor()
    c.execute('SELECT id, bind_domain, status, expiration_time FROM activation WHERE `key` = %s', (key,))
    result = c.fetchone()
    conn.close()
    return result


def update_domain(key, domain):
    conn = sqlPool.connection()
    c = conn.cursor()
    c.execute('UPDATE activation SET bind_domain = %s, update_time = %s WHERE `key` = %s',
              (domain, datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))), key))
    conn.commit()
    conn.close()


def get_build_logs(key_id):
    conn = sqlPool.connection()
    c = conn.cursor()
    c.execute('SELECT * FROM build_logs WHERE key_id = %s ORDER BY create_time DESC LIMIT 12', (key_id,))
    logs = c.fetchall()
    conn.close()
    return logs


def fetch_commits(page=1):
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/commits"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    params = {
        "sha": "main",
        "since": "2024-10-11T08:00:00Z",
        "page": page,
        "per_page": 24
    }
    response = requests.get(url, headers=headers, params=params)

    if response.status_code != 200:
        print(f"Failed to fetch commits: {response.status_code} - {response.text}")
        return []

    try:
        commits = response.json()
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return []

    if not isinstance(commits, list):
        print(f"Unexpected response format: {commits}")
        return []

    filtered_commits = [commit for commit in commits if
                       ('[' in commit['commit']['message'] or '(' in commit['commit']['message']) and 
                       (']' in commit['commit']['message'] or ')' in commit['commit']['message']) and 
                       ':' in commit['commit']['message']]
    return filtered_commits


def trigger_build(domain, commit_hash):
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/dispatches"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    data = {
        "event_type": "buildArtifacts",
        "client_payload": {
            "bind_domain": domain,
            "commit_hash": commit_hash
        }
    }
    response = requests.post(url, headers=headers, data=json.dumps(data))
    print(response.text)
    return response.status_code == 204


def get_latest_workflow_run():
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    response = requests.get(url, headers=headers)
    runs = response.json().get('workflow_runs', [])

    if runs:
        return runs[0]
    return None


def get_run_jobs(run_id):
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs/{run_id}/jobs"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    response = requests.get(url, headers=headers)
    return response.json().get('jobs', [])


def format_time(time_str):
    if not time_str or time_str == 'N/A':
        return 'N/A'
    dt = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%SZ')
    dt = dt.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
    return dt.strftime('%Y-%m-%d %H:%M:%S')


def monitor_workflow(run_id, domain, commit):
    put_markdown("## 正在监控构建进度...")
    while True:
        jobs = get_run_jobs(run_id)
        if jobs:
            with use_scope('jobs', clear=True):
                for job in jobs:
                    put_markdown(f"### 任务: {job['name']} (ID: {job['id']})")
                    steps_table = [['步骤序号', '步骤名称', '状态', '开始时间', '结束时间']]
                    for step in job.get('steps', []):
                        if "Post" in step['name']:
                            continue
                        
                        if step['status'] == 'completed':
                            status = '成功' if step['conclusion'] == 'success' else '失败'
                        elif step['status'] == 'in_progress':
                            status = '进行中'
                        elif step['status'] == 'pending':
                            status = '未开始'
                        else:
                            status = '失败'
                            
                        status_tag = put_html(
                            f'<span style="background-color: {"#28a745" if status == "成功" else ("#007bff" if status == "进行中" else ("#ffc107" if status == "未开始" else "#dc3545"))}; color: white; padding: 2px 6px; border-radius: 3px;">{status}</span>')
                        steps_table.append([
                            step['number'],
                            step['name'],
                            status_tag,
                            format_time(step.get('started_at', 'N/A')),
                            format_time(step.get('completed_at', 'N/A'))
                        ])
                    put_table(steps_table)
            if all(job['status'] == 'completed' for job in jobs):
                break
            else:
                time.sleep(5)
        else:
            time.sleep(5)

    put_success("构建完成!")
    return jobs


def get_artifacts(run_id):
    url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs/{run_id}/artifacts"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    response = requests.get(url, headers=headers)
    return response.json().get('artifacts', [])


def download_artifact(artifact, domain, job_id):
    artifact_url = artifact['archive_download_url']
    response = requests.get(artifact_url, headers={
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    })
    if response.status_code == 200:
        os.makedirs('download', exist_ok=True)
        filename = f"{artifact['name']}-{domain}-{job_id}.zip"
        filepath = os.path.join('download', filename)
        with open(filepath, 'wb') as f:
            f.write(response.content)
        put_file(filename, open(filepath, 'rb').read(), scope='build_info')
        return True
    else:
        put_error('下载失败,可能是下载链接已过期。')
        return False


def download_artifact_by_url(download_url, build_time, domain, job_id, artifact_name):
    toast('正在取回构建文件,请滑至底部查看!', color='success')
    if datetime.now() - build_time > timedelta(days=3):
        put_error('构建产物已失效,请重新构建。', scope='build_info')
        return
    put_text('正在从服务器取回构建...', scope='build_info')
    response = requests.get(download_url, headers={
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    })
    if response.status_code == 200:
        os.makedirs('download', exist_ok=True)
        filename = f"{artifact_name}-{domain}-{job_id}.zip"
        filepath = os.path.join('download', filename)
        with open(filepath, 'wb') as f:
            f.write(response.content)
        put_file(filename, open(filepath, 'rb').read(), scope='build_info')
        put_success('从服务器取回构建成功,请点击文件名下载。', scope='build_info')
        put_markdown('''
        ### 构建提示
        - 若上传解压后访问网站出现卡在首屏的情况, 重新构建即可解决
        ''', scope='build_info')
    else:
        put_error('下载失败,可能是下载链接已过期。', scope='build_info')


def is_second_level_domain(domain):
    parts = domain.split('.')
    return len(parts) == 2


def view_build_history(key_id):
    logs = get_build_logs(key_id)
    if logs:
        table = [['时间', '域名', 'Commit 信息', '状态', '操作']]
        for log in logs:
            build_time = log[10]
            build_time_str = build_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(build_time,
                                                                                    datetime) else str(build_time)
            domain = log[2]
            commit_hash = log[3]
            commit_msg = log[4]
            if len(commit_msg) > 20:
                commit_msg_display = put_html(
                    '<span id="hash_{}">{}</span>'
                    '<script>'
                    'tippy("#hash_{}", {{'
                    '    content: `{}`,'.format(
                        commit_hash,
                        commit_msg[:20] + "...",
                        commit_hash,
                        commit_msg.replace('\n', '<br>') if '\n' in commit_msg else commit_msg
                    ) + 
                    '    allowHTML: true'
                    '}});'
                    '</script>'
                )
            else:
                commit_msg_display = commit_msg
            status = log[5]

            if status:
                status_tag = put_html(
                    '<span style="background-color: #28a745; color: white; padding: 2px 6px; border-radius: 3px;">成功</span>')
            else:
                status_tag = put_html(
                    '<span style="background-color: #dc3545; color: white; padding: 2px 6px; border-radius: 3px;">失败</span>')

            download_url = log[8]
            job_id = log[5]
            artifact_name = log[9]

            if download_url:
                def make_download_handler(url, build_time, domain, job_id, artifact_name):
                    return lambda: download_artifact_by_url(url, build_time, domain, job_id, artifact_name)

                action = put_button('下载', onclick=make_download_handler(download_url, build_time, domain, job_id,
                                                                          artifact_name), small=True)
            else:
                action = '无'

            table.append([build_time_str, domain, commit_msg_display, status_tag, action])

        with use_scope('table', clear=True):
            put_markdown("## 历史生成记录")
            put_table(table)
            with use_scope('build_info', clear=True):
                pass
    else:
        with use_scope('table', clear=True):
            put_markdown("## 历史生成记录")
            put_text('暂无历史记录')


def build(commit, domain, key_id, key):
    global build_in_progress
    if build_in_progress:
        toast("有项目正在构建中, 请构建完成后再执行新构建")
        return
    start_time = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8)))

    today = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))).strftime('%Y%m%d')
    redis_key = f"build_count:{key}:{today}"
    count = r.get(redis_key)
    if count and int(count) >= 6:
        toast("您今天的构建次数已达上限,请明天再试。", color='warning')
        put_error("您今天的构建次数已达上限,请明天再试。")
        return

    r.incr(redis_key)
    r.expireat(redis_key, datetime.now().replace(hour=23, minute=59, second=59, microsecond=0))

    build_in_progress = True

    # 添加绿色提示
    toast('代码正在自动构建,请滑至底部查看,构建完成之后即可下载!', color='success')

    try:
        with use_scope('build_info', clear=True):
            put_text(f"选择构建: {commit['sha'][:7]}: {commit['commit']['message']}")
            put_text(f"构建域名: {domain}")
            put_text("提交构建中...")

            if trigger_build(domain, commit['sha']):
                put_success("构建任务已触发")

                time.sleep(3)

                run = get_latest_workflow_run()
                if run:
                    put_text(f"构建 jobs ID: {run['id']}")
                    jobs = monitor_workflow(run['id'], domain, commit)

                    artifacts = get_artifacts(run['id'])
                    while not artifacts:
                        time.sleep(3)
                        put_text('未获取构建产物, 重新获取中...', scope='build_info')
                        artifacts = get_artifacts(run['id'])
                    artifact = artifacts[0]
                    artifact_name = artifact['name']

                    def download_artifact_with_check():
                        if datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))) - start_time > timedelta(
                                days=3):
                            put_error('构建产物已失效,请重新构建。', scope='build_info')
                            return
                        put_text('正在从服务器取回构建...', scope='build_info')
                        job_id = jobs[0]['id'] if jobs else 'unknown'
                        success = download_artifact(artifact, domain, job_id)
                        if success:
                            put_success('从服务器取回构建成功,请点击文件名下载。', scope='build_info')

                            put_markdown('''
                            ### 构建提示
                            - 若上传解压后访问网站出现卡在首屏的情况, 重新构建即可解决
                            ''', scope='build_info')

                    put_button(f"下载安装包", onclick=download_artifact_with_check, scope='build_info')

                    conn = sqlPool.connection()
                    c = conn.cursor()
                    c.execute('''
                    INSERT INTO build_logs (key_id, build_domain, commit_hash, commit_msg, status, download_url, log, create_time, update_time, job_id, artifact_name)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ''', (
                        key_id,
                        domain,
                        commit['sha'],
                        commit['commit']['message'],
                        True,
                        artifact['archive_download_url'] if artifacts else None,
                        json.dumps(jobs) if jobs else None,
                        start_time,
                        datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))),
                        jobs[0]['id'] if jobs else None,
                        artifact_name
                    ))
                    conn.commit()
                    conn.close()

                    put_markdown("## 构建完成")
                    put_text(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
                    put_text(
                        f"结束时间:{datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')}")
                else:
                    put_error("无法获取构建运行信息,请稍后重试")
            else:
                put_error("触发构建失败,请稍后重试")
    finally:
        build_in_progress = False


def main():
    global build_in_progress
    build_in_progress = False
    js_urls = [
        "https://npm-private.onmicrosoft.cn/popper.js@1",
        "https://npm-private.onmicrosoft.cn/tippy.js@5"
    ]

    script_tags = """
        <script>
            function loadScript(url) {
                return new Promise((resolve, reject) => {
                    const script = document.createElement('script');
                    script.src = url;
                    script.onload = resolve;
                    script.onerror = reject;
                    document.head.appendChild(script);
                });
            }

            Promise.all([
                %s
            ]).catch(error => console.error('Script loading error:', error));
        </script>
    """ % ',\n                '.join(f"loadScript('{url}')" for url in js_urls)

    put_html(script_tags)
    put_markdown("#系统最新版本前端代码安装包下载平台")

    put_markdown("""
    ## 构建说明:
    - 一个密钥只能绑定一个域名
    - 绑定根域名(例如 example.com)时,赠送一个 www.example.com 域名
    """)

    while True:
        key = input("请输入您的密钥:", type=TEXT)
        key_info = check_key(key)

        if not key_info:
            put_error("无效的密钥,请重新输入。")
            continue

        key_id, bind_domain, status, expiration_time = key_info

        if not status:
            put_error("此密钥已被禁用。")
            continue

        # 将 expiration_time 转换为时区感知的 datetime 对象
        if expiration_time:
            expiration_time = expiration_time.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))

        if expiration_time and expiration_time < datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))):
            put_error("此密钥已过期。")
            continue

        if not bind_domain:
            domain = input("请输入要绑定的域名:", type=TEXT)
            update_domain(key, domain)
            put_success(f"域名 {domain} 已成功绑定到您的密钥。")
            if is_second_level_domain(domain):
                put_info(f"您还可以使用 www.{domain}")
            bind_domain = domain
        else:
            put_success(f"您的密钥已绑定域名: {bind_domain}")
            if is_second_level_domain(bind_domain):
                put_info(f"您还可以使用 www.{bind_domain}")

        break

    page = 1
    viewing_history = False

    while True:
        clear('table')
        clear('jobs')
        clear('build_info')
        with use_scope('table', clear=True):
            if viewing_history:
                view_build_history(key_id)
            else:
                put_markdown("## 版本列表:")
                commits = fetch_commits(page)
                if not commits:
                    put_info("没有更多的 commits 了")
                    if page > 1:
                        page -= 1
                    else:
                        break
                    continue

                table = [['版本号', '更新信息', '上线时间', '操作']]
                for commit in commits:
                    sha_short = commit['sha'][:7]
                    message = commit['commit']['message']
                    if len(message) > 26:
                        message_display = put_html(
                            '<span id="hash_{}">{}</span>'
                            '<script>'
                            'tippy("#hash_{}", {{'
                            '    content: `{}`,'.format(
                                sha_short,
                                message[:26] + "...",
                                sha_short,
                                message.replace('\n', '<br>') if '\n' in message else message
                            ) + 
                            '    allowHTML: true'
                            '});'
                            '</script>'
                        )
                    else:
                        message_display = message
                    commit_time = datetime.strptime(commit['commit']['author']['date'], '%Y-%m-%dT%H:%M:%SZ')
                    commit_time = commit_time.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
                    commit_time_str = commit_time.strftime('%Y-%m-%d %H:%M:%S')

                    def make_build_handler(c):
                        return lambda c=c: build(c, bind_domain, key_id, key)

                    action_button = put_button('下载', onclick=make_build_handler(commit), small=True)

                    table.append([
                        sha_short,
                        message_display,
                        commit_time_str,
                        action_button
                    ])
                put_table(table)

        if build_in_progress and viewing_history:
            toast("构建中,请勿切换模式!", duration=3, color='warning')
            viewing_history = False
            page = 1
            continue
        else:
            action_buttons = []
            if not viewing_history:
                if page > 1:
                    action_buttons.append(('查看上一页', 'prev'))
                action_buttons.append(('查看下一页', 'next'))
            action_buttons.append(('查看历史生成记录' if not viewing_history else '选择 Commit 构建', 'toggle_view'))

        if action_buttons:
            action = actions('请选择操作:', [btn[0] for btn in action_buttons])
            action_map = {btn[0]: btn[1] for btn in action_buttons}

            if action_map[action] == 'next':
                page += 1
            elif action_map[action] == 'prev':
                if page > 1:
                    page -= 1
            elif action_map[action] == 'toggle_view':
                viewing_history = not viewing_history
                page = 1
            else:
                break
        else:
            time.sleep(1)

    put_markdown("感谢使用 在线构建平台!")


if __name__ == '__main__':
    start_server(main, port=9003, host='', debug=False, cdn=False, static_dir=None, allowed_origins=None,
                 check_origin=None, auto_open_webbrowser=False, session_expire_seconds=None,
                 session_cleanup_interval=None, max_payload_size='200M', protocol='http',)

相关用到的 API

以及相关的流水线 Action 代码

name: 构建并上传制品

on:
  workflow_dispatch:
  repository_dispatch:
    types:
      - buildArtifacts

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: 从仓库检出代码(切换至指定分支)
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.client_payload.commit_hash || github.ref }}

      - name: 设置 Node.js 环境
        uses: actions/setup-node@v3
        with:
          node-version: '20.14.0'

      - name: 安装 pnpm
        run: npm install -g pnpm

      - name: 安装依赖项
        run: pnpm install

      - name: 构建项目 (生产模式)
        env:
          BIND_DOMAIN: ${{ github.event.client_payload.bind_domain }}
        run: pnpm build

      - name: 上传构建产物
        uses: actions/upload-artifact@v4
        with:
          name: build-artifacts
          path: |
            dist/**

如果可以实现的话,希望可以给到相关用到的API以及文档(希望可以一一对应),并且 迁移到CNB之后yaml又应该如何编写呢

changed title
CNB 自定义构建是否支持全部由API调用,如何打造出一个在自己网站上触发并监控构建流程并下载产物的流水线(Github Actions已实现)
CNB 云原生构建是否支持全部由API调用,如何打造出一个在自己网站上触发并监控构建流程并下载产物的流水线(Github Actions已实现)
added labels
有效问题
assigned self
referenced ISSUE
assigned self
unassigned
Developer

haorwen/cnb-build-platform · Cloud Native Build按照你的原本的代码写了一份,直接参考里面的api就可以了,cnb.yml文件的编写也可以参考仓库里

added labels
擂主团已回答
added labels
优质问题
Resolved ISSUE
Assignee
(haorwen)
Label
优质问题
擂主团已回答
有效问题
Priority
None yet
Time period
-
Property
Add custom properties to record and label key information
Participant