Skip to content

Commit

Permalink
feature: seata plugin (apache#3228)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbxyyx authored Feb 18, 2021
1 parent e4c0683 commit fa3791e
Show file tree
Hide file tree
Showing 15 changed files with 620 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectSerializer;

/**
* @author funkye
Expand All @@ -39,6 +40,10 @@ public FstSerializerFactory() {
UndoLogSerializerClassRegistry.getRegisteredClasses().keySet().forEach(conf::registerClass);
}

public void registerSerializer(Class type, FSTObjectSerializer ser, boolean alsoForAllSubclasses) {
conf.registerSerializer(type, ser, alsoForAllSubclasses);
}

public <T> byte[] serialize(T t) {
return conf.asByteArray(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,55 @@
*/
package io.seata.rm.datasource.undo.parser;

import io.seata.common.executor.Initialize;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.EnhancedServiceNotFoundException;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.CollectionUtils;
import io.seata.rm.datasource.undo.BranchUndoLog;
import io.seata.rm.datasource.undo.UndoLogParser;
import io.seata.rm.datasource.undo.parser.spi.FstSerializer;
import org.nustaq.serialization.FSTObjectSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* fst serializer
* @author funkye
*/
@LoadLevel(name = FstUndoLogParser.NAME)
public class FstUndoLogParser implements UndoLogParser {
public class FstUndoLogParser implements UndoLogParser, Initialize {

private static final Logger LOGGER = LoggerFactory.getLogger(FstUndoLogParser.class);

public static final String NAME = "fst";

private FstSerializerFactory fstFactory = FstSerializerFactory.getDefaultFactory();

@Override
public void init() {
try {
List<FstSerializer> serializers = EnhancedServiceLoader.loadAll(FstSerializer.class);
if (CollectionUtils.isNotEmpty(serializers)) {
for (FstSerializer serializer : serializers) {
if (serializer != null) {
Class type = serializer.type();
FSTObjectSerializer ser = serializer.ser();
boolean alsoForAllSubclasses = serializer.alsoForAllSubclasses();
if (type != null && ser != null) {
fstFactory.registerSerializer(type, ser, alsoForAllSubclasses);
LOGGER.info("fst undo log parser load [{}].", serializer.getClass().getName());
}
}
}
}
} catch (EnhancedServiceNotFoundException e) {
LOGGER.warn("FstSerializer not found children class.", e);
}
}

@Override
public String getName() {
return NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;
import javax.sql.rowset.serial.SerialException;
Expand All @@ -43,9 +44,13 @@
import com.fasterxml.jackson.databind.ser.std.ArraySerializerBase;
import io.seata.common.Constants;
import io.seata.common.executor.Initialize;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.EnhancedServiceNotFoundException;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.CollectionUtils;
import io.seata.rm.datasource.undo.BranchUndoLog;
import io.seata.rm.datasource.undo.UndoLogParser;
import io.seata.rm.datasource.undo.parser.spi.JacksonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,6 +102,28 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize {

@Override
public void init() {
try {
List<JacksonSerializer> jacksonSerializers = EnhancedServiceLoader.loadAll(JacksonSerializer.class);
if (CollectionUtils.isNotEmpty(jacksonSerializers)) {
for (JacksonSerializer jacksonSerializer : jacksonSerializers) {
Class type = jacksonSerializer.type();
JsonSerializer ser = jacksonSerializer.ser();
JsonDeserializer deser = jacksonSerializer.deser();
if (type != null) {
if (ser != null) {
module.addSerializer(type, ser);
}
if (deser != null) {
module.addDeserializer(type, deser);
}
LOGGER.info("jackson undo log parser load [{}].", jacksonSerializer.getClass().getName());
}
}
}
} catch (EnhancedServiceNotFoundException e) {
LOGGER.warn("JacksonSerializer not found children class.", e);
}

module.addSerializer(Timestamp.class, timestampSerializer);
module.addDeserializer(Timestamp.class, timestampDeserializer);
module.addSerializer(SerialBlob.class, blobSerializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.sql.Clob;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;
import com.esotericsoftware.kryo.Kryo;
Expand All @@ -43,6 +45,8 @@ public class KryoSerializerFactory implements KryoFactory {

private KryoPool pool = new KryoPool.Builder(this).softReferences().build();

private static final Map<Class, Serializer> TYPE_MAP = new ConcurrentHashMap<>();

private KryoSerializerFactory() {}

public static KryoSerializerFactory getInstance() {
Expand All @@ -60,11 +64,21 @@ public void returnKryo(KryoSerializer kryoSerializer) {
pool.release(kryoSerializer.getKryo());
}

public void registerSerializer(Class type, Serializer ser) {
if (type != null && ser != null) {
TYPE_MAP.put(type, ser);
}
}

@Override
public Kryo create() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false);

for (Map.Entry<Class, Serializer> entry : TYPE_MAP.entrySet()) {
kryo.register(entry.getKey(), entry.getValue());
}

// support clob and blob
kryo.register(SerialBlob.class, new BlobSerializer());
kryo.register(SerialClob.class, new ClobSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,52 @@
*/
package io.seata.rm.datasource.undo.parser;

import com.esotericsoftware.kryo.Serializer;
import io.seata.common.executor.Initialize;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.EnhancedServiceNotFoundException;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.CollectionUtils;
import io.seata.rm.datasource.undo.BranchUndoLog;
import io.seata.rm.datasource.undo.UndoLogParser;
import io.seata.rm.datasource.undo.parser.spi.KryoTypeSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* kryo serializer
* @author jsbxyyx
*/
@LoadLevel(name = KryoUndoLogParser.NAME)
public class KryoUndoLogParser implements UndoLogParser {
public class KryoUndoLogParser implements UndoLogParser, Initialize {

private static final Logger LOGGER = LoggerFactory.getLogger(KryoUndoLogParser.class);

public static final String NAME = "kryo";

@Override
public void init() {
try {
List<KryoTypeSerializer> serializers = EnhancedServiceLoader.loadAll(KryoTypeSerializer.class);
if (CollectionUtils.isNotEmpty(serializers)) {
for (KryoTypeSerializer typeSerializer : serializers) {
if (typeSerializer != null) {
Class type = typeSerializer.type();
Serializer ser = typeSerializer.serializer();
if (type != null) {
KryoSerializerFactory.getInstance().registerSerializer(type, ser);
LOGGER.info("kryo undo log parser load [{}].", typeSerializer.getClass().getName());
}
}
}
}
} catch (EnhancedServiceNotFoundException e) {
LOGGER.warn("KryoTypeSerializer not found children class.", e);
}
}

@Override
public String getName() {
return NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.List;

import io.protostuff.Input;
import io.protostuff.LinkedBuffer;
Expand All @@ -31,9 +32,15 @@
import io.protostuff.runtime.RuntimeEnv;
import io.protostuff.runtime.RuntimeSchema;
import io.seata.common.executor.Initialize;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.EnhancedServiceNotFoundException;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.CollectionUtils;
import io.seata.rm.datasource.undo.BranchUndoLog;
import io.seata.rm.datasource.undo.UndoLogParser;
import io.seata.rm.datasource.undo.parser.spi.ProtostuffDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type protostuff based undo log parser.
Expand All @@ -43,6 +50,8 @@
@LoadLevel(name = ProtostuffUndoLogParser.NAME)
public class ProtostuffUndoLogParser implements UndoLogParser, Initialize {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtostuffUndoLogParser.class);

public static final String NAME = "protostuff";

private final DefaultIdStrategy idStrategy = (DefaultIdStrategy) RuntimeEnv.ID_STRATEGY;
Expand All @@ -51,6 +60,18 @@ public class ProtostuffUndoLogParser implements UndoLogParser, Initialize {

@Override
public void init() {
try {
List<ProtostuffDelegate> delegates = EnhancedServiceLoader.loadAll(ProtostuffDelegate.class);
if (CollectionUtils.isNotEmpty(delegates)) {
for (ProtostuffDelegate delegate : delegates) {
idStrategy.registerDelegate(delegate.create());
LOGGER.info("protostuff undo log parser load [{}].", delegate.getClass().getName());
}
}
} catch (EnhancedServiceNotFoundException e) {
LOGGER.warn("ProtostuffDelegate not found children class.", e);
}

idStrategy.registerDelegate(new DateDelegate());
idStrategy.registerDelegate(new TimestampDelegate());
idStrategy.registerDelegate(new SqlDateDelegate());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.seata.rm.datasource.undo.parser.spi;

import org.nustaq.serialization.FSTObjectSerializer;

/**
* @author jsbxyyx
*/
public interface FstSerializer {

/**
* fst serializer class type
* @return
*/
Class type();

/**
* FSTObjectSerializer custom serializer
* @return
*/
FSTObjectSerializer ser();

/**
* for sub classes
* @return
*/
boolean alsoForAllSubclasses();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.seata.rm.datasource.undo.parser.spi;

import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;

/**
* @author jsbxyyx
*/
public interface JacksonSerializer<T> {

/**
* jackson serializer class type.
* @return
*/
Class<T> type();

/**
* Jackson custom serializer
* @return
*/
JsonSerializer<T> ser();

/**
* Jackson custom deserializer
* @return
*/
JsonDeserializer<? extends T> deser();

}
Loading

0 comments on commit fa3791e

Please sign in to comment.