Skip to content

Commit

Permalink
Merge branch 'STORM-3227' of https://github.com/revans2/incubator-storm
Browse files Browse the repository at this point in the history
… into STORM-3227

STORM-3227: Only push credentials if going to expected user

This closes apache#2838
  • Loading branch information
Robert Evans committed Sep 18, 2018
2 parents b9e841d + 7607778 commit 8c817e8
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 7 deletions.
7 changes: 5 additions & 2 deletions bin/storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,12 @@ def kill(*args):


def upload_credentials(*args):
"""Syntax: [storm upload-credentials topology-name [credkey credvalue]*]
"""Syntax: [storm upload-credentials topology-name [options] [credkey credvalue]*]
Uploads a new set of credentials to a running topology
Uploads a new set of credentials to a running topology.
-f --file <FILE>: provide a properties file with credentials in it to be uploaded
-u --user <USER_NAME>: give the name of the owner of the topology (security precaution).
"""
if not args:
print_usage(command="upload-credentials")
Expand Down
24 changes: 22 additions & 2 deletions storm-client/src/jvm/org/apache/storm/StormSubmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,23 @@ private static Map<String, String> populateCredentials(Map<String, Object> conf,
* @throws NotAliveException if the topology is not alive
* @throws InvalidTopologyException if any other error happens
*/
public static void pushCredentials(String name, Map<String, Object> topoConf, Map<String, String> credentials)
public static void pushCredentials(String name, Map<String,Object> topoConf, Map<String,String> credentials)
throws AuthorizationException, NotAliveException, InvalidTopologyException {
pushCredentials(name, topoConf, credentials, null);
}

/**
* Push a new set of credentials to the running topology.
*
* @param name the name of the topology to push credentials to.
* @param topoConf the topology-specific configuration, if desired. See {@link Config}.
* @param credentials the credentials to push.
* @param expectedUser the user you expect the topology to be owned by.
* @throws AuthorizationException if you are not authorized ot push credentials.
* @throws NotAliveException if the topology is not alive
* @throws InvalidTopologyException if any other error happens
*/
public static void pushCredentials(String name, Map<String, Object> topoConf, Map<String, String> credentials, String expectedUser)
throws AuthorizationException, NotAliveException, InvalidTopologyException {
topoConf = new HashMap(topoConf);
topoConf.putAll(Utils.readCommandLineOpts());
Expand All @@ -124,7 +140,11 @@ public static void pushCredentials(String name, Map<String, Object> topoConf, Ma
try {
try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
LOG.info("Uploading new credentials to {}", name);
client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
Credentials creds = new Credentials(fullCreds);
if (expectedUser != null) {
creds.set_topoOwner(expectedUser);
}
client.getClient().uploadNewCredentials(name, creds);
}
LOG.info("Finished pushing creds to topology: {}", name);
} catch (TException e) {
Expand Down
111 changes: 110 additions & 1 deletion storm-client/src/jvm/org/apache/storm/generated/Credentials.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ public class Credentials implements org.apache.storm.thrift.TBase<Credentials, C
private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("Credentials");

private static final org.apache.storm.thrift.protocol.TField CREDS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("creds", org.apache.storm.thrift.protocol.TType.MAP, (short)1);
private static final org.apache.storm.thrift.protocol.TField TOPO_OWNER_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("topoOwner", org.apache.storm.thrift.protocol.TType.STRING, (short)2);

private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CredentialsStandardSchemeFactory();
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CredentialsTupleSchemeFactory();

private java.util.Map<java.lang.String,java.lang.String> creds; // required
private java.lang.String topoOwner; // optional

/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
CREDS((short)1, "creds");
CREDS((short)1, "creds"),
TOPO_OWNER((short)2, "topoOwner");

private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();

Expand All @@ -54,6 +57,8 @@ public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // CREDS
return CREDS;
case 2: // TOPO_OWNER
return TOPO_OWNER;
default:
return null;
}
Expand Down Expand Up @@ -94,13 +99,16 @@ public java.lang.String getFieldName() {
}

// isset id assignments
private static final _Fields optionals[] = {_Fields.TOPO_OWNER};
public static final java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.CREDS, new org.apache.storm.thrift.meta_data.FieldMetaData("creds", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
new org.apache.storm.thrift.meta_data.MapMetaData(org.apache.storm.thrift.protocol.TType.MAP,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING),
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING))));
tmpMap.put(_Fields.TOPO_OWNER, new org.apache.storm.thrift.meta_data.FieldMetaData("topoOwner", org.apache.storm.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Credentials.class, metaDataMap);
}
Expand All @@ -123,6 +131,9 @@ public Credentials(Credentials other) {
java.util.Map<java.lang.String,java.lang.String> __this__creds = new java.util.HashMap<java.lang.String,java.lang.String>(other.creds);
this.creds = __this__creds;
}
if (other.is_set_topoOwner()) {
this.topoOwner = other.topoOwner;
}
}

public Credentials deepCopy() {
Expand All @@ -132,6 +143,7 @@ public Credentials deepCopy() {
@Override
public void clear() {
this.creds = null;
this.topoOwner = null;
}

public int get_creds_size() {
Expand Down Expand Up @@ -168,6 +180,29 @@ public void set_creds_isSet(boolean value) {
}
}

public java.lang.String get_topoOwner() {
return this.topoOwner;
}

public void set_topoOwner(java.lang.String topoOwner) {
this.topoOwner = topoOwner;
}

public void unset_topoOwner() {
this.topoOwner = null;
}

/** Returns true if field topoOwner is set (has been assigned a value) and false otherwise */
public boolean is_set_topoOwner() {
return this.topoOwner != null;
}

public void set_topoOwner_isSet(boolean value) {
if (!value) {
this.topoOwner = null;
}
}

public void setFieldValue(_Fields field, java.lang.Object value) {
switch (field) {
case CREDS:
Expand All @@ -178,6 +213,14 @@ public void setFieldValue(_Fields field, java.lang.Object value) {
}
break;

case TOPO_OWNER:
if (value == null) {
unset_topoOwner();
} else {
set_topoOwner((java.lang.String)value);
}
break;

}
}

Expand All @@ -186,6 +229,9 @@ public java.lang.Object getFieldValue(_Fields field) {
case CREDS:
return get_creds();

case TOPO_OWNER:
return get_topoOwner();

}
throw new java.lang.IllegalStateException();
}
Expand All @@ -199,6 +245,8 @@ public boolean isSet(_Fields field) {
switch (field) {
case CREDS:
return is_set_creds();
case TOPO_OWNER:
return is_set_topoOwner();
}
throw new java.lang.IllegalStateException();
}
Expand Down Expand Up @@ -227,6 +275,15 @@ public boolean equals(Credentials that) {
return false;
}

boolean this_present_topoOwner = true && this.is_set_topoOwner();
boolean that_present_topoOwner = true && that.is_set_topoOwner();
if (this_present_topoOwner || that_present_topoOwner) {
if (!(this_present_topoOwner && that_present_topoOwner))
return false;
if (!this.topoOwner.equals(that.topoOwner))
return false;
}

return true;
}

Expand All @@ -238,6 +295,10 @@ public int hashCode() {
if (is_set_creds())
hashCode = hashCode * 8191 + creds.hashCode();

hashCode = hashCode * 8191 + ((is_set_topoOwner()) ? 131071 : 524287);
if (is_set_topoOwner())
hashCode = hashCode * 8191 + topoOwner.hashCode();

return hashCode;
}

Expand All @@ -259,6 +320,16 @@ public int compareTo(Credentials other) {
return lastComparison;
}
}
lastComparison = java.lang.Boolean.valueOf(is_set_topoOwner()).compareTo(other.is_set_topoOwner());
if (lastComparison != 0) {
return lastComparison;
}
if (is_set_topoOwner()) {
lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.topoOwner, other.topoOwner);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}

Expand Down Expand Up @@ -286,6 +357,16 @@ public java.lang.String toString() {
sb.append(this.creds);
}
first = false;
if (is_set_topoOwner()) {
if (!first) sb.append(", ");
sb.append("topoOwner:");
if (this.topoOwner == null) {
sb.append("null");
} else {
sb.append(this.topoOwner);
}
first = false;
}
sb.append(")");
return sb.toString();
}
Expand Down Expand Up @@ -353,6 +434,14 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, Credentials s
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // TOPO_OWNER
if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRING) {
struct.topoOwner = iprot.readString();
struct.set_topoOwner_isSet(true);
} else {
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
Expand All @@ -379,6 +468,13 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, Credentials
}
oprot.writeFieldEnd();
}
if (struct.topoOwner != null) {
if (struct.is_set_topoOwner()) {
oprot.writeFieldBegin(TOPO_OWNER_FIELD_DESC);
oprot.writeString(struct.topoOwner);
oprot.writeFieldEnd();
}
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
Expand All @@ -404,6 +500,14 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, Credentials s
oprot.writeString(_iter601.getValue());
}
}
java.util.BitSet optionals = new java.util.BitSet();
if (struct.is_set_topoOwner()) {
optionals.set(0);
}
oprot.writeBitSet(optionals, 1);
if (struct.is_set_topoOwner()) {
oprot.writeString(struct.topoOwner);
}
}

@Override
Expand All @@ -422,6 +526,11 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, Credentials st
}
}
struct.set_creds_isSet(true);
java.util.BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
struct.topoOwner = iprot.readString();
struct.set_topoOwner_isSet(true);
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion storm-client/src/py/storm/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5793,11 +5793,13 @@ class Credentials(object):
"""
Attributes:
- creds
- topoOwner
"""


def __init__(self, creds=None,):
def __init__(self, creds=None, topoOwner=None,):
self.creds = creds
self.topoOwner = topoOwner

def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
Expand All @@ -5819,6 +5821,11 @@ def read(self, iprot):
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
self.topoOwner = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
Expand All @@ -5837,6 +5844,10 @@ def write(self, oprot):
oprot.writeString(viter541.encode('utf-8') if sys.version_info[0] == 2 else viter541)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.topoOwner is not None:
oprot.writeFieldBegin('topoOwner', TType.STRING, 2)
oprot.writeString(self.topoOwner.encode('utf-8') if sys.version_info[0] == 2 else self.topoOwner)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()

Expand Down Expand Up @@ -12580,6 +12591,7 @@ def __ne__(self, other):
Credentials.thrift_spec = (
None, # 0
(1, TType.MAP, 'creds', (TType.STRING, 'UTF8', TType.STRING, 'UTF8', False), None, ), # 1
(2, TType.STRING, 'topoOwner', 'UTF8', None, ), # 2
)
all_structs.append(SubmitOptions)
SubmitOptions.thrift_spec = (
Expand Down
1 change: 1 addition & 0 deletions storm-client/src/storm.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ struct RebalanceOptions {

struct Credentials {
1: required map<string,string> creds;
2: optional string topoOwner;
}

enum TopologyInitialStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class UploadCredentials {
*/
public static void main(String[] args) throws Exception {
Map<String, Object> cl = CLI.opt("f", "file", null)
.opt("u", "user", null)
.arg("topologyName", CLI.FIRST_WINS)
.optionalArg("rawCredentials", CLI.INTO_LIST)
.parse(args);
Expand Down Expand Up @@ -106,7 +107,7 @@ public static void main(String[] args) throws Exception {
}
}
}
StormSubmitter.pushCredentials(topologyName, topologyConf, credentialsMap);
StormSubmitter.pushCredentials(topologyName, topologyConf, credentialsMap, (String) cl.get("u"));
LOG.info("Uploaded new creds to topology: {}", topologyName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3437,6 +3437,25 @@ public void uploadNewCredentials(String topoName, Credentials credentials)
credentials = new Credentials(Collections.emptyMap());
}
checkAuthorization(topoName, topoConf, "uploadNewCredentials");
String realPrincipal = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
String realUser = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
String expectedOwner = null;
if (credentials.is_set_topoOwner()) {
expectedOwner = credentials.get_topoOwner();
} else {
Principal p = ReqContext.context().principal();
if (p != null) {
expectedOwner = p.getName();
}
}
// expectedOwner being null means that security is disabled (which why are we uploading credentials with security disabled???
if (expectedOwner == null) {
LOG.warn("Please check you settings. Credentials are being uploaded to {} with security disabled.", topoId);
} else if (!realPrincipal.equals(expectedOwner) && !realUser.equals(expectedOwner)) {
throw new AuthorizationException(topoId + " is expected to be owned by " + expectedOwner
+ " but is actually owned by " + realPrincipal);
}

synchronized (credUpdateLock) {
//Merge the old credentials so creds nimbus created are not lost.
// And in case the user forgot to upload something important this time.
Expand Down

0 comments on commit 8c817e8

Please sign in to comment.