diff options
Diffstat (limited to 'datastore-leveldb/src')
-rw-r--r-- | datastore-leveldb/src/db.cpp | 74 | ||||
-rw-r--r-- | datastore-leveldb/src/db.h | 14 | ||||
-rw-r--r-- | datastore-leveldb/src/server.cpp | 110 | ||||
-rw-r--r-- | datastore-leveldb/src/web.cpp | 179 | ||||
-rw-r--r-- | datastore-leveldb/src/web.h | 21 |
5 files changed, 398 insertions, 0 deletions
diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp new file mode 100644 index 0000000..991a82e --- /dev/null +++ b/datastore-leveldb/src/db.cpp @@ -0,0 +1,74 @@ +#include "db.h" + +#include <iomanip> +#include <iostream> +#include <mutex> +#include <unordered_map> +#include <sstream> + +static std::unordered_map<std::string,leveldb::DB*> dbs; +static std::mutex getDBmutex; + +static bool sensor_name_is_sane(std::string& name) { + for (auto it = name.begin(); it != name.end(); ++it) { + if (not ((*it >= '0' and *it <= '9') or + (*it >= 'A' and *it <= 'Z') or + (*it >= 'a' and *it <= 'z') or + (*it == '.'))) { + return false; + } + } + return true; +} + +std::string db_make_key(const uint64_t timestamp) { + std::stringstream key; + key << "ts-"; + key << std::setfill('0') << std::setw(20) << timestamp; + return key.str(); +} + +leveldb::DB *db_get(std::string& name) { + getDBmutex.lock(); + if (dbs.find(name) == dbs.end()) { + if (not sensor_name_is_sane(name)) { + getDBmutex.unlock(); + return nullptr; + } + leveldb::DB *db; + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, "data/"+name, &db); + if (not status.ok()) { + std::cout << status.ToString() << std::endl; + getDBmutex.unlock(); + return nullptr; + } + dbs[name] = db; + getDBmutex.unlock(); + return db; + } else { + getDBmutex.unlock(); + return dbs.at(name); + } +} + + +bool db_insert(std::string& name, const uint64_t timestamp, std::string& value) { + leveldb::DB *db = db_get(name); + if (db == nullptr) return false; + + auto status = db->Put(leveldb::WriteOptions(), db_make_key(timestamp), value); + return status.ok(); +} + +void db_close() { + auto it = dbs.begin(); + while (it != dbs.end()) { + std::cout << "Close Database: " << (*it).first << std::endl; + delete (*it).second; + dbs.erase(it++); //post increment! + } + std::cout << std::endl; +} + diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h new file mode 100644 index 0000000..5e31b00 --- /dev/null +++ b/datastore-leveldb/src/db.h @@ -0,0 +1,14 @@ +#ifndef HAVE_DB_H +#define HAVE_DB_H + +#include <leveldb/db.h> + +leveldb::DB *db_get(std::string& name); + +bool db_insert(std::string& name, const uint64_t timestamp, std::string& value); + +void db_close(); + +std::string db_make_key(const uint64_t timestamp); + +#endif /*HAVE_DB_H*/ diff --git a/datastore-leveldb/src/server.cpp b/datastore-leveldb/src/server.cpp new file mode 100644 index 0000000..83f4cdb --- /dev/null +++ b/datastore-leveldb/src/server.cpp @@ -0,0 +1,110 @@ +#include <csignal> +#include <forward_list> +#include <functional> +#include <chrono> +#include <thread> + +#include <boost/regex.hpp> +#include <boost/program_options.hpp> +#include "mongoose.h" + +#include "db.h" +#include "web.h" + +typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn); + +std::forward_list<std::pair<boost::regex,std::function<web_handler_t>>> web_handler; + +struct mg_context *ctx; + +int begin_request_handler(struct mg_connection *conn) { + boost::cmatch match; + + const struct mg_request_info *request_info = mg_get_request_info(conn); + + std::cout << request_info->request_method << " " + << request_info->uri << std::endl; + for (auto item = web_handler.begin(); item != web_handler.end(); ++item) { + if (boost::regex_match(request_info->uri, match, (*item).first)) { + (*item).second(match, conn); + return 1; + } + } + return 0; +} + +struct sigaction old_action; + +// Stop the server. +void sigint_handler(int s) { + mg_stop(ctx); + db_close(); + exit(0); +} + +namespace po = boost::program_options; + +int main(int argc, char **argv) { + // Program options + po::options_description desc("Program options"); + desc.add_options() + ("wwwroot", po::value<std::string>()->required(), "Path to www root directory") + ("port", po::value<int>()->default_value(8080), "HTTP Port") + ("help", "Print help message") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + try { + po::notify(vm); + } catch(boost::program_options::required_option& e) { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + return 1; + } + + if (vm.count("help")) { + std::cout << "Usage: " << argv[0] << std::endl << std::endl; + std::cout << desc << std::endl; + return 0; + } + const char *port = strdup(std::to_string(vm["port"].as<int>()).c_str()); + const char *wwwroot = strdup(vm["wwwroot"].as<std::string>().c_str()); + + // Mongoose options + const char *options[] = { + "num_threads", "8", + "listening_ports", port, + "document_root", wwwroot, + NULL}; + + // Mongoose callbacks + struct mg_callbacks callbacks; + memset(&callbacks, 0, sizeof(callbacks)); + callbacks.begin_request = begin_request_handler; + + // Routing + web_handler.push_front(std::make_pair( + web_handle_api_value_R, + web_handle_api_value)); + web_handler.push_front(std::make_pair( + web_handle_api_value_timestamp_R, + web_handle_api_value_timestamp)); + web_handler.push_front(std::make_pair( + web_handle_api_range_R, + web_handle_api_range)); + web_handler.push_front(std::make_pair( + web_handle_api_range_size_R, + web_handle_api_range_size)); + + // Signals: handle C-c + struct sigaction action; + memset(&action, 0, sizeof(action)); + action.sa_handler = &sigint_handler; + sigaction(SIGINT, &action, &old_action); + + // Start the web server. + ctx = mg_start(&callbacks, NULL, options); + while (1) + std::this_thread::sleep_for(std::chrono::seconds(1)); + return 1; +} diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp new file mode 100644 index 0000000..443e782 --- /dev/null +++ b/datastore-leveldb/src/web.cpp @@ -0,0 +1,179 @@ +#include "web.h" +#include "db.h" +#include <algorithm> +#include <chrono> +#include <leveldb/comparator.h> + +static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); + +static inline uint64_t now() { + return std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::high_resolution_clock::now().time_since_epoch()).count(); +} + +// uint64_t is long +inline void strToL(unsigned long int *l, std::string s) { + *l = std::stoul(s); +} +// uint64_t is long long +inline void strToL(unsigned long long int *l, std::string s) { + *l = std::stoull(s); +} + +static inline void reply_header(struct mg_connection *conn, const bool ok, const char *msg, const char *extra = "") { + mg_printf(conn, + "HTTP/1.1 %s %s\r\n" + "%s", + (ok) ? "200" : "500", + msg, + extra); +} + +static inline bool parse_key(leveldb::Slice &&key, uint64_t *value) { + if (key.size() != 20+3) + return false; + strToL(value, key.data()+3); + return true; +} + +const boost::regex web_handle_api_value_R( + "/api/value/([a-zA-Z0-9\\.]+)"); +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t timestamp = now(); + char buf[1024]; + int bytesRead = mg_read(conn, buf, 1024); + std::string value(buf, bytesRead); + + if (db_insert(sensor, timestamp, value)) { + reply_header(conn, true, "OK Value received", "\r\n"); + } else { + reply_header(conn, false, "Internal Error", "\r\n"); + } +} + +const boost::regex web_handle_api_value_timestamp_R( + "/api/value/([a-zA-Z0-9\\.]+)/([0-9]+)"); +void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t timestamp; strToL(×tamp, match[2].str()); + char buf[1024]; + int bytesRead = mg_read(conn, buf, 1024); + std::string value(buf, bytesRead); + + if (db_insert(sensor, timestamp, value)) { + reply_header(conn, true, "OK Value received", "\r\n"); + } else { + reply_header(conn, false, "Internal Error", "\r\n"); + } +} + +static inline void print_json_tuple(struct mg_connection *conn, + std::ostringstream &outbuf, + leveldb::Slice &&key, + leveldb::Slice &&value) { + + size_t key_size = key.size(); + if (key_size != 20+3) { + std::cerr << "invalid key" << std::endl; + return; + } + + unsigned int offset = 3; // "ts-" + // skip zeros in timestamp + while (offset < key_size-1 and *(key.data()+offset) == '0') + offset++; + + outbuf << '['; + outbuf.write(key.data()+offset, key_size-offset); + outbuf << ','; + outbuf.write(value.data(), value.size()); + outbuf << ']'; + + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); +} + + +const boost::regex web_handle_api_range_R( + "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)"); +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t start; strToL(&start, match[2].str()); + uint64_t end; strToL(&end, match[3].str()); + std::string key_start(std::move(db_make_key(start))); + std::string key_end(std::move(db_make_key(end))); + + leveldb::DB *db = db_get(sensor); + if (db == nullptr) { + reply_header(conn, false, "Internal Error", "\r\n"); + return; + } + + reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n"); + + std::ostringstream out; + out << "{\"sensor\":\"" << sensor << "\", " + << "\"error\":null, " + << "\"data\":["; + mg_write(conn, out.str().c_str(), out.str().size()); + + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + bool first = true; + std::ostringstream outbuf; + for (it->Seek(key_start); + it->Valid() && cmp->Compare(it->key(), key_end) < 0; + it->Next()) { + if (first) first = false; + else outbuf << ","; + print_json_tuple(conn, outbuf, it->key(), it->value()); + outbuf.seekp(0); + } + mg_printf(conn, "]}\r\n"); + delete it; +} + +const boost::regex web_handle_api_range_size_R( + "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)/([0-9]+)"); +void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t start; strToL(&start, match[2].str()); + uint64_t end; strToL(&end, match[3].str()); + uint64_t size; strToL(&size, match[4].str()); + + leveldb::DB *db = db_get(sensor); + if (db == nullptr) { + reply_header(conn, false, "Internal Error", "\r\n"); + return; + } + + reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n"); + + uint64_t step = std::max((uint64_t)1, (end-start) / size); + + std::ostringstream out; + out << "{\"sensor\":\"" << sensor << "\", " + << "\"error\":null, " + << "\"size\":" << size << ", " + << "\"step\":" << step << ", " + << "\"data\":["; + mg_write(conn, out.str().c_str(), out.str().size()); + + bool first = true; + uint64_t actual_size = 0; + std::ostringstream outbuf; + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + for (uint64_t key = start; key < end; key += step) { + it->Seek(db_make_key(key)); + if (!it->Valid()) continue; + + if (first) first = false; + else outbuf << ","; + + print_json_tuple(conn, outbuf, it->key(), it->value()); + outbuf.seekp(0); + actual_size++; + } + outbuf << "], \"actual_size\":" << actual_size << "}\r\n"; + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); + delete it; +} diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h new file mode 100644 index 0000000..a9d4593 --- /dev/null +++ b/datastore-leveldb/src/web.h @@ -0,0 +1,21 @@ +#ifndef HAVE_WEB_H +#define HAVE_WEB_H + +extern "C" { +#include "mongoose.h" +} +#include <boost/regex.hpp> + +extern const boost::regex web_handle_api_value_R; +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn); + +extern const boost::regex web_handle_api_value_timestamp_R; +void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn); + +extern const boost::regex web_handle_api_range_R; +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn); + +extern const boost::regex web_handle_api_range_size_R; +void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn); + +#endif/*HAVE_WEB_H*/ |