summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/org/xapek/influxdb/Influxdb.scala148
-rw-r--r--src/main/scala/org/xapek/influxdb/Main.scala51
-rw-r--r--src/test/scala/org/xapek/influxdb/InfluxdbTest.scala2
3 files changed, 148 insertions, 53 deletions
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala
index 7df8afa..ece64a0 100644
--- a/src/main/scala/org/xapek/influxdb/Influxdb.scala
+++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala
@@ -19,6 +19,10 @@ import shapeless.Succ
import shapeless.ops.sized._
import shapeless.ops.nat._
import shapeless.ops._
+import java.net.URI
+import org.apache.http.client.utils.URLEncodedUtils
+import scala.collection.JavaConversions._
+import org.apache.http.NameValuePair
abstract class QueryBuilderWithSelectAndFrom[L <: Nat](
val columns: Sized[List[InfluxIdentifier], L],
@@ -69,10 +73,10 @@ class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr](
extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) {
def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column))
-// {
-// groupBy = groupBy :+ InfluxDB.influxIdentifier(column)
-// this
-// }
+ // {
+ // groupBy = groupBy :+ InfluxDB.influxIdentifier(column)
+ // this
+ // }
override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
}
@@ -183,37 +187,76 @@ case class JsonResult(series: List[JsonSeries], error: String) {
}
case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
- override def toString() = values.map { _.map { _.getClass() } }.mkString(", ")
+ override def toString() = values.toString() //map { _.map { _.getClass() } }.mkString(", ")
}
class InfluxdbError(val serverMessage: String)
extends Exception(s"Server responded: $serverMessage")
-class InfluxdbExecutor[L <: Nat](
- connection: InfluxDB,
- query: QueryBuilderWithSelectAndFrom[L])(implicit toInt: ToInt[L]) {
- private val httpClient = HttpClients.createDefault();
- private val logger = Logger(LoggerFactory.getLogger(getClass))
- private val uriBuilder = new URIBuilder()
- .setScheme("http")
- .setHost(connection.host)
- .setPort(connection.port)
- .setPath("/query")
- .setParameter("u", connection.username)
- .setParameter("p", connection.password)
- .setParameter("db", connection.database)
-
- def map[B, That](f: Sized[InfluxValue, L] => B): Seq[String] = {
- implicit val formats = DefaultFormats
+trait ToSized[T, N <: Nat] extends Serializable {
+ def apply(x: Seq[T]): Sized[IndexedSeq[T], Succ[N]]
+ type Out <: Nat
+}
- //{"results":[
- // {"series":[
- // {"name":"esg",
- // "columns":["time","product_name","sku","price"],
- // "values":[
- // ["2015-06-05T09:30:13.084815572Z","GoldXXXeus","asd",3400.73],
- // ["2015-06-05T17:00:13.888788593Z","GoldXXXaeus","asd",3425.29]]}]}]}
- uriBuilder.setParameter("q", query.toString())
+object ToSizedX {
+ def apply[T, N <: Nat](implicit toSized: ToSized[T, N]): ToSized[T, N] = toSized
+
+ implicit def toSized0[T, N <: Nat] = new ToSized[T, Nat._0] {
+ type Out = Nat._0
+ def apply(x: Seq[T]) = Sized(x(0))
+ }
+}
+
+trait ToFunction[N <: Nat] extends Serializable { self =>
+ type Out[X]
+ type Col = Sized[Seq[InfluxValue], N]
+ def convert[B](f: self.Out[B], columns: Col): B
+}
+class toFunction0 extends ToFunction[Nat._0] {
+ type Out[X] = Function0[X]
+ override def convert[B](f: Out[B], columns: Col): B = f()
+}
+class toFunction1 extends ToFunction[Nat._1] {
+ type Out[X] = Function1[InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0))
+}
+class toFunction2 extends ToFunction[Nat._2] {
+ type Out[X] = Function2[InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1))
+}
+class ToFunction3 extends ToFunction[Nat._3] {
+ type Out[X] = Function3[InfluxValue, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1), columns(2))
+}
+object ToFunction {
+ implicit def iToFunction0 = new toFunction0()
+ implicit def iToFunction1 = new toFunction1()
+ implicit def iToFunction2 = new toFunction2()
+ implicit def iToFunction3 = new ToFunction3()
+
+ def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
+}
+
+object Converter {
+ def toSized[T, L <: Nat](x: Seq[T])(implicit toSizedF: ToSized[T, L]) = toSizedF(x)
+}
+
+class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
+ query: QueryBuilderWithSelectAndFrom[L])(
+ implicit toF: Z) {
+ def map[B](f: toF.Out[B]): List[B] = {
+ implicit val formats = DefaultFormats
+ val httpClient = HttpClients.createDefault();
+ val logger = Logger(LoggerFactory.getLogger(getClass))
+ val uriBuilder = new URIBuilder()
+ .setScheme("http")
+ .setHost(influxdb.host)
+ .setPort(influxdb.port)
+ .setPath("/query")
+ .setParameter("u", influxdb.username)
+ .setParameter("p", influxdb.password)
+ .setParameter("db", influxdb.db)
+ .setParameter("q", query.toString())
val httpGet = new HttpGet(uriBuilder.build())
logger.debug("Query={}", query.toString())
@@ -230,27 +273,24 @@ class InfluxdbExecutor[L <: Nat](
if (errors.size > 0) {
throw new InfluxdbError(errors.mkString(". "))
}
- println(results)
+ val res = results(0).series(0).values.map { x =>
+ val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) }
+ val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc)
+ toF.convert(f, z)
+ }
+ return res
} finally {
response1.close();
}
-
- val columns = query.columns
- println(query.count())
- // toIntSucc(columns)
- //collection.mutable.LinkedList
- List(columns.toString())
}
}
-class InfluxDB(val database: String,
- val host: String,
- val port: Int = 8086,
- val username: String = "",
- val password: String = "") {
- def apply[L <: Nat](q: QueryBuilderWithSelectAndFrom[L])(implicit toInt: ToInt[L]) = {
- new InfluxdbExecutor[L](this, q)
- }
+case class InfluxDB(val host: String, val port: Int = 8086,
+ val db: String = "data", val path: String = "query",
+ val username: String = "", val password: String = "")
+
+private case class UriNameValue(val name: String, val value: String) {
+ def this(nvp: NameValuePair) = this(nvp.getName, nvp.getValue)
}
object InfluxDB {
@@ -268,6 +308,26 @@ object InfluxDB {
implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
+ implicit def influxDb(url: String): InfluxDB = {
+ val uri = new URI(url)
+ var influxdb = new InfluxDB(uri.getHost)
+ URLEncodedUtils.parse(uri, "UTF-8").toList
+ .map(new UriNameValue(_)).foreach { x =>
+ x match {
+ case UriNameValue("db", db) => influxdb = influxdb.copy(db = db)
+ case UriNameValue("username", username) => influxdb = influxdb.copy(username = username)
+ case UriNameValue("password", password) => influxdb = influxdb.copy(password = password)
+ case _ =>
+ }
+ }
+ if (uri.getPort != -1)
+ influxdb = influxdb.copy(port = uri.getPort)
+
+ if (uri.getPath.length() > 1)
+ influxdb = influxdb.copy(path = uri.getPath)
+ influxdb
+ }
+
def col(col: String) = new InfluxIdentifier(col)
def time = new InfluxIdentifier("time")
diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala
index f2b5a65..10e8511 100644
--- a/src/main/scala/org/xapek/influxdb/Main.scala
+++ b/src/main/scala/org/xapek/influxdb/Main.scala
@@ -12,6 +12,10 @@ import java.util.Calendar
import shapeless.Nat
import shapeless.Nat._
import shapeless.ops.nat._
+import shapeless.ops.product.ToRecord
+import shapeless.Sized
+import shapeless.Succ
+import shapeless.SizedOps
case class Row(columns: Seq[String])
@@ -23,11 +27,35 @@ class GenericReader(query: String) extends Actor with ActorLogging {
}
//object MyReader extends GenericReader(Influxdb SELECT "value" FROM "test" WHERE col("time") == 30)
+//trait ToResult[N <: Nat] extends Serializable {
+// def apply(x: Int): Int
+//}
+
+//object ToResult {
+// def apply[N <: Nat](implicit toInt: ToResult[N]): ToResult[N] = toInt
+//
+// implicit val toInt0 = new ToResult[Nat._0] {
+// def apply(x: Int) = x
+// }
+// implicit def toIntSucc[N <: Nat](x: Int)(implicit toResultN: ToResult[N]) = new ToResult[Succ[N]] {
+// def apply(x: Int) = toResultN(x - 1) + x
+// }
+//}
object Main {
+ import shapeless._
+ import syntax.sized._
+
+ def process(x: Sized[List[InfluxValue], _]) =
+ (x(0), x(9)) // functor!
+ // Sized.sizedOps(x).apply(_0)
+
def main(args: Array[String]): Unit = {
- val db = new InfluxDB("data", "localhost")
+ val db: InfluxDB = "http://root:root@localhost:8086/query?db=data&user=root&password=root"
+ println(db)
+
+ val z = Sized(1)
val s2 = SELECT("foo")("asd")("ads") FROM ("asd") WHERE (col("asd") == "asd")
@@ -38,14 +66,22 @@ object Main {
calendar.set(Calendar.MINUTE, 0)
calendar.set(Calendar.SECOND, 0)
calendar.set(Calendar.MILLISECOND, 0)
- calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 2)
+ // calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 1)
+ val x: Function3[Int, Int, Int, String] = { (a: Int, b: Int, c: Int) => null }
// SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc
- val s4 = SELECT("product_name")("sku")("price") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime
- val z = db(s4).map({ x => x })
- println(z)
-
- // val z = ToInt[s4.LT]()
+ val s4 = SELECT("product_name") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime
+
+ val xx = new Mapper(db, s4).map { x => "adads: " + x.toString() }
+ println(xx)
+
+ // println(mapper.map[String]({ (a: InfluxValue, b: InfluxValue, c: InfluxValue) => a + "+" + b + "+" + c }))
+
+ // val x = Converter.toSized[Int, _4](List(1,2,3))
+ // println(x)
+ // println(new Foo(s4.columns).count(List(1, 2, 3, 4, 5, 6)))
+
+ // val z = ToInt[s4.LT]()
// val foo = db(s4)
// .map({ x => x })
// println(foo)
@@ -58,7 +94,6 @@ object Main {
// system.shutdown()
//
// val x : Tuple2[Int,Int] = (1,2)
-
}
}
diff --git a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
index 2ec83c3..9284780 100644
--- a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
+++ b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
@@ -9,7 +9,7 @@ class AppTest {
@Test
def testStatements() = {
- val db = new InfluxDB("data", "localhost")
+ val db :InfluxDB = "http://localhost"
assertTrue((SELECT("foo") FROM "bla" WHERE col("a") == "asd")
.toString().contains("as"))