我尝试写一块代码这并如下:-
- 读大csv文件从遥远的来源如s3。
- 过程的文件记录的记录。
- 将通知发送给用户
- 写的输出到一个遥远的位置
样品记录中输入csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
我输入的情况下,这类表示的记录中输入csv:
case class InputRecord(recordId: String, name: String, salary: Long)
样品记录在输出csv(这需要编写):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
我的输出类情况下,它表示一个记录中输入csv:
case class OutputRecord(recordId: String, name: String, designation: String)
读一记录使用akka流csv(使用Alpakka反应s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
现在我有一个功能处理的记录:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
能写OutputRecord为csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
能发送电子邮件通知:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
缝在一起
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
在线15和16我得到一个错误,我要么能够增加15行或行16但不都是由于两个 notify
& writeOutput
需要 outputRecord
. 一旦通知是叫我失去我的 outputRecord
.
有没有办法我可以添加两个 notify
和 writeOutput
到同样的图?
我不是寻找并行执行,因为我想到第一个电话 notify
然后只 writeOutput
. 所以这是不是有帮助: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
使用的情况下似乎非常简单的我但是,有些如何我不能找到一个干净的解决方案。