diff options
Diffstat (limited to 'datastore-leveldb/src/main.cpp')
-rw-r--r-- | datastore-leveldb/src/main.cpp | 210 |
1 files changed, 98 insertions, 112 deletions
diff --git a/datastore-leveldb/src/main.cpp b/datastore-leveldb/src/main.cpp index 9f8142e..dab88cd 100644 --- a/datastore-leveldb/src/main.cpp +++ b/datastore-leveldb/src/main.cpp @@ -1,14 +1,9 @@ extern "C" { -#include "server_eh.h" #include <unistd.h> #include <signal.h> -#include <ev.h> #include <string.h> -#include <netinet/in.h> -#include <sys/sendfile.h> -#include <sys/stat.h> -#include <fcntl.h> #include <magic.h> +#include "mongoose.h" } #include <algorithm> @@ -19,18 +14,20 @@ extern "C" { #include <map> #include <forward_list> #include <functional> +#include <mutex> #include <boost/regex.hpp> #include "leveldb/db.h" #include "leveldb/comparator.h" -std::forward_list<std::pair<boost::regex,std::function<void(const boost::cmatch&, struct http_request*, const int)>>> web_handler; +std::forward_list<std::pair<boost::regex,std::function<void(const boost::cmatch&, struct mg_connection *conn)>>> web_handler; static std::map<std::string,leveldb::DB*> dbs; static magic_t magic_cookie; +struct mg_context *ctx; bool sensor_name_is_sane(std::string& name) { for (auto it = name.begin(); it != name.end(); ++it) { @@ -43,22 +40,31 @@ bool sensor_name_is_sane(std::string& name) { return true; } +std::mutex getDBmutex; leveldb::DB *getDB(std::string& name) { - if (not sensor_name_is_sane(name)) { - return nullptr; - } + getDBmutex.lock(); if (dbs.find(name) == dbs.end()) { + if (not sensor_name_is_sane(name)) { + getDBmutex.unlock(); + return nullptr; + } leveldb::DB *db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); if (not status.ok()) { + std::cout << status.ToString() << std::endl; + getDBmutex.unlock(); return nullptr; } dbs[name] = db; + getDBmutex.unlock(); + return db; + } else { + getDBmutex.unlock(); + return dbs.at(name); } - return dbs.at(name); } // see http_parser.h @@ -81,25 +87,6 @@ static inline void http_ok(int fd, const char *content_type, const char *extra_h #undef write_const } - -static inline void send_file(const char *path, int fd_out) { - int fd_in; - struct stat stat_buf; - - const char *mime_type = magic_file(magic_cookie, path); - if (mime_type == NULL) { - mime_type = "application/octet-stream"; - } - - http_ok(fd_out, mime_type, nullptr); - - fd_in = open(path, O_RDONLY); - fstat(fd_in, &stat_buf); - std::cerr << "GET " << path << std::endl; - sendfile(fd_out, fd_in, 0, stat_buf.st_size); - close(fd_in); -} - std::string make_key(uint64_t timestamp) { std::stringstream key; key << "ts-"; @@ -107,32 +94,30 @@ std::string make_key(uint64_t timestamp) { return key.str(); } -void web_handle_api_value(const boost::cmatch &match, const struct http_request *request, const int fd) { - const char* reply_OK = "HTTP/1.1 200 Value received\r\n\r\n"; - const char* reply_ERR = "HTTP/1.1 500 Internal Error\r\n\r\n"; - +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { + const struct mg_request_info *request_info = mg_get_request_info(conn); + std::string sensor(match[1].str()); uint64_t timestamp = std::stoul(match[2].str()); - std::string value(request->body); + char buf[1024]; + int count = mg_read(conn, buf, 1024); + std::string value(buf, count); leveldb::DB *db = getDB(sensor); if (db == nullptr) { - write(fd, reply_ERR, strlen(reply_ERR)); + std::cout << "failed to get db for " << sensor << std::endl; + mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); return; } std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; db->Put(leveldb::WriteOptions(), make_key(timestamp), value); - write(fd, reply_OK, strlen(reply_OK)); + mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); } -void web_handle_api_range(const boost::cmatch &match, const struct http_request *request, const int fd) { +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); - const char* reply_OK = "HTTP/1.1 200 Value received\r\n"; - const char* reply_ERR = "HTTP/1.1 500 Internal Error\r\n\r\n"; - const char* content_type = "Content-Type: application/json; encoding=UTF-8\r\n"; - std::string sensor(match[1].str()); uint64_t start = std::stoul(match[2].str()); uint64_t end = std::stoul(match[3].str()); @@ -141,113 +126,101 @@ void web_handle_api_range(const boost::cmatch &match, const struct http_request leveldb::DB *db = getDB(sensor); if (db == nullptr) { - write(fd, reply_ERR, strlen(reply_ERR)); + mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); return; } - http_ok(fd, "application/json; encoding=UTF-8", nullptr); + mg_printf(conn, + "HTTP/1.1 200 Value received\r\n" + "Content-Type: application/json; encoding=UTF-8\r\n" + "\r\n"); std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; std::ostringstream out; out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; - write(fd, out.str().c_str(), out.str().size()); + mg_write(conn, out.str().c_str(), out.str().size()); - usleep(2000000); leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); bool first = true; + std::ostringstream outbuf; for (it->Seek(key_start); - it->Valid() && cmp->Compare(it->key(), key_end) < 0; - it->Next()) { - if (it->key().size() != 20+3) { + it->Valid() && cmp->Compare(it->key(), key_end) < 0; + it->Next()) { + const char *key = it->key().data(); + size_t key_size = it->key().size(); + if (key_size != 20+3) { std::cerr << "invalid key" << std::endl; return; } - uint64_t timestamp = std::stoul(it->key().data()+3); - std::string s_timestamp = std::to_string(timestamp); - if (first) { + int offset = 3; //ts- + // skip zeros in timestamp + while (offset < key_size-1 and *(key+offset) == '0') + offset++; + + if (first) first = false; - write(fd, "[", 1); - } else { - write(fd, ",[", 2); - } - write(fd, s_timestamp.c_str(), s_timestamp.size()); - write(fd, ",'", 2); - write(fd, it->value().data(), it->value().size()); - write(fd, "']", 2); + else + outbuf << ','; + + outbuf << '['; + outbuf.write(key+offset, key_size-offset); + outbuf << ','; + outbuf.write(it->value().data(), it->value().size()); + outbuf << ']'; + + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); + outbuf.seekp(0); } - + mg_printf(conn, "]}\r\n"); delete it; } -void handle_request(struct http_request *request, int fd) { +int begin_request_handler(struct mg_connection *conn) { boost::cmatch match; - const char *error = "HTTP/1.1 404 Not Found\r\n\r\n"; - if (request->url == NULL) { // happens only under high load, why? - std::cerr << "url is null" << std::endl; - goto handled; - } + const char *url = mg_get_request_info(conn)->uri; for (auto item = web_handler.begin(); item != web_handler.end(); ++item) { - if (boost::regex_match(request->url, match, (*item).first)) { - (*item).second(match, request, fd); - goto handled; + if (boost::regex_match(url, match, (*item).first)) { + (*item).second(match, conn); + return 1; } } - write(fd, error, strlen(error)); - - handled: - close(fd); + return 0; } -static struct http_server server; void sigint_handler(int s) { - struct ev_loop *loop = server.loop; - ev_io_stop(EV_A_ server.ev_accept); + // Stop the server. + mg_stop(ctx); + + // Close databases + for (auto it = dbs.begin(); it != dbs.end(); ++it) { + std::cout << "Close " << (*it).first << std::endl; + delete (*it).second; + dbs.erase(it); + } + exit(0); } + int main(int argc, char **argv) { - // configure server structures and desired listen address - struct sockaddr_in listen_addr; - memset(&listen_addr, 0, sizeof(listen_addr)); - listen_addr.sin_family = AF_INET; - listen_addr.sin_addr.s_addr = INADDR_ANY; - listen_addr.sin_port = htons(5000); - server.listen_addr = &listen_addr; - server.handle_request = handle_request; - - // ignore SIGPIPE - struct sigaction on_sigpipe; - on_sigpipe.sa_handler = SIG_IGN; - sigemptyset(&on_sigpipe.sa_mask); - sigaction(SIGPIPE, &on_sigpipe, NULL); - - // handle C-c - struct sigaction on_sigint; - on_sigint.sa_handler = sigint_handler; - sigemptyset(&on_sigint.sa_mask); - on_sigint.sa_flags = 0; - sigaction(SIGINT, &on_sigint, NULL); + struct mg_callbacks callbacks; + const char *options[] = { + "listening_ports", "8080", + "document_root", "wwwroot", + NULL}; + + memset(&callbacks, 0, sizeof(callbacks)); + callbacks.begin_request = begin_request_handler; + // Routing web_handler.push_front(std::make_pair( - boost::regex("/"), - [](const boost::cmatch &match, const struct http_request *request, const int fd){ - send_file("index.html", fd); - })); - web_handler.push_front(std::make_pair( - boost::regex("/public/(.+)"), - [](const boost::cmatch &match, const struct http_request *request, const int fd){ - /// XXX possible directory traversion - std::string path("public/" + match[1].str()); - send_file(path.c_str(), fd); - })); - web_handler.push_front(std::make_pair( boost::regex("/api/value/([a-zA-Z0-9]+)/([0-9]+)"), web_handle_api_value)); web_handler.push_front(std::make_pair( @@ -257,6 +230,19 @@ int main(int argc, char **argv) { magic_cookie = magic_open(MAGIC_MIME_TYPE); magic_load(magic_cookie, NULL); - // start the server - return http_server_loop(&server); + + // Signals: handle C-c + struct sigaction on_sigint; + on_sigint.sa_handler = sigint_handler; + sigemptyset(&on_sigint.sa_mask); + on_sigint.sa_flags = 0; + + sigaction(SIGINT, &on_sigint, NULL); + + + // Start the web server. + ctx = mg_start(&callbacks, NULL, options); + + while (1) sleep(1); + return 1; } |