From dcabe1a774ac3ab6eee7e837aa09ee26b60e8b82 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Mon, 3 Aug 2015 00:18:20 +0200 Subject: Restructure into separate files --- src/main/scala/org/xapek/influxdb/Influxdb.scala | 341 --------------------- src/main/scala/org/xapek/influxdb/Main.scala | 62 ---- src/main/scala/org/xapek/influxdb/influxdb.scala | 62 ++++ src/main/scala/org/xapek/influxdb/mapper.scala | 110 +++++++ src/main/scala/org/xapek/influxdb/query.scala | 62 ++++ src/main/scala/org/xapek/influxdb/types/expr.scala | 48 +++ .../scala/org/xapek/influxdb/types/influx.scala | 61 ++++ .../scala/org/xapek/influxdbExample/example1.scala | 58 ++++ 8 files changed, 401 insertions(+), 403 deletions(-) delete mode 100644 src/main/scala/org/xapek/influxdb/Influxdb.scala delete mode 100644 src/main/scala/org/xapek/influxdb/Main.scala create mode 100644 src/main/scala/org/xapek/influxdb/influxdb.scala create mode 100644 src/main/scala/org/xapek/influxdb/mapper.scala create mode 100644 src/main/scala/org/xapek/influxdb/query.scala create mode 100644 src/main/scala/org/xapek/influxdb/types/expr.scala create mode 100644 src/main/scala/org/xapek/influxdb/types/influx.scala create mode 100644 src/main/scala/org/xapek/influxdbExample/example1.scala diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala deleted file mode 100644 index 172e85b..0000000 --- a/src/main/scala/org/xapek/influxdb/Influxdb.scala +++ /dev/null @@ -1,341 +0,0 @@ -package org.xapek.influxdb - -import java.util.Date -import java.net.URI -import scala.collection.JavaConversions._ -import org.slf4j.LoggerFactory -import org.json4s._ -import org.json4s.jackson.JsonMethods._ -import com.typesafe.scalalogging.Logger -import shapeless.Nat -import shapeless.Sized -import shapeless.Sized._ -import shapeless.syntax._ -import shapeless.syntax.NatOps -import shapeless.syntax.sized._ -import shapeless.Succ -import shapeless.ops._ -import shapeless.ops.nat._ -import shapeless.ops.sized._ -import org.apache.http.NameValuePair -import org.apache.http.client.utils.URLEncodedUtils -import org.apache.http.client.HttpClient -import org.apache.http.client.methods.HttpGet -import org.apache.http.client.utils.URIBuilder -import org.apache.http.impl.client.HttpClients -import org.apache.http.util.EntityUtils - -abstract class QueryBuilderWithSelectAndFrom[L <: Nat]( - val columns: Sized[List[InfluxIdentifier], L], - val table: InfluxIdentifier) { - - def count()(implicit toInt: ToInt[L]) = toInt() - - def count1()(implicit toInt: ToInt[Succ[L]]) = toInt() //length + 1 (for "time") - - def toString: String -} - -class QueryBuilderSelect[L <: Nat]( - val columns: Sized[List[InfluxIdentifier], L]) { - - def apply(column: String) = - new QueryBuilderSelect(columns :+ InfluxDB.influxIdentifier(column)) - - def FROM(table: String) = new QueryBuilderFrom(this, InfluxDB.influxIdentifier(table)) - - override def toString() = "SELECT " + columns.unsized.mkString(", ") -} - -class QueryBuilderFrom[L <: Nat]( - select: QueryBuilderSelect[L], - table: InfluxIdentifier) - extends QueryBuilderWithSelectAndFrom[L](select.columns, table) { - - def WHERE[WhereT <: Expr](where: WhereT) = - new QueryBuilderWhere(this, where) - - override def toString() = select.toString() + " FROM " + table -} - -class QueryBuilderWhere[L <: Nat, WhereT <: Expr]( - val from: QueryBuilderFrom[L], - val where: WhereT) - extends QueryBuilderWithSelectAndFrom[L](from.columns, from.table) { - - def GROUP_BY(column: String) = new QueryBuilderGroupBy(this, List(InfluxDB.influxIdentifier(column))) - - override def toString() = from.toString() + " WHERE " + where -} - -class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr]( - where: QueryBuilderWhere[L, WhereT], - groupBy: Seq[InfluxIdentifier]) - extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) { - - def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column)) - - override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") -} - -protected trait E { - def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2] - def ||[E2 <: Expr](other: E2): OrExpr[this.type, E2] -} - -abstract class Expr extends E { - def toString(): String - - def &&[E2 <: Expr](other: E2) = { - new AndExpr(this, other) - } - - def ||[E2 <: Expr](other: E2) = { - new OrExpr(this, other) - } - - def test(other: Expr): Expr = { - return this - } -} - -class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr { - override def toString(): String = value.toString -} - -final object BinaryOp { - val OP_EQ = "=" - val OP_NE = "!=" - val OP_GT = ">" - val OP_GE = ">=" - val OP_LT = "<" - val OP_LE = "<=" -} - -class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr { - override def toString(): String = { - "(" + op1.toString() + " " + str + " " + op2.toString() + ")" - } -} - -class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) - extends BinaryOp[E1, E2]("AND", op1, op2) - -class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) - extends BinaryOp[E1, E2]("OR", op1, op2) - -trait InfluxValue { - type JavaT - def toString(): String -} - -class InfluxString(val value: String) extends Expr with InfluxValue { - type JavaT = String - override def toString(): String = { - "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO - } -} - -class InfluxNumber(val value: Number) extends Expr with InfluxValue { - type JavaT = Number - - override def toString(): String = { - value.toString - } -} - -class InfluxDate(val value: Date) extends Expr with InfluxValue { - type JavaT = Date - - override def toString(): String = { - val time: Long = value.getTime() / 1000 - s"${time}s" - } - - // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z") - // and fromTimestamp (String "NNNNs") -} - -class InfluxIdentifier(val name: String) extends Expr { - override def toString(): String = { - "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" - } - - def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value)) - - def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value)) - - def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value)) - - def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value)) - - def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value)) - - def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value)) - - implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s) -} - -case class JsonResult(series: List[JsonSeries], error: String) { - def this(series: List[JsonSeries]) = this(series, null) -} - -case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { - override def toString() = values.toString -} - -class InfluxdbError(val serverMessage: String) - extends Exception(s"Server responded: $serverMessage") - -trait ToSized[T, N <: Nat] extends Serializable { - def apply(x: Seq[T]): Sized[IndexedSeq[T], Succ[N]] - type Out <: Nat -} - -object ToSizedX { - def apply[T, N <: Nat](implicit toSized: ToSized[T, N]): ToSized[T, N] = toSized - - implicit def toSized0[T, N <: Nat] = new ToSized[T, Nat._0] { - type Out = Nat._0 - def apply(x: Seq[T]) = Sized(x(0)) - } -} - -trait ToFunction[N <: Nat] extends Serializable { self => - type Out[X] - type Col = Sized[Seq[InfluxValue], N] - def convert[B](f: self.Out[B], columns: Col): B -} -class toFunction0 extends ToFunction[Nat._0] { - type Out[X] = Function1[String, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString) -} -class toFunction1 extends ToFunction[Nat._1] { - type Out[X] = Function2[String, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1)) -} -class toFunction2 extends ToFunction[Nat._2] { - type Out[X] = Function3[String, InfluxValue, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2)) -} -class toFunction3 extends ToFunction[Nat._3] { - type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3)) -} -object ToFunction { - implicit def iToFunction0 = new toFunction0() - implicit def iToFunction1 = new toFunction1() - implicit def iToFunction2 = new toFunction2() - implicit def iToFunction3 = new toFunction3() - - def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF -} - -object Converter { - def toSized[T, L <: Nat](x: Seq[T])(implicit toSizedF: ToSized[T, L]) = toSizedF(x) -} - -class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, - query: QueryBuilderWithSelectAndFrom[L])( - implicit toF: Z) { - private def httpClient = HttpClients.createDefault() - private def logger = Logger(LoggerFactory.getLogger(getClass)) - - private def buildURI(): URI = { - return new URIBuilder() - .setScheme("http") - .setHost(influxdb.host) - .setPort(influxdb.port) - .setPath("/query") - .setParameter("u", influxdb.username) - .setParameter("p", influxdb.password) - .setParameter("db", influxdb.db) - .setParameter("q", query.toString) - .build - } - - def map[B](f: toF.Out[B]): List[B] = { - implicit val formats = DefaultFormats - val httpGet = new HttpGet(buildURI()) - - logger.debug("Query={}", query.toString()) - logger.debug("Call: {}", httpGet.getURI) - - val response = httpClient.execute(httpGet); - try { - if (response.getStatusLine().getStatusCode() != 200) { - throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode) - } - val responseEntity = response.getEntity(); - - val responseJson = parse(responseEntity.getContent) - val results = (responseJson \ "results").extract[List[JsonResult]] - val errors = results.filter { _.error != null }.map { _.error } - if (errors.size > 0) { - throw new InfluxdbError(errors.mkString(". ")) - } - val res = results(0).series(0).values.map { x => - val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) } - val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc) - toF.convert(f, z) - } - return res - } finally { - response.close(); - } - } -} - -case class InfluxDB(val host: String, val port: Int = 8086, - val db: String = "data", val path: String = "query", - val username: String = "", val password: String = "") - -private case class UriNameValue(val name: String, val value: String) { - def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue) -} - -object InfluxDB { - def SELECT(column: String) = { - new QueryBuilderSelect(Sized[List](influxIdentifier(column))) - } - - implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s) - - implicit def influxString(s: String) = new InfluxString(s) - - implicit def influxNumber(n: Int) = new InfluxNumber(n) - - implicit def influxNumber(n: Double) = new InfluxNumber(n) - - implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) - - def fromUrl(url: String): InfluxDB = { - val uri = new URI(url) - var influxdb = new InfluxDB(uri.getHost) - URLEncodedUtils.parse(uri, "UTF-8").toList - .map(new UriNameValue(_)).foreach { x => - x match { - case UriNameValue("db", db) => influxdb = influxdb.copy(db = db) - case UriNameValue("username", username) => influxdb = influxdb.copy(username = username) - case UriNameValue("password", password) => influxdb = influxdb.copy(password = password) - case _ => - } - } - if (uri.getPort != -1) - influxdb = influxdb.copy(port = uri.getPort) - - if (uri.getPath.length() > 1) - influxdb = influxdb.copy(path = uri.getPath) - influxdb - } - - def col(col: String) = new InfluxIdentifier(col) - - def time = new InfluxIdentifier("time") -} \ No newline at end of file diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala deleted file mode 100644 index 206183c..0000000 --- a/src/main/scala/org/xapek/influxdb/Main.scala +++ /dev/null @@ -1,62 +0,0 @@ -package org.xapek.influxdb - - -import java.util.Date -import java.util.Calendar -import akka.actor.Actor -import akka.actor.Props -import akka.event.Logging -import akka.actor.ActorDSL -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.ActorLogging -import com.typesafe.config.ConfigFactory -import org.xapek.influxdb.InfluxDB._ -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit - -case object TimerSignal - -class InfluxReader(db: InfluxDB, date1: Date, date2: Date, receiver: ActorRef) extends Actor with ActorLogging { - val query = SELECT("Bid")("Ask")("Volume") FROM "yahoo.GOOGL" WHERE time > date1 && time < date2 - val mapper = new Mapper(db, query) - def receive = { - case TimerSignal => - log.info("Run query") - receiver ! (mapper.map { (time, bid, ask, volume) => s"time=$time bid=$bid ask=$ask volume=$volume" }) - } -} - -class ConsoleWriter extends Actor with ActorLogging { - def receive = { - case value => - log.info(value.toString) - } -} - -object Main { - def main(args: Array[String]): Unit = { - val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root") - - val calendar = Calendar.getInstance - calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 4) - val date1 = calendar.getTime - calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + 1) - val date2 = calendar.getTime - - val system = ActorSystem("AkkaSystem") - - val writer = system.actorOf(Props(new ConsoleWriter()), "writer") - val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader") - - system.scheduler.schedule( - Duration.create(5, TimeUnit.SECONDS), - Duration.create(5, TimeUnit.SECONDS), - reader, TimerSignal)(system.dispatcher, null) - - println("Press key to exit") - System.in.read() - system.shutdown() - } -} - diff --git a/src/main/scala/org/xapek/influxdb/influxdb.scala b/src/main/scala/org/xapek/influxdb/influxdb.scala new file mode 100644 index 0000000..0aeb856 --- /dev/null +++ b/src/main/scala/org/xapek/influxdb/influxdb.scala @@ -0,0 +1,62 @@ +package org.xapek.influxdb + +import java.util.Date +import java.net.URI +import scala.collection.JavaConversions._ +import shapeless.Sized +import org.apache.http.client.utils.URLEncodedUtils +import org.apache.http.NameValuePair + +import org.xapek.influxdb.types.{InfluxIdentifier, InfluxString, InfluxNumber, InfluxDate} + + +class InfluxdbError(val serverMessage: String) + extends Exception(s"Server responded: $serverMessage") + +case class InfluxDB(val host: String, val port: Int = 8086, + val db: String = "data", val path: String = "query", + val username: String = "", val password: String = "") + +private case class UriNameValue(val name: String, val value: String) { + def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue) +} + +object InfluxDB { + def SELECT(column: String) = { + new QueryBuilderSelect(Sized[List](influxIdentifier(column))) + } + + implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s) + + implicit def influxString(s: String) = new InfluxString(s) + + implicit def influxNumber(n: Int) = new InfluxNumber(n) + + implicit def influxNumber(n: Double) = new InfluxNumber(n) + + implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) + + def fromUrl(url: String): InfluxDB = { + val uri = new URI(url) + var influxdb = new InfluxDB(uri.getHost) + URLEncodedUtils.parse(uri, "UTF-8").toList + .map(new UriNameValue(_)).foreach { x => + x match { + case UriNameValue("db", db) => influxdb = influxdb.copy(db = db) + case UriNameValue("username", username) => influxdb = influxdb.copy(username = username) + case UriNameValue("password", password) => influxdb = influxdb.copy(password = password) + case _ => + } + } + if (uri.getPort != -1) + influxdb = influxdb.copy(port = uri.getPort) + + if (uri.getPath.length() > 1) + influxdb = influxdb.copy(path = uri.getPath) + influxdb + } + + def col(col: String) = new InfluxIdentifier(col) + + def time = new InfluxIdentifier("time") +} \ No newline at end of file diff --git a/src/main/scala/org/xapek/influxdb/mapper.scala b/src/main/scala/org/xapek/influxdb/mapper.scala new file mode 100644 index 0000000..fc0345b --- /dev/null +++ b/src/main/scala/org/xapek/influxdb/mapper.scala @@ -0,0 +1,110 @@ +package org.xapek.influxdb + +import java.net.URI +import shapeless.Nat +import shapeless.Sized +import com.typesafe.scalalogging.Logger +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods +import org.apache.http.NameValuePair +import org.apache.http.client.utils.URLEncodedUtils +import org.apache.http.client.HttpClient +import org.apache.http.client.methods.HttpGet +import org.apache.http.client.utils.URIBuilder +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils +import org.slf4j.LoggerFactory + +import org.xapek.influxdb.types.InfluxValue +import org.xapek.influxdb.types.InfluxString + +private case class JsonResult(series: List[JsonSeries], error: String) { + def this(series: List[JsonSeries]) = this(series, null) +} + +private[influxdb] case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { + override def toString() = values.toString +} + +private[influxdb] trait ToFunction[N <: Nat] extends Serializable { self => + type Out[X] + type Col = Sized[Seq[InfluxValue], N] + def convert[B](f: self.Out[B], columns: Col): B +} + +private[influxdb] class toFunction0 extends ToFunction[Nat._0] { + type Out[X] = Function1[String, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString) +} +private[influxdb] class toFunction1 extends ToFunction[Nat._1] { + type Out[X] = Function2[String, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1)) +} +private[influxdb] class toFunction2 extends ToFunction[Nat._2] { + type Out[X] = Function3[String, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2)) +} +private[influxdb] class toFunction3 extends ToFunction[Nat._3] { + type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3)) +} +private[influxdb] object ToFunction { + implicit def iToFunction0 = new toFunction0() + implicit def iToFunction1 = new toFunction1() + implicit def iToFunction2 = new toFunction2() + implicit def iToFunction3 = new toFunction3() + + def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF +} + + +class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, + query: QueryBuilderWithSelectAndFrom[L])( + implicit toF: Z) { + private def httpClient = HttpClients.createDefault() + private def logger = Logger(LoggerFactory.getLogger(getClass)) + + private def buildURI(): URI = { + return new URIBuilder() + .setScheme("http") + .setHost(influxdb.host) + .setPort(influxdb.port) + .setPath("/query") + .setParameter("u", influxdb.username) + .setParameter("p", influxdb.password) + .setParameter("db", influxdb.db) + .setParameter("q", query.toString) + .build + } + + def map[B](f: toF.Out[B]): List[B] = { + implicit val formats = DefaultFormats + val httpGet = new HttpGet(buildURI()) + + logger.debug("Query={}", query.toString()) + logger.debug("Call: {}", httpGet.getURI) + + val response = httpClient.execute(httpGet); + try { + if (response.getStatusLine().getStatusCode() != 200) { + throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode) + } + val responseEntity = response.getEntity(); + + val responseJson = JsonMethods.parse(responseEntity.getContent) + val results = (responseJson \ "results").extract[List[JsonResult]] + val errors = results.filter { _.error != null }.map { _.error } + if (errors.size > 0) { + throw new InfluxdbError(errors.mkString(". ")) + } + val res = results(0).series(0).values.map { x => + val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) } + val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc) + toF.convert(f, z) + } + return res + } finally { + response.close(); + } + } +} diff --git a/src/main/scala/org/xapek/influxdb/query.scala b/src/main/scala/org/xapek/influxdb/query.scala new file mode 100644 index 0000000..022c888 --- /dev/null +++ b/src/main/scala/org/xapek/influxdb/query.scala @@ -0,0 +1,62 @@ +package org.xapek.influxdb + +import shapeless.Sized +import shapeless.Nat +import shapeless.ops.nat.ToInt +import shapeless.Succ + +import org.xapek.influxdb.types.InfluxIdentifier +import org.xapek.influxdb.types.Expr + +abstract class QueryBuilderWithSelectAndFrom[L <: Nat]( + val columns: Sized[List[InfluxIdentifier], L], + val table: InfluxIdentifier) { + + def count()(implicit toInt: ToInt[L]) = toInt() + + def count1()(implicit toInt: ToInt[Succ[L]]) = toInt() //length + 1 (for "time") + + def toString: String +} + +class QueryBuilderSelect[L <: Nat]( + val columns: Sized[List[InfluxIdentifier], L]) { + + def apply(column: String) = + new QueryBuilderSelect(columns :+ InfluxDB.influxIdentifier(column)) + + def FROM(table: String) = new QueryBuilderFrom(this, InfluxDB.influxIdentifier(table)) + + override def toString() = "SELECT " + columns.unsized.mkString(", ") +} + +class QueryBuilderFrom[L <: Nat]( + select: QueryBuilderSelect[L], + table: InfluxIdentifier) + extends QueryBuilderWithSelectAndFrom[L](select.columns, table) { + + def WHERE[WhereT <: Expr](where: WhereT) = + new QueryBuilderWhere(this, where) + + override def toString() = select.toString() + " FROM " + table +} + +class QueryBuilderWhere[L <: Nat, WhereT <: Expr]( + val from: QueryBuilderFrom[L], + val where: WhereT) + extends QueryBuilderWithSelectAndFrom[L](from.columns, from.table) { + + def GROUP_BY(column: String) = new QueryBuilderGroupBy(this, List(InfluxDB.influxIdentifier(column))) + + override def toString() = from.toString() + " WHERE " + where +} + +class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr]( + where: QueryBuilderWhere[L, WhereT], + groupBy: Seq[InfluxIdentifier]) + extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) { + + def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column)) + + override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") +} diff --git a/src/main/scala/org/xapek/influxdb/types/expr.scala b/src/main/scala/org/xapek/influxdb/types/expr.scala new file mode 100644 index 0000000..7074e04 --- /dev/null +++ b/src/main/scala/org/xapek/influxdb/types/expr.scala @@ -0,0 +1,48 @@ +package org.xapek.influxdb.types + + +private[types] trait E { + def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2] + def ||[E2 <: Expr](other: E2): OrExpr[this.type, E2] +} + +abstract class Expr extends E { + def toString(): String + + def &&[E2 <: Expr](other: E2) = { + new AndExpr(this, other) + } + + def ||[E2 <: Expr](other: E2) = { + new OrExpr(this, other) + } + + def test(other: Expr): Expr = { + return this + } +} + +class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr { + override def toString(): String = value.toString +} + +final object BinaryOp { + val OP_EQ = "=" + val OP_NE = "!=" + val OP_GT = ">" + val OP_GE = ">=" + val OP_LT = "<" + val OP_LE = "<=" +} + +class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr { + override def toString(): String = { + "(" + op1.toString() + " " + str + " " + op2.toString() + ")" + } +} + +class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) + extends BinaryOp[E1, E2]("AND", op1, op2) + +class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) + extends BinaryOp[E1, E2]("OR", op1, op2) diff --git a/src/main/scala/org/xapek/influxdb/types/influx.scala b/src/main/scala/org/xapek/influxdb/types/influx.scala new file mode 100644 index 0000000..e2806f0 --- /dev/null +++ b/src/main/scala/org/xapek/influxdb/types/influx.scala @@ -0,0 +1,61 @@ +package org.xapek.influxdb.types + +import java.util.Date + +trait InfluxValue { + type JavaT + def toString(): String +} + +class InfluxString(val value: String) extends Expr with InfluxValue { + type JavaT = String + override def toString(): String = { + "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO + } +} + +class InfluxNumber(val value: Number) extends Expr with InfluxValue { + type JavaT = Number + + override def toString(): String = { + value.toString + } +} + +class InfluxDate(val value: Date) extends Expr with InfluxValue { + type JavaT = Date + + override def toString(): String = { + val time: Long = value.getTime() / 1000 + s"${time}s" + } + + // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z") + // and fromTimestamp (String "NNNNs") +} + +class InfluxIdentifier(val name: String) extends Expr { + override def toString(): String = { + "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" + } + + def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value)) + + def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value)) + + def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value)) + + def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value)) + + def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value)) + + def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = + new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value)) + + implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s) +} diff --git a/src/main/scala/org/xapek/influxdbExample/example1.scala b/src/main/scala/org/xapek/influxdbExample/example1.scala new file mode 100644 index 0000000..85b48eb --- /dev/null +++ b/src/main/scala/org/xapek/influxdbExample/example1.scala @@ -0,0 +1,58 @@ +package org.xapek.influxdbExample + +import java.util.Date +import java.util.Calendar +import java.util.concurrent.TimeUnit +import akka.actor.Actor +import akka.actor.Props +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ActorLogging +import scala.concurrent.duration.Duration +import org.xapek.influxdb.InfluxDB._ +import org.xapek.influxdb.InfluxDB +import org.xapek.influxdb.Mapper + +case object TimerSignal + +class InfluxReader(db: InfluxDB, date1: Date, date2: Date, receiver: ActorRef) extends Actor with ActorLogging { + val query = SELECT("Bid")("Ask")("Volume") FROM "yahoo.GOOGL" WHERE time > date1 && time < date2 + val mapper = new Mapper(db, query) + def receive = { + case TimerSignal => + log.info("Run query") + receiver ! (mapper.map { (time, bid, ask, volume) => s"time=$time bid=$bid ask=$ask volume=$volume" }) + } +} + +class ConsoleWriter extends Actor with ActorLogging { + def receive = { + case value => + log.info(value.toString) + } +} + +object Main { + def main(args: Array[String]): Unit = { + val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root") + + val calendar = Calendar.getInstance + calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 4) + val date1 = calendar.getTime + calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + 1) + val date2 = calendar.getTime + + val system = ActorSystem("AkkaSystem") + + val writer = system.actorOf(Props(new ConsoleWriter()), "writer") + val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader") + + system.scheduler.schedule(Duration.Zero, Duration.create(5, TimeUnit.SECONDS), + reader, TimerSignal)(system.dispatcher, null) + + println("Press key to exit") + System.in.read() + system.shutdown() + } +} + -- cgit v1.2.1