diff options
author | Yves Fischer <yvesf-git@xapek.org> | 2015-08-03 00:18:20 +0200 |
---|---|---|
committer | Yves Fischer <yvesf-git@xapek.org> | 2015-08-03 00:18:20 +0200 |
commit | dcabe1a774ac3ab6eee7e837aa09ee26b60e8b82 (patch) | |
tree | 1647576f1d327c8720acda6dc489e051402aa6e9 /src/main/scala/org/xapek/influxdb/mapper.scala | |
parent | ab3a129bfd7572d6d3d9457b5ded57c1a811f748 (diff) | |
download | influxdb-tools-master.tar.gz influxdb-tools-master.zip |
Diffstat (limited to 'src/main/scala/org/xapek/influxdb/mapper.scala')
-rw-r--r-- | src/main/scala/org/xapek/influxdb/mapper.scala | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/src/main/scala/org/xapek/influxdb/mapper.scala b/src/main/scala/org/xapek/influxdb/mapper.scala new file mode 100644 index 0000000..fc0345b --- /dev/null +++ b/src/main/scala/org/xapek/influxdb/mapper.scala @@ -0,0 +1,110 @@ +package org.xapek.influxdb + +import java.net.URI +import shapeless.Nat +import shapeless.Sized +import com.typesafe.scalalogging.Logger +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods +import org.apache.http.NameValuePair +import org.apache.http.client.utils.URLEncodedUtils +import org.apache.http.client.HttpClient +import org.apache.http.client.methods.HttpGet +import org.apache.http.client.utils.URIBuilder +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils +import org.slf4j.LoggerFactory + +import org.xapek.influxdb.types.InfluxValue +import org.xapek.influxdb.types.InfluxString + +private case class JsonResult(series: List[JsonSeries], error: String) { + def this(series: List[JsonSeries]) = this(series, null) +} + +private[influxdb] case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { + override def toString() = values.toString +} + +private[influxdb] trait ToFunction[N <: Nat] extends Serializable { self => + type Out[X] + type Col = Sized[Seq[InfluxValue], N] + def convert[B](f: self.Out[B], columns: Col): B +} + +private[influxdb] class toFunction0 extends ToFunction[Nat._0] { + type Out[X] = Function1[String, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString) +} +private[influxdb] class toFunction1 extends ToFunction[Nat._1] { + type Out[X] = Function2[String, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1)) +} +private[influxdb] class toFunction2 extends ToFunction[Nat._2] { + type Out[X] = Function3[String, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2)) +} +private[influxdb] class toFunction3 extends ToFunction[Nat._3] { + type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3)) +} +private[influxdb] object ToFunction { + implicit def iToFunction0 = new toFunction0() + implicit def iToFunction1 = new toFunction1() + implicit def iToFunction2 = new toFunction2() + implicit def iToFunction3 = new toFunction3() + + def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF +} + + +class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, + query: QueryBuilderWithSelectAndFrom[L])( + implicit toF: Z) { + private def httpClient = HttpClients.createDefault() + private def logger = Logger(LoggerFactory.getLogger(getClass)) + + private def buildURI(): URI = { + return new URIBuilder() + .setScheme("http") + .setHost(influxdb.host) + .setPort(influxdb.port) + .setPath("/query") + .setParameter("u", influxdb.username) + .setParameter("p", influxdb.password) + .setParameter("db", influxdb.db) + .setParameter("q", query.toString) + .build + } + + def map[B](f: toF.Out[B]): List[B] = { + implicit val formats = DefaultFormats + val httpGet = new HttpGet(buildURI()) + + logger.debug("Query={}", query.toString()) + logger.debug("Call: {}", httpGet.getURI) + + val response = httpClient.execute(httpGet); + try { + if (response.getStatusLine().getStatusCode() != 200) { + throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode) + } + val responseEntity = response.getEntity(); + + val responseJson = JsonMethods.parse(responseEntity.getContent) + val results = (responseJson \ "results").extract[List[JsonResult]] + val errors = results.filter { _.error != null }.map { _.error } + if (errors.size > 0) { + throw new InfluxdbError(errors.mkString(". ")) + } + val res = results(0).series(0).values.map { x => + val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) } + val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc) + toF.convert(f, z) + } + return res + } finally { + response.close(); + } + } +} |