import requests
import time
import os
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.yelinai.com/v1"
# 步骤1: 创建视频任务
def create_video_task(prompt, model="veo-3.1", image_paths=None):
"""创建视频生成任务
Args:
prompt: 视频生成提示词
model: 模型名称
image_paths: 图片路径,可以是:
- None: 文生视频
- str: 单张图片路径
- list: 多张图片路径(首尾帧模式,最多2张)
"""
url = f"{BASE_URL}/videos"
headers = {"Authorization": f"Bearer {API_KEY}"}
if image_paths:
# 统一转为列表处理
if isinstance(image_paths, str):
image_paths = [image_paths]
# 图生视频:使用 multipart/form-data 上传图片
files = []
for path in image_paths:
if not os.path.exists(path):
raise FileNotFoundError(f"图片文件不存在: {path}")
files.append(("input_reference", (os.path.basename(path), open(path, 'rb'), "image/jpeg")))
data = {"model": model, "prompt": prompt}
response = requests.post(url, headers=headers, files=files, data=data)
# 关闭文件句柄
for _, (_, f, _) in files:
f.close()
else:
# 文生视频:使用 JSON 格式
headers["Content-Type"] = "application/json"
data = {"model": model, "prompt": prompt}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()
# 步骤2: 轮询查询状态
def wait_for_video(video_id, poll_interval=5, timeout=600):
"""等待视频生成完成"""
url = f"{BASE_URL}/videos/{video_id}"
headers = {"Authorization": f"Bearer {API_KEY}"}
start_time = time.time()
while True:
# 检查超时
if time.time() - start_time > timeout:
raise TimeoutError(f"视频生成超时({timeout}秒)")
# 查询状态
response = requests.get(url, headers=headers)
response.raise_for_status()
task = response.json()
status = task["status"]
print(f"状态: {status}")
if status == "completed":
return task
elif status == "failed":
raise Exception(f"生成失败")
# 等待后重试
time.sleep(poll_interval)
# 步骤3: 获取视频内容
def get_video_content(video_id):
"""获取视频内容和URL"""
url = f"{BASE_URL}/videos/{video_id}/content"
headers = {"Authorization": f"Bearer {API_KEY}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
# 步骤4: 下载视频
def download_video(video_url, save_path="video.mp4"):
"""下载视频文件"""
response = requests.get(video_url, stream=True)
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"视频已保存到: {save_path}")
# 完整流程
def generate_video_async(prompt, model="veo-3.1"):
"""异步生成视频的完整流程"""
print("1. 创建视频任务...")
task = create_video_task(prompt, model)
video_id = task["id"]
print(f" 任务ID: {video_id}")
print("\n2. 等待视频生成...")
completed_task = wait_for_video(video_id)
print(" 生成完成!")
print("\n3. 获取视频内容...")
content = get_video_content(video_id)
video_url = content["url"]
print(f" 视频URL: {video_url}")
print("\n4. 下载视频...")
download_video(video_url)
print("\n✅ 完成!")
# 使用示例
if __name__ == "__main__":
# 文生视频 - 竖屏标准版
# 文生视频 - 竖屏标准版
generate_video_async(
prompt="一只可爱的猫咪在阳光明媚的花园里玩球",
model="veo-3.1"
)
# 图生视频 - 单图模式
# task = create_video_task(
# prompt="让这只猫咪慢慢眨眼睛",
# model="veo-3.1-fl",
# image_paths="test.jpg"
# )
# 图生视频 - 首尾帧模式(传入2张图片)
# 第一张图作为开始帧,第二张图作为结束帧
# task = create_video_task(
# prompt="让画面从开始帧平滑过渡到结束帧,加入动态效果",
# model="veo-3.1-landscape-fl",
# image_paths=["start_frame.jpg", "end_frame.jpg"]
# )
# 文生视频 - 横屏快速版
# generate_video_async(
# prompt="日落时分,海浪轻轻拍打沙滩",
# model="veo-3.1-landscape-fast"
# )