
SimpleTasks

Why do you want this?

You need to parallelize simple processes in the cloud using Julia.

How does it work?

You provide a QueueService (an AWS SQS implementation is provided) and a BucketService (AWS S3 and GCS implementations are provided). Tasks are scheduled into your QueueService; data is pulled from your BucketService for processing, and results are pushed back to your BucketService.

See Advanced for additional customization options.

Sounds great! Set me up! (Installation)

See the REQUIRE file for required packages. You may need to run sudo apt-get install build-essential cmake to build certain required projects.

Julia's package manager should handle the rest: Pkg.add("SimpleTasks").

To use the BucketService with AWS, you must have the latest AWS CLI version:

# Check current version -- too old :(
$ aws --version
aws-cli/1.2.9 Python/3.4.3 Linux/3.13.0-74-generic

# Update aws cli
$ sudo pip install awscli
...

# Check new version -- Great! :D
$ aws --version
aws-cli/1.10.56 Python/2.7.6 Linux/3.13.0-74-generic botocore/1.4.46

Let's do this!

Tutorial

  1. Create one or many DaemonTasks. See nooptask.jl for inspiration.

Relevant parts are:

  • Custom Task Type

    type NoOpTaskDetails <: DaemonTaskDetails
      basic_info::BasicTask.Info
      payload_info::AbstractString
    end
    

    This type must be JSON serializable. basic_info is described below. payload_info does not need to be an AbstractString; see Custom Payload for more details.

  • BasicTask.Info already implemented as:

    type Info
      id::Int64                           # Numerical identifier for specific task
      name::AbstractString                # String identifier for the type of task
      base_directory::AbstractString      # Base directory for fetching input data
      inputs::Array{AbstractString, 1}    # Names of files we are fetching
    end
    
  • A const unique identifier saved in the task that lets SimpleTasks know how to run it.

    const NAME = "NO_OP"
    
  • DaemonTask.prepare - function for data preparation / downloading data

  • DaemonTask.execute - function for task execution

  • DaemonTask.finalize - function for cleanup / uploading data (a sketch of all three hooks follows below)
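A rough sketch of how these three hooks fit together is shown below. The argument list and types here are assumptions (a generic datasource argument is used); the authoritative signatures live in the package source, and nooptask.jl contains a complete implementation.

  # Sketch only -- signatures are assumed, not copied from the package source.
  function DaemonTask.prepare(task::NoOpTaskDetails, datasource)
      # Download task.basic_info.inputs (relative to basic_info.base_directory)
      # from the datasource into the local cache before execution.
  end

  function DaemonTask.execute(task::NoOpTaskDetails, datasource)
      # Do the real work here; the no-op task simply reports success.
  end

  function DaemonTask.finalize(task::NoOpTaskDetails, datasource, result)
      # Upload any generated outputs back to the bucket and clean up the cache.
  end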

  2. Create a Run File to register your tasks. See runawsdaemon.jl for inspiration.

Relevant parts are:

  # Load AWS credentials via AWS library (either through environment
  # variables or ~/.awssecret or query permissions server)
  env = AWS.AWSEnv()

  # Create a queue to read tasks from
  task_queue = AWSQueueService(env, task_queue_name)

  # Create a queue to write errors to
  error_queue = AWSQueueService(env, error_queue_name)

  # Create a datasource to read and write data from
  bucket = CLIBucketService(AWSCLIProvider.Details(env), bucket_name)
  cache = FileSystemCacheService(cache_directory)
  datasource = BucketCacheDatasourceService(bucket, cache)

  # Create a daemon to run tasks
  daemon = DaemonService(task_queue, error_queue, bucket, datasource,
      poll_frequency_seconds)

  # Register the NOOP task
  register!(daemon, NoOpTask.NAME, NoOpTaskDetails)

  # Start the daemon
  Daemon.run(daemon)

  3. Schedule events. See awsscheduler.jl for inspiration; a fuller scheduling sketch follows this list.

Relevant parts are:

   task = NoOpTaskDetails(basic_info, "NoOp Task for $task_indices")
   ...
   map((task) -> Queue.push_message(queue; message_body = JSON.json(task)), tasks)

  4. Run julia /home/ubuntu/.julia/v0.4/SimpleTasks/src/examples/runhybriddaemon.jl TASK_QUEUE_NAME ERROR_QUEUE_NAME BUCKET_NAME CACHE_DIRECTORY POLL_FREQUENCY_SECONDS
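For completeness, here is a sketch of assembling and pushing a single task end to end, using only names that appear earlier in this tutorial. The id, paths, and queue name are placeholder values, and the exact import lines may differ; compare awsscheduler.jl for the real scheduler.

  using SimpleTasks, JSON, AWS   # exact imports may differ; see awsscheduler.jl

  env = AWS.AWSEnv()
  queue = AWSQueueService(env, "TASK_QUEUE_NAME")

  # Build the task exactly as the daemon expects to deserialize it
  basic_info = BasicTask.Info(4, NoOpTask.NAME, "datasets/noop_dataset",
      ["0_input/4.dat", "0_input/5.dat"])
  task = NoOpTaskDetails(basic_info, "NoOp Task for 4:5")

  Queue.push_message(queue; message_body = JSON.json(task))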

Generated Results

  • Example of generated task

    {
    "basic_info": {
      "id": 4,
      "name": "NO_OP",
      "base_directory": "datasets\/noop_dataset",
      "inputs": [
        "0_input\/4.dat",
        "0_input\/5.dat"
      ]
    },
    "payload_info": "NoOp Task for 4:5"
    }
    
  • Example of generated data:

    • Input
      • AWS S3: s3://BUCKET_NAME/datasets/noop_dataset/0_input/4.dat, s3://BUCKET_NAME/datasets/noop_dataset/0_input/5.dat
      • Local: /var/tmp/taskdaemon/datasets/noop_dataset/0_input/4.dat, /var/tmp/taskdaemon/datasets/noop_dataset/0_input/5.dat
    • Output
      • AWS S3: s3://BUCKET_NAME/datasets/noop_dataset/0_output/4.dat, s3://BUCKET_NAME/datasets/noop_dataset/0_output/5.dat
      • Local: /var/tmp/taskdaemon/datasets/noop_dataset/0_output/4.dat, /var/tmp/taskdaemon/datasets/noop_dataset/0_output/5.dat

Advanced

Custom Payload

Use case: You have many inputs to your task that are not captured by a simple AbstractString.

  1. Create a new type for your task-specific inputs. Wrapping it in a module (like BasicTask above) lets it be referenced as ComplexPayload.Info in the following steps.

    module ComplexPayload

    type Info
      complexID::Int64
      thresholds::Array{Int64, 1}
    end

    end # module ComplexPayload
    
  2. Create a custom constructor for your custom payload type that accepts a JSON-parsed dictionary

    function ComplexPayload.Info{String <: AbstractString}(dict::Dict{String, Any})
      # do your parsing from the JSON parsed dictionary here
      ...
      return ComplexPayload.Info(...)
    end
    
  3. Set your task's payload_info to use that type

    type ComplexPayloadTask <: DaemonTaskDetails
      basic_info::BasicTask.Info
      payload_info::ComplexPayload.Info
    end
    
  4. Create your task constructor to automatically call the payload dictionary constructor

    ComplexPayloadTask{String <: AbstractString}(basic_info::BasicTask.Info,
        dict::Dict{String, Any}) = ComplexPayloadTask(basic_info, ComplexPayload.Info(dict))
    
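Once defined, the custom task is registered with the daemon just like the no-op task in the tutorial. The NAME constant below is an assumption that mirrors the NoOpTask.NAME convention:

  # Unique string identifier for this task type (assumed value)
  const NAME = "COMPLEX_PAYLOAD"

  register!(daemon, NAME, ComplexPayloadTask)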

What if I don't use AWS or GCS?

Extend queue.jl and/or bucket.jl and plug your implementations into the daemon, as sketched below.
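For instance, a skeleton for a custom queue backend might look like the following. QueueService and Queue.push_message appear in this README, but the backend type, its fields, and the rest of the interface are assumptions; take the full set of required methods from queue.jl itself.

  type MyQueueService <: QueueService      # hypothetical backend type
    queue_name::AbstractString
  end

  # push_message is the only queue call shown in this README; queue.jl
  # declares whatever else (e.g. a receive/pop counterpart) must be implemented.
  function Queue.push_message(queue::MyQueueService; message_body::AbstractString = "")
    # hand message_body to your queueing system here
  end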

Datasource

Note the bucket layout that corresponds to the BasicTask.Info fields:

s3://BUCKET_NAME/datasets/dataset_name/task_folder/input.h5
# info is of type BasicTask.Info
info.base_directory = "datasets/dataset_name"
info.inputs[1] = "task_folder/input.h5"
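In other words, both the remote object key and the local cache path come from joining base_directory with an input name. The joining below is only illustrative; the real logic lives in the datasource and cache implementations.

  cache_directory = "/var/tmp/taskdaemon"   # matches the cache example above

  info = BasicTask.Info(1, "NO_OP", "datasets/dataset_name", ["task_folder/input.h5"])

  remote_key = "$(info.base_directory)/$(info.inputs[1])"
  # appended to the bucket -> s3://BUCKET_NAME/datasets/dataset_name/task_folder/input.h5

  local_path = joinpath(cache_directory, info.base_directory, info.inputs[1])
  # -> /var/tmp/taskdaemon/datasets/dataset_name/task_folder/input.h5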

Warning

The provided FileSystemCache uses the local filesystem. No safety guarantees are made (for now).

Cloud

Google Cloud startup script:

#! /bin/bash
export AWS_ACCESS_KEY_ID=$(curl http://metadata.google.internal/computeMetadata/v1/project/attributes/aws-sqs-access-id -H "Metadata-Flavor: Google")
export AWS_SECRET_ACCESS_KEY=$(curl http://metadata.google.internal/computeMetadata/v1/project/attributes/aws-sqs-secret-access-key -H "Metadata-Flavor: Google")
export TASK_QUEUE=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/task-queue -H "Metadata-Flavor: Google")
export ERROR_QUEUE=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/error-queue -H "Metadata-Flavor: Google")
export BUCKET_NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/bucket-name -H "Metadata-Flavor: Google")
export CACHE_DIRECTORY=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/cache-directory -H "Metadata-Flavor: Google")
export POLL_FREQUENCY_SECONDS=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/poll-frequency-seconds -H "Metadata-Flavor: Google")
sudo -u ubuntu -H sh -c "stdbuf -oL -eL julia /home/ubuntu/.julia/v0.4/SimpleTasks/src/examples/runhybriddaemon.jl $TASK_QUEUE $ERROR_QUEUE $BUCKET_NAME $CACHE_DIRECTORY $POLL_FREQUENCY_SECONDS | tee -a /home/ubuntu/daemon.out &"

Troubleshooting

My downloaded files are showing up as download: s3://xxxx to ./-

Your AWS CLI is TOO OLD! Update it!

Feature Wishlist

  • Dependency scheduling? (retasking)
