diff options
author | Yves Fischer <yvesf-git@xapek.org> | 2013-04-18 14:35:49 +0200 |
---|---|---|
committer | Yves Fischer <yvesf-git@xapek.org> | 2013-04-18 14:35:49 +0200 |
commit | e5342fa4f51b3fcbf931cabeb996183f96ceedbc (patch) | |
tree | 430887755433e59a2f2b82705f6290ef23affa7a /datastore-leveldb/src/main.cpp | |
parent | 1ba325a1679acc7692cb9f674cfbe452cf763e12 (diff) | |
download | ebus-alt-e5342fa4f51b3fcbf931cabeb996183f96ceedbc.tar.gz ebus-alt-e5342fa4f51b3fcbf931cabeb996183f96ceedbc.zip |
leveldb: split up in 3 files
Diffstat (limited to 'datastore-leveldb/src/main.cpp')
-rw-r--r-- | datastore-leveldb/src/main.cpp | 170 |
1 files changed, 12 insertions, 158 deletions
diff --git a/datastore-leveldb/src/main.cpp b/datastore-leveldb/src/main.cpp index f7f2f5d..3b0dd98 100644 --- a/datastore-leveldb/src/main.cpp +++ b/datastore-leveldb/src/main.cpp @@ -1,159 +1,21 @@ -extern "C" { -#include <unistd.h> -#include <signal.h> -#include <string.h> -#include "mongoose.h" -} - -#include <algorithm> -#include <iostream> -#include <iomanip> -#include <string> -#include <sstream> -#include <map> +#include <csignal> #include <forward_list> #include <functional> -#include <mutex> +#include <chrono> +#include <thread> #include <boost/regex.hpp> +#include "mongoose.h" -#include "leveldb/db.h" -#include "leveldb/comparator.h" +#include "db.h" +#include "web.h" -std::forward_list<std::pair<boost::regex,std::function<void(const boost::cmatch&, struct mg_connection *conn)>>> web_handler; +typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn); -static std::map<std::string,leveldb::DB*> dbs; +std::forward_list<std::pair<boost::regex,std::function<web_handler_t>>> web_handler; struct mg_context *ctx; -bool sensor_name_is_sane(std::string& name) { - for (auto it = name.begin(); it != name.end(); ++it) { - if (not (*it >= '0' and *it <= '9' or - *it >= 'A' and *it <= 'Z' or - *it >= 'a' and *it <= 'z')) { - return false; - } - } - return true; -} - -std::mutex getDBmutex; - -leveldb::DB *getDB(std::string& name) { - getDBmutex.lock(); - if (dbs.find(name) == dbs.end()) { - if (not sensor_name_is_sane(name)) { - getDBmutex.unlock(); - return nullptr; - } - leveldb::DB *db; - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); - if (not status.ok()) { - std::cout << status.ToString() << std::endl; - getDBmutex.unlock(); - return nullptr; - } - dbs[name] = db; - getDBmutex.unlock(); - return db; - } else { - getDBmutex.unlock(); - return dbs.at(name); - } -} - -std::string make_key(uint64_t timestamp) { - std::stringstream key; - key << "ts-"; - key << std::setfill('0') << std::setw(20) << timestamp; - return key.str(); -} - -void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { - const struct mg_request_info *request_info = mg_get_request_info(conn); - - std::string sensor(match[1].str()); - uint64_t timestamp = std::stoul(match[2].str()); - char buf[1024]; - int count = mg_read(conn, buf, 1024); - std::string value(buf, count); - - leveldb::DB *db = getDB(sensor); - if (db == nullptr) { - std::cout << "failed to get db for " << sensor << std::endl; - mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); - return; - } - - std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; - db->Put(leveldb::WriteOptions(), make_key(timestamp), value); - mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); -} - -void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { - static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); - - std::string sensor(match[1].str()); - uint64_t start = std::stoul(match[2].str()); - uint64_t end = std::stoul(match[3].str()); - std::string key_start(std::move(make_key(start))); - std::string key_end(std::move(make_key(end))); - - leveldb::DB *db = getDB(sensor); - if (db == nullptr) { - mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); - return; - } - - mg_printf(conn, - "HTTP/1.1 200 Value received\r\n" - "Content-Type: application/json; encoding=UTF-8\r\n" - "\r\n"); - - std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; - - std::ostringstream out; - out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; - mg_write(conn, out.str().c_str(), out.str().size()); - - leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - bool first = true; - std::ostringstream outbuf; - for (it->Seek(key_start); - it->Valid() && cmp->Compare(it->key(), key_end) < 0; - it->Next()) { - const char *key = it->key().data(); - size_t key_size = it->key().size(); - if (key_size != 20+3) { - std::cerr << "invalid key" << std::endl; - return; - } - - int offset = 3; //ts- - // skip zeros in timestamp - while (offset < key_size-1 and *(key+offset) == '0') - offset++; - - if (first) - first = false; - else - outbuf << ','; - - outbuf << '['; - outbuf.write(key+offset, key_size-offset); - outbuf << ','; - outbuf.write(it->value().data(), it->value().size()); - outbuf << ']'; - - mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); - outbuf.seekp(0); - } - mg_printf(conn, "]}\r\n"); - delete it; -} - int begin_request_handler(struct mg_connection *conn) { boost::cmatch match; @@ -169,23 +31,15 @@ int begin_request_handler(struct mg_connection *conn) { return 0; } - struct sigaction old_action; +// Stop the server. void sigint_handler(int s) { - // Stop the server. mg_stop(ctx); - - // Close databases - for (auto it = dbs.begin(); it != dbs.end(); ++it) { - std::cout << "Close " << (*it).first << std::endl; - delete (*it).second; - dbs.erase(it); - } + closeDB(); exit(0); } - int main(int argc, char **argv) { struct mg_callbacks callbacks; const char *options[] = { @@ -196,7 +50,6 @@ int main(int argc, char **argv) { memset(&callbacks, 0, sizeof(callbacks)); callbacks.begin_request = begin_request_handler; - // Routing web_handler.push_front(std::make_pair( boost::regex("/api/value/([a-zA-Z0-9]+)/([0-9]+)"), @@ -214,6 +67,7 @@ int main(int argc, char **argv) { // Start the web server. ctx = mg_start(&callbacks, NULL, options); - pause(); + while (1) + std::this_thread::sleep_for(std::chrono::seconds(1)); return 1; } |