场景:从kafka读数据,通过jdbc写入mysql
示例:
#往kafka测试主题写入数据
kafka-console-producer.sh --broker-list wh01t:21007 --topic ypg_test --producer.config /client/Kafka/kafka/config/producer.properties
-- 创建mysql测试表
-- dsg.test definition
-- Target table for the Flink job below: MysqlSink inserts one row per Kafka message.
CREATE TABLE test (
-- Receives the raw message string; the sink inserts the Kafka value as-is.
id varchar(50) NOT NULL,
-- Filled with now() by the sink on every insert/update.
c_date date DEFAULT NULL,
-- The primary key is what triggers the sink's "on duplicate key update" branch
-- when the same message value arrives again.
PRIMARY KEY (id)
) ;
flink主类:
package com.pinko.testcase

import com.security.InitKafkaUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.collection.JavaConverters.seqAsJavaListConverter

/**
 * Test job: consumes string messages from Kafka, prints them, and forwards
 * each one to [[MysqlSink]].
 */
object Test05FromKafkaToMysql {

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the consumer properties first, then call InitKafkaUtil.securityPrepare
    // (both are defined outside this file).
    val kafkaProps = InitKafkaUtil.initPros()
    InitKafkaUtil.securityPrepare

    // Set up the execution environment with a parallelism of 1.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // The consumer accepts a list of topics, so several can be consumed at once.
    val subscribedTopics: List[String] = List("ypg_test")
    val kafkaSource =
      new FlinkKafkaConsumer011[String](subscribedTopics.asJava, new SimpleStringSchema(), kafkaProps)

    // Alternative in-memory source used by the author instead of Kafka:
    //   streamEnv.fromElements("ypghello", "ypgworld")
    println("flink环境加载完成,开始处理数据...")

    // Kafka message handling: print every message and write it to MySQL.
    val messages = streamEnv.addSource(kafkaSource)
    messages.print()
    messages.addSink(new MysqlSink())

    streamEnv.execute("Test05FromKafka")
  }
}
package com.pinko.testcase

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * Flink sink that upserts every incoming string into the MySQL table `test` over JDBC.
 *
 * The connection and a single parameterised statement are created in `open` and
 * reused for every record; both are released in `close`.
 */
class MysqlSink extends RichSinkFunction[String] {

  // Both stay null until open() succeeds, which is why close() checks them.
  var conn: Connection = _
  var ps: PreparedStatement = _

  // The message is bound as a parameter instead of being spliced into the SQL text,
  // so values containing quotes can neither break nor alter the statement.
  private val upsertSql =
    "insert into test values (?, now()) on duplicate key update id = values(id),c_date = values(c_date)"

  /**
   * Opens the JDBC connection and prepares the upsert statement once.
   *
   * @param parameters configuration passed in by Flink (not used here)
   */
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): endpoint and credentials are hard-coded as "url|username|password";
    // consider moving them to external configuration.
    val conn_str = "jdbc:mysql://10.22.33.44:2883/testdb|test|test#123"
    val conns = conn_str.split("\\|")
    val url: String = conns(0)
    val username: String = conns(1)
    val password: String = conns(2)
    conn = DriverManager.getConnection(url, username, password)
    println(conn)
    ps = conn.prepareStatement(upsertSql)
  }

  /**
   * Upserts one record, using the message as the row id.
   *
   * @param value   message to store in the `id` column
   * @param context sink context supplied by Flink (not used here)
   */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    println(s"$upsertSql [id=$value]")
    ps.setString(1, value)
    val rowsAffected = ps.executeUpdate()
    if (rowsAffected > 0) {
      println("更新成功")
    } else {
      println("没有进行更新操作")
    }
  }

  /**
   * Releases the statement and the connection. Either may still be null when
   * `open` failed part-way, and the connection is closed even if closing the
   * statement throws.
   */
  override def close(): Unit = {
    try {
      if (ps != null) ps.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}
还没有评论,来说两句吧...