I have the following data:
+-------+----------+------------+---------+---------------------+-----------+
|id |resource id|resource name|event-desc|event-date |ipaddress1 |
+-------+----------+------------+---------+---------------------+-----------+
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119 |Netopia |login |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119 |Netopia |login |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:46|
+-------+----------+------------+---------+---------------------+-----------+
I need to apply multiple filters to the same dataset and send each result to Kafka.
Dataset<Row> mainData = df.select("data.*").filter("data.eventdesc='logout'");
Send the filtered data to Kafka; then there is another filter condition:
mainData = df.select("data.*").filter("data.eventdesc='login'");
and send that data to Kafka as well.
In this case only the last expression takes effect. Any suggestions on how I can filter the data for both conditions and send each result to Kafka?
asked Mar 18 at 4:56 by Santosh Shimpi, edited Mar 19 at 4:47
2 Answers
Based on this answer:
Dataset<Row> mainData = df.select("data.*").filter("data.eventdesc='logout' or data.eventdesc='login'");
Since the variable (mainData) is reassigned and thus references a new object, I'd recommend the approach below if you have complex filter logic:
Dataset<Row> data = df.select("data.*")
    .filter(functions.col("eventdesc").equalTo("logout")
        .or(functions.col("eventdesc").equalTo("login")));
Note: after select("data.*") the struct prefix is dropped, so the column is referenced as eventdesc rather than data.eventdesc.
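When the same column is compared against several literals, `Column.isin` from the Spark SQL API keeps the filter shorter than chaining `equalTo(...).or(...)`. A sketch equivalent to the filter above (column name taken from the question's data):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Keep rows whose eventdesc matches any of the listed values;
// behaves the same as equalTo("logout").or(equalTo("login")).
Dataset<Row> data = df.select("data.*")
        .filter(functions.col("eventdesc").isin("logout", "login"));
```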
And for writing a dataset to Kafka topics, you first need to import the related jar with the proper version, as below (this is for batch, not streaming, data). Hope it is helpful.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.5.x</version>
</dependency>
data.selectExpr("to_json(struct(*)) AS value").write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "login-logout-events")
.save();
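If the two event types must go to Kafka separately (as in the question), keep two dataset variables instead of reassigning one, so neither filter result is lost. A sketch under the question's column names; the topic names and broker address are placeholders:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// Cache the base projection so both filters reuse it
// instead of re-reading the source.
Dataset<Row> base = df.select("data.*").cache();

Dataset<Row> logouts = base.filter(col("eventdesc").equalTo("logout"));
Dataset<Row> logins  = base.filter(col("eventdesc").equalTo("login"));

// The Kafka sink expects a string/binary "value" column,
// so serialize each row to JSON before writing.
logouts.selectExpr("to_json(struct(*)) AS value")
       .write()
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("topic", "logout-events")   // placeholder topic name
       .save();

logins.selectExpr("to_json(struct(*)) AS value")
      .write()
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "login-events")     // placeholder topic name
      .save();
```

Using two variables sidesteps the reassignment problem entirely: each filtered dataset keeps its own reference until its write completes.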