summaryrefslogtreecommitdiff
path: root/src/main/scala/org/xapek/influxdb/mapper.scala
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2015-08-03 00:18:20 +0200
committerYves Fischer <yvesf-git@xapek.org>2015-08-03 00:18:20 +0200
commitdcabe1a774ac3ab6eee7e837aa09ee26b60e8b82 (patch)
tree1647576f1d327c8720acda6dc489e051402aa6e9 /src/main/scala/org/xapek/influxdb/mapper.scala
parentab3a129bfd7572d6d3d9457b5ded57c1a811f748 (diff)
downloadinfluxdb-tools-master.tar.gz
influxdb-tools-master.zip
Restructure into separate filesHEADmaster
Diffstat (limited to 'src/main/scala/org/xapek/influxdb/mapper.scala')
-rw-r--r--src/main/scala/org/xapek/influxdb/mapper.scala110
1 files changed, 110 insertions, 0 deletions
diff --git a/src/main/scala/org/xapek/influxdb/mapper.scala b/src/main/scala/org/xapek/influxdb/mapper.scala
new file mode 100644
index 0000000..fc0345b
--- /dev/null
+++ b/src/main/scala/org/xapek/influxdb/mapper.scala
@@ -0,0 +1,110 @@
+package org.xapek.influxdb
+
+import java.net.URI
+import shapeless.Nat
+import shapeless.Sized
+import com.typesafe.scalalogging.Logger
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
+import org.apache.http.NameValuePair
+import org.apache.http.client.utils.URLEncodedUtils
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.utils.URIBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+import org.slf4j.LoggerFactory
+
+import org.xapek.influxdb.types.InfluxValue
+import org.xapek.influxdb.types.InfluxString
+
+private case class JsonResult(series: List[JsonSeries], error: String) {
+ def this(series: List[JsonSeries]) = this(series, null)
+}
+
+private[influxdb] case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
+ override def toString() = values.toString
+}
+
+private[influxdb] trait ToFunction[N <: Nat] extends Serializable { self =>
+ type Out[X]
+ type Col = Sized[Seq[InfluxValue], N]
+ def convert[B](f: self.Out[B], columns: Col): B
+}
+
+private[influxdb] class toFunction0 extends ToFunction[Nat._0] {
+ type Out[X] = Function1[String, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString)
+}
+private[influxdb] class toFunction1 extends ToFunction[Nat._1] {
+ type Out[X] = Function2[String, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1))
+}
+private[influxdb] class toFunction2 extends ToFunction[Nat._2] {
+ type Out[X] = Function3[String, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2))
+}
+private[influxdb] class toFunction3 extends ToFunction[Nat._3] {
+ type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3))
+}
+private[influxdb] object ToFunction {
+ implicit def iToFunction0 = new toFunction0()
+ implicit def iToFunction1 = new toFunction1()
+ implicit def iToFunction2 = new toFunction2()
+ implicit def iToFunction3 = new toFunction3()
+
+ def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
+}
+
+
+class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
+ query: QueryBuilderWithSelectAndFrom[L])(
+ implicit toF: Z) {
+ private def httpClient = HttpClients.createDefault()
+ private def logger = Logger(LoggerFactory.getLogger(getClass))
+
+ private def buildURI(): URI = {
+ return new URIBuilder()
+ .setScheme("http")
+ .setHost(influxdb.host)
+ .setPort(influxdb.port)
+ .setPath("/query")
+ .setParameter("u", influxdb.username)
+ .setParameter("p", influxdb.password)
+ .setParameter("db", influxdb.db)
+ .setParameter("q", query.toString)
+ .build
+ }
+
+ def map[B](f: toF.Out[B]): List[B] = {
+ implicit val formats = DefaultFormats
+ val httpGet = new HttpGet(buildURI())
+
+ logger.debug("Query={}", query.toString())
+ logger.debug("Call: {}", httpGet.getURI)
+
+ val response = httpClient.execute(httpGet);
+ try {
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode)
+ }
+ val responseEntity = response.getEntity();
+
+ val responseJson = JsonMethods.parse(responseEntity.getContent)
+ val results = (responseJson \ "results").extract[List[JsonResult]]
+ val errors = results.filter { _.error != null }.map { _.error }
+ if (errors.size > 0) {
+ throw new InfluxdbError(errors.mkString(". "))
+ }
+ val res = results(0).series(0).values.map { x =>
+ val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) }
+ val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc)
+ toF.convert(f, z)
+ }
+ return res
+ } finally {
+ response.close();
+ }
+ }
+}