diff options
author | Yves Fischer <yvesf-git@xapek.org> | 2015-05-09 00:07:35 +0200 |
---|---|---|
committer | Yves Fischer <yvesf-git@xapek.org> | 2016-08-21 15:13:57 +0200 |
commit | e4e413ede34a2ae306ddd130324435d0e735f064 (patch) | |
tree | d6d438ee1702b8c3bd294baa415be6d6694b2698 | |
download | influxdb-guard-e4e413ede34a2ae306ddd130324435d0e735f064.tar.gz influxdb-guard-e4e413ede34a2ae306ddd130324435d0e735f064.zip |
influxdb-guard
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | LICENSE | 20 | ||||
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | guard/__init__.py | 1 | ||||
-rw-r--r-- | guard/application.py | 106 | ||||
-rw-r--r-- | guard/model.py | 16 | ||||
-rw-r--r-- | guard/proxyfix.py | 31 | ||||
-rw-r--r-- | guard/templates.py | 65 | ||||
-rw-r--r-- | requirements.txt | 5 | ||||
-rwxr-xr-x | run.py | 65 | ||||
-rwxr-xr-x | tester.py | 76 |
11 files changed, 413 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..871bde4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +*pyc +db.sqlite +*config.ini +config
\ No newline at end of file @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2016 Yves Fischer <yvesf+git@xapek.org> + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd712bc --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# influxdb guard + +Simple *firewall* for influxdb database + +## Running + +Requires [pyinflux](https://github.com/yvesf/pyinflux) library. + +### development + +``` +# ./run.py +... prints default config +# ./run.py <path-to-config> +``` + +### gunicorn/waitress + +``` +# waitress-serve run:application +# gunicorn3 run +``` +Searches for a file called `config` in current working directory.
\ No newline at end of file diff --git a/guard/__init__.py b/guard/__init__.py new file mode 100644 index 0000000..ab2fc5d --- /dev/null +++ b/guard/__init__.py @@ -0,0 +1 @@ +# empty file
\ No newline at end of file diff --git a/guard/application.py b/guard/application.py new file mode 100644 index 0000000..4782bac --- /dev/null +++ b/guard/application.py @@ -0,0 +1,106 @@ +from .model import Access +from . import templates + +import logging +from fnmatch import fnmatchcase +from urllib.request import urlopen +from urllib.parse import urlencode + +from pyinflux.parser import LineParser +from flask import request, redirect, session, url_for, make_response, Response +from werkzeug.exceptions import InternalServerError, Forbidden, Unauthorized, PreconditionFailed + + +def make_app(app, config): + cookiename = config['cookiename'] + + @app.route("/", methods=["POST", "GET"]) + def index(): + if "valid" not in session: + return redirect(url_for("login")) + + if request.method == "POST": + Access.create(token=request.form["token"], + pattern=request.form["pattern"], + comment=request.form["comment"]) + return templates.index(config['port'], cookiename) + + @app.route("/delete", methods=["POST"]) + def delete(): + if "valid" not in session: + return redirect(url_for("login")) + Access.delete().where(Access.id == int(request.form["id"])).execute() + return redirect(url_for("index")) + + @app.route("/logout") + def logout(): + del session["valid"] + return redirect(url_for("login")) + + @app.route("/login", methods=["POST", "GET"]) + def login(): + if request.method == "POST": + if "secret" in request.form and \ + request.form["secret"] == config["adminsecret"]: + logging.info("Login successful") + session["valid"] = True + return redirect(url_for("index")) + else: + logging.info("Login failed") + return redirect(url_for("login")) + else: + return templates.login() + + @app.route("/write/<token>", methods=["POST"]) + @app.route("/write", methods=["POST"]) + def write(token=None): + # Find token cookie in request + if not token: + token = request.cookies.get(cookiename) + + valid_access_patterns = tuple(map(lambda x: x[0], + Access.select(Access.pattern).where(Access.token == token).tuples())) + if not valid_access_patterns: + return make_response("Token {} is not configured\n".format(token), Forbidden.code) + + # Read request + database = request.args.get("db") + if not database: + return make_response("No database name given in parameter 'db'\n", PreconditionFailed.code) + + # Read data + data = request.get_data(as_text=False) + data_text = data.decode(request.charset, request.encoding_errors) + + # validate access + for line in data_text.split("\n"): + identifier = database + '.' + LineParser.parse_identifier(line) + if not any(map(lambda pattern: fnmatchcase(identifier, pattern), valid_access_patterns)): + logging.info("Reject write to %s", identifier) + return make_response("Invalid path: {}\n".format(identifier), Unauthorized.code) + + + # checks passed, forward the request + params = {'db': database} + if request.args.get("rp"): + params["rp"] = request.args.get("rp") + if request.args.get("precision"): + params["precision"] = request.args.get("precision") + if request.args.get("consistency"): + params["consistency"] = request.args.get("consistency") + + if config.get("username"): + params["u"] = config["username"] + if config.get("password"): + params["p"] = config["password"] + + url = config['url'] + "?" + urlencode(params) + try: + with urlopen(url, data) as resp: + if logging.root.isEnabledFor(logging.INFO): + logging.info("Forwarded request to database '%s' for '%s' with '%s': %s", + database, request.remote_addr, token, data_text) + return Response(resp.fp, status=resp.getcode()) + except: + logging.exception("Request failed for:\n%s", data) + return make_response("Failed", InternalServerError.code) diff --git a/guard/model.py b/guard/model.py new file mode 100644 index 0000000..7372da0 --- /dev/null +++ b/guard/model.py @@ -0,0 +1,16 @@ +from datetime import datetime + +import peewee as p + +db = p.SqliteDatabase(None) + + +class Access(p.Model): + class Meta: + database = db + + id = p.PrimaryKeyField() + token = p.CharField(null=False, index=True) + pattern = p.CharField(null=False) + comment = p.CharField() + create_date = p.DateTimeField(default=datetime.now) diff --git a/guard/proxyfix.py b/guard/proxyfix.py new file mode 100644 index 0000000..bd86203 --- /dev/null +++ b/guard/proxyfix.py @@ -0,0 +1,31 @@ +from werkzeug.contrib.fixers import ProxyFix + +def _reverse_proxified(app): + """ + Configure apache as: + RequestHeader set X-Script-Name /videos + """ + + def wsgi_call(environ, start_response): + script_name = environ.get('HTTP_X_SCRIPT_NAME', '') + if script_name: + environ['SCRIPT_NAME'] = script_name + path_info = environ['PATH_INFO'] + if path_info.startswith(script_name): + environ['PATH_INFO'] = path_info[len(script_name):] + + scheme = environ.get('HTTP_X_SCHEME', '') + if scheme: + environ['wsgi.url_scheme'] = scheme + return app(environ, start_response) + + return wsgi_call + +""" +'ProxyFix' applied for reading X-Forwarded-Proto, X-Forwarded-For, X-Forwarded-Host + +'_reverse_proxified' applied for 'X-Script-Name'. +That's required when external request path is different from application server path. +""" +def fix(app): + return _reverse_proxified(ProxyFix(app)) diff --git a/guard/templates.py b/guard/templates.py new file mode 100644 index 0000000..e8d63a0 --- /dev/null +++ b/guard/templates.py @@ -0,0 +1,65 @@ +from . import model + +from typing import Iterable + +from lxml.builder import E +from lxml import etree +from flask import url_for + + +def _html(title: str, body: etree.Element) -> str: + return etree.tostring(E.html( + E.head(E.title(title)), + E.body(body) + ), encoding='utf-8') + + +def login() -> str: + return _html("Login", E.form( + E.input(type="password", name="secret", placeholder="Secret admin token"), + E.button("login", type="submit"), + method="POST" + )) + + +def _index_access(accesses: Iterable[model.Access]): + return map(lambda access: ( + E.tr( + E.td(access.token), + E.td(access.pattern), + E.td(str(access.create_date)), + E.td(access.comment), + E.td( + E.form( + E.input(type="hidden", name="id", value=str(access.id)), + E.button("X", title="Delete", type="submit"), + method="POST", action=url_for("delete"))) + ) + ), accesses) + + +def index(serverport: str, cookiename: str) -> str: + return _html( + "Manage access tokens", + E.div( + E.p(E.a("Logout", href=url_for("logout"))), + E.p("Call the write-service like: ", + E.pre("http://thisMachine:{}/write".format(serverport)), + "with the token in a cookie named {}".format(cookiename)), + E.p("Or directly in the url: ", + E.pre("http://thisMachine:{}/write/<token>".format(serverport))), + E.form( + E.input(type="hidden", name="action", value="create"), + E.input(name="token", placeholder="Token secret"), + E.input(name="pattern", placeholder="Paths pattern"), + E.input(name="comment", placeholder="Comment"), + E.button("Create", type="submit"), method="POST"), + E.hr(), + E.table( + E.tr( + E.th("cookie: " + cookiename + " value"), E.th("Pattern"), + E.th("Create Date"), E.th("Comment"), E.th() + ), border="1", style="width: 100%", + *_index_access(model.Access.select().order_by(model.Access.create_date)) + )) + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f7f861e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +peewee == 2.8.1 +flask == 0.11.1 +lxml +# for pyinflux[parser] +funcparserlib==0.3.6
\ No newline at end of file @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +import os +import sys +import logging +from configparser import ConfigParser + +from flask import Flask + +from guard import application, model, proxyfix + + +def make_app(config): + app = Flask("Guard") + app.secret_key = config["Guard"]["Adminsecret"] + os.uname()[1] + + model.db.init(os.path.join(os.path.dirname(__file__), "db.sqlite")) + model.db.connect() + model.db.create_tables(filter(lambda t: not t.table_exists(), [model.Access])) + + log_format = "%(asctime)-15s %(levelname)s [%(module) 15s] %(message)s" + if config["Guard"]["Debug"] == "true": + logging.basicConfig(level=logging.INFO, format=log_format) + logging.info("Running in DEBUG mode!") + else: + logging.basicConfig(level=logging.ERROR, format=log_format) + logging.info("Running in production mode!") + + application.make_app(app, config["Guard"]) + + app.wsgi_app = proxyfix.fix(app.wsgi_app) + return app + + +def get_config(path: str = None) -> ConfigParser: + config = ConfigParser() + config["Guard"] = { + 'url': "http://localhost:8086/write", + "port": "8001", + "cookiename": "token", + "adminsecret": "changeme", + "debug": "false" + } + if path is not None: + config.read(path) + return config + + +def start(): + if len(sys.argv) == 2: + config = get_config(sys.argv[1]) + app = make_app(config) + app.run(host="0.0.0.0", port=int(config["Guard"]["Port"]), + debug=config["Guard"]["Debug"] == "true", + use_reloader=config["Guard"]["Debug"] == "true") + else: + print("# {} >config.defaults.ini".format(sys.argv[0])) + get_config().write(sys.stdout) + sys.exit(1) + + +if __name__ == "__main__": + start() +else: # for running gunicorn3 + config = get_config('config') + application = make_app(config) diff --git a/tester.py b/tester.py new file mode 100755 index 0000000..9900b4a --- /dev/null +++ b/tester.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +import itertools +from multiprocessing.pool import ThreadPool as Pool +from pyinflux.client import InfluxDB, Line + + +class TwoWayInfluxDB(InfluxDB): + def __init__(self, write_url): + super().__init__('test', 'localhost') + self._write_url = write_url + +influxdb = TwoWayInfluxDB('http://localhost:8001/write/test?') +value_generator = itertools.count() + +def test(number): + for value in value_generator: + if value > 200: + return + if (value + 1) % 50 == 0: + print("thread {} at {}".format(number, value)) + expected = "value{value}".format(value=value) + line = Line('series' + str(value), + {'tag': 'tag' + expected}, + {'field': 'field' + expected}) + + try: + influxdb.write([line]) + except: + print(line) + raise + + query = """\ + SELECT * + FROM "series{value}" + WHERE time >= now() - 2s""".format(value=value) + try: + results = influxdb.query(query).as_json()['results'] + except: + print("""\ + Failure. With the following INSERT: + {line}""".format(line=line)) + raise + try: + assert len(results) == 1 + series = results[0]['series'] + assert len(series) == 1 + series_result = series[0] + assert series_result['name'] == 'series' + str(value) + assert len(series_result['values']) == 1 + values = series_result['values'] + assert len(values) == 1 + assert len(values[0]) == 3 + columns = series_result['columns'] + assert columns == ['time', 'field', 'tag'] + + assert values[0][columns.index('field')] == 'fieldvalue' + str(value) + assert values[0][columns.index('tag')] == 'tagvalue' + str(value) + except: + print("""\ + Failure. With the following INSERT: + {line} + + Got this result: + {results}""".format(line=line, results=results)) + raise + + +print(influxdb.query('DROP DATABASE test').as_text()) +print(influxdb.query('CREATE DATABASE test').as_text()) + +N_PROC = 1 +with Pool(processes=N_PROC) as pool: + for res in pool.imap_unordered(test, range(1, N_PROC + 1)): + # they are suppossed to run endlessly, if one returns then there was a failure + pool.terminate() +print('Done') |