Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
feat: kafka consumer test & add it to ftrl
Browse files Browse the repository at this point in the history
complete kafka consumer & add kafka source to ftrl
  • Loading branch information
xixici committed Feb 24, 2020
1 parent d2031a1 commit e0abb7e
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 18 deletions.
2 changes: 1 addition & 1 deletion model/feature_pipe_model.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-1,"{""schema"":[""model_id BIGINT,model_info VARCHAR,C14 DOUBLE,C15 DOUBLE,C16 DOUBLE,C17 DOUBLE,C18 DOUBLE,C19 DOUBLE,C20 DOUBLE,C21 DOUBLE"",""""],""param"":[""{\""selectedCols\"":\""[\\\""C14\\\"",\\\""C15\\\"",\\\""C16\\\"",\\\""C17\\\"",\\\""C18\\\"",\\\""C19\\\"",\\\""C20\\\"",\\\""C21\\\""]\""}"",""{\""outputCol\"":\""\\\""vec\\\""\"",\""numFeatures\"":\""30000\"",\""categoricalCols\"":\""[\\\""C1\\\"",\\\""banner_pos\\\"",\\\""site_category\\\"",\\\""app_domain\\\"",\\\""app_category\\\"",\\\""device_type\\\"",\\\""device_conn_type\\\"",\\\""site_id\\\"",\\\""site_domain\\\"",\\\""device_id\\\"",\\\""device_model\\\""]\"",\""selectedCols\"":\""[\\\""C1\\\"",\\\""banner_pos\\\"",\\\""site_category\\\"",\\\""app_domain\\\"",\\\""app_category\\\"",\\\""device_type\\\"",\\\""device_conn_type\\\"",\\\""C14\\\"",\\\""C15\\\"",\\\""C16\\\"",\\\""C17\\\"",\\\""C18\\\"",\\\""C19\\\"",\\\""C20\\\"",\\\""C21\\\"",\\\""site_id\\\"",\\\""site_domain\\\"",\\\""device_id\\\"",\\\""device_model\\\""]\""}""],""clazz"":[""com.alibaba.alink.pipeline.dataproc.StandardScalerModel"",""com.alibaba.alink.pipeline.feature.FeatureHasher""]}"
0,"0^{""withMean"":""true"",""withStd"":""true""}^^^^^^^^"
0,"1048576^[18142.658640793205,319.0757653788269,56.658323291616455,2024.815516577583,1.0359951799759,194.1037805189026,41664.20274601373,76.47513987569938]^^^^^^^^"
0,"2097152^[3315.613480309124,20.281409212694648,36.369006762565846,412.51266274959846,1.2597981015548454,271.63691583483836,49341.57974574241,41.97488147579897]^^^^^^^^"
0,"1048576^[18142.658640793205,319.0757653788269,56.658323291616455,2024.815516577583,1.0359951799759,194.1037805189026,41664.20274601373,76.47513987569938]^^^^^^^^"
118 changes: 101 additions & 17 deletions src/main/scala/com/xixici/alink/Ftrl/ftrl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import com.alibaba.alink.operator.batch.BatchOperator
import com.alibaba.alink.operator.batch.classification.LogisticRegressionTrainBatchOp
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp
import com.alibaba.alink.operator.stream.StreamOperator
import com.alibaba.alink.operator.stream.dataproc.SplitStreamOp
import com.alibaba.alink.operator.stream.dataproc.{JsonValueStreamOp, SplitStreamOp}
import com.alibaba.alink.operator.stream.onlinelearning.{FtrlPredictStreamOp, FtrlTrainStreamOp}
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp
import com.alibaba.alink.operator.stream.source.Kafka011SourceStreamOp
import com.alibaba.alink.pipeline.dataproc.StandardScaler
import com.alibaba.alink.pipeline.feature.FeatureHasher
import com.alibaba.alink.pipeline.{Pipeline, PipelineModel}
Expand Down Expand Up @@ -46,7 +46,33 @@ object ftrl {
"C20",
"C21"
)

val schemaJsonArray: Array[String] =
Array(
"$.id",
"$.click",
"$.dt",
"$.C1",
"$.banner_pos",
"$.site_id",
"$.site_domain",
"$.site_category",
"$.app_id",
"$.app_domain",
"$.app_category",
"$.device_id",
"$.device_ip",
"$.device_model",
"$.device_type",
"$.device_conn_type",
"$.C14",
"$.C15",
"$.C16",
"$.C17",
"$.C18",
"$.C19",
"$.C20",
"$.C21"
)
// prepare batch train data
val batchTrainDataFn =
"data/avazu-small.csv"
Expand All @@ -59,20 +85,78 @@ object ftrl {
val vecColName = "vec"
val numHashFeatures = 30000

// val data0 = new Kafka011SourceStreamOp()
// .setBootstrapServers("127.0.0.1:9092")
// .setTopic("avazu")
// .setStartupMode("EARLIEST")
// .setGroupId("alink")
// data0.print(1,1000)
// StreamOperator.execute()

// prepare stream train data
val wholeDataFile = "data/avazu-ctr-train-8M.csv"
val data = new CsvSourceStreamOp()
.setFilePath(wholeDataFile)
.setSchemaStr(schemaStr)
.setIgnoreFirstLine(true)
// prepare stream train data with kafka
val source: Kafka011SourceStreamOp = new Kafka011SourceStreamOp()
.setBootstrapServers("127.0.0.1:9092")
.setTopic("avazu")
.setStartupMode("GROUP_OFFSETS")
.setGroupId("alink_group")
val data = source
.link(
new JsonValueStreamOp()
.setSelectedCol("message")
.setReservedCols("")
.setOutputCols(
"id",
"click",
"dt",
"C1",
"banner_pos",
"site_id",
"site_domain",
"site_category",
"app_id",
"app_domain",
"app_category",
"device_id",
"device_ip",
"device_model",
"device_type",
"device_conn_type",
"C14",
"C15",
"C16",
"C17",
"C18",
"C19",
"C20",
"C21"
)
.setJsonPath(schemaJsonArray))
.select(
"id, "
+ "click, "
+ "dt, "
+ "C1, "
+ "CAST(banner_pos AS int) AS banner_po, "
+ "site_id, "
+ "site_domain, "
+ "site_category, "
+ "app_id, "
+ "app_domain, "
+ "app_category, "
+ "device_id, "
+ "device_ip, "
+ "device_model, "
+ "device_type, "
+ "device_conn_type, "
+ "CAST(C14 AS int) AS C14, "
+ "CAST(C15 AS int) AS C15, "
+ "CAST(C16 AS int) AS C16, "
+ "CAST(C17 AS int) AS C17, "
+ "CAST(C18 AS int) AS C18, "
+ "CAST(C19 AS int) AS C19, "
+ "CAST(C20 AS int) AS C20, "
+ "CAST(C21 AS int) AS C21"
).as(schemaArray)
data.print(10, 20)

// Type2: prepare stream train data with csv
// val wholeDataFile = "data/avazu-ctr-train-8M.csv"
// val data = new CsvSourceStreamOp()
// .setFilePath(wholeDataFile)
// .setSchemaStr(schemaStr)
// .setIgnoreFirstLine(true)

// split stream to train and eval data
val spliter = new SplitStreamOp().setFraction(0.5).linkFrom(data)
Expand Down
108 changes: 108 additions & 0 deletions src/main/scala/com/xixici/alink/Utils/kafkaConsumerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.xixici.alink.Utils

import com.alibaba.alink.operator.stream.StreamOperator
import com.alibaba.alink.operator.stream.dataproc.JsonValueStreamOp
import com.alibaba.alink.operator.stream.source.Kafka011SourceStreamOp

/**
* Created by yang.lei01 on 2020/2/24.
*/
object kafkaConsumerTest {
def main(args: Array[String]): Unit = {
val schemaArray: Array[String] =
Array(
"$.id",
"$.click",
"$.dt",
"$.C1",
"$.banner_pos",
"$.site_id",
"$.site_domain",
"$.site_category",
"$.app_id",
"$.app_domain",
"$.app_category",
"$.device_id",
"$.device_ip",
"$.device_model",
"$.device_type",
"$.device_conn_type",
"$.C14",
"$.C15",
"$.C16",
"$.C17",
"$.C18",
"$.C19",
"$.C20",
"$.C21"
)
val source: Kafka011SourceStreamOp = new Kafka011SourceStreamOp()
.setBootstrapServers("127.0.0.1:9092")
.setTopic("avazu")
.setStartupMode("GROUP_OFFSETS")
.setGroupId("alink_group")
val data = source
.link(
new JsonValueStreamOp()
.setSelectedCol("message")
.setReservedCols("")
.setOutputCols(
"id",
"click",
"dt",
"C1",
"banner_pos",
"site_id",
"site_domain",
"site_category",
"app_id",
"app_domain",
"app_category",
"device_id",
"device_ip",
"device_model",
"device_type",
"device_conn_type",
"C14",
"C15",
"C16",
"C17",
"C18",
"C19",
"C20",
"C21"
)
.setJsonPath(schemaArray)
.setSkipFailed(true)
)
.select(
"id, "
+ "click, "
+ "dt, "
+ "C1, "
+ "CAST(banner_pos AS int) AS banner_po, "
+ "site_id, "
+ "site_domain, "
+ "site_category, "
+ "app_id, "
+ "app_domain, "
+ "app_category, "
+ "device_id, "
+ "device_ip, "
+ "device_model, "
+ "device_type, "
+ "device_conn_type, "
+ "CAST(C14 AS int) AS C14, "
+ "CAST(C15 AS int) AS C15, "
+ "CAST(C16 AS int) AS C16, "
+ "CAST(C17 AS int) AS C17, "
+ "CAST(C18 AS int) AS C18, "
+ "CAST(C19 AS int) AS C19, "
+ "CAST(C20 AS int) AS C20, "
+ "CAST(C21 AS int) AS C21"
)
println(data.getSchema)
data.print(10, 20)
StreamOperator.execute
}
}

0 comments on commit e0abb7e

Please sign in to comment.