summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2015-08-02 13:21:03 +0200
committerYves Fischer <yvesf-git@xapek.org>2015-08-02 13:21:03 +0200
commit2f55918ab64a9ae6edb407bd967a4c1ec188b2d9 (patch)
tree771d5287ce991bec7bca4209c450710a7a809100
parent653fe83aad54dad0abd9e2d61843b3cb559d458b (diff)
downloadinfluxdb-tools-2f55918ab64a9ae6edb407bd967a4c1ec188b2d9.tar.gz
influxdb-tools-2f55918ab64a9ae6edb407bd967a4c1ec188b2d9.zip
Running example with quartz in akka
-rw-r--r--pom.xml6
-rw-r--r--src/main/scala/org/xapek/influxdb/Influxdb.scala83
-rw-r--r--src/main/scala/org/xapek/influxdb/Main.scala137
-rw-r--r--src/test/scala/org/xapek/influxdb/InfluxdbTest.scala2
4 files changed, 111 insertions, 117 deletions
diff --git a/pom.xml b/pom.xml
index fd9a28b..8cb399b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,11 @@
<version>1.2.5</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>com.enragedginger</groupId>
+ <artifactId>akka-quartz-scheduler_${scala.version}</artifactId>
+ <version>1.4.0-akka-2.3.x</version>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala
index ece64a0..172e85b 100644
--- a/src/main/scala/org/xapek/influxdb/Influxdb.scala
+++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala
@@ -1,10 +1,8 @@
package org.xapek.influxdb
import java.util.Date
-import org.apache.http.client.methods.HttpGet
-import org.apache.http.client.utils.URIBuilder
-import org.apache.http.impl.client.HttpClients
-import org.apache.http.util.EntityUtils
+import java.net.URI
+import scala.collection.JavaConversions._
import org.slf4j.LoggerFactory
import org.json4s._
import org.json4s.jackson.JsonMethods._
@@ -16,13 +14,16 @@ import shapeless.syntax._
import shapeless.syntax.NatOps
import shapeless.syntax.sized._
import shapeless.Succ
-import shapeless.ops.sized._
-import shapeless.ops.nat._
import shapeless.ops._
-import java.net.URI
-import org.apache.http.client.utils.URLEncodedUtils
-import scala.collection.JavaConversions._
+import shapeless.ops.nat._
+import shapeless.ops.sized._
import org.apache.http.NameValuePair
+import org.apache.http.client.utils.URLEncodedUtils
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.utils.URIBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
abstract class QueryBuilderWithSelectAndFrom[L <: Nat](
val columns: Sized[List[InfluxIdentifier], L],
@@ -73,10 +74,6 @@ class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr](
extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) {
def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column))
- // {
- // groupBy = groupBy :+ InfluxDB.influxIdentifier(column)
- // this
- // }
override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ")
}
@@ -154,6 +151,9 @@ class InfluxDate(val value: Date) extends Expr with InfluxValue {
val time: Long = value.getTime() / 1000
s"${time}s"
}
+
+ // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z")
+ // and fromTimestamp (String "NNNNs")
}
class InfluxIdentifier(val name: String) extends Expr {
@@ -187,7 +187,7 @@ case class JsonResult(series: List[JsonSeries], error: String) {
}
case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) {
- override def toString() = values.toString() //map { _.map { _.getClass() } }.mkString(", ")
+ override def toString() = values.toString
}
class InfluxdbError(val serverMessage: String)
@@ -213,26 +213,26 @@ trait ToFunction[N <: Nat] extends Serializable { self =>
def convert[B](f: self.Out[B], columns: Col): B
}
class toFunction0 extends ToFunction[Nat._0] {
- type Out[X] = Function0[X]
- override def convert[B](f: Out[B], columns: Col): B = f()
+ type Out[X] = Function1[String, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString)
}
class toFunction1 extends ToFunction[Nat._1] {
- type Out[X] = Function1[InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0))
+ type Out[X] = Function2[String, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1))
}
class toFunction2 extends ToFunction[Nat._2] {
- type Out[X] = Function2[InfluxValue, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1))
+ type Out[X] = Function3[String, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2))
}
-class ToFunction3 extends ToFunction[Nat._3] {
- type Out[X] = Function3[InfluxValue, InfluxValue, InfluxValue, X]
- override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1), columns(2))
+class toFunction3 extends ToFunction[Nat._3] {
+ type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X]
+ override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3))
}
object ToFunction {
implicit def iToFunction0 = new toFunction0()
implicit def iToFunction1 = new toFunction1()
implicit def iToFunction2 = new toFunction2()
- implicit def iToFunction3 = new ToFunction3()
+ implicit def iToFunction3 = new toFunction3()
def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF
}
@@ -244,11 +244,11 @@ object Converter {
class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
query: QueryBuilderWithSelectAndFrom[L])(
implicit toF: Z) {
- def map[B](f: toF.Out[B]): List[B] = {
- implicit val formats = DefaultFormats
- val httpClient = HttpClients.createDefault();
- val logger = Logger(LoggerFactory.getLogger(getClass))
- val uriBuilder = new URIBuilder()
+ private def httpClient = HttpClients.createDefault()
+ private def logger = Logger(LoggerFactory.getLogger(getClass))
+
+ private def buildURI(): URI = {
+ return new URIBuilder()
.setScheme("http")
.setHost(influxdb.host)
.setPort(influxdb.port)
@@ -256,19 +256,26 @@ class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
.setParameter("u", influxdb.username)
.setParameter("p", influxdb.password)
.setParameter("db", influxdb.db)
- .setParameter("q", query.toString())
+ .setParameter("q", query.toString)
+ .build
+ }
+
+ def map[B](f: toF.Out[B]): List[B] = {
+ implicit val formats = DefaultFormats
+ val httpGet = new HttpGet(buildURI())
- val httpGet = new HttpGet(uriBuilder.build())
logger.debug("Query={}", query.toString())
logger.debug("Call: {}", httpGet.getURI)
- val response1 = httpClient.execute(httpGet);
+ val response = httpClient.execute(httpGet);
try {
- System.out.println(response1.getStatusLine());
- val entity1 = response1.getEntity();
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode)
+ }
+ val responseEntity = response.getEntity();
- val json = parse(entity1.getContent)
- val results = (json \ "results").extract[List[JsonResult]]
+ val responseJson = parse(responseEntity.getContent)
+ val results = (responseJson \ "results").extract[List[JsonResult]]
val errors = results.filter { _.error != null }.map { _.error }
if (errors.size > 0) {
throw new InfluxdbError(errors.mkString(". "))
@@ -280,7 +287,7 @@ class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB,
}
return res
} finally {
- response1.close();
+ response.close();
}
}
}
@@ -308,7 +315,7 @@ object InfluxDB {
implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n)
- implicit def influxDb(url: String): InfluxDB = {
+ def fromUrl(url: String): InfluxDB = {
val uri = new URI(url)
var influxdb = new InfluxDB(uri.getHost)
URLEncodedUtils.parse(uri, "UTF-8").toList
diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala
index 10e8511..89fcd2f 100644
--- a/src/main/scala/org/xapek/influxdb/Main.scala
+++ b/src/main/scala/org/xapek/influxdb/Main.scala
@@ -1,99 +1,82 @@
package org.xapek.influxdb
+
+import java.util.Date
+import java.util.Calendar
+
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
import akka.actor.ActorDSL
+import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorLogging
+
+import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension
+import com.typesafe.config.ConfigFactory
+import org.quartz.impl.StdSchedulerFactory
+
import org.xapek.influxdb.InfluxDB._
-import java.util.Date
-import java.util.Calendar
-import shapeless.Nat
-import shapeless.Nat._
-import shapeless.ops.nat._
-import shapeless.ops.product.ToRecord
-import shapeless.Sized
-import shapeless.Succ
-import shapeless.SizedOps
-
-case class Row(columns: Seq[String])
-
-case class Write(measurement: String, columns: Seq[String])
-class GenericReader(query: String) extends Actor with ActorLogging {
+
+
+case object TimerSignal
+
+class InfluxReader(db: InfluxDB, date1: Date, date2: Date, receiver: ActorRef) extends Actor with ActorLogging {
+ val query = SELECT("Bid")("Ask")("Volume") FROM "yahoo.GOOGL" WHERE time > date1 && time < date2
+ val mapper = new Mapper(db, query)
def receive = {
- case row: Row => log.info("Thanks for the pint: " + row.columns.toString())
+ case TimerSignal =>
+ log.info("Run query")
+ receiver ! (mapper.map { (time, bid, ask, volume) => s"time=$time bid=$bid ask=$ask volume=$volume" })
}
}
-//object MyReader extends GenericReader(Influxdb SELECT "value" FROM "test" WHERE col("time") == 30)
-//trait ToResult[N <: Nat] extends Serializable {
-// def apply(x: Int): Int
-//}
-
-//object ToResult {
-// def apply[N <: Nat](implicit toInt: ToResult[N]): ToResult[N] = toInt
-//
-// implicit val toInt0 = new ToResult[Nat._0] {
-// def apply(x: Int) = x
-// }
-// implicit def toIntSucc[N <: Nat](x: Int)(implicit toResultN: ToResult[N]) = new ToResult[Succ[N]] {
-// def apply(x: Int) = toResultN(x - 1) + x
-// }
-//}
+class ConsoleWriter extends Actor with ActorLogging {
+ def receive = {
+ case value =>
+ log.info(value.toString)
+ }
+}
object Main {
- import shapeless._
- import syntax.sized._
-
- def process(x: Sized[List[InfluxValue], _]) =
- (x(0), x(9)) // functor!
- // Sized.sizedOps(x).apply(_0)
-
def main(args: Array[String]): Unit = {
- val db: InfluxDB = "http://root:root@localhost:8086/query?db=data&user=root&password=root"
- println(db)
-
- val z = Sized(1)
-
- val s2 = SELECT("foo")("asd")("ads") FROM ("asd") WHERE (col("asd") == "asd")
-
- val s3 = SELECT("foo")("bla")("baz") FROM "asd" WHERE time > "asd"
+ val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root")
val calendar = Calendar.getInstance
- calendar.set(Calendar.HOUR_OF_DAY, 0)
- calendar.set(Calendar.MINUTE, 0)
- calendar.set(Calendar.SECOND, 0)
- calendar.set(Calendar.MILLISECOND, 0)
- // calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 1)
-
- val x: Function3[Int, Int, Int, String] = { (a: Int, b: Int, c: Int) => null }
- // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc
- val s4 = SELECT("product_name") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime
-
- val xx = new Mapper(db, s4).map { x => "adads: " + x.toString() }
- println(xx)
-
- // println(mapper.map[String]({ (a: InfluxValue, b: InfluxValue, c: InfluxValue) => a + "+" + b + "+" + c }))
-
- // val x = Converter.toSized[Int, _4](List(1,2,3))
- // println(x)
- // println(new Foo(s4.columns).count(List(1, 2, 3, 4, 5, 6)))
-
- // val z = ToInt[s4.LT]()
- // val foo = db(s4)
- // .map({ x => x })
- // println(foo)
-
- // val system = ActorSystem("test")
- // val alice = system.actorOf(Props(MyReader), "alice")
- //
- // alice ! new Row(List("a"))
- //
- // system.shutdown()
- //
- // val x : Tuple2[Int,Int] = (1,2)
+ calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 4)
+ val date1 = calendar.getTime
+ calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + 1)
+ val date2 = calendar.getTime
+
+ // Disable silly update check
+ System.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
+
+ val akkaConfig = ConfigFactory.parseString("""
+ akka {
+ quartz {
+ defaultTimezone = "UTC"
+ schedules {
+ cronEvery5Seconds {
+ description = "A cron job that fires off every 10 seconds"
+ expression = "*/5 * * ? * *"
+ }
+ }
+ }
+ }""".stripMargin)
+
+ println(akkaConfig)
+ val system = ActorSystem("test", akkaConfig)
+ val scheduler = QuartzSchedulerExtension(system)
+
+ val writer = system.actorOf(Props(new ConsoleWriter()), "writer")
+ val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader")
+
+ scheduler.schedule("cronEvery5Seconds", reader, TimerSignal)
+
+ println("Press key to exit")
+ System.in.read()
+ system.shutdown()
}
}
diff --git a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
index 9284780..084b25f 100644
--- a/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
+++ b/src/test/scala/org/xapek/influxdb/InfluxdbTest.scala
@@ -9,7 +9,7 @@ class AppTest {
@Test
def testStatements() = {
- val db :InfluxDB = "http://localhost"
+ val db :InfluxDB = fromUrl("http://localhost")
assertTrue((SELECT("foo") FROM "bla" WHERE col("a") == "asd")
.toString().contains("as"))