Cassandra source to retrieve data for specific data types with Apache Flink

Apache Flink's default CassandraSink stores data in Cassandra. To retrieve data from Cassandra, you need a different implementation that accesses the Cassandra cluster and maps the retrieved data to your data type.

One option is the CassandraInputFormat, which returns the retrieved data as a Flink Tuple. The drawback of this approach is that Flink tuples are limited to at most 25 fields (Tuple25).

If your data has more columns than that, or contains user-defined types (UDTs), you have to implement your own data type to which the data retrieved from Cassandra is mapped.

Definition of our own data type

We specify our own Java class that maps the data correctly. Because it is a plain Java class, it can also be used from Scala. A possible data type looks as follows:

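import com.datastax.driver.mapping.annotations.*;

import java.io.Serializable;
import java.util.UUID;

@Table(name = "mydata", keyspace = "TestImport")
final public class MyDataRow implements Serializable {

  @Column(name = "uuid")
  @PartitionKey
  private UUID uuid = UUID.randomUUID();

  @Column(name = "name")
  private String name = null;

  @Column(name = "hobby")
  @ClusteringColumn(0)
  private String hobby = null;

  @Column(name = "hobby_data")
  @Frozen
  private HobbyMetaDataT hobbyData = new HobbyMetaDataT();

  // Public no-argument constructor, needed by the Datastax mapper and by Flink's POJO serializer
  public MyDataRow() {}

  // Getters and setters; the setter names are the ones called by MyDataRowOps
  public UUID getUUID() { return uuid; }
  public void setUUID(UUID uuid) { this.uuid = uuid; }

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }

  public String getHobby() { return hobby; }
  public void setHobby(String hobby) { this.hobby = hobby; }

  public HobbyMetaDataT getHobbyData() { return hobbyData; }
  public void setHobbyData(HobbyMetaDataT hobbyData) { this.hobbyData = hobbyData; }
}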

The code above defines a row of the data model, which is stored as a wide row in Cassandra. uuid is the partition key and hobby is the clustering column, so a single partition holds one hobbyData entry per hobby.
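To make the wide-row layout concrete, here is a small in-memory sketch in plain Java, with no Cassandra involved. The class WideRowSketch and its methods are hypothetical and only model the idea: rows sharing one uuid land in the same partition and are kept sorted by hobby, assuming Cassandra's default ascending clustering order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical in-memory model of the mydata table, for illustration only:
// partition key (uuid) -> clustering key (hobby) -> hobby_data description.
class WideRowSketch {
  private final Map<UUID, TreeMap<String, String>> partitions = new HashMap<>();

  // Rows with the same uuid go into the same partition; the TreeMap keeps them
  // sorted by hobby, like the default ascending clustering order (for ASCII keys).
  // Writing an existing (uuid, hobby) pair again overwrites it, like an upsert.
  void put(UUID uuid, String hobby, String hobbyDescription) {
    partitions.computeIfAbsent(uuid, k -> new TreeMap<>()).put(hobby, hobbyDescription);
  }

  // The hobby_data description stored for one (uuid, hobby) pair, or null.
  String get(UUID uuid, String hobby) {
    TreeMap<String, String> rows = partitions.get(uuid);
    return rows == null ? null : rows.get(hobby);
  }

  // All hobbies of one partition, in clustering order.
  List<String> hobbiesOf(UUID uuid) {
    TreeMap<String, String> rows = partitions.get(uuid);
    return rows == null ? new ArrayList<String>() : new ArrayList<String>(rows.keySet());
  }

  int partitionCount() {
    return partitions.size();
  }
}
```

Writing the hobbies "tennis", "chess" and "golf" for one uuid yields a single partition whose rows come back as chess, golf, tennis.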

The definition of the UDT is as follows:

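import com.datastax.driver.mapping.annotations.Field;
import com.datastax.driver.mapping.annotations.UDT;

import java.io.Serializable;

@UDT(name = "hobbyT", keyspace = "TestImport")
final public class HobbyMetaDataT implements Serializable {

  @Field(name = "desc")
  private String description = "";

  @Field(name = "prio")
  private java.lang.Integer priority = null;

  // No-argument constructor, needed by the Datastax mapper and by Flink's POJO serializer
  public HobbyMetaDataT() {}

  // Used by MyDataRowOps.createHobbyMetaDataT
  public HobbyMetaDataT(String description, Integer priority) {
    this.description = description;
    this.priority = priority;
  }

  // Getters and setters
  public String getDescription() { return description; }
  public void setDescription(String description) { this.description = description; }

  public Integer getPriority() { return priority; }
  public void setPriority(Integer priority) { this.priority = priority; }
}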

hobbyT is the name of the UDT in the Cassandra keyspace TestImport.

Implement the InputFormat that wraps the data model

The next step is to implement the InputFormat that wraps our data model.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class CassandraOutFormat<OUT extends MyDataRow> extends RichInputFormat<MyDataRow, InputSplit> implements NonParallelInput {

  /**
   * The attribute names mapped by MyDataRowOps.set, in the column order of the SELECT query.
   */
  private final List<String> myDataRowColumns = Arrays.asList(
    "uuid", "name", "hobby", "hobbyData"
  );

  private static final Logger LOG = LoggerFactory.getLogger(CassandraOutFormat.class);

  private final String query;
  private final ClusterBuilder builder;

  private transient Cluster cluster;
  private transient Session session;
  private transient ResultSet resultSet;

  public CassandraOutFormat(String query, ClusterBuilder builder) {
    Preconditions.checkArgument(query != null && !query.isEmpty(), "Query cannot be null or empty");
    Preconditions.checkArgument(builder != null, "Builder cannot be null");

    this.query = query;
    this.builder = builder;
  }

  @Override
  public void configure(Configuration parameters) {
    this.cluster = builder.getCluster();
  }

  @Override
  public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    return cachedStatistics;
  }

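  /**
   * Opens a session and executes the query.
   *
   * @param ignored the single input split; not used because the format is non-parallel
   * @throws IOException declared by the InputFormat interface; Cassandra driver errors are unchecked
   */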
  @Override
  public void open(InputSplit ignored) throws IOException {
    this.session = cluster.connect();
    this.resultSet = session.execute(query);
  }

  @Override
  public boolean reachedEnd() throws IOException {
    return resultSet.isExhausted();
  }

  @Override
  public MyDataRow nextRecord(MyDataRow reuse) throws IOException {
    final Row item = resultSet.one();
    // Convert the Java column list to a typed Scala Seq for the Scala helper.
    final Seq<String> columns =
        JavaConverters.asScalaBufferConverter(myDataRowColumns).asScala().toList();
    return FlinkRowOps$.MODULE$.toMyDataRowFromDatastax(columns, item);
  }

  @Override
  public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
    return split;
  }

  @Override
  public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    return new DefaultInputSplitAssigner(inputSplits);
  }

  /**
   * Closes all resources used.
   */
  @Override
  public void close() throws IOException {
    try {
      if (session != null) {
        session.close();
      }
    } catch (Exception e) {
      LOG.error("Error while closing session.", e);
    }

    try {
      if (cluster != null) {
        cluster.close();
      }
    } catch (Exception e) {
      LOG.error("Error while closing cluster.", e);
    }
  }
}

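The important points are:

  • The type parameter OUT is bounded by MyDataRow, so the format is instantiated as CassandraOutFormat<MyDataRow> (in Scala: CassandraOutFormat[MyDataRow]) when it is used in Apache Flink streams.
  • RichInputFormat is parameterized with the concrete class MyDataRow, which is the type of the records the format produces.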
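The methods of the format are not called by our own code; the Flink runtime calls them in a fixed order. The following sketch reproduces that order without Flink or Cassandra. SketchInputFormat is a hypothetical, heavily reduced stand-in for Flink's InputFormat (no splits, statistics or configuration), and an iterator plays the role of the Datastax ResultSet.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, reduced stand-in for Flink's InputFormat lifecycle.
interface SketchInputFormat<T> {
  void open();
  boolean reachedEnd();
  T nextRecord();
  void close();
}

// Plays the role of CassandraOutFormat; an iterator stands in for the ResultSet.
class IteratorFormat implements SketchInputFormat<String> {
  private final List<String> rows;
  private Iterator<String> resultSet; // created in open(), like session.execute(query)
  boolean closed = false;

  IteratorFormat(List<String> rows) { this.rows = rows; }

  public void open() { resultSet = rows.iterator(); }
  public boolean reachedEnd() { return !resultSet.hasNext(); } // like resultSet.isExhausted()
  public String nextRecord() { return resultSet.next(); }      // like resultSet.one()
  public void close() { closed = true; }                       // like closing session and cluster
}

class LifecycleSketch {
  // Drives a format in the runtime's order: open, read until reachedEnd(), close.
  static <T> List<T> readAll(SketchInputFormat<T> format) {
    List<T> out = new ArrayList<>();
    format.open();
    try {
      while (!format.reachedEnd()) {
        T record = format.nextRecord();
        if (record != null) { // null records are not emitted
          out.add(record);
        }
      }
    } finally {
      format.close(); // resources are released even if reading fails
    }
    return out;
  }
}
```

This is also why reachedEnd() only asks the ResultSet whether it is exhausted: the next row is fetched afterwards in nextRecord().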

Helper classes and methods

Conversion of the retrieved Row to the internal data model

import your.path.to.MyDataRowOps.syntax._
import com.datastax.driver.core.Row

object FlinkRowOps {
  /**
    * Parse the given Datastax row and create a MyDataRow from its values
    * using the given attribute names and ordering.
    *
    * @param cs An ordered list of attribute names; name i is filled from column i of the row,
    *           so the order must match the column order of the SELECT statement.
    * @param r  A Datastax row.
    * @return A row of the mydata table.
    */
  @throws[IndexOutOfBoundsException](
    cause = "The given row did not contain a requested column (index)."
  )
  def toMyDataRowFromDatastax(
      cs: Seq[String]
  )(r: Row): MyDataRow = {
    val blankRow = new MyDataRow()
    cs.zipWithIndex.foreach {
      case (name, idx) =>
        // Read the value by position and let MyDataRowOps assign it to the named field.
        blankRow.set(name)(r.getObject(idx))
    }
    blankRow
  }
}
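The core of toMyDataRowFromDatastax is a positional zip: the i-th attribute name receives the value of column i. The plain-Java sketch below shows that logic; PositionalMapping.zip is a hypothetical helper, and a List<Object> stands in for the Datastax Row.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PositionalMapping {
  // Assigns the value at position i of the row to the i-th attribute name,
  // mirroring cs.zipWithIndex in FlinkRowOps.
  static Map<String, Object> zip(List<String> attributeNames, List<Object> row) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (int idx = 0; idx < attributeNames.size(); idx++) {
      // row.get throws IndexOutOfBoundsException if the row has fewer columns than names,
      // which corresponds to the @throws annotation on toMyDataRowFromDatastax.
      result.put(attributeNames.get(idx), row.get(idx));
    }
    return result;
  }
}
```

Because values are assigned purely by position, swapping two columns in the row does not raise an error: with the names ("name", "hobby") and the row ("chess", "Alice"), the attribute name silently receives "chess".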

The following helper object sets the individual fields of the internal data model.

import com.datastax.driver.core.UDTValue
import java.util.UUID
import scala.util.Try

trait MyDataRowOps {
  def set(r: MyDataRow)(n: String)(v: Any): Unit
}

object MyDataRowOps {
  /**
   * Convert the Datastax UDT of HobbyMetaDataT to the internal datatype.
   *
   * @param v Datastax UDT of HobbyMetaDataT
   * @return HobbyMetaDataT
   */
  def createHobbyMetaDataT(v: UDTValue): HobbyMetaDataT = {
    val desc = v.get("desc", classOf[String])
    val prio = v.get("prio", classOf[java.lang.Integer])
    new HobbyMetaDataT(desc, prio)
  }

  @throws[MatchError](cause = "The provided attribute name does not map to an attribute!")
  def setAttribute(r: MyDataRow)(n: String)(v: Any): Unit =
    n match {
      case "uuid" =>
        r.setUUID(Try(v.asInstanceOf[UUID]).toOption.orNull)
      case "name" =>
        r.setName(Try(v.asInstanceOf[String]).toOption.orNull)
      case "hobby" =>
        r.setHobby(Try(v.asInstanceOf[String]).toOption.orNull)
      case "hobbyData" =>
        Try(v.asInstanceOf[UDTValue]).toOption.fold(r.setHobbyData(null)) { u =>
          r.setHobbyData(Try(createHobbyMetaDataT(u)).toOption.orNull)
        }
    }

  /**
   * Implementation of the type class.
   */
  implicit object MyDataRowOpsImpl extends MyDataRowOps {
    override def set(r: MyDataRow)(n: String)(v: Any): Unit =
      setAttribute(r)(n)(v)
  }

  /**
   * Provide syntactic sugar for working on the table rows.
   */
  object syntax {
    /**
     * Concrete implementation to provide syntactic sugar on MyDataRow rows.
     *
     * @param r An instance of a table row.
     */
    implicit final class WrapMyDataRowOps(val r: MyDataRow) extends AnyVal {
      def set(name: String)(value: Any)(implicit ev: MyDataRowOps): Unit =
        ev.set(r)(name)(value)
    }
  }
}
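The type-class machinery above boils down to two ideas: a lenient cast that turns a type mismatch into null, and a dispatch on the attribute name that fails for unknown names. The following Java sketch shows both; LenientSetter and SketchRow are hypothetical stand-ins for MyDataRowOps and MyDataRow, and where the Scala version throws a MatchError this sketch throws an IllegalArgumentException instead.

```java
import java.util.UUID;

// Hypothetical Java counterpart of MyDataRowOps.setAttribute, for illustration only.
class LenientSetter {
  // Minimal stand-in for MyDataRow with two of its attributes.
  static final class SketchRow {
    UUID uuid;
    String name;
  }

  // Counterpart of Try(v.asInstanceOf[T]).toOption.orNull: the value if it has
  // the expected type, otherwise null (null input also yields null).
  static <T> T castOrNull(Object v, Class<T> cls) {
    return cls.isInstance(v) ? cls.cast(v) : null;
  }

  // Dispatches on the attribute name; unknown names fail loudly.
  static void set(SketchRow r, String attribute, Object value) {
    switch (attribute) {
      case "uuid":
        r.uuid = castOrNull(value, UUID.class);
        break;
      case "name":
        r.name = castOrNull(value, String.class);
        break;
      default:
        throw new IllegalArgumentException("Unknown attribute: " + attribute);
    }
  }
}
```

The trade-off of the lenient cast is that a value of the wrong type, for example from a column selected in the wrong position, ends up as null rather than as an exception, so such errors only show up as missing values.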

Execute a stream query against Cassandra with our own data type

The following example executes a query to retrieve data from Cassandra.

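import com.datastax.driver.core.Cluster
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder

object Test {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    // The SELECT column order must match myDataRowColumns in CassandraOutFormat;
    // replace x with the uuid of the partition to read.
    val source = senv.createInput[MyDataRow](
      new CassandraOutFormat[MyDataRow](
        "SELECT uuid,name,hobby,hobby_data FROM TestImport.mydata WHERE uuid = x;",
        new ClusterBuilder() {
          override def buildCluster(builder: Cluster.Builder): Cluster =
            builder.addContactPoint("127.0.0.1").build() // local test
        }
      )
    )

    source
      .setParallelism(1) // the format is a NonParallelInput
      .writeAsText("/tmp/data")

    senv.execute()
  }
}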

Note that each query must select the columns in the same order as the attribute list myDataRowColumns in CassandraOutFormat, because FlinkRowOps maps the values by position.
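Because the mapping is positional and a wrong order is not reported as an error (MyDataRowOps turns type mismatches into null), a small guard can catch mismatches early. The sketch below is a hypothetical helper, not part of Flink or the Datastax driver. It assumes simple statements of the form SELECT a,b,c FROM ... (no *, functions, aliases or quoted identifiers) and compares the selected CQL columns with the column names that correspond to myDataRowColumns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class SelectOrderCheck {
  // CQL columns in the order of myDataRowColumns ("uuid", "name", "hobby", "hobbyData").
  static final List<String> EXPECTED = Arrays.asList("uuid", "name", "hobby", "hobby_data");

  // Extracts the column list of a simple "SELECT a,b,c FROM ..." statement.
  // Unquoted CQL identifiers are case-insensitive, so names are lower-cased.
  static List<String> selectedColumns(String cql) {
    String upper = cql.toUpperCase(Locale.ROOT);
    int start = upper.indexOf("SELECT");
    int end = upper.indexOf(" FROM ");
    if (start < 0 || end < start) {
      throw new IllegalArgumentException("Not a simple SELECT statement: " + cql);
    }
    List<String> columns = new ArrayList<>();
    for (String part : cql.substring(start + "SELECT".length(), end).split(",")) {
      columns.add(part.trim().toLowerCase(Locale.ROOT));
    }
    return columns;
  }

  // True if the query selects exactly the expected columns in the expected order.
  static boolean hasExpectedOrder(String cql) {
    return selectedColumns(cql).equals(EXPECTED);
  }
}
```

Such a check could, for example, sit next to the existing Preconditions checks in the CassandraOutFormat constructor, so that a query with a different column order fails before the job starts.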
