From f6675ccdd7a5997def3c4656f0e2c5dbbbed1fc8 Mon Sep 17 00:00:00 2001 From: Ebus-at-dockstar Date: Fri, 25 Jul 2014 22:13:55 +0200 Subject: embed xexpr-path --- datastore-leveldb/src/db.cpp | 74 ---------------- datastore-leveldb/src/db.h | 14 --- datastore-leveldb/src/server.cpp | 110 ------------------------ datastore-leveldb/src/web.cpp | 179 --------------------------------------- datastore-leveldb/src/web.h | 21 ----- 5 files changed, 398 deletions(-) delete mode 100644 datastore-leveldb/src/db.cpp delete mode 100644 datastore-leveldb/src/db.h delete mode 100644 datastore-leveldb/src/server.cpp delete mode 100644 datastore-leveldb/src/web.cpp delete mode 100644 datastore-leveldb/src/web.h (limited to 'datastore-leveldb/src') diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp deleted file mode 100644 index 991a82e..0000000 --- a/datastore-leveldb/src/db.cpp +++ /dev/null @@ -1,74 +0,0 @@ -#include "db.h" - -#include -#include -#include -#include -#include - -static std::unordered_map dbs; -static std::mutex getDBmutex; - -static bool sensor_name_is_sane(std::string& name) { - for (auto it = name.begin(); it != name.end(); ++it) { - if (not ((*it >= '0' and *it <= '9') or - (*it >= 'A' and *it <= 'Z') or - (*it >= 'a' and *it <= 'z') or - (*it == '.'))) { - return false; - } - } - return true; -} - -std::string db_make_key(const uint64_t timestamp) { - std::stringstream key; - key << "ts-"; - key << std::setfill('0') << std::setw(20) << timestamp; - return key.str(); -} - -leveldb::DB *db_get(std::string& name) { - getDBmutex.lock(); - if (dbs.find(name) == dbs.end()) { - if (not sensor_name_is_sane(name)) { - getDBmutex.unlock(); - return nullptr; - } - leveldb::DB *db; - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, "data/"+name, &db); - if (not status.ok()) { - std::cout << status.ToString() << std::endl; - getDBmutex.unlock(); - return nullptr; - } - dbs[name] = db; - getDBmutex.unlock(); - return db; - } else { - getDBmutex.unlock(); - return dbs.at(name); - } -} - - -bool db_insert(std::string& name, const uint64_t timestamp, std::string& value) { - leveldb::DB *db = db_get(name); - if (db == nullptr) return false; - - auto status = db->Put(leveldb::WriteOptions(), db_make_key(timestamp), value); - return status.ok(); -} - -void db_close() { - auto it = dbs.begin(); - while (it != dbs.end()) { - std::cout << "Close Database: " << (*it).first << std::endl; - delete (*it).second; - dbs.erase(it++); //post increment! - } - std::cout << std::endl; -} - diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h deleted file mode 100644 index 5e31b00..0000000 --- a/datastore-leveldb/src/db.h +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef HAVE_DB_H -#define HAVE_DB_H - -#include - -leveldb::DB *db_get(std::string& name); - -bool db_insert(std::string& name, const uint64_t timestamp, std::string& value); - -void db_close(); - -std::string db_make_key(const uint64_t timestamp); - -#endif /*HAVE_DB_H*/ diff --git a/datastore-leveldb/src/server.cpp b/datastore-leveldb/src/server.cpp deleted file mode 100644 index 83f4cdb..0000000 --- a/datastore-leveldb/src/server.cpp +++ /dev/null @@ -1,110 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include -#include "mongoose.h" - -#include "db.h" -#include "web.h" - -typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn); - -std::forward_list>> web_handler; - -struct mg_context *ctx; - -int begin_request_handler(struct mg_connection *conn) { - boost::cmatch match; - - const struct mg_request_info *request_info = mg_get_request_info(conn); - - std::cout << request_info->request_method << " " - << request_info->uri << std::endl; - for (auto item = web_handler.begin(); item != web_handler.end(); ++item) { - if (boost::regex_match(request_info->uri, match, (*item).first)) { - (*item).second(match, conn); - return 1; - } - } - return 0; -} - -struct sigaction old_action; - -// Stop the server. -void sigint_handler(int s) { - mg_stop(ctx); - db_close(); - exit(0); -} - -namespace po = boost::program_options; - -int main(int argc, char **argv) { - // Program options - po::options_description desc("Program options"); - desc.add_options() - ("wwwroot", po::value()->required(), "Path to www root directory") - ("port", po::value()->default_value(8080), "HTTP Port") - ("help", "Print help message") - ; - - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - try { - po::notify(vm); - } catch(boost::program_options::required_option& e) { - std::cerr << "ERROR: " << e.what() << std::endl << std::endl; - return 1; - } - - if (vm.count("help")) { - std::cout << "Usage: " << argv[0] << std::endl << std::endl; - std::cout << desc << std::endl; - return 0; - } - const char *port = strdup(std::to_string(vm["port"].as()).c_str()); - const char *wwwroot = strdup(vm["wwwroot"].as().c_str()); - - // Mongoose options - const char *options[] = { - "num_threads", "8", - "listening_ports", port, - "document_root", wwwroot, - NULL}; - - // Mongoose callbacks - struct mg_callbacks callbacks; - memset(&callbacks, 0, sizeof(callbacks)); - callbacks.begin_request = begin_request_handler; - - // Routing - web_handler.push_front(std::make_pair( - web_handle_api_value_R, - web_handle_api_value)); - web_handler.push_front(std::make_pair( - web_handle_api_value_timestamp_R, - web_handle_api_value_timestamp)); - web_handler.push_front(std::make_pair( - web_handle_api_range_R, - web_handle_api_range)); - web_handler.push_front(std::make_pair( - web_handle_api_range_size_R, - web_handle_api_range_size)); - - // Signals: handle C-c - struct sigaction action; - memset(&action, 0, sizeof(action)); - action.sa_handler = &sigint_handler; - sigaction(SIGINT, &action, &old_action); - - // Start the web server. - ctx = mg_start(&callbacks, NULL, options); - while (1) - std::this_thread::sleep_for(std::chrono::seconds(1)); - return 1; -} diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp deleted file mode 100644 index 443e782..0000000 --- a/datastore-leveldb/src/web.cpp +++ /dev/null @@ -1,179 +0,0 @@ -#include "web.h" -#include "db.h" -#include -#include -#include - -static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); - -static inline uint64_t now() { - return std::chrono::duration_cast( - std::chrono::high_resolution_clock::now().time_since_epoch()).count(); -} - -// uint64_t is long -inline void strToL(unsigned long int *l, std::string s) { - *l = std::stoul(s); -} -// uint64_t is long long -inline void strToL(unsigned long long int *l, std::string s) { - *l = std::stoull(s); -} - -static inline void reply_header(struct mg_connection *conn, const bool ok, const char *msg, const char *extra = "") { - mg_printf(conn, - "HTTP/1.1 %s %s\r\n" - "%s", - (ok) ? "200" : "500", - msg, - extra); -} - -static inline bool parse_key(leveldb::Slice &&key, uint64_t *value) { - if (key.size() != 20+3) - return false; - strToL(value, key.data()+3); - return true; -} - -const boost::regex web_handle_api_value_R( - "/api/value/([a-zA-Z0-9\\.]+)"); -void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { - std::string sensor(match[1].str()); - uint64_t timestamp = now(); - char buf[1024]; - int bytesRead = mg_read(conn, buf, 1024); - std::string value(buf, bytesRead); - - if (db_insert(sensor, timestamp, value)) { - reply_header(conn, true, "OK Value received", "\r\n"); - } else { - reply_header(conn, false, "Internal Error", "\r\n"); - } -} - -const boost::regex web_handle_api_value_timestamp_R( - "/api/value/([a-zA-Z0-9\\.]+)/([0-9]+)"); -void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn) { - std::string sensor(match[1].str()); - uint64_t timestamp; strToL(×tamp, match[2].str()); - char buf[1024]; - int bytesRead = mg_read(conn, buf, 1024); - std::string value(buf, bytesRead); - - if (db_insert(sensor, timestamp, value)) { - reply_header(conn, true, "OK Value received", "\r\n"); - } else { - reply_header(conn, false, "Internal Error", "\r\n"); - } -} - -static inline void print_json_tuple(struct mg_connection *conn, - std::ostringstream &outbuf, - leveldb::Slice &&key, - leveldb::Slice &&value) { - - size_t key_size = key.size(); - if (key_size != 20+3) { - std::cerr << "invalid key" << std::endl; - return; - } - - unsigned int offset = 3; // "ts-" - // skip zeros in timestamp - while (offset < key_size-1 and *(key.data()+offset) == '0') - offset++; - - outbuf << '['; - outbuf.write(key.data()+offset, key_size-offset); - outbuf << ','; - outbuf.write(value.data(), value.size()); - outbuf << ']'; - - mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); -} - - -const boost::regex web_handle_api_range_R( - "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)"); -void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { - std::string sensor(match[1].str()); - uint64_t start; strToL(&start, match[2].str()); - uint64_t end; strToL(&end, match[3].str()); - std::string key_start(std::move(db_make_key(start))); - std::string key_end(std::move(db_make_key(end))); - - leveldb::DB *db = db_get(sensor); - if (db == nullptr) { - reply_header(conn, false, "Internal Error", "\r\n"); - return; - } - - reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n"); - - std::ostringstream out; - out << "{\"sensor\":\"" << sensor << "\", " - << "\"error\":null, " - << "\"data\":["; - mg_write(conn, out.str().c_str(), out.str().size()); - - leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - bool first = true; - std::ostringstream outbuf; - for (it->Seek(key_start); - it->Valid() && cmp->Compare(it->key(), key_end) < 0; - it->Next()) { - if (first) first = false; - else outbuf << ","; - print_json_tuple(conn, outbuf, it->key(), it->value()); - outbuf.seekp(0); - } - mg_printf(conn, "]}\r\n"); - delete it; -} - -const boost::regex web_handle_api_range_size_R( - "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)/([0-9]+)"); -void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn) { - std::string sensor(match[1].str()); - uint64_t start; strToL(&start, match[2].str()); - uint64_t end; strToL(&end, match[3].str()); - uint64_t size; strToL(&size, match[4].str()); - - leveldb::DB *db = db_get(sensor); - if (db == nullptr) { - reply_header(conn, false, "Internal Error", "\r\n"); - return; - } - - reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n"); - - uint64_t step = std::max((uint64_t)1, (end-start) / size); - - std::ostringstream out; - out << "{\"sensor\":\"" << sensor << "\", " - << "\"error\":null, " - << "\"size\":" << size << ", " - << "\"step\":" << step << ", " - << "\"data\":["; - mg_write(conn, out.str().c_str(), out.str().size()); - - bool first = true; - uint64_t actual_size = 0; - std::ostringstream outbuf; - leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - for (uint64_t key = start; key < end; key += step) { - it->Seek(db_make_key(key)); - if (!it->Valid()) continue; - - if (first) first = false; - else outbuf << ","; - - print_json_tuple(conn, outbuf, it->key(), it->value()); - outbuf.seekp(0); - actual_size++; - } - outbuf << "], \"actual_size\":" << actual_size << "}\r\n"; - mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); - delete it; -} diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h deleted file mode 100644 index a9d4593..0000000 --- a/datastore-leveldb/src/web.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef HAVE_WEB_H -#define HAVE_WEB_H - -extern "C" { -#include "mongoose.h" -} -#include - -extern const boost::regex web_handle_api_value_R; -void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn); - -extern const boost::regex web_handle_api_value_timestamp_R; -void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn); - -extern const boost::regex web_handle_api_range_R; -void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn); - -extern const boost::regex web_handle_api_range_size_R; -void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn); - -#endif/*HAVE_WEB_H*/ -- cgit v1.2.1