我一直在不斷的提示大家:FP就是Monadic Programming,是一種特殊的編程風格。在我們熟悉的資料庫編程領域能不能實現FP風格呢?我們先設計一些示範例子來分析一下慣用的資料庫編程過程: 我這裡模擬了一個培訓學校內的一些業務。上面設計的是一些基本函數,可以分別對學員、導師、座位進行查詢和更 ...
我一直在不斷的提示大家:FP就是Monadic Programming,是一種特殊的編程風格。在我們熟悉的資料庫編程領域能不能實現FP風格呢?我們先設計一些示範例子來分析一下慣用的資料庫編程過程:
1 import scalaz._
2 import Scalaz._
3 import scala.language.higherKinds
4 import scala.language.implicitConversions
5 import com.jolbox.bonecp.BoneCP
6 import com.jolbox.bonecp.BoneCPConfig
7 import java.sql.Connection
8 import java.sql.ResultSet
9
10 object freedbtxns {
/** Looks up the tutor assigned to a course.
  *
  * @param courseId value matched against COURSES.ID
  * @param conn     open JDBC connection; it is used but not closed here
  * @return the TUTOR column of the first matching row
  * @throws java.sql.SQLException if the query fails or no row matches
  */
def getTutorId(courseId: Int, conn: Connection): Int = {
  val stmt = conn.prepareStatement("select TUTOR from COURSES where ID=?")
  try {
    stmt.setInt(1, courseId)
    val rs = stmt.executeQuery()
    // A new ResultSet is positioned before the first row, so next() must succeed before reading.
    if (!rs.next()) throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
    // The query selects TUTOR only; reading the label "ID" (as before) cannot work.
    rs.getInt("TUTOR")
  } finally stmt.close() // closing the statement also releases its ResultSet
}
/** Reads the tutor pay amount configured for a course.
  *
  * @param courseId value matched against COURSES.ID
  * @param conn     open JDBC connection; it is used but not closed here
  * @return the PAYAMT column of the first matching row
  * @throws java.sql.SQLException if the query fails or no row matches
  */
def getTutorPay(courseId: Int, conn: Connection): Double = {
  val stmt = conn.prepareStatement("select PAYAMT from COURSES where ID=?")
  try {
    stmt.setInt(1, courseId)
    val rs = stmt.executeQuery()
    // A new ResultSet is positioned before the first row, so next() must succeed before reading.
    if (!rs.next()) throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
    rs.getDouble("PAYAMT")
  } finally stmt.close() // closing the statement also releases its ResultSet
}
/** Reads the student fee configured for a course.
  *
  * @param courseId value matched against COURSES.ID
  * @param conn     open JDBC connection; it is used but not closed here
  * @return the FEEAMT column of the first matching row
  * @throws java.sql.SQLException if the query fails or no row matches
  */
def getStudentFee(courseId: Int, conn: Connection): Double = {
  val stmt = conn.prepareStatement("select FEEAMT from COURSES where ID=?")
  try {
    stmt.setInt(1, courseId)
    val rs = stmt.executeQuery()
    // A new ResultSet is positioned before the first row, so next() must succeed before reading.
    if (!rs.next()) throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
    rs.getDouble("FEEAMT")
  } finally stmt.close() // closing the statement also releases its ResultSet
}
/** Adds `plusAmt` to the PAYABLE balance of one tutor.
  *
  * @param tutorId value matched against TUTORS.ID
  * @param plusAmt amount added to the current PAYABLE value
  * @param conn    open JDBC connection; it is used but not closed here
  * @throws java.sql.SQLException if the update fails
  */
def updateTutorPay(tutorId: Int, plusAmt: Double, conn: Connection): Unit = {
  // Bound parameters replace the string-built SQL, so the Double is never rendered as text.
  val stmt = conn.prepareStatement("update TUTORS set PAYABLE = PAYABLE+? where ID=?")
  try {
    stmt.setDouble(1, plusAmt)
    stmt.setInt(2, tutorId)
    stmt.executeUpdate()
    () // the affected-row count is intentionally discarded
  } finally stmt.close() // the original never released the statement
}
/** Adds `plusAmt` to the DUEAMT balance of one student.
  *
  * @param studentId value matched against STUDENTS.ID
  * @param plusAmt   amount added to the current DUEAMT value
  * @param conn      open JDBC connection; it is used but not closed here
  * @throws java.sql.SQLException if the update fails
  */
def updateStudentFee(studentId: Int, plusAmt: Double, conn: Connection): Unit = {
  // Bound parameters replace the string-built SQL, so the Double is never rendered as text.
  val stmt = conn.prepareStatement("update STUDENTS set DUEAMT = DUEAMT+? where ID=?")
  try {
    stmt.setDouble(1, plusAmt)
    stmt.setInt(2, studentId)
    stmt.executeUpdate()
    () // the affected-row count is intentionally discarded
  } finally stmt.close() // the original never released the statement
}
/** Finds a seat that is not occupied.
  *
  * @param courseId value matched against SEATS.ID
  * @param conn     open JDBC connection; it is used but not closed here
  * @return the ID column of the first matching row
  * @throws java.sql.SQLException if the query fails or no free seat matches
  */
def findEmptySeat(courseId: Int, conn: Connection): Int = {
  // updateSeatsStatus stores 'T' for a taken seat, so a free seat is OCCUPIED='F';
  // the original predicate ('T') selected seats that were already taken.
  // NOTE(review): the filter compares SEATS.ID with a course id, as the original did —
  // confirm whether SEATS has a separate course column that should be used instead.
  val stmt = conn.prepareStatement("select ID from SEATS where OCCUPIED='F' AND ID=?")
  try {
    stmt.setInt(1, courseId)
    val rs = stmt.executeQuery()
    // A new ResultSet is positioned before the first row, so next() must succeed before reading.
    if (!rs.next()) throw new java.sql.SQLException(s"no empty seat for ID=$courseId")
    rs.getInt("ID")
  } finally stmt.close() // closing the statement also releases its ResultSet
}
/** Marks a seat as taken or free.
  *
  * @param seatId value matched against SEATS.ID
  * @param taken  stored in OCCUPIED as "T" when true and "F" when false
  * @param conn   open JDBC connection; it is used but not closed here
  * @throws java.sql.SQLException if the update fails
  */
def updateSeatsStatus(seatId: Int, taken: Boolean, conn: Connection): Unit = {
  val stmt = conn.prepareStatement("update SEATS set OCCUPIED =? where ID=?")
  try {
    // Same flag the original derived via taken.toString.toUpperCase.head.
    stmt.setString(1, if (taken) "T" else "F")
    stmt.setInt(2, seatId)
    stmt.executeUpdate()
    () // the affected-row count is intentionally discarded
  } finally stmt.close() // the original never released the statement
}
我這裡模擬了一個培訓學校內的一些業務。上面設計的是一些基本函數,可以分別對學員、導師、座位進行查詢和更新。如果我們需要把更新工作放入事務處理(transaction)內的話我們可以這樣做:
/** Charges a student the fee of a course inside one database transaction.
  *
  * The fee is read with `getStudentFee` and added with `updateStudentFee`; the
  * transaction is committed when both succeed and rolled back when either throws.
  *
  * NOTE(review): a failure inside the transaction is rolled back but not reported to
  * the caller (the method returns Unit either way), as in the original.
  * NOTE(review): the pool is built from a default BoneCPConfig; the JDBC URL and
  * credentials are presumably configured elsewhere — confirm.
  *
  * @param studentId student whose DUEAMT is increased
  * @param courseId  course whose FEEAMT is charged
  */
def updateStudent(studentId: Int, courseId: Int): Unit = {
  val config = new BoneCPConfig()
  val bonecp = new BoneCP(config)
  try {
    val conn = bonecp.getConnection()
    try {
      conn.setReadOnly(false)
      conn.setAutoCommit(false)
      // Kept from the original; presumably clears any pending work on the pooled connection.
      conn.rollback()
      try {
        val fee = getStudentFee(courseId, conn)
        updateStudentFee(studentId, fee, conn)
        conn.commit()
      } catch {
        case _: Exception => conn.rollback()
      }
    } finally {
      // Runs even when the connection setup above throws, which the original did not cover.
      conn.close()
    }
  } finally {
    // A pool is created on every call, so it has to be shut down here or it leaks.
    bonecp.shutdown()
  }
}
/** Charges a student the course fee and reserves a seat inside one database transaction.
  *
  * Runs `getStudentFee`, `updateStudentFee`, `findEmptySeat` and `updateSeatsStatus`
  * in that order; commits when all succeed and rolls back when any of them throws.
  *
  * NOTE(review): a failure inside the transaction is rolled back but not reported to
  * the caller (the method returns Unit either way), as in the original.
  * NOTE(review): the pool is built from a default BoneCPConfig; the JDBC URL and
  * credentials are presumably configured elsewhere — confirm.
  *
  * @param studentId student whose DUEAMT is increased
  * @param courseId  course whose fee is charged and for which a seat is looked up
  */
def updateStudentAndSeat(studentId: Int, courseId: Int): Unit = {
  val config = new BoneCPConfig()
  val bonecp = new BoneCP(config)
  try {
    val conn = bonecp.getConnection()
    try {
      conn.setReadOnly(false)
      conn.setAutoCommit(false)
      // Kept from the original; presumably clears any pending work on the pooled connection.
      conn.rollback()
      try {
        val fee = getStudentFee(courseId, conn)
        updateStudentFee(studentId, fee, conn)
        val seatId = findEmptySeat(courseId, conn)
        updateSeatsStatus(seatId, true, conn)
        conn.commit()
      } catch {
        case _: Exception => conn.rollback()
      }
    } finally {
      // Runs even when the connection setup above throws, which the original did not cover.
      conn.close()
    }
  } finally {
    // A pool is created on every call, so it has to be shut down here or it leaks.
    bonecp.shutdown()
  }
}
馬上可以發現,在事務處理內組合使用這些函數時,我們必須每次重新設置事務處理,無法實現真正意義上的函數組合。如果我們認可FP風格的話,這裡起碼有兩項弊處:一是源代碼增加了大量的鋪墊(boilerplate code),重複事務處理設置;二是每個更新函數都會產生副作用,換句話說就是這裡那裡都會有副作用影響,很難控制,這樣就增加了程式的複雜程度,造成代碼分析的困難。
我們希望達到的目標:
1 /*
2 def updateStudentAndSeat(studentId: Int): program {
3 // findEmptySeat
4 // updateStudentFee
5 // updateSeatStatus
6 }
7
8 def runDBTxn(prg: program) {
9 //conn= getConnection
10 //try
11 // run(prg)
12 //commit
13 //catch
14 //rollback
15 }
16 runDBTxn(updateStudent)
17 runDBTxn(updateStudentAndSeat)
18 runDBTxn(updateSeatStatus)
19 */
我們只在一個地方設置和運行事務處理。我們希望能把不同的program傳入runDBTxn去運算。這不就是Free Monad的編程、運算關注分離模式嘛。那我們就試著用Free Monad來提供資料庫事務處理支持。按上篇討論的設計流程我們先設計ADT:
/** A single database instruction: a function that is given a JDBC connection and
  * produces a result of type `A` (`Unit` for statements that return nothing).
  *
  * @param run the action to execute against a connection
  */
case class SqlOp[A](
  run: Connection => A
)
模擬sql指令很簡單,兩種情況:query或者update。兩者都可以用函數run表示:傳入Connection,返回結果A,A有可能是Unit。要成為Free Monad就必須先獲取SqlOp的Functor實例:
/** A single database instruction: a function that is given a JDBC connection and
  * produces a result of type `A` (`Unit` for statements that return nothing).
  *
  * @param run the action to execute against a connection
  */
case class SqlOp[A](run: Connection => A)

/** Functor instance for [[SqlOp]]: `map` runs the wrapped action and applies `f` to its result.
  *
  * The type is annotated explicitly so implicit resolution does not depend on the
  * inferred type of the anonymous class.
  */
implicit val sqlOpFunctor: Functor[SqlOp] = new Functor[SqlOp] {
  def map[A, B](sa: SqlOp[A])(f: A => B): SqlOp[B] =
    SqlOp { (conn: Connection) => f(sa.run(conn)) }
}
基本功能的sql操作函數及升格Free:
/** A database program: [[SqlOp]] instructions sequenced through the Free monad. */
type Sql[A] = Free[SqlOp, A]

/** Describes the lookup of the tutor assigned to a course.
  *
  * Nothing touches the database until the program is interpreted with a connection.
  * When run, the step throws `java.sql.SQLException` if the query fails or no row matches.
  *
  * @param courseId value matched against COURSES.ID
  * @return a program yielding the TUTOR column of the first matching row
  */
def getTutorId(courseId: Int): Sql[Int] =
  Free.liftF(SqlOp[Int] { (conn: Connection) =>
    val stmt = conn.prepareStatement("select TUTOR from COURSES where ID=?")
    try {
      stmt.setInt(1, courseId)
      val rs = stmt.executeQuery()
      // A new ResultSet is positioned before the first row, so next() must succeed before reading.
      if (!rs.next()) throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
      // The query selects TUTOR only; reading the label "ID" (as before) cannot work.
      rs.getInt("TUTOR")
    } finally stmt.close() // closing the statement also releases its ResultSet
  })
9
/** Describes the lookup of the tutor pay amount configured for a course.
  *
  * When run, the step throws `java.sql.SQLException` if the query fails or no row matches.
  *
  * @param courseId value matched against COURSES.ID
  * @return a program yielding the PAYAMT column of the first matching row
  */
def getTutorPay(courseId: Int): Sql[Double] =
  Free.liftF(SqlOp[Double] { (conn: Connection) =>
    val stmt = conn.prepareStatement("select PAYAMT from COURSES where ID=?")
    try {
      stmt.setInt(1, courseId)
      val rs = stmt.executeQuery()
      // A new ResultSet is positioned before the first row, so next() must succeed before reading.
      if (!rs.next()) throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
      rs.getDouble("PAYAMT")
    } finally stmt.close() // closing the statement also releases its ResultSet
  })
/** Describes the lookup of the student fee configured for a course.
  *
  * When run, the step throws `java.sql.SQLException` if the query fails or no row matches.
  *
  * @param courseId value matched against COURSES.ID
  * @return a program yielding the FEEAMT column of the first matching row
  */
def getStudentFee(courseId: Int): Sql[Double] =
  Free.liftF(SqlOp[Double] { (conn: Connection) =>
    val stmt = conn.prepareStatement("select FEEAMT from COURSES where ID=?")
    try {
      stmt.setInt(1, courseId)
      val rs = stmt.executeQuery()
      // A new ResultSet is positioned before the first row, so next() must succeed before reading.
      if (!rs.next()) throw new java.sql.SQLException(s"no COURSES row with ID=$courseId")
      rs.getDouble("FEEAMT")
    } finally stmt.close() // closing the statement also releases its ResultSet
  })
/** Describes adding `plusAmt` to the PAYABLE balance of one tutor.
  *
  * @param tutorId value matched against TUTORS.ID
  * @param plusAmt amount added to the current PAYABLE value
  * @return a program that performs the update when interpreted
  */
def updateTutorPay(tutorId: Int, plusAmt: Double): Sql[Unit] =
  Free.liftF(SqlOp[Unit] { (conn: Connection) =>
    // Bound parameters replace the string-built SQL, so the Double is never rendered as text.
    val stmt = conn.prepareStatement("update TUTORS set PAYABLE = PAYABLE+? where ID=?")
    try {
      stmt.setDouble(1, plusAmt)
      stmt.setInt(2, tutorId)
      stmt.executeUpdate()
      () // the affected-row count is discarded to match Sql[Unit]
    } finally stmt.close() // the original never released the statement
  })
/** Describes adding `plusAmt` to the DUEAMT balance of one student.
  *
  * @param studentId value matched against STUDENTS.ID
  * @param plusAmt   amount added to the current DUEAMT value
  * @return a program that performs the update when interpreted
  */
def updateStudentFee(studentId: Int, plusAmt: Double): Sql[Unit] =
  Free.liftF(SqlOp[Unit] { (conn: Connection) =>
    // Bound parameters replace the string-built SQL, so the Double is never rendered as text.
    val stmt = conn.prepareStatement("update STUDENTS set DUEAMT = DUEAMT+? where ID=?")
    try {
      stmt.setDouble(1, plusAmt)
      stmt.setInt(2, studentId)
      stmt.executeUpdate()
      () // the affected-row count is discarded to match Sql[Unit]
    } finally stmt.close() // the original never released the statement
  })
/** Describes the search for a seat that is not occupied.
  *
  * When run, the step throws `java.sql.SQLException` if the query fails or no free seat matches.
  *
  * @param courseId value matched against SEATS.ID
  * @return a program yielding the ID column of the first matching row
  */
def findEmptySeat(courseId: Int): Sql[Int] =
  Free.liftF(SqlOp[Int] { (conn: Connection) =>
    // updateSeatsStatus stores 'T' for a taken seat, so a free seat is OCCUPIED='F';
    // the original predicate ('T') selected seats that were already taken.
    // NOTE(review): the filter compares SEATS.ID with a course id, as the original did —
    // confirm whether SEATS has a separate course column that should be used instead.
    val stmt = conn.prepareStatement("select ID from SEATS where OCCUPIED='F' AND ID=?")
    try {
      stmt.setInt(1, courseId)
      val rs = stmt.executeQuery()
      // A new ResultSet is positioned before the first row, so next() must succeed before reading.
      if (!rs.next()) throw new java.sql.SQLException(s"no empty seat for ID=$courseId")
      rs.getInt("ID")
    } finally stmt.close() // closing the statement also releases its ResultSet
  })
/** Describes marking a seat as taken or free.
  *
  * @param seatId value matched against SEATS.ID
  * @param taken  stored in OCCUPIED as "T" when true and "F" when false
  * @return a program that performs the update when interpreted
  */
def updateSeatsStatus(seatId: Int, taken: Boolean): Sql[Unit] =
  Free.liftF(SqlOp[Unit] { (conn: Connection) =>
    val stmt = conn.prepareStatement("update SEATS set OCCUPIED =? where ID=?")
    try {
      // Same flag the original derived via taken.toString.toUpperCase.head.
      stmt.setString(1, if (taken) "T" else "F")
      stmt.setInt(2, seatId)
      stmt.executeUpdate()
      () // the affected-row count is discarded to match Sql[Unit]
    } finally stmt.close() // the original never released the statement
  })
我們現在可以用這些升格成Free的函數來建設AST示範例子:
/** Program that looks up a seat with `findEmptySeat` and then flags it as taken
  * with `updateSeatsStatus`.
  *
  * @param courseId passed through to `findEmptySeat`
  * @return a program yielding Unit once both steps have run
  */
def takeSeat(courseId: Int): Sql[Unit] =
  findEmptySeat(courseId).flatMap { seat =>
    updateSeatsStatus(seat, true).map(_ => ())
  }
/** Program that enrolls a student in a course.
  *
  * Steps, in order: read the student fee, the tutor pay and the tutor id of the course,
  * add the fee to the student, add the pay to the tutor, then run `takeSeat`.
  *
  * @param studentId student being charged
  * @param courseId  course being added
  * @return a program yielding Unit once every step has run
  */
def addCourse(studentId: Int, courseId: Int): Sql[Unit] =
  getStudentFee(courseId).flatMap { fee =>
    getTutorPay(courseId).flatMap { pay =>
      getTutorId(courseId).flatMap { tutorId =>
        updateStudentFee(studentId, fee).flatMap { _ =>
          updateTutorPay(tutorId, pay).flatMap { _ =>
            takeSeat(courseId).map(_ => ())
          }
        }
      }
    }
  }
addCourse對基本函數進行了組合,又調用了已經組合過一次的takeSeat,證明AST可以實現高度的函數組合。
下面示範實現相關的Interpreter:
/** Interprets a [[Sql]] program against a connection, one instruction at a time.
  *
  * `resume` exposes either the next suspended [[SqlOp]] (left) or the final value (right).
  * Matching on the disjunction keeps the recursive call in tail position, so long programs
  * do not grow the stack, and avoids the unchecked (erased) type pattern used before.
  *
  * @param conn connection handed to every instruction; it is not closed here
  * @param ast  the program to run
  * @return the value the program finishes with
  */
@scala.annotation.tailrec
def runTransactionImpl[A](conn: Connection, ast: Sql[A]): A =
  ast.resume match {
    case -\/(op)    => runTransactionImpl(conn, op.run(conn))
    case \/-(value) => value
  }
我們需要一個通用的事務處理方法:
/** Runs a [[Sql]] program inside a single database transaction.
  *
  * The program is interpreted with `runTransactionImpl`; the transaction is committed when
  * it finishes and rolled back when it throws. Failures while obtaining or configuring the
  * connection still propagate as exceptions, as in the original.
  *
  * NOTE(review): the pool is built from a default BoneCPConfig; the JDBC URL and
  * credentials are presumably configured elsewhere — confirm.
  *
  * @param ast the program to run
  * @return the program's result on the right, or the exception it raised on the left
  */
def runTransaction[A](ast: Sql[A]): Exception \/ A = {
  val config = new BoneCPConfig()
  val bonecp = new BoneCP(config)
  try {
    val conn = bonecp.getConnection()
    try {
      conn.setReadOnly(false)
      conn.setAutoCommit(false)
      // Kept from the original; presumably clears any pending work on the pooled connection.
      conn.rollback()
      try {
        val result: A = runTransactionImpl(conn, ast)
        // Auto-commit is off, so without this the updates are never made permanent.
        conn.commit()
        result.right[Exception]
      } catch {
        case e: Exception =>
          // Undo partial work; a failing rollback is attached rather than masking the cause.
          try conn.rollback()
          catch { case r: Exception => e.addSuppressed(r) }
          e.left[A]
      }
    } finally {
      conn.close()
    }
  } finally {
    // A pool is created on every call, so it has to be shut down here or it leaks.
    bonecp.shutdown()
  }
}
這樣,我們可以在一個地方使用事務處理來運算任何事先設計的AST。下面是這個用Free來實現FP風格資料庫事務處理的完整示範代碼:
1 import scalaz._
2 import Scalaz._
3 import scala.language.higherKinds
4 import scala.language.implicitConversions
5 import com.jolbox.bonecp.BoneCP
6 import com.jolbox.bonecp.BoneCPConfig
7 import java.sql.Connection
8 import java.sql.ResultSet
9
/** Free-monad based database transactions for the training-school example.
  *
  * Database steps are described as [[freedbtxns.Sql]] programs, composed with
  * for-comprehensions, and executed in one place by [[freedbtxns.runTransaction]].
  */
object freedbtxns {

  /** A single database instruction: a function that is given a JDBC connection and
    * produces a result of type `A` (`Unit` for statements that return nothing).
    */
  case class SqlOp[A](run: Connection => A)

  /** Functor instance for [[SqlOp]]: `map` runs the wrapped action and applies `f` to its result. */
  implicit val sqlOpFunctor: Functor[SqlOp] = new Functor[SqlOp] {
    def map[A, B](sa: SqlOp[A])(f: A => B): SqlOp[B] =
      SqlOp { (conn: Connection) => f(sa.run(conn)) }
  }

  /** A database program: [[SqlOp]] instructions sequenced through the Free monad. */
  type Sql[A] = Free[SqlOp, A]

  /** Lifts a one-parameter query into [[Sql]]; `read` extracts the result from the first row.
    * When run, throws `java.sql.SQLException` if no row matches.
    */
  private def selectOne[A](sql: String, id: Int)(read: ResultSet => A): Sql[A] =
    Free.liftF(SqlOp[A] { (conn: Connection) =>
      val stmt = conn.prepareStatement(sql)
      try {
        stmt.setInt(1, id)
        val rs = stmt.executeQuery()
        // A new ResultSet is positioned before the first row, so next() must succeed before reading.
        if (!rs.next()) throw new java.sql.SQLException(s"no row returned by [$sql] for id $id")
        read(rs)
      } finally stmt.close() // closing the statement also releases its ResultSet
    })

  /** Lifts an update statement into [[Sql]]; `bind` sets the statement's parameters. */
  private def execUpdate(sql: String)(bind: java.sql.PreparedStatement => Unit): Sql[Unit] =
    Free.liftF(SqlOp[Unit] { (conn: Connection) =>
      val stmt = conn.prepareStatement(sql)
      try {
        bind(stmt)
        stmt.executeUpdate()
        () // the affected-row count is discarded to match Sql[Unit]
      } finally stmt.close()
    })

  /** Program yielding the TUTOR column of the COURSES row whose ID is `courseId`. */
  def getTutorId(courseId: Int): Sql[Int] =
    // The query selects TUTOR only; reading the label "ID" (as before) cannot work.
    selectOne[Int]("select TUTOR from COURSES where ID=?", courseId)(_.getInt("TUTOR"))

  /** Program yielding the PAYAMT column of the COURSES row whose ID is `courseId`. */
  def getTutorPay(courseId: Int): Sql[Double] =
    selectOne[Double]("select PAYAMT from COURSES where ID=?", courseId)(_.getDouble("PAYAMT"))

  /** Program yielding the FEEAMT column of the COURSES row whose ID is `courseId`. */
  def getStudentFee(courseId: Int): Sql[Double] =
    selectOne[Double]("select FEEAMT from COURSES where ID=?", courseId)(_.getDouble("FEEAMT"))

  /** Program adding `plusAmt` to TUTORS.PAYABLE for the tutor whose ID is `tutorId`. */
  def updateTutorPay(tutorId: Int, plusAmt: Double): Sql[Unit] =
    execUpdate("update TUTORS set PAYABLE = PAYABLE+? where ID=?") { stmt =>
      stmt.setDouble(1, plusAmt)
      stmt.setInt(2, tutorId)
    }

  /** Program adding `plusAmt` to STUDENTS.DUEAMT for the student whose ID is `studentId`. */
  def updateStudentFee(studentId: Int, plusAmt: Double): Sql[Unit] =
    execUpdate("update STUDENTS set DUEAMT = DUEAMT+? where ID=?") { stmt =>
      stmt.setDouble(1, plusAmt)
      stmt.setInt(2, studentId)
    }

  /** Program yielding the ID of a seat that is not occupied.
    *
    * updateSeatsStatus stores 'T' for a taken seat, so a free seat is OCCUPIED='F';
    * the original predicate ('T') selected seats that were already taken.
    * NOTE(review): the filter compares SEATS.ID with a course id, as the original did —
    * confirm whether SEATS has a separate course column that should be used instead.
    */
  def findEmptySeat(courseId: Int): Sql[Int] =
    selectOne[Int]("select ID from SEATS where OCCUPIED='F' AND ID=?", courseId)(_.getInt("ID"))

  /** Program setting SEATS.OCCUPIED to "T" (taken) or "F" (free) for the seat `seatId`. */
  def updateSeatsStatus(seatId: Int, taken: Boolean): Sql[Unit] =
    execUpdate("update SEATS set OCCUPIED =? where ID=?") { stmt =>
      stmt.setString(1, if (taken) "T" else "F")
      stmt.setInt(2, seatId)
    }

  /** Program that finds a free seat and flags it as taken. */
  def takeSeat(courseId: Int): Sql[Unit] = for {
    emptySeat <- findEmptySeat(courseId)
    _ <- updateSeatsStatus(emptySeat, true)
  } yield ()

  /** Program that enrolls a student: reads fee, pay and tutor of the course, charges the
    * student, credits the tutor, then takes a seat.
    */
  def addCourse(studentId: Int, courseId: Int): Sql[Unit] = for {
    fee <- getStudentFee(courseId)
    pay <- getTutorPay(courseId)
    tutorId <- getTutorId(courseId)
    _ <- updateStudentFee(studentId, fee)
    _ <- updateTutorPay(tutorId, pay)
    _ <- takeSeat(courseId)
  } yield ()

  /** Interprets a [[Sql]] program against a connection, one instruction at a time.
    *
    * Matching on the result of `resume` keeps the recursive call in tail position, so long
    * programs do not grow the stack, and avoids the unchecked (erased) type pattern used before.
    */
  @scala.annotation.tailrec
  def runTransactionImpl[A](conn: Connection, ast: Sql[A]): A =
    ast.resume match {
      case -\/(op)    => runTransactionImpl(conn, op.run(conn))
      case \/-(value) => value
    }

  /** Runs a [[Sql]] program inside a single database transaction.
    *
    * Commits when the program finishes and rolls back when it throws. Failures while
    * obtaining or configuring the connection still propagate as exceptions.
    *
    * NOTE(review): the pool is built from a default BoneCPConfig; the JDBC URL and
    * credentials are presumably configured elsewhere — confirm.
    *
    * @return the program's result on the right, or the exception it raised on the left
    */
  def runTransaction[A](ast: Sql[A]): Exception \/ A = {
    val config = new BoneCPConfig()
    val bonecp = new BoneCP(config)
    try {
      val conn = bonecp.getConnection()
      try {
        conn.setReadOnly(false)
        conn.setAutoCommit(false)
        // Kept from the original; presumably clears any pending work on the pooled connection.
        conn.rollback()
        try {
          val result: A = runTransactionImpl(conn, ast)
          // Auto-commit is off, so without this the updates are never made permanent.
          conn.commit()
          result.right[Exception]
        } catch {
          case e: Exception =>
            // Undo partial work; a failing rollback is attached rather than masking the cause.
            try conn.rollback()
            catch { case r: Exception => e.addSuppressed(r) }
            e.left[A]
        }
      } finally {
        conn.close()
      }
    } finally {
      // A pool is created on every call, so it has to be shut down here or it leaks.
      bonecp.shutdown()
    }
  }
}