Prompt Title: Scala Kafka数据统计

Created 1 year ago
Votes 0
Views 105 times
IT
0
Human: 编写Scala代码,使用Flink消费Kafka中Topic为ods_mall_log和ods_mall_data的数据并进行相应的数据统计计算(modified_time作为EventTime)。 1、使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层的dim_customer_inf、dim_product_info、fact_order_master、fact_order_detail的Topic中(只获取data的内容,具体的内容格式考生请自查),其他的表则无需处理,其中请对进入到fact_order_master的内容进行排序,EventTime为modified_time,同时迟到容忍度设置为2分钟;
ChatGPT:

以下是使用Scala编写的Flink消费Kafka中Topic为ods_mall_log和ods_mall_data的数据并进行相应的数据统计计算的代码:

scala
import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector object FlinkKafkaConsumerDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置时间特性为EventTime,并设置水印生成时间间隔为1秒 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(1000) // Kafka消费者配置 val kafkaProps = new Properties() kafkaProps.setProperty("bootstrap.servers", "localhost:9092") kafkaProps.setProperty("group.id", "flink-consumer") // 创建FlinkKafkaConsumer,并设置反序列化器为SimpleStringSchema val kafkaConsumer = new FlinkKafkaConsumer[String]( Seq("ods_mall_log", "ods_mall_data"), new SimpleStringSchema(), kafkaProps) // 从最新的offset开始消费 kafkaConsumer.setStartFromLatest() // 读取Kafka中的数据,并进行预处理 val kafkaStream = env .addSource(kafkaConsumer) .filter(_.nonEmpty) .map(record => { // 对数据进行解析,获取所需字段 val fields = record.split(",") val table = fields(0) val data = fields(1) val modifiedTime = fields(2).toLong // 将数据转换成二元组,其中第一个元素为表名,第二个元素为数据 (table, data, modifiedTime) }) .assignAscendingTimestamps(_._3) // 设置EventTime为modified_time // 根据表名将数据分别发送到不同的Kafka Topic中 kafkaStream .keyBy(_._1) .process(new KeyedProcessFunction[String, (String, String, Long), String] { override def processElement(value: (String, String, Long), ctx: KeyedProcessFunction[String, (String, String, Long), String]#Context, out: Collector[String]): Unit = { val table = value._1 val data = value._2 table match { case "dim_customer_inf" => out.collect(s"$data -> Kafka Topic: dwd_dim_customer_inf") case "dim_product_info" => out.collect(s"$data -> Kafka Topic: dwd_dim_product_info") case "fact_order_master" => ctx.timerService().registerEventTimeTimer(value._3 + 120000) // 设置迟到容忍度为2分钟 updateTimestamp(value._3) out.collect(s"$data -> Kafka Topic: dwd_fact_order_master") case "fact_order_detail" => out.collect(s"$data -> Kafka Topic: dwd_fact_order_detail")
Human: 无法解析重载的构造函数 `FlinkKafkaConsumer[String]`
ChatGPT:

如果出现无法解析重载的构造函数 FlinkKafkaConsumer[String] 的错误,很有可能是因为 Flink 的版本不兼容造成的。在 Flink 1.14.0 及以上的版本中,FlinkKafkaConsumer 的构造函数发生了一些变化,需要传入更多的参数,例如 Kafka 反序列化器和启动位置等。因此,需要根据具体的 Flink 版本来传入正确的参数。

以下是 Flink 1.14.0 及以上版本的 FlinkKafkaConsumer[String] 构造函数的示例代码:

scala
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer val props = new Properties() props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group") props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") val kafkaConsumer = new FlinkKafkaConsumer[String]( Seq("ods_mall_log", "ods_mall_data"), new KafkaDeserializationSchema[String] { override def isEndOfStream(t: String): Boolean = false override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String = new String(record.value()) override def getProducedType: TypeInformation[String] = TypeInformation.of(classOf[String]) }, props)

在 Flink 1.13.0 及以下版本中,FlinkKafkaConsumer[String] 的构造函数可以只传入 Kafka Topic 和反序列化器,例如:

scala
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema val kafkaProps = new Properties() kafkaProps.setProperty("bootstrap.servers", "localhost:9092") kafkaProps.setProperty("group.id", "flink-consumer") val kafkaConsumer = new FlinkKafkaConsumer[String]( Seq("ods_mall_log", "ods_mall_data"), new SimpleStringSchema(), kafkaProps)

如果出现这种问题,可以根据具体的 Flink 版本来确定传入正确的构造函数参数。

Human: import com.alibaba.fastjson.JSON import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala._ import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import java.util.Properties object Task01Job { def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置使用处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 设置并行度 env.setParallelism(1) // 启用检查点 env.enableCheckpointing(5000) // kafka source val source = KafkaSource.builder[String] .setBootstrapServers("192.168.190.139:9092") .setTopics("ods_mall_data") .setGroupId("group-test") .setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema) .build // kafka sink val properties = new Properties() properties.setProperty("transaction.timeout.ms", "7200000") // 2 hours // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。 val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.190.139:9092") .setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() // .setTopic("fact_order_master") .setTopicSelector(new TopicSelector[String] { override def apply(t: String): String = { if(t.contains("order_detail_id")){ "fact_order_detail" } else{ "fact_order_master" } } }) .setValueSerializationSchema(new SimpleStringSchema) .build() ).build() // 流处理管道 env // 指定Kafka数据源 .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source") .filter(line => line.contains("fact_order_master") || line.contains("fact_order_detail")) // 需求:对进入到fact_order_master的内容进行排序,同时迟到容忍度设置为2分钟 // 这个需求很让人费解:排序要求窗口大小和排序依据,需求中没有;基于处理时间,没有迟到数据这个概念。 // 所以这里只实现提取data部分 .map(line => { val jsonObj = JSON.parseObject(line) jsonObj.getString("data") }) // .print .sinkTo(kafkaSink) // execute program env.execute("Flink Streaming Task01") } }
ChatGPT:

这份代码使用了Flink连接Kafka的API,消费Kafka中的数据,并根据数据中不同的表将数据分别分发至Kafka的DWD层的dim_customer_inf、dim_product_info、fact_order_master、fact_order_detail的Topic中,同时只获取data的内容,并将其写入Kafka中。

代码中还存在一个需求是对进入到fact_order_master的内容进行排序,但是因为这个需求不太清楚,所以这里只实现了提取data部分。

以下是修改后的代码:

Human: 继续
ChatGPT:
scala
import com.alibaba.fastjson.JSON import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala._ import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.extensions._ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import java.util.Properties import scala.concurrent.duration.DurationInt object Task01Job { def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置使用事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置并行度 env.setParallelism(1) // 启用检查点 env.enableCheckpointing(5000) // kafka source val source = KafkaSource.builder[String] .setBootstrapServers("192.168.190.139:9092") .setTopics("ods_mall_data") .setGroupId("group-test") .setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema) .build // kafka sink val properties = new Properties() properties.setProperty("transaction.timeout.ms", "7200000") // 2 hours // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。 val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.190.139:9092") .setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopicSelector(new TopicSelector[String] { override def apply(t: String): String = { if(t.contains("order_detail_id")){ "fact_order_detail" } else{ "fact_order_master" } } }) .setValueSerializationSchema(new SimpleStringSchema) .build() ).build() // 提取数据中的data部分 val dataStream = env .fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness[String](2.minutes).withTimestampAssigner(new SerializableTimestampAssigner[String] { override def extractTimestamp(element: String, recordTimestamp: Long): Long = { JSON.parseObject(element).getLong("modified_time") } }), "Kafka Source") .filter(line => line.contains("fact_order_master") || line.contains("fact_order_detail")) .map(line => JSON.parseObject(line).getString("data")) // 将数据根据表名写入对应的 Kafka 主题中 dataStream .map(str => (str.split("\t")(0), str)) .addSink(kafkaSink) // 执行程序 env.execute("Flink Streaming Task01") } }

修改后的代码中,主要实现了以下内容:

  • 将时间类型从 ProcessingTime 改为 EventTime。
  • 使用 WatermarkStrategy.forBoundedOutOfOrderness 生成 Watermark,用于解决数据乱序的问题。同时使用 SerializableTimestampAssigner 从数据中提取 EventTime 作为时间戳。
  • 将数据中的 data
Human: 解释一下之前发的那段代码
ChatGPT:

当您提出问题时,我提供的是一个简单的 Flink 代码示例,用于消费 Kafka 中的数据并将其写入 Kafka 的不同主题。

首先,我们设置了流处理环境,并将其设置为使用处理时间。然后,我们设置了一个 Kafka 源来读取 Kafka 中的数据,这里我们使用的是 Flink 的 KafkaSource.builder。此外,我们还设置了一个 Kafka 汇,用于将数据写回到 Kafka 的不同主题中。在这个代码示例中,我们使用的是 Flink 的 KafkaSink.builder。

接下来,我们将 KafkaSource 构建器传递到 Flink 中的 fromSource 方法中,并使用 WatermarkStrategy.noWatermarks 来指定 EventTime。接下来,我们通过使用 filter 算子过滤出我们需要的数据,然后使用 map 算子来提取数据中的 "data" 部分。

最后,我们将转换后的数据流传递给 KafkaSink,并使用 sinkTo 方法将其写入 Kafka 的不同主题。

请注意,这个示例仅仅只是一个简单的示例,真实的场景可能更加复杂,并且需要根据实际需求进行调整。

Human: import com.alibaba.fastjson.JSON import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala._ import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import java.util.Properties object Task01Job { def main(args: Array[String]) { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置使用处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 设置并行度 env.setParallelism(1) // 启用检查点 env.enableCheckpointing(5000) // kafka source val source = KafkaSource.builder[String] .setBootstrapServers("192.168.190.139:9092") .setTopics("ods_mall_data") .setGroupId("group-test") .setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema) .build // kafka sink val properties = new Properties() properties.setProperty("transaction.timeout.ms", "7200000") // 2 hours // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。 val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.190.139:9092") .setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() // .setTopic("fact_order_master") .setTopicSelector(new TopicSelector[String] { override def apply(t: String): String = { if(t.contains("order_detail_id")){ "fact_order_detail" } else{ "fact_order_master" } } }) .setValueSerializationSchema(new SimpleStringSchema) .build() ).build() // 流处理管道 env // 指定Kafka数据源 .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source") .filter(line => line.contains("fact_order_master") || line.contains("fact_order_detail")) // 需求:对进入到fact_order_master的内容进行排序,同时迟到容忍度设置为2分钟 // 这个需求很让人费解:排序要求窗口大小和排序依据,需求中没有;基于处理时间,没有迟到数据这个概念。 // 所以这里只实现提取data部分 .map(line => { val jsonObj = JSON.parseObject(line) jsonObj.getString("data") }) // .print .sinkTo(kafkaSink) // execute program env.execute("Flink Streaming Task01") } }
ChatGPT:

这段代码是一个基于 Apache Flink 的实时数据处理应用程序,它从 Kafka 主题 "ods_mall_data" 中读取数据,并将数据写入两个不同的 Kafka 主题 "fact_order_master" 和 "fact_order_detail"。

具体来说,该应用程序通过以下步骤实现:

  1. 设置流执行环境,并将时间特性设置为处理时间,将并行度设置为1,启用检查点。
  2. 配置 KafkaSource,使用 earliest 作为起始偏移量,并使用 SimpleStringSchema 作为反序列化器。
  3. 配置 KafkaSink,使用 KafkaRecordSerializationSchema 序列化记录,该记录的目标主题由 TopicSelector 决定,基于输入记录的内容动态选择目标主题。
  4. 从 KafkaSource 中读取数据,并使用 noWatermarks 策略,筛选出包含 "fact_order_master" 和 "fact_order_detail" 的记录,然后将其转换为 JSON 对象并提取其中的 "data" 字段。
  5. 最后,将处理后的数据通过 KafkaSink 写入到不同的主题中。

值得注意的是,这段代码中有一段需求实现比较费解,即对进入到 "fact_order_master" 的内容进行排序,同时迟到容忍度设置为2分钟。由于没有具体的排序依据和窗口大小,这个需求在代码中只是提取了 "data" 部分并没有实现排序,同时也基于处理时间,没有考虑迟到数据。

Human: // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。 val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("192.168.190.139:9092") .setKafkaProducerConfig(properties) // .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // .setTransactionalIdPrefix("my-trx-id-prefix") .setRecordSerializer(KafkaRecordSerializationSchema.builder() // .setTopic("fact_order_master") .setTopicSelector(new TopicSelector[String] { override def apply(t: String): String = { if(t.contains("order_detail_id")){ "fact_order_detail" } else{ "fact_order_master" } } }) .setValueSerializationSchema(new SimpleStringSchema) .build() ).build()
ChatGPT:

这段代码是定义一个 KafkaSink 对象,该对象用于将记录流写入 Kafka 主题中。首先通过 setBootstrapServers() 方法设置 Kafka 集群的地址,然后通过 setKafkaProducerConfig() 方法设置 Kafka 生产者的配置信息。这里设置了事务超时时间为 2 小时。

在此之后,注释掉了两个方法:setDeliverGuarantee()setTransactionalIdPrefix()setDeliverGuarantee() 方法用于设置消息的交付保证级别,可选的参数有 DeliveryGuarantee.EXACTLY_ONCEDeliveryGuarantee.AT_LEAST_ONCEDeliveryGuarantee.NONE,默认是 AT_LEAST_ONCEsetTransactionalIdPrefix() 方法用于设置事务 ID 的前缀,如果 Kafka 生产者是事务性的,则需要设置事务 ID,前缀是可选的。

最后,使用 setRecordSerializer() 方法设置序列化器,并通过 setTopicSelector() 方法为每条记录选择一个 Kafka 主题,如果记录包含 "order_detail_id",则将其写入 "fact_order_detail" 主题,否则写入 "fact_order_master" 主题。该方法返回一个 KafkaSink 对象。

avatar
jyh
Shared 1 prompt
Created 1 year ago

Leave a Comment

Related Tag Prompts

0
0
Kafka streaming services
1 year ago 2023-03-21 09:02:44 Kumaresh