e27d560badf5ccac092847de9a43ce4a98ee06ef
[gc-dialer] / hand_tests / threading.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4 from __future__ import division
5
6 import time
7
8 import sys
9 sys.path.insert(0,"./src")
10 from util import qt_compat
11 from util import qore_utils
12
13
class QThread44(qt_compat.QtCore.QThread):
        """
        QThread whose run() simply starts the thread's own event loop, which
        is how a plain QThread behaves from Qt 4.4 onwards
        Background: http://labs.trolltech.com/blogs/2010/06/17/youre-doing-it-wrong
        (Still needed on Lucid, which ships Qt 4.7)
        """

        def __init__(self, parent=None):
                super(QThread44, self).__init__(parent)

        def run(self):
                # Nothing to do here except spin the event loop until quit()/exit()
                self.exec_()
26
27
class Producer(qt_compat.QtCore.QObject):
        """
        Worker object that emits the integers 0 through 9 on ``data``, sleeping
        0.1 seconds after each one, and then emits ``done``
        """

        # Carries each produced integer
        data = qt_compat.Signal(int)
        # Emitted once, after the last integer has been sent
        done = qt_compat.Signal()

        def __init__(self):
                super(Producer, self).__init__()

        @qt_compat.Slot()
        def process(self):
                """
                Emit the whole sequence and then announce completion
                """
                print("Starting producer")
                for value in range(10):
                        self.data.emit(value)
                        time.sleep(0.1)
                self.done.emit()
43
44
class Consumer(qt_compat.QtCore.QObject):
        """
        Receiving side of the hand test; every slot just prints what it was
        given so the signal traffic can be followed on stdout
        """

        def __init__(self):
                super(Consumer, self).__init__()

        @qt_compat.Slot()
        def process(self):
                """
                Announce that the consumer has been started
                """
                print("Starting consumer")

        @qt_compat.Slot()
        def print_done(self):
                """
                Report that the producer signalled completion
                """
                print("Done")

        @qt_compat.Slot(int)
        def print_data(self, i):
                """
                Print one received integer
                """
                print(i)
61
62
def run_producer_consumer():
        """
        Hand test: move a Producer and a Consumer onto two separate
        qore_utils.QThread44 threads, wire the producer's signals to the
        consumer's slots, and run the application event loop until both
        threads have reported that they finished

        Progress and the event loop's exit status are printed to stdout
        """
        app = qt_compat.QtCore.QCoreApplication([])

        # Each worker starts its process() slot when its thread starts
        producerThread = qore_utils.QThread44()
        producer = Producer()
        producer.moveToThread(producerThread)
        producerThread.started.connect(producer.process)

        consumerThread = qore_utils.QThread44()
        consumer = Consumer()
        consumer.moveToThread(consumerThread)
        consumerThread.started.connect(consumer.process)

        producer.data.connect(consumer.print_data)
        producer.done.connect(consumer.print_done)

        @qt_compat.Slot()
        def producer_done():
                # Once the producer is out of data, ask both event loops to exit
                print("Shutting down")
                producerThread.quit()
                consumerThread.quit()
                print("Done")
        producer.done.connect(producer_done)

        # Single-element list so the nested function can update the counter
        # (Python 2 has no "nonlocal")
        count = [0]

        @qt_compat.Slot()
        def thread_done():
                print("Thread done")
                count[0] += 1
                if count[0] == 2:
                        # Both threads have reported in; leave the main event loop
                        print("Quitting")
                        app.exit(0)
                print("Done")
        producerThread.finished.connect(thread_done)
        consumerThread.finished.connect(thread_done)

        producerThread.start()
        consumerThread.start()
        status = app.exec_()

        # Qt emits finished() just before a thread stops executing, so the
        # threads may still be winding down here.  Block until they have really
        # stopped so the thread objects are not destroyed while still running.
        # NOTE(review): assumes qore_utils.QThread44 is a QThread subclass like
        # the QThread44 defined in this file, and so provides wait()
        producerThread.wait()
        consumerThread.wait()

        print("Status %s" % status)
103
104
def run_task():
        """
        Hand test for qore_utils.FutureThread: a QTimer started with an
        interval of 10 queues one task per timeout, and on the fifth timeout
        the timer, the FutureThread and the application are all stopped

        The tick count, the callback output and the event loop's exit status
        are printed to stdout
        """
        app = qt_compat.QtCore.QCoreApplication([])

        worker = qore_utils.FutureThread()

        # Callbacks handed to add_task; presumably invoked with the task's
        # outcome -- confirm against qore_utils.FutureThread
        def on_failure(*args):
                print("Failure %s" % (args,))

        def on_success(*args):
                print("Success %s" % (args,))

        def task(*args):
                print("Task %s" % (args,))

        timer = qt_compat.QtCore.QTimer()
        # Single-element list so the nested slot can update the counter
        ticks = [0]

        @qt_compat.Slot()
        def on_timeout():
                ticks[0] += 1
                tick = ticks[0]
                print(tick)
                worker.add_task(task, (tick, ), {}, on_success, on_failure)
                if tick != 5:
                        return
                # Fifth tick: shut everything down
                timer.stop()
                worker.stop()
                app.exit(0)

        timer.timeout.connect(on_timeout)
        timer.start(10)
        worker.start()

        status = app.exec_()
        print("Status %s" % status)
134
135
if __name__ == "__main__":
        # Hand tests selectable by name as the first command-line argument.
        # With no argument run_task is used, which is what this script ran
        # before; run_producer_consumer no longer needs a source edit to reach.
        handTests = {
                "producer_consumer": run_producer_consumer,
                "task": run_task,
        }
        if len(sys.argv) > 1:
                testName = sys.argv[1]
        else:
                testName = "task"
        if testName not in handTests:
                sys.exit("Unknown test %r; choose one of: %s" % (
                        testName, ", ".join(sorted(handTests)),
                ))
        handTests[testName]()