打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
XA 分布式事务原理

概念


XA是由X/Open组织提出的分布式事务的规范。 XA规范主要定义了(全局)事务管理器(TM)和(局 部)资源管理器(RM)之间的接口。主流的关系型 数据库产品都是实现了XA接口的。
 XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁。
 XA之所以需要引入事务管理器是因为,在分布 式系统中,从理论上讲两台机器理论上无法达 到一致的状态,需要引入一个单点进行协调。
 由全局事务管理器管理和协调的事务,可以跨 越多个资源(如数据库或JMS队列)和进程。 全局事务管理器一般使用 XA 二阶段提交协议 与数据库进行交互。

       资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。
事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识
Xid 接口 Xid, Xid 接口是 X/Open 事务标识符 XID 结构的 Java 映射。此接口指定三个访问器方法,以检索全局事务格式 ID、全局事务 ID 和分支限定符。Xid 接口供事务管理器和资源管理器使用。此接口对应用程序不可见。

XA 不能自动提交。

分段提交

XA需要两阶段提交: prepare 和 commit. 
第一阶段为 准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。 

第二阶段为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。 

假设有两个Connection, con1, con2, 大体的过程如下 .

  1. con1 = XAResouce1.getConnection...
  2. con2 = XAResouce2.getConnection...

  3. con1 do some thing.
  4. con2 do some thing.
  5. after they finish.

  6. pre1 = XAResouce1.prepare();
  7. pre2 = XAResouce2.prepare();

  8. if( both pre1 and pre2 are OK){
  9. XAResouce1 and 2 commit
  10. }else {
  11. XAResouce1 and 2 rollback
  12. }

事务协调/管理者

因为XA 事务是基于两阶段提交协议的,所以需要有一个事务协调者(transaction manager)来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务协调者(transaction manager)收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。MySQL 在这个XA事务中扮演的是参与者的角色,而不是事务协调者(transaction manager)。

测试用例

  1. import com.alibaba.druid.pool.xa.DruidXADataSource;
  2. import com.mysql.jdbc.jdbc2.optional.MysqlXid;

  3. import javax.sql.XAConnection;
  4. import javax.transaction.xa.XAException;
  5. import javax.transaction.xa.XAResource;
  6. import javax.transaction.xa.Xid;
  7. import java.io.IOException;
  8. import java.sql.Connection;
  9. import java.sql.SQLException;
  10. import java.sql.Statement;
  11. import java.util.Properties;


  12. class DistributeTransaction {

  13. private Properties props;
  14. private String propertyfile = "jdbc.properties";

  15. private String sql_1 = "delete from test3 where pk_t=3;";
  16. private String sql_2 = "INSERT INTO test(name) VALUES('tyz');";

  17. DistributeTransaction() {
  18. Connection connection_1 = null;
  19. Connection connection_2 = null;
  20. DruidXADataSource xaDataSource_1 = null;
  21. DruidXADataSource xaDataSource_2 = null;
  22. Xid xid_1 = null;
  23. Xid xid_2 = null;
  24. XAConnection xaConnection_1 = null;
  25. XAConnection xaConnection_2 = null;
  26. XAResource xaResource_1 = null;
  27. XAResource xaResource_2 = null;

  28. try {
  29. props = new Properties();
  30. props.load(getClass().getResourceAsStream(propertyfile));
  31. } catch (IOException io) {
  32. System.err.println("Error while accessing the properties file (" + propertyfile + "). Abort.");
  33. System.exit(1);
  34. }

  35. DruidXADataSource[] xaDataSources = initXADataSource();
  36. xaDataSource_1 = xaDataSources[0];
  37. xaDataSource_2 = xaDataSources[1];

  38. XAConnection[] xaConnections = initXAConnection(xaDataSource_1, xaDataSource_2);
  39. xaConnection_1 = xaConnections[0];
  40. xaConnection_2 = xaConnections[1];

  41. xaResource_1 = initXAResource(xaConnection_1);
  42. xaResource_2 = initXAResource(xaConnection_2);

  43. connection_1 = getDatabaseConnection(xaConnection_1);
  44. connection_2 = getDatabaseConnection(xaConnection_2);

  45. // create a separate branch for a common transaction
  46. Xid[] xids = createXID();
  47. xid_1 = xids[0];
  48. xid_2 = xids[1];

  49. try {
  50. execBranch(connection_1, xaResource_1, xid_1, sql_1);
  51. execBranch(connection_2, xaResource_2, xid_2, sql_2);

  52. if (prepareCommit(xaResource_1, xid_1) == XAResource.XA_OK &&
  53. prepareCommit(xaResource_2, xid_2) == XAResource.XA_OK) {
  54. commitBranch(xaResource_1, xid_1);
  55. commitBranch(xaResource_2, xid_2);
  56. } else {
  57. throw new RuntimeException();
  58. }
  59. } catch (Exception e) {
  60. rollbackBranch(xaResource_1, xid_1);
  61. rollbackBranch(xaResource_2, xid_2);
  62. }
  63. }

  64. DruidXADataSource[] initXADataSource() {
  65. System.out.print("Create a XADataSource_1 data source: ");
  66. DruidXADataSource xaDataSource_1 = new DruidXADataSource();
  67. xaDataSource_1.setDbType(props.getProperty("db1.dbtype"));
  68. xaDataSource_1.setUrl(props.getProperty("db1.url"));
  69. xaDataSource_1.setUsername(props.getProperty("db1.username"));
  70. xaDataSource_1.setPassword(props.getProperty("db1.password"));
  71. System.out.println("Okay.");

  72. System.out.print("Create a XADataSource_2 data source: ");
  73. DruidXADataSource xaDataSource_2 = new DruidXADataSource();
  74. xaDataSource_2.setDbType(props.getProperty("db2.dbtype"));
  75. xaDataSource_2.setUrl(props.getProperty("db2.url"));
  76. xaDataSource_2.setUsername(props.getProperty("db2.username"));
  77. xaDataSource_2.setPassword(props.getProperty("db2.password"));
  78. System.out.println("Okay.");
  79. return new DruidXADataSource[]{xaDataSource_1, xaDataSource_2};
  80. }

  81. XAConnection[] initXAConnection(DruidXADataSource xaDataSource_1, DruidXADataSource xaDataSource_2) {
  82. XAConnection xaconn_1 = null;
  83. XAConnection xaconn_2 = null;
  84. try {
  85. System.out.print("Set up DB_1 XA connection: ");
  86. xaconn_1 = xaDataSource_1.getXAConnection();
  87. System.out.println("Okay.");

  88. System.out.print("Set up DB_2 XA connection: ");
  89. xaconn_2 = xaDataSource_2.getXAConnection();
  90. System.out.println("Okay.");
  91. } catch (SQLException e) {
  92. sqlerr(e);
  93. }
  94. return new XAConnection[]{xaconn_1, xaconn_2};
  95. }

  96. XAResource initXAResource(XAConnection xacon) {
  97. XAResource xares = null;
  98. try {
  99. System.out.print("Setting up a XA resource: ");
  100. xares = xacon.getXAResource();
  101. System.out.println("Okay.");
  102. } catch (SQLException e) {
  103. sqlerr(e);
  104. }
  105. return xares;
  106. }

  107. Connection getDatabaseConnection(XAConnection xacon) {
  108. Connection con = null;
  109. try {
  110. System.out.print("Establish database connection: ");
  111. con = xacon.getConnection();
  112. con.setAutoCommit(false);
  113. System.out.println("Okay.");
  114. } catch (SQLException e) {
  115. sqlerr(e);
  116. }
  117. return con;
  118. }

  119. Xid[] createXID() {
  120. Xid xid_1 = null;
  121. byte[] gid_1 = new byte[1];
  122. byte[] bid_1 = new byte[1];
  123. gid_1[0] = (Byte.decode(props.getProperty("xid.global"))).byteValue();
  124. bid_1[0] = (Byte.decode(props.getProperty("xid.branch.db_1"))).byteValue();
  125. System.out.print("Creating an XID (" + Byte.toString(gid_1[0]) + ", " + Byte.toString(bid_1[0]) + ") for DB_1: ");
  126. xid_1 = new MysqlXid(gid_1, bid_1, 0);
  127. System.out.println("Okay.");

  128. Xid xid_2 = null;
  129. byte[] gid_2 = new byte[1];
  130. byte[] bid_2 = new byte[1];
  131. gid_2[0] = (Byte.decode(props.getProperty("xid.global"))).byteValue();
  132. bid_2[0] = (Byte.decode(props.getProperty("xid.branch.db_2"))).byteValue();
  133. System.out.print("Creating an XID (" + Byte.toString(gid_2[0]) + ", " + Byte.toString(bid_2[0]) + ") for DB_2: ");
  134. xid_2 = new MysqlXid(gid_2, bid_2, 0);
  135. System.out.println("Okay.");
  136. return new Xid[]{xid_1, xid_2};
  137. }

  138. void execBranch(Connection con, XAResource xares, Xid xid, String sql) {
  139. try {
  140. xares.start(xid, XAResource.TMNOFLAGS);
  141. Statement stmt = con.createStatement();
  142. stmt.executeUpdate(sql);
  143. xares.end(xid, XAResource.TMSUCCESS);
  144. } catch (XAException e) {
  145. System.err.println("XA exception caught:");
  146. System.err.println("Cause : " + e.getCause());
  147. System.err.println("Message: " + e.getMessage());
  148. e.printStackTrace();
  149. throw new RuntimeException(e);
  150. } catch (SQLException e) {
  151. sqlerr(e);
  152. throw new RuntimeException(e);
  153. }
  154. }

  155. int prepareCommit(XAResource xares, Xid xid) {
  156. int rc = 0;
  157. System.out.print("Prepare XA branch (" +
  158. Byte.toString((xid.getGlobalTransactionId())[0]) + ", " +
  159. Byte.toString((xid.getBranchQualifier())[0]) + "): ");
  160. try {
  161. xares.prepare(xid);
  162. } catch (XAException e) {
  163. xaerr(e);
  164. throw new RuntimeException(e);
  165. }
  166. System.out.println("Okay.");
  167. return rc;
  168. }

  169. void commitBranch(XAResource xares, Xid xid) {
  170. System.out.print("Commit XA branch (" +
  171. Byte.toString((xid.getGlobalTransactionId())[0]) + ", " +
  172. Byte.toString((xid.getBranchQualifier())[0]) + "): ");
  173. try {
  174. // second parameter is 'false' since we have a two phase commit
  175. xares.commit(xid, false);
  176. } catch (XAException e) {
  177. xaerr(e);
  178. throw new RuntimeException(e);
  179. }
  180. System.out.println("Okay.");
  181. }

  182. void rollbackBranch(XAResource xares, Xid xid) {
  183. System.out.print("Rollback XA branch (" +
  184. Byte.toString((xid.getGlobalTransactionId())[0]) + ", " +
  185. Byte.toString((xid.getBranchQualifier())[0]) + "): ");
  186. try {
  187. xares.rollback(xid);
  188. } catch (XAException e) {
  189. xaerr(e);
  190. throw new RuntimeException(e);
  191. }
  192. System.out.println("Okay.");
  193. }

  194. void sqlerr(SQLException exception) {
  195. System.err.println("FAILED.");
  196. while (exception != null) {
  197. System.err.println("==> SQL Exception caught");
  198. System.err.println("--> SQLCODE : " + exception.getErrorCode());
  199. System.err.println("--> SQLSTATE: " + exception.getSQLState());
  200. System.err.println("--> Message : " + exception.getMessage());
  201. exception = exception.getNextException();
  202. }
  203. }

  204. void xaerr(XAException exception) {
  205. System.err.println("FAILED.");
  206. System.err.println("==> XA Exception caught");
  207. System.err.println("--> Cause : " + exception.getCause());
  208. System.err.println("--> Message: " + exception.getMessage());
  209. exception.printStackTrace();
  210. }

  211. public static void main (String args[]) {
  212. new DistributeTransaction();
  213. }

  214. }

XA性能局限性

效率低下,准备阶段的成本持久,全局事务状态的成本持久,性能与本地事务相差10倍左右;
提交前,出现故障难以恢复和隔离问题。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
JTA初级研究之JTA和JDBC事务
Java事务处理总结
分布式事务
mysql 对XA事务的支持
分布式事务之解决方案(XA和2PC)
深入解读微服务架构下分布式事务解决方案
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服