diff options
Diffstat (limited to 'datastore-leveldb/src')
-rw-r--r-- | datastore-leveldb/src/db.cpp | 55 | ||||
-rw-r--r-- | datastore-leveldb/src/db.h | 10 | ||||
-rw-r--r-- | datastore-leveldb/src/main.cpp | 170 | ||||
-rw-r--r-- | datastore-leveldb/src/web.cpp | 95 | ||||
-rw-r--r-- | datastore-leveldb/src/web.h | 18 |
5 files changed, 190 insertions, 158 deletions
diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp new file mode 100644 index 0000000..aae378f --- /dev/null +++ b/datastore-leveldb/src/db.cpp @@ -0,0 +1,55 @@ +#include "db.h" + +#include <iostream> +#include <mutex> +#include <sstream> +#include <unordered_map> + +static std::unordered_map<std::string,leveldb::DB*> dbs; +static std::mutex getDBmutex; + +static bool sensor_name_is_sane(std::string& name) { + for (auto it = name.begin(); it != name.end(); ++it) { + if (not ((*it >= '0' and *it <= '9') or + (*it >= 'A' and *it <= 'Z') or + (*it >= 'a' and *it <= 'z'))) { + return false; + } + } + return true; +} + +leveldb::DB *getDB(std::string& name) { + getDBmutex.lock(); + if (dbs.find(name) == dbs.end()) { + if (not sensor_name_is_sane(name)) { + getDBmutex.unlock(); + return nullptr; + } + leveldb::DB *db; + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); + if (not status.ok()) { + std::cout << status.ToString() << std::endl; + getDBmutex.unlock(); + return nullptr; + } + dbs[name] = db; + getDBmutex.unlock(); + return db; + } else { + getDBmutex.unlock(); + return dbs.at(name); + } +} + + +void closeDB() { + for (auto it = dbs.begin(); it != dbs.end(); ++it) { + std::cout << "Close " << (*it).first << std::endl; + delete (*it).second; + dbs.erase(it); + } +} + diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h new file mode 100644 index 0000000..c7ed25b --- /dev/null +++ b/datastore-leveldb/src/db.h @@ -0,0 +1,10 @@ +#ifndef HAVE_DB_H +#define HAVE_DB_H + +#include <leveldb/db.h> + +leveldb::DB *getDB(std::string& name); + +void closeDB(); + +#endif /*HAVE_DB_H*/ diff --git a/datastore-leveldb/src/main.cpp b/datastore-leveldb/src/main.cpp index f7f2f5d..3b0dd98 100644 --- a/datastore-leveldb/src/main.cpp +++ b/datastore-leveldb/src/main.cpp @@ -1,159 +1,21 @@ -extern "C" { -#include <unistd.h> -#include <signal.h> -#include <string.h> -#include "mongoose.h" -} - -#include <algorithm> -#include <iostream> -#include <iomanip> -#include <string> -#include <sstream> -#include <map> +#include <csignal> #include <forward_list> #include <functional> -#include <mutex> +#include <chrono> +#include <thread> #include <boost/regex.hpp> +#include "mongoose.h" -#include "leveldb/db.h" -#include "leveldb/comparator.h" +#include "db.h" +#include "web.h" -std::forward_list<std::pair<boost::regex,std::function<void(const boost::cmatch&, struct mg_connection *conn)>>> web_handler; +typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn); -static std::map<std::string,leveldb::DB*> dbs; +std::forward_list<std::pair<boost::regex,std::function<web_handler_t>>> web_handler; struct mg_context *ctx; -bool sensor_name_is_sane(std::string& name) { - for (auto it = name.begin(); it != name.end(); ++it) { - if (not (*it >= '0' and *it <= '9' or - *it >= 'A' and *it <= 'Z' or - *it >= 'a' and *it <= 'z')) { - return false; - } - } - return true; -} - -std::mutex getDBmutex; - -leveldb::DB *getDB(std::string& name) { - getDBmutex.lock(); - if (dbs.find(name) == dbs.end()) { - if (not sensor_name_is_sane(name)) { - getDBmutex.unlock(); - return nullptr; - } - leveldb::DB *db; - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); - if (not status.ok()) { - std::cout << status.ToString() << std::endl; - getDBmutex.unlock(); - return nullptr; - } - dbs[name] = db; - getDBmutex.unlock(); - return db; - } else { - getDBmutex.unlock(); - return dbs.at(name); - } -} - -std::string make_key(uint64_t timestamp) { - std::stringstream key; - key << "ts-"; - key << std::setfill('0') << std::setw(20) << timestamp; - return key.str(); -} - -void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { - const struct mg_request_info *request_info = mg_get_request_info(conn); - - std::string sensor(match[1].str()); - uint64_t timestamp = std::stoul(match[2].str()); - char buf[1024]; - int count = mg_read(conn, buf, 1024); - std::string value(buf, count); - - leveldb::DB *db = getDB(sensor); - if (db == nullptr) { - std::cout << "failed to get db for " << sensor << std::endl; - mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); - return; - } - - std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; - db->Put(leveldb::WriteOptions(), make_key(timestamp), value); - mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); -} - -void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { - static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); - - std::string sensor(match[1].str()); - uint64_t start = std::stoul(match[2].str()); - uint64_t end = std::stoul(match[3].str()); - std::string key_start(std::move(make_key(start))); - std::string key_end(std::move(make_key(end))); - - leveldb::DB *db = getDB(sensor); - if (db == nullptr) { - mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); - return; - } - - mg_printf(conn, - "HTTP/1.1 200 Value received\r\n" - "Content-Type: application/json; encoding=UTF-8\r\n" - "\r\n"); - - std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; - - std::ostringstream out; - out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; - mg_write(conn, out.str().c_str(), out.str().size()); - - leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - bool first = true; - std::ostringstream outbuf; - for (it->Seek(key_start); - it->Valid() && cmp->Compare(it->key(), key_end) < 0; - it->Next()) { - const char *key = it->key().data(); - size_t key_size = it->key().size(); - if (key_size != 20+3) { - std::cerr << "invalid key" << std::endl; - return; - } - - int offset = 3; //ts- - // skip zeros in timestamp - while (offset < key_size-1 and *(key+offset) == '0') - offset++; - - if (first) - first = false; - else - outbuf << ','; - - outbuf << '['; - outbuf.write(key+offset, key_size-offset); - outbuf << ','; - outbuf.write(it->value().data(), it->value().size()); - outbuf << ']'; - - mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); - outbuf.seekp(0); - } - mg_printf(conn, "]}\r\n"); - delete it; -} - int begin_request_handler(struct mg_connection *conn) { boost::cmatch match; @@ -169,23 +31,15 @@ int begin_request_handler(struct mg_connection *conn) { return 0; } - struct sigaction old_action; +// Stop the server. void sigint_handler(int s) { - // Stop the server. mg_stop(ctx); - - // Close databases - for (auto it = dbs.begin(); it != dbs.end(); ++it) { - std::cout << "Close " << (*it).first << std::endl; - delete (*it).second; - dbs.erase(it); - } + closeDB(); exit(0); } - int main(int argc, char **argv) { struct mg_callbacks callbacks; const char *options[] = { @@ -196,7 +50,6 @@ int main(int argc, char **argv) { memset(&callbacks, 0, sizeof(callbacks)); callbacks.begin_request = begin_request_handler; - // Routing web_handler.push_front(std::make_pair( boost::regex("/api/value/([a-zA-Z0-9]+)/([0-9]+)"), @@ -214,6 +67,7 @@ int main(int argc, char **argv) { // Start the web server. ctx = mg_start(&callbacks, NULL, options); - pause(); + while (1) + std::this_thread::sleep_for(std::chrono::seconds(1)); return 1; } diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp new file mode 100644 index 0000000..4898451 --- /dev/null +++ b/datastore-leveldb/src/web.cpp @@ -0,0 +1,95 @@ +#include "web.h" +#include "db.h" +#include <leveldb/comparator.h> +#include <iomanip> + +static inline std::string make_key(uint64_t timestamp) { + std::stringstream key; + key << "ts-"; + key << std::setfill('0') << std::setw(20) << timestamp; + return key.str(); +} + +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { +// const struct mg_request_info *request_info = mg_get_request_info(conn); + + std::string sensor(match[1].str()); + uint64_t timestamp = std::stoul(match[2].str()); + char buf[1024]; + int count = mg_read(conn, buf, 1024); + std::string value(buf, count); + + leveldb::DB *db = getDB(sensor); + if (db == nullptr) { + std::cout << "failed to get db for " << sensor << std::endl; + mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); + return; + } + + std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; + db->Put(leveldb::WriteOptions(), make_key(timestamp), value); + mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); +} + +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { + static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); + + std::string sensor(match[1].str()); + uint64_t start = std::stoul(match[2].str()); + uint64_t end = std::stoul(match[3].str()); + std::string key_start(std::move(make_key(start))); + std::string key_end(std::move(make_key(end))); + + leveldb::DB *db = getDB(sensor); + if (db == nullptr) { + mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); + return; + } + + mg_printf(conn, + "HTTP/1.1 200 Value received\r\n" + "Content-Type: application/json; encoding=UTF-8\r\n" + "\r\n"); + + std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; + + std::ostringstream out; + out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; + mg_write(conn, out.str().c_str(), out.str().size()); + + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + bool first = true; + std::ostringstream outbuf; + for (it->Seek(key_start); + it->Valid() && cmp->Compare(it->key(), key_end) < 0; + it->Next()) { + const char *key = it->key().data(); + size_t key_size = it->key().size(); + if (key_size != 20+3) { + std::cerr << "invalid key" << std::endl; + return; + } + + unsigned int offset = 3; // "ts-" + // skip zeros in timestamp + while (offset < key_size-1 and *(key+offset) == '0') + offset++; + + if (first) + first = false; + else + outbuf << ','; + + outbuf << '['; + outbuf.write(key+offset, key_size-offset); + outbuf << ','; + outbuf.write(it->value().data(), it->value().size()); + outbuf << ']'; + + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); + outbuf.seekp(0); + } + mg_printf(conn, "]}\r\n"); + delete it; +} + diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h new file mode 100644 index 0000000..1360d3d --- /dev/null +++ b/datastore-leveldb/src/web.h @@ -0,0 +1,18 @@ +#ifndef HAVE_WEB_H +#define HAVE_WEB_H + +extern "C" { +#include "mongoose.h" +} +#ifdef __clang__ +#include <cstddef> +#warning "Using clang" +#define BOOST_NO_CXX11_NUMERIC_LIMITS +#endif +#include <boost/regex.hpp> + +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn); + +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn); + +#endif/*HAVE_WEB_H*/ |