summaryrefslogtreecommitdiff
path: root/src/main/scala/org/xapek
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/org/xapek')
-rw-r--r--src/main/scala/org/xapek/influxdb/Influxdb.scala341
-rw-r--r--src/main/scala/org/xapek/influxdb/influxdb.scala62
-rw-r--r--src/main/scala/org/xapek/influxdb/mapper.scala110
-rw-r--r--src/main/scala/org/xapek/influxdb/query.scala62
-rw-r--r--src/main/scala/org/xapek/influxdb/types/expr.scala48
-rw-r--r--src/main/scala/org/xapek/influxdb/types/influx.scala61
-rw-r--r--src/main/scala/org/xapek/influxdbExample/example1.scala (renamed from src/main/scala/org/xapek/influxdb/Main.scala)20
7 files changed, 351 insertions, 353 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala
deleted file mode 100644
index 172e85b..0000000
--- a/src/main/scala/org/xapek/influxdb/Influxdb.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-package org.xapek.influxdb
-
-import java.util.Date
-import java.net.URI
-import scala.collection.JavaConversions._
-import org.slf4j.LoggerFactory
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import com.typesafe.scalalogging.Logger
-import shapeless.Nat
-import shapeless.Sized
-import shapeless.Sized._
-import shapeless.syntax._
-import shapeless.syntax.NatOps
-import shapeless.syntax.sized._
-import shapeless.Succ
-import shapeless.ops._
-import shapeless.ops.nat._
-import shapeless.ops.sized._
-import org.apache.http.NameValuePair
-import org.apache.http.client.utils.URLEncodedUtils
-import org.apache.http.client.HttpClient
-import org.apache.http.client.methods.HttpGet
-import org.apache.http.client.utils.URIBuilder
-import org.apache.http.impl.client.HttpClients
-import org.apache.http.util.EntityUtils
-
-abstract class QueryBuilderWithSelectAndFrom[L <: Nat](
- val columns: Sized[List[InfluxIdentifier], L],
- val table: InfluxIdentifier) {
-
- def count()(implicit toInt: ToInt[L]) = toInt()
-
- def count1()(implicit toInt: ToInt[Succ[L]]) = toInt() //length + 1 (for "time")
-
- def toString: String
-}
-
-class QueryBuilderSelect[L <: Nat](
- val columns: Sized[List[InfluxIdentifier], L]) {
-
- def apply(column: String) =
- new QueryBuilderSelect(columns :+ InfluxDB.influxIdentifier(column))
-
- def FROM(table: String) = new QueryBuilderFrom(this, InfluxDB.influxIdentifier(table))
-
- override def toString() = "SELECT " + columns.unsized.mkString(", ")
-}
-
-class QueryBuilderFrom[L <: Nat](
- select: QueryBuilderSelect[L],
- table: InfluxIdentifier)
- extends QueryBuilderWithSelectAndFrom[L](select.columns, table) {
-
- def WHERE[WhereT <: Expr](where: WhereT) =
- new QueryBuilderWhere(this, where)
-
- override def toString() = select.toString() + " FROM " + table
-}
-
-class QueryBuilderWhere[L <: Nat, WhereT <: Expr](
- val from: QueryBuilderFrom[L],
- val where: WhereT)
- extends QueryBuilderWithSelectAndFrom[L](from.columns, from.table) {
-
- def GROUP_BY(column: String) = new QueryBuilderGroupBy(this, List(InfluxDB.influxIdentifier(column)))
-
- override def toString() = from.toString() + " WHERE " + where
-}
-
-class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr](
- where: QueryBuilderWhere[L, WhereT],
- groupBy: Seq[InfluxIdentifier])
- extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) {
-
- def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column))
-
- override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
-}
-
-protected trait E {
- def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2]
- def ||[E2 <: Expr](other: E2): OrExpr[this.type, E2]
-}
-
-abstract class Expr extends E {
- def toString(): String
-
- def &&[E2 <: Expr](other: E2) = {
- new AndExpr(this, other)
- }
-
- def ||[E2 <: Expr](other: E2) = {
- new OrExpr(this, other)
- }
-
- def test(other: Expr): Expr = {
- return this
- }
-}
-
-class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr {
- override def toString(): String = value.toString
-}
-
-final object BinaryOp {
- val OP_EQ = "="
- val OP_NE = "!="
- val OP_GT = ">"
- val OP_GE = ">="
- val OP_LT = "<"
- val OP_LE = "<="
-}
-
-class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr {
- override def toString(): String = {
- "(" + op1.toString() + " " + str + " " + op2.toString() + ")"
- }
-}
-
-class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
- extends BinaryOp[E1, E2]("AND", op1, op2)
-
-class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
- extends BinaryOp[E1, E2]("OR", op1, op2)
-
-trait InfluxValue {
- type JavaT
- def toString(): String
-}
-
-class InfluxString(val value: String) extends Expr with InfluxValue {
- type JavaT = String
- override def toString(): String = {
- "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO
- }
-}
-
-class InfluxNumber(val value: Number) extends Expr with InfluxValue {
- type JavaT = Number
-
- override def toString(): String = {
- value.toString
- }
-}
-
-class InfluxDate(val value: Date) extends Expr with InfluxValue {
- type JavaT = Date
-
- override def toString(): String = {
- val time: Long = value.getTime() / 1000
- s"${time}s"
- }
-
- // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z")
- // and fromTimestamp (String "NNNNs")
-}
-
-class InfluxIdentifier(val name: String) extends Expr {
- override def toString(): String = {
- "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
- }
-
- def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value))
-
- def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value))
-
- def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value))
-
- def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value))
-
- def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value))
-
- def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value))
-
- implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s)
-}
-
-case class JsonResult(series: List[JsonSeries], error: String) {
- def this(series: List[JsonSeries]) = this(series, null)
-}
-
-case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
- override def toString() = values.toString
-}
-
-class InfluxdbError(val serverMessage: String)
- extends Exception(s"Server responded: $serverMessage")
-
-trait ToSized[T, N <: Nat] extends Serializable {
- def apply(x: Seq[T]): Sized[IndexedSeq[T], Succ[N]]
- type Out <: Nat
-}
-
-object ToSizedX {
- def apply[T, N <: Nat](implicit toSized: ToSized[T, N]): ToSized[T, N] = toSized
-
- implicit def toSized0[T, N <: Nat] = new ToSized[T, Nat._0] {
- type Out = Nat._0
- def apply(x: Seq[T]) = Sized(x(0))
- }
-}
-
-trait ToFunction[N <: Nat] extends Serializable { self =>
- type Out[X]
- type Col = Sized[Seq[InfluxValue], N]
- def convert[B](f: self.Out[B], columns: Col): B
-}
-class toFunction0 extends ToFunction[Nat._0] {
- type Out[X] = Function1[String, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString)
-}
-class toFunction1 extends ToFunction[Nat._1] {
- type Out[X] = Function2[String, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1))
-}
-class toFunction2 extends ToFunction[Nat._2] {
- type Out[X] = Function3[String, InfluxValue, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2))
-}
-class toFunction3 extends ToFunction[Nat._3] {
- type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3))
-}
-object ToFunction {
- implicit def iToFunction0 = new toFunction0()
- implicit def iToFunction1 = new toFunction1()
- implicit def iToFunction2 = new toFunction2()
- implicit def iToFunction3 = new toFunction3()
-
- def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
-}
-
-object Converter {
- def toSized[T, L <: Nat](x: Seq[T])(implicit toSizedF: ToSized[T, L]) = toSizedF(x)
-}
-
-class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
- query: QueryBuilderWithSelectAndFrom[L])(
- implicit toF: Z) {
- private def httpClient = HttpClients.createDefault()
- private def logger = Logger(LoggerFactory.getLogger(getClass))
-
- private def buildURI(): URI = {
- return new URIBuilder()
- .setScheme("http")
- .setHost(influxdb.host)
- .setPort(influxdb.port)
- .setPath("/query")
- .setParameter("u", influxdb.username)
- .setParameter("p", influxdb.password)
- .setParameter("db", influxdb.db)
- .setParameter("q", query.toString)
- .build
- }
-
- def map[B](f: toF.Out[B]): List[B] = {
- implicit val formats = DefaultFormats
- val httpGet = new HttpGet(buildURI())
-
- logger.debug("Query={}", query.toString())
- logger.debug("Call: {}", httpGet.getURI)
-
- val response = httpClient.execute(httpGet);
- try {
- if (response.getStatusLine().getStatusCode() != 200) {
- throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode)
- }
- val responseEntity = response.getEntity();
-
- val responseJson = parse(responseEntity.getContent)
- val results = (responseJson \ "results").extract[List[JsonResult]]
- val errors = results.filter { _.error != null }.map { _.error }
- if (errors.size > 0) {
- throw new InfluxdbError(errors.mkString(". "))
- }
- val res = results(0).series(0).values.map { x =>
- val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) }
- val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc)
- toF.convert(f, z)
- }
- return res
- } finally {
- response.close();
- }
- }
-}
-
-case class InfluxDB(val host: String, val port: Int = 8086,
- val db: String = "data", val path: String = "query",
- val username: String = "", val password: String = "")
-
-private case class UriNameValue(val name: String, val value: String) {
- def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue)
-}
-
-object InfluxDB {
- def SELECT(column: String) = {
- new QueryBuilderSelect(Sized[List](influxIdentifier(column)))
- }
-
- implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s)
-
- implicit def influxString(s: String) = new InfluxString(s)
-
- implicit def influxNumber(n: Int) = new InfluxNumber(n)
-
- implicit def influxNumber(n: Double) = new InfluxNumber(n)
-
- implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
-
- def fromUrl(url: String): InfluxDB = {
- val uri = new URI(url)
- var influxdb = new InfluxDB(uri.getHost)
- URLEncodedUtils.parse(uri, "UTF-8").toList
- .map(new UriNameValue(_)).foreach { x =>
- x match {
- case UriNameValue("db", db) => influxdb = influxdb.copy(db = db)
- case UriNameValue("username", username) => influxdb = influxdb.copy(username = username)
- case UriNameValue("password", password) => influxdb = influxdb.copy(password = password)
- case _ =>
- }
- }
- if (uri.getPort != -1)
- influxdb = influxdb.copy(port = uri.getPort)
-
- if (uri.getPath.length() > 1)
- influxdb = influxdb.copy(path = uri.getPath)
- influxdb
- }
-
- def col(col: String) = new InfluxIdentifier(col)
-
- def time = new InfluxIdentifier("time")
-} \ No newline at end of file
diff --git a/src/main/scala/org/xapek/influxdb/influxdb.scala b/src/main/scala/org/xapek/influxdb/influxdb.scala
new file mode 100644
index 0000000..0aeb856
--- /dev/null
+++ b/src/main/scala/org/xapek/influxdb/influxdb.scala
@@ -0,0 +1,62 @@
+package org.xapek.influxdb
+
+import java.util.Date
+import java.net.URI
+import scala.collection.JavaConversions._
+import shapeless.Sized
+import org.apache.http.client.utils.URLEncodedUtils
+import org.apache.http.NameValuePair
+
+import org.xapek.influxdb.types.{InfluxIdentifier, InfluxString, InfluxNumber, InfluxDate}
+
+
+class InfluxdbError(val serverMessage: String)
+ extends Exception(s"Server responded: $serverMessage")
+
+case class InfluxDB(val host: String, val port: Int = 8086,
+ val db: String = "data", val path: String = "query",
+ val username: String = "", val password: String = "")
+
+private case class UriNameValue(val name: String, val value: String) {
+ def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue)
+}
+
+object InfluxDB {
+ def SELECT(column: String) = {
+ new QueryBuilderSelect(Sized[List](influxIdentifier(column)))
+ }
+
+ implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s)
+
+ implicit def influxString(s: String) = new InfluxString(s)
+
+ implicit def influxNumber(n: Int) = new InfluxNumber(n)
+
+ implicit def influxNumber(n: Double) = new InfluxNumber(n)
+
+ implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
+
+ def fromUrl(url: String): InfluxDB = {
+ val uri = new URI(url)
+ var influxdb = new InfluxDB(uri.getHost)
+ URLEncodedUtils.parse(uri, "UTF-8").toList
+ .map(new UriNameValue(_)).foreach { x =>
+ x match {
+ case UriNameValue("db", db) => influxdb = influxdb.copy(db = db)
+ case UriNameValue("username", username) => influxdb = influxdb.copy(username = username)
+ case UriNameValue("password", password) => influxdb = influxdb.copy(password = password)
+ case _ =>
+ }
+ }
+ if (uri.getPort != -1)
+ influxdb = influxdb.copy(port = uri.getPort)
+
+ if (uri.getPath.length() > 1)
+ influxdb = influxdb.copy(path = uri.getPath)
+ influxdb
+ }
+
+ def col(col: String) = new InfluxIdentifier(col)
+
+ def time = new InfluxIdentifier("time")
+} \ No newline at end of file
diff --git a/src/main/scala/org/xapek/influxdb/mapper.scala b/src/main/scala/org/xapek/influxdb/mapper.scala
new file mode 100644
index 0000000..fc0345b
--- /dev/null
+++ b/src/main/scala/org/xapek/influxdb/mapper.scala
@@ -0,0 +1,110 @@
+package org.xapek.influxdb
+
+import java.net.URI
+import shapeless.Nat
+import shapeless.Sized
+import com.typesafe.scalalogging.Logger
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
+import org.apache.http.NameValuePair
+import org.apache.http.client.utils.URLEncodedUtils
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.utils.URIBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+import org.slf4j.LoggerFactory
+
+import org.xapek.influxdb.types.InfluxValue
+import org.xapek.influxdb.types.InfluxString
+
+private case class JsonResult(series: List[JsonSeries], error: String) {
+ def this(series: List[JsonSeries]) = this(series, null)
+}
+
+private[influxdb] case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
+ override def toString() = values.toString
+}
+
+private[influxdb] trait ToFunction[N <: Nat] extends Serializable { self =>
+ type Out[X]
+ type Col = Sized[Seq[InfluxValue], N]
+ def convert[B](f: self.Out[B], columns: Col): B
+}
+
+private[influxdb] class toFunction0 extends ToFunction[Nat._0] {
+ type Out[X] = Function1[String, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString)
+}
+private[influxdb] class toFunction1 extends ToFunction[Nat._1] {
+ type Out[X] = Function2[String, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1))
+}
+private[influxdb] class toFunction2 extends ToFunction[Nat._2] {
+ type Out[X] = Function3[String, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2))
+}
+private[influxdb] class toFunction3 extends ToFunction[Nat._3] {
+ type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3))
+}
+private[influxdb] object ToFunction {
+ implicit def iToFunction0 = new toFunction0()
+ implicit def iToFunction1 = new toFunction1()
+ implicit def iToFunction2 = new toFunction2()
+ implicit def iToFunction3 = new toFunction3()
+
+ def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
+}
+
+
+class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
+ query: QueryBuilderWithSelectAndFrom[L])(
+ implicit toF: Z) {
+ private def httpClient = HttpClients.createDefault()
+ private def logger = Logger(LoggerFactory.getLogger(getClass))
+
+ private def buildURI(): URI = {
+ return new URIBuilder()
+ .setScheme("http")
+ .setHost(influxdb.host)
+ .setPort(influxdb.port)
+ .setPath("/query")
+ .setParameter("u", influxdb.username)
+ .setParameter("p", influxdb.password)
+ .setParameter("db", influxdb.db)
+ .setParameter("q", query.toString)
+ .build
+ }
+
+ def map[B](f: toF.Out[B]): List[B] = {
+ implicit val formats = DefaultFormats
+ val httpGet = new HttpGet(buildURI())
+
+ logger.debug("Query={}", query.toString())
+ logger.debug("Call: {}", httpGet.getURI)
+
+ val response = httpClient.execute(httpGet);
+ try {
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode)
+ }
+ val responseEntity = response.getEntity();
+
+ val responseJson = JsonMethods.parse(responseEntity.getContent)
+ val results = (responseJson \ "results").extract[List[JsonResult]]
+ val errors = results.filter { _.error != null }.map { _.error }
+ if (errors.size > 0) {
+ throw new InfluxdbError(errors.mkString(". "))
+ }
+ val res = results(0).series(0).values.map { x =>
+ val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) }
+ val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc)
+ toF.convert(f, z)
+ }
+ return res
+ } finally {
+ response.close();
+ }
+ }
+}
diff --git a/src/main/scala/org/xapek/influxdb/query.scala b/src/main/scala/org/xapek/influxdb/query.scala
new file mode 100644
index 0000000..022c888
--- /dev/null
+++ b/src/main/scala/org/xapek/influxdb/query.scala
@@ -0,0 +1,62 @@
+package org.xapek.influxdb
+
+import shapeless.Sized
+import shapeless.Nat
+import shapeless.ops.nat.ToInt
+import shapeless.Succ
+
+import org.xapek.influxdb.types.InfluxIdentifier
+import org.xapek.influxdb.types.Expr
+
+abstract class QueryBuilderWithSelectAndFrom[L <: Nat](
+ val columns: Sized[List[InfluxIdentifier], L],
+ val table: InfluxIdentifier) {
+
+ def count()(implicit toInt: ToInt[L]) = toInt()
+
+ def count1()(implicit toInt: ToInt[Succ[L]]) = toInt() //length + 1 (for "time")
+
+ def toString: String
+}
+
+class QueryBuilderSelect[L <: Nat](
+ val columns: Sized[List[InfluxIdentifier], L]) {
+
+ def apply(column: String) =
+ new QueryBuilderSelect(columns :+ InfluxDB.influxIdentifier(column))
+
+ def FROM(table: String) = new QueryBuilderFrom(this, InfluxDB.influxIdentifier(table))
+
+ override def toString() = "SELECT " + columns.unsized.mkString(", ")
+}
+
+class QueryBuilderFrom[L <: Nat](
+ select: QueryBuilderSelect[L],
+ table: InfluxIdentifier)
+ extends QueryBuilderWithSelectAndFrom[L](select.columns, table) {
+
+ def WHERE[WhereT <: Expr](where: WhereT) =
+ new QueryBuilderWhere(this, where)
+
+ override def toString() = select.toString() + " FROM " + table
+}
+
+class QueryBuilderWhere[L <: Nat, WhereT <: Expr](
+ val from: QueryBuilderFrom[L],
+ val where: WhereT)
+ extends QueryBuilderWithSelectAndFrom[L](from.columns, from.table) {
+
+ def GROUP_BY(column: String) = new QueryBuilderGroupBy(this, List(InfluxDB.influxIdentifier(column)))
+
+ override def toString() = from.toString() + " WHERE " + where
+}
+
+class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr](
+ where: QueryBuilderWhere[L, WhereT],
+ groupBy: Seq[InfluxIdentifier])
+ extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) {
+
+ def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column))
+
+ override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
+}
diff --git a/src/main/scala/org/xapek/influxdb/types/expr.scala b/src/main/scala/org/xapek/influxdb/types/expr.scala
new file mode 100644
index 0000000..7074e04
--- /dev/null
+++ b/src/main/scala/org/xapek/influxdb/types/expr.scala
@@ -0,0 +1,48 @@
+package org.xapek.influxdb.types
+
+
+private[types] trait E {
+ def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2]
+ def ||[E2 <: Expr](other: E2): OrExpr[this.type, E2]
+}
+
+abstract class Expr extends E {
+ def toString(): String
+
+ def &&[E2 <: Expr](other: E2) = {
+ new AndExpr(this, other)
+ }
+
+ def ||[E2 <: Expr](other: E2) = {
+ new OrExpr(this, other)
+ }
+
+ def test(other: Expr): Expr = {
+ return this
+ }
+}
+
+class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr {
+ override def toString(): String = value.toString
+}
+
+final object BinaryOp {
+ val OP_EQ = "="
+ val OP_NE = "!="
+ val OP_GT = ">"
+ val OP_GE = ">="
+ val OP_LT = "<"
+ val OP_LE = "<="
+}
+
+class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr {
+ override def toString(): String = {
+ "(" + op1.toString() + " " + str + " " + op2.toString() + ")"
+ }
+}
+
+class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
+ extends BinaryOp[E1, E2]("AND", op1, op2)
+
+class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
+ extends BinaryOp[E1, E2]("OR", op1, op2)
diff --git a/src/main/scala/org/xapek/influxdb/types/influx.scala b/src/main/scala/org/xapek/influxdb/types/influx.scala
new file mode 100644
index 0000000..e2806f0
--- /dev/null
+++ b/src/main/scala/org/xapek/influxdb/types/influx.scala
@@ -0,0 +1,61 @@
+package org.xapek.influxdb.types
+
+import java.util.Date
+
+trait InfluxValue {
+ type JavaT
+ def toString(): String
+}
+
+class InfluxString(val value: String) extends Expr with InfluxValue {
+ type JavaT = String
+ override def toString(): String = {
+ "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO
+ }
+}
+
+class InfluxNumber(val value: Number) extends Expr with InfluxValue {
+ type JavaT = Number
+
+ override def toString(): String = {
+ value.toString
+ }
+}
+
+class InfluxDate(val value: Date) extends Expr with InfluxValue {
+ type JavaT = Date
+
+ override def toString(): String = {
+ val time: Long = value.getTime() / 1000
+ s"${time}s"
+ }
+
+ // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z")
+ // and fromTimestamp (String "NNNNs")
+}
+
+class InfluxIdentifier(val name: String) extends Expr {
+ override def toString(): String = {
+ "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
+ }
+
+ def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value))
+
+ def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value))
+
+ def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value))
+
+ def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value))
+
+ def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value))
+
+ def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
+ new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value))
+
+ implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s)
+}
diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdbExample/example1.scala
index 206183c..85b48eb 100644
--- a/src/main/scala/org/xapek/influxdb/Main.scala
+++ b/src/main/scala/org/xapek/influxdbExample/example1.scala
@@ -1,19 +1,17 @@
-package org.xapek.influxdb
-
+package org.xapek.influxdbExample
import java.util.Date
import java.util.Calendar
+import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.actor.Props
-import akka.event.Logging
-import akka.actor.ActorDSL
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorLogging
-import com.typesafe.config.ConfigFactory
-import org.xapek.influxdb.InfluxDB._
import scala.concurrent.duration.Duration
-import java.util.concurrent.TimeUnit
+import org.xapek.influxdb.InfluxDB._
+import org.xapek.influxdb.InfluxDB
+import org.xapek.influxdb.Mapper
case object TimerSignal
@@ -37,7 +35,7 @@ class ConsoleWriter extends Actor with ActorLogging {
object Main {
def main(args: Array[String]): Unit = {
val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root")
-
+
val calendar = Calendar.getInstance
calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 4)
val date1 = calendar.getTime
@@ -49,10 +47,8 @@ object Main {
val writer = system.actorOf(Props(new ConsoleWriter()), "writer")
val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader")
- system.scheduler.schedule(
- Duration.create(5, TimeUnit.SECONDS),
- Duration.create(5, TimeUnit.SECONDS),
- reader, TimerSignal)(system.dispatcher, null)
+ system.scheduler.schedule(Duration.Zero, Duration.create(5, TimeUnit.SECONDS),
+ reader, TimerSignal)(system.dispatcher, null)
println("Press key to exit")
System.in.read()