package org.xapek.influxdb

import java.net.URI
import shapeless.Nat
import shapeless.Sized
import com.typesafe.scalalogging.Logger
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods
import org.apache.http.NameValuePair
import org.apache.http.client.utils.URLEncodedUtils
import org.apache.http.client.HttpClient
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.utils.URIBuilder
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory
import org.xapek.influxdb.types.InfluxValue
import org.xapek.influxdb.types.InfluxString

/**
 * One element of the `results` array of a query response.
 *
 * @param series series contained in this result
 * @param error  error message of this result; `null` when the instance is
 *               built through the single-argument constructor
 */
private case class JsonResult(series: List[JsonSeries], error: String) {
  def this(series: List[JsonSeries]) = this(series, null)
}

/**
 * One series of a query result.
 *
 * @param name    series name
 * @param columns column names
 * @param values  rows; every row is a list of raw cell values
 */
private[influxdb] case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
  /** Renders only the rows, not the name or the column list. */
  override def toString() = values.toString
}

/**
 * Type class that applies a plain Scala function to the columns of one row.
 *
 * @tparam N type-level size used for the row representation [[Col]]
 */
private[influxdb] trait ToFunction[N <: Nat] extends Serializable { self =>
  /** Shape of the user function that produces an `X` from one row. */
  type Out[X]

  /** Row representation handed to [[convert]]. */
  type Col = Sized[Seq[InfluxValue], N]

  /**
   * Applies `f` to the individual columns of a row.
   *
   * @param f       function to call
   * @param columns the row
   * @return whatever `f` returns
   */
  def convert[B](f: self.Out[B], columns: Col): B
}

/** Passes column 0 (as a string) to a one-argument function. */
private[influxdb] class toFunction0 extends ToFunction[Nat._0] {
  type Out[X] = Function1[String, X]

  override def convert[B](f: Out[B], columns: Col): B =
    f(columns(0).toString)
}

/** Passes column 0 (as a string) and column 1 to a two-argument function. */
private[influxdb] class toFunction1 extends ToFunction[Nat._1] {
  type Out[X] = Function2[String, InfluxValue, X]

  override def convert[B](f: Out[B], columns: Col): B =
    f(columns(0).toString, columns(1))
}

/** Passes column 0 (as a string) and columns 1-2 to a three-argument function. */
private[influxdb] class toFunction2 extends ToFunction[Nat._2] {
  type Out[X] = Function3[String, InfluxValue, InfluxValue, X]

  override def convert[B](f: Out[B], columns: Col): B =
    f(columns(0).toString, columns(1), columns(2))
}

/** Passes column 0 (as a string) and columns 1-3 to a four-argument function. */
private[influxdb] class toFunction3 extends ToFunction[Nat._3] {
  type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]

  override def convert[B](f: Out[B], columns: Col): B =
    f(columns(0).toString, columns(1), columns(2), columns(3))
}

/** Implicit [[ToFunction]] instances for sizes 0 to 3. */
private[influxdb] object ToFunction {
  implicit def iToFunction0: toFunction0 = new toFunction0()
  implicit def iToFunction1: toFunction1 = new toFunction1()
  implicit def iToFunction2: toFunction2 = new toFunction2()
  implicit def iToFunction3: toFunction3 = new toFunction3()

  /** Summons the instance for size `L`. */
  def toFunction[L <: Nat](implicit toF: ToFunction[L]): ToFunction[L] = toF
}

/**
 * Runs `query` against `influxdb` over HTTP and maps every returned row
 * through a user supplied function.
 *
 * @param influxdb connection settings (host, port, credentials, database)
 * @param query    the query; its `toString` is sent as the `q` parameter
 * @param toF      adapter that spreads a row over the arguments of the
 *                 user function
 */
class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, query: QueryBuilderWithSelectAndFrom[L])(
    implicit toF: Z) {

  // Built once per Mapper instead of on every log statement.
  private lazy val logger: Logger = Logger(LoggerFactory.getLogger(getClass))

  /** Builds the `/query` URI carrying credentials, database and query text. */
  private def buildURI(): URI =
    new URIBuilder()
      .setScheme("http")
      .setHost(influxdb.host)
      .setPort(influxdb.port)
      .setPath("/query")
      .setParameter("u", influxdb.username)
      .setParameter("p", influxdb.password)
      .setParameter("db", influxdb.db)
      .setParameter("q", query.toString)
      .build

  /**
   * Executes the query and applies `f` to each row of the first series of
   * the first result.
   *
   * @param f function receiving the columns of one row
   * @return one element per row; empty when the response contains no
   *         result or no series
   * @throws InfluxdbError when the HTTP status is not 200 or when any
   *                       result carries an error message
   */
  def map[B](f: toF.Out[B]): List[B] = {
    implicit val formats: DefaultFormats.type = DefaultFormats

    val httpGet = new HttpGet(buildURI())
    logger.debug("Query={}", query.toString())
    // NOTE(review): this URI contains the "p" (password) parameter, so the
    // password ends up in the debug log - consider masking it.
    logger.debug("Call: {}", httpGet.getURI)

    // A client is created per call; close it so its connections are released.
    val client = HttpClients.createDefault()
    try {
      val response = client.execute(httpGet)
      try {
        val statusCode = response.getStatusLine.getStatusCode
        if (statusCode != 200) {
          throw new InfluxdbError(s"Status code $statusCode")
        }

        val responseEntity = response.getEntity
        val responseJson = JsonMethods.parse(responseEntity.getContent)
        val results = (responseJson \ "results").extract[List[JsonResult]]

        val errors = results.flatMap(result => Option(result.error))
        if (errors.nonEmpty) {
          throw new InfluxdbError(errors.mkString(". "))
        }

        // No result or no series means no rows, not an IndexOutOfBoundsException.
        val rows: List[List[Any]] = (for {
          firstResult <- results.headOption
          firstSeries <- Option(firstResult.series).flatMap(_.headOption)
          values <- Option(firstSeries.values)
        } yield values).getOrElse(Nil)

        rows.map { row =>
          val cells: List[InfluxValue] = row.map { cell => new InfluxString(cell.toString()) }
          // Sized.wrap only attaches the type-level size to the list.
          val sized: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cells)
          toF.convert(f, sized)
        }
      } finally {
        response.close()
      }
    } finally {
      client.close()
    }
  }
}