summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2015-05-09 00:07:35 +0200
committerYves Fischer <yvesf-git@xapek.org>2016-08-21 15:13:57 +0200
commite4e413ede34a2ae306ddd130324435d0e735f064 (patch)
treed6d438ee1702b8c3bd294baa415be6d6694b2698
downloadinfluxdb-guard-e4e413ede34a2ae306ddd130324435d0e735f064.tar.gz
influxdb-guard-e4e413ede34a2ae306ddd130324435d0e735f064.zip
influxdb-guard
-rw-r--r--.gitignore5
-rw-r--r--LICENSE20
-rw-r--r--README.md23
-rw-r--r--guard/__init__.py1
-rw-r--r--guard/application.py106
-rw-r--r--guard/model.py16
-rw-r--r--guard/proxyfix.py31
-rw-r--r--guard/templates.py65
-rw-r--r--requirements.txt5
-rwxr-xr-xrun.py65
-rwxr-xr-xtester.py76
11 files changed, 413 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..871bde4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+__pycache__
+*pyc
+db.sqlite
+*config.ini
+config \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..ff8652d
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2016 Yves Fischer <yvesf+git@xapek.org>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..fd712bc
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# influxdb guard
+
+Simple *firewall* for influxdb database
+
+## Running
+
+Requires [pyinflux](https://github.com/yvesf/pyinflux) library.
+
+### development
+
+```
+# ./run.py
+... prints default config
+# ./run.py <path-to-config>
+```
+
+### gunicorn/waitress
+
+```
+# waitress-serve run:application
+# gunicorn3 run
+```
+Searches for a file called `config` in current working directory. \ No newline at end of file
diff --git a/guard/__init__.py b/guard/__init__.py
new file mode 100644
index 0000000..ab2fc5d
--- /dev/null
+++ b/guard/__init__.py
@@ -0,0 +1 @@
+# empty file \ No newline at end of file
diff --git a/guard/application.py b/guard/application.py
new file mode 100644
index 0000000..4782bac
--- /dev/null
+++ b/guard/application.py
@@ -0,0 +1,106 @@
+from .model import Access
+from . import templates
+
+import logging
+from fnmatch import fnmatchcase
+from urllib.request import urlopen
+from urllib.parse import urlencode
+
+from pyinflux.parser import LineParser
+from flask import request, redirect, session, url_for, make_response, Response
+from werkzeug.exceptions import InternalServerError, Forbidden, Unauthorized, PreconditionFailed
+
+
+def make_app(app, config):
+ cookiename = config['cookiename']
+
+ @app.route("/", methods=["POST", "GET"])
+ def index():
+ if "valid" not in session:
+ return redirect(url_for("login"))
+
+ if request.method == "POST":
+ Access.create(token=request.form["token"],
+ pattern=request.form["pattern"],
+ comment=request.form["comment"])
+ return templates.index(config['port'], cookiename)
+
+ @app.route("/delete", methods=["POST"])
+ def delete():
+ if "valid" not in session:
+ return redirect(url_for("login"))
+ Access.delete().where(Access.id == int(request.form["id"])).execute()
+ return redirect(url_for("index"))
+
+ @app.route("/logout")
+ def logout():
+ del session["valid"]
+ return redirect(url_for("login"))
+
+ @app.route("/login", methods=["POST", "GET"])
+ def login():
+ if request.method == "POST":
+ if "secret" in request.form and \
+ request.form["secret"] == config["adminsecret"]:
+ logging.info("Login successful")
+ session["valid"] = True
+ return redirect(url_for("index"))
+ else:
+ logging.info("Login failed")
+ return redirect(url_for("login"))
+ else:
+ return templates.login()
+
+ @app.route("/write/<token>", methods=["POST"])
+ @app.route("/write", methods=["POST"])
+ def write(token=None):
+ # Find token cookie in request
+ if not token:
+ token = request.cookies.get(cookiename)
+
+ valid_access_patterns = tuple(map(lambda x: x[0],
+ Access.select(Access.pattern).where(Access.token == token).tuples()))
+ if not valid_access_patterns:
+ return make_response("Token {} is not configured\n".format(token), Forbidden.code)
+
+ # Read request
+ database = request.args.get("db")
+ if not database:
+ return make_response("No database name given in parameter 'db'\n", PreconditionFailed.code)
+
+ # Read data
+ data = request.get_data(as_text=False)
+ data_text = data.decode(request.charset, request.encoding_errors)
+
+ # validate access
+ for line in data_text.split("\n"):
+ identifier = database + '.' + LineParser.parse_identifier(line)
+ if not any(map(lambda pattern: fnmatchcase(identifier, pattern), valid_access_patterns)):
+ logging.info("Reject write to %s", identifier)
+ return make_response("Invalid path: {}\n".format(identifier), Unauthorized.code)
+
+
+ # checks passed, forward the request
+ params = {'db': database}
+ if request.args.get("rp"):
+ params["rp"] = request.args.get("rp")
+ if request.args.get("precision"):
+ params["precision"] = request.args.get("precision")
+ if request.args.get("consistency"):
+ params["consistency"] = request.args.get("consistency")
+
+ if config.get("username"):
+ params["u"] = config["username"]
+ if config.get("password"):
+ params["p"] = config["password"]
+
+ url = config['url'] + "?" + urlencode(params)
+ try:
+ with urlopen(url, data) as resp:
+ if logging.root.isEnabledFor(logging.INFO):
+ logging.info("Forwarded request to database '%s' for '%s' with '%s': %s",
+ database, request.remote_addr, token, data_text)
+ return Response(resp.fp, status=resp.getcode())
+ except:
+ logging.exception("Request failed for:\n%s", data)
+ return make_response("Failed", InternalServerError.code)
diff --git a/guard/model.py b/guard/model.py
new file mode 100644
index 0000000..7372da0
--- /dev/null
+++ b/guard/model.py
@@ -0,0 +1,16 @@
+from datetime import datetime
+
+import peewee as p
+
+db = p.SqliteDatabase(None)
+
+
+class Access(p.Model):
+ class Meta:
+ database = db
+
+ id = p.PrimaryKeyField()
+ token = p.CharField(null=False, index=True)
+ pattern = p.CharField(null=False)
+ comment = p.CharField()
+ create_date = p.DateTimeField(default=datetime.now)
diff --git a/guard/proxyfix.py b/guard/proxyfix.py
new file mode 100644
index 0000000..bd86203
--- /dev/null
+++ b/guard/proxyfix.py
@@ -0,0 +1,31 @@
+from werkzeug.contrib.fixers import ProxyFix
+
+def _reverse_proxified(app):
+ """
+ Configure apache as:
+ RequestHeader set X-Script-Name /videos
+ """
+
+ def wsgi_call(environ, start_response):
+ script_name = environ.get('HTTP_X_SCRIPT_NAME', '')
+ if script_name:
+ environ['SCRIPT_NAME'] = script_name
+ path_info = environ['PATH_INFO']
+ if path_info.startswith(script_name):
+ environ['PATH_INFO'] = path_info[len(script_name):]
+
+ scheme = environ.get('HTTP_X_SCHEME', '')
+ if scheme:
+ environ['wsgi.url_scheme'] = scheme
+ return app(environ, start_response)
+
+ return wsgi_call
+
+"""
+'ProxyFix' applied for reading X-Forwarded-Proto, X-Forwarded-For, X-Forwarded-Host
+
+'_reverse_proxified' applied for 'X-Script-Name'.
+That's required when external request path is different from application server path.
+"""
+def fix(app):
+ return _reverse_proxified(ProxyFix(app))
diff --git a/guard/templates.py b/guard/templates.py
new file mode 100644
index 0000000..e8d63a0
--- /dev/null
+++ b/guard/templates.py
@@ -0,0 +1,65 @@
+from . import model
+
+from typing import Iterable
+
+from lxml.builder import E
+from lxml import etree
+from flask import url_for
+
+
+def _html(title: str, body: etree.Element) -> str:
+ return etree.tostring(E.html(
+ E.head(E.title(title)),
+ E.body(body)
+ ), encoding='utf-8')
+
+
+def login() -> str:
+ return _html("Login", E.form(
+ E.input(type="password", name="secret", placeholder="Secret admin token"),
+ E.button("login", type="submit"),
+ method="POST"
+ ))
+
+
+def _index_access(accesses: Iterable[model.Access]):
+ return map(lambda access: (
+ E.tr(
+ E.td(access.token),
+ E.td(access.pattern),
+ E.td(str(access.create_date)),
+ E.td(access.comment),
+ E.td(
+ E.form(
+ E.input(type="hidden", name="id", value=str(access.id)),
+ E.button("X", title="Delete", type="submit"),
+ method="POST", action=url_for("delete")))
+ )
+ ), accesses)
+
+
+def index(serverport: str, cookiename: str) -> str:
+ return _html(
+ "Manage access tokens",
+ E.div(
+ E.p(E.a("Logout", href=url_for("logout"))),
+ E.p("Call the write-service like: ",
+ E.pre("http://thisMachine:{}/write".format(serverport)),
+ "with the token in a cookie named {}".format(cookiename)),
+ E.p("Or directly in the url: ",
+ E.pre("http://thisMachine:{}/write/<token>".format(serverport))),
+ E.form(
+ E.input(type="hidden", name="action", value="create"),
+ E.input(name="token", placeholder="Token secret"),
+ E.input(name="pattern", placeholder="Paths pattern"),
+ E.input(name="comment", placeholder="Comment"),
+ E.button("Create", type="submit"), method="POST"),
+ E.hr(),
+ E.table(
+ E.tr(
+ E.th("cookie: " + cookiename + " value"), E.th("Pattern"),
+ E.th("Create Date"), E.th("Comment"), E.th()
+ ), border="1", style="width: 100%",
+ *_index_access(model.Access.select().order_by(model.Access.create_date))
+ ))
+ )
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..f7f861e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+peewee == 2.8.1
+flask == 0.11.1
+lxml
+# for pyinflux[parser]
+funcparserlib==0.3.6 \ No newline at end of file
diff --git a/run.py b/run.py
new file mode 100755
index 0000000..bfd9975
--- /dev/null
+++ b/run.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python3
+import os
+import sys
+import logging
+from configparser import ConfigParser
+
+from flask import Flask
+
+from guard import application, model, proxyfix
+
+
+def make_app(config):
+ app = Flask("Guard")
+ app.secret_key = config["Guard"]["Adminsecret"] + os.uname()[1]
+
+ model.db.init(os.path.join(os.path.dirname(__file__), "db.sqlite"))
+ model.db.connect()
+ model.db.create_tables(filter(lambda t: not t.table_exists(), [model.Access]))
+
+ log_format = "%(asctime)-15s %(levelname)s [%(module) 15s] %(message)s"
+ if config["Guard"]["Debug"] == "true":
+ logging.basicConfig(level=logging.INFO, format=log_format)
+ logging.info("Running in DEBUG mode!")
+ else:
+ logging.basicConfig(level=logging.ERROR, format=log_format)
+ logging.info("Running in production mode!")
+
+ application.make_app(app, config["Guard"])
+
+ app.wsgi_app = proxyfix.fix(app.wsgi_app)
+ return app
+
+
+def get_config(path: str = None) -> ConfigParser:
+ config = ConfigParser()
+ config["Guard"] = {
+ 'url': "http://localhost:8086/write",
+ "port": "8001",
+ "cookiename": "token",
+ "adminsecret": "changeme",
+ "debug": "false"
+ }
+ if path is not None:
+ config.read(path)
+ return config
+
+
+def start():
+ if len(sys.argv) == 2:
+ config = get_config(sys.argv[1])
+ app = make_app(config)
+ app.run(host="0.0.0.0", port=int(config["Guard"]["Port"]),
+ debug=config["Guard"]["Debug"] == "true",
+ use_reloader=config["Guard"]["Debug"] == "true")
+ else:
+ print("# {} >config.defaults.ini".format(sys.argv[0]))
+ get_config().write(sys.stdout)
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ start()
+else: # for running gunicorn3
+ config = get_config('config')
+ application = make_app(config)
diff --git a/tester.py b/tester.py
new file mode 100755
index 0000000..9900b4a
--- /dev/null
+++ b/tester.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+import itertools
+from multiprocessing.pool import ThreadPool as Pool
+from pyinflux.client import InfluxDB, Line
+
+
+class TwoWayInfluxDB(InfluxDB):
+ def __init__(self, write_url):
+ super().__init__('test', 'localhost')
+ self._write_url = write_url
+
+influxdb = TwoWayInfluxDB('http://localhost:8001/write/test?')
+value_generator = itertools.count()
+
+def test(number):
+ for value in value_generator:
+ if value > 200:
+ return
+ if (value + 1) % 50 == 0:
+ print("thread {} at {}".format(number, value))
+ expected = "value{value}".format(value=value)
+ line = Line('series' + str(value),
+ {'tag': 'tag' + expected},
+ {'field': 'field' + expected})
+
+ try:
+ influxdb.write([line])
+ except:
+ print(line)
+ raise
+
+ query = """\
+ SELECT *
+ FROM "series{value}"
+ WHERE time >= now() - 2s""".format(value=value)
+ try:
+ results = influxdb.query(query).as_json()['results']
+ except:
+ print("""\
+ Failure. With the following INSERT:
+ {line}""".format(line=line))
+ raise
+ try:
+ assert len(results) == 1
+ series = results[0]['series']
+ assert len(series) == 1
+ series_result = series[0]
+ assert series_result['name'] == 'series' + str(value)
+ assert len(series_result['values']) == 1
+ values = series_result['values']
+ assert len(values) == 1
+ assert len(values[0]) == 3
+ columns = series_result['columns']
+ assert columns == ['time', 'field', 'tag']
+
+ assert values[0][columns.index('field')] == 'fieldvalue' + str(value)
+ assert values[0][columns.index('tag')] == 'tagvalue' + str(value)
+ except:
+ print("""\
+ Failure. With the following INSERT:
+ {line}
+
+ Got this result:
+ {results}""".format(line=line, results=results))
+ raise
+
+
+print(influxdb.query('DROP DATABASE test').as_text())
+print(influxdb.query('CREATE DATABASE test').as_text())
+
+N_PROC = 1
+with Pool(processes=N_PROC) as pool:
+ for res in pool.imap_unordered(test, range(1, N_PROC + 1)):
+ # they are suppossed to run endlessly, if one returns then there was a failure
+ pool.terminate()
+print('Done')