summaryrefslogtreecommitdiff
path: root/datastore-leveldb/src
diff options
context:
space:
mode:
Diffstat (limited to 'datastore-leveldb/src')
-rw-r--r--datastore-leveldb/src/db.cpp74
-rw-r--r--datastore-leveldb/src/db.h14
-rw-r--r--datastore-leveldb/src/server.cpp110
-rw-r--r--datastore-leveldb/src/web.cpp179
-rw-r--r--datastore-leveldb/src/web.h21
5 files changed, 398 insertions, 0 deletions
diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp
new file mode 100644
index 0000000..991a82e
--- /dev/null
+++ b/datastore-leveldb/src/db.cpp
@@ -0,0 +1,74 @@
+#include "db.h"
+
+#include <iomanip>
+#include <iostream>
+#include <mutex>
+#include <unordered_map>
+#include <sstream>
+
+static std::unordered_map<std::string,leveldb::DB*> dbs;
+static std::mutex getDBmutex;
+
+static bool sensor_name_is_sane(std::string& name) {
+ for (auto it = name.begin(); it != name.end(); ++it) {
+ if (not ((*it >= '0' and *it <= '9') or
+ (*it >= 'A' and *it <= 'Z') or
+ (*it >= 'a' and *it <= 'z') or
+ (*it == '.'))) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::string db_make_key(const uint64_t timestamp) {
+ std::stringstream key;
+ key << "ts-";
+ key << std::setfill('0') << std::setw(20) << timestamp;
+ return key.str();
+}
+
+leveldb::DB *db_get(std::string& name) {
+ getDBmutex.lock();
+ if (dbs.find(name) == dbs.end()) {
+ if (not sensor_name_is_sane(name)) {
+ getDBmutex.unlock();
+ return nullptr;
+ }
+ leveldb::DB *db;
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::Status status = leveldb::DB::Open(options, "data/"+name, &db);
+ if (not status.ok()) {
+ std::cout << status.ToString() << std::endl;
+ getDBmutex.unlock();
+ return nullptr;
+ }
+ dbs[name] = db;
+ getDBmutex.unlock();
+ return db;
+ } else {
+ getDBmutex.unlock();
+ return dbs.at(name);
+ }
+}
+
+
+bool db_insert(std::string& name, const uint64_t timestamp, std::string& value) {
+ leveldb::DB *db = db_get(name);
+ if (db == nullptr) return false;
+
+ auto status = db->Put(leveldb::WriteOptions(), db_make_key(timestamp), value);
+ return status.ok();
+}
+
+void db_close() {
+ auto it = dbs.begin();
+ while (it != dbs.end()) {
+ std::cout << "Close Database: " << (*it).first << std::endl;
+ delete (*it).second;
+ dbs.erase(it++); //post increment!
+ }
+ std::cout << std::endl;
+}
+
diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h
new file mode 100644
index 0000000..5e31b00
--- /dev/null
+++ b/datastore-leveldb/src/db.h
@@ -0,0 +1,14 @@
+#ifndef HAVE_DB_H
+#define HAVE_DB_H
+
+#include <leveldb/db.h>
+
+leveldb::DB *db_get(std::string& name);
+
+bool db_insert(std::string& name, const uint64_t timestamp, std::string& value);
+
+void db_close();
+
+std::string db_make_key(const uint64_t timestamp);
+
+#endif /*HAVE_DB_H*/
diff --git a/datastore-leveldb/src/server.cpp b/datastore-leveldb/src/server.cpp
new file mode 100644
index 0000000..83f4cdb
--- /dev/null
+++ b/datastore-leveldb/src/server.cpp
@@ -0,0 +1,110 @@
+#include <csignal>
+#include <forward_list>
+#include <functional>
+#include <chrono>
+#include <thread>
+
+#include <boost/regex.hpp>
+#include <boost/program_options.hpp>
+#include "mongoose.h"
+
+#include "db.h"
+#include "web.h"
+
+typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn);
+
+std::forward_list<std::pair<boost::regex,std::function<web_handler_t>>> web_handler;
+
+struct mg_context *ctx;
+
+int begin_request_handler(struct mg_connection *conn) {
+ boost::cmatch match;
+
+ const struct mg_request_info *request_info = mg_get_request_info(conn);
+
+ std::cout << request_info->request_method << " "
+ << request_info->uri << std::endl;
+ for (auto item = web_handler.begin(); item != web_handler.end(); ++item) {
+ if (boost::regex_match(request_info->uri, match, (*item).first)) {
+ (*item).second(match, conn);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+struct sigaction old_action;
+
+// Stop the server.
+void sigint_handler(int s) {
+ mg_stop(ctx);
+ db_close();
+ exit(0);
+}
+
+namespace po = boost::program_options;
+
+int main(int argc, char **argv) {
+ // Program options
+ po::options_description desc("Program options");
+ desc.add_options()
+ ("wwwroot", po::value<std::string>()->required(), "Path to www root directory")
+ ("port", po::value<int>()->default_value(8080), "HTTP Port")
+ ("help", "Print help message")
+ ;
+
+ po::variables_map vm;
+ po::store(po::parse_command_line(argc, argv, desc), vm);
+ try {
+ po::notify(vm);
+ } catch(boost::program_options::required_option& e) {
+ std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
+ return 1;
+ }
+
+ if (vm.count("help")) {
+ std::cout << "Usage: " << argv[0] << std::endl << std::endl;
+ std::cout << desc << std::endl;
+ return 0;
+ }
+ const char *port = strdup(std::to_string(vm["port"].as<int>()).c_str());
+ const char *wwwroot = strdup(vm["wwwroot"].as<std::string>().c_str());
+
+ // Mongoose options
+ const char *options[] = {
+ "num_threads", "8",
+ "listening_ports", port,
+ "document_root", wwwroot,
+ NULL};
+
+ // Mongoose callbacks
+ struct mg_callbacks callbacks;
+ memset(&callbacks, 0, sizeof(callbacks));
+ callbacks.begin_request = begin_request_handler;
+
+ // Routing
+ web_handler.push_front(std::make_pair(
+ web_handle_api_value_R,
+ web_handle_api_value));
+ web_handler.push_front(std::make_pair(
+ web_handle_api_value_timestamp_R,
+ web_handle_api_value_timestamp));
+ web_handler.push_front(std::make_pair(
+ web_handle_api_range_R,
+ web_handle_api_range));
+ web_handler.push_front(std::make_pair(
+ web_handle_api_range_size_R,
+ web_handle_api_range_size));
+
+ // Signals: handle C-c
+ struct sigaction action;
+ memset(&action, 0, sizeof(action));
+ action.sa_handler = &sigint_handler;
+ sigaction(SIGINT, &action, &old_action);
+
+ // Start the web server.
+ ctx = mg_start(&callbacks, NULL, options);
+ while (1)
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ return 1;
+}
diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp
new file mode 100644
index 0000000..443e782
--- /dev/null
+++ b/datastore-leveldb/src/web.cpp
@@ -0,0 +1,179 @@
+#include "web.h"
+#include "db.h"
+#include <algorithm>
+#include <chrono>
+#include <leveldb/comparator.h>
+
+static const leveldb::Comparator *cmp = leveldb::BytewiseComparator();
+
+static inline uint64_t now() {
+ return std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::high_resolution_clock::now().time_since_epoch()).count();
+}
+
+// uint64_t is long
+inline void strToL(unsigned long int *l, std::string s) {
+ *l = std::stoul(s);
+}
+// uint64_t is long long
+inline void strToL(unsigned long long int *l, std::string s) {
+ *l = std::stoull(s);
+}
+
+static inline void reply_header(struct mg_connection *conn, const bool ok, const char *msg, const char *extra = "") {
+ mg_printf(conn,
+ "HTTP/1.1 %s %s\r\n"
+ "%s",
+ (ok) ? "200" : "500",
+ msg,
+ extra);
+}
+
+static inline bool parse_key(leveldb::Slice &&key, uint64_t *value) {
+ if (key.size() != 20+3)
+ return false;
+ strToL(value, key.data()+3);
+ return true;
+}
+
+const boost::regex web_handle_api_value_R(
+ "/api/value/([a-zA-Z0-9\\.]+)");
+void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) {
+ std::string sensor(match[1].str());
+ uint64_t timestamp = now();
+ char buf[1024];
+ int bytesRead = mg_read(conn, buf, 1024);
+ std::string value(buf, bytesRead);
+
+ if (db_insert(sensor, timestamp, value)) {
+ reply_header(conn, true, "OK Value received", "\r\n");
+ } else {
+ reply_header(conn, false, "Internal Error", "\r\n");
+ }
+}
+
+const boost::regex web_handle_api_value_timestamp_R(
+ "/api/value/([a-zA-Z0-9\\.]+)/([0-9]+)");
+void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn) {
+ std::string sensor(match[1].str());
+ uint64_t timestamp; strToL(&timestamp, match[2].str());
+ char buf[1024];
+ int bytesRead = mg_read(conn, buf, 1024);
+ std::string value(buf, bytesRead);
+
+ if (db_insert(sensor, timestamp, value)) {
+ reply_header(conn, true, "OK Value received", "\r\n");
+ } else {
+ reply_header(conn, false, "Internal Error", "\r\n");
+ }
+}
+
+static inline void print_json_tuple(struct mg_connection *conn,
+ std::ostringstream &outbuf,
+ leveldb::Slice &&key,
+ leveldb::Slice &&value) {
+
+ size_t key_size = key.size();
+ if (key_size != 20+3) {
+ std::cerr << "invalid key" << std::endl;
+ return;
+ }
+
+ unsigned int offset = 3; // "ts-"
+ // skip zeros in timestamp
+ while (offset < key_size-1 and *(key.data()+offset) == '0')
+ offset++;
+
+ outbuf << '[';
+ outbuf.write(key.data()+offset, key_size-offset);
+ outbuf << ',';
+ outbuf.write(value.data(), value.size());
+ outbuf << ']';
+
+ mg_write(conn, outbuf.str().c_str(), outbuf.tellp());
+}
+
+
+const boost::regex web_handle_api_range_R(
+ "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)");
+void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) {
+ std::string sensor(match[1].str());
+ uint64_t start; strToL(&start, match[2].str());
+ uint64_t end; strToL(&end, match[3].str());
+ std::string key_start(std::move(db_make_key(start)));
+ std::string key_end(std::move(db_make_key(end)));
+
+ leveldb::DB *db = db_get(sensor);
+ if (db == nullptr) {
+ reply_header(conn, false, "Internal Error", "\r\n");
+ return;
+ }
+
+ reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n");
+
+ std::ostringstream out;
+ out << "{\"sensor\":\"" << sensor << "\", "
+ << "\"error\":null, "
+ << "\"data\":[";
+ mg_write(conn, out.str().c_str(), out.str().size());
+
+ leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
+ bool first = true;
+ std::ostringstream outbuf;
+ for (it->Seek(key_start);
+ it->Valid() && cmp->Compare(it->key(), key_end) < 0;
+ it->Next()) {
+ if (first) first = false;
+ else outbuf << ",";
+ print_json_tuple(conn, outbuf, it->key(), it->value());
+ outbuf.seekp(0);
+ }
+ mg_printf(conn, "]}\r\n");
+ delete it;
+}
+
+const boost::regex web_handle_api_range_size_R(
+ "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)/([0-9]+)");
+void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn) {
+ std::string sensor(match[1].str());
+ uint64_t start; strToL(&start, match[2].str());
+ uint64_t end; strToL(&end, match[3].str());
+ uint64_t size; strToL(&size, match[4].str());
+
+ leveldb::DB *db = db_get(sensor);
+ if (db == nullptr) {
+ reply_header(conn, false, "Internal Error", "\r\n");
+ return;
+ }
+
+ reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n");
+
+ uint64_t step = std::max((uint64_t)1, (end-start) / size);
+
+ std::ostringstream out;
+ out << "{\"sensor\":\"" << sensor << "\", "
+ << "\"error\":null, "
+ << "\"size\":" << size << ", "
+ << "\"step\":" << step << ", "
+ << "\"data\":[";
+ mg_write(conn, out.str().c_str(), out.str().size());
+
+ bool first = true;
+ uint64_t actual_size = 0;
+ std::ostringstream outbuf;
+ leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
+ for (uint64_t key = start; key < end; key += step) {
+ it->Seek(db_make_key(key));
+ if (!it->Valid()) continue;
+
+ if (first) first = false;
+ else outbuf << ",";
+
+ print_json_tuple(conn, outbuf, it->key(), it->value());
+ outbuf.seekp(0);
+ actual_size++;
+ }
+ outbuf << "], \"actual_size\":" << actual_size << "}\r\n";
+ mg_write(conn, outbuf.str().c_str(), outbuf.tellp());
+ delete it;
+}
diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h
new file mode 100644
index 0000000..a9d4593
--- /dev/null
+++ b/datastore-leveldb/src/web.h
@@ -0,0 +1,21 @@
+#ifndef HAVE_WEB_H
+#define HAVE_WEB_H
+
+extern "C" {
+#include "mongoose.h"
+}
+#include <boost/regex.hpp>
+
+extern const boost::regex web_handle_api_value_R;
+void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn);
+
+extern const boost::regex web_handle_api_value_timestamp_R;
+void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn);
+
+extern const boost::regex web_handle_api_range_R;
+void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn);
+
+extern const boost::regex web_handle_api_range_size_R;
+void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn);
+
+#endif/*HAVE_WEB_H*/