Worker Development Integration Guide
This document describes how to develop Worker services to integrate with the Bella-Queue system as downstream task processors.
Worker Development Methods
The system supports two Worker development methods:
- Self-hosted Service Integration: Independently developed HTTP services that interact with Bella-Queue through APIs
- Java SDK Integration: Quick integration using the Java SDK provided by Bella-Queue
1. Prerequisites: Queue and Channel Configuration
Before developing Workers, you need to register queues and configure channel mappings in Bella-Queue.
1.1 Register Worker Queue
Register a queue for your Worker service:
curl -X POST "https://${Host}/v1/queue/register" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"queue": "deepseek-r1",
"endpoint": "/v1/chat/completions"
}'
Parameter Description
| Parameter | Type | Required | Description |
|---|---|---|---|
| queue | string | Required | Queue name (letters, numbers, underscores, and separators only; max 64 characters) |
| endpoint | string | Required | Capability endpoint used to process the task, e.g. /v1/chat/completions |
Response
"deepseek-r1"
1.2 Configure Model Channels
Create channels through OpenAPI to establish mapping relationships between model names and queues.
Purpose: When a user requests a specific model, Bella-Queue automatically routes the task to the corresponding queue based on the channel configuration.
For the specific interfaces and parameters used to configure channels, refer to the OpenAPI interface documentation.
1.3 Task Routing Description
After configuration, Bella-Queue places tasks into corresponding queues through the following methods, and Workers pull tasks from queues for processing:
Direct Queue Specification (Task API)
curl -X POST "https://${Host}/v1/queue/put" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"queue": "deepseek-r1",
"endpoint": "/v1/chat/completions",
"level": 1,
"data": {"k":"v"},
"callback_url": "http://localhost:8081/test/callback",
}'
Batch Task Routing (Batch API)
Specify Queue
curl -X POST "https://${Host}/v1/batches" \
-H "Content-Type: application/json" \
-H "X-BELLA-QUEUE-NAME: deepseek-r1" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"input_file_id": "file-XXXXXXXXXXXXXXXXXXXXXXX-XXXXXXXXXX",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {
"description": "Customer support chat completions batch"
}
}'
Automatic Routing
curl -X POST "https://${Host}/v1/batches" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"input_file_id": "file-XXXXXXXXXXXXXXXXXXXXXXX-XXXXXXXXXX",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {
"description": "Customer support chat completions batch"
}
}'
{"custom_id": "req-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-r1", "messages": [...]}}
The system automatically finds channel configuration based on the deepseek-r1 model and routes to the deepseek-r1 queue.
2. Self-hosted Service Integration
Suitable when you already have an inference service or need to use a technology stack other than Java.
2.1 Worker Service Development
Your Worker service needs to implement the following core functions:
2.1.1 Get System Configuration
Get EventBus configuration for real-time task communication:
curl -X GET "https://${Host}/v1/queue/eventbus" \
-H "Authorization: Bearer ${apikey}"
Response Example
{
"url": "redis://localhost:6379",
"topic": "bella:eventbus:"
}
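As a minimal sketch (assuming Java 11+ java.net.http.HttpClient and Jackson on the classpath; the class and field names are illustrative, not part of Bella-Queue), a self-hosted Worker could fetch this configuration once at startup and reuse the Redis url and topic prefix when publishing events later:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative helper: fetch EventBus settings from Bella-Queue at startup.
public class EventBusConfig {

    public final String redisUrl;    // e.g. redis://localhost:6379
    public final String topicPrefix; // e.g. bella:eventbus:

    private EventBusConfig(String redisUrl, String topicPrefix) {
        this.redisUrl = redisUrl;
        this.topicPrefix = topicPrefix;
    }

    public static EventBusConfig fetch(String host, String apikey) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://" + host + "/v1/queue/eventbus"))
                .header("Authorization", "Bearer " + apikey)
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The response body contains the "url" and "topic" fields shown above.
        JsonNode node = new ObjectMapper().readTree(response.body());
        return new EventBusConfig(node.get("url").asText(), node.get("topic").asText());
    }
}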
2.1.2 Pull Tasks
Regularly pull pending tasks from Bella-Queue:
curl -X POST "https://${Host}/v1/queue/take" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"queues": [
"deepseek-r1:1"
],
"size": 10,
"strategy": "fifo"
}'
Response Example (Offline Queue - Batch/Callback Mode)
{
"deepseek-r1:1": [
{
"ak": "xxx",
"endpoint": "/v1/chat/completions",
"queue": "test-queue",
"level": 1,
"data": {
"model": "deepseek-r1",
"messages": [
{
"role": "user",
"content": "Hello"
}
],
"temperature": 0.7
},
"status": "waiting",
"task_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"batch_id": "BATCH-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"trace_id": "BATCH-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"start_time": 1762863595000,
"running_time": 0,
"expire_time": 1762949994000,
"completed_time": 0,
"callback_url": "",
"response_mode": "batch"
}
]
}
Response Example (Online Queue - Blocking/Streaming Mode)
{
"deepseek-r1:0": [
{
"ak": "xxx",
"endpoint": "/v1/chat/completions",
"queue": "test-queue",
"level": 0,
"data": {
"model": "deepseek-r1",
"messages": [
{
"role": "user",
"content": "你好"
}
],
"stream": false
},
"task_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"instance_id": "xx.xx.xx.xx:xxxx",
"start_time": 1762950826730,
"running_time": 0,
"expire_time": 1762951126730,
"completed_time": 0,
"response_mode": "blocking"
}
]
}
instance_id Description: Online tasks carry an instance_id field that identifies the client instance address (such as IP:port) that initiated the request. After processing, the Worker must send events to the corresponding instance via Redis Stream.
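For illustration only, a minimal polling sketch in Java (assuming Java 11+ java.net.http.HttpClient; JSON parsing, error handling, and the actual task processing are elided) might pull tasks and dispatch them by response_mode as described in 2.1.3 below:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative polling loop: pull up to 10 tasks from the "deepseek-r1:1" queue.
public class TaskPoller {

    private static final String TAKE_BODY =
            "{\"queues\": [\"deepseek-r1:1\"], \"size\": 10, \"strategy\": \"fifo\"}";

    public static void pollOnce(String host, String apikey) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://" + host + "/v1/queue/take"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + apikey)
                .POST(HttpRequest.BodyPublishers.ofString(TAKE_BODY))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // Parse the JSON map of queue -> task list, then for each task:
        //   - response_mode "batch" / "callback": process it and return the result via
        //     POST /v1/queue/{taskId}/complete (see 2.1.3)
        //   - response_mode "blocking" / "streaming": process it and publish events to
        //     the Redis Stream {eventbus.topic}{instance_id} (see 2.1.3)
        System.out.println(response.body());
    }
}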
2.1.3 Process Tasks and Return Results
Use different result return methods based on response_mode:
Callback/Batch Mode - HTTP Interface Return
curl -X POST "https://${Host}/v1/queue/{taskId}/complete" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${apikey}" \
-d '{
"status_code": 200,
"request_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"body": {
// Worker processing result - chat completion response
"id": "chatcmpl-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"object": "chat.completion",
"model": "deepseek-r1",
"choices": [{
"message": {
"role": "assistant",
"content": "Hello! How can I help you today?"
}
}]
}
}'
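The equivalent call from a Java Worker might look like the following hedged sketch (assuming Java 11+ java.net.http.HttpClient; resultJson stands for the serialized completion payload shown above):
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative completion call for callback/batch mode tasks.
public class TaskCompleter {

    public static void complete(String host, String apikey, String taskId,
                                String resultJson) throws Exception {
        // resultJson carries {"status_code": ..., "request_id": ..., "body": {...}}
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://" + host + "/v1/queue/" + taskId + "/complete"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + apikey)
                .POST(HttpRequest.BodyPublishers.ofString(resultJson))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("complete status: " + response.statusCode());
    }
}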
Blocking/Streaming Mode - Redis Event Return
Online tasks carry an instance_id field. Workers need to send events to Redis Stream at: {eventbus.topic}{instance_id}
For example: eventbus.topic = "bella:eventbus:", instance_id = "192.168.1.100:8080"
Send to: bella:eventbus:192.168.1.100:8080
Progress Event (Optional, for streaming mode)
{
"name": "task-progress-event",
"from": "",
"payload": {
"taskId": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"eventId": "progress-1",
"eventName": "chunk",
"eventData": {
"delta": {
"content": "Hello"
}
}
},
"context": ""
}
Completion Event (Required)
{
"name": "task-completion-event",
"from": "",
"payload": {
"taskId": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"result": {
"status_code": 200,
"request_id": "TASK-X-X-X-XXXXXXXXXXXX-XXXX-XXXXXX",
"body": {
"id": "chatcmpl-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"object": "chat.completion",
"choices": [
{
"message": {
"role": "assistant",
"content": "Hello! How can I help you today?"
}
}
]
}
}
},
"context": ""
}
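As a sketch only (not the authoritative client), publishing such an event with the Jedis Redis client might look like the code below. Any Redis client works; the assumption that the serialized event JSON is carried in a single stream field named data is illustrative and should be verified against the EventBus consumer's expectations.
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;

import java.net.URI;
import java.util.Map;

// Illustrative event publisher for blocking/streaming tasks.
public class EventPublisher {

    public static void publishCompletion(String redisUrl, String topicPrefix,
                                         String instanceId, String eventJson) {
        // Stream key: {eventbus.topic}{instance_id}, e.g. bella:eventbus:192.168.1.100:8080
        String streamKey = topicPrefix + instanceId;
        try (Jedis jedis = new Jedis(URI.create(redisUrl))) {
            // Assumption: the event JSON is sent as a single field named "data";
            // check the actual field layout expected by the Bella-Queue EventBus consumer.
            jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, Map.of("data", eventJson));
        }
    }
}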
Key Notes:
- Must use the instance_id from the task as the Redis Stream target
- taskId must exactly match the pulled task ID
- Completion events are required; progress events are optional and apply only to streaming mode
3. Java SDK Integration
Suitable for Java applications; the SDK can be quickly integrated into Spring Boot projects.
3.1 Add Dependency
<dependency>
<groupId>top.bella</groupId>
<artifactId>openapi-spi</artifactId>
<version>1.1.68</version>
</dependency>
3.2 Enable Worker
@SpringBootApplication
@EnableWorker
public class WorkerApplication {
public static void main(String[] args) {
SpringApplication.run(WorkerApplication.class, args);
}
}
3.3 Configure OpenAiService
@Configuration
public class OpenAiConfig {
@Value("${openai.base-url}")
private String baseUrl;
@Value("${openai.token}")
private String token;
@Bean
public OpenAiService openAiService() {
return new OpenAiService(token, baseUrl);
}
}
3.4 Implement TaskExecutor
TaskExecutor is the core interface for Worker task processing, defining contracts for task submission, execution, and capacity management.
Interface Definition
public interface TaskExecutor {
/**
* Submit task for processing
* @param task Task wrapper containing task data and state management methods
*/
void submit(TaskWrapper task);
/**
* Return the current number of tasks that can be processed
* The Worker can dynamically adjust the number of tasks it pulls based on this value
* @return Remaining capacity; no new tasks are pulled when this returns 0
*/
Integer remainingCapacity();
}
TaskWrapper Method Description
- getPayload(): Get the task data submitted from upstream (Map format)
- markComplete(Object result): Mark the task as complete, passing in the processing result
- markRetryLater(): Mark the task for a later retry
- emitProgress(): Send progress events (streaming mode)
Implementation Example
@Component
public class TaskExecutorImpl implements TaskExecutor {
@Override
public void submit(TaskWrapper task) {
try {
// Get task data
Map<String, Object> data = task.getPayload();
// Determine task type
String responseMode = (String) data.get("responseMode");
if ("streaming".equals(responseMode)) {
// Streaming task processing
Map<String, Object> result = Map.of("content", "Processing...");
// Send progress event
task.emitProgress("onProgress", "onProgress", result);
// If completed, mark task as complete
task.markComplete(Map.of());
} else {
// Other task processing
Object result = Map.of("result", "success");
// Mark task as complete
task.markComplete(result);
}
} catch (Exception e) {
// Processing failed
if (shouldRetry(e)) {
task.markRetryLater();
} else {
task.markComplete(Map.of(
"error", Map.of(
"message", e.getMessage(),
"type", "processing_error"
)
));
}
}
}
@Override
public Integer remainingCapacity() {
return 10;
}
private boolean shouldRetry(Exception e) {
// Determine whether to retry
return false;
}
}
3.5 Configure Worker Scheduler
By default the SDK provides a ScheduledWorker based on scheduled polling. Enable it and set the scheduling parameters through the configuration file.
Use Default ScheduledWorker
bella:
queue:
worker:
enabled: true
scheduled:
enabled: true
take-strategy: fifo
size: 10
queues:
- "my-task-queue:0"
- "my-task-queue:1"
Configuration Parameter Description
- enabled: Whether to enable the ScheduledWorker scheduler
- queues: List of queues to monitor, in the format "queue-name:priority"; smaller numbers mean higher priority, and tasks are pulled in configuration order
- size: Maximum number of tasks to pull each time; the actual number is the smaller of TaskExecutor.remainingCapacity() and size
- take-strategy: Queue pull strategy
  - fifo: First-in-first-out; tasks are pulled by queue entry time, earlier entries first, with at most 10 tasks per pull
  - round_robin: Round-robin; pull alternately from multiple queues
  - active_passive: Master-backup; pull from the master queue first, and from the backup queue only when the master queue has no tasks
  - sequential: Global sequential execution; ensures only one task runs at a time per queue, avoiding concurrent contention
3.6 Custom Worker Scheduling Implementation
If the default ScheduledWorker does not meet your requirements, you can reuse the instantiated Worker component and customize the task pulling and scheduling logic.
@Component
public class CustomWorkerScheduler {
@Autowired
private Worker worker;
@Scheduled(fixedDelay = 1000)
public void pullAndProcessTasks() {
// Custom task pulling and processing logic
worker.pullAndProcess();
}
}
4. Summary
- Worker is an independent service: Acts as a downstream task processor for Bella-Queue
- Queue registration is a prerequisite: Workers must register queues before they can receive tasks
- Channel configuration establishes the mapping: it tells the system which models' tasks should be dispatched to your Worker
- Multiple response modes are supported: choose the result return method according to the caller's requirements
- Two development methods: self-hosted service or Java SDK, chosen according to your technology stack