Worker 开发接入指南
本文档介绍如何开发Worker服务接入Bella-Queue系统,作为任务处理的下游服务。
Worker开发方式
系统支持两种Worker开发方式:
- 自部署服务接入:独立开发的HTTP服务,通过API与Bella-Queue交互
- Java SDK接入:使用Bella-Queue提供的Java SDK快速集成
1. 准备工作:队列和渠道配置
开发Worker前,需要先在Bella-Queue中注册队列并配置渠道映射。
1.1 注册Worker队列
为Worker服务注册一个队列:
curl -X POST "https://${Host}/v1/queue/register" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"queue": "deepseek-r1",
"endpoint": "/v1/chat/completions"
}'
参数说明
| 参数 | 类型 | 必需 | 说明 |
|---|---|---|---|
queue | string | Required | 队列名称(只能包含字母、数字、下划线、分隔符,长度不超过64字符) |
endpoint | string | Required | 处理任务的能力点 |
响应
"deepseek-r1"
1.2 配置模型渠道
通过OpenAPI创建渠道(Channel),建立模型名称与队列的映射关系。
作用:当用户使用特定模型时,Bella-Queue会根据渠道配置将任务自动路由到对应的队列。
渠道配置的具体接 口和参数请参考OpenAPI接口文档。
1.3 任务路由说明
配置完成后,Bella-Queue通过以下方式将任务放入对应的队列,Worker从队列中拉取任务进行处理:
直接指定队列(Task API)
curl -X POST "https://${Host}/v1/queue/put" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"queue": "deepseek-r1",
"endpoint": "/v1/chat/completions",
"level": 1,
"data": {
"model": "deepseek-r1-20250401",
"messages": [
{
"role": "user",
"content": "你好"
}
],
"stream": false
},
"callback_url": "http://localhost:8081/test/callback",
}'
批量任务路由(Batch API)
指定队列
curl -X POST "https://${Host}/v1/batches" \
-H "Content-Type: application/json" \
-H "X-BELLA-QUEUE-NAME: deepseek-r1" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"input_file_id": "file-XXXXXXXXXXXXXXXXXXXXXXX-XXXXXXXXXX",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {
"description": "Customer support chat completions batch"
}
}'
自动路由
curl -X POST "https://${Host}/v1/batches" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"input_file_id": "file-XXXXXXXXXXXXXXXXXXXXXXX-XXXXXXXXXX",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {
"description": "Customer support chat completions batch"
}
}'
input_file中的每一行请求示例:
{"custom_id": "req-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-r1", "messages": [...]}}
系统根据deepseek-r1模型自动查找渠道配置,路由到deepseek-r1队列。
2. 自部署服务接入
适合已有推理服务或需要使用其他技术栈的场景。
2.1 Worker服务开发
Worker服务需要实现以下核心功能:
2.1.1 获取系统配置
获取EventBus配置,用于实时任务通信:
curl -X GET "https://${Host}/v1/queue/eventbus" \
-H "Authorization: Bearer ${apikey}"
响应示例
{
"url": "redis://localhost:6379",
"topic": "bella:eventbus:"
}
2.1.2 拉取任务
定期从Bella-Queue拉取待处理任务:
curl -X POST "https://${Host}/v1/queue/take" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"queues": [
"deepseek-r1:1"
],
"size": 10,
"strategy": "fifo"
}'
响应示例(离线队列 - Batch/Callback模式)
{
"deepseek-r1:1": [
{
"ak": "xxx",
"endpoint": "/v1/chat/completions",
"queue": "test-queue",
"level": 1,
"data": {
"model": "deepseek-r1",
"messages": [
{
"role": "user",
"content": "Hello"
}
],
"temperature": 0.7
},
"status": "waiting",
"task_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"batch_id": "BATCH-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"trace_id": "BATCH-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"start_time": 1762863595000,
"running_time": 0,
"expire_time": 1762949994000,
"completed_time": 0,
"callback_url": "",
"response_mode": "batch"
}
]
}
响应示例(在线队列 - Blocking/Streaming模式)
{
"deepseek-r1:0": [
{
"ak": "xxx",
"endpoint": "/v1/chat/completions",
"queue": "test-queue",
"level": 0,
"data": {
"model": "deepseek-r1",
"messages": [
{
"role": "user",
"content": "你好"
}
],
"stream": false
},
"task_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"instance_id": "xx.xx.xx.xx:xxxx",
"start_time": 1762950826730,
"running_time": 0,
"expire_time": 1762951126730,
"completed_time": 0,
"response_mode": "blocking"
}
]
}
instanceId说明:在线任务会携带instance_id字段,标识发起请求的客户端实例地址(如IP:端口),Worker处理完成后需要向Redis
Stream发送事件到对应的实例。
2.1.3 处理任务并返回结果
根据response_mode采用不同的结果返回方式:
Callback/Batch模式 - HTTP接口返回
curl -X POST "https://${Host}/v1/queue/{taskId}/complete" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"status_code": 200,
"request_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"body": {
// Worker处理结果 - chat completion响应
"id": "chatcmpl-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"object": "chat.completion",
"model": "deepseek-r1",
"choices": [{
"message": {
"role": "assistant",
"content": "Hello! How can I help you today?"
}
}]
}
}'
Blocking/Streaming模式 - Redis事件返回
在线任务携带instance_id字段,Worker需要通过Redis Stream发送事件到:{eventbus.topic}{instance_id}
例如:eventbus.topic = "bella:eventbus:",instance_id = "192.168.1.100:8080"
则发送到:bella:eventbus:192.168.1.100:8080
进度事件
{
"name": "task-progress-event",
"from": "",
"payload": {
"taskId": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"eventId": "progress-1",
"eventName": "chunk",
"eventData": {
"delta": {
"content": "Hello"
}
}
},
"context": ""
}
完成事件
{
"name": "task-completion-event",
"from": "",
"payload": {
"taskId": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"result": {
"status_code": 200,
"request_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"body": {
"id": "chatcmpl-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"object": "chat.completion",
"choices": [
{
"message": {
"role": "assistant",
"content": "Hello! How can I help you today?"
}
}
]
}
}
},
"context": ""
}
3. Java SDK接入
适合Java技术栈的应用,可以快速集成到Spring Boot项目中。
3.1 添加依赖
<dependency>
<groupId>top.bella</groupId>
<artifactId>openapi-spi</artifactId>
<version>1.1.68</version>
</dependency>