diff --git a/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoBaseXaResource.java b/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoBaseXaResource.java index ce3df4dad..e62789eb0 100644 --- a/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoBaseXaResource.java +++ b/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoBaseXaResource.java @@ -22,25 +22,36 @@ * ]] */ -import java.util.ArrayList; -import java.util.HashMap; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; +import java.util.function.Function; import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; +import org.synchronoss.cpo.CpoException; /** Created by dberry on 3/9/15. */ -public abstract class CpoBaseXaResource implements CpoXaResource { +public abstract class CpoBaseXaResource implements CpoXaResource { - // Mutex used for assigning the statemap for the class - private static final String XA_STATEMAP_MUTEX = "XA_STATEMAP_MUTEX"; + /** + * Each XAResource, the class that extends this, needs to see the XID state for all similar + * resources. So a StringBuilderXaResource needs to see all the XIDs that are in play with other + * StringBuilderXaResource instances + */ + private static final ConcurrentHashMap>> + cpoXaResourceStateMap = new ConcurrentHashMap<>(); + + private volatile Xid associatedXid = null; - private static final HashMap> cpoXaStateMapMap = new HashMap<>(); + private final T localResource; - private T localResource = null; + private final Semaphore semaphore = new Semaphore(1); - private final CpoXaStateMap cpoXaStateMap = getCpoXaStateMap(); + private final ConcurrentHashMap> xidStateMap = getXidStateMap(); public CpoBaseXaResource(T localResource) { - this.localResource = localResource; + this.localResource = Objects.requireNonNull(localResource); } protected abstract void prepareResource(T xaResource) throws XAException; @@ -53,23 +64,37 @@ public CpoBaseXaResource(T localResource) { protected abstract void closeResource(T xaResource) throws XAException; - protected T getCurrentResource() { - synchronized (cpoXaStateMap) { - T currentResource = null; - Xid associatedXid = cpoXaStateMap.getXaResourceMap().get(this); - if (associatedXid != null) { - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(associatedXid); - currentResource = cpoXaState.getResource(); - } - if (currentResource == null) { - currentResource = localResource; - } - return currentResource; + private T getResource() { + @SuppressWarnings("unchecked") + CpoXaState cpoXaState = + associatedXid == null ? null : (CpoXaState) xidStateMap.get(associatedXid); + return cpoXaState == null + ? localResource + : cpoXaState.getResource() == null ? localResource : cpoXaState.getResource(); + } + + public R apply(Function function) throws CpoException { + semaphore.acquireUninterruptibly(); + try { + return function.apply(getResource()); + } catch (RuntimeException e) { + if (e.getCause() instanceof CpoException) throw (CpoException) e.getCause(); + throw e; + } finally { + semaphore.release(); } } - protected T getLocalResource() { - return localResource; + public void accept(Consumer consumer) throws CpoException { + semaphore.acquireUninterruptibly(); + try { + consumer.accept(getResource()); + } catch (RuntimeException e) { + if (e.getCause() instanceof CpoException) throw (CpoException) e.getCause(); + throw e; + } finally { + semaphore.release(); + } } /** @@ -78,7 +103,6 @@ protected T getLocalResource() { * @throws XAException - */ public void closeAssociated() throws XAException { - Xid associatedXid = cpoXaStateMap.getXaResourceMap().get(this); if (associatedXid != null) { close(associatedXid); } @@ -90,21 +114,32 @@ public void closeAssociated() throws XAException { * @param xid of the global transaction * @throws XAException An error has occurred. */ + @SuppressWarnings("unchecked") public void close(Xid xid) throws XAException { - synchronized (cpoXaStateMap) { - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(xid); - - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - // close the resource - closeResource(cpoXaState.getResource()); - - // unassociate - cpoXaStateMap.getXaResourceMap().remove(cpoXaState.getAssignedResourceManager()); - - // remove the xid reference - cpoXaStateMap.getXidStateMap().remove(xid); + try { + xidStateMap.compute( + xid, + (k, cpoXaState) -> { + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + + // unassociate + if (cpoXaState.getAssignedResourceManager() != null) { + cpoXaState.getAssignedResourceManager().associatedXid = null; + } + + try { + closeResource((T) cpoXaState.getResource()); + } catch (XAException e) { + throw new RuntimeException(e); + } + + return null; + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof XAException) throw (XAException) e.getCause(); + throw e; } } @@ -121,30 +156,41 @@ public void close(Xid xid) throws XAException { * resource manager has rolled back the branch's work and has released all held resources. */ @Override + @SuppressWarnings("unchecked") public void commit(Xid xid, boolean onePhase) throws XAException { - synchronized (cpoXaStateMap) { - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(xid); - - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - if (cpoXaState.getAssociation() == CpoXaState.XA_UNASSOCIATED) { - if (onePhase) { - if (!cpoXaState.isSuccess()) { - rollbackResource(cpoXaState.getResource()); - cpoXaState.setSuccess(true); - throw CpoXaError.createXAException( - CpoXaError.XA_RBROLLBACK, - "Trying to commit an unsuccessful transaction. Transaction Rolled Back"); - } - prepareResource(cpoXaState.getResource()); - } - commitResource(cpoXaState.getResource()); - cpoXaState.setPrepared(false); - } else { - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "Commit can only be called on an unassociated XID"); - } + try { + xidStateMap.compute( + xid, + (k, cpoXaState) -> { + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + if (cpoXaState.getAssociation() != CpoXaState.XA_UNASSOCIATED) + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "Commit can only be called on an unassociated XID")); + try { + if (onePhase) { + if (!cpoXaState.isSuccess()) { + rollbackResource((T) cpoXaState.getResource()); + cpoXaState.setSuccess(true); + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XA_RBROLLBACK, + "Trying to commit an unsuccessful transaction. Transaction Rolled Back")); + } + prepareResource((T) cpoXaState.getResource()); + } + commitResource((T) cpoXaState.getResource()); + cpoXaState.setPrepared(false); + } catch (XAException e) { + throw new RuntimeException(e); + } + return cpoXaState; + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof XAException) throw (XAException) e.getCause(); + throw e; } } @@ -169,49 +215,58 @@ public void commit(Xid xid, boolean onePhase) throws XAException { */ @Override public void end(Xid xid, int flags) throws XAException { - synchronized (cpoXaStateMap) { - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(xid); - - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - // has this already been ended - if (cpoXaState.getAssociation() == CpoXaState.XA_UNASSOCIATED) - throw CpoXaError.createXAException(CpoXaError.XAER_PROTO, "Cannot End an Unassociated XID"); - - switch (flags) { - case TMSUSPEND: - // You can only suspend an associated transaction - if (cpoXaState.getAssociation() == CpoXaState.XA_ASSOCIATED) { - cpoXaStateMap.getXaResourceMap().remove(cpoXaState.getAssignedResourceManager()); - cpoXaState.setAssociation(CpoXaState.XA_SUSPENDED); - cpoXaState.setAssignedResourceManager(null); - } else { - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "You can only suspend an associated XID"); - } - break; - - // you can fail or succeed an associated or suspended trx - case TMFAIL: - // mark transaction as failed - cpoXaStateMap.getXaResourceMap().remove(cpoXaState.getAssignedResourceManager()); - cpoXaState.setAssociation(CpoXaState.XA_UNASSOCIATED); - cpoXaState.setAssignedResourceManager(null); - cpoXaState.setSuccess(!cpoXaState.isSuccess()); - break; - - case TMSUCCESS: - // mark transaction as success - cpoXaStateMap.getXaResourceMap().remove(cpoXaState.getAssignedResourceManager()); - cpoXaState.setAssociation(CpoXaState.XA_UNASSOCIATED); - cpoXaState.setAssignedResourceManager(null); - cpoXaState.setSuccess(cpoXaState.isSuccess()); - break; - - default: - throw CpoXaError.createXAException(CpoXaError.XAER_INVAL, "Invalid flag for end()"); - } + try { + xidStateMap.compute( + xid, + (k, cpoXaState) -> { + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + + // has this already been ended + if (cpoXaState.getAssociation() == CpoXaState.XA_UNASSOCIATED) + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "Cannot End an Unassociated XID")); + + switch (flags) { + case TMSUSPEND: + // You can only suspend an associated transaction + if (cpoXaState.getAssociation() != CpoXaState.XA_ASSOCIATED) + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "You can only suspend an associated XID")); + cpoXaState.setAssociation(CpoXaState.XA_SUSPENDED); + break; + + // you can fail or succeed an associated or suspended trx + case TMFAIL: + // mark transaction as failed + cpoXaState.setAssociation(CpoXaState.XA_UNASSOCIATED); + cpoXaState.setSuccess(!cpoXaState.isSuccess()); + break; + + case TMSUCCESS: + // mark transaction as success + cpoXaState.setAssociation(CpoXaState.XA_UNASSOCIATED); + cpoXaState.setSuccess(cpoXaState.isSuccess()); + break; + + default: + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_INVAL, "Invalid flag for end()")); + } + + if (cpoXaState.getAssignedResourceManager() != null) { + cpoXaState.getAssignedResourceManager().associatedXid = null; + cpoXaState.setAssignedResourceManager(null); + } + + return cpoXaState; + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof XAException) throw (XAException) e.getCause(); + throw e; } } @@ -254,32 +309,42 @@ public int getTransactionTimeout() throws XAException { * XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO. */ @Override + @SuppressWarnings("unchecked") public int prepare(Xid xid) throws XAException { - synchronized (cpoXaStateMap) { - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(xid); - - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - if (!cpoXaState.isSuccess()) { - rollbackResource(cpoXaState.getResource()); - cpoXaState.setPrepared(false); - cpoXaState.setSuccess(true); - throw CpoXaError.createXAException( - CpoXaError.XA_RBROLLBACK, - "Trying to prepare an unsuccessfull transaction. Rollback performed"); - } - - if (cpoXaState.getAssociation() == CpoXaState.XA_UNASSOCIATED) { - prepareResource(cpoXaState.getResource()); - cpoXaState.setPrepared(true); - } else { - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "Prepare can only be called on an associated XID"); - } - - return XA_OK; + try { + xidStateMap.compute( + xid, + (k, cpoXaState) -> { + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + if (cpoXaState.getAssociation() != CpoXaState.XA_UNASSOCIATED) + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "Prepare can only be called on an unassociated XID")); + try { + if (!cpoXaState.isSuccess()) { + rollbackResource((T) cpoXaState.getResource()); + cpoXaState.setPrepared(false); + cpoXaState.setSuccess(true); + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XA_RBROLLBACK, + "Trying to prepare an unsuccessfull transaction. Rollback performed")); + } + prepareResource((T) cpoXaState.getResource()); + cpoXaState.setPrepared(true); + } catch (XAException e) { + throw new RuntimeException(e); + } + + return cpoXaState; + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof XAException) throw (XAException) e.getCause(); + throw e; } + return XA_OK; } /** @@ -297,22 +362,16 @@ public int prepare(Xid xid) throws XAException { */ @Override public Xid[] recover(int flags) throws XAException { - synchronized (cpoXaStateMap) { - switch (flags) { - case TMSTARTRSCAN: - case TMENDRSCAN: - case TMSTARTRSCAN | TMENDRSCAN: - case TMNOFLAGS: - ArrayList xids = new ArrayList<>(); - - for (CpoXaState cpoXaState : cpoXaStateMap.getXidStateMap().values()) { - if (cpoXaState.isPrepared()) xids.add(cpoXaState.getXid()); - } - return xids.toArray(new Xid[0]); - default: + return switch (flags) { + case TMSTARTRSCAN, TMENDRSCAN, TMSTARTRSCAN | TMENDRSCAN, TMNOFLAGS -> + xidStateMap.values().stream() + .filter(CpoXaState::isPrepared) + .map(CpoXaState::getXid) + .toList() + .toArray(new Xid[0]); + default -> throw CpoXaError.createXAException(CpoXaError.XAER_INVAL, "Invalid flag for recover()"); - } - } + }; } /** @@ -325,22 +384,34 @@ public Xid[] recover(int flags) throws XAException { * one of the XA_RB* exceptions. Upon return, the resource manager has rolled back the * branch's work and has released all held resources. */ + @SuppressWarnings("unchecked") @Override public void rollback(Xid xid) throws XAException { - synchronized (cpoXaStateMap) { - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(xid); - - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - if (cpoXaState.getAssociation() == CpoXaState.XA_UNASSOCIATED) { - rollbackResource(cpoXaState.getResource()); - cpoXaState.setPrepared(false); - cpoXaState.setSuccess(true); - } else { - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "Rollback can only be called on an unassociated XID"); - } + try { + xidStateMap.compute( + xid, + (k, cpoXaState) -> { + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + if (cpoXaState.getAssociation() != CpoXaState.XA_UNASSOCIATED) + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "Rollback can only be called on an unassociated XID")); + + try { + rollbackResource((T) cpoXaState.getResource()); + cpoXaState.setPrepared(false); + cpoXaState.setSuccess(true); + } catch (XAException e) { + throw new RuntimeException(e); + } + return cpoXaState; + }); + + } catch (RuntimeException e) { + if (e.getCause() instanceof XAException) throw (XAException) e.getCause(); + throw e; } } @@ -376,72 +447,77 @@ public boolean setTransactionTimeout(int seconds) throws XAException { * XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO. */ @Override + @SuppressWarnings("unchecked") public void start(Xid xid, int flags) throws XAException { + try { + xidStateMap.compute( + xid, + (k, cpoXaState) -> { + if (associatedXid != null) + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "Start can not be called on an associated XID")); + + switch (flags) { + case TMNOFLAGS: // Starting a new transaction ID + // if it is already in use then throw a dupe id error + if (cpoXaState != null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_DUPID, "Duplicate XID")); + + try { + cpoXaState = + new CpoXaState<>( + xid, createNewResource(), CpoXaState.XA_ASSOCIATED, this, true); + } catch (XAException e) { + throw new RuntimeException(e); + } + break; + + case TMJOIN: + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + + if (cpoXaState.getAssociation() != CpoXaState.XA_UNASSOCIATED) { + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, + "TMJOIN can only be used with an unassociated XID")); + } + break; + + case TMRESUME: + if (cpoXaState == null) + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID")); + + // you can only resume a suspended transaction + if (cpoXaState.getAssociation() != CpoXaState.XA_SUSPENDED) { + throw new RuntimeException( + CpoXaError.createXAException( + CpoXaError.XAER_PROTO, "TMRESUME can only be used with a suspended XID")); + } + break; + + default: // invalid arguments + throw new RuntimeException( + CpoXaError.createXAException(CpoXaError.XAER_INVAL, "Invalid start() flag")); + } - synchronized (cpoXaStateMap) { - // see if we are already associated with a global transaction - if (cpoXaStateMap.getXaResourceMap().get(this) != null) - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "Start can not be called on an associated XID"); - - CpoXaState cpoXaState = cpoXaStateMap.getXidStateMap().get(xid); - - switch (flags) { - case TMNOFLAGS: // Starting a new transaction ID - // if it is already in use then throw a dupe id error - if (cpoXaState != null) - throw CpoXaError.createXAException(CpoXaError.XAER_DUPID, "Duplicate XID"); - - cpoXaState = - new CpoXaState<>(xid, createNewResource(), CpoXaState.XA_ASSOCIATED, this, true); - cpoXaStateMap.getXidStateMap().put(xid, cpoXaState); - cpoXaStateMap.getXaResourceMap().put(this, xid); - break; - - case TMJOIN: - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - if (cpoXaState.getAssociation() == CpoXaState.XA_UNASSOCIATED) { cpoXaState.setAssociation(CpoXaState.XA_ASSOCIATED); - cpoXaState.setAssignedResourceManager(this); - cpoXaStateMap.getXaResourceMap().put(this, xid); - } else { - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "TMJOIN can only be used with an unassociated XID"); - } - break; - - case TMRESUME: - if (cpoXaState == null) - throw CpoXaError.createXAException(CpoXaError.XAER_NOTA, "Unknown XID"); - - // you can only resume a suspended transaction - if (cpoXaState.getAssociation() == CpoXaState.XA_SUSPENDED) { - cpoXaState.setAssociation(CpoXaState.XA_ASSOCIATED); - cpoXaState.setAssignedResourceManager(this); - cpoXaStateMap.getXaResourceMap().put(this, xid); - } else { - throw CpoXaError.createXAException( - CpoXaError.XAER_PROTO, "TMRESUME can only be used with a suspended XID"); - } - break; - - default: // invalid arguments - throw CpoXaError.createXAException(CpoXaError.XAER_INVAL, "Invalid start() flag"); - } + ((CpoXaState) cpoXaState).setAssignedResourceManager(this); + associatedXid = xid; + return cpoXaState; + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof XAException) throw (XAException) e.getCause(); + throw e; } } - private CpoXaStateMap getCpoXaStateMap() { - synchronized (XA_STATEMAP_MUTEX) { - CpoXaStateMap stateMap = - (CpoXaStateMap) cpoXaStateMapMap.get(this.getClass().getName()); - if (stateMap == null) { - stateMap = new CpoXaStateMap(); - cpoXaStateMapMap.put(this.getClass().getName(), stateMap); - } - return stateMap; - } + private ConcurrentHashMap> getXidStateMap() { + return cpoXaResourceStateMap.computeIfAbsent( + this.getClass().getName(), (k) -> new ConcurrentHashMap<>()); } } diff --git a/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaResource.java b/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaResource.java index 627fbafb6..7125d05db 100644 --- a/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaResource.java +++ b/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaResource.java @@ -27,7 +27,7 @@ import javax.transaction.xa.Xid; /** Created by dberry on 11/8/15. */ -public interface CpoXaResource extends XAResource { +public interface CpoXaResource extends XAResource { /** * @param xid The id of the XAResource to close * @throws XAException An exception occurred closing the resource diff --git a/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaStateMap.java b/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaStateMap.java index 58c220e5c..c9973e4bb 100644 --- a/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaStateMap.java +++ b/cpo-core/src/main/java/org/synchronoss/cpo/jta/CpoXaStateMap.java @@ -31,14 +31,6 @@ public class CpoXaStateMap { // map of all seen XIDs private final HashMap> xidStateMap = new HashMap<>(); - // map of associated XAResources - private final HashMap, Xid> xaResourceMap = new HashMap<>(); - - // map of all assigned resources - public HashMap, Xid> getXaResourceMap() { - return xaResourceMap; - } - // map of all xid State public HashMap> getXidStateMap() { return xidStateMap; diff --git a/cpo-core/src/test/java/org/synchronoss/cpo/jta/CpoXaResourceTest.java b/cpo-core/src/test/java/org/synchronoss/cpo/jta/CpoXaResourceTest.java index 23d21e2b0..47216e3a0 100644 --- a/cpo-core/src/test/java/org/synchronoss/cpo/jta/CpoXaResourceTest.java +++ b/cpo-core/src/test/java/org/synchronoss/cpo/jta/CpoXaResourceTest.java @@ -66,7 +66,7 @@ public void testStart() { try { xaResource.start(xid1, XAResource.TMNOFLAGS); - assertEquals(0, xaResource.length()); + assertEquals(xaResource.length(), 0); xaResource.append(GLOBAL_RESOURCE); xaResource.end(xid1, XAResource.TMSUCCESS); } catch (XAException xae) { @@ -98,7 +98,7 @@ public void testStart() { // should allow a start join on an unassigned xid xaResource.start(xid1, XAResource.TMJOIN); // check for the global value - assertEquals(GLOBAL_RESOURCE, xaResource.toString()); + assertEquals(xaResource.toString(), GLOBAL_RESOURCE); // suspend the transaction for the next tests xaResource.end(xid1, XAResource.TMSUSPEND); } catch (XAException xae) { @@ -130,7 +130,7 @@ public void testStart() { // should allow a start resume on a suspended xid xaResource.start(xid1, XAResource.TMRESUME); // check for the global value - assertEquals(GLOBAL_RESOURCE, xaResource.toString()); + assertEquals(xaResource.toString(), GLOBAL_RESOURCE); // suspend the transaction for the next tests xaResource.end(xid1, XAResource.TMSUCCESS); } catch (XAException xae) { @@ -181,7 +181,7 @@ public void testEnd() { // create the global transaction and test end success try { xaResource.start(xid1, XAResource.TMNOFLAGS); - assertEquals(0, xaResource.length()); + assertEquals(xaResource.length(), 0); xaResource.append(GLOBAL_RESOURCE); xaResource.end(xid1, XAResource.TMSUCCESS); } catch (XAException xae) { diff --git a/cpo-core/src/test/java/org/synchronoss/cpo/jta/EnhancedStringBuilderXaResource.java b/cpo-core/src/test/java/org/synchronoss/cpo/jta/EnhancedStringBuilderXaResource.java index 6e5065ac6..c8dd5f6ab 100644 --- a/cpo-core/src/test/java/org/synchronoss/cpo/jta/EnhancedStringBuilderXaResource.java +++ b/cpo-core/src/test/java/org/synchronoss/cpo/jta/EnhancedStringBuilderXaResource.java @@ -41,6 +41,6 @@ public class EnhancedStringBuilderXaResource extends StringBuilderXaResource { public boolean isSameRM(XAResource xaResource) throws XAException { if (xaResource == null) throw new XAException(XAException.XAER_INVAL); - return xaResource instanceof EnhancedStringBuilderXaResource; + return xaResource instanceof EnhancedStringBuilderXaResource && this.equals(xaResource); } } diff --git a/cpo-core/src/test/java/org/synchronoss/cpo/jta/StringBuilderXaResource.java b/cpo-core/src/test/java/org/synchronoss/cpo/jta/StringBuilderXaResource.java index 655cbef8c..b3c4df746 100644 --- a/cpo-core/src/test/java/org/synchronoss/cpo/jta/StringBuilderXaResource.java +++ b/cpo-core/src/test/java/org/synchronoss/cpo/jta/StringBuilderXaResource.java @@ -24,9 +24,13 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.synchronoss.cpo.CpoException; /** Created by dberry on 8/9/15. */ public class StringBuilderXaResource extends CpoBaseXaResource { + private static final Logger logger = LoggerFactory.getLogger(StringBuilderXaResource.class); public StringBuilderXaResource() { super(new StringBuilder()); @@ -63,17 +67,27 @@ protected void closeResource(StringBuilder xaResource) throws XAException {} public boolean isSameRM(XAResource xaResource) throws XAException { if (xaResource == null) throw new XAException(XAException.XAER_INVAL); - return xaResource instanceof StringBuilderXaResource; + return xaResource instanceof StringBuilderXaResource && this.equals(xaResource); } // StringBuilder Proxy methods - public StringBuilder append(Object obj) { - return getCurrentResource().append(String.valueOf(obj)); + public StringBuilderXaResource append(Object obj) { + try { + accept((sb) -> sb.append(obj)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } - public StringBuilder append(String str) { - return getCurrentResource().append(str); + public StringBuilderXaResource append(String str) { + try { + accept((sb) -> sb.append(str)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** @@ -89,195 +103,360 @@ public StringBuilder append(String str) { * n; otherwise, it is equal to the character at index k-n in the argument {@code * sb}. * - * @param sb the {@code StringBuffer} to append. + * @param stringBuffer the {@code StringBuffer} to append. * @return a reference to this object. */ - public StringBuilder append(StringBuffer sb) { - return getCurrentResource().append(sb); + public StringBuilderXaResource append(StringBuffer stringBuffer) { + try { + accept((sb) -> sb.append(stringBuffer)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } - public StringBuilder append(CharSequence s) { - return getCurrentResource().append(s); + public StringBuilderXaResource append(CharSequence s) { + try { + accept((sb) -> sb.append(s)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws IndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder append(CharSequence s, int start, int end) { - return getCurrentResource().append(s, start, end); + public StringBuilderXaResource append(CharSequence s, int start, int end) { + try { + accept((sb) -> sb.append(s, start, end)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } - public StringBuilder append(char[] str) { - return getCurrentResource().append(str); + public StringBuilderXaResource append(char[] str) { + try { + accept((sb) -> sb.append(str)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws IndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder append(char[] str, int offset, int len) { - return getCurrentResource().append(str, offset, len); - } - - public StringBuilder append(boolean b) { - return getCurrentResource().append(b); - } - - public StringBuilder append(char c) { - return getCurrentResource().append(c); - } - - public StringBuilder append(int i) { - return getCurrentResource().append(i); - } - - public StringBuilder append(long lng) { - return getCurrentResource().append(lng); - } - - public StringBuilder append(float f) { - return getCurrentResource().append(f); - } - - public StringBuilder append(double d) { - return getCurrentResource().append(d); + public StringBuilderXaResource append(char[] str, int offset, int len) { + try { + accept((sb) -> sb.append(str, offset, len)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; + } + + public StringBuilderXaResource append(boolean b) { + try { + accept((sb) -> sb.append(b)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; + } + + public StringBuilderXaResource append(char c) { + try { + accept((sb) -> sb.append(c)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; + } + + public StringBuilderXaResource append(int i) { + try { + accept((sb) -> sb.append(i)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; + } + + public StringBuilderXaResource append(long lng) { + try { + accept((sb) -> sb.append(lng)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; + } + + public StringBuilderXaResource append(float f) { + try { + accept((sb) -> sb.append(f)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; + } + + public StringBuilderXaResource append(double d) { + try { + accept((sb) -> sb.append(d)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @since 1.5 */ - public StringBuilder appendCodePoint(int codePoint) { - return getCurrentResource().appendCodePoint(codePoint); + public StringBuilderXaResource appendCodePoint(int codePoint) { + try { + accept((sb) -> sb.appendCodePoint(codePoint)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder delete(int start, int end) { - return getCurrentResource().delete(start, end); + public StringBuilderXaResource delete(int start, int end) { + try { + accept((sb) -> sb.delete(start, end)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder deleteCharAt(int index) { - return getCurrentResource().deleteCharAt(index); + public StringBuilderXaResource deleteCharAt(int index) { + try { + accept((sb) -> sb.deleteCharAt(index)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder replace(int start, int end, String str) { - return getCurrentResource().replace(start, end, str); + public StringBuilderXaResource replace(int start, int end, String str) { + try { + accept((sb) -> sb.replace(start, end, str)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int index, char[] str, int offset, int len) { - return getCurrentResource().insert(index, str, offset, len); + public StringBuilderXaResource insert(int index, char[] str, int offset, int len) { + try { + accept((sb) -> sb.insert(index, str, offset, len)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, Object obj) { - return getCurrentResource().insert(offset, obj); + public StringBuilderXaResource insert(int offset, Object obj) { + try { + accept((sb) -> sb.insert(offset, obj)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, String str) { - return getCurrentResource().insert(offset, str); + public StringBuilderXaResource insert(int offset, String str) { + try { + accept((sb) -> sb.insert(offset, str)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, char[] str) { - return getCurrentResource().insert(offset, str); + public StringBuilderXaResource insert(int offset, char[] str) { + try { + accept((sb) -> sb.insert(offset, str)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws IndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int dstOffset, CharSequence s) { - return getCurrentResource().insert(dstOffset, s); + public StringBuilderXaResource insert(int dstOffset, CharSequence s) { + try { + accept((sb) -> sb.insert(dstOffset, s)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws IndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int dstOffset, CharSequence s, int start, int end) { - return getCurrentResource().insert(dstOffset, s, start, end); + public StringBuilderXaResource insert(int dstOffset, CharSequence s, int start, int end) { + try { + accept((sb) -> sb.insert(dstOffset, s, start, end)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, boolean b) { - return getCurrentResource().insert(offset, b); + public StringBuilderXaResource insert(int offset, boolean b) { + try { + accept((sb) -> sb.insert(offset, b)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws IndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, char c) { - return getCurrentResource().insert(offset, c); + public StringBuilderXaResource insert(int offset, char c) { + try { + accept((sb) -> sb.insert(offset, c)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, int i) { - return getCurrentResource().insert(offset, i); + public StringBuilderXaResource insert(int offset, int i) { + try { + accept((sb) -> sb.insert(offset, i)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, long l) { - return getCurrentResource().insert(offset, l); + public StringBuilderXaResource insert(int offset, long l) { + try { + accept((sb) -> sb.insert(offset, l)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, float f) { - return getCurrentResource().insert(offset, f); + public StringBuilderXaResource insert(int offset, float f) { + try { + accept((sb) -> sb.insert(offset, f)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } /** * @throws StringIndexOutOfBoundsException {@inheritDoc} */ - public StringBuilder insert(int offset, double d) { - return getCurrentResource().insert(offset, d); + public StringBuilderXaResource insert(int offset, double d) { + try { + accept((sb) -> sb.insert(offset, d)); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } public int indexOf(String str) { - return getCurrentResource().indexOf(str); + try { + return apply((sb) -> sb.indexOf(str)); + } catch (CpoException e) { + logger.error("Error appending", e); + return -1; + } } public int indexOf(String str, int fromIndex) { - return getCurrentResource().indexOf(str, fromIndex); + try { + return apply((sb) -> sb.indexOf(str, fromIndex)); + } catch (CpoException e) { + logger.error("Error appending", e); + return -1; + } } public int lastIndexOf(String str) { - return getCurrentResource().lastIndexOf(str); + try { + return apply((sb) -> sb.lastIndexOf(str)); + } catch (CpoException e) { + logger.error("Error appending", e); + return -1; + } } public int lastIndexOf(String str, int fromIndex) { - return getCurrentResource().lastIndexOf(str, fromIndex); + try { + return apply((sb) -> sb.lastIndexOf(str, fromIndex)); + } catch (CpoException e) { + logger.error("Error appending", e); + return -1; + } } - public StringBuilder reverse() { - return getCurrentResource().reverse(); + public StringBuilderXaResource reverse() { + try { + accept(StringBuilder::reverse); + } catch (CpoException e) { + logger.error("Error appending", e); + } + return this; } @Override public String toString() { // Create a copy, don't share the array - return getCurrentResource().toString(); + try { + return apply(StringBuilder::toString); + } catch (CpoException e) { + logger.error("Error appending", e); + return ""; + } } /** @@ -287,12 +466,17 @@ public String toString() { * @return the number of chars in this sequence */ int length() { - return getCurrentResource().length(); + try { + return apply(StringBuilder::length); + } catch (CpoException e) { + logger.error("Error appending", e); + return 0; + } } /** - * Returns the char value at the specified index. An index ranges from zero to - * length() - 1. The first char value of the sequence is at index zero, the + * Returns the char value at the specified index. An index ranges from zero to + * length() - 1. The first char value of the sequence is at index zero, the * next at index one, and so on, as for array indexing. * *

If the char value specified by the index is a char value to be returned * @return the specified char value - * @throws IndexOutOfBoundsException if the index argument is negative or not less than - * length() + * @throws IndexOutOfBoundsException if the index argument is negative or not less + * than length() */ char charAt(int index) { - return getCurrentResource().charAt(index); + try { + return apply((sb) -> sb.charAt(index)); + } catch (CpoException e) { + logger.error("Error appending", e); + return Character.UNASSIGNED; + } } /** * Returns a CharSequence that is a subsequence of this sequence. The subsequence * starts with the char value at the specified index and ends with the char - * value at index end - 1. The length (in chars) of the returned - * sequence is end - start, so if start == end then an empty sequence is + * value at index end - 1. The length (in chars) of the returned + * sequence is end - start, so if start == end then an empty sequence is * returned. * * @param start the start index, inclusive * @param end the end index, exclusive * @return the specified subsequence - * @throws IndexOutOfBoundsException if start or end are negative, if - * end is greater than length(), or if start is greater than - * end + * @throws IndexOutOfBoundsException if start or end are negative, if + * end is greater than length(), or if start is greater + * than end */ CharSequence subSequence(int start, int end) { - return getCurrentResource().subSequence(start, end); + try { + return apply((sb) -> sb.subSequence(start, end)); + } catch (CpoException e) { + logger.error("Error appending", e); + return null; + } } } diff --git a/cpo-jdbc/src/main/java/org/synchronoss/cpo/jdbc/jta/JdbcCpoXaAdapter.java b/cpo-jdbc/src/main/java/org/synchronoss/cpo/jdbc/jta/JdbcCpoXaAdapter.java index 8c6a95a9e..976530e25 100644 --- a/cpo-jdbc/src/main/java/org/synchronoss/cpo/jdbc/jta/JdbcCpoXaAdapter.java +++ b/cpo-jdbc/src/main/java/org/synchronoss/cpo/jdbc/jta/JdbcCpoXaAdapter.java @@ -27,6 +27,8 @@ import java.util.stream.Stream; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.synchronoss.cpo.*; import org.synchronoss.cpo.enums.Comparison; import org.synchronoss.cpo.enums.Logical; @@ -71,6 +73,8 @@ public class JdbcCpoXaAdapter extends CpoBaseXaResource implemen /** Version Id for this class. */ private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(JdbcCpoXaAdapter.class); + private JdbcCpoAdapterFactory jdbcCpoAdapterFactory; /** @@ -87,15 +91,27 @@ public JdbcCpoXaAdapter(JdbcCpoAdapterFactory jdbcCpoAdapterFactory) throws CpoE /** Commits the current transaction behind the CpoTrxAdapter */ @Override public void commit() throws CpoException { - JdbcCpoAdapter currentResource = getCurrentResource(); - if (currentResource != getLocalResource()) ((JdbcCpoTrxAdapter) currentResource).commit(); + accept( + (adapter) -> { + try { + if (adapter instanceof JdbcCpoTrxAdapter) ((JdbcCpoTrxAdapter) adapter).commit(); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } /** Rollback the current transaction behind the CpoTrxAdapter */ @Override public void rollback() throws CpoException { - JdbcCpoAdapter currentResource = getCurrentResource(); - if (currentResource != getLocalResource()) ((JdbcCpoTrxAdapter) currentResource).rollback(); + accept( + (adapter) -> { + try { + if (adapter instanceof JdbcCpoTrxAdapter) ((JdbcCpoTrxAdapter) adapter).rollback(); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } /** @@ -114,20 +130,40 @@ public void close() throws CpoException { /** Returns true if the TrxAdapter has been closed, false if it is still active */ @Override public boolean isClosed() throws CpoException { - JdbcCpoAdapter currentResource = getCurrentResource(); - if (currentResource != getLocalResource()) - return ((JdbcCpoTrxAdapter) currentResource).isClosed(); - else return true; + return apply( + (adapter) -> { + try { + if (adapter instanceof JdbcCpoTrxAdapter) + return ((JdbcCpoTrxAdapter) adapter).isClosed(); + return true; + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long insertBean(T bean) throws CpoException { - return getCurrentResource().insertBean(bean); + return apply( + (cpo) -> { + try { + return cpo.insertBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long insertBean(String groupName, T bean) throws CpoException { - return getCurrentResource().insertBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.insertBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -138,17 +174,38 @@ public long insertBean( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().insertBean(groupName, bean, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.insertBean(groupName, bean, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long insertBeans(List beans) throws CpoException { - return getCurrentResource().insertBeans(beans); + return apply( + (cpo) -> { + try { + return cpo.insertBeans(beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long insertBeans(String groupName, List beans) throws CpoException { - return getCurrentResource().insertBeans(groupName, beans); + return apply( + (cpo) -> { + try { + return cpo.insertBeans(groupName, beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -159,17 +216,38 @@ public long insertBeans( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().insertBeans(groupName, beans, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.insertBeans(groupName, beans, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long deleteBean(T bean) throws CpoException { - return getCurrentResource().deleteBean(bean); + return apply( + (cpo) -> { + try { + return cpo.deleteBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long deleteBean(String groupName, T bean) throws CpoException { - return getCurrentResource().deleteBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.deleteBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -180,17 +258,38 @@ public long deleteBean( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().deleteBean(groupName, bean, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.deleteBean(groupName, bean, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long deleteBeans(List beans) throws CpoException { - return getCurrentResource().deleteBeans(beans); + return apply( + (cpo) -> { + try { + return cpo.deleteBeans(beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long deleteBeans(String groupName, List beans) throws CpoException { - return getCurrentResource().deleteBeans(groupName, beans); + return apply( + (cpo) -> { + try { + return cpo.deleteBeans(groupName, beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -201,108 +300,227 @@ public long deleteBeans( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().deleteBeans(groupName, beans, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.deleteBeans(groupName, beans, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public T executeBean(T bean) throws CpoException { - return getCurrentResource().executeBean(bean); + return apply( + (cpo) -> { + try { + return cpo.executeBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public T executeBean(String groupName, T bean) throws CpoException { - return getCurrentResource().executeBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.executeBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public T executeBean(String groupName, C criteria, T result) throws CpoException { - return getCurrentResource().executeBean(groupName, criteria, result); + return apply( + (cpo) -> { + try { + return cpo.executeBean(groupName, criteria, result); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long existsBean(T bean) throws CpoException { - return getCurrentResource().existsBean(bean); + return apply( + (cpo) -> { + try { + return cpo.existsBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long existsBean(String groupName, T bean) throws CpoException { - return getCurrentResource().existsBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.existsBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long existsBean(String groupName, T bean, Collection wheres) throws CpoException { - return getCurrentResource().existsBean(groupName, bean, wheres); + return apply( + (cpo) -> { + try { + return cpo.existsBean(groupName, bean, wheres); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public CpoOrderBy newOrderBy(String attribute, boolean ascending) throws CpoException { - return getCurrentResource().newOrderBy(attribute, ascending); + return apply( + (cpo) -> { + try { + return cpo.newOrderBy(attribute, ascending); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public CpoOrderBy newOrderBy(String marker, String attribute, boolean ascending) throws CpoException { - return getCurrentResource().newOrderBy(marker, attribute, ascending); + return apply( + (cpo) -> { + try { + return cpo.newOrderBy(marker, attribute, ascending); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public CpoOrderBy newOrderBy(String attribute, boolean ascending, String function) throws CpoException { - return getCurrentResource().newOrderBy(attribute, ascending, function); + return apply( + (cpo) -> { + try { + return cpo.newOrderBy(attribute, ascending, function); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public CpoOrderBy newOrderBy(String marker, String attribute, boolean ascending, String function) throws CpoException { - return getCurrentResource().newOrderBy(marker, attribute, ascending, function); + return apply( + (cpo) -> { + try { + return cpo.newOrderBy(marker, attribute, ascending, function); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public CpoWhere newWhere() throws CpoException { - return getCurrentResource().newWhere(); + return apply(JdbcCpoAdapter::newWhere); } @Override public CpoWhere newWhere(Logical logical, String attr, Comparison comp, T value) throws CpoException { - return getCurrentResource().newWhere(logical, attr, comp, value); + return apply((cpo) -> cpo.newWhere(logical, attr, comp, value)); } @Override public CpoWhere newWhere(Logical logical, String attr, Comparison comp, T value, boolean not) throws CpoException { - return getCurrentResource().newWhere(logical, attr, comp, value, not); + return apply((cpo) -> cpo.newWhere(logical, attr, comp, value, not)); } @Override public long upsertBean(T bean) throws CpoException { - return getCurrentResource().upsertBean(bean); + return apply( + (cpo) -> { + try { + return cpo.upsertBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long upsertBean(String groupName, T bean) throws CpoException { - return getCurrentResource().upsertBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.upsertBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long upsertBeans(List beans) throws CpoException { - return getCurrentResource().upsertBeans(beans); + return apply( + (cpo) -> { + try { + return cpo.upsertBeans(beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long upsertBeans(String groupName, List beans) throws CpoException { - return getCurrentResource().upsertBeans(groupName, beans); + return apply( + (cpo) -> { + try { + return cpo.upsertBeans(groupName, beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public T retrieveBean(T bean) throws CpoException { - return getCurrentResource().retrieveBean(bean); + return apply( + (cpo) -> { + try { + return cpo.retrieveBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public T retrieveBean(String groupName, T bean) throws CpoException { - return getCurrentResource().retrieveBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.retrieveBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -313,7 +531,14 @@ public T retrieveBean( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().retrieveBean(groupName, bean, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.retrieveBean(groupName, bean, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -324,7 +549,14 @@ public T retrieveBean( Collection wheres, Collection orderBy) throws CpoException { - return getCurrentResource().retrieveBean(groupName, criteria, result, wheres, orderBy); + return apply( + (cpo) -> { + try { + return cpo.retrieveBean(groupName, criteria, result, wheres, orderBy); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -336,46 +568,95 @@ public T retrieveBean( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource() - .retrieveBean(groupName, criteria, result, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.retrieveBean( + groupName, criteria, result, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public Stream retrieveBeans(String groupName, C criteria) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public Stream retrieveBeans( String groupName, C criteria, CpoWhere where, Collection orderBy) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria, where, orderBy); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria, where, orderBy); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public Stream retrieveBeans(String groupName, C criteria, Collection orderBy) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria, orderBy); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria, orderBy); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public Stream retrieveBeans( String groupName, C criteria, Collection wheres, Collection orderBy) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria, wheres, orderBy); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria, wheres, orderBy); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public Stream retrieveBeans(String groupName, C criteria, T result) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria, result); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria, result); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public Stream retrieveBeans( String groupName, C criteria, T result, CpoWhere where, Collection orderBy) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria, result, where, orderBy); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria, result, where, orderBy); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -386,7 +667,14 @@ public Stream retrieveBeans( Collection wheres, Collection orderBy) throws CpoException { - return getCurrentResource().retrieveBeans(groupName, criteria, result, wheres, orderBy); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans(groupName, criteria, result, wheres, orderBy); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -398,18 +686,39 @@ public Stream retrieveBeans( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource() - .retrieveBeans(groupName, criteria, result, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.retrieveBeans( + groupName, criteria, result, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long updateBean(T bean) throws CpoException { - return getCurrentResource().updateBean(bean); + return apply( + (cpo) -> { + try { + return cpo.updateBean(bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long updateBean(String groupName, T bean) throws CpoException { - return getCurrentResource().updateBean(groupName, bean); + return apply( + (cpo) -> { + try { + return cpo.updateBean(groupName, bean); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -420,17 +729,38 @@ public long updateBean( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().updateBean(groupName, bean, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.updateBean(groupName, bean, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long updateBeans(List beans) throws CpoException { - return getCurrentResource().updateBeans(beans); + return apply( + (cpo) -> { + try { + return cpo.updateBeans(beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public long updateBeans(String groupName, List beans) throws CpoException { - return getCurrentResource().updateBeans(groupName, beans); + return apply( + (cpo) -> { + try { + return cpo.updateBeans(groupName, beans); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -441,42 +771,80 @@ public long updateBeans( Collection orderBy, Collection nativeExpressions) throws CpoException { - return getCurrentResource().updateBeans(groupName, beans, wheres, orderBy, nativeExpressions); + return apply( + (cpo) -> { + try { + return cpo.updateBeans(groupName, beans, wheres, orderBy, nativeExpressions); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override public CpoMetaDescriptor getCpoMetaDescriptor() { - return getCurrentResource().getCpoMetaDescriptor(); + try { + return apply(JdbcCpoAdapter::getCpoMetaDescriptor); + } catch (CpoException e) { + return null; + } } @Override public String getDataSourceName() { - return getCurrentResource().getDataSourceName(); + try { + return apply(JdbcCpoAdapter::getDataSourceName); + } catch (CpoException e) { + return null; + } } @Override public int getFetchSize() { - return getCurrentResource().getFetchSize(); + try { + return apply(JdbcCpoAdapter::getFetchSize); + } catch (CpoException e) { + return 0; + } } @Override public void setFetchSize(int fetchSize) { - getCurrentResource().setFetchSize(fetchSize); + try { + accept((cpo) -> cpo.setFetchSize(fetchSize)); + } catch (CpoException e) { + logger.error("Could not set fetch size", e); + } } @Override public int getBatchSize() { - return getCurrentResource().getBatchSize(); + try { + return apply(JdbcCpoAdapter::getBatchSize); + } catch (CpoException e) { + return 0; + } } @Override public void setBatchSize(int batchSize) { - getCurrentResource().setBatchSize(batchSize); + try { + accept((cpo) -> cpo.setBatchSize(batchSize)); + } catch (CpoException e) { + logger.error("Could not set batch size", e); + } } @Override public List getCpoAttributes(String expression) throws CpoException { - return getCurrentResource().getCpoAttributes(expression); + return apply( + (cpo) -> { + try { + return cpo.getCpoAttributes(expression); + } catch (CpoException e) { + throw new RuntimeException(e); + } + }); } @Override