Akka流输入(`在`)作为输出(`退出`)

0

的问题

我尝试写一块代码这并如下:-

  1. 读大csv文件从遥远的来源如s3。
  2. 过程的文件记录的记录。
  3. 将通知发送给用户
  4. 写的输出到一个遥远的位置

样品记录中输入csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

我输入的情况下,这类表示的记录中输入csv:

case class InputRecord(recordId: String, name: String, salary: Long)

样品记录在输出csv(这需要编写):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

我的输出类情况下,它表示一个记录中输入csv:

case class OutputRecord(recordId: String, name: String, designation: String)

读一记录使用akka流csv(使用Alpakka反应s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

现在我有一个功能处理的记录:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

能写OutputRecord为csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

能发送电子邮件通知:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

缝在一起

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

在线15和16我得到一个错误,我要么能够增加15行或行16但不都是由于两个 notify & writeOutput 需要 outputRecord. 一旦通知是叫我失去我的 outputRecord.

有没有办法我可以添加两个 notifywriteOutput 到同样的图?

我不是寻找并行执行,因为我想到第一个电话 notify 然后只 writeOutput. 所以这是不是有帮助: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

使用的情况下似乎非常简单的我但是,有些如何我不能找到一个干净的解决方案。

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

最好的答案

1

输出 notify 是一个 PushResult但输入的 writeOutputByteString. 一旦改变,它将编译。 如果你需要 ByteString,获得相同 OutputRecord.

顺便说一句,在这样的代码,你已提供的,一个类似的错误的存在 readCSVprocess.

2021-11-24 03:36:16

其他语言

此页面有其他语言版本

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................