1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package org.xapek.influxdbExample
import java.util.Date
import java.util.Calendar
import java.util.concurrent.TimeUnit
import akka.actor.Actor
import akka.actor.Props
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ActorLogging
import scala.concurrent.duration.Duration
import org.xapek.influxdb.InfluxDB._
import org.xapek.influxdb.InfluxDB
import org.xapek.influxdb.Mapper
/** Tick message: `Main` schedules it periodically and `InfluxReader` reacts to it by running its query. */
case object TimerSignal
/**
 * Actor that, on every [[TimerSignal]], maps the rows of a fixed InfluxDB query to
 * display strings and forwards the result to `receiver`.
 *
 * @param db       InfluxDB handle handed to the [[Mapper]]
 * @param date1    exclusive lower bound on `time` in the query
 * @param date2    exclusive upper bound on `time` in the query
 * @param receiver actor that is sent the mapped result
 */
class InfluxReader(db: InfluxDB, date1: Date, date2: Date, receiver: ActorRef) extends Actor with ActorLogging {

  /** Selects Bid, Ask and Volume from "yahoo.GOOGL" with `time` strictly between `date1` and `date2`. */
  val query = SELECT("Bid")("Ask")("Volume") FROM "yahoo.GOOGL" WHERE time > date1 && time < date2

  /** Mapper bound to `db` and `query`; presumably runs the query when `map` is called -- TODO confirm. */
  val mapper = new Mapper(db, query)

  /** Handles [[TimerSignal]] only; any other message is left unhandled. */
  def receive: Receive = {
    case TimerSignal =>
      log.info("Run query")
      // Render each (time, bid, ask, volume) tuple as one human-readable line.
      val formatted = mapper.map { (time, bid, ask, volume) =>
        s"time=$time bid=$bid ask=$ask volume=$volume"
      }
      receiver ! formatted
  }
}
/** Actor that writes the `toString` of every message it receives to the actor log at info level. */
class ConsoleWriter extends Actor with ActorLogging {

  /** Accepts any message and logs its string form. */
  def receive: Receive = {
    case message => log.info(message.toString)
  }
}
/**
 * Example entry point: wires a [[ConsoleWriter]] and an [[InfluxReader]] into an actor
 * system and triggers the reader every five seconds until a key is pressed.
 */
object Main {

  /**
   * Starts the actor system, schedules the periodic [[TimerSignal]] and blocks on
   * standard input. The actor system is shut down when input arrives or when any
   * step after its creation fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): credentials are embedded in the URL; acceptable for an example only.
    val db: InfluxDB = fromUrl("http://root:root@db.2.localnet.cc:8086/query?db=data&user=root&password=root")

    // Query window: begins four days before now and ends one day later.
    // Calendar.add rolls over year boundaries without relying on lenient field overflow.
    val calendar = Calendar.getInstance
    calendar.add(Calendar.DAY_OF_YEAR, -4)
    val date1 = calendar.getTime
    calendar.add(Calendar.DAY_OF_YEAR, 1)
    val date2 = calendar.getTime

    val system = ActorSystem("AkkaSystem")
    try {
      val writer = system.actorOf(Props(new ConsoleWriter()), "writer")
      val reader = system.actorOf(Props(new InfluxReader(db, date1, date2, writer)), "reader")

      // First tick fires immediately, then every 5 seconds; the tick has no sender.
      system.scheduler.schedule(Duration.Zero, Duration.create(5, TimeUnit.SECONDS),
        reader, TimerSignal)(system.dispatcher, Actor.noSender)

      println("Press key to exit")
      System.in.read()
    } finally {
      // Always stop the actor system, otherwise its threads keep the JVM alive
      // if reading stdin (or actor setup) throws.
      system.shutdown()
    }
  }
}
|