summaryrefslogtreecommitdiff
path: root/wikifolio/__init__.py
blob: 6537b86b5c234c50b22d39854951366a6fdee3de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import typing
import logging
import codecs
import time
import datetime
from urllib.request import urlopen, Request

from lxml.html import parse

from . import model

logger = logging.getLogger(__name__)

COMMENT_URL = "https://www.wikifolio.com/dynamic/de/de/invest/" \
              "getpagedmessagesforwikifolio/{name}?Id={id}" \
              "&tv=False" \
              "&id={id}" \
              "&page=1" \
              "&pageSize=15" \
              "&_={timestamp}"
TRADES_URL = "https://www.wikifolio.com/dynamic/de/de/invest/" \
             "getpagedtradesforwikifolio/{name}?id={id}" \
             "&page=1&pageSize=100"
USER_AGENT = "Mozilla/4.0 (compatible; MSIE 6.0; " \
             "Windows NT 5.1; FSL 7.0.6.01001)"


def make_request(url) -> Request:
    logging.info("Make request: {}".format(url))
    request = Request(url)
    request.add_header("User-Agent", USER_AGENT)
    return request


def get_id_from_name(name: str) -> model.Certificate:
    request = make_request(model.BASE_URL + name)
    with urlopen(request) as input_raw:
        document = parse(codecs.getreader('utf-8')(input_raw))
        try:
            return model.Certificate(
                name,
                document.find('//input[@id="wikifolio"]').value,
                document.find('//input[@id="wikifolio-shortdesc"]').value,
                document.find('//input[@id="wikifolio-isin"]').value,
                document.find('//div[@data-trader]').get('data-trader'))
        except:
            raise Exception("Failed to find wikifolio infos (id,name,isin) in html page")


def get_comments(cert: model.Certificate) -> typing.Iterable[model.Comment]:
    logger.info("Fetch comments of {.name}".format(cert))
    request = make_request(COMMENT_URL.format(
        id=cert.guid, name=cert.name, timestamp=int(time.time())))
    with urlopen(request) as input_raw:
        document = parse(codecs.getreader('utf-8')(input_raw))
        comments = document.findall('//div[@class="user-comment"]')
        for div_comment in comments:
            pub_date = div_comment.find('div/time').get('datetime')
            yield model.Comment(
                time.strptime(pub_date, "%d.%m.%Y %H:%M:%S"),
                "{trader} <{trader}@localhost>".format(trader=cert.trader),
                div_comment.find('div[@class="message-item-content"]').text,
                div_comment.get('id'),
                cert.make_url())


def get_trades(cert):
    """:type cert: model.Certificate"""
    request = make_request(TRADES_URL.format(name=cert.name, id=cert.guid))
    with urlopen(request) as input_raw:
        document = parse(codecs.getreader('utf-8')(input_raw))
        trade_blocks = document.findall('//table/tr')

        share_name = share_isin = kurs = None
        trades = []
        for trade_block in trade_blocks:
            try:
                typ = trade_block.find('td[2]').text.strip()
                if typ != "" and trade_block.find('td[1]/div/a/span') is not None:  # not a continuation
                    share_name = trade_block.find('td[1]/div/a/span').text.strip()
                    share_isin = trade_block.find('td[1]/div/div').text.strip()
                else:  # a continuaton, read type from first column
                    typ = trade_block.find('td[1]/span').text.strip()
                if trade_block.find('td[4]').text and trade_block.find('td[4]').text.strip() != "":
                    kurs = trade_block.find('td[4]').text.strip().replace('.', '').replace(',', '.')
                volume = trade_block.find('td[5]').text.strip().replace('.', '').replace(',', '.')
                timestamp = trade_block.find('td[3]/div[2]').text.strip()
                timestamp = timestamp.replace('\xa0', ' ')
                timestamp = datetime.datetime.strptime(timestamp, "%d.%m.%Y %H:%M")
                status = trade_block.find('td[3]/div[1]').text.strip()
                if status != model.Trade.STATUS_AUSGEFUEHRT:
                    continue  # skip pending ones

                trades.append(model.Trade(
                    share_name, share_isin, typ, status, timestamp, float(kurs), float(volume)
                ))
            except:
                raise Exception("failed to decode trade")  # to do: where?

        return trades