Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-16396. Allow authoritative mode on a subdirectory. #1043

Merged
merged 2 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ private Constants() {

public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";

/** Whether or not to allow MetadataStore to be source of truth for a path prefix */
public static final String AUTHORITATIVE_PATH = "fs.s3a.authoritative.path";
public static final String[] DEFAULT_AUTHORITATIVE_PATH = {};

/** Whether or not to allow MetadataStore to be source of truth. */
public static final String METADATASTORE_AUTHORITATIVE =
"fs.s3a.metadatastore.authoritative";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile boolean isClosed = false;
private MetadataStore metadataStore;
private boolean allowAuthoritative;
private boolean allowAuthoritativeMetadataStore;
private Collection<String> allowAuthoritativePaths;

/** Delegation token integration; non-empty when DT support is enabled. */
private Optional<S3ADelegationTokens> delegationTokens = Optional.empty();
Expand Down Expand Up @@ -397,11 +398,13 @@ public void initialize(URI name, Configuration originalConf)
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);

setMetadataStore(S3Guard.getMetadataStore(this));
allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
allowAuthoritativeMetadataStore = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
DEFAULT_METADATASTORE_AUTHORITATIVE);
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);

if (hasMetadataStore()) {
LOG.debug("Using metadata store {}, authoritative={}",
getMetadataStore(), allowAuthoritative);
LOG.debug("Using metadata store {}, authoritative store={}, authoritative path={}",
getMetadataStore(), allowAuthoritativeMetadataStore, allowAuthoritativePaths);
}
initMultipartUploads(conf);
} catch (AmazonClientException e) {
Expand Down Expand Up @@ -840,7 +843,8 @@ public String pathToKey(Path path) {
* @param key s3 key or ""
* @return the with a trailing "/", or, if it is the root key, "",
*/
private String maybeAddTrailingSlash(String key) {
@InterfaceAudience.Private
public String maybeAddTrailingSlash(String key) {
if (!key.isEmpty() && !key.endsWith("/")) {
return key + '/';
} else {
Expand Down Expand Up @@ -1446,7 +1450,7 @@ public boolean hasMetadataStore() {
*/
@VisibleForTesting
boolean hasAuthoritativeMetadataStore() {
return hasMetadataStore() && allowAuthoritative;
return hasMetadataStore() && allowAuthoritativeMetadataStore;
}

/**
Expand Down Expand Up @@ -2398,6 +2402,8 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,

DirListingMetadata dirMeta =
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
return S3Guard.dirMetaToStatuses(dirMeta);
}
Expand All @@ -2415,6 +2421,7 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
result.add(files.next());
}
// merge the results. This will update the store as needed

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline without purpose

return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
allowAuthoritative, ttlTimeProvider);
} else {
Expand Down Expand Up @@ -2629,6 +2636,8 @@ S3AFileStatus innerGetFileStatus(final Path f,
// dest is also a directory, there's no difference.
// TODO After HADOOP-16085 the modification detection can be done with
// etags or object version instead of modTime
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
if (!pm.getFileStatus().isDirectory() &&
!allowAuthoritative) {
LOG.debug("Metadata for {} found in the non-auth metastore.", path);
Expand Down Expand Up @@ -3554,7 +3563,8 @@ public String toString() {
sb.append(", blockFactory=").append(blockFactory);
}
sb.append(", metastore=").append(metadataStore);
sb.append(", authoritative=").append(allowAuthoritative);
sb.append(", authoritativeStore=").append(allowAuthoritativeMetadataStore);
sb.append(", authoritativePath=").append(allowAuthoritativePaths);
sb.append(", useListV1=").append(useListV1);
if (committerIntegration != null) {
sb.append(", magicCommitter=").append(isMagicCommitEnabled());
Expand Down Expand Up @@ -3794,10 +3804,13 @@ private RemoteIterator<S3ALocatedFileStatus> innerListFiles(Path f, boolean
key, delimiter);
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
final Set<Path> tombstones;
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
if (recursive) {
final PathMetadata pm = metadataStore.get(path, true);
// shouldn't need to check pm.isDeleted() because that will have
// been caught by getFileStatus above.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline without purpose

MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(metadataStore, pm,
allowAuthoritative);
Expand Down Expand Up @@ -3886,6 +3899,8 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
listing.createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? listing.createLocatedFileStatusIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ public void createSuccessMarker(Path outputPath,
conf.getTrimmed(S3_METADATA_STORE_IMPL, ""));
successData.addDiagnostic(METADATASTORE_AUTHORITATIVE,
conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
successData.addDiagnostic(AUTHORITATIVE_PATH,
conf.getTrimmed(AUTHORITATIVE_PATH, ""));
successData.addDiagnostic(MAGIC_COMMITTER_ENABLED,
conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,6 @@ public void put(
final DirListingMetadata meta,
@Nullable final BulkOperationState operationState) throws IOException {
LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline removed without purpose

// directory path
Path path = meta.getPath();
DDBPathMetadata ddbPathMeta =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,9 +50,8 @@
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.util.ReflectionUtils;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AUTHORITATIVE_PATH;
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY;
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST;
import static org.apache.hadoop.fs.s3a.S3AUtils.createUploadFileStatus;
Expand Down Expand Up @@ -772,4 +772,33 @@ public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
return dlm;
}

public static Collection<String> getAuthoritativePaths(S3AFileSystem fs) {
String[] rawAuthoritativePaths =
fs.getConf().getTrimmedStrings(AUTHORITATIVE_PATH, DEFAULT_AUTHORITATIVE_PATH);
Collection<String> authoritativePaths = new ArrayList<>();
if (rawAuthoritativePaths.length > 0) {
for (int i = 0; i < rawAuthoritativePaths.length; i++) {
Path qualified = fs.qualify(new Path(rawAuthoritativePaths[i]));
authoritativePaths.add(fs.maybeAddTrailingSlash(qualified.toString()));
}
}
return authoritativePaths;
}

public static boolean allowAuthoritative(Path p, S3AFileSystem fs,
boolean authMetadataStore, Collection<String> authPaths) {
String haystack = fs.maybeAddTrailingSlash(p.toString());
if (authMetadataStore) {
return true;
}
if (!authPaths.isEmpty()) {
for (String needle : authPaths) {

if (haystack.startsWith(needle)) {
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1177,8 +1177,10 @@ public int run(String[] args, PrintStream out)
if (usingS3Guard) {
out.printf("Filesystem %s is using S3Guard with store %s%n",
fsUri, store.toString());
printOption(out, "Authoritative S3Guard",
printOption(out, "Authoritative Metadata Store",
METADATASTORE_AUTHORITATIVE, "false");
printOption(out, "Authoritative Path",
AUTHORITATIVE_PATH, "");
authMode = conf.getBoolean(METADATASTORE_AUTHORITATIVE, false);
printStoreDiagnostics(out, store);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ two different reasons:
stored in metadata store.
* This mode can be set as a configuration property
`fs.s3a.metadatastore.authoritative`
* It can also be set only on specific directories by setting
`fs.s3a.authoritative.path` to one or more prefixes, for example
`s3a://bucket/path` or "/auth1,/auth2".
* All interactions with the S3 bucket(s) must be through S3A clients sharing
the same metadata store.
* This is independent from which metadata store implementation is used.
Expand Down
Loading