diff options
author | Yves Fischer <yvesf-git@xapek.org> | 2015-06-07 00:50:45 +0200 |
---|---|---|
committer | Yves Fischer <yvesf-git@xapek.org> | 2015-06-07 00:50:45 +0200 |
commit | bc8bf89edc3e24a756c1c9628087a5ae6f5af80a (patch) | |
tree | e6861d07d09e839f3222ca862747da165addc746 /src/main/scala/org/xapek/influxdb | |
parent | 8a17a2f6d77cb6bd3e5629d3944b30f449cb1237 (diff) | |
download | influxdb-tools-bc8bf89edc3e24a756c1c9628087a5ae6f5af80a.tar.gz influxdb-tools-bc8bf89edc3e24a756c1c9628087a5ae6f5af80a.zip |
first request worked. no result transformation yet
Diffstat (limited to 'src/main/scala/org/xapek/influxdb')
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Influxdb.scala | 285 | ||||
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Main.scala | 26 | ||||
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Sized.scala | 1 |
3 files changed, 198 insertions, 114 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala index 9ed778c..f5d33f4 100644 --- a/src/main/scala/org/xapek/influxdb/Influxdb.scala +++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala @@ -1,16 +1,28 @@ package org.xapek.influxdb -trait InfluxValue { - type JavaT - def toString(): String - def toJava: JavaT +import java.util.Date +import org.apache.http.client.methods.HttpGet +import org.apache.http.client.utils.URIBuilder +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils +import org.slf4j.LoggerFactory +import org.json4s._ +import org.json4s.jackson.JsonMethods._ +import com.typesafe.scalalogging.Logger +import scala.util.parsing.json.JSONObject + +abstract class QueryBuilderWithSelectAndFrom[L <: Nat, IdentT <% InfluxIdentifier]( + val columns: Sized[IdentT, L], + val table: IdentT) { + def toString: String } -class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Sized[IdentT, L]) { - def apply(column: IdentT): QueryBuilderSelect[Succ[L], IdentT] = +class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier]( + val columns: Sized[IdentT, L]) { + def apply(column: IdentT) = new QueryBuilderSelect(SizedOp.add(columns, column)) - def FROM(table: IdentT) = new QueryBuilderFrom(columns, table) + def FROM(table: IdentT) = new QueryBuilderFrom(this, table) implicit def influxIdentifier(s: String): InfluxIdentifier = { new InfluxIdentifier(s) @@ -19,64 +31,38 @@ class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Size override def toString() = "SELECT " + columns.unsized.mkString(", ") } -class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier](columns: Sized[IdentT, L], val table: IdentT) - extends QueryBuilderSelect(columns) { +class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier]( + select: QueryBuilderSelect[L, IdentT], + table: IdentT) + extends QueryBuilderWithSelectAndFrom[L, IdentT](select.columns, table) { def WHERE[WhereT <: Expr](where: WhereT) = - new QueryBuilderWhere(columns, table, where) - - override def toString() = super.toString() + " FROM " + table -} - -class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](columns: Sized[IdentT, L], - table: IdentT, - val where: WhereT) - extends QueryBuilderFrom(columns, table) { - override def toString() = super.toString() + " WHERE " + where -} -//class QueryBuilder1(select: Seq[InfluxIdentifier]) { -// def FROM(table: String): QueryBuilder2 = { -// new QueryBuilder2(select, table) -// } -//} - -//class QueryBuilder2(select: Seq[InfluxIdentifier], from: String) extends QueryBuilder1(select) { -// def WHERE[E <: Expr](eq: E): QueryBuilder3[E] = { -// new QueryBuilder3(eq, select, from) -// } -// -// override def toString(): String = { -// "SELECT " + select.mkString(", ") + " FROM " + from -// } -//} - -//class QueryBuilder3[WhereT <: Expr](whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder2(select, from) { -// def &&[E2 <: Expr](other: E2): QueryBuilder3[AndExpr[WhereT, E2]] = { -// new QueryBuilder3(new AndExpr(whereExpr, other), select, from) -// } -// -// def ||[E2 <: Expr](other: E2): QueryBuilder3[OrExpr[WhereT, E2]] = { -// new QueryBuilder3(new OrExpr(whereExpr, other), select, from) -// } -// -// def GROUP_BY[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = { -// new QueryBuilder4(List(column), whereExpr, select, from) -// } -// -// override def toString(): String = { -// super.toString() + " WHERE " + whereExpr.toString() -// } -//} -// -//class QueryBuilder4[WhereT <: Expr](groupBy: Seq[InfluxIdentifier], whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder3(whereExpr, select, from) { -// def <<=[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = { -// val x: InfluxIdentifier = column -// new QueryBuilder4(groupBy :+ x, whereExpr, select, from) -// } -// -// override def toString(): String = { -// super.toString() + " GROUP BY " + groupBy.mkString(", ") -// } -//} + new QueryBuilderWhere(this, where) + + override def toString() = select.toString() + " FROM " + table +} + +class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr]( + val from: QueryBuilderFrom[L, IdentT], + val where: WhereT) + extends QueryBuilderWithSelectAndFrom[L, IdentT](from.columns, from.table) { + + def GROUP_BY(column: IdentT) = + new QueryBuilderGroupBy(this, List(column)) + + override def toString() = from.toString() + " WHERE " + where +} + +class QueryBuilderGroupBy[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr]( + where: QueryBuilderWhere[L, IdentT, WhereT], + var groupBy: Seq[IdentT]) + extends QueryBuilderWithSelectAndFrom[L, IdentT](where.columns, where.table) { + + def apply(column: IdentT): QueryBuilderGroupBy[L, IdentT, WhereT] = { + groupBy = groupBy ++ List(column) + this + } + override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") +} protected trait E { def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2] @@ -103,29 +89,29 @@ class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr { override def toString(): String = value.toString } -abstract class BinaryOp[E1 <: E, E2 <: E](val op1: E1, val op2: E2) extends Expr { - def str: String +final object BinaryOp { + val OP_EQ = "=" + val OP_NE = "!=" + val OP_GT = ">" + val OP_GE = ">=" + val OP_LT = "<" + val OP_LE = "<=" +} +class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr { override def toString(): String = { "(" + op1.toString() + " " + str + " " + op2.toString() + ")" } } -class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) { - def str = "AND" -} - -class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) { - override def str = "OR" -} +class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) + extends BinaryOp[E1, E2]("AND", op1, op2) -class EqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T]) - extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) { - def str = "==" -} +class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) + extends BinaryOp[E1, E2]("OR", op1, op2) -class NeqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T]) - extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) { - def str = "==" +trait InfluxValue { + type JavaT + def toString(): String } class InfluxString(val value: String) extends Expr with InfluxValue { @@ -133,7 +119,6 @@ class InfluxString(val value: String) extends Expr with InfluxValue { override def toString(): String = { "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO } - override def toJava(): String = value } class InfluxNumber(val value: Number) extends Expr with InfluxValue { @@ -142,7 +127,15 @@ class InfluxNumber(val value: Number) extends Expr with InfluxValue { override def toString(): String = { value.toString } - override def toJava(): Number = value +} + +class InfluxDate(val value: Date) extends Expr with InfluxValue { + type JavaT = Date + + override def toString(): String = { + val time: Long = value.getTime() / 1000 + s"${time}s" + } } class InfluxIdentifier(val name: String) extends Expr { @@ -150,40 +143,120 @@ class InfluxIdentifier(val name: String) extends Expr { "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" } - def ==[Z <: InfluxValue, ValueT <% Z](value: ValueT): EqExpr[Z] = { - new EqExpr(this, new ValueExpr(value)) - } + def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value)) - def !=[Z <: InfluxValue, ValueT <% Z](value: ValueT): NeqExpr[Z] = { - new NeqExpr(this, new ValueExpr(value)) - } -} + def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value)) -object Influxdb { - def SELECT(column: String) = - new QueryBuilderSelect(SizedOp.wrap(Influxdb.influxColumn(column))) - - implicit def influxString(s: String): InfluxString = { - new InfluxString(s) - } + def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value)) - implicit def influxNumber(n: Int): InfluxNumber = { - new InfluxNumber(n) - } + def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value)) - implicit def influxNumber(n: Double): InfluxNumber = { - new InfluxNumber(n) - } + def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value)) - implicit def influxColumn(s: String): InfluxIdentifier = { - new InfluxIdentifier(s) + def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value)) +} + +case class JsonResult(series: List[JsonSeries], error: String) { + def this(series: List[JsonSeries]) = this(series, null) +} +case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { + override def toString() = values.map { _.map { _.getClass() } }.mkString(", ") +} + +class InfluxdbError(val serverMessage: String) + extends Exception(s"Server responded: $serverMessage") + +class InfluxdbExecutor[L <: Nat, IdentT <% InfluxIdentifier]( + connection: InfluxDB, + query: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) { + private val httpClient = HttpClients.createDefault(); + private val logger = Logger(LoggerFactory.getLogger(getClass)) + private val uriBuilder = new URIBuilder() + .setScheme("http") + .setHost(connection.host) + .setPort(connection.port) + .setPath("/query") + .setParameter("u", connection.username) + .setParameter("p", connection.password) + .setParameter("db", connection.database) + + def map[B, That](f: Sized[InfluxValue, L] => B): Seq[String] = { + implicit val formats = DefaultFormats + + //{"results":[ + // {"series":[ + // {"name":"esg", + // "columns":["time","product_name","sku","price"], + // "values":[ + // ["2015-06-05T09:30:13.084815572Z","Goldbarren 100g Heraeus","01011072",3400.73], + // ["2015-06-05T17:00:13.888788593Z","Goldbarren 100g Heraeus","01011072",3425.29]]}]}]} + uriBuilder.setParameter("q", query.toString()) + + val httpGet = new HttpGet(uriBuilder.build()) + logger.debug("Query={}", query.toString()) + logger.debug("Call: {}", httpGet.getURI) + + val response1 = httpClient.execute(httpGet); + try { + System.out.println(response1.getStatusLine()); + val entity1 = response1.getEntity(); + + val json = parse(entity1.getContent) + val results = (json \ "results").extract[List[JsonResult]] + val errors = results.filter { _.error != null }.map { _.error } + if (errors.size > 0) { + throw new InfluxdbError(errors.mkString(". ")) + } + println(results) + } finally { + response1.close(); + } + + val columns = query.columns + println(SizedOp.count(columns)) + // toIntSucc(columns) +//collection.mutable.LinkedList + List(columns.toString()) } +} - def col(col: String): InfluxIdentifier = { - new InfluxIdentifier(col) +class InfluxDB(val database: String, + val host: String, + val port: Int = 8086, + val username: String = "", + val password: String = "") { + def apply[L <: Nat, IdentT <% InfluxIdentifier]( + q: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) = { + new InfluxdbExecutor[L, IdentT](this, q) } + // def server(database: String, host: String, port: Long = 8080L, username: String = "", password: String = "") = // implicit def queryToString(q: QueryBuilder2): String = q.toString // implicit def queryToString[T <: Expr](q: QueryBuilder3[T]): String = q.toString // implicit def queryToString[T <: Expr](q: QueryBuilder4[T]): String = q.toString } + +object InfluxDB { + def SELECT(column: String) = + new QueryBuilderSelect(SizedOp.wrap(influxColumn(column))) + + implicit def influxString(s: String) = new InfluxString(s) + + implicit def influxNumber(n: Int) = new InfluxNumber(n) + + implicit def influxNumber(n: Double) = new InfluxNumber(n) + + implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) + + implicit def influxColumn(s: String) = new InfluxIdentifier(s) + + def col(col: String) = new InfluxIdentifier(col) + + def time = new InfluxIdentifier("time") +}
\ No newline at end of file diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala index 58fc318..0f9bc12 100644 --- a/src/main/scala/org/xapek/influxdb/Main.scala +++ b/src/main/scala/org/xapek/influxdb/Main.scala @@ -6,7 +6,9 @@ import akka.event.Logging import akka.actor.ActorDSL import akka.actor.ActorSystem import akka.actor.ActorLogging -import org.xapek.influxdb.Influxdb._ +import org.xapek.influxdb.InfluxDB._ +import java.util.Date +import java.util.Calendar case class Row(columns: Seq[String]) @@ -22,15 +24,23 @@ class GenericReader(query: String) extends Actor with ActorLogging { object Main { def main(args: Array[String]): Unit = { - def url = "http://localhost:8083/query" - val s1 = Influxdb.SELECT("test") - println(s1.columns) + val db = new InfluxDB("data", "localhost") - val s2 = Influxdb.SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd") - println(s2) + val s2 = SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd") - val s3 = Influxdb.SELECT("foo")("bla")("baz") FROM "asd" WHERE col("asd") == "asd" - println(s3) + val s3 = SELECT("foo")("bla")("baz") FROM "asd" WHERE time > "asd" + + val calendar = Calendar.getInstance + calendar.set(Calendar.HOUR_OF_DAY, 0) + calendar.set(Calendar.MINUTE, 0) + calendar.set(Calendar.SECOND, 0) + calendar.set(Calendar.MILLISECOND, 0) + calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 2) + + // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc + val foo = db(SELECT("product_name")("sku")("price") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime) + .map({ x => x }) + println(foo) // val system = ActorSystem("test") // val alice = system.actorOf(Props(MyReader), "alice") // diff --git a/src/main/scala/org/xapek/influxdb/Sized.scala b/src/main/scala/org/xapek/influxdb/Sized.scala index bf3bca6..2b4e6cc 100644 --- a/src/main/scala/org/xapek/influxdb/Sized.scala +++ b/src/main/scala/org/xapek/influxdb/Sized.scala @@ -30,6 +30,7 @@ object ToInt { implicit val toInt0 = new ToInt[_0] { def apply() = 0 } + implicit def toIntSucc[N <: Nat](implicit toIntN: ToInt[N]) = new ToInt[Succ[N]] { def apply() = toIntN() + 1 } |