5581f94aa890983015080e224cc7130caf4b758e
[gc-dialer] / hand_tests / threading.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4 from __future__ import division
5
6 import time
7
8 import sys
9 sys.path.insert(0,"./src")
10 from util import qt_compat
11
12
13 class QThread44(qt_compat.QtCore.QThread):
14         """
15         This is to imitate QThread in Qt 4.4+ for when running on older version
16         See http://labs.trolltech.com/blogs/2010/06/17/youre-doing-it-wrong
17         (On Lucid I have Qt 4.7 and this is still an issue)
18         """
19
20         def __init__(self, parent = None):
21                 qt_compat.QtCore.QThread.__init__(self, parent)
22
23         def run(self):
24                 self.exec_()
25
26
27 class Producer(qt_compat.QtCore.QObject):
28
29         data = qt_compat.Signal(int)
30         done = qt_compat.Signal()
31
32         def __init__(self):
33                 qt_compat.QtCore.QObject.__init__(self)
34
35         @qt_compat.Slot()
36         def process(self):
37                 print "Starting producer"
38                 for i in xrange(10):
39                         self.data.emit(i)
40                         time.sleep(0.1)
41                 self.done.emit()
42
43
44 class Consumer(qt_compat.QtCore.QObject):
45
46         def __init__(self):
47                 qt_compat.QtCore.QObject.__init__(self)
48
49         @qt_compat.Slot()
50         def process(self):
51                 print "Starting consumer"
52
53         @qt_compat.Slot()
54         def print_done(self):
55                 print "Done"
56
57         @qt_compat.Slot(int)
58         def print_data(self, i):
59                 print i
60
61
62 if __name__ == "__main__":
63         app = qt_compat.QtCore.QCoreApplication([])
64
65         producerThread = QThread44()
66         producer = Producer()
67         producer.moveToThread(producerThread)
68         producerThread.started.connect(producer.process)
69
70         consumerThread = QThread44()
71         consumer = Consumer()
72         consumer.moveToThread(consumerThread)
73         consumerThread.started.connect(consumer.process)
74
75         producer.data.connect(consumer.print_data)
76         producer.done.connect(consumer.print_done)
77
78         @qt_compat.Slot()
79         def producer_done():
80                 print "Shutting down"
81                 producerThread.quit()
82                 consumerThread.quit()
83                 print "Done"
84         producer.done.connect(producer_done)
85
86         count = [0]
87
88         @qt_compat.Slot()
89         def thread_done():
90                 print "Thread done"
91                 count[0] += 1
92                 if count[0] == 2:
93                         print "Quitting"
94                         app.exit(0)
95                 print "Done"
96         producerThread.finished.connect(thread_done)
97         consumerThread.finished.connect(thread_done)
98
99         producerThread.start()
100         consumerThread.start()
101         print "Status", app.exec_()