RQ: a very simple task queue

There are a lot of tasks in computing that are not suited to synchronous processing, such as media transcoding.

For these tasks, the work is submitted to a centrally controlled service and then dispatched to many workers.

A message queue is commonly used in this case for communication between the controller and the workers, and is sometimes even used to implement the task queue itself.
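
To make the controller/worker split concrete, here is a minimal in-process sketch, using Python's standard-library queue.Queue and a thread in place of a real message broker (all names here are illustrative, not part of RQ):

```python
import queue
import threading

# Tasks flow from a producer to a worker through a FIFO queue,
# mirroring the controller/worker split described above.
task_queue = queue.Queue()
results = {}

def worker():
    while True:
        item = task_queue.get()
        if item is None:          # sentinel: no more tasks
            break
        func, arg = item
        results[arg] = func(arg)  # do the actual work
        task_queue.task_done()

t = threading.Thread(target=worker)
t.start()

for n in range(5):
    task_queue.put((lambda x: x * x, n))  # producer enqueues tasks
task_queue.put(None)                      # tell the worker to stop
t.join()

print(results)  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
```

A real deployment replaces queue.Queue with a broker (Redis, in RQ's case) so producer and workers can live in different processes or machines.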

RQ is a very simple task queue built on Redis, and it is easy to use: you don't even need to write any worker-specific code.

On the producer side, RQ enqueues the function and its arguments into a Redis queue, serialized with Python's pickle library. The worker retrieves the job from the Redis queue, deserializes it, and forks a process to do the actual work.
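
A rough sketch of that mechanism, with a plain Python list standing in for the Redis list (real RQ uses a Redis server, and its actual payload format differs from this simplified one):

```python
import importlib
import math
import pickle

# Stand-in for the Redis list; RQ would LPUSH/BRPOP against a Redis server.
fake_redis_list = []

def enqueue(func, *args):
    # Producer side: store the function's import path plus its arguments, pickled.
    payload = pickle.dumps((func.__module__, func.__qualname__, args))
    fake_redis_list.append(payload)

def work_one():
    # Worker side: pop a job, unpickle it, re-import the function, and call it.
    module_name, func_name, args = pickle.loads(fake_redis_list.pop(0))
    func = getattr(importlib.import_module(module_name), func_name)
    return func(*args)

enqueue(math.factorial, 5)
print(work_one())  # 120
```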

Here is a simple example to show the simplicity of the logic.

You have a function that does the real work, such as:

# fib.py
def slow_fib(n):
    if n <= 1:
        return 1
    else:
        return slow_fib(n-1) + slow_fib(n-2)

Then you create jobs and enqueue them:

# run_example.py
import os
import time

from rq import Connection, Queue

from fib import slow_fib


def main():
    # Range of Fibonacci numbers to compute
    fib_range = range(20, 34)

    # Kick off the tasks asynchronously
    async_results = {}
    q = Queue()
    for x in fib_range:
        async_results[x] = q.enqueue(slow_fib, x)

    start_time = time.time()
    done = False
    while not done:
        os.system('clear')
        print('Asynchronously: (now = %.2f)' % (time.time() - start_time,))
        done = True
        for x in fib_range:
            result = async_results[x].return_value
            if result is None:
                done = False
                result = '(calculating)'
            print('fib(%d) = %s' % (x, result))
        print('')
        print('To start the actual work in the background, run a worker:')
        print('    python examples/run_worker.py')
        time.sleep(0.2)

    print('Done')


if __name__ == '__main__':
    # Tell RQ what Redis connection to use
    with Connection():
        main()

On the producer side, you run it like this:

python3 run_example.py
Asynchronously: (now = 8.04)
fib(20) = 10946
fib(21) = 17711
fib(22) = 28657
fib(23) = 46368
fib(24) = 75025
fib(25) = 121393
fib(26) = 196418
fib(27) = 317811
fib(28) = 514229
fib(29) = 832040
fib(30) = 1346269
fib(31) = 2178309
fib(32) = 3524578
fib(33) = 5702887

To start the actual work in the background, run a worker:
    python examples/run_worker.py
Done

On the worker side, you only need this (but make sure the command is executed in the same directory as fib.py):

rqworker
15:36:32 Worker rq:worker:bd9fbdd72217489288bcf6c47e499f9c: started, version 1.11.0
15:36:32 Subscribing to channel rq:pubsub:bd9fbdd72217489288bcf6c47e499f9c
15:36:32 *** Listening on default...
15:36:32 Cleaning registries for queue: default
15:36:32 default: fib.slow_fib(20) (0c5dc1dd-b8b8-4a23-9220-1f4f03781c53)
15:36:32 default: Job OK (0c5dc1dd-b8b8-4a23-9220-1f4f03781c53)
15:36:32 Result is kept for 500 seconds

But there are two things we need to pay attention to: a) the actual function for the task needs to be in a separate file (fib.py in this case), and b) rqworker needs to be executed under the same source directory as the producer (run_example.py in this case).
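
Both constraints come from how the worker finds the function: the job carries only a dotted path like fib.slow_fib, which the worker re-imports at run time, so fib.py must be importable from the worker's working directory. A small sketch of that resolution step (the throwaway fib.py below is created just for illustration):

```python
import importlib
import os
import sys
import tempfile

# Write a throwaway fib.py so we can show the import mechanics.
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, 'fib.py'), 'w') as f:
    f.write('def slow_fib(n):\n'
            '    return 1 if n <= 1 else slow_fib(n - 1) + slow_fib(n - 2)\n')

# Running rqworker in that directory effectively puts it on sys.path.
sys.path.insert(0, workdir)

# Resolve the job's "module.function" string, as a worker would.
module_name, func_name = 'fib.slow_fib'.rsplit('.', 1)
func = getattr(importlib.import_module(module_name), func_name)
print(func(10))  # 89
```

If workdir were not on sys.path, importlib.import_module('fib') would raise ModuleNotFoundError, which is exactly the failure you see when rqworker is started in the wrong directory.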

#rq #redis #taskq