Simple offline task queues.
Project description
Simple offline task queues. For Python.
“See you later, alligator.”
Requirements
Python 2.6+ or Python 3.3+
(Optional) redis for the Redis backend
FUTURE (Optional) beanstalkc for the Beanstalk backend
WHY?!!1!
Because I have NIH-syndrome.
Or because I longed for something simple (~375 loc).
Or because I wanted something with tests (90%+ coverage) & docs.
Or because I wanted pluggable backends.
Or because testing some other queuing system was a pain.
Or because I’m an idiot.
Basic Usage
This example uses Django, but there’s nothing Django-specific about Alligator. You can use it with any Python code.
from alligator import Gator
from django.contrib.auth.models import User
from django.shortcuts import send_email
# Make a Gator instance.
# Under most circumstances, you would configure this in one place &
# import that instance instead.
gator = Gator('redis://localhost:6379/0')
# The task itself.
# Nothing special, just a plain *undecorated* function.
def follow_email(followee_username, follower_username):
followee = User.objects.get(username=followee_username)
follower = User.objects.get(username=follower_username)
subject = 'You got followed!'
message = 'Hey {}, you just got followed by {}! Whoohoo!'.format(
followee.username,
follower.username
)
send_email(subject, message, 'server@example.com', [followee.email])
# An simple, previously expensive view.
@login_required
def follow(request, username):
# You'd import the task function above.
if request.method == 'POST':
# Schedule the task.
# Use args & kwargs as normal.
gator.task(follow_email, request.user.username, username)
return redirect('...')
Running Tasks
Rather than trying to do autodiscovery, fanout, etc., you control how your workers are configured & what they consumer. Create a new executable file (bin script, management command, whatever) & drop in the following code.
from alligator import Gator, Worker
# Bonus points if you import that one pre-configured ``Gator`` instead.
gator = Gator('redis://localhost:6379/0')
# Consume & handle all tasks.
worker = Worker(gator)
worker.run_forever()
License
New BSD
Complex Usage (For Future Docs)
# We're re-using the above imports/setup.
def log_func(job):
# A simple example of logging a failed task.
if job.result != SUCCESS:
logging.error("Job {} failed.".format(job.id))
# A context manager for supplying options
with gator.options(retries=3, async=settings.ASYNC_TASKS, on_error=log_func) as task:
feeds_job = task(sketchy_fetch_feeds, timeout=30)
# Future wishlist items...
# Dependent tasks, will only run if the listed tasks succeed.
with gator.options(depends_on=[feeds_job]) as task:
task(rebuild_cache)
# Delayed tasks (run in an hour).
with gator.options(run_after=60 * 60) as task:
task(this_can_wait)
Running Tests
$ virtualenv env2 $ . env2/bin/activate $ pip install -r requirements.txt $ python setup.py develop $ py.test -s -v --cov=alligator --cov-report=html tests
TODO
Scheduled tasks
Dependent tasks
Cancellable tasks
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for alligator-0.5.1-py2.py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 47aaca57159f8f640cbde2e490da679f64b25abc711a350147ba74e8f7cf4a57 |
|
MD5 | 2a3e3ee314c8f0780ac6ca1165d7d8d8 |
|
BLAKE2b-256 | 72f52668e211dfc930a0e3bae2155c75d9958dbab6e5c5112d648215014d98d9 |