1a666df42a59189ad76c8625caa5753c14565acc
[gc-dialer] / hand_tests / threading.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4 from __future__ import division
5
6 import time
7
8 import sys
9 sys.path.insert(0,"./src")
10 from util import qt_compat
11 from util import qore_utils
12
13
class QThread44(qt_compat.QtCore.QThread):
    """
    QThread subclass imitating the Qt 4.4+ QThread when running on an
    older version.

    See http://labs.trolltech.com/blogs/2010/06/17/youre-doing-it-wrong
    (On Lucid I have Qt 4.7 and this is still an issue)
    """

    def __init__(self, parent=None):
        """Create the thread, forwarding the optional *parent* to QThread."""
        super(QThread44, self).__init__(parent)

    def run(self):
        """Thread body: hand control straight to ``exec_()``."""
        self.exec_()
26
27
class Producer(qt_compat.QtCore.QObject):
    """
    Worker object that emits a short run of integers and then ``done``.

    Signals:
        data(int): emitted once for each value 0 through 9.
        done(): emitted once after the last value has been sent.
    """

    data = qt_compat.Signal(int)
    done = qt_compat.Signal()

    def __init__(self):
        qt_compat.QtCore.QObject.__init__(self)

    @qt_compat.Slot()
    def process(self):
        """Emit ``data`` for 0..9, sleeping 0.1 s after each, then emit ``done``."""
        # Single-argument print(...) and range() behave the same on
        # Python 2 and Python 3; the print statement and xrange do not
        # exist on Python 3.
        print("Starting producer")
        for i in range(10):
            self.data.emit(i)
            # Pause between values so the output arrives incrementally.
            time.sleep(0.1)
        self.done.emit()
43
44
class Consumer(qt_compat.QtCore.QObject):
    """
    Receiver object whose slots print whatever they are notified about.
    """

    def __init__(self):
        qt_compat.QtCore.QObject.__init__(self)

    # The single-argument print(...) form below produces the same output on
    # Python 2 as the print statement did, and also runs on Python 3.

    @qt_compat.Slot()
    def process(self):
        """Announce that the consumer has started."""
        print("Starting consumer")

    @qt_compat.Slot()
    def print_done(self):
        """Print a completion marker."""
        print("Done")

    @qt_compat.Slot(int)
    def print_data(self, i):
        """Print the integer *i* passed to this slot."""
        print(i)
61
62
if __name__ == "__main__":
    # Hand test: run a Producer and a Consumer on separate threads, wire the
    # producer's signals to the consumer's slots, and exit the application
    # once both threads have reported that they finished.
    app = qt_compat.QtCore.QCoreApplication([])

    # NOTE(review): the threads are qore_utils.QThread44 instances, not the
    # QThread44 class defined in this file -- confirm which one is intended.
    producerThread = qore_utils.QThread44()
    producer = Producer()
    producer.moveToThread(producerThread)
    # Kick off the producer's work as soon as its thread starts.
    producerThread.started.connect(producer.process)

    consumerThread = qore_utils.QThread44()
    consumer = Consumer()
    consumer.moveToThread(consumerThread)
    consumerThread.started.connect(consumer.process)

    # Cross-object wiring: producer output feeds the consumer's printers.
    producer.data.connect(consumer.print_data)
    producer.done.connect(consumer.print_done)

    @qt_compat.Slot()
    def producer_done():
        """Ask both worker threads to quit once the producer is done."""
        print("Shutting down")
        producerThread.quit()
        consumerThread.quit()
        print("Done")
    producer.done.connect(producer_done)

    # One-element list so the nested function can mutate the counter
    # (Python 2 has no ``nonlocal``).
    count = [0]

    @qt_compat.Slot()
    def thread_done():
        """Count finished threads; exit the application after the second."""
        print("Thread done")
        count[0] += 1
        if count[0] == 2:
            print("Quitting")
            app.exit(0)
        print("Done")
    producerThread.finished.connect(thread_done)
    consumerThread.finished.connect(thread_done)

    producerThread.start()
    consumerThread.start()

    # Run the event loop first and print afterwards: the Python 2 statement
    # ``print "Status", app.exec_()`` writes "Status" before exec_() is even
    # called, so the label ended up mixed into the threads' output.
    status = app.exec_()

    # Qt documents finished() as emitted right before the thread finishes
    # executing, so block until both threads have really stopped before the
    # QThread objects can be torn down at interpreter exit.
    producerThread.wait()
    consumerThread.wait()

    print("Status %s" % (status,))