summaryrefslogtreecommitdiff
path: root/src/main/scala/org/xapek/influxdb/Influxdb.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/org/xapek/influxdb/Influxdb.scala')
-rw-r--r--src/main/scala/org/xapek/influxdb/Influxdb.scala341
1 files changed, 0 insertions, 341 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala
deleted file mode 100644
index 172e85b..0000000
--- a/src/main/scala/org/xapek/influxdb/Influxdb.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-package org.xapek.influxdb
-
-import java.util.Date
-import java.net.URI
-import scala.collection.JavaConversions._
-import org.slf4j.LoggerFactory
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import com.typesafe.scalalogging.Logger
-import shapeless.Nat
-import shapeless.Sized
-import shapeless.Sized._
-import shapeless.syntax._
-import shapeless.syntax.NatOps
-import shapeless.syntax.sized._
-import shapeless.Succ
-import shapeless.ops._
-import shapeless.ops.nat._
-import shapeless.ops.sized._
-import org.apache.http.NameValuePair
-import org.apache.http.client.utils.URLEncodedUtils
-import org.apache.http.client.HttpClient
-import org.apache.http.client.methods.HttpGet
-import org.apache.http.client.utils.URIBuilder
-import org.apache.http.impl.client.HttpClients
-import org.apache.http.util.EntityUtils
-
-abstract class QueryBuilderWithSelectAndFrom[L <: Nat](
- val columns: Sized[List[InfluxIdentifier], L],
- val table: InfluxIdentifier) {
-
- def count()(implicit toInt: ToInt[L]) = toInt()
-
- def count1()(implicit toInt: ToInt[Succ[L]]) = toInt() //length + 1 (for "time")
-
- def toString: String
-}
-
-class QueryBuilderSelect[L <: Nat](
- val columns: Sized[List[InfluxIdentifier], L]) {
-
- def apply(column: String) =
- new QueryBuilderSelect(columns :+ InfluxDB.influxIdentifier(column))
-
- def FROM(table: String) = new QueryBuilderFrom(this, InfluxDB.influxIdentifier(table))
-
- override def toString() = "SELECT " + columns.unsized.mkString(", ")
-}
-
-class QueryBuilderFrom[L <: Nat](
- select: QueryBuilderSelect[L],
- table: InfluxIdentifier)
- extends QueryBuilderWithSelectAndFrom[L](select.columns, table) {
-
- def WHERE[WhereT <: Expr](where: WhereT) =
- new QueryBuilderWhere(this, where)
-
- override def toString() = select.toString() + " FROM " + table
-}
-
-class QueryBuilderWhere[L <: Nat, WhereT <: Expr](
- val from: QueryBuilderFrom[L],
- val where: WhereT)
- extends QueryBuilderWithSelectAndFrom[L](from.columns, from.table) {
-
- def GROUP_BY(column: String) = new QueryBuilderGroupBy(this, List(InfluxDB.influxIdentifier(column)))
-
- override def toString() = from.toString() + " WHERE " + where
-}
-
-class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr](
- where: QueryBuilderWhere[L, WhereT],
- groupBy: Seq[InfluxIdentifier])
- extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) {
-
- def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column))
-
- override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
-}
-
-protected trait E {
- def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2]
- def ||[E2 <: Expr](other: E2): OrExpr[this.type, E2]
-}
-
-abstract class Expr extends E {
- def toString(): String
-
- def &&[E2 <: Expr](other: E2) = {
- new AndExpr(this, other)
- }
-
- def ||[E2 <: Expr](other: E2) = {
- new OrExpr(this, other)
- }
-
- def test(other: Expr): Expr = {
- return this
- }
-}
-
-class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr {
- override def toString(): String = value.toString
-}
-
-final object BinaryOp {
- val OP_EQ = "="
- val OP_NE = "!="
- val OP_GT = ">"
- val OP_GE = ">="
- val OP_LT = "<"
- val OP_LE = "<="
-}
-
-class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr {
- override def toString(): String = {
- "(" + op1.toString() + " " + str + " " + op2.toString() + ")"
- }
-}
-
-class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
- extends BinaryOp[E1, E2]("AND", op1, op2)
-
-class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2)
- extends BinaryOp[E1, E2]("OR", op1, op2)
-
-trait InfluxValue {
- type JavaT
- def toString(): String
-}
-
-class InfluxString(val value: String) extends Expr with InfluxValue {
- type JavaT = String
- override def toString(): String = {
- "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO
- }
-}
-
-class InfluxNumber(val value: Number) extends Expr with InfluxValue {
- type JavaT = Number
-
- override def toString(): String = {
- value.toString
- }
-}
-
-class InfluxDate(val value: Date) extends Expr with InfluxValue {
- type JavaT = Date
-
- override def toString(): String = {
- val time: Long = value.getTime() / 1000
- s"${time}s"
- }
-
- // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z")
- // and fromTimestamp (String "NNNNs")
-}
-
-class InfluxIdentifier(val name: String) extends Expr {
- override def toString(): String = {
- "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
- }
-
- def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value))
-
- def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value))
-
- def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value))
-
- def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value))
-
- def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value))
-
- def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) =
- new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value))
-
- implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s)
-}
-
-case class JsonResult(series: List[JsonSeries], error: String) {
- def this(series: List[JsonSeries]) = this(series, null)
-}
-
-case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
- override def toString() = values.toString
-}
-
-class InfluxdbError(val serverMessage: String)
- extends Exception(s"Server responded: $serverMessage")
-
-trait ToSized[T, N <: Nat] extends Serializable {
- def apply(x: Seq[T]): Sized[IndexedSeq[T], Succ[N]]
- type Out <: Nat
-}
-
-object ToSizedX {
- def apply[T, N <: Nat](implicit toSized: ToSized[T, N]): ToSized[T, N] = toSized
-
- implicit def toSized0[T, N <: Nat] = new ToSized[T, Nat._0] {
- type Out = Nat._0
- def apply(x: Seq[T]) = Sized(x(0))
- }
-}
-
-trait ToFunction[N <: Nat] extends Serializable { self =>
- type Out[X]
- type Col = Sized[Seq[InfluxValue], N]
- def convert[B](f: self.Out[B], columns: Col): B
-}
-class toFunction0 extends ToFunction[Nat._0] {
- type Out[X] = Function1[String, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString)
-}
-class toFunction1 extends ToFunction[Nat._1] {
- type Out[X] = Function2[String, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1))
-}
-class toFunction2 extends ToFunction[Nat._2] {
- type Out[X] = Function3[String, InfluxValue, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2))
-}
-class toFunction3 extends ToFunction[Nat._3] {
- type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3))
-}
-object ToFunction {
- implicit def iToFunction0 = new toFunction0()
- implicit def iToFunction1 = new toFunction1()
- implicit def iToFunction2 = new toFunction2()
- implicit def iToFunction3 = new toFunction3()
-
- def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
-}
-
-object Converter {
- def toSized[T, L <: Nat](x: Seq[T])(implicit toSizedF: ToSized[T, L]) = toSizedF(x)
-}
-
-class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
- query: QueryBuilderWithSelectAndFrom[L])(
- implicit toF: Z) {
- private def httpClient = HttpClients.createDefault()
- private def logger = Logger(LoggerFactory.getLogger(getClass))
-
- private def buildURI(): URI = {
- return new URIBuilder()
- .setScheme("http")
- .setHost(influxdb.host)
- .setPort(influxdb.port)
- .setPath("/query")
- .setParameter("u", influxdb.username)
- .setParameter("p", influxdb.password)
- .setParameter("db", influxdb.db)
- .setParameter("q", query.toString)
- .build
- }
-
- def map[B](f: toF.Out[B]): List[B] = {
- implicit val formats = DefaultFormats
- val httpGet = new HttpGet(buildURI())
-
- logger.debug("Query={}", query.toString())
- logger.debug("Call: {}", httpGet.getURI)
-
- val response = httpClient.execute(httpGet);
- try {
- if (response.getStatusLine().getStatusCode() != 200) {
- throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode)
- }
- val responseEntity = response.getEntity();
-
- val responseJson = parse(responseEntity.getContent)
- val results = (responseJson \ "results").extract[List[JsonResult]]
- val errors = results.filter { _.error != null }.map { _.error }
- if (errors.size > 0) {
- throw new InfluxdbError(errors.mkString(". "))
- }
- val res = results(0).series(0).values.map { x =>
- val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) }
- val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc)
- toF.convert(f, z)
- }
- return res
- } finally {
- response.close();
- }
- }
-}
-
-case class InfluxDB(val host: String, val port: Int = 8086,
- val db: String = "data", val path: String = "query",
- val username: String = "", val password: String = "")
-
-private case class UriNameValue(val name: String, val value: String) {
- def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue)
-}
-
-object InfluxDB {
- def SELECT(column: String) = {
- new QueryBuilderSelect(Sized[List](influxIdentifier(column)))
- }
-
- implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s)
-
- implicit def influxString(s: String) = new InfluxString(s)
-
- implicit def influxNumber(n: Int) = new InfluxNumber(n)
-
- implicit def influxNumber(n: Double) = new InfluxNumber(n)
-
- implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
-
- def fromUrl(url: String): InfluxDB = {
- val uri = new URI(url)
- var influxdb = new InfluxDB(uri.getHost)
- URLEncodedUtils.parse(uri, "UTF-8").toList
- .map(new UriNameValue(_)).foreach { x =>
- x match {
- case UriNameValue("db", db) => influxdb = influxdb.copy(db = db)
- case UriNameValue("username", username) => influxdb = influxdb.copy(username = username)
- case UriNameValue("password", password) => influxdb = influxdb.copy(password = password)
- case _ =>
- }
- }
- if (uri.getPort != -1)
- influxdb = influxdb.copy(port = uri.getPort)
-
- if (uri.getPath.length() > 1)
- influxdb = influxdb.copy(path = uri.getPath)
- influxdb
- }
-
- def col(col: String) = new InfluxIdentifier(col)
-
- def time = new InfluxIdentifier("time")
-} \ No newline at end of file