diff options
Diffstat (limited to 'src/main/scala/org/xapek/influxdb/Influxdb.scala')
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Influxdb.scala | 341 |
1 files changed, 0 insertions, 341 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala deleted file mode 100644 index 172e85b..0000000 --- a/src/main/scala/org/xapek/influxdb/Influxdb.scala +++ /dev/null @@ -1,341 +0,0 @@ -package org.xapek.influxdb - -import java.util.Date -import java.net.URI -import scala.collection.JavaConversions._ -import org.slf4j.LoggerFactory -import org.json4s._ -import org.json4s.jackson.JsonMethods._ -import com.typesafe.scalalogging.Logger -import shapeless.Nat -import shapeless.Sized -import shapeless.Sized._ -import shapeless.syntax._ -import shapeless.syntax.NatOps -import shapeless.syntax.sized._ -import shapeless.Succ -import shapeless.ops._ -import shapeless.ops.nat._ -import shapeless.ops.sized._ -import org.apache.http.NameValuePair -import org.apache.http.client.utils.URLEncodedUtils -import org.apache.http.client.HttpClient -import org.apache.http.client.methods.HttpGet -import org.apache.http.client.utils.URIBuilder -import org.apache.http.impl.client.HttpClients -import org.apache.http.util.EntityUtils - -abstract class QueryBuilderWithSelectAndFrom[L <: Nat]( - val columns: Sized[List[InfluxIdentifier], L], - val table: InfluxIdentifier) { - - def count()(implicit toInt: ToInt[L]) = toInt() - - def count1()(implicit toInt: ToInt[Succ[L]]) = toInt() //length + 1 (for "time") - - def toString: String -} - -class QueryBuilderSelect[L <: Nat]( - val columns: Sized[List[InfluxIdentifier], L]) { - - def apply(column: String) = - new QueryBuilderSelect(columns :+ InfluxDB.influxIdentifier(column)) - - def FROM(table: String) = new QueryBuilderFrom(this, InfluxDB.influxIdentifier(table)) - - override def toString() = "SELECT " + columns.unsized.mkString(", ") -} - -class QueryBuilderFrom[L <: Nat]( - select: QueryBuilderSelect[L], - table: InfluxIdentifier) - extends QueryBuilderWithSelectAndFrom[L](select.columns, table) { - - def WHERE[WhereT <: Expr](where: WhereT) = - new QueryBuilderWhere(this, where) - - override def toString() = select.toString() + " FROM " + table -} - -class QueryBuilderWhere[L <: Nat, WhereT <: Expr]( - val from: QueryBuilderFrom[L], - val where: WhereT) - extends QueryBuilderWithSelectAndFrom[L](from.columns, from.table) { - - def GROUP_BY(column: String) = new QueryBuilderGroupBy(this, List(InfluxDB.influxIdentifier(column))) - - override def toString() = from.toString() + " WHERE " + where -} - -class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr]( - where: QueryBuilderWhere[L, WhereT], - groupBy: Seq[InfluxIdentifier]) - extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) { - - def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column)) - - override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") -} - -protected trait E { - def &&[E2 <: Expr](other: E2): AndExpr[this.type, E2] - def ||[E2 <: Expr](other: E2): OrExpr[this.type, E2] -} - -abstract class Expr extends E { - def toString(): String - - def &&[E2 <: Expr](other: E2) = { - new AndExpr(this, other) - } - - def ||[E2 <: Expr](other: E2) = { - new OrExpr(this, other) - } - - def test(other: Expr): Expr = { - return this - } -} - -class ValueExpr[ValueT <: InfluxValue](val value: ValueT) extends Expr { - override def toString(): String = value.toString -} - -final object BinaryOp { - val OP_EQ = "=" - val OP_NE = "!=" - val OP_GT = ">" - val OP_GE = ">=" - val OP_LT = "<" - val OP_LE = "<=" -} - -class BinaryOp[E1 <: E, E2 <: E](val str: String, val op1: E1, val op2: E2) extends Expr { - override def toString(): String = { - "(" + op1.toString() + " " + str + " " + op2.toString() + ")" - } -} - -class AndExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) - extends BinaryOp[E1, E2]("AND", op1, op2) - -class OrExpr[E1 <: E, E2 <: E](op1: E1, op2: E2) - extends BinaryOp[E1, E2]("OR", op1, op2) - -trait InfluxValue { - type JavaT - def toString(): String -} - -class InfluxString(val value: String) extends Expr with InfluxValue { - type JavaT = String - override def toString(): String = { - "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" // TODO - } -} - -class InfluxNumber(val value: Number) extends Expr with InfluxValue { - type JavaT = Number - - override def toString(): String = { - value.toString - } -} - -class InfluxDate(val value: Date) extends Expr with InfluxValue { - type JavaT = Date - - override def toString(): String = { - val time: Long = value.getTime() / 1000 - s"${time}s" - } - - // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z") - // and fromTimestamp (String "NNNNs") -} - -class InfluxIdentifier(val name: String) extends Expr { - override def toString(): String = { - "\"" + name.replace("\\", "\\\\").replace("\"", "\\\"") + "\"" - } - - def ==[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_EQ, this, new ValueExpr[ValueT](value)) - - def !=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_NE, this, new ValueExpr[ValueT](value)) - - def >[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_GT, this, new ValueExpr[ValueT](value)) - - def >=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_GE, this, new ValueExpr[ValueT](value)) - - def <[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_LT, this, new ValueExpr[ValueT](value)) - - def <=[ToValueT <% ValueT, ValueT <: InfluxValue](value: ToValueT) = - new BinaryOp(BinaryOp.OP_LE, this, new ValueExpr[ValueT](value)) - - implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s) -} - -case class JsonResult(series: List[JsonSeries], error: String) { - def this(series: List[JsonSeries]) = this(series, null) -} - -case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { - override def toString() = values.toString -} - -class InfluxdbError(val serverMessage: String) - extends Exception(s"Server responded: $serverMessage") - -trait ToSized[T, N <: Nat] extends Serializable { - def apply(x: Seq[T]): Sized[IndexedSeq[T], Succ[N]] - type Out <: Nat -} - -object ToSizedX { - def apply[T, N <: Nat](implicit toSized: ToSized[T, N]): ToSized[T, N] = toSized - - implicit def toSized0[T, N <: Nat] = new ToSized[T, Nat._0] { - type Out = Nat._0 - def apply(x: Seq[T]) = Sized(x(0)) - } -} - -trait ToFunction[N <: Nat] extends Serializable { self => - type Out[X] - type Col = Sized[Seq[InfluxValue], N] - def convert[B](f: self.Out[B], columns: Col): B -} -class toFunction0 extends ToFunction[Nat._0] { - type Out[X] = Function1[String, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString) -} -class toFunction1 extends ToFunction[Nat._1] { - type Out[X] = Function2[String, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1)) -} -class toFunction2 extends ToFunction[Nat._2] { - type Out[X] = Function3[String, InfluxValue, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2)) -} -class toFunction3 extends ToFunction[Nat._3] { - type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3)) -} -object ToFunction { - implicit def iToFunction0 = new toFunction0() - implicit def iToFunction1 = new toFunction1() - implicit def iToFunction2 = new toFunction2() - implicit def iToFunction3 = new toFunction3() - - def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF -} - -object Converter { - def toSized[T, L <: Nat](x: Seq[T])(implicit toSizedF: ToSized[T, L]) = toSizedF(x) -} - -class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, - query: QueryBuilderWithSelectAndFrom[L])( - implicit toF: Z) { - private def httpClient = HttpClients.createDefault() - private def logger = Logger(LoggerFactory.getLogger(getClass)) - - private def buildURI(): URI = { - return new URIBuilder() - .setScheme("http") - .setHost(influxdb.host) - .setPort(influxdb.port) - .setPath("/query") - .setParameter("u", influxdb.username) - .setParameter("p", influxdb.password) - .setParameter("db", influxdb.db) - .setParameter("q", query.toString) - .build - } - - def map[B](f: toF.Out[B]): List[B] = { - implicit val formats = DefaultFormats - val httpGet = new HttpGet(buildURI()) - - logger.debug("Query={}", query.toString()) - logger.debug("Call: {}", httpGet.getURI) - - val response = httpClient.execute(httpGet); - try { - if (response.getStatusLine().getStatusCode() != 200) { - throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode) - } - val responseEntity = response.getEntity(); - - val responseJson = parse(responseEntity.getContent) - val results = (responseJson \ "results").extract[List[JsonResult]] - val errors = results.filter { _.error != null }.map { _.error } - if (errors.size > 0) { - throw new InfluxdbError(errors.mkString(". ")) - } - val res = results(0).series(0).values.map { x => - val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) } - val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc) - toF.convert(f, z) - } - return res - } finally { - response.close(); - } - } -} - -case class InfluxDB(val host: String, val port: Int = 8086, - val db: String = "data", val path: String = "query", - val username: String = "", val password: String = "") - -private case class UriNameValue(val name: String, val value: String) { - def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue) -} - -object InfluxDB { - def SELECT(column: String) = { - new QueryBuilderSelect(Sized[List](influxIdentifier(column))) - } - - implicit def influxIdentifier(s: String): InfluxIdentifier = new InfluxIdentifier(s) - - implicit def influxString(s: String) = new InfluxString(s) - - implicit def influxNumber(n: Int) = new InfluxNumber(n) - - implicit def influxNumber(n: Double) = new InfluxNumber(n) - - implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) - - def fromUrl(url: String): InfluxDB = { - val uri = new URI(url) - var influxdb = new InfluxDB(uri.getHost) - URLEncodedUtils.parse(uri, "UTF-8").toList - .map(new UriNameValue(_)).foreach { x => - x match { - case UriNameValue("db", db) => influxdb = influxdb.copy(db = db) - case UriNameValue("username", username) => influxdb = influxdb.copy(username = username) - case UriNameValue("password", password) => influxdb = influxdb.copy(password = password) - case _ => - } - } - if (uri.getPort != -1) - influxdb = influxdb.copy(port = uri.getPort) - - if (uri.getPath.length() > 1) - influxdb = influxdb.copy(path = uri.getPath) - influxdb - } - - def col(col: String) = new InfluxIdentifier(col) - - def time = new InfluxIdentifier("time") -}
\ No newline at end of file |