diff options
Diffstat (limited to 'datastore-leveldb/src/web.cpp')
-rw-r--r-- | datastore-leveldb/src/web.cpp | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/datastore-leveldb/src/web.cpp b/datastore-leveldb/src/web.cpp new file mode 100644 index 0000000..443e782 --- /dev/null +++ b/datastore-leveldb/src/web.cpp @@ -0,0 +1,179 @@ +#include "web.h" +#include "db.h" +#include <algorithm> +#include <chrono> +#include <leveldb/comparator.h> + +static const leveldb::Comparator *cmp = leveldb::BytewiseComparator(); + +static inline uint64_t now() { + return std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::high_resolution_clock::now().time_since_epoch()).count(); +} + +// uint64_t is long +inline void strToL(unsigned long int *l, std::string s) { + *l = std::stoul(s); +} +// uint64_t is long long +inline void strToL(unsigned long long int *l, std::string s) { + *l = std::stoull(s); +} + +static inline void reply_header(struct mg_connection *conn, const bool ok, const char *msg, const char *extra = "") { + mg_printf(conn, + "HTTP/1.1 %s %s\r\n" + "%s", + (ok) ? "200" : "500", + msg, + extra); +} + +static inline bool parse_key(leveldb::Slice &&key, uint64_t *value) { + if (key.size() != 20+3) + return false; + strToL(value, key.data()+3); + return true; +} + +const boost::regex web_handle_api_value_R( + "/api/value/([a-zA-Z0-9\\.]+)"); +void web_handle_api_value(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t timestamp = now(); + char buf[1024]; + int bytesRead = mg_read(conn, buf, 1024); + std::string value(buf, bytesRead); + + if (db_insert(sensor, timestamp, value)) { + reply_header(conn, true, "OK Value received", "\r\n"); + } else { + reply_header(conn, false, "Internal Error", "\r\n"); + } +} + +const boost::regex web_handle_api_value_timestamp_R( + "/api/value/([a-zA-Z0-9\\.]+)/([0-9]+)"); +void web_handle_api_value_timestamp(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t timestamp; strToL(×tamp, match[2].str()); + char buf[1024]; + int bytesRead = mg_read(conn, buf, 1024); + std::string value(buf, bytesRead); + + if (db_insert(sensor, timestamp, value)) { + reply_header(conn, true, "OK Value received", "\r\n"); + } else { + reply_header(conn, false, "Internal Error", "\r\n"); + } +} + +static inline void print_json_tuple(struct mg_connection *conn, + std::ostringstream &outbuf, + leveldb::Slice &&key, + leveldb::Slice &&value) { + + size_t key_size = key.size(); + if (key_size != 20+3) { + std::cerr << "invalid key" << std::endl; + return; + } + + unsigned int offset = 3; // "ts-" + // skip zeros in timestamp + while (offset < key_size-1 and *(key.data()+offset) == '0') + offset++; + + outbuf << '['; + outbuf.write(key.data()+offset, key_size-offset); + outbuf << ','; + outbuf.write(value.data(), value.size()); + outbuf << ']'; + + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); +} + + +const boost::regex web_handle_api_range_R( + "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)"); +void web_handle_api_range(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t start; strToL(&start, match[2].str()); + uint64_t end; strToL(&end, match[3].str()); + std::string key_start(std::move(db_make_key(start))); + std::string key_end(std::move(db_make_key(end))); + + leveldb::DB *db = db_get(sensor); + if (db == nullptr) { + reply_header(conn, false, "Internal Error", "\r\n"); + return; + } + + reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n"); + + std::ostringstream out; + out << "{\"sensor\":\"" << sensor << "\", " + << "\"error\":null, " + << "\"data\":["; + mg_write(conn, out.str().c_str(), out.str().size()); + + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + bool first = true; + std::ostringstream outbuf; + for (it->Seek(key_start); + it->Valid() && cmp->Compare(it->key(), key_end) < 0; + it->Next()) { + if (first) first = false; + else outbuf << ","; + print_json_tuple(conn, outbuf, it->key(), it->value()); + outbuf.seekp(0); + } + mg_printf(conn, "]}\r\n"); + delete it; +} + +const boost::regex web_handle_api_range_size_R( + "/api/range/([a-zA-Z0-9\\.]+)/([0-9]+)/([0-9]+)/([0-9]+)"); +void web_handle_api_range_size(const boost::cmatch &match, struct mg_connection *conn) { + std::string sensor(match[1].str()); + uint64_t start; strToL(&start, match[2].str()); + uint64_t end; strToL(&end, match[3].str()); + uint64_t size; strToL(&size, match[4].str()); + + leveldb::DB *db = db_get(sensor); + if (db == nullptr) { + reply_header(conn, false, "Internal Error", "\r\n"); + return; + } + + reply_header(conn, true, "OK", "Content-Type: application/json\r\n\r\n"); + + uint64_t step = std::max((uint64_t)1, (end-start) / size); + + std::ostringstream out; + out << "{\"sensor\":\"" << sensor << "\", " + << "\"error\":null, " + << "\"size\":" << size << ", " + << "\"step\":" << step << ", " + << "\"data\":["; + mg_write(conn, out.str().c_str(), out.str().size()); + + bool first = true; + uint64_t actual_size = 0; + std::ostringstream outbuf; + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + for (uint64_t key = start; key < end; key += step) { + it->Seek(db_make_key(key)); + if (!it->Valid()) continue; + + if (first) first = false; + else outbuf << ","; + + print_json_tuple(conn, outbuf, it->key(), it->value()); + outbuf.seekp(0); + actual_size++; + } + outbuf << "], \"actual_size\":" << actual_size << "}\r\n"; + mg_write(conn, outbuf.str().c_str(), outbuf.tellp()); + delete it; +} |