Task-api
Welcome to the Task API documentation. Here you can find everything you need to know about it, from a high-level overview to the low-level details. Use the sidebar to quickly navigate to the chapter you are most interested in.
What’s the Task API?
Golem - application communication interface
The Task API is a Python library containing a programming interface and utility functions. Using this library, developers can build apps and easily run them on Golem.
Thanks to the Task API you can leverage all the computing power in the Golem Network and not rely on centralized cloud providers.
In layman's terms, the Task API is an agreement or protocol between Golem and apps so they can interoperate. Golem can load computational environments in which apps can run. All these layers have been split so we can add environments (like Docker, gWASM) and apps (like Blender, and Tutorialapp).
Task-API is...
- A GitHub repo with a code library task-api
- A protocol definition: these are messages and functions describing how the task-api client and server interact. The library used for this is protobuf
- A Python implementation with:
- interfaces and helpers for a client and a server (using the grpclib library)
- helper functions for testing
- build tools (in progress)
- A task-api client implementation inside the Golem core
- environments and runtimes (a runtime is a computation job like requesting or providing a task)
- computational environments, e.g. Docker installed on the system and configured properly, which spawn a runtime when an application is run (currently, docker_cpu and docker_gpu are the only environments available)
- applications, aka task-api servers
- application descriptor JSON files in the data-dir
- loaded, registered and benchmarked on startup
- enabled by the user
- App descriptor file:
this JSON file is only required by the requestor; the provider only needs to whitelist the prerequisite source to pick up the task
- Name
- Version
- requestor prerequisites (docker image)
- Environment
- Market strategy
- Support for any task-api server in any environment
- applications can be added for requestors by adding the JSON to the golem data-dir
- all applications can be provided for as long as the environment is enabled and the prerequisite source is whitelisted
- environments can be added in golem core releases
- a task-api server implementation in blenderapp and tutorialapp
- blenderapp is an extraction of the current blender logic in golem core
- tutorialapp is an example application running a simple proof of work computation
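An app descriptor containing the fields listed above might be laid out like this (purely illustrative; the field names and values here are hypothetical, not a verbatim schema):

```json
{
  "name": "tutorialapp",
  "version": "1.0.0",
  "requestor_prereq": {
    "image": "golemfactory/tutorialapp",
    "tag": "1.0"
  },
  "environment": "docker_cpu",
  "market_strategy": "default"
}
```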
How does it work? Requestor
With the golem application and network
Task JSON file
Just like with the current golem tasks, you create a task by passing input data in the form of JSON to the golemapp running in the background.
When using the GUI this JSON will be built under the hood for you.
The only difference between the old and new JSON is that for task-api the inputs are split into 2 groups: `golem` and `app`.
The `golem` inputs are required by the requestor to configure the output paths and market options.
The `app` inputs differ per app; for example, `blenderapp` needs a `scene_file` to render and `tutorialapp` needs a `difficulty` input for the complexity of the computation.
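Putting it together, a task creation JSON might look roughly like this (structure simplified and illustrative; consult each app's documentation for the exact fields):

```json
{
  "golem": {
    "output_directory": "/home/user/results",
    "max_subtasks_count": 4
  },
  "app": {
    "difficulty": 14,
    "resources": ["input.txt"]
  }
}
```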
Creating the task
When creating a task in golem, the JSON is checked and a task-id is generated.
Both are stored in the database with the task in the `creating` state.
When saved, golem prepares the input folder for the computation.
Then the task-api app/server is called with the task-id and the `app` block from the input JSON (`RequestorAppClient.create_task()`).
The app should do a local test to validate the input data and return the environment, prerequisites and memory requirements for the provider.
These are added to the database, as they are needed to give out subtasks.
Last but not least, the task is put in the `waiting` state, ready for providers to pick up subtasks.
Spreading tasks over the network
Periodically Golem nodes ask each other for a list of known tasks. This list starts with your tasks and ends with known tasks from other requestors. This way the task gets stored in more and more potential providers. Providers pick a random task to send an offer for when they are not doing any computations.
Assigning subtasks
When an offer is received, its task is checked in the database (not finished yet) and the task-api app/server is asked if there are available subtasks (`RequestorAppClient.has_pending_subtasks()`).
From the first offer, there is a 15-second window for other providers to apply before the first set of subtasks is assigned and given out.
After the offers are selected, the task is checked again and the task-api app/server is asked for a subtask (`RequestorAppClient.next_subtask()`).
The subtask data is added to a TaskToCompute message and sent to the provider.
With the first assigned subtask, the task state is updated to `computing`.
Verify computation
The provider will send a SubtaskComputeResult message with the output data of the subtask when it is done.
Golem will check the task status and call the task-api app/server for verification (`RequestorAppClient.verify()`).
Based on the result of the verification, Golem will send: a SubtaskResultAccepted and mark the subtask as completed, or a SubtaskResultRejected and mark the subtask as failed.
When the last subtask is marked as completed the output is copied to the output folder from the task JSON and the task is marked as completed.
How does it work? Provider
With the golem application and network
Receiving and responding to a task header
From the provider side, a task starts with receiving a task header that is being broadcasted over the network. This can be from the requestor directly or via another golem node. When the provider is not computing, it picks a random supported task from the task keeper every few seconds and sends an offer to compute. Supported means the environment is enabled and the prerequisites are installed and benchmarked successfully.
Receiving a subtask to compute
Since some time has passed since the offer was sent, the provider first re-checks the previous conditions:
- still not computing any task
- environment still enabled
- prerequisites installed and benchmarked
Then the provider checks if it has enough disk space and if the requestor has enough funds to pay for the subtask.
When all checks have passed, the resources are downloaded and the `ProviderAppClient.compute()` call is made to the task-api app/server.
The results of the subtask are sent back in a ReportComputedSubtask message, in both failure and success scenarios. On failure the error is returned; on success, the output folder of the `compute()` call is passed along.
The API
Introduction
This repository contains the interface that a Golem-compatible application should implement, as well as constants used by the protocol. The interface and the constants are defined in the Protocol Buffers files under the `golem_task_api/proto` directory.
This repository also contains programming-language-specific packages for the gRPC protocol which may be used for a concrete implementation. These exist for ease of development, but applications are not required to use them.
If you don't see a programming language you're interested in, feel free to create an issue or even a pull request and we will add it.
The API is divided into two independent parts - requestor and provider.
Requestor API
Requestor
service RequestorApp {
rpc CreateTask (CreateTaskRequest) returns (CreateTaskReply) {}
rpc NextSubtask (NextSubtaskRequest) returns (NextSubtaskReply) {}
rpc Verify (VerifyRequest) returns (VerifyReply) {}
rpc DiscardSubtasks (DiscardSubtasksRequest) returns (DiscardSubtasksReply) {}
rpc RunBenchmark (RunBenchmarkRequest) returns (RunBenchmarkReply) {}
rpc HasPendingSubtasks (HasPendingSubtasksRequest) returns (HasPendingSubtasksReply) {}
rpc AbortTask (AbortTaskRequest) returns (AbortTaskReply) {}
rpc AbortSubtask (AbortSubtaskRequest) returns (AbortSubtaskReply) {}
rpc Shutdown (ShutdownRequest) returns (ShutdownReply) {}
}
For the requestor, the app should implement a long-running RPC service that implements the `RequestorApp` interface from the proto files. The app should assume it will have access to a single directory (let's call it `work_dir`). Each task will have its separate working directory under the main `work_dir`. You can assume that for a given `task_id` the first call will always be `CreateTask` and that the following directories will exist under `work_dir`, empty:

{task_id}
{task_id}/{constants.TASK_INPUTS_DIR}
{task_id}/{constants.SUBTASK_INPUTS_DIR}
{task_id}/{constants.TASK_OUTPUTS_DIR}
{task_id}/{constants.SUBTASK_OUTPUTS_DIR}
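On the Golem side, preparing such a layout boils down to a few `mkdir` calls. A minimal sketch, with illustrative stand-in values for the directory-name constants (the real names come from `golem_task_api.constants`):

```python
from pathlib import Path
import tempfile

# Illustrative stand-ins for the golem_task_api constants
TASK_INPUTS_DIR = 'task_inputs'
SUBTASK_INPUTS_DIR = 'subtask_inputs'
TASK_OUTPUTS_DIR = 'task_outputs'
SUBTASK_OUTPUTS_DIR = 'subtask_outputs'

def prepare_task_dirs(work_dir: Path, task_id: str) -> Path:
    # Create the empty directory tree the app may rely on after CreateTask
    task_dir = work_dir / task_id
    for sub in (TASK_INPUTS_DIR, SUBTASK_INPUTS_DIR,
                TASK_OUTPUTS_DIR, SUBTASK_OUTPUTS_DIR):
        (task_dir / sub).mkdir(parents=True, exist_ok=True)
    return task_dir

with tempfile.TemporaryDirectory() as tmp:
    task_dir = prepare_task_dirs(Path(tmp), 'task-123')
    assert (task_dir / TASK_INPUTS_DIR).is_dir()
```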
RPC methods
CreateTask
rpc CreateTask (CreateTaskRequest) returns (CreateTaskReply) {}
message CreateTaskRequest {
string task_id = 1;
int32 max_subtasks_count = 2;
string task_params_json = 3;
}
message CreateTaskReply {
string env_id = 1;
string prerequisites_json = 2;
Infrastructure inf_requirements = 3;
}
message Infrastructure {
float min_memory_mib = 1;
}
- Takes three arguments: `task_id`, `max_subtasks_count`, and `task_params_json`.
- Should treat `{work_dir}/{task_id}` as the working directory for the given task.
- `task_params_json` is a JSON string containing app-specific task parameters. The format of these parameters is entirely up to the application developer.
- Will only be called once with a given `task_id`.
- Can assume that `task_id` is unique per node.
- Can assume `{task_id}/{constants.TASK_INPUTS_DIR}` contains all the resources provided by the task creator.
- Returns `env_id` and `prerequisites_json` specifying the environment and prerequisites required for providers to compute the task. See the environments section for details.
NextSubtask
rpc NextSubtask (NextSubtaskRequest) returns (NextSubtaskReply) {}
message NextSubtaskRequest {
string task_id = 1;
string subtask_id = 2;
string opaque_node_id = 3;
}
message NextSubtaskReply {
// workaround for the lack of null values in proto3
// where "one of" means "at most one of"
oneof subtask_oneof {
SubtaskReply subtask = 1;
}
}
message SubtaskReply {
string subtask_params_json = 1;
repeated string resources = 2;
}
- Takes three arguments: `task_id`, `subtask_id` and `opaque_node_id`.
- `opaque_node_id` is an identifier of the node which is going to compute the requested subtask. 'Opaque' means that the identifier doesn't allow obtaining any further information about the node (e.g. public key, IP address).
- Can assume that `subtask_id` is unique per node.
- Can assume `CreateTask` was called earlier with the same `task_id`.
- Can return an empty message, meaning that the app refuses to assign a subtask to the provider node (for whatever reason).
- Returns `subtask_params_json`, which is a JSON string containing subtask-specific parameters.
- Also returns `resources`, which is a list of names of files required for computing the subtask. Files with these names are required to be present in the `{task_id}/{constants.SUBTASK_INPUTS_DIR}` directory.
Verify
rpc Verify (VerifyRequest) returns (VerifyReply) {}
message VerifyRequest {
string task_id = 1;
string subtask_id = 2;
}
message VerifyReply {
enum VerifyResult {
SUCCESS = 0;
FAILURE = 1;
AWAITING_DATA = 2;
INCONCLUSIVE = 3;
}
VerifyResult result = 1;
string reason = 2;
}
- Takes two arguments: `task_id` and `subtask_id`, which specify which subtask results should be verified.
- Will be called only with valid `task_id` and `subtask_id` values.
- Returns `result`, which is one of the defined verification result statuses:
  - `SUCCESS` - the subtask was computed correctly,
  - `FAILURE` - the subtask was computed incorrectly,
  - `INCONCLUSIVE` - cannot determine whether the subtask was computed correctly,
  - `AWAITING_DATA` - cannot perform verification until results of other subtasks are available.
- Also returns `reason`, which is a string providing more detail about the result.
- For successfully verified subtasks, it can also perform merging of the partial results into the final one.
DiscardSubtasks
rpc DiscardSubtasks (DiscardSubtasksRequest) returns (DiscardSubtasksReply) {}
message DiscardSubtasksRequest {
string task_id = 1;
repeated string subtask_ids = 2;
}
message DiscardSubtasksReply {
repeated string discarded_subtask_ids = 1;
}
- Takes two arguments: `task_id` and `subtask_ids`.
- Should discard the results of the given subtasks and any dependent subtasks.
- Returns the list of subtask IDs that have been discarded.
- In a simple case where subtasks are independent of each other, it will return the same list as it received.
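When subtasks do depend on each other, the discard has to propagate. A minimal sketch of transitive discarding (the dependency map here is hypothetical app-internal state, not part of the API):

```python
from typing import Dict, List, Set

def discard_subtasks(subtask_ids: List[str],
                     dependents: Dict[str, List[str]]) -> List[str]:
    # Walk the dependency graph and collect every subtask that
    # transitively depends on a discarded one.
    discarded: Set[str] = set()
    stack = list(subtask_ids)
    while stack:
        subtask_id = stack.pop()
        if subtask_id in discarded:
            continue
        discarded.add(subtask_id)
        stack.extend(dependents.get(subtask_id, []))
    return sorted(discarded)

# 'b' depends on 'a', so discarding 'a' also discards 'b'
assert discard_subtasks(['a'], {'a': ['b']}) == ['a', 'b']
```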
Benchmark
rpc RunBenchmark (RunBenchmarkRequest) returns (RunBenchmarkReply) {}
message RunBenchmarkRequest {
}
message RunBenchmarkReply {
float score = 1;
}
- Takes no arguments.
- Returns a score which indicates how efficient the machine is for this type of task.
- It shouldn't take much time (preferably less than a minute for medium-range machines).
HasPendingSubtasks
rpc HasPendingSubtasks (HasPendingSubtasksRequest) returns (HasPendingSubtasksReply) {}
message HasPendingSubtasksRequest {
string task_id = 1;
}
message HasPendingSubtasksReply {
bool has_pending_subtasks = 1;
}
- Takes one argument: `task_id`.
- Returns a boolean indicating whether there are any more pending subtasks waiting for computation at the given moment.
- In case it returns `true`, the next `NextSubtask` call should return successfully (although it can still return an empty message).
AbortTask
rpc AbortTask (AbortTaskRequest) returns (AbortTaskReply) {}
message AbortTaskRequest {
string task_id = 1;
}
message AbortTaskReply {
}
- Takes one argument: `task_id`.
- Will be called when the task is aborted by the user or times out. Should stop all running subtask verifications for this task and perform any other necessary cleanup.
AbortSubtask
rpc AbortSubtask (AbortSubtaskRequest) returns (AbortSubtaskReply) {}
message AbortSubtaskRequest {
string task_id = 1;
string subtask_id = 2;
}
message AbortSubtaskReply {
}
- Takes two arguments: `task_id` and `subtask_id`.
- Will be called when the subtask is aborted by the user or times out. Should stop verification of the subtask (if it's running) and perform any other necessary cleanup.
Shutdown
rpc Shutdown (ShutdownRequest) returns (ShutdownReply) {}
message ShutdownRequest {
}
message ShutdownReply {
}
- Takes no arguments.
- Should gracefully terminate the service.

When the last subtask is successfully verified on the requestor's side, the `work_dir/task_id/constants.RESULTS` directory should contain all result files and nothing else.
Provider API
service ProviderApp {
rpc Compute (ComputeRequest) returns (ComputeReply) {}
rpc RunBenchmark (RunBenchmarkRequest) returns (RunBenchmarkReply) {}
rpc Shutdown (ShutdownRequest) returns (ShutdownReply) {}
}
The provider app should implement a short-lived RPC service that implements the `ProviderApp` interface from the proto files. Short-lived means that only one request will be issued per service instance, i.e. the service should shut down automatically after handling the first and only request.
RPC commands
Compute
rpc Compute (ComputeRequest) returns (ComputeReply) {}
message ComputeRequest {
string task_id = 1;
string subtask_id = 2;
string subtask_params_json = 3;
}
message ComputeReply {
string output_filepath = 1;
}
- Gets a single working directory `task_work_dir` to operate on.
- Different subtasks of the same task will have the same `task_work_dir`.
- Takes `task_id`, `subtask_id` and `subtask_params_json` as arguments.
- Can assume the `{task_work_dir}/{constants.SUBTASK_INPUTS_DIR}` directory exists.
- Can assume that `{task_work_dir}/{constants.SUBTASK_INPUTS_DIR}` contains the resources specified in the corresponding `NextSubtask` reply.
Benchmark
rpc RunBenchmark (RunBenchmarkRequest) returns (RunBenchmarkReply) {}
message RunBenchmarkRequest {
}
message RunBenchmarkReply {
float score = 1;
}
- Takes no arguments.
- Returns a score which indicates how efficient the machine is for this type of task.
- It shouldn't take much time (preferably less than a minute for medium-range machines).
Shutdown
rpc Shutdown (ShutdownRequest) returns (ShutdownReply) {}
message ShutdownRequest {
}
message ShutdownReply {
}
- Takes no arguments.
- Should gracefully terminate the service.
- Can be called in case the provider wants to interrupt task computation or a benchmark.
## Environments
Both provider and requestor apps run on top of Golem's execution environments. The environment for the requestor is specified in the application definition and cannot vary. The provider environment is specified by the return value of the `CreateTask` call. A single application could use different environments for different types of tasks; it could also use different environments for the requestor and the provider. Environments have their own unique IDs and prerequisites formats. Prerequisites are additional requirements for the environment to run the app (e.g. Docker environment prerequisites specify an image). Environment IDs and prerequisites formats are listed in the envs.proto file.
Currently, the following environments are supported:
- `docker_cpu` - standard Docker environment

Prerequisites format:
{
"image": "...",
"tag": "..."
}
- `docker_gpu` - GPU-enabled Docker environment, Linux only

Prerequisites format: same as `docker_cpu`
tutorialapp in depth
Introduction
This tutorial will guide you through the process of creating a sample Golem application. We will work with the `Golem-Task-Api` library to quickly bootstrap and test our app.
Note: the `Golem-Task-Api` helper library is currently written in Python (3.6) and serves as a foundation for apps built in that language. Rust and static binary versions of the library are on the roadmap.
The `Tutorial-App` is a simple Proof of Work application. Golem tasks are initialized with a PoW difficulty parameter, set by the application user (requestor). After publishing the task in the network, Golem finds providers that are willing to participate in the computation. Each provider then computes a PoW with the set input difficulty and for different chunks of input data. The results are sent back to the requestor and verified. For a more detailed description of a task's lifecycle in the network, please refer to this section.
The GitHub repository for the `Tutorial-App` can be found here.
Scope of the tutorial
The guide for creating the `Tutorial-App` will cover the following aspects of Task API app development:
- Bootstrapping the application with the `Golem-Task-Api` library.
- Implementing requestor-side logic:
  - creating a new task
  - splitting a task into parts
  - creating subtasks for each of the parts
  - verifying subtask computation results
- Implementing provider-side logic:
  - computing subtasks
  - benchmarking the application
- Bundling the application inside a Docker image.
- Testing.
- Publishing the application.
Preparation
The Task API library provides an out-of-the-box gRPC server compliant with the API specification. The messages used for the API calls are specified in the Protocol Buffers format. The Task API library hides these details from developers and provides a convenient way to implement the application's logic.
The protocol and message definition files can be found here.
Currently, the Task API apps are required to be built as Docker images. This section provides more details on the subject.
Development tools
To develop the tutorial app you will need:
- Python 3.6
- Docker
Golem docs will guide you through installing these tools for your operating system.
Prerequisites
This tutorial assumes that you possess the knowledge of:
- Python 3.6+ language and features
- the `asyncio` asynchronous programming module
- basic Docker usage
Step by step walkthrough
Structuring the project
On your disk, create the following directory structure:

Tutorial-App
├── image/
│   └── tutorial_app/
└── tests/

- `image` is the main project directory, containing the application code and the Docker image definition
- `image/tutorial_app` is the root directory for the app's Python module
- `tests` is the place for the `pytest` test suite
Bootstrapping the application
Let's switch to `Tutorial-App/image/tutorial_app` as our main working directory. To define the Python package, create the following:

Tutorial-App
└── image/
    └── tutorial_app/
        ├── tutorial_app/
        ├── setup.py
        └── requirements.txt

- `tutorial_app` will contain the Python module source code
- `setup.py` is the `setuptools` project definition file
- `requirements.txt` defines the project's requirements used by `pip`
setup.py
from pathlib import Path
from typing import List

def parse_requirements(file_name: str = 'requirements.txt') -> List[str]:
    file_path = Path(__file__).parent / file_name
    with open(file_path, 'r') as requirements_file:
        return [
            line for line in requirements_file
            if line.strip() and not line.strip().startswith(('-', '#'))
        ]
This file is composed of 2 parts:

- parsing the `requirements.txt` file
- calling the `setuptools.setup` function

The `parse_requirements` function, implemented above, converts the contents of a `pip` requirements file to a format understood by the `setup` function. This eliminates the need to duplicate work by specifying the requirements manually.
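A self-contained variant of the same idea (taking an explicit path instead of relying on `__file__`, and stripping lines for clean comparison) behaves like this:

```python
import tempfile
from pathlib import Path
from typing import List

def parse_requirements(file_path: Path) -> List[str]:
    # Keep only real requirement lines, dropping pip flags and comments
    with open(file_path, 'r') as requirements_file:
        return [
            line.strip() for line in requirements_file
            if line.strip() and not line.strip().startswith(('-', '#'))
        ]

with tempfile.TemporaryDirectory() as tmp:
    req_file = Path(tmp) / 'requirements.txt'
    req_file.write_text(
        '--extra-index-url https://builds.golem.network\n'
        '# a comment\n'
        'peewee==3.11.2\n'
        'golem_task_api==0.24.1\n'
    )
    assert parse_requirements(req_file) == [
        'peewee==3.11.2',
        'golem_task_api==0.24.1',
    ]
```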
Now, to call the `setup` function:

setup(
    name='Tutorial-App',
    version=VERSION,  # defined in constants.py
    packages=['tutorial_app'],
    python_requires='>=3.6',
    install_requires=parse_requirements(),
)
requirements.txt
--extra-index-url https://builds.golem.network
# peewee ORM for database-backed persistence
peewee==3.11.2
golem_task_api==0.24.1
dataclasses==0.6; python_version < '3.7'
async_generator; python_version < '3.7'
Our requirements file uses an extra pip package repository to fetch the `golem_task_api` package.
Note that the project also requires two features backported from Python 3.7: `dataclasses` and `async_generator`.
tutorial_app/

Now, let's define our Python module:

Tutorial-App
└── image/
    └── tutorial_app/
        └── tutorial_app/
            ├── __init__.py
            └── entrypoint.py

- `__init__.py` is an empty module init file
- `entrypoint.py` will start the application server
entrypoint.py
The code in the `entrypoint.py` source file is responsible for starting the gRPC server and executing requestor or provider logic. All server initialization code and message handling is done by the `Golem-Task-Api` library (here referred to as `api`).
executing server
The role that the server will be executed in is determined by command line arguments:

- `python entrypoint.py requestor [port]` starts the requestor app server on the specified port
- `python entrypoint.py provider [port]` analogously starts the provider app server on the specified port

The initialization logic itself is defined in the `main` function:

async def main():
    await api.entrypoint(
        work_dir=Path(f'/{api.constants.WORK_DIR}'),
        argv=sys.argv[1:],
        requestor_handler=RequestorHandler(),
        provider_handler=ProviderHandler(),
    )

- `work_dir` is the application's working directory, as seen from inside the Docker container. A real file system location is mounted on that directory
- `argv` is a list / tuple of command line arguments
- `requestor_handler` is an instance of the requestor's app logic handler
- `provider_handler` is an instance of the provider's app logic handler
entrypoint
The entrypoint is run directly via the `__main__` execution entry point. The following code initializes the `asyncio` event loop and executes the `main` function within it:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The actual logic for `RequestorHandler` and `ProviderHandler` will be defined elsewhere; these classes exist here to satisfy the implementation of the interfaces. By writing our code this way, we isolate the implementation from the bootstrapping code.
RequestorHandler
The `RequestorHandler` is derived from the Task API's `RequestorAppHandler` and defined as follows:
class RequestorHandler(api.RequestorAppHandler):
    async def create_task(
        self,
        task_work_dir: RequestorTaskDir,
        max_subtasks_count: int,
        task_params: dict,
    ) -> Task:
        return await create_task(task_work_dir, max_subtasks_count, task_params)

    async def next_subtask(
        self,
        task_work_dir: RequestorTaskDir,
        subtask_id: str,
        opaque_node_id: str,
    ) -> Optional[Subtask]:
        return await next_subtask(task_work_dir, subtask_id)

    async def verify(
        self,
        task_work_dir: RequestorTaskDir,
        subtask_id: str,
    ) -> Tuple[VerifyResult, Optional[str]]:
        return await verify_subtask(task_work_dir, subtask_id)

    async def discard_subtasks(
        self,
        task_work_dir: RequestorTaskDir,
        subtask_ids: List[str],
    ) -> List[str]:
        return await discard_subtasks(task_work_dir, subtask_ids)

    async def has_pending_subtasks(
        self,
        task_work_dir: RequestorTaskDir,
    ) -> bool:
        return await has_pending_subtasks(task_work_dir)

    async def run_benchmark(
        self,
        task_work_dir: RequestorTaskDir,
    ) -> float:
        return await run_benchmark()

    async def abort_task(
        self,
        task_work_dir: RequestorTaskDir,
    ) -> None:
        return await abort_task(task_work_dir)

    async def abort_subtask(
        self,
        task_work_dir: RequestorTaskDir,
        subtask_id: str,
    ) -> None:
        return await abort_subtask(task_work_dir, subtask_id)
ProviderHandler
Next, the `ProviderHandler` (Task API's `ProviderAppHandler`):
class ProviderHandler(api.ProviderAppHandler):
    async def compute(
        self,
        task_work_dir: ProviderTaskDir,
        subtask_id: str,
        subtask_params: dict,
    ) -> Path:
        return await compute_subtask(task_work_dir, subtask_id, subtask_params)

    async def run_benchmark(
        self,
        task_work_dir: Path,
    ) -> float:
        return await run_benchmark()
We will go back to actual command implementations in the next sections. For now, let's focus on the core logic of the application - Proof of Work.
Implementation
According to Wikipedia: a Proof-of-Work (PoW) system (or protocol, or function) is a consensus mechanism. It deters denial-of-service attacks and other service abuses such as spam on a network by requiring some work from the service requester, usually meaning processing time by a computer.
The PoW used in the `Tutorial-App` is literal - providers execute a computationally expensive function whose result can be easily verified by the requestor.
proof_of_work.py
Tutorial-App
└── image/
└── tutorial_app/
└── tutorial_app/
├── __init__.py
├── entrypoint.py
└── proof_of_work.py
To implement a PoW system we need at least 2 functions: `compute` and `verify`. With PoW computation, we try to find a digest that satisfies a certain difficulty threshold.

The `compute` function:
_MAX_NONCE: int = 2 ** 256

def compute(
    input_data: str,
    difficulty: int,
) -> Tuple[str, int]:
    target = 2 ** (256 - difficulty)
    for nonce in range(_MAX_NONCE):
        if api.threading.Executor.is_shutting_down():
            raise RuntimeError("Interrupted")
        hash_result = _sha256(input_data + str(nonce))
        if int(hash_result, 16) < target:
            return hash_result, nonce
    raise RuntimeError("Solution not found")
In order not to block the event loop thread, we're going to execute `compute` in a dedicated thread. `Executor.is_shutting_down` will signal whether we should stop the iteration, since the result is going to be discarded anyway.
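Offloading a blocking search like this to a worker thread can be sketched with the standard library alone; this mirrors the idea, while the actual app relies on the `Golem-Task-Api` threading helpers:

```python
import asyncio
import hashlib
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple

def blocking_pow(input_data: str, difficulty: int) -> Tuple[str, int]:
    # Brute-force a nonce whose SHA-256 digest falls below the target
    target = 2 ** (256 - difficulty)
    nonce = 0
    while True:
        digest = hashlib.sha256((input_data + str(nonce)).encode()).hexdigest()
        if int(digest, 16) < target:
            return digest, nonce
        nonce += 1

async def main() -> Tuple[str, int]:
    loop = asyncio.get_running_loop()
    # Run the CPU-bound search in a worker thread so the event loop
    # stays free to serve other RPC calls
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(pool, blocking_pow, 'data', 8)

digest, nonce = asyncio.run(main())
assert int(digest, 16) < 2 ** 248
```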
The `verify` function:
def verify(
    input_data: str,
    difficulty: int,
    against_result: str,
    against_nonce: int,
) -> None:
    target = 2 ** (256 - difficulty)
    result = _sha256(input_data + str(against_nonce))
    if against_result != result:
        raise ValueError(f"Invalid result hash: {against_result} != {result}")
    if int(result, 16) >= target:
        raise ValueError("Invalid result hash difficulty")
For verification purposes, a hash is computed from the same `input_data` and compared with the received hash. Then, we perform a sanity check on the hash difficulty.
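A quick round trip of the two functions (standalone re-implementations for illustration, without the executor shutdown check):

```python
import hashlib
from typing import Tuple

def _sha256(input_data: str) -> str:
    return hashlib.sha256(input_data.encode('utf-8')).hexdigest()

def compute(input_data: str, difficulty: int) -> Tuple[str, int]:
    # Find a nonce whose digest is below the difficulty target
    target = 2 ** (256 - difficulty)
    nonce = 0
    while True:
        hash_result = _sha256(input_data + str(nonce))
        if int(hash_result, 16) < target:
            return hash_result, nonce
        nonce += 1

def verify(input_data: str, difficulty: int,
           against_result: str, against_nonce: int) -> None:
    # Recompute the digest; it must match and meet the target
    target = 2 ** (256 - difficulty)
    result = _sha256(input_data + str(against_nonce))
    if against_result != result:
        raise ValueError(f"Invalid result hash: {against_result} != {result}")
    if int(result, 16) >= target:
        raise ValueError("Invalid result hash difficulty")

digest, nonce = compute('hello', difficulty=8)
verify('hello', 8, digest, nonce)          # a correct result passes
try:
    verify('hello', 8, digest, nonce + 1)  # a tampered nonce is rejected
except ValueError:
    pass
```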
benchmarking
To satisfy the Task API interface, we need to provide a benchmarking function. It measures the execution time of an arbitrary number of iterations; the result is converted to a fixed score range.

def benchmark(
    iterations: int = 2 ** 8,
) -> float:
    started = time.time()
    for nonce in range(iterations):
        if api.threading.Executor.is_shutting_down():
            raise RuntimeError("Interrupted")
        hash_result = _sha256('benchmark' + str(nonce))
    elapsed = time.time() - started
    if elapsed:
        return 1000. / elapsed
    return 1000.
Here is the remaining `_sha256` function, which converts the input data to `bytes` and returns a hex-encoded digest of that data:

def _sha256(input_data: str) -> str:
    input_bytes = input_data.encode('utf-8')
    return hashlib.sha256(input_bytes).hexdigest()
constants.py
Tutorial-App
└── image/
└── tutorial_app/
└── tutorial_app/
├── __init__.py
├── entrypoint.py
├── proof_of_work.py
└── constants.py
This file contains the Docker image name and the application version.
DOCKER_IMAGE = 'golemfactory/tutorialapp'
VERSION = '1.0.0'
task_manager.py
Tutorial-App
└── image/
└── tutorial_app/
└── tutorial_app/
├── __init__.py
├── entrypoint.py
├── proof_of_work.py
└── task_manager.py
The Task API library is equipped with an out-of-the-box task manager. It is based on the following concepts:
part
An outcome of splitting a task into separate units of work. The number of parts is usually constant.
subtask
A clone of a chosen task part that will be assigned to a computing node.
Each subtask is given a unique identifier in order to distinguish computation attempts of the same part, which may fail due to unexpected errors or simply time out.
A successful subtask computation concludes the computation of the corresponding part.
The included task manager will help you in the following areas:
- splitting the task into parts
- creating and managing subtasks bound to task parts
- updating the status of each subtask
- persisting the state to disk
The default `TaskManager` class is built on the `peewee` library and uses an SQLite database by default.
SubtaskStatus

`SubtaskStatus` is defined in `golem_task_api.apputils.task` as:

class SubtaskStatus(enum.Enum):
    WAITING = None
    COMPUTING = 'computing'
    VERIFYING = 'verifying'
    SUCCESS = 'success'
    FAILURE = 'failure'
    ABORTED = 'aborted'
Extending the task part model
For the purpose of this project, we're going to extend the task part model with additional fields:

from golem_task_api.apputils.task import database

class Part(database.Part):
    input_data = peewee.CharField(null=True)
    difficulty = peewee.FloatField(null=True)

class TaskManager(database.DBTaskManager):
    def __init__(self, work_dir: Path) -> None:
        super().__init__(work_dir, part_model=Part)

We've added two fields to the `Part` model:

- `input_data`, which will be the source data for our PoW compute function
- `difficulty`, representing the PoW function difficulty threshold

Now we want the `DBTaskManager` to use our `Part` model. We can do that by calling `super().__init__(work_dir, part_model=Part)`.
commands.py
Tutorial-App
└── image/
└── tutorial_app/
└── tutorial_app/
├── __init__.py
├── entrypoint.py
├── proof_of_work.py
├── task_manager.py
└── commands.py
Helper function
In the tutorial app, we're going to assume that each resource (be it subtask input data or a subtask computation result) is a ZIP file containing a single file. To simplify read operations on files inside those archives, we will define the following helper function:

def _read_zip_contents(path: Path) -> str:
    with zipfile.ZipFile(path, 'r') as f:
        input_file = f.namelist()[0]
        with f.open(input_file) as zf:
            return zf.read().decode('utf-8')

The function will:

- open the archive
- assume there's a single file inside
- open the archived file
- read the contents as a string
Let's go through the commands, one by one.
1. create_task

async def create_task(
    work_dir: RequestorTaskDir,
    max_part_count: int,
    task_params: dict,
) -> Task:
    # validate the 'difficulty' parameter
    difficulty = int(task_params['difficulty'])
    if difficulty < 0:
        raise ValueError(f"difficulty={difficulty}")
    # check whether resources were provided
    resources = task_params.get('resources')
    if not resources:
        raise ValueError(f"resources={resources}")
    # read the input resource file, provided by the user (requestor)
    try:
        task_input_file = work_dir.task_inputs_dir / resources[0]
        input_data = task_input_file.read_text('utf-8')
    except (IOError, StopIteration) as exc:
        raise ValueError(f"Invalid resource file: {resources} ({exc})")
    # create the task within the task manager
    task_manager = TaskManager(work_dir)
    task_manager.create_task(max_part_count)
    # update the parts in the database with our input data
    for num in range(max_part_count):
        part = task_manager.get_part(num)
        part.input_data = input_data + str(uuid.uuid4())
        part.difficulty = difficulty + (difficulty % 2)
        part.save()
    # return the task's definition, specifying:
    # - the execution environment ID (here: Docker CPU-only compute)
    # - the Docker CPU environment prerequisites
    # - a minimum memory requirement in MiB
    return Task(
        env_id=DOCKER_CPU_ENV_ID,  # 'docker_cpu'
        prerequisites=PREREQUISITES,
        inf_requirements=Infrastructure(min_memory_mib=50.))
Returns a computation environment ID and prerequisites JSON, specifying the parameters needed to execute task computation.
Where PREREQUISITES
tell us which Docker image providers should pull and execute:
python
PREREQUISITES = {
"image": 'golemfactory/tutorialapp',
"tag": "1.0",
}
2. abort_task
```python
async def abort_task(
        work_dir: RequestorTaskDir,
) -> None:
    task_manager = TaskManager(work_dir)
    # abort the task and the currently running subtasks
    task_manager.abort_task()
```

Will be called when the task is aborted by the user or timed out. Should stop verification of any running subtasks and perform any other necessary cleanup.
3. abort_subtask
```python
async def abort_subtask(
        work_dir: RequestorTaskDir,
        subtask_id: str
) -> None:
    task_manager = TaskManager(work_dir)
    # abort a single subtask by changing its status to 'ABORTED';
    # the task manager will automatically handle the new status
    task_manager.update_subtask_status(subtask_id, SubtaskStatus.ABORTED)
```

Will be called when the subtask is aborted by the user or timed out.
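The `SubtaskStatus` values used throughout these commands come from the task-api helper library and aren't reproduced in this tutorial. A status enum of roughly this shape (the member names and values here are our assumption, not the library's definition) illustrates the lifecycle the task manager tracks:

```python
from enum import Enum

class SubtaskStatus(Enum):
    # hypothetical sketch; the real enum lives in the task-api helpers
    WAITING = 'waiting'      # created, not yet assigned to a provider
    COMPUTING = 'computing'  # assigned via next_subtask / start_subtask
    VERIFYING = 'verifying'  # results downloaded, verify_subtask running
    SUCCESS = 'success'      # verification passed
    FAILURE = 'failure'      # verification failed
    ABORTED = 'aborted'      # aborted by the user or timed out

status = SubtaskStatus.ABORTED
```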
4. next_subtask
```python
async def next_subtask(
        work_dir: RequestorTaskDir,
        subtask_id: str,
) -> Optional[api.structs.Subtask]:
    task_manager = TaskManager(work_dir)
    # check whether we have any parts left for computation
    part_num = task_manager.get_next_computable_part_num()
    if part_num is None:
        return None
    # get the part model from the database
    part = task_manager.get_part(part_num)
    # write the subtask input data file under a predefined directory
    subtask_input_file = work_dir.subtask_inputs_dir / f'{subtask_id}.zip'
    with zipfile.ZipFile(subtask_input_file, 'w') as zf:
        zf.writestr(subtask_id, part.input_data)
    resources = [subtask_input_file.name]
    # bind the subtask to the part number and mark it as started
    task_manager.start_subtask(part_num, subtask_id)
    # create the subtask's definition; we need to specify:
    # - subtask parameters, passed to providers as input computation parameters
    # - a list of resources (file names) for providers to download and use
    #   for the computation; resource transfers are handled automatically
    #   by Golem
    return api.structs.Subtask(
        params={
            'difficulty': part.difficulty,
            'resources': resources,
        },
        resources=resources,
    )
```

Returns `subtask_params_json`, a JSON string containing subtask-specific parameters, and `resources`, a list of names of files required for computing the subtask. Files with these names must be present in the `{task_id}/{constants.SUBTASK_INPUTS_DIR}` directory.
Can return an empty message, meaning that the app refuses to assign a subtask to the provider node (for whatever reason).
5. verify_subtask
```python
async def verify_subtask(
        work_dir: RequestorTaskDir,
        subtask_id: str,
) -> Tuple[VerifyResult, Optional[str]]:
    subtask_outputs_dir = work_dir.subtask_outputs_dir(subtask_id)
    # read the contents of the subtask output data file
    output_data = _read_zip_contents(subtask_outputs_dir / f'{subtask_id}.zip')
    # parse the read data as a PoW computation result and nonce
    provider_result, provider_nonce = output_data.rsplit(' ', maxsplit=1)
    provider_nonce = int(provider_nonce)
    # notify the task manager that the subtask is being verified
    task_manager = TaskManager(work_dir)
    task_manager.update_subtask_status(subtask_id, SubtaskStatus.VERIFYING)
    try:
        # retrieve the current part model from the database
        part_num = task_manager.get_part_num(subtask_id)
        part = task_manager.get_part(part_num)
        # execute the verification function
        proof_of_work.verify(
            part.input_data,
            difficulty=part.difficulty,
            against_result=provider_result,
            against_nonce=provider_nonce)
    except (AttributeError, ValueError) as err:
        # verification has failed; update the status in the task manager
        task_manager.update_subtask_status(subtask_id, SubtaskStatus.FAILURE)
        return VerifyResult.FAILURE, str(err)
    # verification has succeeded; copy the results to the output
    # directory set by the requestor
    shutil.copy(
        subtask_outputs_dir / f'{subtask_id}.zip',
        work_dir.task_outputs_dir / f'{subtask_id}.zip')
    # update the status in the task manager
    task_manager.update_subtask_status(subtask_id, SubtaskStatus.SUCCESS)
    return VerifyResult.SUCCESS, None
```

Called when computation results have been downloaded by Golem. For successfully verified subtasks it can also merge the partial results into the final one.
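The `proof_of_work` module itself is part of the tutorial app's sources and isn't reproduced in this guide. As a rough, self-contained sketch of what such a compute/verify pair could look like (interpreting `difficulty` as the required number of leading zero bits of the SHA-256 digest is our assumption, not necessarily the app's exact scheme):

```python
import hashlib
from typing import Tuple

def compute(input_data: str, difficulty: int) -> Tuple[str, int]:
    # brute-force nonces until the sha256 digest has `difficulty`
    # leading zero bits (our interpretation of the threshold)
    nonce = 0
    while True:
        digest = hashlib.sha256(f'{input_data}{nonce}'.encode()).hexdigest()
        if int(digest, 16) >> (256 - difficulty) == 0:
            return digest, nonce
        nonce += 1

def verify(input_data: str, difficulty: int,
           against_result: str, against_nonce: int) -> None:
    # recompute the digest for the claimed nonce and compare it with
    # the provider's result; raise ValueError on any mismatch, which is
    # exactly what verify_subtask catches above
    digest = hashlib.sha256(f'{input_data}{against_nonce}'.encode()).hexdigest()
    if digest != against_result:
        raise ValueError('result does not match the recomputed digest')
    if int(digest, 16) >> (256 - difficulty) != 0:
        raise ValueError('result does not meet the difficulty threshold')
```

Note that the keyword arguments mirror the `verify_subtask` call above, so the sketch slots into the same error-handling path.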
6. discard_subtasks
```python
async def discard_subtasks(
        work_dir: RequestorTaskDir,
        subtask_ids: List[str],
) -> List[str]:
    task_manager = TaskManager(work_dir)
    # the PoW app simply aborts the subtasks in this case
    for subtask_id in subtask_ids:
        task_manager.update_subtask_status(subtask_id, SubtaskStatus.ABORTED)
    return subtask_ids
```

Should discard the results of the given subtasks and of any dependent subtasks.
7. has_pending_subtasks
```python
async def has_pending_subtasks(
        work_dir: RequestorTaskDir,
) -> bool:
    task_manager = TaskManager(work_dir)
    return task_manager.get_next_computable_part_num() is not None
```

Returns a boolean indicating whether there are any more pending subtasks waiting for computation at the given moment.
8. run_benchmark
```python
async def run_benchmark() -> float:
    # execute the benchmark function in the background and wait for the result
    return await api.threading.Executor.run(proof_of_work.benchmark)
```

Returns a score indicating how efficient the machine is for this type of task.
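`api.threading.Executor.run` offloads a blocking function to a thread pool so the asyncio event loop stays responsive while it runs. A minimal standalone sketch of that pattern, using only the standard library (the benchmark body here is a placeholder, not the app's real scoring code):

```python
import asyncio

def blocking_benchmark() -> float:
    # placeholder for proof_of_work.benchmark: any CPU-bound scoring code
    return float(sum(range(100_000)))

async def run_benchmark() -> float:
    # run the blocking function in the default thread pool and await it,
    # which is essentially what api.threading.Executor.run does for us
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_benchmark)

score = asyncio.run(run_benchmark())
```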
9. compute_subtask
```python
async def compute_subtask(
        work_dir: ProviderTaskDir,
        subtask_id: str,
        subtask_params: dict,
) -> Path:
    # validate the subtask input parameters
    resources = subtask_params['resources']
    if not resources:
        raise ValueError(f"resources={resources}")
    difficulty = int(subtask_params['difficulty'])
    if difficulty < 0:
        raise ValueError(f"difficulty={difficulty}")
    # read the subtask input data from a file saved in a predefined directory
    subtask_input_file = work_dir.subtask_inputs_dir / resources[0]
    subtask_input = _read_zip_contents(subtask_input_file)
    # execute the computation in the background and wait for the result
    hash_result, nonce = await api.threading.Executor.run(
        proof_of_work.compute,
        input_data=subtask_input,
        difficulty=difficulty)
    # bundle the computation output and save it in a predefined directory
    subtask_output_file = work_dir / f'{subtask_id}.zip'
    with zipfile.ZipFile(subtask_output_file, 'w') as zf:
        zf.writestr(subtask_id, f'{hash_result} {nonce}')
    # return the name of our output file
    return subtask_output_file.name
```

Executes the computation.
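Note how the provider packs its result as a single `"<hash> <nonce>"` string here, and the requestor splits it back apart in `verify_subtask`. A small standalone sketch of that encoding round trip (the digest value below is a stand-in, not real output):

```python
# pack, as compute_subtask does
hash_result = 'deadbeef' * 8  # stand-in for a real sha256 hex digest
nonce = 1337
packed = f'{hash_result} {nonce}'

# unpack, as verify_subtask does; rsplit with maxsplit=1 splits only at
# the last space, so everything before it is treated as the result
provider_result, provider_nonce = packed.rsplit(' ', maxsplit=1)
provider_nonce = int(provider_nonce)
```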
Bundling the application
Tutorial-App
└── image/
├── tutorial_app/
└── Dockerfile
In this section we're going to build a Docker image with our application. Please create an empty Dockerfile
in the Tutorial-App/image/
directory. Then, add the following lines:
FROM golemfactory/base:1.5
The image is going to be derived from a base Golem image.
RUN apt update
RUN apt install -y python3-pip
Install prerequisites.
COPY tutorial_app /golem/tutorial_app
Copy application code.
RUN python3 -m pip install --no-cache-dir --upgrade pip
RUN python3 -m pip install --no-cache-dir -r /golem/tutorial_app/requirements.txt
RUN python3 -m pip install --no-cache-dir /golem/tutorial_app
Install required packages and the app itself.
RUN apt remove -y python3-pip
RUN apt clean
Clean up the no longer needed packages.
WORKDIR /golem/work
ENTRYPOINT ["python3", "-m", "tutorial_app.entrypoint"]
Set up the working directory inside the image and the entrypoint.
In order to build the image, execute the following command in the tutorialapp
directory:
docker build image -t golemfactory/tutorialapp:1.0.0
That's it!
tutorialapp hands on
Prepare for the hands-on
In this section we are going to run your application locally inside Golem. Before we start, make sure you have the prerequisites:
../
├── tutorialapp/
└── golem/
- Cloned tutorialapp from GitHub: git clone https://github.com/golemfactory/tutorialapp.git
- A working Golem node running from source
For this guide, the tutorialapp and golem repositories are in the same folder.
Build the docker image
If you have followed the in-depth guide, you should already have a built docker image:
$ docker images golemfactory/tutorialapp
REPOSITORY                 TAG    IMAGE ID     CREATED          SIZE
golemfactory/tutorialapp   1.0.0  <random_id>  <some time ago>  <not so large>
If you still need to build the docker image, navigate to the tutorialapp folder and run:
docker build image -t golemfactory/tutorialapp:1.0.0
Run the basic_integration test
The easiest way to test your app is to run the basic_integration test provided in the Golem source files. In this example we will run the test on the tutorialapp.
The required input files are available in the examples folder on GitHub. Put both source folders next to each other to copy/paste the commands below.
Preparation
Navigate to the Golem source folder and enable your virtual env (optional); make sure Golem is not running.
For Windows users: please load your docker-machine env golem by copying the last line of the output without the first characters.
pip install -r requirements.txt
Testing the app
Now you are ready to test the app! Run the following command:
python scripts/task_api_tests/basic_integration.py task-from-app-def ../tutorialapp/examples/app_descriptor.json ../tutorialapp/examples/app_parameters.json --resources ../tutorialapp/examples/input.txt --max-subtasks=1
Run the app on a local golem setup
When the basic_integration test passes, your app is ready to run on two Golem nodes:
Add the app descriptor JSON file to your requestor's data-dir
An app descriptor file is generated by the app developer and loaded into your Golem node. For this tutorial you can use app_descriptor.json from the tutorialapp/examples folder.
The app descriptor file is needed by requesting nodes and should be placed under the <golem_data_dir>/apps directory.
golem_data_dir can be found at the following locations:
macOS
~/Library/Application Support/golem/default/<mainnet|rinkeby>
Linux
~/.local/share/golem/default/<mainnet|rinkeby>
Windows
%LOCALAPPDATA%\golem\golem\default\<mainnet|rinkeby>
The first time you start Golem the app is enabled by default; on later starts you need to update the database to enable it.
To enable the app, wait for golemapp to start and then run:
golemcli debug rpc apps.update 801964d531675a235c9ddcfda00075cb 1
Build the docker image on all nodes
Make sure you have built the docker image on all participating nodes.
Create a task.json file to request a task
Create a task.json file with the following contents, making sure to update the paths to match your setup:
{
    "golem": {
        "app_id": "801964d531675a235c9ddcfda00075cb",
        "name": "test task",
        "resources": ["/absolute/path/to/tutorialapp/examples/input.txt"],
        "max_price_per_hour": "1000000000000000000",
        "max_subtasks": 4,
        "task_timeout": 180,
        "subtask_timeout": 60,
        "output_directory": "/absolute/path/to/output/directory"
    },
    "app": {
        "difficulty": "1",
        "resources": [
            "input.txt"
        ]
    }
}
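Before handing the file to golemcli, a quick sanity check can save a round trip. The helper below is not part of Golem; it is a small sketch of ours that just verifies the golem section carries the fields described in this guide:

```python
import json

# fields the golem section is expected to carry, per this guide
REQUIRED_GOLEM_KEYS = {
    'app_id', 'name', 'resources', 'max_price_per_hour',
    'max_subtasks', 'task_timeout', 'subtask_timeout',
    'output_directory',
}

def check_task_definition(raw: str) -> dict:
    # parse the JSON and report any missing golem-level fields
    task = json.loads(raw)
    missing = REQUIRED_GOLEM_KEYS - task.get('golem', {}).keys()
    if missing:
        raise ValueError(f'missing golem keys: {sorted(missing)}')
    if 'app' not in task:
        raise ValueError('missing app section')
    return task
```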
Enter a private network (optional)
You can enter a private network by starting all Golem nodes with the protocol_id option; this is recommended so that other nodes don't pick up your task:
golemapp --protocol_id=1337
Start golem with the task-api-dev flag
To make sure your new app is whitelisted and enabled, run both the provider and the requestor with the task-api-dev flag:
golemapp --task-api-dev
Create a task using golemcli
Now you can start the task using golemcli:
golemcli tasks create ./task.json
Publishing the application
Publish for requestor
In order to make your app available to others, you will need to:
Upload the app image to Docker Hub
This short tutorial will guide you through the process.
Create an app descriptor file and make it publicly available. The file has the following format:
{
    "name": "repository/image_name",
    "description": "My app",
    "version": "1.0.0",
    "author": "Me <me@company.org>, Others <others@company.org>",
    "license": "GPLv3",
    "requestor_env": "docker_cpu",
    "requestor_prereq": {
        "image": "repository/image_name",
        "tag": "1.0.0"
    },
    "market_strategy": "brass",
    "max_benchmark_score": 10000.0
}
The app descriptor file is needed by requesting nodes and should be placed under the <golem_data_dir>/apps directory.
golem_data_dir can be found at the following locations:
macOS
~/Library/Application Support/golem/default/<mainnet|rinkeby>
Linux
~/.local/share/golem/default/<mainnet|rinkeby>
Windows
%LOCALAPPDATA%\golem\golem\default\<mainnet|rinkeby>
Enable the application
Before you can run any new application in Golem, you need to enable it on the requestor node.
To list currently installed apps:
- golemcli debug rpc apps.list
To enable an app run:
- golemcli debug rpc apps.update <app_id> <enabled_flag>
Whitelist your image repository within Golem.
As a requestor, you also have to whitelist the "requestor prerequisite" Docker Hub repository.
In order to manage the repository whitelist, use the following CLI commands:
golemcli debug rpc env.docker.repos.whitelist
To display all whitelisted repositories.
golemcli debug rpc env.docker.repos.whitelist.add <repository_name>
Whitelist <repository_name>.
golemcli debug rpc env.docker.repos.whitelist.remove <repository_name>
Remove <repository_name> from your whitelist.
golemcli debug rpc env.docker.images.discovered
List all app images seen by your node on the network.
Publish for provider
By default, only apps verified and uploaded by Golem Factory to our docker repository are whitelisted. Before whitelisting any other repository, make sure you trust the developer: any apps they upload in the future will be computed on your machine, even if they lose control over their docker repository account.
In order to provide computing power for other developers' apps, you will need to whitelist the "provider prerequisite" Docker Hub repository.
In order to manage the repository whitelist, use the following CLI commands:
golemcli debug rpc env.docker.repos.whitelist
To display all whitelisted repositories.
golemcli debug rpc env.docker.repos.whitelist.add <repository_name>
Whitelist <repository_name>.
golemcli debug rpc env.docker.repos.whitelist.remove <repository_name>
Remove <repository_name> from your whitelist.
golemcli debug rpc env.docker.images.discovered
List all app images seen by your node on the network.
Running a task on Task-Api
Task-API has been enabled on testnet since the 0.22.0 release. There is no GUI support yet for this first release, so this guide uses the CLI only. A task created with the CLI cannot be managed from the GUI.
Short summary:
- Create a new task-api JSON file
- Run the file using the CLI
Prepare a JSON
Here is an example of a new task-api JSON file.
{
"golem": {
"app_id": "6b39331ac73484ff596447cefdfba9e5",
"name": "",
"resources": ["/absolute/path/to/resources/file.blend"],
"max_price_per_hour": "1_000_000_000_000_000_000",
"max_subtasks": 1,
"task_timeout": 600,
"subtask_timeout": 590,
"output_directory": "/absolute/path/to/output/"
},
"app": {
"resources": ["file.blend"],
"resolution": [320, 240],
"frames": "1",
"format": "PNG",
"compositing": "False"
}
}
golem
...
"golem": {
"app_id": "6b39331ac73484ff596447cefdfba9e5",
"name": "",
"resources": ["/absolute/path/to/resources/file.blend"],
"max_price_per_hour": "1_000_000_000_000_000_000",
"max_subtasks": 1,
"task_timeout": 600,
"subtask_timeout": 590,
"output_directory": "/absolute/path/to/output/"
},
...
The golem block of the JSON contains the input Golem itself needs; these fields are the same for all apps.
golem.app_id
The app id is the unique identifier of the app, including its version.
You can get the built-in app_ids from the logs when starting Golem.
6b39331ac73484ff596447cefdfba9e5 is golemfactoryapps/blenderapp:0.7.3, the only available option at the time of writing.
golem.name
Name of the task in the GUI, not related to task-api. Allowed to be empty.
golem.resources
List of absolute paths to the files required for running this task
golem.max_price_per_hour
The maximum price to pay for the computation per hour, always passed as a string (in ""). The Golem token has 18 decimal places, so for 1 GNT add 18 zeros.
golem.max_subtasks
The number of subtasks to split the task into.
golem.task_timeout
Task timeout in seconds, 600 is 10 minutes.
golem.subtask_timeout
Subtask timeout in seconds.
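Since max_price_per_hour is expressed in the token's smallest unit, a tiny conversion helper (our own sketch, not part of golemcli) makes the 18-decimal arithmetic explicit:

```python
def gnt_to_price_string(gnt: int) -> str:
    # GNT has 18 decimal places; the JSON expects the integer amount
    # of the smallest unit, passed as a string
    return str(gnt * 10 ** 18)

price = gnt_to_price_string(1)
```

For 1 GNT this produces the "1000000000000000000" value used in the example above.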
app
...
"app": {
"resources": ["file.blend"],
"resolution": [320, 240],
"frames": "1",
"format": "PNG",
"compositing": "False"
}
...
The app block contains app-specific input parameters; these differ per app.
app.resources
A list of the resources, given as relative paths (currently only one level deep).
app.resolution
Resolution of the blender render in pixels
app.frames
Frames to select during the blender render
app.format
Output format for the blender render ( PNG, JPEG or EXR )
app.compositing
Use compositing for the blender render?
Run a task-api task
To run a task-api task, you use the same command as for the older tasks.
golemcli tasks create ./task_api_json_file.json
Then you can use golemcli tasks show to list the task.
We also implemented golemcli tasks abort, golemcli tasks delete and golemcli tasks restart_subtask.
Other commands are not supported yet; they will be added soon.
To help debug the task-api computation, extra logs are stored in your logs/app_name/ folder.
Please provide these generated logs next to the regular logs when creating issues.