from chrysacase import ChrysalideTestCase
from pychrysalide.glibext import GenericWork, WorkQueue
from threading import Lock


class TestWorks(ChrysalideTestCase):
    """TestCase for glibext.*Work*"""

    def testBasicWorkQueueBehaviour(self):
        """Check the default basic behaviour of a work queue."""

        queue = WorkQueue()

        # Nothing has been scheduled on this fresh queue, so the emptiness
        # query is expected to succeed.
        # NOTE(review): 123 is presumably an arbitrary work group
        # identifier that was never defined -- confirm against the API.
        ret = queue.is_empty(123)
        self.assertTrue(ret)

    def testWorkScheduling(self):
        """Check scheduled works results."""

        class SchedulableWork(GenericWork):
            """Work incrementing a shared counter under a lock."""

            def __init__(self, lck, val):
                """Keep the shared lock and the shared counter holder.

                Args:
                    lck: lock guarding every access to the counter.
                    val: dictionary whose 'integer' entry gets incremented.
                """
                super(SchedulableWork, self).__init__()
                self._lck = lck
                self._val = val

            def _run(self):
                """Increment the shared counter by one."""
                # The context manager releases the lock even if the update
                # raises, so one failing work cannot block the others.
                with self._lck:
                    self._val['integer'] += 1

        lock = Lock()
        # A dictionary is used so that all works mutate the same object.
        value = {'integer': 0}

        queue = WorkQueue()

        # NOTE(review): 4 is presumably the number of works allowed to run
        # concurrently in the group -- confirm against the API.
        gid = queue.define_group(4)

        count = 31

        for _ in range(count):
            work = SchedulableWork(lock, value)
            queue.schedule(work, gid)

        # Poll until the queue reports the group as completed.
        while not queue.wait_for_completion(gid):
            pass

        # Each of the scheduled works must have incremented the counter once.
        self.assertEqual(value['integer'], count)