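Background: when you work with Flink you inevitably end up dealing with Redis, and most of you have probably used flink-connector-redis for that. But when I tried to write to a Redis cluster with RedisSink, I found that it doesn't support passwords at all. Hence this note.
Here's how it went. I was about to write data from Flink into Redis, so as usual I pulled in the flink-connector-redis package: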
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
```
Then I confidently dashed off the following code:
```scala
package org.cube.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Used later, once we switch to the cluster config
import java.net.InetSocketAddress
import java.util.HashSet

/**
 * @Author : Lawrence
 * @Date : 2022/7/24 23:11
 * @Description : Write Flink results to a Redis cluster
 * @Version : 1.0.0
 * @Modification_Record:
 * Version Date Remark
 * v1.0.0 2022/7/24 First Create
 */
object RedisClusterSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    import org.apache.flink.api.scala._
    val source = env.fromElements("1 hadoop", "2 spark", "3 flink", "4 hive", "5 redis", "6 hbase")

    // process: "1 hadoop" -> ("1", "hadoop")
    val tupleValue = source.map(_.split(" ")).map(x => (x(0), x(1)))

    // redis config (single-node pool config)
    val builder = new FlinkJedisPoolConfig.Builder
    builder.setHost("cube01").setPort(7001).setPassword("123456")
    val redisConf: FlinkJedisPoolConfig = builder.build()

    // sink
    val redisSink = new RedisSink[(String, String)](redisConf, new MyRedisMapper())
    tupleValue.addSink(redisSink)

    env.execute("RedisClusterSink")
  }
}

// Writes each (key, value) tuple with SET key value
class MyRedisMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: (String, String)): String = t._1

  override def getValueFromData(t: (String, String)): String = t._2
}
```
Then I happily hit Run, and the console greeted me with a wall of red.
The last line said:
Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9842 192.168.20.132:7003
Ah, of course: my Redis is running in cluster mode.
That doesn't stump me.
All I have to do is swap FlinkJedisPoolConfig for FlinkJedisClusterConfig and everything will be fine.
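Some background on the MOVED error: Redis Cluster splits the keyspace into 16384 hash slots, assigns each slot to one master, and computes a key's slot as CRC16(key) mod 16384 (if the key contains a non-empty {...} hash tag, only that part is hashed). A node that receives a command for a slot it does not own replies MOVED <slot> <host:port>, and a plain single-node client like the one behind FlinkJedisPoolConfig does not follow that redirect, so it surfaces as JedisMovedDataException. Below is a minimal, stdlib-only sketch of the slot calculation; KeySlot is a hypothetical helper for illustration, not part of Jedis or the connector.

```java
import java.nio.charset.StandardCharsets;

// Sketch of Redis Cluster's key-to-slot mapping: CRC16 (XMODEM variant) mod 16384.
// KeySlot is a hypothetical helper for illustration, not a Jedis or connector API.
final class KeySlot {
    static final int SLOT_COUNT = 16384;

    // CRC16 with polynomial 0x1021 and initial value 0, processed MSB first
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                if ((crc & 0x8000) != 0) {
                    crc = ((crc << 1) ^ 0x1021) & 0xFFFF;
                } else {
                    crc = (crc << 1) & 0xFFFF;
                }
            }
        }
        return crc;
    }

    static int keySlot(String key) {
        // Hash tag rule: if the key has a non-empty {...} section, only that part is hashed
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    public static void main(String[] args) {
        // The keys produced by the job: "1" .. "6"
        for (String k : new String[] {"1", "2", "3", "4", "5", "6"}) {
            System.out.println(k + " -> slot " + keySlot(k));
        }
    }
}
```

By this calculation the first record's key, "1", falls into slot 9842, which is consistent with the MOVED 9842 in the error message. A cluster-aware client such as JedisCluster keeps a slot-to-node table and routes each command to the owning node, which is why a cluster config is the right direction.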
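```scala
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig

// redis config
val builder = new FlinkJedisClusterConfig.Builder
val inetSocketAddress = new InetSocketAddress("cube01", 7001)
val nodeSet = new HashSet[InetSocketAddress]()
nodeSet.add(inetSocketAddress)
// Does not compile: FlinkJedisClusterConfig.Builder has no setPassword method
builder.setNodes(nodeSet).setPassword("123456")
val redisConf: FlinkJedisClusterConfig = builder.build()
```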
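But this class has no setPassword method; in fact, it doesn't even have a password field.
That doesn't stump me either.
Surely it will work if I just leave the password out for now?
Nope. The console turned red again,
and this time it said: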
Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.
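NOAUTH means the node has a password configured (requirepass) and the connection tried to run a command before authenticating. In Redis's wire protocol (RESP), a client sends each command as an array of bulk strings, and a password-protected node expects AUTH <password> on every new connection before anything else. Since a cluster client opens connections to several nodes, the password has to be part of the cluster configuration so the client can authenticate each of them. The sketch below only illustrates that framing; Resp.encode is a hypothetical helper, not a Jedis API.

```java
import java.nio.charset.StandardCharsets;

// Sketch of RESP command framing. Resp is a hypothetical helper, not a Jedis API.
final class Resp {
    // Encode a command as a RESP array: "*<n>\r\n", then "$<len>\r\n<arg>\r\n" per argument
    static String encode(String... args) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(args.length).append("\r\n");
        for (String arg : args) {
            // Bulk string length is counted in bytes, not characters
            int len = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(len).append("\r\n").append(arg).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // On a password-protected node, AUTH must precede SET on each new connection
        System.out.print(encode("AUTH", "123456"));
        System.out.print(encode("SET", "1", "hadoop"));
    }
}
```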
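Heh, that doesn't stump me.
My first instinct was to look in the Maven repository for a newer flink-connector-redis.
But when I searched, it turned out this already was the latest version.
That doesn't stump me either.
FlinkJedisPoolConfig can set a password, right?
FlinkJedisClusterConfig can talk to a cluster, right?
What if I merge the two? Wouldn't that do it?
So I rewrote FlinkJedisClusterConfig as my own MyFlinkJedisClusterConfig class, adding a password field, a getter for it, and a setPassword method on its Builder.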
```java
package org.cube.flink;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 21:14
 * @Description : FlinkJedisClusterConfig with a password
 * @Version : 1.0.0
 * @Modification_Record:
 * Version Date Remark
 * v1.0.0 2022/7/25 First Create
 */

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Protocol;

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

public class MyFlinkJedisClusterConfig extends FlinkJedisConfigBase {
    private static final long serialVersionUID = 1L;

    private final Set<InetSocketAddress> nodes;
    private final int maxRedirections;
    private final int soTimeout;
    private final String password;

    private MyFlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int soTimeout,
                                      int maxRedirections, int maxTotal, int maxIdle, int minIdle,
                                      String password) {
        super(connectionTimeout, maxTotal, maxIdle, minIdle);
        Preconditions.checkNotNull(nodes, "Node information should be presented");
        Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
        this.nodes = new HashSet<>(nodes);
        this.soTimeout = soTimeout;
        this.maxRedirections = maxRedirections;
        this.password = password;
    }

    // Convert the configured addresses into Jedis HostAndPort objects
    public Set<HostAndPort> getNodes() {
        Set<HostAndPort> ret = new HashSet<>();
        for (InetSocketAddress node : nodes) {
            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
        }
        return ret;
    }

    public int getMaxRedirections() {
        return maxRedirections;
    }

    public int getSoTimeout() {
        return soTimeout;
    }

    // Added: read by MyRedisCommandsContainerBuilder
    public String getPassword() {
        return password;
    }

    // The password is not included here, which keeps it out of logs
    @Override
    public String toString() {
        return "JedisClusterConfig{nodes=" + nodes + ", timeout=" + connectionTimeout
                + ", maxRedirections=" + maxRedirections + ", maxTotal=" + maxTotal
                + ", maxIdle=" + maxIdle + ", minIdle=" + minIdle + '}';
    }

    public static class Builder {
        private Set<InetSocketAddress> nodes;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int maxRedirections = 5;
        // Added: socket (read) timeout, used by the password-aware JedisCluster constructor
        private int soTimeout = Protocol.DEFAULT_TIMEOUT;
        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
        // Added: cluster password
        private String password;

        public Builder setNodes(Set<InetSocketAddress> nodes) {
            this.nodes = nodes;
            return this;
        }

        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public Builder setSoTimeout(int soTimeout) {
            this.soTimeout = soTimeout;
            return this;
        }

        public Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        public Builder setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
        }

        public Builder setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
        }

        public Builder setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
        }

        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        public MyFlinkJedisClusterConfig build() {
            return new MyFlinkJedisClusterConfig(nodes, timeout, soTimeout, maxRedirections,
                    maxTotal, maxIdle, minIdle, password);
        }
    }
}
```
However, the console, red once more, informed me:
Caused by: java.lang.IllegalArgumentException: Jedis configuration not found
It turns out that when the Flink job runs, it calls the open() method of RedisSink:
```java
@Override
public void open(Configuration parameters) throws Exception {
    this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
}
```
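The RedisCommandsContainerBuilder.build method it calls accepts any FlinkJedisConfigBase:
```java
public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase)
```
but internally it only recognizes the connector's own config classes. It checks the concrete type with instanceof against FlinkJedisPoolConfig, FlinkJedisClusterConfig and FlinkJedisSentinelConfig, and anything else, including our MyFlinkJedisClusterConfig, falls through to the "Jedis configuration not found" exception.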
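The failure is easy to reproduce without Flink or Jedis. RedisCommandsContainerBuilder.build picks a container by testing the concrete config type with instanceof (the rewritten MyRedisCommandsContainerBuilder further down keeps the same structure). Below is a stdlib-only model of that dispatch; ConfigBase, PoolConfig, ClusterConfig, MyClusterConfig and ContainerBuilder are hypothetical stand-ins for FlinkJedisConfigBase, the connector's config classes, our MyFlinkJedisClusterConfig, and the builder.

```java
// Stdlib-only model of instanceof-based dispatch.
// All classes here are hypothetical stand-ins, not the connector's real classes.
abstract class ConfigBase { }

final class PoolConfig extends ConfigBase { }

final class ClusterConfig extends ConfigBase { }

// Our own subclass: it extends the shared base, but the builder has never heard of it
final class MyClusterConfig extends ConfigBase { }

final class ContainerBuilder {
    static String build(ConfigBase config) {
        if (config instanceof PoolConfig) {
            return "single-node container";
        } else if (config instanceof ClusterConfig) {
            return "cluster container";
        } else {
            // Any subclass not listed above ends up here
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    public static void main(String[] args) {
        System.out.println(build(new ClusterConfig()));
        try {
            build(new MyClusterConfig());
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Extending the shared base class is not enough: only the exact types listed in the chain are recognized, so both the sink (whose open() calls the builder) and the builder itself have to be replaced, not just the config class.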
So these two classes have to be rewritten as well:
MyRedisSink:
```java
package org.cube.flink;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 23:52
 * @Description : RedisSink that builds its container through MyRedisCommandsContainerBuilder
 * @Version : 1.0.0
 * @Modification_Record :
 * Version Date Remark
 * v1.0.0 2022/7/25 First Create
 */

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class MyRedisSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisSink.class);

    private String additionalKey;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;
    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    public MyRedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
        Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(),
                "Redis Mapper data type description can not be null");

        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

    @Override
    public void invoke(IN input) throws Exception {
        String key = redisSinkMapper.getKeyFromData(input);
        String value = redisSinkMapper.getValueFromData(input);

        switch (redisCommand) {
            case RPUSH: redisCommandsContainer.rpush(key, value); break;
            case LPUSH: redisCommandsContainer.lpush(key, value); break;
            case SADD: redisCommandsContainer.sadd(key, value); break;
            case SET: redisCommandsContainer.set(key, value); break;
            case PFADD: redisCommandsContainer.pfadd(key, value); break;
            case PUBLISH: redisCommandsContainer.publish(key, value); break;
            // additionalKey names the sorted set; value is passed as the score, key as the member
            case ZADD: redisCommandsContainer.zadd(additionalKey, value, key); break;
            // additionalKey names the hash; key is the field
            case HSET: redisCommandsContainer.hset(additionalKey, key, value); break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Key change compared with RedisSink: use our builder, which knows MyFlinkJedisClusterConfig
        redisCommandsContainer = MyRedisCommandsContainerBuilder.build(flinkJedisConfigBase);
    }

    @Override
    public void close() throws IOException {
        if (redisCommandsContainer != null) {
            redisCommandsContainer.close();
        }
    }
}
```
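One detail in invoke() is worth spelling out: for HSET and ZADD the Redis key comes from the additionalKey in RedisCommandDescription, not from the mapper. For HSET the mapper's key becomes the hash field; for ZADD the mapper's value is passed as the score and its key as the member, so the value presumably has to be numeric. The sketch below models only those two branches with an in-memory stand-in; FakeContainer and its maps are hypothetical and mirror just the argument order used in MyRedisSink, not the real container's behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for RedisCommandsContainer, used only to show
// where each part of a record lands for the HSET and ZADD branches.
final class FakeContainer {
    final Map<String, Map<String, String>> hashes = new HashMap<>();
    final Map<String, Map<String, Double>> sortedSets = new HashMap<>();

    // hset(hashName, field, value)
    void hset(String hashName, String field, String value) {
        hashes.computeIfAbsent(hashName, k -> new HashMap<>()).put(field, value);
    }

    // zadd(setName, score, member): in this sketch the score string is parsed as a double,
    // so a non-numeric value throws NumberFormatException
    void zadd(String setName, String score, String member) {
        sortedSets.computeIfAbsent(setName, k -> new HashMap<>()).put(member, Double.valueOf(score));
    }

    // Mirrors the argument order of the HSET and ZADD cases in MyRedisSink.invoke
    static void invoke(FakeContainer c, String command, String additionalKey, String key, String value) {
        switch (command) {
            case "HSET": c.hset(additionalKey, key, value); break;
            case "ZADD": c.zadd(additionalKey, value, key); break;
            default: throw new IllegalArgumentException("Cannot process such data type: " + command);
        }
    }

    public static void main(String[] args) {
        FakeContainer c = new FakeContainer();
        invoke(c, "HSET", "tools", "1", "hadoop");      // hash "tools", field "1" -> "hadoop"
        invoke(c, "ZADD", "ranking", "hadoop", "95.5"); // set "ranking", member "hadoop", score 95.5
        System.out.println(c.hashes);
        System.out.println(c.sortedSets);
    }
}
```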
MyRedisCommandsContainerBuilder:
```java
package org.cube.flink;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 21:30
 * @Description : RedisCommandsContainerBuilder with password support
 * @Version : 1.0.0
 * @Modification_Record :
 * Version Date Remark
 * v1.0.0 2022/7/25 First Create
 */

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;

public class MyRedisCommandsContainerBuilder {

    // Dispatch on the concrete config type; MyFlinkJedisClusterConfig takes the place of FlinkJedisClusterConfig
    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
            return build((FlinkJedisPoolConfig) flinkJedisConfigBase);
        } else if (flinkJedisConfigBase instanceof MyFlinkJedisClusterConfig) {
            return build((MyFlinkJedisClusterConfig) flinkJedisConfigBase);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            return build((FlinkJedisSentinelConfig) flinkJedisConfigBase);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());

        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
                jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(),
                jedisPoolConfig.getPassword(), jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
    }

    public static RedisCommandsContainer build(MyFlinkJedisClusterConfig jedisClusterConfig) {
        Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());

        JedisCluster jedisCluster;
        if (jedisClusterConfig.getPassword() == null) {
            // No password configured
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
                    jedisClusterConfig.getConnectionTimeout(),
                    jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
        } else {
            // Password-aware constructor: requires Jedis 2.9 or later
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
                    jedisClusterConfig.getConnectionTimeout(), jedisClusterConfig.getSoTimeout(),
                    jedisClusterConfig.getMaxRedirections(), jedisClusterConfig.getPassword(),
                    genericObjectPoolConfig);
        }
        return new RedisClusterContainer(jedisCluster);
    }

    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
        Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());

        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
                jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
                jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
                jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
        return new RedisContainer(jedisSentinelPool);
    }
}
```
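However, while rewriting MyRedisCommandsContainerBuilder, you'll be surprised to find that JedisCluster doesn't accept a password either.
Whatever you do, don't reflexively go and rewrite JedisCluster too,
because this time it really is a version problem: the older Jedis that the connector pulls in has no JedisCluster constructor that takes a password.
So this doesn't stump me either.
Just upgrade the Jedis client (redis.clients:jedis) to 2.9 or later, and exclude the older Jedis that flink-connector-redis brings in transitively: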
```xml
<!-- Jedis 2.9+ provides JedisCluster constructors that accept a password -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
    <exclusions>
        <!-- Drop the connector's older transitive Jedis so only 2.9.1 ends up on the classpath -->
        <exclusion>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
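All right, at this point we're finally done.
The code is written, but one question remains:
why has such a simple requirement never made it into an updated jar?
All I want is to write Flink data into a password-protected Redis cluster. Is that too much to ask?
It isn't. So why is that?
My guess is something like this: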
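First, think about what we actually write to Redis in stream processing.
Usually it's state information and intermediate results. Flink itself has state, caching, and broadcast mechanisms, which reduces the need for Redis.
Second, big data applications usually run on machines inside an internal network. As you know, hosts in a big data cluster are set up for passwordless login to one another, so putting a password on Redis alone seems a bit redundant.
Third, Redis's password mechanism is reportedly rather weak. For security, access is more often restricted by firewalling the ports, so for ease of administration many Redis clusters don't set a password at all.
Fourth, human nature is lazy: once people discover that RedisSink doesn't support passwords, the path of least resistance is probably to give up on the password.
Well, I've written what needed writing and thought through what needed thinking; time to wrap up the day on a happy note.
Good night, and see you next time for another late-night session.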
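Three days later...
Ha, it turns out I reinvented the wheel after all. Apache Bahir has published an unofficial snapshot jar; here is its documentation page: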
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
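Or just pull in this package with Maven (as a -SNAPSHOT version it is not on Maven Central, so you will likely also need to add the Apache snapshots repository to your POM):
```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
```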
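A little sad.
This package adds quite a few classes and rounds out a number of features; if you're interested, download it and compare it with the 1.0 release.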