diff options
-rw-r--r-- | pom.xml | 35 | ||||
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Influxdb.scala | 285 | ||||
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Main.scala | 26 | ||||
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Sized.scala | 1 | ||||
-rw-r--r-- | src/test/scala/org/xapek/influxdb/InfluxdbTest.scala | 22 | ||||
-rw-r--r-- | src/test/scala/org/xapek/influxdb/SizedTest.scala | 1 | ||||
-rw-r--r-- | src/test/scala/simplelogger.properties | 34 |
7 files changed, 274 insertions, 130 deletions
@@ -4,9 +4,9 @@ <groupId>org.xapek</groupId> <artifactId>influxdb-tools</artifactId> <version>0.0.1-SNAPSHOT</version> - <inceptionYear>2008</inceptionYear> <properties> - <scala.version>2.11.6</scala.version> + <scala.version>2.11</scala.version> + <scala.version.exact>2.11.6</scala.version.exact> </properties> <repositories> @@ -39,11 +39,11 @@ <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> - <version>${scala.version}</version> + <version>${scala.version.exact}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> - <artifactId>akka-actor_2.11</artifactId> + <artifactId>akka-actor_${scala.version}</artifactId> <version>2.3.9</version> </dependency> <dependency> @@ -58,6 +58,33 @@ <version>1.2.5</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5</version> + </dependency> + <dependency> + <groupId>org.json4s</groupId> + <artifactId>json4s-jackson_${scala.version}</artifactId> + <version>3.2.11</version> + </dependency> + <dependency> + <groupId>org.json4s</groupId> + <artifactId>json4s-native_${scala.version}</artifactId> + <version>3.2.11</version> + </dependency> + + <dependency> + <groupId>com.typesafe.scala-logging</groupId> + <artifactId>scala-logging_${scala.version}</artifactId> + <version>3.1.0</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.7</version> + </dependency> </dependencies> <build> diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala index 9ed778c..f5d33f4 100644 --- a/src/main/scala/org/xapek/influxdb/Influxdb.scala +++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala @@ -1,16 +1,28 @@ package org.xapek.influxdb -trait InfluxValue { - type JavaT - def toString(): String - def toJava: JavaT +import java.util.Date +import org.apache.http.client.methods.HttpGet +import org.apache.http.client.utils.URIBuilder +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils +import org.slf4j.LoggerFactory +import org.json4s._ +import org.json4s.jackson.JsonMethods._ +import com.typesafe.scalalogging.Logger +import scala.util.parsing.json.JSONObject + +abstract class QueryBuilderWithSelectAndFrom[L <: Nat, IdentT <% InfluxIdentifier]( + val columns: Sized[IdentT, L], + val table: IdentT) { + def toString: String } -class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Sized[IdentT, L]) { - def apply(column: IdentT): QueryBuilderSelect[Succ[L], IdentT] = +class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier]( + val columns: Sized[IdentT, L]) { + def apply(column: IdentT) = new QueryBuilderSelect(SizedOp.add(columns, column)) - def FROM(table: IdentT) = new QueryBuilderFrom(columns, table) + def FROM(table: IdentT) = new QueryBuilderFrom(this, table) implicit def influxIdentifier(s: String): InfluxIdentifier = { new InfluxIdentifier(s) @@ -19,64 +31,38 @@ class QueryBuilderSelect[L <: Nat, IdentT <% InfluxIdentifier](val columns: Size override def toString() = "SELECT " + columns.unsized.mkString(", ") } -class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier](columns: Sized[IdentT, L], val table: IdentT) - extends QueryBuilderSelect(columns) { +class QueryBuilderFrom[L <: Nat, IdentT <% InfluxIdentifier]( + select: QueryBuilderSelect[L, IdentT], + table: IdentT) + extends QueryBuilderWithSelectAndFrom[L, IdentT](select.columns, table) { def WHERE[WhereT <: Expr](where: WhereT) = - new QueryBuilderWhere(columns, table, where) - - override def toString() = super.toString() + " FROM " + table -} - -class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr](columns: Sized[IdentT, L], - table: IdentT, - val where: WhereT) - extends QueryBuilderFrom(columns, table) { - override def toString() = super.toString() + " WHERE " + where -} -//class QueryBuilder1(select: Seq[InfluxIdentifier]) { -// def FROM(table: String): QueryBuilder2 = { -// new QueryBuilder2(select, table) -// } -//} - -//class QueryBuilder2(select: Seq[InfluxIdentifier], from: String) extends QueryBuilder1(select) { -// def WHERE[E <: Expr](eq: E): QueryBuilder3[E] = { -// new QueryBuilder3(eq, select, from) -// } -// -// override def toString(): String = { -// "SELECT " + select.mkString(", ") + " FROM " + from -// } -//} - -//class QueryBuilder3[WhereT <: Expr](whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder2(select, from) { -// def &&[E2 <: Expr](other: E2): QueryBuilder3[AndExpr[WhereT, E2]] = { -// new QueryBuilder3(new AndExpr(whereExpr, other), select, from) -// } -// -// def ||[E2 <: Expr](other: E2): QueryBuilder3[OrExpr[WhereT, E2]] = { -// new QueryBuilder3(new OrExpr(whereExpr, other), select, from) -// } -// -// def GROUP_BY[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = { -// new QueryBuilder4(List(column), whereExpr, select, from) -// } -// -// override def toString(): String = { -// super.toString() + " WHERE " + whereExpr.toString() -// } -//} -// -//class QueryBuilder4[WhereT <: Expr](groupBy: Seq[InfluxIdentifier], whereExpr: WhereT, select: Seq[InfluxIdentifier], from: String) extends QueryBuilder3(whereExpr, select, from) { -// def <<=[ColumnT <% InfluxIdentifier](column: ColumnT): QueryBuilder4[WhereT] = { -// val x: InfluxIdentifier = column -// new QueryBuilder4(groupBy :+ x, whereExpr, select, from) -// } -// -// override def toString(): String = { -// super.toString() + " GROUP BY " + groupBy.mkString(", ") -// } -//} + new QueryBuilderWhere(this, where) + + override def toString() = select.toString() + " FROM " + table +} + +class QueryBuilderWhere[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr]( + val from: QueryBuilderFrom[L, IdentT], + val where: WhereT) + extends QueryBuilderWithSelectAndFrom[L, IdentT](from.columns, from.table) { + + def GROUP_BY(column: IdentT) = + new QueryBuilderGroupBy(this, List(column)) + + override def toString() = from.toString() + " WHERE " + where +} + +class QueryBuilderGroupBy[L <: Nat, IdentT <% InfluxIdentifier, WhereT <: Expr]( + where: QueryBuilderWhere[L, IdentT, WhereT], + var groupBy: Seq[IdentT]) + extends QueryBuilderWithSelectAndFrom[L, IdentT](where.columns, where.table) { + + def apply(column: IdentT): QueryBuilderGroupBy[L, IdentT, WhereT] = { + groupBy = groupBy ++ List(column) + this + } + override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") +} protected trait E { def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2] @@ -103,29 +89,29 @@ class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr { override def toString(): String = value.toString } -abstract class BinaryOp[E1 <: E, E2 <: E](val op1: E1, val op2: E2) extends Expr { - def str: String +final object BinaryOp { + val OP_EQ = "=" + val OP_NE = "!=" + val OP_GT = ">" + val OP_GE = ">=" + val OP_LT = "<" + val OP_LE = "<=" +} +class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr { override def toString(): String = { "(" + op1.toString() + " " + str + " " + op2.toString() + ")" } } -class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) { - def str = "AND" -} - -class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) extends BinaryOp[E1, E2](op1, op2) { - override def str = "OR" -} +class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) + extends BinaryOp[E1, E2]("AND", op1, op2) -class EqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T]) - extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) { - def str = "==" -} +class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) + extends BinaryOp[E1, E2]("OR", op1, op2) -class NeqExpr[T <: InfluxValue](column: InfluxIdentifier, op: ValueExpr[T]) - extends BinaryOp[InfluxIdentifier, ValueExpr[T]](column, op) { - def str = "==" +trait InfluxValue { + type JavaT + def toString(): String } class InfluxString(val value: String) extends Expr with InfluxValue { @@ -133,7 +119,6 @@ class InfluxString(val value: String) extends Expr with InfluxValue { override def toString(): String = { "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO } - override def toJava(): String = value } class InfluxNumber(val value: Number) extends Expr with InfluxValue { @@ -142,7 +127,15 @@ class InfluxNumber(val value: Number) extends Expr with InfluxValue { override def toString(): String = { value.toString } - override def toJava(): Number = value +} + +class InfluxDate(val value: Date) extends Expr with InfluxValue { + type JavaT = Date + + override def toString(): String = { + val time: Long = value.getTime() / 1000 + s"${time}s" + } } class InfluxIdentifier(val name: String) extends Expr { @@ -150,40 +143,120 @@ class InfluxIdentifier(val name: String) extends Expr { "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" } - def ==[Z <: InfluxValue, ValueT <% Z](value: ValueT): EqExpr[Z] = { - new EqExpr(this, new ValueExpr(value)) - } + def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value)) - def !=[Z <: InfluxValue, ValueT <% Z](value: ValueT): NeqExpr[Z] = { - new NeqExpr(this, new ValueExpr(value)) - } -} + def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value)) -object Influxdb { - def SELECT(column: String) = - new QueryBuilderSelect(SizedOp.wrap(Influxdb.influxColumn(column))) - - implicit def influxString(s: String): InfluxString = { - new InfluxString(s) - } + def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value)) - implicit def influxNumber(n: Int): InfluxNumber = { - new InfluxNumber(n) - } + def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value)) - implicit def influxNumber(n: Double): InfluxNumber = { - new InfluxNumber(n) - } + def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value)) - implicit def influxColumn(s: String): InfluxIdentifier = { - new InfluxIdentifier(s) + def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value)) +} + +case class JsonResult(series: List[JsonSeries], error: String) { + def this(series: List[JsonSeries]) = this(series, null) +} +case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { + override def toString() = values.map { _.map { _.getClass() } }.mkString(", ") +} + +class InfluxdbError(val serverMessage: String) + extends Exception(s"Server responded: $serverMessage") + +class InfluxdbExecutor[L <: Nat, IdentT <% InfluxIdentifier]( + connection: InfluxDB, + query: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) { + private val httpClient = HttpClients.createDefault(); + private val logger = Logger(LoggerFactory.getLogger(getClass)) + private val uriBuilder = new URIBuilder() + .setScheme("http") + .setHost(connection.host) + .setPort(connection.port) + .setPath("/query") + .setParameter("u", connection.username) + .setParameter("p", connection.password) + .setParameter("db", connection.database) + + def map[B, That](f: Sized[InfluxValue, L] => B): Seq[String] = { + implicit val formats = DefaultFormats + + //{"results":[ + // {"series":[ + // {"name":"esg", + // "columns":["time","product_name","sku","price"], + // "values":[ + // ["2015-06-05T09:30:13.084815572Z","Goldbarren 100g Heraeus","01011072",3400.73], + // ["2015-06-05T17:00:13.888788593Z","Goldbarren 100g Heraeus","01011072",3425.29]]}]}]} + uriBuilder.setParameter("q", query.toString()) + + val httpGet = new HttpGet(uriBuilder.build()) + logger.debug("Query={}", query.toString()) + logger.debug("Call: {}", httpGet.getURI) + + val response1 = httpClient.execute(httpGet); + try { + System.out.println(response1.getStatusLine()); + val entity1 = response1.getEntity(); + + val json = parse(entity1.getContent) + val results = (json \ "results").extract[List[JsonResult]] + val errors = results.filter { _.error != null }.map { _.error } + if (errors.size > 0) { + throw new InfluxdbError(errors.mkString(". ")) + } + println(results) + } finally { + response1.close(); + } + + val columns = query.columns + println(SizedOp.count(columns)) + // toIntSucc(columns) +//collection.mutable.LinkedList + List(columns.toString()) } +} - def col(col: String): InfluxIdentifier = { - new InfluxIdentifier(col) +class InfluxDB(val database: String, + val host: String, + val port: Int = 8086, + val username: String = "", + val password: String = "") { + def apply[L <: Nat, IdentT <% InfluxIdentifier]( + q: QueryBuilderWithSelectAndFrom[L, IdentT])(implicit ev: ToInt[L]) = { + new InfluxdbExecutor[L, IdentT](this, q) } + // def server(database: String, host: String, port: Long = 8080L, username: String = "", password: String = "") = // implicit def queryToString(q: QueryBuilder2): String = q.toString // implicit def queryToString[T <: Expr](q: QueryBuilder3[T]): String = q.toString // implicit def queryToString[T <: Expr](q: QueryBuilder4[T]): String = q.toString } + +object InfluxDB { + def SELECT(column: String) = + new QueryBuilderSelect(SizedOp.wrap(influxColumn(column))) + + implicit def influxString(s: String) = new InfluxString(s) + + implicit def influxNumber(n: Int) = new InfluxNumber(n) + + implicit def influxNumber(n: Double) = new InfluxNumber(n) + + implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) + + implicit def influxColumn(s: String) = new InfluxIdentifier(s) + + def col(col: String) = new InfluxIdentifier(col) + + def time = new InfluxIdentifier("time") +}
\ No newline at end of file diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala index 58fc318..0f9bc12 100644 --- a/src/main/scala/org/xapek/influxdb/Main.scala +++ b/src/main/scala/org/xapek/influxdb/Main.scala @@ -6,7 +6,9 @@ import akka.event.Logging import akka.actor.ActorDSL import akka.actor.ActorSystem import akka.actor.ActorLogging -import org.xapek.influxdb.Influxdb._ +import org.xapek.influxdb.InfluxDB._ +import java.util.Date +import java.util.Calendar case class Row(columns: Seq[String]) @@ -22,15 +24,23 @@ class GenericReader(query: String) extends Actor with ActorLogging { object Main { def main(args: Array[String]): Unit = { - def url = "http://localhost:8083/query" - val s1 = Influxdb.SELECT("test") - println(s1.columns) + val db = new InfluxDB("data", "localhost") - val s2 = Influxdb.SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd") - println(s2) + val s2 = SELECT("foo")("asd").FROM("table").WHERE(col("asd") == "asd") - val s3 = Influxdb.SELECT("foo")("bla")("baz") FROM "asd" WHERE col("asd") == "asd" - println(s3) + val s3 = SELECT("foo")("bla")("baz") FROM "asd" WHERE time > "asd" + + val calendar = Calendar.getInstance + calendar.set(Calendar.HOUR_OF_DAY, 0) + calendar.set(Calendar.MINUTE, 0) + calendar.set(Calendar.SECOND, 0) + calendar.set(Calendar.MILLISECOND, 0) + calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 2) + + // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc + val foo = db(SELECT("product_name")("sku")("price") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime) + .map({ x => x }) + println(foo) // val system = ActorSystem("test") // val alice = system.actorOf(Props(MyReader), "alice") // diff --git a/src/main/scala/org/xapek/influxdb/Sized.scala b/src/main/scala/org/xapek/influxdb/Sized.scala index bf3bca6..2b4e6cc 100644 --- a/src/main/scala/org/xapek/influxdb/Sized.scala +++ b/src/main/scala/org/xapek/influxdb/Sized.scala @@ -30,6 +30,7 @@ object ToInt { implicit val toInt0 = new ToInt[_0] { def apply() = 0 } + implicit def toIntSucc[N <: Nat](implicit toIntN: ToInt[N]) = new ToInt[Succ[N]] { def apply() = toIntN() + 1 } diff --git a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala index 806c798..4473a47 100644 --- a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala +++ b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala @@ -2,31 +2,31 @@ package org.xapek.influxdb import org.junit._ import Assert._ -import org.xapek.influxdb.Influxdb._ +import org.xapek.influxdb.InfluxDB._ @Test class AppTest { @Test def testStatements() = { - val url = "http://localhost:8086/query" + val db = new InfluxDB("data", "localhost") - assertTrue((Influxdb.SELECT("foo") FROM "bla" WHERE col("a") == "asd") + assertTrue((SELECT("foo") FROM "bla" WHERE col("a") == "asd") .toString().contains("as")) - assertTrue((Influxdb.SELECT("foo")("bla") FROM "bla" WHERE col("a") == "asd") + assertTrue((SELECT("foo")("bla") FROM "bla" WHERE col("a") == "asd") .toString().contains("foo")) + + assertTrue((SELECT("foo")("bla") FROM "bla" WHERE col("b") == 1) + .toString().contains("\"b\" = 1")) - assertTrue((Influxdb.SELECT("foo")("bla") FROM "bla") - .toString().contains("== 1")) - - assertTrue((Influxdb - SELECT "foo" + assertTrue(( + SELECT("foo") FROM "bla" WHERE col("a") == "asd" || col("b") == "C" && col("c") == "d").toString().contains("bla")) - assertTrue((Influxdb - SELECT "foo" + assertTrue(( + SELECT("foo") FROM "bla" WHERE (col("a") == "asd" || col("b") == "C") && col("c") == "d").toString().contains("foo")) diff --git a/src/test/scala/org/xapek/influxdb/SizedTest.scala b/src/test/scala/org/xapek/influxdb/SizedTest.scala index 5f51e91..525ffd9 100644 --- a/src/test/scala/org/xapek/influxdb/SizedTest.scala +++ b/src/test/scala/org/xapek/influxdb/SizedTest.scala @@ -2,7 +2,6 @@ package org.xapek.influxdb import org.junit._ import org.junit.Assert._ -import org.xapek.influxdb.Influxdb._ @Test class SizedTest { diff --git a/src/test/scala/simplelogger.properties b/src/test/scala/simplelogger.properties new file mode 100644 index 0000000..a8665f6 --- /dev/null +++ b/src/test/scala/simplelogger.properties @@ -0,0 +1,34 @@ +# SLF4J's SimpleLogger configuration file +# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err. + +# Default logging detail level for all instances of SimpleLogger. +# Must be one of ("trace", "debug", "info", "warn", or "error"). +# If not specified, defaults to "info". +org.slf4j.simpleLogger.defaultLogLevel=trace + +# Logging detail level for a SimpleLogger instance named "xxxxx". +# Must be one of ("trace", "debug", "info", "warn", or "error"). +# If not specified, the default logging detail level is used. +#org.slf4j.simpleLogger.log.xxxxx= + +# Set to true if you want the current date and time to be included in output messages. +# Default is false, and will output the number of milliseconds elapsed since startup. +#org.slf4j.simpleLogger.showDateTime= + +# The date and time format to be used in the output messages. +# The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat. +# If the format is not specified or is invalid, the default format is used. +# The default format is yyyy-MM-dd HH:mm:ss:SSS Z. +#org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z + +# Set to true if you want to output the current thread name. +# Defaults to true. +#org.slf4j.simpleLogger.showThreadName=true + +# Set to true if you want the Logger instance name to be included in output messages. +# Defaults to true. +#org.slf4j.simpleLogger.showLogName=true + +# Set to true if you want the last component of the name to be included in output messages. +# Defaults to false. +#org.slf4j.simpleLogger.showShortLogName=false |