summaryrefslogtreecommitdiff
path: root/jobs/prix_carburant.py
blob: 5855d4da62ec016027e26c8facab3d4d2832d788 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import codecs
import logging
import typing
from enum import Enum
from html.parser import HTMLParser
from urllib import request


rupture = 'Rupture de stock'

class Station:
    def __init__(self):
        self.station_name = ""
        self.prices = {}
        self.id = None

    def clean(self):
        self.prices = filter(lambda kv: kv[1] != '', self.prices.items())
        self.prices = filter(lambda kv: kv[1] != rupture, self.prices)
        self.prices = dict(map(lambda kv: (kv[0], float(kv[1])), self.prices))

    def __repr__(self):
        return "Prix: {} {}".format(self.station_name, self.prices)


State = Enum('State', 'pricelist fuel_name fuel_price station_name idle')


class Parser(HTMLParser):
    def error(self, message):
        logging.error("Parser error: %s", message)

    def __init__(self):
        super().__init__()
        self._prix = Station()
        self._current_fuel_name = ""
        self._state = State.idle

    def get_prix(self):
        self._prix.clean()
        return self._prix

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if self._state == State.idle and tag == "div" and "id" in attrs and attrs['id'] == 'prix':
            self._state = State.pricelist
        elif self._state in [State.pricelist, State.fuel_price] and tag == 'strong':
            self._state = State.fuel_name
            self._current_fuel_name = ''
        elif self._state == State.idle and tag == 'div' and 'id' in attrs and attrs['id'] == 'colg':
            self._state = State.station_name

    def handle_endtag(self, tag):
        if self._state == State.pricelist and tag == 'div':
            self._state = State.idle
        elif self._state == State.fuel_name and tag == 'strong':
            self._state = State.fuel_price
        elif self._state == State.fuel_price and tag == 'div':
            self._state = State.idle
        elif self._state == State.station_name and tag == 'p':
            self._state = State.idle

    def handle_data(self, data: str):
        if self._state == State.fuel_name:
            self._current_fuel_name += data.strip().replace(':', '')
            self._prix.prices[self._current_fuel_name] = ""
        elif self._state == State.fuel_price:
            if data.strip() != "0.000":
                self._prix.prices[self._current_fuel_name] += data.strip()
        elif self._state == State.station_name:
            if len(data.strip()) > 0:
                self._prix.station_name += data.strip() + ". "


URL = "https://www.prix-carburants.gouv.fr/map/recupererInfosPdv/"


def _execute(station_id: str):
    parser = Parser()
    r = request.Request(URL + station_id, data=b"")
    r.add_header('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64; rv:64.0) Gecko/20100101 Firefox/64.0')
    r.add_header('X-Requested-With', 'XMLHttpRequest')
    r.add_header('X-Prototype-Version', '1.7')
    r.add_header('Connection', 'keep-alive')
    r.add_header('Content-type', 'application/x-www-form-urlencoded; charset=UTF-8')
    with request.urlopen(r) as f:
        # with open("info.html", 'rb') as f:
        f2 = codecs.getreader('utf-8')(f)
        f2.errors = 'ignore'
        for line in f2.readlines():
            parser.feed(line)

        try:
            prix = parser.get_prix()
            prix.id = station_id
            return prix
        except Exception as e:
            raise Exception("Failed for station: {}".format(station_id), e)


def execute(*ids) -> typing.Iterable[Station]:
    for station_id in ids:
        try:
            yield _execute(station_id)
        except Exception as e:
            raise Exception("Failed for station {}".format(station_id), e)


if __name__ == "__main__":
    from pprint import pprint

    pprint(list(execute('1630001', '67760001', '1210003', '1630003', '1210002', '1710001')))