summaryrefslogtreecommitdiff
path: root/src/main/scala/org/xapek
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/org/xapek')
-rw-r--r--src/main/scala/org/xapek/influxdb/Influxdb.scala285
-rw-r--r--src/main/scala/org/xapek/influxdb/Main.scala26
-rw-r--r--src/main/scala/org/xapek/influxdb/Sized.scala1
3 files changed, 198 insertions, 114 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala
index 9ed778c..f5d33f4 100644
--- a/src/main/scala/org/xapek/influxdb/Influxdb.scala
+++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala
@@ -1,16 +1,28 @@
package org.xapek.influxdb
-trait InfluxValue {
- type JavaT
- def toString(): String
- def toJava: JavaT
+import java.util.Date
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.utils.URIBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+import org.slf4j.LoggerFactory
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import com.typesafe.scalalogging.Logger
+import scala.util.parsing.json.JSONObject
+
+abstract class QueryBuilderWithSelectAndFrom[L <: Nat, IdentT <% InfluxIdentifier](
+ val columns: Sized[IdentT, L],
+ val table: IdentT) {
+ def toString: String
}
-class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Sized[IdentT, L]) {
- def apply(column: IdentT): QueryBuilderSelect[Succ[L], IdentT] =
+class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](
+ val columns: Sized[IdentT, L]) {
+ def apply(column: IdentT) =
new QueryBuilderSelect(SizedOp.add(columns, column))
- def FROM(table: IdentT) = new QueryBuilderFrom(columns, table)
+ def FROM(table: IdentT) = new QueryBuilderFrom(this, table)
implicit def influxIdentifier(s: String): InfluxIdentifier = {
new InfluxIdentifier(s)
@@ -19,64 +31,38 @@ class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Size
override def toString() = "SELECT " + columns.unsized.mkString(", ")
}
-class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier](columns: Sized[IdentT, L], val table: IdentT)
- extends QueryBuilderSelect(columns) {
+class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier](
+ select: QueryBuilderSelect[L, IdentT],
+ table: IdentT)
+ extends QueryBuilderWithSelectAndFrom[L, IdentT](select.columns, table) {
def WHERE[WhereT <: Expr](where: WhereT) =
- new QueryBuilderWhere(columns, table, where)
-
- override def toString() = super.toString() + " FROM " + table
-}
-
-class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](columns: Sized[IdentT, L],
- table: IdentT,
- val where: WhereT)
- extends QueryBuilderFrom(columns, table) {
- override def toString() = super.toString() + " WHERE " + where
-}
-//class QueryBuilder1(select: Seq[InfluxIdentifier]) {
-// def FROM(table: String): QueryBuilder2 = {
-// new QueryBuilder2(select, table)
-// }
-//}
-
-//class QueryBuilder2(select: Seq[InfluxIdentifier], from: String) extends QueryBuilder1(select) {
-// def WHERE[E <: Expr](eq: E): QueryBuilder3[E] = {
-// new QueryBuilder3(eq, select, from)
-// }
-//
-// override def toString(): String = {
-// "SELECT " + select.mkString(", ") + " FROM " + from
-// }
-//}
-
-//class QueryBuilder3[WhereT <: Expr](whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder2(select, from) {
-// def &&[E2 <: Expr](other: E2): QueryBuilder3[AndExpr[WhereT, E2]] = {
-// new QueryBuilder3(new AndExpr(whereExpr, other), select, from)
-// }
-//
-// def ||[E2 <: Expr](other: E2): QueryBuilder3[OrExpr[WhereT, E2]] = {
-// new QueryBuilder3(new OrExpr(whereExpr, other), select, from)
-// }
-//
-// def GROUP_BY[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = {
-// new QueryBuilder4(List(column), whereExpr, select, from)
-// }
-//
-// override def toString(): String = {
-// super.toString() + " WHERE " + whereExpr.toString()
-// }
-//}
-//
-//class QueryBuilder4[WhereT <: Expr](groupBy: Seq[InfluxIdentifier], whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder3(whereExpr, select, from) {
-// def <<=[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = {
-// val x: InfluxIdentifier = column
-// new QueryBuilder4(groupBy :+ x, whereExpr, select, from)
-// }
-//
-// override def toString(): String = {
-// super.toString() + " GROUP BY " + groupBy.mkString(", ")
-// }
-//}
+ new QueryBuilderWhere(this, where)
+
+ override def toString() = select.toString() + " FROM " + table
+}
+
+class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](
+ val from: QueryBuilderFrom[L, IdentT],
+ val where: WhereT)
+ extends QueryBuilderWithSelectAndFrom[L, IdentT](from.columns, from.table) {
+
+ def GROUP_BY(column: IdentT) =
+ new QueryBuilderGroupBy(this, List(column))
+
+ override def toString() = from.toString() + " WHERE " + where
+}
+
+class QueryBuilderGroupBy[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](
+ where: QueryBuilderWhere[L, IdentT, WhereT],
+ var groupBy: Seq[IdentT])
+ extends QueryBuilderWithSelectAndFrom[L, IdentT](where.columns, where.table) {
+
+ def apply(column: IdentT): QueryBuilderGroupBy[L, IdentT, WhereT] = {
+ groupBy = groupBy ++ List(column)
+ this
+ }
+ override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
+}
protected trait E {
def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2]
@@ -103,29 +89,29 @@ class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr {
override def toString(): String = value.toString
}
-abstract class BinaryOp[E1 <: E, E2 <: E](val op1: E1, val op2: E2) extends Expr {
- def str: String
+final object BinaryOp {
+ val OP_EQ = "="
+ val OP_NE = "!="
+ val OP_GT = ">"
+ val OP_GE = ">="
+ val OP_LT = "<"
+ val OP_LE = "<="
+}
+class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr {
override def toString(): String = {
"(" + op1.toString() + " " + str + " " + op2.toString() + ")"
}
}
-class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) {
- def str = "AND"
-}
-
-class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) {
- override def str = "OR"
-}
+class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
+ extends BinaryOp[E1, E2]("AND", op1, op2)
-class EqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T])
- extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) {
- def str = "=="
-}
+class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
+ extends BinaryOp[E1, E2]("OR", op1, op2)
-class NeqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T])
- extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) {
- def str = "=="
+trait InfluxValue {
+ type JavaT
+ def toString(): String
}
class InfluxString(val value: String) extends Expr with InfluxValue {
@@ -133,7 +119,6 @@ class InfluxString(val value: String) extends Expr with InfluxValue {
override def toString(): String = {
"'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO
}
- override def toJava(): String = value
}
class InfluxNumber(val value: Number) extends Expr with InfluxValue {
@@ -142,7 +127,15 @@ class InfluxNumber(val value: Number) extends Expr with InfluxValue {
override def toString(): String = {
value.toString
}
- override def toJava(): Number = value
+}
+
+class InfluxDate(val value: Date) extends Expr with InfluxValue {
+ type JavaT = Date
+
+ override def toString(): String = {
+ val time: Long = value.getTime() / 1000
+ s"${time}s"
+ }
}
class InfluxIdentifier(val name: String) extends Expr {
@@ -150,40 +143,120 @@ class InfluxIdentifier(val name: String) extends Expr {
"\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
}
- def ==[Z <: InfluxValue, ValueT <% Z](value: ValueT): EqExpr[Z] = {
- new EqExpr(this, new ValueExpr(value))
- }
+ def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value))
- def !=[Z <: InfluxValue, ValueT <% Z](value: ValueT): NeqExpr[Z] = {
- new NeqExpr(this, new ValueExpr(value))
- }
-}
+ def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value))
-object Influxdb {
- def SELECT(column: String) =
- new QueryBuilderSelect(SizedOp.wrap(Influxdb.influxColumn(column)))
-
- implicit def influxString(s: String): InfluxString = {
- new InfluxString(s)
- }
+ def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value))
- implicit def influxNumber(n: Int): InfluxNumber = {
- new InfluxNumber(n)
- }
+ def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value))
- implicit def influxNumber(n: Double): InfluxNumber = {
- new InfluxNumber(n)
- }
+ def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value))
- implicit def influxColumn(s: String): InfluxIdentifier = {
- new InfluxIdentifier(s)
+ def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value))
+}
+
+case class JsonResult(series: List[JsonSeries], error: String) {
+ def this(series: List[JsonSeries]) = this(series, null)
+}
+case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
+ override def toString() = values.map { _.map { _.getClass() } }.mkString(", ")
+}
+
+class InfluxdbError(val serverMessage: String)
+ extends Exception(s"Server responded: $serverMessage")
+
+class InfluxdbExecutor[L <: Nat, IdentT <% InfluxIdentifier](
+ connection: InfluxDB,
+ query: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) {
+ private val httpClient = HttpClients.createDefault();
+ private val logger = Logger(LoggerFactory.getLogger(getClass))
+ private val uriBuilder = new URIBuilder()
+ .setScheme("http")
+ .setHost(connection.host)
+ .setPort(connection.port)
+ .setPath("/query")
+ .setParameter("u", connection.username)
+ .setParameter("p", connection.password)
+ .setParameter("db", connection.database)
+
+ def map[B, That](f: Sized[InfluxValue, L] => B): Seq[String] = {
+ implicit val formats = DefaultFormats
+
+ //{"results":[
+ // {"series":[
+ // {"name":"esg",
+ // "columns":["time","product_name","sku","price"],
+ // "values":[
+ // ["2015-06-05T09:30:13.084815572Z","Goldbarren 100g Heraeus","01011072",3400.73],
+ // ["2015-06-05T17:00:13.888788593Z","Goldbarren 100g Heraeus","01011072",3425.29]]}]}]}
+ uriBuilder.setParameter("q", query.toString())
+
+ val httpGet = new HttpGet(uriBuilder.build())
+ logger.debug("Query={}", query.toString())
+ logger.debug("Call: {}", httpGet.getURI)
+
+ val response1 = httpClient.execute(httpGet);
+ try {
+ System.out.println(response1.getStatusLine());
+ val entity1 = response1.getEntity();
+
+ val json = parse(entity1.getContent)
+ val results = (json \ "results").extract[List[JsonResult]]
+ val errors = results.filter { _.error != null }.map { _.error }
+ if (errors.size > 0) {
+ throw new InfluxdbError(errors.mkString(". "))
+ }
+ println(results)
+ } finally {
+ response1.close();
+ }
+
+ val columns = query.columns
+ println(SizedOp.count(columns))
+ // toIntSucc(columns)
+//collection.mutable.LinkedList
+ List(columns.toString())
}
+}
- def col(col: String): InfluxIdentifier = {
- new InfluxIdentifier(col)
+class InfluxDB(val database: String,
+ val host: String,
+ val port: Int = 8086,
+ val username: String = "",
+ val password: String = "") {
+ def apply[L <: Nat, IdentT <% InfluxIdentifier](
+ q: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) = {
+ new InfluxdbExecutor[L, IdentT](this, q)
}
+ // def server(database: String, host: String, port: Long = 8080L, username: String = "", password: String = "") =
// implicit def queryToString(q: QueryBuilder2): String = q.toString
// implicit def queryToString[T <: Expr](q: QueryBuilder3[T]): String = q.toString
// implicit def queryToString[T <: Expr](q: QueryBuilder4[T]): String = q.toString
}
+
+object InfluxDB {
+ def SELECT(column: String) =
+ new QueryBuilderSelect(SizedOp.wrap(influxColumn(column)))
+
+ implicit def influxString(s: String) = new InfluxString(s)
+
+ implicit def influxNumber(n: Int) = new InfluxNumber(n)
+
+ implicit def influxNumber(n: Double) = new InfluxNumber(n)
+
+ implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
+
+ implicit def influxColumn(s: String) = new InfluxIdentifier(s)
+
+ def col(col: String) = new InfluxIdentifier(col)
+
+ def time = new InfluxIdentifier("time")
+} \ No newline at end of file
diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala
index 58fc318..0f9bc12 100644
--- a/src/main/scala/org/xapek/influxdb/Main.scala
+++ b/src/main/scala/org/xapek/influxdb/Main.scala
@@ -6,7 +6,9 @@ import akka.event.Logging
import akka.actor.ActorDSL
import akka.actor.ActorSystem
import akka.actor.ActorLogging
-import org.xapek.influxdb.Influxdb._
+import org.xapek.influxdb.InfluxDB._
+import java.util.Date
+import java.util.Calendar
case class Row(columns: Seq[String])
@@ -22,15 +24,23 @@ class GenericReader(query: String) extends Actor with ActorLogging {
object Main {
def main(args: Array[String]): Unit = {
- def url = "http://localhost:8083/query"
- val s1 = Influxdb.SELECT("test")
- println(s1.columns)
+ val db = new InfluxDB("data", "localhost")
- val s2 = Influxdb.SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd")
- println(s2)
+ val s2 = SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd")
- val s3 = Influxdb.SELECT("foo")("bla")("baz") FROM "asd" WHERE col("asd") == "asd"
- println(s3)
+ val s3 = SELECT("foo")("bla")("baz") FROM "asd" WHERE time > "asd"
+
+ val calendar = Calendar.getInstance
+ calendar.set(Calendar.HOUR_OF_DAY, 0)
+ calendar.set(Calendar.MINUTE, 0)
+ calendar.set(Calendar.SECOND, 0)
+ calendar.set(Calendar.MILLISECOND, 0)
+ calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 2)
+
+ // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc
+ val foo = db(SELECT("product_name")("sku")("price") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime)
+ .map({ x => x })
+ println(foo)
// val system = ActorSystem("test")
// val alice = system.actorOf(Props(MyReader), "alice")
//
diff --git a/src/main/scala/org/xapek/influxdb/Sized.scala b/src/main/scala/org/xapek/influxdb/Sized.scala
index bf3bca6..2b4e6cc 100644
--- a/src/main/scala/org/xapek/influxdb/Sized.scala
+++ b/src/main/scala/org/xapek/influxdb/Sized.scala
@@ -30,6 +30,7 @@ object ToInt {
implicit val toInt0 = new ToInt[_0] {
def apply() = 0
}
+
implicit def toIntSucc[N <: Nat](implicit toIntN: ToInt[N]) = new ToInt[Succ[N]] {
def apply() = toIntN() + 1
}