extern "C" { #include #include #include #include "mongoose.h" } #include #include #include #include #include #include #include #include #include #include #include "leveldb/db.h" #include "leveldb/comparator.h" std::forward_list>> web_handler; static std::map dbs; struct mg_context *ctx; bool sensor_name_is_sane(std::string& name) { for (auto it = name.begin(); it != name.end(); ++it) { if (not (*it >= '0' and *it <= '9' or *it >= 'A' and *it <= 'Z' or *it >= 'a' and *it <= 'z')) { return false; } } return true; } std::mutex getDBmutex; leveldb::DB *getDB(std::string& name) { getDBmutex.lock(); if (dbs.find(name) == dbs.end()) { if (not sensor_name_is_sane(name)) { getDBmutex.unlock(); return nullptr; } leveldb::DB *db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb."+name, &db); if (not status.ok()) { std::cout << status.ToString() << std::endl; getDBmutex.unlock(); return nullptr; } dbs[name] = db; getDBmutex.unlock(); return db; } else { getDBmutex.unlock(); return dbs.at(name); } } std::string make_key(uint64_t timestamp) { std::stringstream key; key << "ts-"; key << std::setfill('0') << std::setw(20) << timestamp; return key.str(); } void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { const struct mg_request_info *request_info = mg_get_request_info(conn); std::string sensor(match[1].str()); uint64_t timestamp = std::stoul(match[2].str()); char buf[1024]; int count = mg_read(conn, buf, 1024); std::string value(buf, count); leveldb::DB *db = getDB(sensor); if (db == nullptr) { std::cout << "failed to get db for " << sensor << std::endl; mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); return; } std::cout << "sensor=" << sensor << " key=" << make_key(timestamp) << std::endl; db->Put(leveldb::WriteOptions(), make_key(timestamp), value); mg_printf(conn, "HTTP/1.1 200 Value received\r\n\r\n"); } void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); std::string sensor(match[1].str()); uint64_t start = std::stoul(match[2].str()); uint64_t end = std::stoul(match[3].str()); std::string key_start(std::move(make_key(start))); std::string key_end(std::move(make_key(end))); leveldb::DB *db = getDB(sensor); if (db == nullptr) { mg_printf(conn, "HTTP/1.1 500 Internal Error\r\n\r\n"); return; } mg_printf(conn, "HTTP/1.1 200 Value received\r\n" "Content-Type: application/json; encoding=UTF-8\r\n" "\r\n"); std::cout << "sensor=" << sensor << " start=" << start << " end=" << end << std::endl; std::ostringstream out; out << "{'sensor':'" << sensor << "', 'error':null, 'data':["; mg_write(conn, out.str().c_str(), out.str().size()); leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); bool first = true; std::ostringstream outbuf; for (it->Seek(key_start); it->Valid() && cmp->Compare(it->key(), key_end) < 0; it->Next()) { const char *key = it->key().data(); size_t key_size = it->key().size(); if (key_size != 20+3) { std::cerr << "invalid key" << std::endl; return; } int offset = 3; //ts- // skip zeros in timestamp while (offset < key_size-1 and *(key+offset) == '0') offset++; if (first) first = false; else outbuf << ','; outbuf << '['; outbuf.write(key+offset, key_size-offset); outbuf << ','; outbuf.write(it->value().data(), it->value().size()); outbuf << ']'; mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); outbuf.seekp(0); } mg_printf(conn, "]}\r\n"); delete it; } int begin_request_handler(struct mg_connection *conn) { boost::cmatch match; const char *url = mg_get_request_info(conn)->uri; for (auto item = web_handler.begin(); item != web_handler.end(); ++item) { if (boost::regex_match(url, match, (*item).first)) { (*item).second(match, conn); return 1; } } return 0; } struct sigaction old_action; void sigint_handler(int s) { // Stop the server. mg_stop(ctx); // Close databases for (auto it = dbs.begin(); it != dbs.end(); ++it) { std::cout << "Close " << (*it).first << std::endl; delete (*it).second; dbs.erase(it); } exit(0); } int main(int argc, char **argv) { struct mg_callbacks callbacks; const char *options[] = { "listening_ports", "8080", "document_root", "wwwroot", NULL}; memset(&callbacks, 0, sizeof(callbacks)); callbacks.begin_request = begin_request_handler; // Routing web_handler.push_front(std::make_pair( boost::regex("/api/value/([a-zA-Z0-9]+)/([0-9]+)"), web_handle_api_value)); web_handler.push_front(std::make_pair( boost::regex("/api/range/([a-zA-Z0-9]+)/([0-9]+)/([0-9]+)"), web_handle_api_range)); // Signals: handle C-c struct sigaction action; memset(&action, 0, sizeof(action)); action.sa_handler = &sigint_handler; sigaction(SIGINT, &action, &old_action); // Start the web server. ctx = mg_start(&callbacks, NULL, options); pause(); return 1; }