import requests
import time
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://powertokens.ai/api"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}
# ── 创建虚拟资产组 ───────────────────────────────────────
def create_group(group_name):
resp = requests.post(
f"{BASE_URL}/v1/asset/groups/create",
headers={**HEADERS, "Content-Type": "application/json"},
json={"group_name": group_name},
)
resp.raise_for_status()
return resp.json()["data"]["group_id"]
# ── 上传文件 ─────────────────────────────────────────────
def upload_asset(file_path, group_id):
with open(file_path, "rb") as f:
files = {"file": (file_path.split("/")[-1], f)}
data = {"group_id": group_id}
resp = requests.post(
f"{BASE_URL}/v1/asset/upload",
headers=HEADERS,
files=files,
data=data,
)
resp.raise_for_status()
return resp.json()["data"]["task_id"]
# ── 轮询获取 asset_id(此接口不需要鉴权)─────────────────
def get_asset_id(task_id, timeout=60, interval=3):
start = time.time()
while time.time() - start < timeout:
resp = requests.get(
f"{BASE_URL}/v1/asset/jobs/get-asset-id",
params={"task_id": task_id},
)
data = resp.json()
if data["code"] == 200 and data["data"].get("asset_id"):
return data["data"]["asset_id"]
time.sleep(interval)
raise TimeoutError(f"资产在 {timeout} 秒内未处理完成")
# ── 查看资产组内素材列表 ──────────────────────────────────
def list_group_assets(group_id, page=1, page_size=12):
resp = requests.post(
f"{BASE_URL}/v1/asset/groups/assets",
headers={**HEADERS, "Content-Type": "application/json"},
json={"page": page, "page_size": page_size, "group_id": group_id},
)
resp.raise_for_status()
return resp.json()["data"]
# ── 在生成请求中使用 ──────────────────────────────────────
def generate_video(asset_id, prompt):
resp = requests.post(
f"{BASE_URL}/v1/videos",
headers={**HEADERS, "Content-Type": "application/json"},
json={
"model": "wan-2.1-video",
"prompt": prompt,
"images": [asset_id],
},
)
resp.raise_for_status()
return resp.json()
# ── 运行 ────────────────────────────────────────────────
group_id = create_group("demo-project")
print(f"创建资产组 → group_id: {group_id}")
task_id = upload_asset("start_frame.jpg", group_id)
print(f"已上传 → task_id: {task_id}")
asset_id = get_asset_id(task_id)
print(f"处理完成 → asset_id: {asset_id}")
assets = list_group_assets(group_id)
print(f"资产组内共 {assets['total']} 个素材")
result = generate_video(asset_id, "赛博朋克风格的东京夜景")
print(f"生成任务已提交 → task_id: {result['data']['task_id']}")