From 2f55918ab64a9ae6edb407bd967a4c1ec188b2d9 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Sun, 2 Aug 2015 13:21:03 +0200 Subject: Running example with quartz in akka --- src/main/scala/org/xapek/influxdb/Influxdb.scala | 83 +++++++------- src/main/scala/org/xapek/influxdb/Main.scala | 137 ++++++++++------------- 2 files changed, 105 insertions(+), 115 deletions(-) (limited to 'src/main/scala/org') diff --git a/src/main/scala/org/xapek/influxdb/Influxdb.scala b/src/main/scala/org/xapek/influxdb/Influxdb.scala index ece64a0..172e85b 100644 --- a/src/main/scala/org/xapek/influxdb/Influxdb.scala +++ b/src/main/scala/org/xapek/influxdb/Influxdb.scala @@ -1,10 +1,8 @@ package org.xapek.influxdb import java.util.Date -import org.apache.http.client.methods.HttpGet -import org.apache.http.client.utils.URIBuilder -import org.apache.http.impl.client.HttpClients -import org.apache.http.util.EntityUtils +import java.net.URI +import scala.collection.JavaConversions._ import org.slf4j.LoggerFactory import org.json4s._ import org.json4s.jackson.JsonMethods._ @@ -16,13 +14,16 @@ import shapeless.syntax._ import shapeless.syntax.NatOps import shapeless.syntax.sized._ import shapeless.Succ -import shapeless.ops.sized._ -import shapeless.ops.nat._ import shapeless.ops._ -import java.net.URI -import org.apache.http.client.utils.URLEncodedUtils -import scala.collection.JavaConversions._ +import shapeless.ops.nat._ +import shapeless.ops.sized._ import org.apache.http.NameValuePair +import org.apache.http.client.utils.URLEncodedUtils +import org.apache.http.client.HttpClient +import org.apache.http.client.methods.HttpGet +import org.apache.http.client.utils.URIBuilder +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils abstract class QueryBuilderWithSelectAndFrom[L <: Nat]( val columns: Sized[List[InfluxIdentifier], L], @@ -73,10 +74,6 @@ class QueryBuilderGroupBy[L <: Nat, WhereT <: Expr]( extends QueryBuilderWithSelectAndFrom[L](where.columns, where.table) { def apply(column: String) = new QueryBuilderGroupBy(where, groupBy :+ InfluxDB.influxIdentifier(column)) - // { - // groupBy = groupBy :+ InfluxDB.influxIdentifier(column) - // this - // } override def toString() = where.toString() + " GROUP BY " + groupBy.mkString(", ") } @@ -154,6 +151,9 @@ class InfluxDate(val value: Date) extends Expr with InfluxValue { val time: Long = value.getTime() / 1000 s"${time}s" } + + // TODO fromUTC (String "2015-07-29T10:23:22.991260576Z") + // and fromTimestamp (String "NNNNs") } class InfluxIdentifier(val name: String) extends Expr { @@ -187,7 +187,7 @@ case class JsonResult(series: List[JsonSeries], error: String) { } case class JsonSeries(name: String, columns: List[String], values: List[List[Any]]) { - override def toString() = values.toString() //map { _.map { _.getClass() } }.mkString(", ") + override def toString() = values.toString } class InfluxdbError(val serverMessage: String) @@ -213,26 +213,26 @@ trait ToFunction[N <: Nat] extends Serializable { self => def convert[B](f: self.Out[B], columns: Col): B } class toFunction0 extends ToFunction[Nat._0] { - type Out[X] = Function0[X] - override def convert[B](f: Out[B], columns: Col): B = f() + type Out[X] = Function1[String, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString) } class toFunction1 extends ToFunction[Nat._1] { - type Out[X] = Function1[InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0)) + type Out[X] = Function2[String, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1)) } class toFunction2 extends ToFunction[Nat._2] { - type Out[X] = Function2[InfluxValue, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1)) + type Out[X] = Function3[String, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2)) } -class ToFunction3 extends ToFunction[Nat._3] { - type Out[X] = Function3[InfluxValue, InfluxValue, InfluxValue, X] - override def convert[B](f: Out[B], columns: Col): B = f(columns(0), columns(1), columns(2)) +class toFunction3 extends ToFunction[Nat._3] { + type Out[X] = Function4[String, InfluxValue, InfluxValue, InfluxValue, X] + override def convert[B](f: Out[B], columns: Col): B = f(columns(0).toString, columns(1), columns(2), columns(3)) } object ToFunction { implicit def iToFunction0 = new toFunction0() implicit def iToFunction1 = new toFunction1() implicit def iToFunction2 = new toFunction2() - implicit def iToFunction3 = new ToFunction3() + implicit def iToFunction3 = new toFunction3() def toFunction[L <: Nat](implicit toF: ToFunction[L]) = toF } @@ -244,11 +244,11 @@ object Converter { class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, query: QueryBuilderWithSelectAndFrom[L])( implicit toF: Z) { - def map[B](f: toF.Out[B]): List[B] = { - implicit val formats = DefaultFormats - val httpClient = HttpClients.createDefault(); - val logger = Logger(LoggerFactory.getLogger(getClass)) - val uriBuilder = new URIBuilder() + private def httpClient = HttpClients.createDefault() + private def logger = Logger(LoggerFactory.getLogger(getClass)) + + private def buildURI(): URI = { + return new URIBuilder() .setScheme("http") .setHost(influxdb.host) .setPort(influxdb.port) @@ -256,19 +256,26 @@ class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, .setParameter("u", influxdb.username) .setParameter("p", influxdb.password) .setParameter("db", influxdb.db) - .setParameter("q", query.toString()) + .setParameter("q", query.toString) + .build + } + + def map[B](f: toF.Out[B]): List[B] = { + implicit val formats = DefaultFormats + val httpGet = new HttpGet(buildURI()) - val httpGet = new HttpGet(uriBuilder.build()) logger.debug("Query={}", query.toString()) logger.debug("Call: {}", httpGet.getURI) - val response1 = httpClient.execute(httpGet); + val response = httpClient.execute(httpGet); try { - System.out.println(response1.getStatusLine()); - val entity1 = response1.getEntity(); + if (response.getStatusLine().getStatusCode() != 200) { + throw new InfluxdbError("Status code " + response.getStatusLine.getStatusCode) + } + val responseEntity = response.getEntity(); - val json = parse(entity1.getContent) - val results = (json \ "results").extract[List[JsonResult]] + val responseJson = parse(responseEntity.getContent) + val results = (responseJson \ "results").extract[List[JsonResult]] val errors = results.filter { _.error != null }.map { _.error } if (errors.size > 0) { throw new InfluxdbError(errors.mkString(". ")) @@ -280,7 +287,7 @@ class Mapper[L <: Nat, Z <: ToFunction[L]](influxdb: InfluxDB, } return res } finally { - response1.close(); + response.close(); } } } @@ -308,7 +315,7 @@ object InfluxDB { implicit def influxDate(n: Date): InfluxDate = new InfluxDate(n) - implicit def influxDb(url: String): InfluxDB = { + def fromUrl(url: String): InfluxDB = { val uri = new URI(url) var influxdb = new InfluxDB(uri.getHost) URLEncodedUtils.parse(uri, "UTF-8").toList diff --git a/src/main/scala/org/xapek/influxdb/Main.scala b/src/main/scala/org/xapek/influxdb/Main.scala index 10e8511..89fcd2f 100644 --- a/src/main/scala/org/xapek/influxdb/Main.scala +++ b/src/main/scala/org/xapek/influxdb/Main.scala @@ -1,99 +1,82 @@ package org.xapek.influxdb + +import java.util.Date +import java.util.Calendar + import akka.actor.Actor import akka.actor.Props import akka.event.Logging import akka.actor.ActorDSL +import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.ActorLogging + +import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension +import com.typesafe.config.ConfigFactory +import org.quartz.impl.StdSchedulerFactory + import org.xapek.influxdb.InfluxDB._ -import java.util.Date -import java.util.Calendar -import shapeless.Nat -import shapeless.Nat._ -import shapeless.ops.nat._ -import shapeless.ops.product.ToRecord -import shapeless.Sized -import shapeless.Succ -import shapeless.SizedOps - -case class Row(columns: Seq[String]) - -case class Write(measurement: String, columns: Seq[String]) -class GenericReader(query: String) extends Actor with ActorLogging { + + +case object TimerSignal + +class InfluxReader(db: InfluxDB, date1: Date, date2: Date, receiver: ActorRef) extends Actor with ActorLogging { + val query = SELECT("Bid")("Ask")("Volume") FROM "yahoo.GOOGL" WHERE time > date1 && time < date2 + val mapper = new Mapper(db, query) def receive = { - case row: Row => log.info("Thanks for the pint: " + row.columns.toString()) + case TimerSignal => + log.info("Run query") + receiver ! (mapper.map { (time, bid, ask, volume) => s"time=$time bid=$bid ask=$ask volume=$volume" }) } } -//object MyReader extends GenericReader(Influxdb SELECT "value" FROM "test" WHERE col("time") == 30) -//trait ToResult[N <: Nat] extends Serializable { -// def apply(x: Int): Int -//} - -//object ToResult { -// def apply[N <: Nat](implicit toInt: ToResult[N]): ToResult[N] = toInt -// -// implicit val toInt0 = new ToResult[Nat._0] { -// def apply(x: Int) = x -// } -// implicit def toIntSucc[N <: Nat](x: Int)(implicit toResultN: ToResult[N]) = new ToResult[Succ[N]] { -// def apply(x: Int) = toResultN(x - 1) + x -// } -//} +class ConsoleWriter extends Actor with ActorLogging { + def receive = { + case value => + log.info(value.toString) + } +} object Main { - import shapeless._ - import syntax.sized._ - - def process(x: Sized[List[InfluxValue], _]) = - (x(0), x(9)) // functor! - // Sized.sizedOps(x).apply(_0) - def main(args: Array[String]): Unit = { - val db: InfluxDB = "http://root:root@localhost:8086/query?db=data&user=root&password=root" - println(db) - - val z = Sized(1) - - val s2 = SELECT("foo")("asd")("ads") FROM ("asd") WHERE (col("asd") == "asd") - - val s3 = SELECT("foo")("bla")("baz") FROM "asd" WHERE time > "asd" + val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root") val calendar = Calendar.getInstance - calendar.set(Calendar.HOUR_OF_DAY, 0) - calendar.set(Calendar.MINUTE, 0) - calendar.set(Calendar.SECOND, 0) - calendar.set(Calendar.MILLISECOND, 0) - // calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 1) - - val x: Function3[Int, Int, Int, String] = { (a: Int, b: Int, c: Int) => null } - // SELECT mean(price) FROM "esg" WHERE $timeFilter AND sku='01011072' GROUP BY time($interval) ORDER BY asc - val s4 = SELECT("product_name") FROM "esg" WHERE col("sku") == "01011072" && time > calendar.getTime - - val xx = new Mapper(db, s4).map { x => "adads: " + x.toString() } - println(xx) - - // println(mapper.map[String]({ (a: InfluxValue, b: InfluxValue, c: InfluxValue) => a + "+" + b + "+" + c })) - - // val x = Converter.toSized[Int, _4](List(1,2,3)) - // println(x) - // println(new Foo(s4.columns).count(List(1, 2, 3, 4, 5, 6))) - - // val z = ToInt[s4.LT]() - // val foo = db(s4) - // .map({ x => x }) - // println(foo) - - // val system = ActorSystem("test") - // val alice = system.actorOf(Props(MyReader), "alice") - // - // alice ! new Row(List("a")) - // - // system.shutdown() - // - // val x : Tuple2[Int,Int] = (1,2) + calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) - 4) + val date1 = calendar.getTime + calendar.set(Calendar.DAY_OF_YEAR, calendar.get(Calendar.DAY_OF_YEAR) + 1) + val date2 = calendar.getTime + + // Disable silly update check + System.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); + + val akkaConfig = ConfigFactory.parseString(""" + akka { + quartz { + defaultTimezone = "UTC" + schedules { + cronEvery5Seconds { + description = "A cron job that fires off every 10 seconds" + expression = "*/5 * * ? * *" + } + } + } + }""".stripMargin) + + println(akkaConfig) + val system = ActorSystem("test", akkaConfig) + val scheduler = QuartzSchedulerExtension(system) + + val writer = system.actorOf(Props(new ConsoleWriter()), "writer") + val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader") + + scheduler.schedule("cronEvery5Seconds", reader, TimerSignal) + + println("Press key to exit") + System.in.read() + system.shutdown() } } -- cgit v1.2.1