Skip to content

Commit

Permalink
[EJBCLIENT-239] Fix cluster discovery behavior and execution order
Browse files Browse the repository at this point in the history
  • Loading branch information
dmlloyd committed Jun 2, 2017
1 parent 2178731 commit 971ea4a
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 212 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<version.org.wildfly.client-config>1.0.0.Beta4</version.org.wildfly.client-config>
<version.org.wildfly.common>1.2.0.Beta10</version.org.wildfly.common>
<version.org.wildfly.naming.client>1.0.0.Beta14</version.org.wildfly.naming.client>
<version.org.wildfly.discovery>1.0.0.Beta10</version.org.wildfly.discovery>
<version.org.wildfly.discovery>1.0.0.Beta12-SNAPSHOT</version.org.wildfly.discovery>
<version.org.wildfly.security.elytron>1.1.0.Beta42</version.org.wildfly.security.elytron>
<version.org.wildfly.transaction-client>1.0.0.Beta22</version.org.wildfly.transaction-client>
</properties>
Expand Down
83 changes: 62 additions & 21 deletions src/main/java/org/jboss/ejb/client/EJBClientContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public final class EJBClientContext extends Attachable implements Contextual<EJB
* The service type to use for EJB discovery.
*/
public static final ServiceType EJB_SERVICE_TYPE = ServiceType.of("ejb", "jboss");
static final ServiceType EJB_SERVICE_TYPE_WITH_NODE = ServiceType.of("ejb", "jboss", "node", null);

private static final ContextManager<EJBClientContext> CONTEXT_MANAGER = new ContextManager<EJBClientContext>(EJBClientContext.class, "jboss.ejb.client");

Expand Down Expand Up @@ -689,7 +690,7 @@ <R, L extends EJBLocator<T>, T> R performLocatedAction(final L locator, final Lo
final Affinity affinity = locator.getAffinity();
final String scheme;
if (affinity instanceof NodeAffinity) {
return discoverAffinityNode(locator, (NodeAffinity) affinity, locatedAction, authenticationConfiguration, sslContext);
return discoverAffinityNode(locator, (NodeAffinity) affinity, locatedAction, authenticationConfiguration, sslContext, null);
} else if (affinity instanceof ClusterAffinity) {
return discoverAffinityCluster(locator, (ClusterAffinity) affinity, locatedAction, weakAffinity, authenticationConfiguration, sslContext);
} else if (affinity == Affinity.LOCAL) {
Expand All @@ -700,27 +701,53 @@ <R, L extends EJBLocator<T>, T> R performLocatedAction(final L locator, final Lo
assert affinity == Affinity.NONE;
return discoverAffinityNone(locator, locatedAction, weakAffinity, authenticationConfiguration, sslContext);
}
return discoverAffinityScheme(locatedAction, locator, scheme, locator.getAffinity(), authenticationConfiguration, sslContext);
return discoverAffinityScheme(locatedAction, locator, scheme, locator.getAffinity(), authenticationConfiguration, sslContext, null);
}

<R, L extends EJBLocator<T>, T> R discoverAffinityScheme(final LocatedAction<R, L, T> locatedAction, final L locator, final String scheme, final Affinity effectiveAffinity, final AuthenticationConfiguration authenticationConfiguration, final SSLContext sslContext) throws Exception {
<R, L extends EJBLocator<T>, T> R discoverAffinityScheme(final LocatedAction<R, L, T> locatedAction, final L locator, final String scheme, final Affinity effectiveAffinity, final AuthenticationConfiguration authenticationConfiguration, final SSLContext sslContext, final ClusterAffinity fallbackAffinity) throws Exception {
final EJBReceiver transportProvider = getTransportProvider(scheme);
if (transportProvider == null) {
throw Logs.MAIN.noTransportProvider(locator, scheme);
if (fallbackAffinity != null) {
return discoverAffinityCluster(locator, fallbackAffinity, locatedAction, Affinity.NONE, authenticationConfiguration, sslContext);
} else {
throw Logs.MAIN.noTransportProvider(locator, scheme);
}
} else {
return locatedAction.execute(transportProvider, locator, effectiveAffinity, authenticationConfiguration, sslContext);
if (fallbackAffinity != null && effectiveAffinity instanceof URIAffinity && (! transportProvider.isConnected(effectiveAffinity.getUri()) || isStale(effectiveAffinity.getUri(), fallbackAffinity))) {
return discoverAffinityCluster(locator, fallbackAffinity, locatedAction, Affinity.NONE, authenticationConfiguration, sslContext);
} else {
return locatedAction.execute(transportProvider, locator, effectiveAffinity, authenticationConfiguration, sslContext);
}
}
}

boolean isStale(URI uri, ClusterAffinity clusterAffinity) {
try (final ServicesQueue servicesQueue = getDiscovery().discover(ServiceType.of("ejb", "jboss", uri.getScheme(), null), getFilterSpec(clusterAffinity))) {
ServiceURL serviceURL;
for (;;) {
serviceURL = servicesQueue.takeService();
if (serviceURL == null) {
return true;
}
if (serviceURL.getLocationURI().equals(uri)) {
return false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Logs.MAIN.operationInterrupted();
}
}

<R, L extends EJBLocator<T>, T> R discoverAffinityNone(L locator, final LocatedAction<R, L, T> locatedAction, final Affinity weakAffinity, final AuthenticationConfiguration authenticationConfiguration, final SSLContext sslContext) throws Exception {
assert locator.getAffinity() == Affinity.NONE;

if (weakAffinity instanceof NodeAffinity) {
return discoverAffinityNode(locator, (NodeAffinity) weakAffinity, locatedAction, authenticationConfiguration, sslContext);
return discoverAffinityNode(locator, (NodeAffinity) weakAffinity, locatedAction, authenticationConfiguration, sslContext, null);
} else if (weakAffinity instanceof URIAffinity) {
return discoverAffinityScheme(locatedAction, locator, weakAffinity.getUri().getScheme(), weakAffinity, authenticationConfiguration, sslContext);
return discoverAffinityScheme(locatedAction, locator, weakAffinity.getUri().getScheme(), weakAffinity, authenticationConfiguration, sslContext, null);
} else if (weakAffinity == Affinity.LOCAL) {
return discoverAffinityScheme(locatedAction, locator, "local", locator.getAffinity(), authenticationConfiguration, sslContext);
return discoverAffinityScheme(locatedAction, locator, "local", locator.getAffinity(), authenticationConfiguration, sslContext, null);
} else if (weakAffinity instanceof ClusterAffinity) {
return discoverAffinityCluster(locator, (ClusterAffinity) weakAffinity, locatedAction, Affinity.NONE, authenticationConfiguration, sslContext);
}
Expand All @@ -742,7 +769,7 @@ <R, L extends EJBLocator<T>, T> R discoverAffinityNone(L locator, final LocatedA
final Affinity affinity = Affinity.forUri(serviceURL.getLocationURI());
// recurse for one node if needed
if (affinity instanceof NodeAffinity) {
return discoverAffinityNode(locator, (NodeAffinity) affinity, locatedAction, authenticationConfiguration, sslContext);
return discoverAffinityNode(locator, (NodeAffinity) affinity, locatedAction, authenticationConfiguration, sslContext, null);
} else if (affinity instanceof ClusterAffinity) {
return discoverAffinityCluster(locator, (ClusterAffinity) affinity, locatedAction, Affinity.NONE, authenticationConfiguration, sslContext);
} else if (affinity == Affinity.LOCAL) {
Expand Down Expand Up @@ -806,7 +833,7 @@ <R, L extends EJBLocator<T>, T> R discoverAffinityNone(L locator, final LocatedA
throw Logs.MAIN.selectorReturnedNull(deploymentNodeSelector);
}
}
return discoverAffinityNode(locator, new NodeAffinity(node), locatedAction, authenticationConfiguration, sslContext);
return discoverAffinityNode(locator, new NodeAffinity(node), locatedAction, authenticationConfiguration, sslContext, null);
}
assert ! clusters.isEmpty();
// last of all, find the first cluster and use it
Expand All @@ -817,20 +844,30 @@ <R, L extends EJBLocator<T>, T> R discoverAffinityNone(L locator, final LocatedA
}
}

<R, L extends EJBLocator<T>, T> R discoverAffinityNode(final L locator, final NodeAffinity nodeAffinity, final LocatedAction<R, L, T> locatedAction, final AuthenticationConfiguration authenticationConfiguration, final SSLContext sslContext) throws Exception {
<R, L extends EJBLocator<T>, T> R discoverAffinityNode(final L locator, final NodeAffinity nodeAffinity, final LocatedAction<R, L, T> locatedAction, final AuthenticationConfiguration authenticationConfiguration, final SSLContext sslContext, final ClusterAffinity clusterAffinity) throws Exception {
// we just need to find a single location for this node; therefore, we'll exit at the first opportunity.
try (final ServicesQueue servicesQueue = discover(getFilterSpec(nodeAffinity))) {
try (final ServicesQueue servicesQueue = discover(getFilterSpec(nodeAffinity, clusterAffinity))) {
// we don't recurse into node or cluster for this case; furthermore we always use the first answer.
ServiceURL serviceURL;
for (;;) {
// interruption is caught in the calling method
serviceURL = servicesQueue.takeService();
if (serviceURL == null) {
throw withSuppressed(Logs.MAIN.noEJBReceiverAvailable(locator), servicesQueue.getProblems());
if (clusterAffinity != null) {
return discoverAffinityCluster(locator, clusterAffinity, locatedAction, Affinity.NONE, authenticationConfiguration, sslContext);
} else {
throw withSuppressed(Logs.MAIN.noEJBReceiverAvailable(locator), servicesQueue.getProblems());
}
}
final URI uri = serviceURL.getLocationURI();
final EJBReceiver receiver = getTransportProvider(serviceURL.getUriScheme());
if (receiver != null) {
return locatedAction.execute(receiver, locator, Affinity.forUri(serviceURL.getLocationURI()), authenticationConfiguration, sslContext);
if (clusterAffinity != null && ! receiver.isConnected(uri)) {
// abandon our weak affinity
return discoverAffinityCluster(locator, clusterAffinity, locatedAction, Affinity.NONE, authenticationConfiguration, sslContext);
} else {
return locatedAction.execute(receiver, locator, Affinity.forUri(uri), authenticationConfiguration, sslContext);
}
}
}
}
Expand All @@ -840,11 +877,11 @@ private <R, L extends EJBLocator<T>, T> R discoverAffinityCluster(final L locato
final String clusterName = clusterAffinity.getClusterName();
final EJBClientCluster cluster = configuredClusters.get(clusterName);
if (weakAffinity instanceof NodeAffinity) {
return discoverAffinityNode(locator, (NodeAffinity) weakAffinity, locatedAction, authenticationConfiguration, sslContext);
return discoverAffinityNode(locator, (NodeAffinity) weakAffinity, locatedAction, authenticationConfiguration, sslContext, clusterAffinity);
} else if (weakAffinity == Affinity.LOCAL) {
return discoverAffinityScheme(locatedAction, locator, "local", weakAffinity, authenticationConfiguration, sslContext);
return discoverAffinityScheme(locatedAction, locator, "local", weakAffinity, authenticationConfiguration, sslContext, null);
} else if (weakAffinity instanceof URIAffinity) {
return discoverAffinityScheme(locatedAction, locator, weakAffinity.getUri().getScheme(), weakAffinity, authenticationConfiguration, sslContext);
return discoverAffinityScheme(locatedAction, locator, weakAffinity.getUri().getScheme(), weakAffinity, authenticationConfiguration, sslContext, clusterAffinity);
}
final ClusterNodeSelector selector;
if (cluster != null) {
Expand All @@ -858,7 +895,7 @@ private <R, L extends EJBLocator<T>, T> R discoverAffinityCluster(final L locato
Set<URI> unresolvedUris = new HashSet<>();
Set<String> nodes = new HashSet<>();
List<Throwable> problems;
try (final ServicesQueue servicesQueue = discover(getFilterSpec(clusterAffinity))) {
try (final ServicesQueue servicesQueue = getDiscovery().discover(EJB_SERVICE_TYPE_WITH_NODE, getFilterSpec(clusterAffinity))) {
// interruption is caught in the calling method
ServiceURL serviceURL = servicesQueue.takeService();
while (serviceURL != null) {
Expand Down Expand Up @@ -897,7 +934,7 @@ private <R, L extends EJBLocator<T>, T> R discoverAffinityCluster(final L locato
return locatedAction.execute(getTransportProvider(uri.getScheme()), locator, URIAffinity.forUri(uri), authenticationConfiguration, sslContext);
}
} else if (nodes.size() == 1) {
return discoverAffinityNode(locator, new NodeAffinity(nodes.iterator().next()), locatedAction, authenticationConfiguration, sslContext);
return discoverAffinityNode(locator, new NodeAffinity(nodes.iterator().next()), locatedAction, authenticationConfiguration, sslContext, null);
} else {
// find the subset of connected nodes
final Map<String, URI> all = new HashMap<>();
Expand Down Expand Up @@ -942,6 +979,9 @@ private <R, L extends EJBLocator<T>, T> R discoverAffinityCluster(final L locato

final String[] connectedArray = connected.keySet().toArray(NO_STRINGS);
final String[] nodeArray = all.keySet().toArray(NO_STRINGS);
if (nodeArray.length == 0) {
throw withSuppressed(Logs.MAIN.noEJBReceiverAvailable(locator), problems);
}
final String node = selector.selectNode(clusterName, connectedArray, nodeArray);
if (node == null) {
throw Logs.MAIN.selectorReturnedNull(selector);
Expand Down Expand Up @@ -993,8 +1033,9 @@ FilterSpec getFilterSpec(EJBIdentifier identifier) {
}
}

FilterSpec getFilterSpec(NodeAffinity nodeAffinity) {
return getNodeFilterSpec(nodeAffinity.getNodeName());
FilterSpec getFilterSpec(NodeAffinity nodeAffinity, final ClusterAffinity clusterAffinity) {
final FilterSpec filterSpec = getNodeFilterSpec(nodeAffinity.getNodeName());
return clusterAffinity != null ? FilterSpec.all(filterSpec, getFilterSpec(clusterAffinity)) : filterSpec;
}

FilterSpec getNodeFilterSpec(String nodeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ private void processMessage(final MessageInputStream message) {
final ServiceURL.Builder concreteBuilder = new ServiceURL.Builder();
concreteBuilder.setAbstractType("ejb");
concreteBuilder.setAbstractTypeAuthority("jboss");
concreteBuilder.addAttribute(EJBClientContext.FILTER_ATTR_CLUSTER, clusterValue);
concreteBuilder.addAttribute(EJBClientContext.FILTER_ATTR_NODE, nodeValue);
if (netmaskBits != 0) {
// do not match all
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

@MetaInfServices
public final class RemoteEJBDiscoveryConfigurator implements ExternalDiscoveryConfigurator {
public RemoteEJBDiscoveryConfigurator() {
}

public void configure(final Consumer<DiscoveryProvider> discoveryProviderConsumer, final Consumer<RegistryProvider> registryProviderConsumer) {
discoveryProviderConsumer.accept(RemotingEJBDiscoveryProvider.INSTANCE);
}
Expand Down
Loading

0 comments on commit 971ea4a

Please sign in to comment.