Question about memory resource consumption
#1561
Replies: 2 comments 1 reply
-
My situation is very similar to yours and I ran into the same problem, except our data volume is a bit larger (around 5 million rows). Would you mind adding me on WeChat (vx: jccccccc) so we can compare notes?
1 reply
-
High memory usage is normal here. Your DAG shows many stages, and every stage has to read from and write to RocksDB; operators such as hash and join also consume a lot of memory. Note: this is not a CDC problem, so you should ask in the Flink community instead.
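To illustrate the point about RocksDB-backed state (this sketch is not from the thread): with the RocksDB state backend, join and aggregation state lives in native memory and on local disk rather than on the JVM heap, and in Flink 1.11 its memory is bounded by the managed memory pool (`taskmanager.memory.managed.fraction` / `taskmanager.memory.managed.size` in flink-conf.yaml, assuming the default `state.backend.rocksdb.memory.managed: true`). A minimal way to enable it from job code, with a placeholder checkpoint URI and assuming the flink-statebackend-rocksdb dependency is on the classpath:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep operator state in RocksDB (managed memory + local disk) instead of the JVM heap.
        // The second argument enables incremental checkpoints; the URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // Checkpoint periodically so the RocksDB state can be recovered after a failure.
        env.enableCheckpointing(60_000);

        // ... create the StreamTableEnvironment and submit the SQL job as usual ...
    }
}
```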
0 replies
-
We currently use Flink SQL internally to sync small-scale MySQL data to Elasticsearch in real time. We have not adopted flink-cdc directly; instead we use debezium + kafka + flink sql, and we will most likely have many more real-time sync tasks in the future. However, we have hit some problems, mainly resource consumption along with the scalability and stability of this approach, and they are holding back any larger-scale use of Flink SQL for real-time sync jobs.
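For context, a rough sketch (not from the original post) of how this debezium + kafka + flink sql path is usually wired up in Flink 1.11; the column list, topic, brokers, and Elasticsearch host below are invented placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToEsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Kafka topic fed by Debezium; debezium-json turns the change events into a changelog stream.
        tEnv.executeSql(
            "CREATE TABLE source_article (\n"
            + "  id BIGINT,\n"
            + "  articleNo STRING,\n"
            + "  title STRING,\n"
            + "  deleteFlag BOOLEAN\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'mysql.article',\n"                       // placeholder topic
            + "  'properties.bootstrap.servers' = 'kafka:9092',\n"   // placeholder brokers
            + "  'properties.group.id' = 'article-sync',\n"
            + "  'scan.startup.mode' = 'earliest-offset',\n"
            + "  'format' = 'debezium-json'\n"
            + ")");

        // Elasticsearch 7 sink; rows are upserted by the declared primary key.
        tEnv.executeSql(
            "CREATE TABLE sink_article (\n"
            + "  id BIGINT,\n"
            + "  articleNo STRING,\n"
            + "  title STRING,\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'elasticsearch-7',\n"
            + "  'hosts' = 'http://elasticsearch:9200',\n"           // placeholder host
            + "  'index' = 'article'\n"
            + ")");

        tEnv.executeSql(
            "INSERT INTO sink_article SELECT id, articleNo, title FROM source_article WHERE deleteFlag = false");
    }
}
```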
Environment:
Standalone cluster: 1 JobManager + 3 TaskManagers
Flink version: 1.11
We currently run 10 jobs, all with parallelism 1. The MySQL tables are not large (the biggest has around 200,000 rows), yet the jobs consume a lot of resources at runtime (company resources are limited and management wants the memory usage reduced). The most complex job joins 11 tables and also uses the most memory. Is there any way to significantly reduce the memory a Flink job uses?
Flink jobs (screenshot):


TaskManager memory usage (screenshot):
The most complex Flink SQL (submitted via streamTableEnvironment.executeSql):
```sql
insert into sink_article
select
  t.id, t.title, t.articleNo, t.authorName, t.publicNo, t.puretext as content, t.introduction,
  e.keyword, LOWER(e.keyword) as keywordlk,
  c.flags, d.cates, c.flagNos, d.cateNos,
  t.deleteFlag, t.putWay, t.readTimes,
  if(f.likes is null, 0, cast(f.likes as integer)) AS likes,
  if(j.collections is null, 0, cast(j.collections as integer)) AS collections,
  if(g.comments is null, 0, cast(g.comments as integer)) AS comments,
  if(h.forwards is null, 0, cast(h.forwards as integer)) AS forwards,
  (t.registerDate / 1000) as registerDate,
  t.isOriginal,
  i.articleNo as exportNo
from source_article t
left join (
  select t1.articleno, count(1) as likes
  from source_wolf_like t1
  where t1.businesstype = 'C'
    and t1.articleno is not null and t1.articleno <> ''
  group by t1.articleno
) f on t.articleNo = f.articleno
left join (
  select t1.articleno, count(1) as collections
  from source_wolf_collection t1
  where t1.businesstype = 'C'
    and t1.articleno is not null and t1.articleno <> ''
  group by t1.articleno
) j on t.articleNo = j.articleno
left join (
  select t2.articleno, count(1) as comments
  from source_wolf_discuss t2
  where t2.discusstype = 'C'
    and t2.enabled = 0
    and t2.articleno is not null and t2.articleno <> ''
  group by t2.articleno
) g on t.articleNo = g.articleno
left join (
  select t3.articleno, count(1) as forwards
  from source_wolf_forward t3
  where t3.businesstype = 'C'
    and t3.articleno is not null and t3.articleno <> ''
  group by t3.articleno
) h on t.articleNo = h.articleno
LEFT JOIN (
  SELECT y.articleNo,
         concat_agg(x.flagName) AS flags,
         concat_agg(x.flagNo) AS flagNos
  FROM source_t_flag x
  INNER JOIN source_t_article_flag y ON x.flagNo = y.flagNo
  GROUP BY y.articleNo
) c ON t.articleNo = c.articleNo
LEFT JOIN (
  SELECT n.articleNo,
         concat_agg(m.categoryName) AS cates,
         concat_agg(m.categoryNo) AS cateNos
  FROM source_t_category m
  INNER JOIN source_t_article_category n ON m.categoryNo = n.categoryNo
  GROUP BY n.articleNo
) d ON t.articleNo = d.articleNo
LEFT JOIN (
  SELECT z.articleno,
         concat_agg(z.keyword) AS keyword
  FROM source_t_article_keyword z
  GROUP BY z.articleno
) e ON t.articleNo = e.articleno
left join (
  select articleNo
  from source_wolf_article_export
  group by articleNo
) i on t.articleNo = i.articleNo
WHERE 1 = 1
  AND t.deleteFlag = false
```
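Every non-windowed join and GROUP BY aggregation in this query is an unbounded stateful operator: it keeps the rows or accumulators for every key it has ever seen, so state (and memory) grows with the key space and never shrinks on its own, multiplied across all 10 jobs. One common lever in Flink 1.11 is idle state retention, which lets the planner expire state for keys that have not been updated for a configured period. This is a hedged sketch, not from the thread; the 12/24-hour values are placeholders, and expiring state can produce wrong results if an expired key receives a late update:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StateRetentionSketch {
    // Call this before executing the INSERT INTO statement above.
    static void boundStateSize(StreamTableEnvironment streamTableEnvironment) {
        // Keep state for at least 12 hours after a key's last update and drop it
        // after at most 24 hours of inactivity.
        // Caveat: a later update to an expired key is then computed against missing state.
        streamTableEnvironment.getConfig()
                .setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}
```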
Flink SQL topology (screenshot):

Flink SQL operators (screenshot):

Is this because I am using Flink SQL the wrong way, or does a job like this inherently consume this much memory?