neverminedio/movie-orchestrator-agent
A TypeScript-based orchestrator agent demonstrating how to process tasks using Nevermined's Payments API. This agent orchestrates workflows involving multiple steps and sub-agent tasks in a structured and secure pipeline.
This project is part of a larger workflow that explores how agents interconnect, communicate, and work together. Please refer to these projects for a full view of the whole process:
Movie Script Generator Agent:
Workflow Diagram:
The Orchestrator Agent leverages Nevermined's Payments API to enable structured workflows with clear payment and task execution flows. The Payments API allows agents to:
This project demonstrates a real-world implementation of these concepts, integrating multiple sub-agents:
Clone the repository:
git clone https://github.com/nevermined-io/movie-orchestrator-agent.git
cd movie-orchestrator-agent
Install dependencies:
npm install
Configure environment variables: copy .env.example to .env and populate it:
NVM_API_KEY=your_nevermined_api_key
NVM_ENVIRONMENT=testing # or staging or production
PLAN_DID=your_plan_did
IMAGE_GENERATOR_PLAN_DID=your_image_plan_did
AGENT_DID=your_agent_did
SCRIPT_GENERATOR_DID=your_script_did
IMAGE_GENERATOR_DID=your_image_generator_did
Build and run the project:
npm run build
npm start
Variable | Description |
---|---|
NVM_API_KEY | Your Nevermined API Key |
NVM_ENVIRONMENT | Environment (testing, staging, or production) |
PLAN_DID | DID of the main subscription plan |
IMAGE_GENERATOR_PLAN_DID | DID of the plan for image generation |
AGENT_DID | DID of the orchestrator agent |
SCRIPT_GENERATOR_DID | DID of the script generator sub-agent |
IMAGE_GENERATOR_DID | DID of the image generator sub-agent |
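A missing or misspelled variable is easier to catch at startup than in the middle of a workflow. The following is a minimal sketch of such a check, not the project's own code: `loadConfig`, `AgentConfig`, and `REQUIRED_VARS` are hypothetical names, and only the variable names and the three environment values come from the table above.

```typescript
// Variable names taken from the table above.
const REQUIRED_VARS = [
  "NVM_API_KEY",
  "NVM_ENVIRONMENT",
  "PLAN_DID",
  "IMAGE_GENERATOR_PLAN_DID",
  "AGENT_DID",
  "SCRIPT_GENERATOR_DID",
  "IMAGE_GENERATOR_DID",
] as const;

type VarName = (typeof REQUIRED_VARS)[number];
type AgentConfig = Record<VarName, string>;

// Environment values listed in the table above.
const ENVIRONMENTS = ["testing", "staging", "production"];

// Hypothetical helper (not part of the project): validates an env-like object
// and returns a typed configuration, or throws with the list of missing names.
function loadConfig(env: Record<string, string | undefined>): AgentConfig {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  const environment = env["NVM_ENVIRONMENT"] as string;
  if (ENVIRONMENTS.indexOf(environment) === -1) {
    throw new Error(`Unsupported NVM_ENVIRONMENT: ${environment}`);
  }
  const config = {} as AgentConfig;
  for (const name of REQUIRED_VARS) {
    config[name] = env[name] as string;
  }
  return config;
}
```

In a Node.js process this would typically be called once at startup as `loadConfig(process.env)`, after the .env file has been loaded.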
In Nevermined, a Plan (PLAN_DID) represents a subscription that allows agents to execute tasks. Plans have credits that are consumed as tasks are executed.
Example:
Relationship Diagram:
[Plan A: PLAN_DID] ----------- [Orchestrator Agent]
                                |-- [Script Generator Agent]
                                |-- [Character Extractor Agent]
[Plan B: IMAGE_GENERATOR_PLAN_DID] -- [Image Generator Agent]
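The relationship in the diagram can also be written down as a lookup table, which makes it explicit which plan pays for which agent. This is only a sketch of one reading of the diagram (Plan A covering the orchestrator, script generator, and character extractor; Plan B covering the image generator); `PLAN_BY_AGENT` and `planVariableFor` are hypothetical names, not part of the project.

```typescript
// Agent names and plan variables as they appear in the diagram above.
// The grouping under Plan A is an assumption based on how the diagram reads.
const PLAN_BY_AGENT: Record<string, "PLAN_DID" | "IMAGE_GENERATOR_PLAN_DID"> = {
  "Orchestrator Agent": "PLAN_DID",
  "Script Generator Agent": "PLAN_DID",
  "Character Extractor Agent": "PLAN_DID",
  "Image Generator Agent": "IMAGE_GENERATOR_PLAN_DID",
};

// Hypothetical helper: returns the name of the environment variable that
// holds the plan DID used to pay for tasks sent to the given agent.
function planVariableFor(agentName: string): string {
  const variable = PLAN_BY_AGENT[agentName];
  if (variable === undefined) {
    throw new Error(`Unknown agent: ${agentName}`);
  }
  return variable;
}
```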
Init Step:
Defines the subsequent steps of the workflow (generateScript, generateImagesForCharacters).
Step Lifecycle:
The handleInitStep function initializes the workflow by defining the subsequent steps:
export async function handleInitStep(step: any, payments: any) {
  // One identifier per follow-up step.
  const scriptStepId = generateStepId();
  const videoStepId = generateStepId();

  // Each step names its predecessor, which fixes the execution order.
  const steps = [
    { step_id: scriptStepId, task_id: step.task_id, name: "generateScript", predecessor: step.step_id },
    { step_id: videoStepId, task_id: step.task_id, name: "generateVideoForCharacters", predecessor: scriptStepId },
  ];

  await payments.query.createSteps(step.did, step.task_id, { steps });
}
All steps are interlinked using predecessor, maintaining the execution order.
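To see how the predecessor links determine the order, the chain can be walked from the init step. The sketch below is illustrative only: `StepDef` and `executionOrder` are hypothetical names, the step IDs are made up, and it assumes a linear chain in which each step has at most one successor.

```typescript
interface StepDef {
  step_id: string;
  name: string;
  predecessor: string;
}

// Hypothetical helper: follows the predecessor links starting from the init
// step and returns the step names in execution order (linear chains only).
function executionOrder(steps: StepDef[], initStepId: string): string[] {
  const ordered: string[] = [];
  let current = initStepId;
  // Bounded by steps.length so a malformed cycle cannot loop forever.
  for (let i = 0; i < steps.length; i++) {
    const next = steps.filter((s) => s.predecessor === current)[0];
    if (!next) break;
    ordered.push(next.name);
    current = next.step_id;
  }
  return ordered;
}

// The array order does not matter; only the predecessor links do.
const exampleOrder = executionOrder(
  [
    { step_id: "step-video", name: "generateVideoForCharacters", predecessor: "step-script" },
    { step_id: "step-script", name: "generateScript", predecessor: "step-init" },
  ],
  "step-init"
);
// exampleOrder is ["generateScript", "generateVideoForCharacters"]
```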
Within each step, sub-tasks are created for specific agents:
export async function handleStepWithAgent(
  step: any,
  agentDid: string,
  agentName: string,
  planDid: string,
  payments: any
) {
  // Stop early if the plan has no credits and ordering more failed.
  const hasBalance = await ensureSufficientBalance(planDid, step, payments);
  if (!hasBalance) return;

  const accessConfig = await payments.query.getServiceAccessConfig(agentDid);
  const taskData = { query: step.input_query, name: step.name };

  // Create the sub-task and react to the status updates it reports.
  await payments.query.createTask(
    agentDid,
    taskData,
    accessConfig,
    async (data: any) => {
      const taskLog = JSON.parse(data);
      if (taskLog.task_status === "Completed") {
        await validateScriptGenerationTask(
          taskLog.task_id,
          agentDid,
          accessConfig,
          step,
          payments
        );
      }
    }
  );
}
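The callback above parses each update with `JSON.parse`, which throws on malformed input. A defensive variant could isolate the parsing in a small function, sketched here under the assumption that updates arrive as JSON strings carrying `task_id` and `task_status` as in the snippet above. `TaskLog` and `parseCompletedTask` are hypothetical names, not part of the Payments API.

```typescript
interface TaskLog {
  task_id: string;
  task_status: string;
}

// Hypothetical helper: returns the parsed log only when the task has
// completed, and null for malformed payloads or any other status.
function parseCompletedTask(data: string): TaskLog | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    return null; // not valid JSON
  }
  if (typeof parsed !== "object" || parsed === null) return null;
  const log = parsed as Partial<TaskLog>;
  if (typeof log.task_id !== "string" || log.task_status !== "Completed") {
    return null;
  }
  return { task_id: log.task_id, task_status: log.task_status };
}
```

Inside the callback, validation would then run only when `parseCompletedTask(data)` returns a non-null value.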
When a step involves multiple tasks (e.g., generating images), all tasks must complete successfully before marking the step as completed:
export async function handleImageGenerationForCharacters(step: any, payments: any) {
  // The previous step passes the characters as a JSON-encoded artifact.
  const characters = JSON.parse(step.input_artifacts);

  // Launch one image generation task per character.
  const tasks = characters.map((character: any) =>
    queryAgentWithPrompt(step, generateTextToImagePrompt(character), "Image Generator", payments)
  );

  // Wait for every task before marking the step as completed.
  await Promise.all(tasks);

  await payments.query.updateStep(step.did, {
    ...step,
    step_status: "Completed",
    output: "All image generation tasks completed successfully.",
  });
}
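`Promise.all` rejects as soon as one task rejects, so in the snippet above a failed task would skip the `updateStep` call entirely. One possible way to report failures explicitly is to collect settled results and derive the step status from them. The sketch below is an assumption about how that could look, not the project's behaviour: `Settled`, `StepUpdate`, and `summarizeTasks` are hypothetical names, and the `Settled` type mirrors the shape returned by the standard `Promise.allSettled`.

```typescript
// Same shape as the entries returned by Promise.allSettled.
type Settled =
  | { status: "fulfilled"; value: unknown }
  | { status: "rejected"; reason: unknown };

interface StepUpdate {
  step_status: "Completed" | "Failed";
  output: string;
}

// Hypothetical helper: the step is completed only if every task succeeded.
function summarizeTasks(results: Settled[]): StepUpdate {
  const failed = results.filter((r) => r.status === "rejected").length;
  if (failed === 0) {
    return {
      step_status: "Completed",
      output: "All image generation tasks completed successfully.",
    };
  }
  return {
    step_status: "Failed",
    output: `${failed} of ${results.length} image generation tasks failed.`,
  };
}
```

With this helper, the handler would await `Promise.allSettled(tasks)` and spread the returned fields into the object passed to `updateStep`.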
The orchestrator ensures sufficient balance before executing tasks:
export async function ensureSufficientBalance(
  planDid: string,
  step: any,
  payments: any
): Promise<boolean> {
  const balance = await payments.getPlanBalance(planDid);

  // No credits left: try to order the plan before giving up.
  if (balance < 1) {
    const orderResult = await payments.orderPlan(planDid);
    if (!orderResult.success) {
      // Mark the step as failed so the workflow does not hang.
      await payments.query.updateStep(step.did, {
        ...step,
        step_status: "Failed",
        output: "Insufficient balance and failed to order credits.",
      });
      return false;
    }
  }
  return true;
}
Input and Output Parameters:
step.input_query: Primary input for a step.
step.output: Standard output of the step.
step.input_artifacts: Artifacts passed from the previous step.
step.output_artifacts: Artifacts generated by the current step.
Flow:
Outputs (output/output_artifacts) from one step are automatically used as inputs (input_query/input_artifacts) for the subsequent step.
Example:
One step produces a result (output), and the next step receives it as its input_query.
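The hand-off between steps can be pictured as a simple field mapping. The sketch below only illustrates that mapping; since the flow is described as automatic, this is not code the orchestrator needs to contain. `StepIO` and `inputsForNextStep` are hypothetical names, and treating all four fields as strings is an assumption.

```typescript
interface StepIO {
  input_query?: string;
  input_artifacts?: string;
  output?: string;
  output_artifacts?: string;
}

// Hypothetical illustration: the outputs of one step become the inputs of
// the step that names it as predecessor.
function inputsForNextStep(previous: StepIO): StepIO {
  return {
    input_query: previous.output,
    input_artifacts: previous.output_artifacts,
  };
}
```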
Copyright 2024 Nevermined AG
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
docker pull neverminedio/movie-orchestrator-agent