diff options
Diffstat (limited to 'src/main/scala/org/xapek/influxdb')
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Influxdb.scala | 148 | ||||
-rw-r--r-- | src/main/scala/org/xapek/influxdb/Main.scala | 51 |
2 files changed, 147 insertions, 52 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala index 7df8afa..ece64a0 100644 --- a/src/main/scala/org/xapek/influxdb/Influxdb.scala +++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala @@ -19,6 +19,10 @@ import shapeless.Succ import shapeless.ops.sized._ import shapeless.ops.nat._ import shapeless.ops._ +import java.net.URI +import org.apache.http.client.utils.URLEncodedUtils +import scala.collection.JavaConversions._ +import org.apache.http.NameValuePair abstract class QueryBuilderWithSelectAndFrom[L <: Nat]( val columns: Sized[List[InfluxIdentifier], L], @@ -69,10 +73,10 @@ class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr]( extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) { def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column)) -// { -// groupBy = groupBy :+ InfluxDB.influxIdentifier(column) -// this -// } + // { + // groupBy = groupBy :+ InfluxDB.influxIdentifier(column) + // this + // } override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") } @@ -183,37 +187,76 @@ case class JsonResult(series: List[JsonSeries], error: String) { } case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { - override def toString() = values.map { _.map { _.getClass() } }.mkString(", ") + override def toString() = values.toString() //map { _.map { _.getClass() } }.mkString(", ") } class InfluxdbError(val serverMessage: String) extends Exception(s"Server responded: $serverMessage") -class InfluxdbExecutor[L <: Nat]( - connection: InfluxDB, - query: QueryBuilderWithSelectAndFrom[L])(implicit toInt: ToInt[L]) { - private val httpClient = HttpClients.createDefault(); - private val logger = Logger(LoggerFactory.getLogger(getClass)) - private val uriBuilder = new URIBuilder() - .setScheme("http") - .setHost(connection.host) - .setPort(connection.port) - .setPath("/query") - .setParameter("u", connection.username) - .setParameter("p", connection.password) - .setParameter("db", connection.database) - - def map[B, That](f: Sized[InfluxValue, L] => B): Seq[String] = { - implicit val formats = DefaultFormats +trait ToSized[T, N <: Nat] extends Serializable { + def apply(x: Seq[T]): Sized[IndexedSeq[T], Succ[N]] + type Out <: Nat +} - //{"results":[ - // {"series":[ - // {"name":"esg", - // "columns":["time","product_name","sku","price"], - // "values":[ - // ["2015-06-05T09:30:13.084815572Z","GoldXXXeus","asd",3400.73], - // ["2015-06-05T17:00:13.888788593Z","GoldXXXaeus","asd",3425.29]]}]}]} - uriBuilder.setParameter("q", query.toString()) +object ToSizedX { + def apply[T, N <: Nat](implicit toSized: ToSized[T, N]): ToSized[T, N] = toSized + + implicit def toSized0[T, N <: Nat] = new ToSized[T, Nat._0] { + type Out = Nat._0 + def apply(x: Seq[T]) = Sized(x(0)) + } +} + +trait ToFunction[N <: Nat] extends Serializable { self => + type Out[X] + type Col = Sized[Seq[InfluxValue], N] + def convert[B](f: self.Out[B], columns: Col): B +} +class toFunction0 extends ToFunction[Nat._0] { + type Out[X] = Function0[X] + override def convert[B](f: Out[B], columns: Col): B = f() +} +class toFunction1 extends ToFunction[Nat._1] { + type Out[X] = Function1[InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0)) +} +class toFunction2 extends ToFunction[Nat._2] { + type Out[X] = Function2[InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1)) +} +class ToFunction3 extends ToFunction[Nat._3] { + type Out[X] = Function3[InfluxValue, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1), columns(2)) +} +object ToFunction { + implicit def iToFunction0 = new toFunction0() + implicit def iToFunction1 = new toFunction1() + implicit def iToFunction2 = new toFunction2() + implicit def iToFunction3 = new ToFunction3() + + def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF +} + +object Converter { + def toSized[T, L <: Nat](x: Seq[T])(implicit toSizedF: ToSized[T, L]) = toSizedF(x) +} + +class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, + query: QueryBuilderWithSelectAndFrom[L])( + implicit toF: Z) { + def map[B](f: toF.Out[B]): List[B] = { + implicit val formats = DefaultFormats + val httpClient = HttpClients.createDefault(); + val logger = Logger(LoggerFactory.getLogger(getClass)) + val uriBuilder = new URIBuilder() + .setScheme("http") + .setHost(influxdb.host) + .setPort(influxdb.port) + .setPath("/query") + .setParameter("u", influxdb.username) + .setParameter("p", influxdb.password) + .setParameter("db", influxdb.db) + .setParameter("q", query.toString()) val httpGet = new HttpGet(uriBuilder.build()) logger.debug("Query={}", query.toString()) @@ -230,27 +273,24 @@ class InfluxdbExecutor[L <: Nat]( if (errors.size > 0) { throw new InfluxdbError(errors.mkString(". ")) } - println(results) + val res = results(0).series(0).values.map { x => + val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) } + val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc) + toF.convert(f, z) + } + return res } finally { response1.close(); } - - val columns = query.columns - println(query.count()) - // toIntSucc(columns) - //collection.mutable.LinkedList - List(columns.toString()) } } -class InfluxDB(val database: String, - val host: String, - val port: Int = 8086, - val username: String = "", - val password: String = "") { - def apply[L <: Nat](q: QueryBuilderWithSelectAndFrom[L])(implicit toInt: ToInt[L]) = { - new InfluxdbExecutor[L](this, q) - } +case class InfluxDB(val host: String, val port: Int = 8086, + val db: String = "data", val path: String = "query", + val username: String = "", val password: String = "") + +private case class UriNameValue(val name: String, val value: String) { + def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue) } object InfluxDB { @@ -268,6 +308,26 @@ object InfluxDB { implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) + implicit def influxDb(url: String): InfluxDB = { + val uri = new URI(url) + var influxdb = new InfluxDB(uri.getHost) + URLEncodedUtils.parse(uri, "UTF-8").toList + .map(new UriNameValue(_)).foreach { x => + x match { + case UriNameValue("db", db) => influxdb = influxdb.copy(db = db) + case UriNameValue("username", username) => influxdb = influxdb.copy(username = username) + case UriNameValue("password", password) => influxdb = influxdb.copy(password = password) + case _ => + } + } + if (uri.getPort != -1) + influxdb = influxdb.copy(port = uri.getPort) + + if (uri.getPath.length() > 1) + influxdb = influxdb.copy(path = uri.getPath) + influxdb + } + def col(col: String) = new InfluxIdentifier(col) def time = new InfluxIdentifier("time") diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala index f2b5a65..10e8511 100644 --- a/src/main/scala/org/xapek/influxdb/Main.scala +++ b/src/main/scala/org/xapek/influxdb/Main.scala @@ -12,6 +12,10 @@ import java.util.Calendar import shapeless.Nat import shapeless.Nat._ import shapeless.ops.nat._ +import shapeless.ops.product.ToRecord +import shapeless.Sized +import shapeless.Succ +import shapeless.SizedOps case class Row(columns: Seq[String]) @@ -23,11 +27,35 @@ class GenericReader(query: String) extends Actor with ActorLogging { } //object MyReader extends GenericReader(Influxdb SELECT "value" FROM "test" WHERE col("time") == 30) +//trait ToResult[N <: Nat] extends Serializable { +// def apply(x: Int): Int +//} + +//object ToResult { +// def apply[N <: Nat](implicit toInt: ToResult[N]): ToResult[N] = toInt +// +// implicit val toInt0 = new ToResult[Nat._0] { +// def apply(x: Int) = x +// } +// implicit def toIntSucc[N <: Nat](x: Int)(implicit toResultN: ToResult[N]) = new ToResult[Succ[N]] { +// def apply(x: Int) = toResultN(x - 1) + x +// } +//} object Main { + import shapeless._ + import syntax.sized._ + + def process(x: Sized[List[InfluxValue], _]) = + (x(0), x(9)) // functor! + // Sized.sizedOps(x).apply(_0) + def main(args: Array[String]): Unit = { - val db = new InfluxDB("data", "localhost") + val db: InfluxDB = "http://root:root@localhost:8086/query?db=data&user=root&password=root" + println(db) + + val z = Sized(1) val s2 = SELECT("foo")("asd")("ads") FROM ("asd") WHERE (col("asd") == "asd") @@ -38,14 +66,22 @@ object Main { calendar.set(Calendar.MINUTE, 0) calendar.set(Calendar.SECOND, 0) calendar.set(Calendar.MILLISECOND, 0) - calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 2) + // calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 1) + val x: Function3[Int, Int, Int, String] = { (a: Int, b: Int, c: Int) => null } // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc - val s4 = SELECT("product_name")("sku")("price") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime - val z = db(s4).map({ x => x }) - println(z) - - // val z = ToInt[s4.LT]() + val s4 = SELECT("product_name") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime + + val xx = new Mapper(db, s4).map { x => "adads: " + x.toString() } + println(xx) + + // println(mapper.map[String]({ (a: InfluxValue, b: InfluxValue, c: InfluxValue) => a + "+" + b + "+" + c })) + + // val x = Converter.toSized[Int, _4](List(1,2,3)) + // println(x) + // println(new Foo(s4.columns).count(List(1, 2, 3, 4, 5, 6))) + + // val z = ToInt[s4.LT]() // val foo = db(s4) // .map({ x => x }) // println(foo) @@ -58,7 +94,6 @@ object Main { // system.shutdown() // // val x : Tuple2[Int,Int] = (1,2) - } } |