
Fixing the `_corrupt_record` and `name` errors when Spark Streaming reads JSON data from Kafka and saves it to MySQL


Software versions used:

spark2.3.0

IDEA2019.1

kafka_2.11-0.10.2.2

spark-streaming-kafka-0-10_2.11-2.3.0
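
For reference, the versions above map onto sbt coordinates roughly as follows; this is a sketch (the artifact names are inferred from the version list, and the mysql-connector version is an assumption, since the post does not state it):

```scala
// build.sbt fragment (sketch): coordinates inferred from the version list above.
// The mysql-connector-java version is an assumption, not from the original post.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"                 % "2.3.0",
  "org.apache.spark" %% "spark-streaming"           % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",
  "mysql"            %  "mysql-connector-java"       % "5.1.47"
)
```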

First, the code:

package com.bd.spark

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.InputDStream



object kafkaSparkStreaming_10version {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[4]")
    // Batch interval of 5 seconds is an assumption; the original snippet is cut off here.
    val ssc = new StreamingContext(conf, Seconds(5))
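
The original post is truncated at this point, so here is a hedged sketch of how the rest of the pipeline typically looks with these imports. The topic name, MySQL URL, table name, and credentials below are placeholders, not values from the original post. Two points address the reported errors: feed only `record.value()` to `read.json` (parsing the whole `ConsumerRecord`, e.g. via `toString`, turns every row into a `_corrupt_record`), and cache the DataFrame before filtering on `_corrupt_record`, since Spark 2.3 disallows queries that reference only the internal corrupt-record column on raw JSON:

```scala
    // Sketch: Kafka parameters; broker address and group.id are placeholders.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-spark-demo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](Array("test-topic"), kafkaParams)  // placeholder topic
      )

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // Parse only the record value; passing the whole ConsumerRecord
        // makes every row come back as _corrupt_record.
        val df = spark.read.json(rdd.map(_.value()).toDS())

        // Cache before touching _corrupt_record: Spark 2.3 raises an
        // AnalysisException for queries on the corrupt-record column alone.
        df.cache()
        val clean =
          if (df.columns.contains("_corrupt_record"))
            df.filter($"_corrupt_record".isNull).drop("_corrupt_record")
          else df

        // Placeholder JDBC URL, table, and credentials.
        val props = new Properties()
        props.put("user", "root")
        props.put("password", "123456")
        clean.write.mode(SaveMode.Append)
          .jdbc("jdbc:mysql://localhost:3306/test", "kafka_json", props)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Defining an explicit schema with `spark.read.schema(...).json(...)` is another common fix: it avoids schema inference per batch and lets you route malformed rows deliberately via the `columnNameOfCorruptRecord` option.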