summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2015-06-07 00:50:45 +0200
committerYves Fischer <yvesf-git@xapek.org>2015-06-07 00:50:45 +0200
commitbc8bf89edc3e24a756c1c9628087a5ae6f5af80a (patch)
treee6861d07d09e839f3222ca862747da165addc746 /src
parent8a17a2f6d77cb6bd3e5629d3944b30f449cb1237 (diff)
downloadinfluxdb-tools-bc8bf89edc3e24a756c1c9628087a5ae6f5af80a.tar.gz
influxdb-tools-bc8bf89edc3e24a756c1c9628087a5ae6f5af80a.zip
first request worked. no result transformation yet
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/org/xapek/influxdb/Influxdb.scala285
-rw-r--r--src/main/scala/org/xapek/influxdb/Main.scala26
-rw-r--r--src/main/scala/org/xapek/influxdb/Sized.scala1
-rw-r--r--src/test/scala/org/xapek/influxdb/InfluxdbTest.scala22
-rw-r--r--src/test/scala/org/xapek/influxdb/SizedTest.scala1
-rw-r--r--src/test/scala/simplelogger.properties34
6 files changed, 243 insertions, 126 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala
index 9ed778c..f5d33f4 100644
--- a/src/main/scala/org/xapek/influxdb/Influxdb.scala
+++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala
@@ -1,16 +1,28 @@
package org.xapek.influxdb
-trait InfluxValue {
- type JavaT
- def toString(): String
- def toJava: JavaT
+import java.util.Date
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.utils.URIBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+import org.slf4j.LoggerFactory
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import com.typesafe.scalalogging.Logger
+import scala.util.parsing.json.JSONObject
+
+abstract class QueryBuilderWithSelectAndFrom[L <: Nat, IdentT <% InfluxIdentifier](
+ val columns: Sized[IdentT, L],
+ val table: IdentT) {
+ def toString: String
}
-class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Sized[IdentT, L]) {
- def apply(column: IdentT): QueryBuilderSelect[Succ[L], IdentT] =
+class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](
+ val columns: Sized[IdentT, L]) {
+ def apply(column: IdentT) =
new QueryBuilderSelect(SizedOp.add(columns, column))
- def FROM(table: IdentT) = new QueryBuilderFrom(columns, table)
+ def FROM(table: IdentT) = new QueryBuilderFrom(this, table)
implicit def influxIdentifier(s: String): InfluxIdentifier = {
new InfluxIdentifier(s)
@@ -19,64 +31,38 @@ class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Size
override def toString() = "SELECT " + columns.unsized.mkString(", ")
}
-class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier](columns: Sized[IdentT, L], val table: IdentT)
- extends QueryBuilderSelect(columns) {
+class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier](
+ select: QueryBuilderSelect[L, IdentT],
+ table: IdentT)
+ extends QueryBuilderWithSelectAndFrom[L, IdentT](select.columns, table) {
def WHERE[WhereT <: Expr](where: WhereT) =
- new QueryBuilderWhere(columns, table, where)
-
- override def toString() = super.toString() + " FROM " + table
-}
-
-class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](columns: Sized[IdentT, L],
- table: IdentT,
- val where: WhereT)
- extends QueryBuilderFrom(columns, table) {
- override def toString() = super.toString() + " WHERE " + where
-}
-//class QueryBuilder1(select: Seq[InfluxIdentifier]) {
-// def FROM(table: String): QueryBuilder2 = {
-// new QueryBuilder2(select, table)
-// }
-//}
-
-//class QueryBuilder2(select: Seq[InfluxIdentifier], from: String) extends QueryBuilder1(select) {
-// def WHERE[E <: Expr](eq: E): QueryBuilder3[E] = {
-// new QueryBuilder3(eq, select, from)
-// }
-//
-// override def toString(): String = {
-// "SELECT " + select.mkString(", ") + " FROM " + from
-// }
-//}
-
-//class QueryBuilder3[WhereT <: Expr](whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder2(select, from) {
-// def &&[E2 <: Expr](other: E2): QueryBuilder3[AndExpr[WhereT, E2]] = {
-// new QueryBuilder3(new AndExpr(whereExpr, other), select, from)
-// }
-//
-// def ||[E2 <: Expr](other: E2): QueryBuilder3[OrExpr[WhereT, E2]] = {
-// new QueryBuilder3(new OrExpr(whereExpr, other), select, from)
-// }
-//
-// def GROUP_BY[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = {
-// new QueryBuilder4(List(column), whereExpr, select, from)
-// }
-//
-// override def toString(): String = {
-// super.toString() + " WHERE " + whereExpr.toString()
-// }
-//}
-//
-//class QueryBuilder4[WhereT <: Expr](groupBy: Seq[InfluxIdentifier], whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder3(whereExpr, select, from) {
-// def <<=[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = {
-// val x: InfluxIdentifier = column
-// new QueryBuilder4(groupBy :+ x, whereExpr, select, from)
-// }
-//
-// override def toString(): String = {
-// super.toString() + " GROUP BY " + groupBy.mkString(", ")
-// }
-//}
+ new QueryBuilderWhere(this, where)
+
+ override def toString() = select.toString() + " FROM " + table
+}
+
+class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](
+ val from: QueryBuilderFrom[L, IdentT],
+ val where: WhereT)
+ extends QueryBuilderWithSelectAndFrom[L, IdentT](from.columns, from.table) {
+
+ def GROUP_BY(column: IdentT) =
+ new QueryBuilderGroupBy(this, List(column))
+
+ override def toString() = from.toString() + " WHERE " + where
+}
+
+class QueryBuilderGroupBy[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](
+ where: QueryBuilderWhere[L, IdentT, WhereT],
+ var groupBy: Seq[IdentT])
+ extends QueryBuilderWithSelectAndFrom[L, IdentT](where.columns, where.table) {
+
+ def apply(column: IdentT): QueryBuilderGroupBy[L, IdentT, WhereT] = {
+ groupBy = groupBy ++ List(column)
+ this
+ }
+ override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
+}
protected trait E {
def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2]
@@ -103,29 +89,29 @@ class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr {
override def toString(): String = value.toString
}
-abstract class BinaryOp[E1 <: E, E2 <: E](val op1: E1, val op2: E2) extends Expr {
- def str: String
+final object BinaryOp {
+ val OP_EQ = "="
+ val OP_NE = "!="
+ val OP_GT = ">"
+ val OP_GE = ">="
+ val OP_LT = "<"
+ val OP_LE = "<="
+}
+class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr {
override def toString(): String = {
"(" + op1.toString() + " " + str + " " + op2.toString() + ")"
}
}
-class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) {
- def str = "AND"
-}
-
-class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) {
- override def str = "OR"
-}
+class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
+ extends BinaryOp[E1, E2]("AND", op1, op2)
-class EqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T])
- extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) {
- def str = "=="
-}
+class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
+ extends BinaryOp[E1, E2]("OR", op1, op2)
-class NeqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T])
- extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) {
- def str = "=="
+trait InfluxValue {
+ type JavaT
+ def toString(): String
}
class InfluxString(val value: String) extends Expr with InfluxValue {
@@ -133,7 +119,6 @@ class InfluxString(val value: String) extends Expr with InfluxValue {
override def toString(): String = {
"'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO
}
- override def toJava(): String = value
}
class InfluxNumber(val value: Number) extends Expr with InfluxValue {
@@ -142,7 +127,15 @@ class InfluxNumber(val value: Number) extends Expr with InfluxValue {
override def toString(): String = {
value.toString
}
- override def toJava(): Number = value
+}
+
+class InfluxDate(val value: Date) extends Expr with InfluxValue {
+ type JavaT = Date
+
+ override def toString(): String = {
+ val time: Long = value.getTime() / 1000
+ s"${time}s"
+ }
}
class InfluxIdentifier(val name: String) extends Expr {
@@ -150,40 +143,120 @@ class InfluxIdentifier(val name: String) extends Expr {
"\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
}
- def ==[Z <: InfluxValue, ValueT <% Z](value: ValueT): EqExpr[Z] = {
- new EqExpr(this, new ValueExpr(value))
- }
+ def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value))
- def !=[Z <: InfluxValue, ValueT <% Z](value: ValueT): NeqExpr[Z] = {
- new NeqExpr(this, new ValueExpr(value))
- }
-}
+ def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value))
-object Influxdb {
- def SELECT(column: String) =
- new QueryBuilderSelect(SizedOp.wrap(Influxdb.influxColumn(column)))
-
- implicit def influxString(s: String): InfluxString = {
- new InfluxString(s)
- }
+ def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value))
- implicit def influxNumber(n: Int): InfluxNumber = {
- new InfluxNumber(n)
- }
+ def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value))
- implicit def influxNumber(n: Double): InfluxNumber = {
- new InfluxNumber(n)
- }
+ def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value))
- implicit def influxColumn(s: String): InfluxIdentifier = {
- new InfluxIdentifier(s)
+ def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value))
+}
+
+case class JsonResult(series: List[JsonSeries], error: String) {
+ def this(series: List[JsonSeries]) = this(series, null)
+}
+case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
+ override def toString() = values.map { _.map { _.getClass() } }.mkString(", ")
+}
+
+class InfluxdbError(val serverMessage: String)
+ extends Exception(s"Server responded: $serverMessage")
+
+class InfluxdbExecutor[L <: Nat, IdentT <% InfluxIdentifier](
+ connection: InfluxDB,
+ query: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) {
+ private val httpClient = HttpClients.createDefault();
+ private val logger = Logger(LoggerFactory.getLogger(getClass))
+ private val uriBuilder = new URIBuilder()
+ .setScheme("http")
+ .setHost(connection.host)
+ .setPort(connection.port)
+ .setPath("/query")
+ .setParameter("u", connection.username)
+ .setParameter("p", connection.password)
+ .setParameter("db", connection.database)
+
+ def map[B, That](f: Sized[InfluxValue, L] => B): Seq[String] = {
+ implicit val formats = DefaultFormats
+
+ //{"results":[
+ // {"series":[
+ // {"name":"esg",
+ // "columns":["time","product_name","sku","price"],
+ // "values":[
+ // ["2015-06-05T09:30:13.084815572Z","Goldbarren 100g Heraeus","01011072",3400.73],
+ // ["2015-06-05T17:00:13.888788593Z","Goldbarren 100g Heraeus","01011072",3425.29]]}]}]}
+ uriBuilder.setParameter("q", query.toString())
+
+ val httpGet = new HttpGet(uriBuilder.build())
+ logger.debug("Query={}", query.toString())
+ logger.debug("Call: {}", httpGet.getURI)
+
+ val response1 = httpClient.execute(httpGet);
+ try {
+ System.out.println(response1.getStatusLine());
+ val entity1 = response1.getEntity();
+
+ val json = parse(entity1.getContent)
+ val results = (json \ "results").extract[List[JsonResult]]
+ val errors = results.filter { _.error != null }.map { _.error }
+ if (errors.size > 0) {
+ throw new InfluxdbError(errors.mkString(". "))
+ }
+ println(results)
+ } finally {
+ response1.close();
+ }
+
+ val columns = query.columns
+ println(SizedOp.count(columns))
+ // toIntSucc(columns)
+//collection.mutable.LinkedList
+ List(columns.toString())
}
+}
- def col(col: String): InfluxIdentifier = {
- new InfluxIdentifier(col)
+class InfluxDB(val database: String,
+ val host: String,
+ val port: Int = 8086,
+ val username: String = "",
+ val password: String = "") {
+ def apply[L <: Nat, IdentT <% InfluxIdentifier](
+ q: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) = {
+ new InfluxdbExecutor[L, IdentT](this, q)
}
+ // def server(database: String, host: String, port: Long = 8080L, username: String = "", password: String = "") =
// implicit def queryToString(q: QueryBuilder2): String = q.toString
// implicit def queryToString[T <: Expr](q: QueryBuilder3[T]): String = q.toString
// implicit def queryToString[T <: Expr](q: QueryBuilder4[T]): String = q.toString
}
+
+object InfluxDB {
+ def SELECT(column: String) =
+ new QueryBuilderSelect(SizedOp.wrap(influxColumn(column)))
+
+ implicit def influxString(s: String) = new InfluxString(s)
+
+ implicit def influxNumber(n: Int) = new InfluxNumber(n)
+
+ implicit def influxNumber(n: Double) = new InfluxNumber(n)
+
+ implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
+
+ implicit def influxColumn(s: String) = new InfluxIdentifier(s)
+
+ def col(col: String) = new InfluxIdentifier(col)
+
+ def time = new InfluxIdentifier("time")
+} \ No newline at end of file
diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala
index 58fc318..0f9bc12 100644
--- a/src/main/scala/org/xapek/influxdb/Main.scala
+++ b/src/main/scala/org/xapek/influxdb/Main.scala
@@ -6,7 +6,9 @@ import akka.event.Logging
import akka.actor.ActorDSL
import akka.actor.ActorSystem
import akka.actor.ActorLogging
-import org.xapek.influxdb.Influxdb._
+import org.xapek.influxdb.InfluxDB._
+import java.util.Date
+import java.util.Calendar
case class Row(columns: Seq[String])
@@ -22,15 +24,23 @@ class GenericReader(query: String) extends Actor with ActorLogging {
object Main {
def main(args: Array[String]): Unit = {
- def url = "http://localhost:8083/query"
- val s1 = Influxdb.SELECT("test")
- println(s1.columns)
+ val db = new InfluxDB("data", "localhost")
- val s2 = Influxdb.SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd")
- println(s2)
+ val s2 = SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd")
- val s3 = Influxdb.SELECT("foo")("bla")("baz") FROM "asd" WHERE col("asd") == "asd"
- println(s3)
+ val s3 = SELECT("foo")("bla")("baz") FROM "asd" WHERE time > "asd"
+
+ val calendar = Calendar.getInstance
+ calendar.set(Calendar.HOUR_OF_DAY, 0)
+ calendar.set(Calendar.MINUTE, 0)
+ calendar.set(Calendar.SECOND, 0)
+ calendar.set(Calendar.MILLISECOND, 0)
+ calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 2)
+
+ // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc
+ val foo = db(SELECT("product_name")("sku")("price") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime)
+ .map({ x => x })
+ println(foo)
// val system = ActorSystem("test")
// val alice = system.actorOf(Props(MyReader), "alice")
//
diff --git a/src/main/scala/org/xapek/influxdb/Sized.scala b/src/main/scala/org/xapek/influxdb/Sized.scala
index bf3bca6..2b4e6cc 100644
--- a/src/main/scala/org/xapek/influxdb/Sized.scala
+++ b/src/main/scala/org/xapek/influxdb/Sized.scala
@@ -30,6 +30,7 @@ object ToInt {
implicit val toInt0 = new ToInt[_0] {
def apply() = 0
}
+
implicit def toIntSucc[N <: Nat](implicit toIntN: ToInt[N]) = new ToInt[Succ[N]] {
def apply() = toIntN() + 1
}
diff --git a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
index 806c798..4473a47 100644
--- a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
+++ b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
@@ -2,31 +2,31 @@ package org.xapek.influxdb
import org.junit._
import Assert._
-import org.xapek.influxdb.Influxdb._
+import org.xapek.influxdb.InfluxDB._
@Test
class AppTest {
@Test
def testStatements() = {
- val url = "http://localhost:8086/query"
+ val db = new InfluxDB("data", "localhost")
- assertTrue((Influxdb.SELECT("foo") FROM "bla" WHERE col("a") == "asd")
+ assertTrue((SELECT("foo") FROM "bla" WHERE col("a") == "asd")
.toString().contains("as"))
- assertTrue((Influxdb.SELECT("foo")("bla") FROM "bla" WHERE col("a") == "asd")
+ assertTrue((SELECT("foo")("bla") FROM "bla" WHERE col("a") == "asd")
.toString().contains("foo"))
+
+ assertTrue((SELECT("foo")("bla") FROM "bla" WHERE col("b") == 1)
+ .toString().contains("\"b\" = 1"))
- assertTrue((Influxdb.SELECT("foo")("bla") FROM "bla")
- .toString().contains("== 1"))
-
- assertTrue((Influxdb
- SELECT "foo"
+ assertTrue((
+ SELECT("foo")
FROM "bla"
WHERE col("a") == "asd" || col("b") == "C" && col("c") == "d").toString().contains("bla"))
- assertTrue((Influxdb
- SELECT "foo"
+ assertTrue((
+ SELECT("foo")
FROM "bla"
WHERE (col("a") == "asd" || col("b") == "C") && col("c") == "d").toString().contains("foo"))
diff --git a/src/test/scala/org/xapek/influxdb/SizedTest.scala b/src/test/scala/org/xapek/influxdb/SizedTest.scala
index 5f51e91..525ffd9 100644
--- a/src/test/scala/org/xapek/influxdb/SizedTest.scala
+++ b/src/test/scala/org/xapek/influxdb/SizedTest.scala
@@ -2,7 +2,6 @@ package org.xapek.influxdb
import org.junit._
import org.junit.Assert._
-import org.xapek.influxdb.Influxdb._
@Test
class SizedTest {
diff --git a/src/test/scala/simplelogger.properties b/src/test/scala/simplelogger.properties
new file mode 100644
index 0000000..a8665f6
--- /dev/null
+++ b/src/test/scala/simplelogger.properties
@@ -0,0 +1,34 @@
+# SLF4J's SimpleLogger configuration file
+# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err.
+
+# Default logging detail level for all instances of SimpleLogger.
+# Must be one of ("trace", "debug", "info", "warn", or "error").
+# If not specified, defaults to "info".
+org.slf4j.simpleLogger.defaultLogLevel=trace
+
+# Logging detail level for a SimpleLogger instance named "xxxxx".
+# Must be one of ("trace", "debug", "info", "warn", or "error").
+# If not specified, the default logging detail level is used.
+#org.slf4j.simpleLogger.log.xxxxx=
+
+# Set to true if you want the current date and time to be included in output messages.
+# Default is false, and will output the number of milliseconds elapsed since startup.
+#org.slf4j.simpleLogger.showDateTime=
+
+# The date and time format to be used in the output messages.
+# The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat.
+# If the format is not specified or is invalid, the default format is used.
+# The default format is yyyy-MM-dd HH:mm:ss:SSS Z.
+#org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z
+
+# Set to true if you want to output the current thread name.
+# Defaults to true.
+#org.slf4j.simpleLogger.showThreadName=true
+
+# Set to true if you want the Logger instance name to be included in output messages.
+# Defaults to true.
+#org.slf4j.simpleLogger.showLogName=true
+
+# Set to true if you want the last component of the name to be included in output messages.
+# Defaults to false.
+#org.slf4j.simpleLogger.showShortLogName=false