SDP(6):分散式資料庫運算環境- Cassandra-Engine

来源:https://www.cnblogs.com/tiger-xc/archive/2018/02/22/8457938.html
-Advertisement-
Play Games

現代信息系統應該是避不開大數據處理的。作為一個通用的系統集成工具也必須具備大數據存儲和讀取能力。cassandra是一種分散式的資料庫,具備了分散式資料庫高可用性(high-availability)特性,對於一個實時大型分散式集成系統來說是核心支柱。與傳統的關係資料庫對比,cassandra從數據 ...


    現代信息系統應該是避不開大數據處理的。作為一個通用的系統集成工具也必須具備大數據存儲和讀取能力。cassandra是一種分散式的資料庫,具備了分散式資料庫高可用性(high-availability)特性,對於一個實時大型分散式集成系統來說是核心支柱。與傳統的關係資料庫對比,cassandra從數據存儲結構、讀取方式等可以說是皆然不同的。如:cassandra庫表設計是反範式的(denormalized)、表結構設計是反過來根據query要求設計的,等等。幸運的是自版本3.0後cassandra提供了CQL來支持資料庫操作。簡單來說CQL就是cassandra的SQL。CQL是一種query語言,在語法上與SQL相近。最重要的是CQL用SQL的呈現方式來描述cassandra底層數據的存儲方式,讓熟悉了關係資料庫SQL編程人員能夠容易開始使用cassandra。與SQL一樣,CQL也是一種純文本語言,可以通過多種終端介面軟體包括java-client來運行CQL腳本。 目前在市面上有一些現成的cassandra客戶端編程軟體,有些為了實現類型安全(type-safety)還提供了Linq-DSL(language-integrated-query),但因為我們需要面向各種cassandra資料庫用戶,所以還是決定提供一種CQL腳本運算環境,也就是說Cassandra-Engine接受CQL腳本然後運算得出結果。

和JDBC的運算結構很相似:CQL運算也是先構建statement然後execute。與JDBC不同的是:CQL還提供non-blocking腳本運算: 

   /**
     * Executes the provided query asynchronously.
     * <p/>
     * This method does not block. It returns as soon as the query has been
     * passed to the underlying network stack. In particular, returning from
     * this method does not guarantee that the query is valid or has even been
     * submitted to a live node. Any exception pertaining to the failure of the
     * query will be thrown when accessing the {@link ResultSetFuture}.
     * <p/>
     * Note that for queries that don't return a result (INSERT, UPDATE and
     * DELETE), you will need to access the ResultSetFuture (that is, call one of
     * its {@code get} methods to make sure the query was successful.
     *
     * @param statement the CQL query to execute (that can be any {@code Statement}).
     * @return a future on the result of the query.
     * @throws UnsupportedFeatureException if the protocol version 1 is in use and
     *                                     a feature not supported has been used. Features that are not supported by
     *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary
     *                                     values in RegularStatement.
     */
    ResultSetFuture executeAsync(Statement statement);

executeAsync返回結果ResultSsetFuture是個google-guava-future。我們可以用隱式轉換(implicit conversion)把它轉換成scala-future來使用: 

 implicit def listenableFutureToFuture[T](
               listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }

有了這個隱式實例executeAsync返回結果自動轉成Future[?],如下:

  def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
...
      session.executeAsync(boundStmt).map(_.wasApplied())
  }

我們還是通過某種Context方式來構建完整可執行的statement:

case class CQLContext(
                       statements: Seq[String],
                       parameters: Seq[Seq[Object]] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                     ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  def appendCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
}

與JDBCContext不同的是這個consistencyLevel。因為數據是重覆分佈在多個集群節點上的,所以需要通過consistencyLevel來註明分散式數據的讀寫方式:

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }

CQL statement 分simplestatement, preparedstatement和boundstatement。boundstatement可以覆蓋所有類型的CQL statement構建要求。下麵是一個構建boundstatement的例子:

   val prepStmt = session.prepare(ctx.statement)

    var boundStmt =  prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params:_*)
    }

CQL statement參數類型比較複雜,包括date,timestamp等都必須經過processParameters函數進行預處理:

  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, Month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now()
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
          Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d")
        case p@_ => p
      }
    }
  }

CassandraEngine更新運算分為單條update和批次update。批次update與事物處理有異曲同工之效:批次中任何一條腳本運算失敗則回滾所有更新:

 def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
      cqlSingleUpdate(ctx)
    else
      cqlMultiUpdate(ctx)
  }
  def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

      val prepStmt = session.prepare(ctx.statements.head)

      var boundStmt =  prepStmt.bind()
      if (ctx.statements != Nil) {
        val params = processParameters(ctx.parameters.head)
        boundStmt = prepStmt.bind(params:_*)
      }

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
      session.executeAsync(boundStmt).map(_.wasApplied())
  }

  def cqlMultiUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
    var batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params == Nil)
        batch.add(prepStmt.bind())
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
      }
    }

    ctx.consistency.foreach {consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency))}
    session.executeAsync(batch).map(_.wasApplied())
  }

CassandraEngine update返回運算狀態Future[Boolean]。下麵是一段update示範:

  val createCQL ="""
  CREATE TABLE testdb.members (
    id UUID primary key,
    name TEXT,
    description TEXT,
    birthday DATE,
    created_at TIMESTAMP,
    picture BLOB
  )"""

  val ctxCreate = CQLContext().setCommand(createCQL)

  val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" +
    " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png"))
  
  val createData = for {
    createTable <- cqlExecute(ctxCreate)
    insertData <- cqlExecute(ctxInsert)
  } yield(createTable, insertData)

  createData.onComplete {
    case Success((c,i)) => println(s"Create Table: $c, Insert Data $i")
    case Failure(e) => println(e.getMessage)
  }

在上面的例子里我們用for-comprehension實現了連續運算。註意在這個例子里已經包括了date,datetime,blob等輸入參數類型。

fetch-query的statement構建信息如下:

case class CQLQueryContext[M](
                       statement: String,
                       parameter: Seq[Object] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                       extractor: Row => M
                     )

fetch-query運算也是用execute方式實現的:

    /**
     * Executes the provided query.
     * <p/>
     * This method blocks until at least some result has been received from the
     * database. However, for SELECT queries, it does not guarantee that the
     * result has been received in full. But it does guarantee that some
     * response has been received from the database, and in particular
     * guarantees that if the request is invalid, an exception will be thrown
     * by this method.
     *
     * @param statement the CQL query to execute (that can be any {@link Statement}).
     * @return the result of the query. That result will never be null but can
     * be empty (and will be for any non SELECT query).
     * @throws NoHostAvailableException    if no host in the cluster can be
     *                                     contacted successfully to execute this query.
     * @throws QueryExecutionException     if the query triggered an execution
     *                                     exception, i.e. an exception thrown by Cassandra when it cannot execute
     *                                     the query with the requested consistency level successfully.
     * @throws QueryValidationException    if the query if invalid (syntax error,
     *                                     unauthorized or any other validation problem).
     * @throws UnsupportedFeatureException if the protocol version 1 is in use and
     *                                     a feature not supported has been used. Features that are not supported by
     *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary
     *                                     values in RegularStatement.
     */
    ResultSet execute(Statement statement);

返回結果ResultSet經過轉換後成為scala collection:

  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

    val prepStmt = session.prepare(ctx.statement)

    var boundStmt =  prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params:_*)
    }

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C])

  }

fetchResultPage是分頁讀取的,可以用fetchMoreResults持續讀取:

    /**
     * Force fetching the next page of results for this result set, if any.
     * <p/>
     * This method is entirely optional. It will be called automatically while
     * the result set is consumed (through {@link #one}, {@link #all} or iteration)
     * when needed (i.e. when {@code getAvailableWithoutFetching() == 0} and
     * {@code isFullyFetched() == false}).
     * <p/>
     * You can however call this method manually to force the fetching of the
     * next page of results. This can allow to prefetch results before they are
     * strictly needed. For instance, if you want to prefetch the next page of
     * results as soon as there is less than 100 rows readily available in this
     * result set, you can do:
     * <pre>
     *   ResultSet rs = session.execute(...);
     *   Iterator<Row> iter = rs.iterator();
     *   while (iter.hasNext()) {
     *       if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched())
     *           rs.fetchMoreResults();
     *       Row row = iter.next()
     *       ... process the row ...
     *   }
     * </pre>
     * This method is not blocking, so in the example above, the call to {@code
     * fetchMoreResults} will not block the processing of the 100 currently available
     * rows (but {@code iter.hasNext()} will block once those rows have been processed
     * until the fetch query returns, if it hasn't yet).
     * <p/>
     * Only one page of results (for a given result set) can be
     * fetched at any given time. If this method is called twice and the query
     * triggered by the first call has not returned yet when the second one is
     * performed, then the 2nd call will simply return a future on the currently
     * in progress query.
     *
     * @return a future on the completion of fetching the next page of results.
     * If the result set is already fully retrieved ({@code isFullyFetched() == true}),
     * then the returned future will return immediately but not particular error will be
     * thrown (you should thus call {@link #isFullyFetched()} to know if calling this
     * method can be of any use}).
     */
    ListenableFuture<S> fetchMoreResults();

下麵是分頁持續讀取的實現:

  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
       extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch { case e: Throwable => (resultSet, None) }
    }

我們用這兩個函數來讀取上面用cqlInsert腳本加入cassandra的數據:

  //data model
  case class Member(
                     id: String,
                     name: String,
                     description: Option[String] = None,
                     birthday: LocalDate,
                     createdAt: java.util.Date,
                     picture: Option[ByteBuffer] = None)

  //data row converter
  val toMember = (rs: Row) => Member(
    id = rs.getUUID("id").toString,
    name = rs.getString("name"),
    description = {
      val d = rs.getString("description")
      if (d == null)
        None
      else
        Some(d)

      Some(rs.getColumnDefinitions.toString)
    },
    birthday = rs.getDate("birthday"),
    createdAt = rs.getTimestamp("created_at"),
    picture = {
      val pic = rs.getBytes("picture")
      if (pic == null)
        None
      else
        Some(pic)

    }
  )

 try {
   val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
   val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)

   var vecMembers: Vector[Member] = vecResults

   var isExh = resultSet.isExhausted
   var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults))
   while (!isExh) {
     nextPage = fetchMorePages[Vector,Member](nextPage._1,1 second)(toMember)
     nextPage._2.foreach {vec =>
       vecMembers = vecMembers ++ vec
     }
     isExh = resultSet.isExhausted
   }
   vecMembers.foreach { m =>
     println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
     println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}")
     m.picture match {
       case Some(buf) =>
         val fname = s"/users/tiger/pic-${m.name}.png"
         cqlBytesToFile(buf,fname)
         println(s"saving picture to $fname")
       case _ => println("empty picture!")
     }
   }
 } catch {
   case e: Exception => println(e.getMessage)
 }

在上面的示範里我們還引用了一些helper函數:

 def cqlFileToBytes(fileName: String): ByteBuffer = {
    val fis = new FileInputStream(fileName)
    val b = new Array[Byte](fis.available + 1)
    val length = b.length
    fis.read(b)
    ByteBuffer.wrap(b)
  }


  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
        implicit mat: Materializer): Future[IOResult] = {
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }

  def useJava8DateTime(cluster: Cluster) = {
    //for jdk8 datetime format
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }

還需要一個ByteBufferInputStream類型來實現blob內容的讀取:

 class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    override def read: Int = {
      if (!buf.hasRemaining) return -1
      buf.get
    }

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      val length: Int = Math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }
  }
  object ByteBufferInputStream {
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }

下麵就是本次討論示範源代碼:

build.sbt

name := "learn_cassandra"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
  "com.typesafe.akka" %% "akka-actor" % "2.5.4",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3")

CassandraEngine.scala

import com.datastax.driver.core._
import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration

object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL          =                                        0x0000
  val ONE: CONSISTENCY_LEVEL          =                                        0x0001
  val TWO: CONSISTENCY_LEVEL          =                                        0x0002
  val THREE: CONSISTENCY_LEVEL        =                                        0x0003
  val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
  val ALL: CONSISTENCY_LEVEL          =                                        0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B
  val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL

    }
  }

}
case class CQLQueryContext[M](
                       statement: String,
                       parameter: Seq[Object] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                       extractor: Row => M
                     )

case class CQLContext(
                       statements: Seq[String],
                       parameters: Seq[Seq[Object]] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                     ) { ctx =>

  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  def appendCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
}

object CQLEngine {
  import CQLContext._
  import CQLHelpers._

  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

    val prepStmt = session.prepare(ctx.statement)

    var boundStmt =  prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params:_*)
    }

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C])
  }
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
       extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch { case e: Throwable => (resultSet, None) }
    }
  def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
      cqlSingleUpdate(ctx)
    else
      cqlMultiUpdate(ctx)
  }
  def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

      val prepStmt = session.prepare(ctx.statements.head)

      var boundStmt =  prepStmt.bind()
      if (ctx.statements != Nil) {
        val params = processParameters(ctx.parameters.head)
        boundStmt = prepStmt.bind(params:_*)
      }

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
      session.executeAsync(boundStmt).map(_.wasApplied())
  }
  def cqlMultiUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
    var batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params == Nil)
        batch.add(prepStmt.bind())
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
      }
    }
    ctx.consistency.foreach {consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency))}
    session.executeAsync(batch).map(_.wasApplied())
  }
}
object CQLHelpers {
  import java.nio.ByteBuffer
  import java.io._
  import java.nio.file._
  import com.datastax.driver.core.LocalDate
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  import java.time.Instant
  import akka.stream.scaladsl._
  import akka.stream._

  implicit def listenableFutureToFuture[T](
               listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }

  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, Month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now()
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
          Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d")
        case p@_ => p
      }
    }
  }
  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    override def read: Int = {
      if (!buf.hasRemaining) return -1
      buf.get
    }

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      val length: Int = Math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }
  }
  object ByteBufferInputStream {
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }
  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {

    override def write(b: Int): Unit = {
      buf.put(b.toByte)
    }

    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
      buf.put(bytes, off, len)
    }
  }
  object FixsizedByteBufferOutputStream {
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  }
  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {

    private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR

    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      val position = buf.position
      val limit = buf.limit
      val newTotal: Long = position + len
      if(newTotal > limit){
        var capacity = (buf.capacity * increasing)
        while(capacity <= newTotal){
          capacity = (capacity*increasing)
        }
        increase(capacity.toInt)
      }

      buf.put(b, 0, len)
    }

    override def write(b: Int): Unit= {
      if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
      buf.put(b.toByte)
    }
    protected def increase(newCapacity: Int): Unit = {
      buf.limit(buf.position)
      buf.rewind
      val newBuffer =
        if (onHeap) ByteBuffer.allocate(newCapacity)
        else  ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(buf)
      buf.clear
      buf = newBuffer
    }
    def size: Long = buf.position
    def capacity: Long = buf.capacity
    def byteBuffer: ByteBuffer = buf
  }
  object ExpandingByteBufferOutputStream {
    val DEFAULT_INCREASING_FACTOR = 1.5f
    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
      val buffer: ByteBuffer =
        if (onHeap) ByteBuffer.allocate(size)
        else ByteBuffer.allocateDirect(size)
      new ExpandingByteBufferOutputStream(buffer,onHeap)
    }
    def apply(size: Int): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
    }

    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
    }

    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
      apply(size, increasingBy, false)
    }

  }
  def cqlFileToBytes(fileName: String): ByteBuffer = {
    val fis = new FileInputStream(fileName)
    val b = new Array[Byte](fis.available + 1)
    val length = b.length
    fis.read(b)
    ByteBuffer.wrap(b)
  }
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
        implicit mat: Materializer): Future[IOResult] = {
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }
  def useJava8DateTime(cluster: Cluster) = {
    //for jdk8 datetime format
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }
}

CQLEngineDemo.scala

import scala.util._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.datastax.driver.core._
import CQLEngine._
import CQLHelpers._
import com.datastax.driver.core.LocalDate
import java.nio.ByteBuffer
import scala.concurrent.duration._

object CQLEngineDemo extends App {

  //#init-mat
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster
  .Builder()
    .addContactPoints("localhost")
    .withPort(9042)
    .build()

  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  val createCQL ="""
  CREATE TABLE testdb.members (
    id UUID prim

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這個項目最初其實是fork別人的項目。當初想接觸下mongodb資料庫,找個例子學習下,後來改著改著就面目全非了。後臺和資料庫重構,前端增加了登錄註冊功能,僅保留了博客設置頁面,但是也優化了。 "線上地址" 一、更新內容 0. 資料庫重新設計,改成以用戶分組的subDocs資料庫結構 0. 應資料庫 ...
  • 前言 進入自己github主頁會看到自己的提交記錄,如果某天沒有提交記錄,那天的小方框就顯示灰色。強迫症的我,每次進來看著就感覺不爽, 想著自己每天記得提交點東西,爭取像 "阮一峰" 大神一樣,每天都有提交記錄。 但是,畢竟是人,哪天忙了就會忘記提交,所以想著能不能實現在自己阿裡雲伺服器(linux ...
  • 關於元素居中的總結 包括:1.不使用定位 a.水平居中 b.垂直居中 c.其他居中 2.使用定位 a.父元素高寬固定 ... ...
  • 首先下載eCharts源代碼,然後可以按照官網的5分鐘上手ECharts教程做一個簡單的例子,這裡為了將前端顯示和後端邏輯分開,可以建一個index.html和一個繪製圖表的chartTest.js,代碼如下: js代碼如下: 通過上面的代碼就可以繪製出下麵這樣的一個簡單的圖表 其中xAxis和yA ...
  • 作為軟體開發人員,我們已知道思考如何將應用程式因數分解成組件部分。 這是對象導向、軟體抽象和組件化的中心模式。 現在,這種因數分解往往以共用庫和技術層之間的類與介面呈現。 通常採用一種分層方法,有後端存儲、中間層業務邏輯和前端用戶界面 (UI)。 過去幾年來的變化是身為開發人員的我們,開始為業務驅動 ...
  • 一致性演算法 是分散式系統中最重要的問題之一。錶面上看,這似乎很簡單,只是讓幾個節點在某些方面達成一致。在本篇之中,會帶大家完整的梳理分散式系統之中的共識演算法,來更加深刻的理解分散式系統的設計。 1.原子提交和兩階段提交(2PC) 原子提交防止了資料庫處於半更新的狀態,這對於需要滿足多對象事務和維護次 ...
  • 該模塊作用是完成Python數值和C語言結構體的Python字元串形式間的轉換。這可以用於處理存儲在文件中或從網路連接中存儲的二進位數據,以及其他數據源。 用途: 在Python基本數據類型和二進位數據之間進行轉換 模塊提供了用於在位元組字元串和Python原生數據類型之間轉換函數,比如數字和字元串。 ...
  • Time Limit: 4000/2000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others)Total Submission(s): 5934 Accepted Submission(s): 1845 Problem Descrip ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...