
Spark Java Structured Streaming filters - Stack Overflow


I have the following data:

+-------+-----------+-------------+----------+---------------------+------------+
|id     |resource id|resource name|event-desc|event-date           |ipaddress1  |
+-------+-----------+-------------+----------+---------------------+------------+
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44 |
|2010001|119        |Netopia      |login     |+56975-05-07 23:01:37|25:34:21:44 |
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:45 |
|2010001|119        |Netopia      |login     |+56975-05-07 23:01:37|25:34:21:45 |
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44 |
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:46 |
+-------+-----------+-------------+----------+---------------------+------------+

I need to apply multiple filters to the same dataset and send each result to Kafka.

Dataset<Row> mainData = df.select("data.*").filter("data.eventdesc = 'logout'");

I send the filtered data to Kafka; then there is another filter condition:

mainData = df.select("data.*").filter("data.eventdesc = 'login'");

and I send that data to Kafka as well.

In this case only the last expression takes effect. Any suggestions on how I can filter the data for both conditions and send each result to Kafka?


asked Mar 18 at 4:56 by Santosh Shimpi, edited Mar 19 at 4:47

2 Answers

Based on this answer, you can combine both conditions into a single filter:

Dataset<Row> mainData = df.select("data.*").filter("data.eventdesc = 'logout' or data.eventdesc = 'login'");

Because the variable (mainData) is reassigned, it ends up referencing a new object, and the first filtered dataset is discarded before it is ever written. If you have more complex filter logic, I'd recommend the approach below.

Dataset<Row> data = df.select("data.*")
    .filter(functions.col("data.eventdesc").equalTo("logout")
        .or(functions.col("data.eventdesc").equalTo("login"))); 
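The core issue here is plain Java variable semantics rather than anything Spark-specific: assigning a new Dataset to mainData simply drops the reference to the first result. A minimal stand-alone sketch (plain java.util.stream, no Spark, with made-up event values) of keeping both filtered results alive in separate variables:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterTwice {
    public static void main(String[] args) {
        // Stand-in for the event-desc column of the dataset above
        List<String> events = List.of("logout", "login", "logout", "login", "logout", "logout");

        // Two independent variables: the second filter does not overwrite the first result
        List<String> logouts = events.stream()
                .filter("logout"::equals).collect(Collectors.toList());
        List<String> logins = events.stream()
                .filter("login"::equals).collect(Collectors.toList());

        System.out.println(logouts.size() + " " + logins.size()); // prints "4 2"
    }
}
```

The same pattern applies to Spark: keep each filtered Dataset in its own variable (e.g. `logouts` and `logins`) and write each one to Kafka separately, instead of reassigning `mainData`.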

To write a dataset to Kafka topics, you first need to import the related jar with a proper version selected, as below (this applies to batch data, not streaming). Hope it is helpful for you.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.5.x</version>
</dependency>
// the Kafka sink expects a string/binary "value" column, so serialize each row as JSON
data.selectExpr("to_json(struct(*)) AS value")
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "login-logout-events")
        .save();
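Since the question is about Structured Streaming, the streaming counterpart cannot call write(); a hedged sketch, assuming df is a streaming dataset and using hypothetical topic names and checkpoint paths, would start one writeStream query per filter (Structured Streaming allows multiple concurrent queries on the same source):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

Dataset<Row> events = df.select("data.*");

// One streaming query per condition; both run concurrently
events.filter(col("eventdesc").equalTo("logout"))
        .selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "logout-events")              // hypothetical topic
        .option("checkpointLocation", "/tmp/chk-logout") // required by the Kafka sink
        .start();

events.filter(col("eventdesc").equalTo("login"))
        .selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "login-events")               // hypothetical topic
        .option("checkpointLocation", "/tmp/chk-login")
        .start();
```

Note that after select("data.*") the nested columns are promoted to the top level, so the filter references `eventdesc` rather than `data.eventdesc`.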