summaryrefslogtreecommitdiff
path: root/datastore-leveldb/src
diff options
context:
space:
mode:
Diffstat (limited to 'datastore-leveldb/src')
-rw-r--r--datastore-leveldb/src/db.cpp74
-rw-r--r--datastore-leveldb/src/db.h14
-rw-r--r--datastore-leveldb/src/server.cpp110
-rw-r--r--datastore-leveldb/src/web.cpp179
-rw-r--r--datastore-leveldb/src/web.h21
5 files changed, 0 insertions, 398 deletions
diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp
deleted file mode 100644
index 991a82e..0000000
--- a/datastore-leveldb/src/db.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-#include "db.h"
-
-#include <iomanip>
-#include <iostream>
-#include <mutex>
-#include <unordered_map>
-#include <sstream>
-
-static std::unordered_map<std::string,leveldb::DB*> dbs;
-static std::mutex getDBmutex;
-
-static bool sensor_name_is_sane(std::string& name) {
- for (auto it = name.begin(); it != name.end(); ++it) {
- if (not ((*it >= '0' and *it <= '9') or
- (*it >= 'A' and *it <= 'Z') or
- (*it >= 'a' and *it <= 'z') or
- (*it == '.'))) {
- return false;
- }
- }
- return true;
-}
-
-std::string db_make_key(const uint64_t timestamp) {
- std::stringstream key;
- key << "ts-";
- key << std::setfill('0') << std::setw(20) << timestamp;
- return key.str();
-}
-
-leveldb::DB *db_get(std::string& name) {
- getDBmutex.lock();
- if (dbs.find(name) == dbs.end()) {
- if (not sensor_name_is_sane(name)) {
- getDBmutex.unlock();
- return nullptr;
- }
- leveldb::DB *db;
- leveldb::Options options;
- options.create_if_missing = true;
- leveldb::Status status = leveldb::DB::Open(options, "data/"+name, &db);
- if (not status.ok()) {
- std::cout << status.ToString() << std::endl;
- getDBmutex.unlock();
- return nullptr;
- }
- dbs[name] = db;
- getDBmutex.unlock();
- return db;
- } else {
- getDBmutex.unlock();
- return dbs.at(name);
- }
-}
-
-
-bool db_insert(std::string& name, const uint64_t timestamp, std::string& value) {
- leveldb::DB *db = db_get(name);
- if (db == nullptr) return false;
-
- auto status = db->Put(leveldb::WriteOptions(), db_make_key(timestamp), value);
- return status.ok();
-}
-
-void db_close() {
- auto it = dbs.begin();
- while (it != dbs.end()) {
- std::cout << "Close Database: " << (*it).first << std::endl;
- delete (*it).second;
- dbs.erase(it++); //post increment!
- }
- std::cout << std::endl;
-}
-
diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h
deleted file mode 100644
index 5e31b00..0000000
--- a/datastore-leveldb/src/db.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#ifndef HAVE_DB_H
-#define HAVE_DB_H
-
-#include <leveldb/db.h>
-
-leveldb::DB *db_get(std::string& name);
-
-bool db_insert(std::string& name, const uint64_t timestamp, std::string& value);
-
-void db_close();
-
-std::string db_make_key(const uint64_t timestamp);
-
-#endif /*HAVE_DB_H*/
diff --git a/datastore-leveldb/src/server.cpp b/datastore-leveldb/src/server.cpp
deleted file mode 100644
index 83f4cdb..0000000
--- a/datastore-leveldb/src/server.cpp
+++ /dev/null
@@ -1,110 +0,0 @@
-#include <csignal>
-#include <forward_list>
-#include <functional>
-#include <chrono>
-#include <thread>
-
-#include <boost/regex.hpp>
-#include <boost/program_options.hpp>
-#include "mongoose.h"
-
-#include "db.h"
-#include "web.h"
-
-typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn);
-
-std::forward_list<std::pair<boost::regex,std::function<web_handler_t>>> web_handler;
-
-struct mg_context *ctx;
-
-int begin_request_handler(struct mg_connection *conn) {
- boost::cmatch match;
-
- const struct mg_request_info *request_info = mg_get_request_info(conn);
-
- std::cout << request_info->request_method << " "
- << request_info->uri << std::endl;
- for (auto item = web_handler.begin(); item != web_handler.end(); ++item) {
- if (boost::regex_match(request_info->uri, match, (*item).first)) {
- (*item).second(match, conn);
- return 1;
- }
- }
- return 0;
-}
-
-struct sigaction old_action;
-
-// Stop the server.
-void sigint_handler(int s) {
- mg_stop(ctx);
- db_close();
- exit(0);
-}
-
-namespace po = boost::program_options;
-
-int main(int argc, char **argv) {
- // Program options
- po::options_description desc("Program options");
- desc.add_options()
- ("wwwroot", po::value<std::string>()->required(), "Path to www root directory")
- ("port", po::value<int>()->default_value(8080), "HTTP Port")
- ("help", "Print help message")
- ;
-
- po::variables_map vm;
- po::store(po::parse_command_line(argc, argv, desc), vm);
- try {
- po::notify(vm);
- } catch(boost::program_options::required_option& e) {
- std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
- return 1;
- }
-
- if (vm.count("help")) {
- std::cout << "Usage: " << argv[0] << std::endl << std::endl;
- std::cout << desc << std::endl;
- return 0;
- }
- const char *port = strdup(std::to_string(vm["port"].as<int>()).c_str());
- const char *wwwroot = strdup(vm["wwwroot"].as<std::string>().c_str());
-
- // Mongoose options
- const char *options[] = {
- "num_threads", "8",
- "listening_ports", port,
- "document_root", wwwroot,
- NULL};
-
- // Mongoose callbacks
- struct mg_callbacks callbacks;
- memset(&callbacks, 0, sizeof(callbacks));
- callbacks.begin_request = begin_request_handler;
-
- // Routing
- web_handler.push_front(std::make_pair(
- web_handle_api_value_R,
- web_handle_api_value));
- web_handler.push_front(std::make_pair(
- web_handle_api_value_timestamp_R,
- web_handle_api_value_timestamp));
- web_handler.push_front(std::make_pair(
- web_handle_api_range_R,
- web_handle_api_range));
- web_handler.push_front(std::make_pair(
- web_handle_api_range_size_R,
- web_handle_api_range_size));
-
- // Signals: handle C-c
- struct sigaction action;
- memset(&action, 0, sizeof(action));
- action.sa_handler = &sigint_handler;
- sigaction(SIGINT, &action, &old_action);
-
- // Start the web server.
- ctx = mg_start(&callbacks, NULL, options);
- while (1)
- std::this_thread::sleep_for(std::chrono::seconds(1));
- return 1;
-}
diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp
deleted file mode 100644
index 443e782..0000000
--- a/datastore-leveldb/src/web.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-#include "web.h"
-#include "db.h"
-#include <algorithm>
-#include <chrono>
-#include <leveldb/comparator.h>
-
-static const leveldb::Comparator *cmp = leveldb::BytewiseComparator();
-
-static inline uint64_t now() {
- return std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::high_resolution_clock::now().time_since_epoch()).count();
-}
-
-// uint64_t is long
-inline void strToL(unsigned long int *l, std::string s) {
- *l = std::stoul(s);
-}
-// uint64_t is long long
-inline void strToL(unsigned long long int *l, std::string s) {
- *l = std::stoull(s);
-}
-
-static inline void reply_header(struct mg_connection *conn, const bool ok, const char *msg, const char *extra = "") {
- mg_printf(conn,
- "HTTP/1.1 %s %s\r\n"
- "%s",
- (ok) ? "200" : "500",
- msg,
- extra);
-}
-
-static inline bool parse_key(leveldb::Slice &&key, uint64_t *value) {
- if (key.size() != 20+3)
- return false;
- strToL(value, key.data()+3);
- return true;
-}
-
-const boost::regex web_handle_api_value_R(
- "/api/value/([a-zA-Z0-9\\.]+)");
-void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) {
- std::string sensor(match[1].str());
- uint64_t timestamp = now();
- char buf[1024];
- int bytesRead = mg_read(conn, buf, 1024);
- std::string value(buf, bytesRead);
-
- if (db_insert(sensor, timestamp, value)) {
- reply_header(conn, true, "OK Value received", "\r\n");
- } else {
- reply_header(conn, false, "Internal Error", "\r\n");
- }
-}
-
-const boost::regex web_handle_api_value_timestamp_R(
- "/api/value/([a-zA-Z0-9\\.]+)/([0-9]+)");
-void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn) {
- std::string sensor(match[1].str());
- uint64_t timestamp; strToL(&timestamp, match[2].str());
- char buf[1024];
- int bytesRead = mg_read(conn, buf, 1024);
- std::string value(buf, bytesRead);
-
- if (db_insert(sensor, timestamp, value)) {
- reply_header(conn, true, "OK Value received", "\r\n");
- } else {
- reply_header(conn, false, "Internal Error", "\r\n");
- }
-}
-
-static inline void print_json_tuple(struct mg_connection *conn,
- std::ostringstream &outbuf,
- leveldb::Slice &&key,
- leveldb::Slice &&value) {
-
- size_t key_size = key.size();
- if (key_size != 20+3) {
- std::cerr << "invalid key" << std::endl;
- return;
- }
-
- unsigned int offset = 3; // "ts-"
- // skip zeros in timestamp
- while (offset < key_size-1 and *(key.data()+offset) == '0')
- offset++;
-
- outbuf << '[';
- outbuf.write(key.data()+offset, key_size-offset);
- outbuf << ',';
- outbuf.write(value.data(), value.size());
- outbuf << ']';
-
- mg_write(conn, outbuf.str().c_str(), outbuf.tellp());
-}
-
-
-const boost::regex web_handle_api_range_R(
- "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)");
-void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) {
- std::string sensor(match[1].str());
- uint64_t start; strToL(&start, match[2].str());
- uint64_t end; strToL(&end, match[3].str());
- std::string key_start(std::move(db_make_key(start)));
- std::string key_end(std::move(db_make_key(end)));
-
- leveldb::DB *db = db_get(sensor);
- if (db == nullptr) {
- reply_header(conn, false, "Internal Error", "\r\n");
- return;
- }
-
- reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n");
-
- std::ostringstream out;
- out << "{\"sensor\":\"" << sensor << "\", "
- << "\"error\":null, "
- << "\"data\":[";
- mg_write(conn, out.str().c_str(), out.str().size());
-
- leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
- bool first = true;
- std::ostringstream outbuf;
- for (it->Seek(key_start);
- it->Valid() && cmp->Compare(it->key(), key_end) < 0;
- it->Next()) {
- if (first) first = false;
- else outbuf << ",";
- print_json_tuple(conn, outbuf, it->key(), it->value());
- outbuf.seekp(0);
- }
- mg_printf(conn, "]}\r\n");
- delete it;
-}
-
-const boost::regex web_handle_api_range_size_R(
- "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)/([0-9]+)");
-void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn) {
- std::string sensor(match[1].str());
- uint64_t start; strToL(&start, match[2].str());
- uint64_t end; strToL(&end, match[3].str());
- uint64_t size; strToL(&size, match[4].str());
-
- leveldb::DB *db = db_get(sensor);
- if (db == nullptr) {
- reply_header(conn, false, "Internal Error", "\r\n");
- return;
- }
-
- reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n");
-
- uint64_t step = std::max((uint64_t)1, (end-start) / size);
-
- std::ostringstream out;
- out << "{\"sensor\":\"" << sensor << "\", "
- << "\"error\":null, "
- << "\"size\":" << size << ", "
- << "\"step\":" << step << ", "
- << "\"data\":[";
- mg_write(conn, out.str().c_str(), out.str().size());
-
- bool first = true;
- uint64_t actual_size = 0;
- std::ostringstream outbuf;
- leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
- for (uint64_t key = start; key < end; key += step) {
- it->Seek(db_make_key(key));
- if (!it->Valid()) continue;
-
- if (first) first = false;
- else outbuf << ",";
-
- print_json_tuple(conn, outbuf, it->key(), it->value());
- outbuf.seekp(0);
- actual_size++;
- }
- outbuf << "], \"actual_size\":" << actual_size << "}\r\n";
- mg_write(conn, outbuf.str().c_str(), outbuf.tellp());
- delete it;
-}
diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h
deleted file mode 100644
index a9d4593..0000000
--- a/datastore-leveldb/src/web.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#ifndef HAVE_WEB_H
-#define HAVE_WEB_H
-
-extern "C" {
-#include "mongoose.h"
-}
-#include <boost/regex.hpp>
-
-extern const boost::regex web_handle_api_value_R;
-void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn);
-
-extern const boost::regex web_handle_api_value_timestamp_R;
-void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn);
-
-extern const boost::regex web_handle_api_range_R;
-void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn);
-
-extern const boost::regex web_handle_api_range_size_R;
-void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn);
-
-#endif/*HAVE_WEB_H*/