Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

35 changes: 29 additions & 6 deletions core/src/main/thrift/compaction-coordinator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ enum TCompactionState {
SUCCEEDED
# Compactor should set state to FAILED when compaction job fails, message should be mandatory
FAILED
# Compactor should set state to CANCELLED to acknowledge that it has stopped compacting
# Compactor should set state to CANCELLED to acknowledge that it has stopped compacting
CANCELLED
}

Expand Down Expand Up @@ -79,12 +79,15 @@ service CompactionCoordinatorService {
*/
void compactionCompleted(
1:client.TInfo tinfo
2:security.TCredentials credentials
2:security.TCredentials credentials
3:string externalCompactionId
4:data.TKeyExtent extent
5:tabletserver.TCompactionStats stats
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by Compactor to get the next compaction job
*/
Expand All @@ -94,8 +97,11 @@ service CompactionCoordinatorService {
3:string groupName
4:string compactor
5:string externalCompactionId
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by Compactor to update the Coordinator with the state of the compaction
*/
Expand All @@ -105,8 +111,11 @@ service CompactionCoordinatorService {
3:string externalCompactionId
4:TCompactionStatusUpdate status
5:i64 timestamp
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by Compactor on unsuccessful completion of compaction job
*/
Expand All @@ -117,6 +126,9 @@ service CompactionCoordinatorService {
4:data.TKeyExtent extent
5:string exceptionClassName
6:TCompactionState failureState
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
Expand All @@ -125,6 +137,9 @@ service CompactionCoordinatorService {
TExternalCompactionMap getRunningCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
Expand All @@ -134,6 +149,9 @@ service CompactionCoordinatorService {
map<string,TExternalCompactionList> getLongRunningCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

/*
Expand All @@ -142,14 +160,19 @@ service CompactionCoordinatorService {
TExternalCompactionMap getCompletedCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void cancel(
1:client.TInfo tinfo
2:security.TCredentials credentials
3:string externalCompactionId
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

}

service CompactorService {
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/thrift/manager.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ service FateService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

}

service ManagerClientService {
Expand Down Expand Up @@ -348,7 +348,7 @@ service ManagerClientService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void tabletServerStopping(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand All @@ -369,7 +369,7 @@ service ManagerClientService {
2:client.ThriftNotActiveServiceException tnase
3:ThriftPropertyException tpe
)

void modifySystemProperties(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand Down Expand Up @@ -398,17 +398,17 @@ service ManagerClientService {
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void removeResourceGroupNode(
1:client.TInfo tinfo
2:security.TCredentials credentials
3:string resourceGroup
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
3:client.ThriftResourceGroupNotExistsException rgne
3:client.ThriftResourceGroupNotExistsException rgne
)

void setResourceGroupProperty(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand All @@ -421,7 +421,7 @@ service ManagerClientService {
3:ThriftPropertyException tpe
4:client.ThriftResourceGroupNotExistsException rgne
)

void modifyResourceGroupProperties(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand Down Expand Up @@ -496,6 +496,7 @@ service ManagerClientService {
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftTableOperationException toe
3:client.ThriftNotActiveServiceException tnase
)

list<data.TKeyExtent> updateTabletMergeability(
Expand All @@ -506,12 +507,14 @@ service ManagerClientService {
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftTableOperationException toe
3:client.ThriftNotActiveServiceException tnase
)

i64 getManagerTimeNanos(
1:client.TInfo tinfo
2:security.TCredentials credentials
) throws (
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server;

/**
* This interface allows service implementations which support running multiple instances
* concurrently with only one active instance to report whether or not they are the active service.
*/
public interface HighlyAvailableService {

/**
* Is this service instance currently the active instance for the Accumulo cluster.
*
* @return True if the service is the active service, false otherwise.
*/
boolean isActiveService();

/**
* Is this service instance currently in the process of upgrading.
*
* @return True if the service is upgrading, false otherwise.
*/
default boolean isUpgrading() {
return false;
}

/**
* Get the name of the service
*
* @return service name
*/
default String getServiceName() {
return this.getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;

import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
import org.apache.accumulo.server.HighlyAvailableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link InvocationHandler} which checks to see if a {@link HighlyAvailableService} is the
* current active instance of that service, throwing {@link ThriftNotActiveServiceException} when it
* is not the current active instance.
*/
public class HighlyAvailableServiceInvocationHandler<I> implements InvocationHandler {
private static final Logger LOG =
LoggerFactory.getLogger(HighlyAvailableServiceInvocationHandler.class);

private final I instance;
private final HighlyAvailableService service;

public HighlyAvailableServiceInvocationHandler(I instance, HighlyAvailableService service) {
this.instance = Objects.requireNonNull(instance);
this.service = Objects.requireNonNull(service);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

// If the service is upgrading, throw an exception
if (service.isUpgrading()) {
LOG.trace("Service can not be accessed while it is upgrading.");
throw new ThriftNotActiveServiceException(service.getServiceName(),
"Service can not be accessed while it is upgrading");
}

// If the service is not active, throw an exception
if (!service.isActiveService()) {
LOG.trace("Denying access to RPC service as this instance is not the active instance.");
throw new ThriftNotActiveServiceException(service.getServiceName(),
"Denying access to RPC service as this instance is not the active instance");
}
try {
// Otherwise, call the real method
return method.invoke(instance, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

import org.apache.accumulo.core.util.ClassUtil;
import org.apache.accumulo.server.HighlyAvailableService;

/**
* A class to wrap invocations to the Thrift handler to prevent these invocations from succeeding
* when the Accumulo service that this Thrift service is for has not yet obtained its ZooKeeper
* lock.
*
* <p>
* Its expected that all methods in the wrapped thrift service declare they throw
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these Highly* classes were copied as is from an earlier commit. The only change I made was adding this comment.

* {@link org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException}. The methods
* should declare they throw in the thrift IDL.
*
* @since 2.0
*/
public class HighlyAvailableServiceWrapper {

private static final HighlyAvailableServiceWrapper INSTANCE = new HighlyAvailableServiceWrapper();

// Not for public use.
private HighlyAvailableServiceWrapper() {}

public static <I> I service(final I instance, HighlyAvailableService service) {
InvocationHandler handler = INSTANCE.getInvocationHandler(instance, service);

ClassUtil.getInterfaces(instance.getClass()).forEach(iface -> {
System.out.println("Interface " + iface.getName() + " of " + instance.getClass().getName());
});

@SuppressWarnings("unchecked")
I proxiedInstance = (I) Proxy.newProxyInstance(instance.getClass().getClassLoader(),
ClassUtil.getInterfaces(instance.getClass()).toArray(new Class<?>[0]), handler);
return proxiedInstance;
}

protected <T> HighlyAvailableServiceInvocationHandler<T> getInvocationHandler(final T instance,
final HighlyAvailableService service) {
return new HighlyAvailableServiceInvocationHandler<>(instance, service);
}
}
Loading