From e5342fa4f51b3fcbf931cabeb996183f96ceedbc Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Thu, 18 Apr 2013 14:35:49 +0200 Subject: leveldb: split up in 3 files --- datastore-leveldb/Makefile | 22 ++++-- datastore-leveldb/README | 1 - datastore-leveldb/src/db.cpp | 55 +++++++++++++ datastore-leveldb/src/db.h | 10 +++ datastore-leveldb/src/main.cpp | 170 +++-------------------------------------- datastore-leveldb/src/web.cpp | 95 +++++++++++++++++++++++ datastore-leveldb/src/web.h | 18 +++++ 7 files changed, 205 insertions(+), 166 deletions(-) create mode 100644 datastore-leveldb/src/db.cpp create mode 100644 datastore-leveldb/src/db.h create mode 100644 datastore-leveldb/src/web.cpp create mode 100644 datastore-leveldb/src/web.h diff --git a/datastore-leveldb/Makefile b/datastore-leveldb/Makefile index 3117123..03d8d43 100644 --- a/datastore-leveldb/Makefile +++ b/datastore-leveldb/Makefile @@ -1,15 +1,23 @@ -CC = gcc -CPP = foo -OBJ= src/main.o mongoose/mongoose.o +OBJ= src/web.o src/db.o src/main.o mongoose/mongoose.o OUT = main -CFLAGS = -CXXFLAGS = -std=c++11 -Imongoose -LDFLAGS = -lleveldb -lboost_regex + +CFLAGS += -Wall -DNO_CGI -DNO_POPEN -DUSE_IPV6 -DNO_SSL +CXXFLAGS += -Wall -std=c++11 -Imongoose +LDFLAGS += -lleveldb -lboost_regex + +ifdef DEBUG +CFLAGS += -g -O0 +CXXFLAGS += -g -O0 +else +CFLAGS += -O3 +CXXFLAGS += -O3 +endif + all: build build: $(OBJ) - $(CC) $(CPPFLAGS) $(OBJ) -o $(OUT) $(LDFLAGS) + $(CXX) $(CPPFLAGS) $(OBJ) -o $(OUT) $(LDFLAGS) clean: rm -rf $(OUT) $(OBJ) diff --git a/datastore-leveldb/README b/datastore-leveldb/README index a531658..f71ada1 100644 --- a/datastore-leveldb/README +++ b/datastore-leveldb/README @@ -1,4 +1,3 @@ apt-get install \ libleveldb-dev \ - libev-dev \ libboost1.49-dev diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp new file mode 100644 index 0000000..aae378f --- /dev/null +++ b/datastore-leveldb/src/db.cpp @@ -0,0 +1,55 @@ +#include "db.h" + +#include +#include +#include +#include + +static std::unordered_map dbs; +static std::mutex getDBmutex; + +static bool sensor_name_is_sane(std::string& name) { + for (auto it = name.begin(); it != name.end(); ++it) { + if (not ((*it >= '0' and *it <= '9') or + (*it >= 'A' and *it <= 'Z') or + (*it >= 'a' and *it <= 'z'))) { + return false; + } + } + return true; +} + +leveldb::DB *getDB(std::string& name) { + getDBmutex.lock(); + if (dbs.find(name) == dbs.end()) { + if (not sensor_name_is_sane(name)) { + getDBmutex.unlock(); + return nullptr; + } + leveldb::DB *db; + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); + if (not status.ok()) { + std::cout << status.ToString() << std::endl; + getDBmutex.unlock(); + return nullptr; + } + dbs[name] = db; + getDBmutex.unlock(); + return db; + } else { + getDBmutex.unlock(); + return dbs.at(name); + } +} + + +void closeDB() { + for (auto it = dbs.begin(); it != dbs.end(); ++it) { + std::cout << "Close " << (*it).first << std::endl; + delete (*it).second; + dbs.erase(it); + } +} + diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h new file mode 100644 index 0000000..c7ed25b --- /dev/null +++ b/datastore-leveldb/src/db.h @@ -0,0 +1,10 @@ +#ifndef HAVE_DB_H +#define HAVE_DB_H + +#include + +leveldb::DB *getDB(std::string& name); + +void closeDB(); + +#endif /*HAVE_DB_H*/ diff --git a/datastore-leveldb/src/main.cpp b/datastore-leveldb/src/main.cpp index f7f2f5d..3b0dd98 100644 --- a/datastore-leveldb/src/main.cpp +++ b/datastore-leveldb/src/main.cpp @@ -1,159 +1,21 @@ -extern "C" { -#include -#include -#include -#include "mongoose.h" -} - -#include -#include -#include -#include -#include -#include +#include #include #include -#include +#include +#include #include +#include "mongoose.h" -#include "leveldb/db.h" -#include "leveldb/comparator.h" +#include "db.h" +#include "web.h" -std::forward_list>> web_handler; +typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn); -static std::map dbs; +std::forward_list>> web_handler; struct mg_context *ctx; -bool sensor_name_is_sane(std::string& name) { - for (auto it = name.begin(); it != name.end(); ++it) { - if (not (*it >= '0' and *it <= '9' or - *it >= 'A' and *it <= 'Z' or - *it >= 'a' and *it <= 'z')) { - return false; - } - } - return true; -} - -std::mutex getDBmutex; - -leveldb::DB *getDB(std::string& name) { - getDBmutex.lock(); - if (dbs.find(name) == dbs.end()) { - if (not sensor_name_is_sane(name)) { - getDBmutex.unlock(); - return nullptr; - } - leveldb::DB *db; - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); - if (not status.ok()) { - std::cout << status.ToString() << std::endl; - getDBmutex.unlock(); - return nullptr; - } - dbs[name] = db; - getDBmutex.unlock(); - return db; - } else { - getDBmutex.unlock(); - return dbs.at(name); - } -} - -std::string make_key(uint64_t timestamp) { - std::stringstream key; - key << "ts-"; - key << std::setfill('0') << std::setw(20) << timestamp; - return key.str(); -} - -void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { - const struct mg_request_info *request_info = mg_get_request_info(conn); - - std::string sensor(match[1].str()); - uint64_t timestamp = std::stoul(match[2].str()); - char buf[1024]; - int count = mg_read(conn, buf, 1024); - std::string value(buf, count); - - leveldb::DB *db = getDB(sensor); - if (db == nullptr) { - std::cout << "failed to get db for " << sensor << std::endl; - mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); - return; - } - - std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; - db->Put(leveldb::WriteOptions(), make_key(timestamp), value); - mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); -} - -void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { - static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); - - std::string sensor(match[1].str()); - uint64_t start = std::stoul(match[2].str()); - uint64_t end = std::stoul(match[3].str()); - std::string key_start(std::move(make_key(start))); - std::string key_end(std::move(make_key(end))); - - leveldb::DB *db = getDB(sensor); - if (db == nullptr) { - mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); - return; - } - - mg_printf(conn, - "HTTP/1.1 200 Value received\r\n" - "Content-Type: application/json; encoding=UTF-8\r\n" - "\r\n"); - - std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; - - std::ostringstream out; - out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; - mg_write(conn, out.str().c_str(), out.str().size()); - - leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - bool first = true; - std::ostringstream outbuf; - for (it->Seek(key_start); - it->Valid() && cmp->Compare(it->key(), key_end) < 0; - it->Next()) { - const char *key = it->key().data(); - size_t key_size = it->key().size(); - if (key_size != 20+3) { - std::cerr << "invalid key" << std::endl; - return; - } - - int offset = 3; //ts- - // skip zeros in timestamp - while (offset < key_size-1 and *(key+offset) == '0') - offset++; - - if (first) - first = false; - else - outbuf << ','; - - outbuf << '['; - outbuf.write(key+offset, key_size-offset); - outbuf << ','; - outbuf.write(it->value().data(), it->value().size()); - outbuf << ']'; - - mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); - outbuf.seekp(0); - } - mg_printf(conn, "]}\r\n"); - delete it; -} - int begin_request_handler(struct mg_connection *conn) { boost::cmatch match; @@ -169,23 +31,15 @@ int begin_request_handler(struct mg_connection *conn) { return 0; } - struct sigaction old_action; +// Stop the server. void sigint_handler(int s) { - // Stop the server. mg_stop(ctx); - - // Close databases - for (auto it = dbs.begin(); it != dbs.end(); ++it) { - std::cout << "Close " << (*it).first << std::endl; - delete (*it).second; - dbs.erase(it); - } + closeDB(); exit(0); } - int main(int argc, char **argv) { struct mg_callbacks callbacks; const char *options[] = { @@ -196,7 +50,6 @@ int main(int argc, char **argv) { memset(&callbacks, 0, sizeof(callbacks)); callbacks.begin_request = begin_request_handler; - // Routing web_handler.push_front(std::make_pair( boost::regex("/api/value/([a-zA-Z0-9]+)/([0-9]+)"), @@ -214,6 +67,7 @@ int main(int argc, char **argv) { // Start the web server. ctx = mg_start(&callbacks, NULL, options); - pause(); + while (1) + std::this_thread::sleep_for(std::chrono::seconds(1)); return 1; } diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp new file mode 100644 index 0000000..4898451 --- /dev/null +++ b/datastore-leveldb/src/web.cpp @@ -0,0 +1,95 @@ +#include "web.h" +#include "db.h" +#include +#include + +static inline std::string make_key(uint64_t timestamp) { + std::stringstream key; + key << "ts-"; + key << std::setfill('0') << std::setw(20) << timestamp; + return key.str(); +} + +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { +// const struct mg_request_info *request_info = mg_get_request_info(conn); + + std::string sensor(match[1].str()); + uint64_t timestamp = std::stoul(match[2].str()); + char buf[1024]; + int count = mg_read(conn, buf, 1024); + std::string value(buf, count); + + leveldb::DB *db = getDB(sensor); + if (db == nullptr) { + std::cout << "failed to get db for " << sensor << std::endl; + mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); + return; + } + + std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; + db->Put(leveldb::WriteOptions(), make_key(timestamp), value); + mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); +} + +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { + static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); + + std::string sensor(match[1].str()); + uint64_t start = std::stoul(match[2].str()); + uint64_t end = std::stoul(match[3].str()); + std::string key_start(std::move(make_key(start))); + std::string key_end(std::move(make_key(end))); + + leveldb::DB *db = getDB(sensor); + if (db == nullptr) { + mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); + return; + } + + mg_printf(conn, + "HTTP/1.1 200 Value received\r\n" + "Content-Type: application/json; encoding=UTF-8\r\n" + "\r\n"); + + std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; + + std::ostringstream out; + out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; + mg_write(conn, out.str().c_str(), out.str().size()); + + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + bool first = true; + std::ostringstream outbuf; + for (it->Seek(key_start); + it->Valid() && cmp->Compare(it->key(), key_end) < 0; + it->Next()) { + const char *key = it->key().data(); + size_t key_size = it->key().size(); + if (key_size != 20+3) { + std::cerr << "invalid key" << std::endl; + return; + } + + unsigned int offset = 3; // "ts-" + // skip zeros in timestamp + while (offset < key_size-1 and *(key+offset) == '0') + offset++; + + if (first) + first = false; + else + outbuf << ','; + + outbuf << '['; + outbuf.write(key+offset, key_size-offset); + outbuf << ','; + outbuf.write(it->value().data(), it->value().size()); + outbuf << ']'; + + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); + outbuf.seekp(0); + } + mg_printf(conn, "]}\r\n"); + delete it; +} + diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h new file mode 100644 index 0000000..1360d3d --- /dev/null +++ b/datastore-leveldb/src/web.h @@ -0,0 +1,18 @@ +#ifndef HAVE_WEB_H +#define HAVE_WEB_H + +extern "C" { +#include "mongoose.h" +} +#ifdef __clang__ +#include +#warning "Using clang" +#define BOOST_NO_CXX11_NUMERIC_LIMITS +#endif +#include + +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn); + +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn); + +#endif/*HAVE_WEB_H*/ -- cgit v1.2.1