Andre Schütz - Learn. Think. Reflect - Tech developer
Akka stream with Alpakka and FTP connector – close the stream

Akka stream with Alpakka and FTP connector – close the stream

If you use an Alpakka FTP connector, you have to close the stream at the end to finish the connection correctly. The default behavior is to sent a `akka.actor.Status.Success` message to the ActorRef.

actor ! akka.actor.Status.Success

I got the following error message when sending the message like above.

java.lang.ClassCastException: akka.actor.Status$Success$ cannot be cast to akka.util.ByteString

The FTP connector requires a ByteString when writing content to the target of the stream.

The following code demonstrates a simple implementation of a FTP connector:

val uri: java.net.URI: = ...
val username = USERNAME
val password = PASSWORD
val DEFAULT_CHARSET = "UTF-8"

val byteSource: Source[ByteString, ActorRef] =
  Source.actorRef[ByteString](Int.MaxValue, OverflowStrategy.fail)

val host = InetAddress.getByName(uri.getHost)
val port = 21
val credentials = NonAnonFtpCredentials(username, password)

val settings =
  FtpSettings(
    host, port, credentials, binary = true, passiveMode = false
  )
val ftpConnection: Sink[ByteString, Future[IOResult]] = 
  Ftp.toPath(path, settings, append = true)

val writer = Flow[ByteString].to(ftpConnection).runWith(byteSource)

You can send a message to the stream in the following manner:

writer ! ByteString("MESSAGE".getBytes(DEFAULT_CHARSET))

Back to the `ClassCastException`. If you want to close the stream correctly, you have to send a close message with the correct data type that is expected by the stream. Example:

actor ! akka.actor.Status.Success("Success".getBytes(DEFAULT_CHARSET))

Leave a Reply