package org.xapek.influxdbExample

import java.util.Date
import java.util.Calendar
import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.actor.Props
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorLogging
import scala.concurrent.duration.Duration
import org.xapek.influxdb.InfluxDB._
import org.xapek.influxdb.InfluxDB
import org.xapek.influxdb.Mapper

/** Message that tells an [[InfluxReader]] to run its query once. */
case object TimerSignal

/**
 * Actor that runs a fixed query each time it receives a [[TimerSignal]] and
 * forwards the mapped result to `receiver`.
 *
 * The query selects the "Bid", "Ask" and "Volume" columns from "yahoo.GOOGL",
 * restricted to timestamps strictly between `date1` and `date2`. It is built
 * once, when the actor is constructed.
 *
 * @param db       database handle passed to the [[Mapper]]
 * @param date1    exclusive lower bound of the queried time range
 * @param date2    exclusive upper bound of the queried time range
 * @param receiver actor that is sent the result of every query run
 */
class InfluxReader(db: InfluxDB, date1: Date, date2: Date, receiver: ActorRef)
    extends Actor with ActorLogging {

  val query = SELECT("Bid")("Ask")("Volume") FROM "yahoo.GOOGL" WHERE time > date1 && time < date2
  val mapper = new Mapper(db, query)

  def receive = {
    case TimerSignal =>
      log.info("Run query")
      // NOTE(review): mapper.map presumably executes the query synchronously
      // and yields one formatted string per row -- confirm against Mapper.
      val formatted = mapper.map { (time, bid, ask, volume) =>
        s"time=$time bid=$bid ask=$ask volume=$volume"
      }
      receiver ! formatted
  }
}

/** Actor that logs the string form of every message it receives at info level. */
class ConsoleWriter extends Actor with ActorLogging {
  def receive = {
    case value => log.info(value.toString)
  }
}

/**
 * Example entry point: wires an [[InfluxReader]] to a [[ConsoleWriter]] and
 * triggers the reader every five seconds until a key is pressed.
 */
object Main {

  /**
   * Starts the actor system, schedules the periodic query and blocks on
   * standard input; the actor system is shut down when the read returns or
   * when anything after system creation fails.
   *
   * The queried window is one day long and starts four days before now.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): credentials are hard-coded in the URL; consider loading
    // them from configuration instead.
    val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root")

    // Calendar.add handles month/year roll-over explicitly instead of relying
    // on lenient-mode normalisation of an out-of-range DAY_OF_YEAR.
    val calendar = Calendar.getInstance
    calendar.add(Calendar.DAY_OF_YEAR, -4)
    val date1 = calendar.getTime
    calendar.add(Calendar.DAY_OF_YEAR, 1)
    val date2 = calendar.getTime

    val system = ActorSystem("AkkaSystem")
    try {
      val writer = system.actorOf(Props(new ConsoleWriter()), "writer")
      val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader")

      // First tick fires immediately, then every 5 seconds. The tick has no
      // meaningful sender, so Actor.noSender is passed rather than a bare null.
      system.scheduler.schedule(
        Duration.Zero,
        Duration.create(5, TimeUnit.SECONDS),
        reader,
        TimerSignal)(system.dispatcher, Actor.noSender)

      println("Press key to exit")
      System.in.read()
    } finally {
      // Always stop the actor system; otherwise its threads can keep the JVM
      // alive after a failure above.
      system.shutdown()
    }
  }
}