summaryrefslogtreecommitdiff
path: root/datastore-leveldb/src
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2013-04-18 14:35:49 +0200
committerYves Fischer <yvesf-git@xapek.org>2013-04-18 14:35:49 +0200
commite5342fa4f51b3fcbf931cabeb996183f96ceedbc (patch)
tree430887755433e59a2f2b82705f6290ef23affa7a /datastore-leveldb/src
parent1ba325a1679acc7692cb9f674cfbe452cf763e12 (diff)
downloadebus-alt-e5342fa4f51b3fcbf931cabeb996183f96ceedbc.tar.gz
ebus-alt-e5342fa4f51b3fcbf931cabeb996183f96ceedbc.zip
leveldb: split up in 3 files
Diffstat (limited to 'datastore-leveldb/src')
-rw-r--r--datastore-leveldb/src/db.cpp55
-rw-r--r--datastore-leveldb/src/db.h10
-rw-r--r--datastore-leveldb/src/main.cpp170
-rw-r--r--datastore-leveldb/src/web.cpp95
-rw-r--r--datastore-leveldb/src/web.h18
5 files changed, 190 insertions, 158 deletions
diff --git a/datastore-leveldb/src/db.cpp b/datastore-leveldb/src/db.cpp
new file mode 100644
index 0000000..aae378f
--- /dev/null
+++ b/datastore-leveldb/src/db.cpp
@@ -0,0 +1,55 @@
+#include "db.h"
+
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <unordered_map>
+
+static std::unordered_map<std::string,leveldb::DB*> dbs;
+static std::mutex getDBmutex;
+
+static bool sensor_name_is_sane(std::string& name) {
+ for (auto it = name.begin(); it != name.end(); ++it) {
+ if (not ((*it >= '0' and *it <= '9') or
+ (*it >= 'A' and *it <= 'Z') or
+ (*it >= 'a' and *it <= 'z'))) {
+ return false;
+ }
+ }
+ return true;
+}
+
+leveldb::DB *getDB(std::string& name) {
+ getDBmutex.lock();
+ if (dbs.find(name) == dbs.end()) {
+ if (not sensor_name_is_sane(name)) {
+ getDBmutex.unlock();
+ return nullptr;
+ }
+ leveldb::DB *db;
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db);
+ if (not status.ok()) {
+ std::cout << status.ToString() << std::endl;
+ getDBmutex.unlock();
+ return nullptr;
+ }
+ dbs[name] = db;
+ getDBmutex.unlock();
+ return db;
+ } else {
+ getDBmutex.unlock();
+ return dbs.at(name);
+ }
+}
+
+
+void closeDB() {
+ for (auto it = dbs.begin(); it != dbs.end(); ++it) {
+ std::cout << "Close " << (*it).first << std::endl;
+ delete (*it).second;
+ dbs.erase(it);
+ }
+}
+
diff --git a/datastore-leveldb/src/db.h b/datastore-leveldb/src/db.h
new file mode 100644
index 0000000..c7ed25b
--- /dev/null
+++ b/datastore-leveldb/src/db.h
@@ -0,0 +1,10 @@
+#ifndef HAVE_DB_H
+#define HAVE_DB_H
+
+#include <leveldb/db.h>
+
+leveldb::DB *getDB(std::string& name);
+
+void closeDB();
+
+#endif /*HAVE_DB_H*/
diff --git a/datastore-leveldb/src/main.cpp b/datastore-leveldb/src/main.cpp
index f7f2f5d..3b0dd98 100644
--- a/datastore-leveldb/src/main.cpp
+++ b/datastore-leveldb/src/main.cpp
@@ -1,159 +1,21 @@
-extern "C" {
-#include <unistd.h>
-#include <signal.h>
-#include <string.h>
-#include "mongoose.h"
-}
-
-#include <algorithm>
-#include <iostream>
-#include <iomanip>
-#include <string>
-#include <sstream>
-#include <map>
+#include <csignal>
#include <forward_list>
#include <functional>
-#include <mutex>
+#include <chrono>
+#include <thread>
#include <boost/regex.hpp>
+#include "mongoose.h"
-#include "leveldb/db.h"
-#include "leveldb/comparator.h"
+#include "db.h"
+#include "web.h"
-std::forward_list<std::pair<boost::regex,std::function<void(const boost::cmatch&, struct mg_connection *conn)>>> web_handler;
+typedef void(web_handler_t)(const boost::cmatch&, struct mg_connection *conn);
-static std::map<std::string,leveldb::DB*> dbs;
+std::forward_list<std::pair<boost::regex,std::function<web_handler_t>>> web_handler;
struct mg_context *ctx;
-bool sensor_name_is_sane(std::string& name) {
- for (auto it = name.begin(); it != name.end(); ++it) {
- if (not (*it >= '0' and *it <= '9' or
- *it >= 'A' and *it <= 'Z' or
- *it >= 'a' and *it <= 'z')) {
- return false;
- }
- }
- return true;
-}
-
-std::mutex getDBmutex;
-
-leveldb::DB *getDB(std::string& name) {
- getDBmutex.lock();
- if (dbs.find(name) == dbs.end()) {
- if (not sensor_name_is_sane(name)) {
- getDBmutex.unlock();
- return nullptr;
- }
- leveldb::DB *db;
- leveldb::Options options;
- options.create_if_missing = true;
- leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db);
- if (not status.ok()) {
- std::cout << status.ToString() << std::endl;
- getDBmutex.unlock();
- return nullptr;
- }
- dbs[name] = db;
- getDBmutex.unlock();
- return db;
- } else {
- getDBmutex.unlock();
- return dbs.at(name);
- }
-}
-
-std::string make_key(uint64_t timestamp) {
- std::stringstream key;
- key << "ts-";
- key << std::setfill('0') << std::setw(20) << timestamp;
- return key.str();
-}
-
-void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) {
- const struct mg_request_info *request_info = mg_get_request_info(conn);
-
- std::string sensor(match[1].str());
- uint64_t timestamp = std::stoul(match[2].str());
- char buf[1024];
- int count = mg_read(conn, buf, 1024);
- std::string value(buf, count);
-
- leveldb::DB *db = getDB(sensor);
- if (db == nullptr) {
- std::cout << "failed to get db for " << sensor << std::endl;
- mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n");
- return;
- }
-
- std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl;
- db->Put(leveldb::WriteOptions(), make_key(timestamp), value);
- mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n");
-}
-
-void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) {
- static const leveldb::Comparator *cmp = leveldb::BytewiseComparator();
-
- std::string sensor(match[1].str());
- uint64_t start = std::stoul(match[2].str());
- uint64_t end = std::stoul(match[3].str());
- std::string key_start(std::move(make_key(start)));
- std::string key_end(std::move(make_key(end)));
-
- leveldb::DB *db = getDB(sensor);
- if (db == nullptr) {
- mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n");
- return;
- }
-
- mg_printf(conn,
- "HTTP/1.1 200 Value received\r\n"
- "Content-Type: application/json; encoding=UTF-8\r\n"
- "\r\n");
-
- std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl;
-
- std::ostringstream out;
- out << "{'sensor':'" << sensor << "', 'error':null, 'data':[";
- mg_write(conn, out.str().c_str(), out.str().size());
-
- leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
- bool first = true;
- std::ostringstream outbuf;
- for (it->Seek(key_start);
- it->Valid() && cmp->Compare(it->key(), key_end) < 0;
- it->Next()) {
- const char *key = it->key().data();
- size_t key_size = it->key().size();
- if (key_size != 20+3) {
- std::cerr << "invalid key" << std::endl;
- return;
- }
-
- int offset = 3; //ts-
- // skip zeros in timestamp
- while (offset < key_size-1 and *(key+offset) == '0')
- offset++;
-
- if (first)
- first = false;
- else
- outbuf << ',';
-
- outbuf << '[';
- outbuf.write(key+offset, key_size-offset);
- outbuf << ',';
- outbuf.write(it->value().data(), it->value().size());
- outbuf << ']';
-
- mg_write(conn, outbuf.str().c_str(), outbuf.tellp());
- outbuf.seekp(0);
- }
- mg_printf(conn, "]}\r\n");
- delete it;
-}
-
int begin_request_handler(struct mg_connection *conn) {
boost::cmatch match;
@@ -169,23 +31,15 @@ int begin_request_handler(struct mg_connection *conn) {
return 0;
}
-
struct sigaction old_action;
+// Stop the server.
void sigint_handler(int s) {
- // Stop the server.
mg_stop(ctx);
-
- // Close databases
- for (auto it = dbs.begin(); it != dbs.end(); ++it) {
- std::cout << "Close " << (*it).first << std::endl;
- delete (*it).second;
- dbs.erase(it);
- }
+ closeDB();
exit(0);
}
-
int main(int argc, char **argv) {
struct mg_callbacks callbacks;
const char *options[] = {
@@ -196,7 +50,6 @@ int main(int argc, char **argv) {
memset(&callbacks, 0, sizeof(callbacks));
callbacks.begin_request = begin_request_handler;
-
// Routing
web_handler.push_front(std::make_pair(
boost::regex("/api/value/([a-zA-Z0-9]+)/([0-9]+)"),
@@ -214,6 +67,7 @@ int main(int argc, char **argv) {
// Start the web server.
ctx = mg_start(&callbacks, NULL, options);
- pause();
+ while (1)
+ std::this_thread::sleep_for(std::chrono::seconds(1));
return 1;
}
diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp
new file mode 100644
index 0000000..4898451
--- /dev/null
+++ b/datastore-leveldb/src/web.cpp
@@ -0,0 +1,95 @@
+#include "web.h"
+#include "db.h"
+#include <leveldb/comparator.h>
+#include <iomanip>
+
+static inline std::string make_key(uint64_t timestamp) {
+ std::stringstream key;
+ key << "ts-";
+ key << std::setfill('0') << std::setw(20) << timestamp;
+ return key.str();
+}
+
+void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) {
+// const struct mg_request_info *request_info = mg_get_request_info(conn);
+
+ std::string sensor(match[1].str());
+ uint64_t timestamp = std::stoul(match[2].str());
+ char buf[1024];
+ int count = mg_read(conn, buf, 1024);
+ std::string value(buf, count);
+
+ leveldb::DB *db = getDB(sensor);
+ if (db == nullptr) {
+ std::cout << "failed to get db for " << sensor << std::endl;
+ mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n");
+ return;
+ }
+
+ std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl;
+ db->Put(leveldb::WriteOptions(), make_key(timestamp), value);
+ mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n");
+}
+
+void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) {
+ static const leveldb::Comparator *cmp = leveldb::BytewiseComparator();
+
+ std::string sensor(match[1].str());
+ uint64_t start = std::stoul(match[2].str());
+ uint64_t end = std::stoul(match[3].str());
+ std::string key_start(std::move(make_key(start)));
+ std::string key_end(std::move(make_key(end)));
+
+ leveldb::DB *db = getDB(sensor);
+ if (db == nullptr) {
+ mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n");
+ return;
+ }
+
+ mg_printf(conn,
+ "HTTP/1.1 200 Value received\r\n"
+ "Content-Type: application/json; encoding=UTF-8\r\n"
+ "\r\n");
+
+ std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl;
+
+ std::ostringstream out;
+ out << "{'sensor':'" << sensor << "', 'error':null, 'data':[";
+ mg_write(conn, out.str().c_str(), out.str().size());
+
+ leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
+ bool first = true;
+ std::ostringstream outbuf;
+ for (it->Seek(key_start);
+ it->Valid() && cmp->Compare(it->key(), key_end) < 0;
+ it->Next()) {
+ const char *key = it->key().data();
+ size_t key_size = it->key().size();
+ if (key_size != 20+3) {
+ std::cerr << "invalid key" << std::endl;
+ return;
+ }
+
+ unsigned int offset = 3; // "ts-"
+ // skip zeros in timestamp
+ while (offset < key_size-1 and *(key+offset) == '0')
+ offset++;
+
+ if (first)
+ first = false;
+ else
+ outbuf << ',';
+
+ outbuf << '[';
+ outbuf.write(key+offset, key_size-offset);
+ outbuf << ',';
+ outbuf.write(it->value().data(), it->value().size());
+ outbuf << ']';
+
+ mg_write(conn, outbuf.str().c_str(), outbuf.tellp());
+ outbuf.seekp(0);
+ }
+ mg_printf(conn, "]}\r\n");
+ delete it;
+}
+
diff --git a/datastore-leveldb/src/web.h b/datastore-leveldb/src/web.h
new file mode 100644
index 0000000..1360d3d
--- /dev/null
+++ b/datastore-leveldb/src/web.h
@@ -0,0 +1,18 @@
+#ifndef HAVE_WEB_H
+#define HAVE_WEB_H
+
+extern "C" {
+#include "mongoose.h"
+}
+#ifdef __clang__
+#include <cstddef>
+#warning "Using clang"
+#define BOOST_NO_CXX11_NUMERIC_LIMITS
+#endif
+#include <boost/regex.hpp>
+
+void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn);
+
+void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn);
+
+#endif/*HAVE_WEB_H*/