summaryrefslogtreecommitdiff
path: root/functions.py
blob: e26e43f86388afb5a4c90b146eac62c1ca974a5b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import re
import time
import struct
import hashlib
from urllib.parse import quote as urlencode


def _normalize_token(token):
    """Return *token* reduced to its upper-case hexadecimal characters.

    The token is upper-cased first, then every character other than
    ``0-9`` and ``A-F`` (separators, whitespace, stray letters) is dropped,
    so ``"ab-cd-ef"`` and ``"AB CD EF"`` both become ``"ABCDEF"``.
    """
    hex_chars = "0123456789ABCDEF"
    return "".join(ch for ch in token.upper() if ch in hex_chars)


def _generate_token(username, secret, time):
    """Derive the token for *username* in the time window identified by *time*.

    The string ``secret + username + time`` is UTF-8 encoded and hashed with
    MD5. The first four digest bytes are read as a little-endian unsigned
    32-bit integer, and its three low-order bytes are rendered, most
    significant first, as ``"XX-XX-XX"`` in upper-case hexadecimal.

    Note: the *time* parameter shadows the ``time`` module inside this
    function; the module is not used here.
    """
    seed = "{}{}{}".format(secret, username, time).encode('utf-8')
    digest = hashlib.md5(seed).digest()
    (word,) = struct.unpack(b"<L", digest[:4])
    # Bits 16-23, 8-15 and 0-7 of the word, in that order.
    return "-".join(
        "{:02X}".format((word >> shift) & 0xff) for shift in (16, 8, 0))


def file_lock(lock_file):
    """Return a context manager that holds an exclusive lock file.

    The lock is taken when the ``with`` block is entered (not when this
    function is called) by creating *lock_file* in exclusive-create mode.
    The file is kept open for the duration of the block and is closed and
    removed when the block exits, whether normally or through an exception.

    Raises:
        Exception: on entering the block, if *lock_file* already exists.

    Exceptions raised by the code inside the ``with`` block propagate
    unchanged, including a ``FileExistsError`` of the caller's own.
    """
    from contextlib import contextmanager

    @contextmanager
    def _locked():
        # Only the acquisition is guarded: a FileExistsError coming from the
        # caller's block must not be mistaken for a locking failure.
        try:
            fh = open(lock_file, "x")
        except FileExistsError as err:
            raise Exception(
                "Locking failed on {}".format(lock_file)) from err
        try:
            with fh:
                yield
        finally:
            # The handle is closed by the inner ``with`` before removal.
            os.remove(lock_file)

    return _locked()


def token_message(username, secret, validsec, url):
    """Build the text message announcing the current token for *username*.

    The token is generated for the current time window, i.e. the current
    Unix time rounded down to a multiple of *validsec*. The message states
    the username, the token and a validity range running from the start of
    that window to two window lengths later (``verify_token`` accepts the
    current and the previous window's token), formatted from UTC time.

    If *url* is not ``None`` it is appended to the message. Every
    ``http://`` or ``https://`` URL found in it is prefixed with a space and
    gets ``username:token@`` inserted right after the scheme, both
    percent-encoded. Text that does not match is appended unchanged.

    Args:
        username: account name the token is issued for.
        secret: shared secret passed to ``_generate_token``.
        validsec: length of one time window in seconds.
        url: optional URL to embed the credentials in, or ``None``.

    Returns:
        The assembled message string.
    """
    time_now = int(time.time())
    time_now_start = int(time_now - time_now % validsec)
    time_next_end = time_now_start + 2 * validsec
    token = _generate_token(username, secret, time_now_start)
    message = "Username: {} Token: {}".format(username, token)
    message += "\nValid from: {} to: {}".format(
        time.strftime("%c %Z(%z)", time.gmtime(time_now_start)),
        time.strftime("%c %Z(%z)", time.gmtime(time_next_end)))
    if url is not None:
        # safe="" so that "/" in a username is percent-encoded too; left
        # as-is it would end the URL's authority part early.
        credentials = "{}:{}@".format(
            urlencode(username, safe=""), urlencode(token, safe=""))

        def _with_credentials(match):
            # A callable replacement keeps the username out of the regex
            # template: "\1" followed by a username starting with a digit
            # would otherwise be parsed as "\15" (invalid group) or "\112"
            # (octal escape).
            return " " + match.group(1) + credentials + match.group(2)

        message += re.sub('(https?://)(.*)', _with_credentials, url)
    return message


def send_message(jid, password, recipient, message):
    """Send *message* as a single XMPP chat message to *recipient*.

    Logs in as *jid* with *password* using ``sleekxmpp`` (imported lazily so
    the module loads without it), sends the message once the session has
    started, disconnects, and returns when the client's processing loop ends.

    Raises:
        Exception: if the connection to the XMPP server cannot be set up.
    """
    import sleekxmpp

    client = sleekxmpp.ClientXMPP(jid, password)

    def _on_session_start(event):
        # One-shot client: deliver the message, then hang up.
        client.send_message(mto=recipient, mtype='chat', mbody=message)
        # NOTE(review): wait=True presumably lets queued stanzas go out
        # before the connection is closed -- confirm against sleekxmpp docs.
        client.disconnect(wait=True)

    client.add_event_handler("session_start", _on_session_start, threaded=True)

    if not client.connect():
        raise Exception("Unable to connect to xmpp server")
    client.process(block=True)


def verify_token(username, password, conf_secret, conf_validsec):
    """Check whether *password* is a currently accepted token for *username*.

    Tokens are generated for the current time window (Unix time rounded down
    to a multiple of *conf_validsec*) and for the window immediately before
    it. The supplied *password* is accepted if, after normalisation with
    ``_normalize_token``, it equals either of them.

    Returns:
        ``True`` if the token matches, ``False`` otherwise.
    """
    now = int(time.time())
    current_start = int(now - now % conf_validsec)
    window_starts = (current_start, current_start - conf_validsec)
    accepted = [
        _normalize_token(_generate_token(username, conf_secret, start))
        for start in window_starts
    ]
    supplied = _normalize_token(password)
    return supplied in accepted