[FIX] unittests
[pywienerlinien] / scotty.py
1 #!/usr/bin/env python
2 # -*- coding: UTF-8 -*-
3
4 from BeautifulSoup import BeautifulSoup, NavigableString
5 from urllib2 import urlopen
6 from urllib import urlencode
7 import settings
8 from datetime import datetime, time
9 from textwrap import wrap
10 import argparse
11 import sys
12 import os.path
13
14 POSITION_TYPES = ('stop', 'address', 'poi')
15 TIMEFORMAT = '%H:%M'
16 DEBUGLOG = os.path.expanduser('~/gotoVienna.debug')
17
18 class ParserError(Exception):
19     
20     def __init__(self, msg='Parser error'):
21         self.message = msg
22
23 class PageType:
24     UNKNOWN, CORRECTION, RESULT = range(3)
25     
26
27 def search(origin_tuple, destination_tuple, dtime=None):
28     """ build route request
29     returns html result (as urllib response)
30     """
31     if not dtime:
32         dtime = datetime.now()
33     
34     origin, origin_type = origin_tuple
35     destination, destination_type = destination_tuple
36     if not origin_type in POSITION_TYPES or\
37         not destination_type in POSITION_TYPES:
38         raise ParserError('Invalid position type')
39         
40     post = settings.search_post
41     post['name_origin'] = origin
42     post['type_origin'] = origin_type
43     post['name_destination'] = destination
44     post['type_destination'] = destination_type
45     post['itdDateDayMonthYear'] = dtime.strftime('%d.%m.%Y')
46     post['itdTime'] = dtime.strftime('%H:%M')
47     params = urlencode(post)
48     url = '%s?%s' % (settings.action, params)
49     
50     try:
51         f = open(DEBUGLOG, 'a')
52         f.write(url + '\n')
53         f.close()
54     except:
55         print 'Unable to write to DEBUGLOG: %s' % DEBUGLOG
56     
57     return urlopen(url)
58
59
60 class sParser:
61     """ Parser for search response
62     """
63
64     def __init__(self, html):
65         self.soup = BeautifulSoup(html)
66     
67     def check_page(self):
68         if self.soup.find('form', {'id': 'form_efaresults'}):
69             return PageType.RESULT
70         
71         if self.soup.find('div', {'class':'form_error'}):
72             return PageType.CORRECTION
73         
74         return PageType.UNKNOWN
75     
76     def get_correction(self):
77         nlo = self.soup.find('select', {'id': 'nameList_origin'})
78         nld = self.soup.find('select', {'id': 'nameList_destination'})
79         
80         if not nlo and not nld:
81             raise ParserError('Unable to parse html')
82         
83         if nlo:
84             origin = map(lambda x: x.text, nlo.findAll('option'))
85         else:
86             origin = []
87         if nld:
88             destination = map(lambda x: x.text, nld.findAll('option'))
89         else:
90             destination = []
91         
92         return (origin, destination)
93     
94     def get_result(self):
95         return rParser(str(self.soup))
96         
97         
98         
99 class rParser:
100     """ Parser for routing results
101     """
102
103     def __init__(self, html):
104         self.soup = BeautifulSoup(html)
105         self._overview = None
106         self._details = None
107
108     @classmethod
109     def get_tdtext(cls, x, cl):
110             return x.find('td', {'class': cl}).text
111     
112     @classmethod
113     def get_change(cls, x):
114         y = rParser.get_tdtext(x, 'col_change')
115         if y:
116             return int(y)
117         else:
118             return 0
119
120     @classmethod
121     def get_price(cls, x):
122         y = rParser.get_tdtext(x, 'col_price')
123         if y.find(','):
124             return float(y.replace(',', '.'))
125         else:
126             return 0.0
127
128     @classmethod
129     def get_date(cls, x):
130         y = rParser.get_tdtext(x, 'col_date')
131         if y:
132             return datetime.strptime(y, '%d.%m.%Y').date()
133         else:
134             return None
135         
136     @classmethod
137     def get_time(cls, x):
138         y = rParser.get_tdtext(x, 'col_time')
139         if y:
140             if (y.find("-") > 0):
141                 return map(lambda z: time(*map(int, z.split(':'))), y.split('-'))
142             else:
143                 return map(lambda z: time(*map(int, z.split(':'))), wrap(y, 5))
144         else:
145             return []
146         
147     @classmethod
148     def get_duration(cls, x):
149         y = rParser.get_tdtext(x, 'col_duration')
150         if y:
151             return time(*map(int, y.split(":")))
152         else:
153             return None
154
155     def __iter__(self):
156         for detail in self.details():
157             yield detail
158
159     def _parse_details(self):
160         tours = self.soup.findAll('div', {'class': 'data_table tourdetail'})
161
162         trips = map(lambda x: map(lambda y: {
163                         'time': rParser.get_time(y),
164                         'station': map(lambda z: z[2:].strip(),
165                                        filter(lambda x: type(x) == NavigableString, y.find('td', {'class': 'col_station'}).contents)), # filter non NaviStrings
166                         'info': map(lambda x: x.strip(),
167                                     filter(lambda z: type(z) == NavigableString, y.find('td', {'class': 'col_info'}).contents)),
168                     }, x.find('tbody').findAll('tr')),
169                     tours) # all routes
170         return trips
171
172     @property
173     def details(self):
174         """returns list of trip details
175         [ [ { 'time': [datetime.time, datetime.time] if time else [],
176               'station': [u'start', u'end'] if station else [],
177               'info': [u'start station' if station else u'details for walking', u'end station' if station else u'walking duration']
178             }, ... # next trip step 
179           ], ... # next trip possibility
180         ]
181         """
182         if not self._details:
183             self._details = self._parse_details()
184
185         return self._details
186
187     def _parse_overview(self):
188
189         # get overview table
190         table = self.soup.find('table', {'id': 'tbl_fahrten'})
191
192         # check if there is an overview table
193         if table and table.findAll('tr'):
194             # get rows
195             rows = table.findAll('tr')[1:] # cut off headline
196             
197             overview = map(lambda x: {
198                                'date': rParser.get_date(x),
199                                'time': rParser.get_time(x),
200                                'duration': rParser.get_duration(x), # grab duration
201                                'change': rParser.get_change(x),
202                                'price': rParser.get_price(x),
203                            },
204                            rows)
205         else:
206             raise ParserError('Unable to parse overview')
207
208         return overview
209
210     @property
211     def overview(self):
212         """dict containing
213         date: datetime
214         time: [time, time]
215         duration: time
216         change: int
217         price: float
218         """
219         if not self._overview:
220             try:
221                 self._overview = self._parse_overview()
222             except AttributeError:
223                 f = open(DEBUGLOG, 'w')
224                 f.write(str(self.soup))
225                 f.close()
226
227         return self._overview
228
229 if __name__ == '__main__':
230     parser = argparse.ArgumentParser(description='Get public transport route for Vienna')
231     parser.add_argument('-o', metavar='name', type=str, help='origin', required=True)
232     parser.add_argument('-d', metavar='name', type=str, help='destination', required=True)
233     parser.add_argument('-ot', metavar='type', type=str, help='origin type: %s' % ' | '.join(POSITION_TYPES), default='stop', choices=POSITION_TYPES)
234     parser.add_argument('-dt', metavar='type', type=str, help='destination type: %s' % ' | '.join(POSITION_TYPES), default='stop', choices=POSITION_TYPES)
235
236     args = parser.parse_args()
237     html = search((args.o, args.ot), (args.d, args.dt)).read()
238     
239     parser = sParser(html)
240     state = parser.check_page()
241     
242     if state == PageType.CORRECTION:
243         try:
244             cor = parser.get_correction()
245             if cor[0]:
246                 print
247                 print '* Origin ambiguous:'
248                 lo = None
249                 while not lo or not lo.isdigit() or int(lo) > len(cor[0]):
250                     i = 1
251                     for c in cor[0]:
252                         print '%d. %s' % (i, c)
253                         i += 1
254                     lo = sys.stdin.readline().strip()
255                 
256                 args.o = cor[0][int(lo) - 1]
257                 
258             if cor[1]:
259                 print
260                 print '* Destination ambiguous:'
261                 ld = None
262                 while not ld or not ld.isdigit() or int(ld) > len(cor[1]):
263                     j = 1
264                     for c in cor[1]:
265                         print '%d. %s' % (j, c)
266                         j += 1
267                     ld = sys.stdin.readline().strip()
268                     
269                 args.d = cor[1][int(ld) - 1]
270             
271             html = search((args.o.encode('UTF-8'), args.ot), (args.d.encode('UTF-8'), args.dt)).read()
272     
273             parser = sParser(html)
274             state = parser.check_page()
275             
276         except ParserError:
277             print 'PANIC at correction page'
278             
279     if state == PageType.RESULT:
280         parser = rParser(html)
281         try:
282             overviews = parser.overview
283             details = parser.details
284             l = ''
285             while not l == 'q':
286                 for r in range(len(overviews)):
287                     print '%d. [%s] %s-%s (%s)' % (r + 1, overviews[r]['date'], overviews[r]['time'][0], overviews[r]['time'][1], overviews[r]['duration'])
288                 print 'q. Quit'
289                 l = sys.stdin.readline().strip()
290                 print
291                 print '~' * 100
292                 
293                 if l.isdigit() and int(l) <= len(details):
294                     for detail in details[int(l) - 1]:
295                         if detail['time'] and detail['station']:
296                             time = '%s - %s' % (detail['time'][0].strftime(TIMEFORMAT), detail['time'][1].strftime(TIMEFORMAT))
297                             print '[%s] %s\n%s' % (time, ' -> '.join(detail['station']), '\n'.join(detail['info']))
298                         else:
299                             print '\n'.join(detail['info'])
300                         print '-' * 100
301                 print
302                 
303         except ParserError:
304             print 'parsererror'
305             
306     elif state == PageType.UNKNOWN:
307         print 'PANIC unknown result'