A while back I wrote some (now deprecated) code that allowed you to easily test your Python App Engine applications and their interaction with the App Engine APIs. When we got acquired by Google, I started working with some of the awesome App Engine engineers on making that code part of the official App Engine codebase.

We launched that, but one of the APIs that noticeably lacked helper methods was the Task Queue. I added one of the old methods (get_filtered_tasks()), but a bit later I noticed a pretty serious bug: timezones weren’t handled properly.

So that code hid quietly in the App Engine code base, undocumented and technically broken. Sorry :(

I wrote some extra code to fix it, but that didn’t get merged into the main repository for quite a while.

But now it’s here.

If you’re curious about the change, feel free to check out the diff (scroll to the bottom). The method is still called get_filtered_tasks() and takes several arguments that act as “filters” on various properties of tasks. It should now handle timezones properly when you set the eta or countdown properties while creating tasks.
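I haven't described the bug itself here, so treat the following as a generic illustration of how this class of timezone bug tends to show up, not as the actual App Engine code. It is a minimal, standard-library-only sketch that converts the same naive datetime to epoch seconds twice: once treating it as UTC, once treating it as local time. The function names are hypothetical.

```python
import calendar
import datetime
import time


def naive_utc_to_epoch(eta):
    """Treat a naive datetime as UTC and return seconds since the epoch."""
    return calendar.timegm(eta.timetuple())


def naive_local_to_epoch(eta):
    """Treat the same naive datetime as local time (the easy mistake)."""
    return int(time.mktime(eta.timetuple()))


# Hypothetical ETA for a task, written without any timezone information.
eta = datetime.datetime(2012, 1, 1, 0, 0, 0)

utc_seconds = naive_utc_to_epoch(eta)
local_seconds = naive_local_to_epoch(eta)

# The two values agree only when the machine's local timezone is UTC,
# so a comparison that mixes them can silently filter out the wrong tasks.
print(utc_seconds, local_seconds, utc_seconds - local_seconds)
```

On a machine whose local timezone is not UTC, the last number printed is the offset by which an ETA comparison would be wrong.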

Sorry for the delay. And enjoy.

PS: Since this isn’t documented on the official App Engine page, the API may change. I don’t intend to change it, but I can’t promise it won’t. Apologies in advance if you use this helper method and have it break out from under you.

Also, check out the official code.

Some example code:

import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):

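  def setUp(self):
    # Activate the testbed and swap in the in-memory Task Queue stub.
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    # Keep a handle on the stub; the attribute name must match its use in the tests.
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)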

  def tearDown(self):
    self.testbed.deactivate()

  def testTaskAdded(self):
    # Enqueue a single task on the default queue.
    taskqueue.add(url='/path/to/task')

    # Ask the stub for only the tasks whose URL matches.
    tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/path/to/task', tasks[0].url)

if __name__ == '__main__':
  unittest.main()