summaryrefslogtreecommitdiff
path: root/src/main/scala/org/xapek/influxdb/mapper.scala
blob: fc0345bc98c458a1cbf1286640e026bac7af8bfd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package org.xapek.influxdb

import java.net.URI
import shapeless.Nat
import shapeless.Sized
import com.typesafe.scalalogging.Logger
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods
import org.apache.http.NameValuePair
import org.apache.http.client.utils.URLEncodedUtils
import org.apache.http.client.HttpClient
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.utils.URIBuilder
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory

import org.xapek.influxdb.types.InfluxValue
import org.xapek.influxdb.types.InfluxString

private case class JsonResult(series: List[JsonSeries], error: String) {
  def this(series: List[JsonSeries]) = this(series, null)
}

private[influxdb] case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
  override def toString() = values.toString
}

private[influxdb] trait ToFunction[N <: Nat] extends Serializable { self =>
  type Out[X]
  type Col = Sized[Seq[InfluxValue], N]
  def convert[B](f: self.Out[B], columns: Col): B
}

private[influxdb] class toFunction0 extends ToFunction[Nat._0] {
  type Out[X] = Function1[String, X]
  override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString)
}
private[influxdb] class toFunction1 extends ToFunction[Nat._1] {
  type Out[X] = Function2[String, InfluxValue, X]
  override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1))
}
private[influxdb] class toFunction2 extends ToFunction[Nat._2] {
  type Out[X] = Function3[String, InfluxValue, InfluxValue, X]
  override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2))
}
private[influxdb] class toFunction3 extends ToFunction[Nat._3] {
  type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]
  override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3))
}
private[influxdb] object ToFunction {
  implicit def iToFunction0 = new toFunction0()
  implicit def iToFunction1 = new toFunction1()
  implicit def iToFunction2 = new toFunction2()
  implicit def iToFunction3 = new toFunction3()

  def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
}


class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
                                           query: QueryBuilderWithSelectAndFrom[L])(
                                               implicit toF: Z) {
  private def httpClient = HttpClients.createDefault()
  private def logger = Logger(LoggerFactory.getLogger(getClass))

  private def buildURI(): URI = {
    return new URIBuilder()
      .setScheme("http")
      .setHost(influxdb.host)
      .setPort(influxdb.port)
      .setPath("/query")
      .setParameter("u", influxdb.username)
      .setParameter("p", influxdb.password)
      .setParameter("db", influxdb.db)
      .setParameter("q", query.toString)
      .build
  }

  def map[B](f: toF.Out[B]): List[B] = {
    implicit val formats = DefaultFormats
    val httpGet = new HttpGet(buildURI())

    logger.debug("Query={}", query.toString())
    logger.debug("Call: {}", httpGet.getURI)

    val response = httpClient.execute(httpGet);
    try {
      if (response.getStatusLine().getStatusCode() != 200) {
        throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode)
      }
      val responseEntity = response.getEntity();

      val responseJson = JsonMethods.parse(responseEntity.getContent)
      val results = (responseJson \ "results").extract[List[JsonResult]]
      val errors = results.filter { _.error != null }.map { _.error }
      if (errors.size > 0) {
        throw new InfluxdbError(errors.mkString(". "))
      }
      val res = results(0).series(0).values.map { x =>
        val cc: List[InfluxValue] = x.map { x => new InfluxString(x.toString()) }
        val z: Sized[Seq[InfluxValue], L] = Sized.wrap[List[InfluxValue], L](cc)
        toF.convert(f, z)
      }
      return res
    } finally {
      response.close();
    }
  }
}