pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

使用 RabbitMQ 源的 Spark 結構化流

Spark Structured Streaming with RabbitMQ source(使用 RabbitMQ 源的 Spark 結構化流)
本文介紹了使用 RabbitMQ 源的 Spark 結構化流的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

我正在嘗試為 Structured Streaming 編寫一個自定義接收器,它將使用來自 RabbitMQ 的消息.Spark 最近發布 DataSource V2 API,看起來很有前途.由于它抽象了許多細節,我想使用這個 API 以既簡單又性能好.但是,由于它很新,因此可用的資源并不多.我需要經驗豐富的 Spark 人員的說明,因為他們會更容易掌握關鍵點.我們開始:

I am trying to write a custom receiver for Structured Streaming that will consume messages from RabbitMQ. Spark recently released DataSource V2 API, which seems very promising. Since it abstracts away many details, I want to use this API for the sake of both simplicity and performance. However, since it's quite new, there are not many sources available. I need some clarification from experienced Spark guys, since they will grasp the key points easier. Here we go:

我的起點是博客文章系列,第一部分 這里.它展示了如何在沒有流式傳輸功能的情況下實現數據源.為了制作流媒體源,我稍微改變了它們,因為我需要實現 MicroBatchReadSupport 代替(或補充)DataSourceV2.

My starting point is the blog post series, with the first part here. It shows how to implement a data source, without streaming capability. To make a streaming source, I slightly changed them, since I need to implement MicroBatchReadSupport instead of (or in addition to) DataSourceV2.

為了提高效率,明智的做法是讓多個 spark 執行器同時使用 RabbitMQ,即來自同一個隊列.如果我不感到困惑,輸入的每個分區 - 在 Spark 的術語中 - 對應于隊列中的消費者 - 在 RabbitMQ 術語中.因此,我們需要為輸入流設置多個分區,對吧?

To be efficient, it's wise to have multiple spark executors consuming RabbitMQ concurrently, i.e. from the same queue. If I'm not confused, every partition of the input -in Spark's terminology- corresponds to a consumer from the queue -in RabbitMQ terminology. Thus, we need to have multiple partitions for the input stream, right?

與 該系列的第 4 部分類似,我實現了 MicroBatchReader如下:

Similar with part 4 of the series, I implemented MicroBatchReader as follows:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    int partition = options.getInt(RMQ.PARTITICN, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}

我正在返回一個工廠列表,并希望列表中的每個實例都將用于創建一個讀取器,該讀取器也是一個消費者.這種方法正確嗎?

I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will be also a consumer. Is that approach correct?

我希望我的接收器是可靠的,即在每條處理過的消息之后(或至少寫入檢查點目錄以進行進一步處理),我需要將其返回給 RabbitMQ.問題從這里開始:這些工廠是在驅動程序中創建的,實際的讀取過程通過 DataReaders.但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader.由于每個 MicroBatchReader 有許多 DataReader,我應該如何將這些消息回傳給 RabbitMQ?或者我應該確認 next 方法在 DataReader 上被調用?安全嗎?如果是這樣,那么 commit 函數的目的是什么?

I want my reciever to be reliable, i.e. after every processed message (or at least written to chekpoint directory for further processing), I need to ack it back to RabbitMQ. The problem starts after here: these factories are created at the driver, and the actual reading process takes place at executors through DataReaders. However, the commit method is a part of MicroBatchReader, not DataReader. Since I have many DataReaders per MicroBatchReader, how should I ack these messages back to RabbitMQ? Or should I ack when the next method is called on DataReader? Is it safe? If so, what is the purpose of commit function then?

澄清: OBFUSCATION:答案中提供的有關重命名某些類/函數的鏈接(除了那里的解釋)讓一切更加清晰 比以往任何時候都更糟.引用 那里:

CLARIFICATION: OBFUSCATION: The link provided in the answer about the renaming of some classes/functions (in addition to the explanations there) made everything much more clear worse than ever. Quoting from there:

重命名:

  • DataReaderFactoryInputPartition

DataReaderInputPartitionReader

...

InputPartition 的目的是管理關聯的閱讀器,現在稱為 InputPartitionReader,帶有顯式創建操作以鏡像關閉操作.這是沒有從 API 中清除的時間更長,因為 DataReaderFactory 似乎更多比它更通用,并且不清楚為什么要生產一組它們閱讀.

InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.

但是,docs 明確表示讀取器工廠將被序列化并發送到執行器,然后將在執行器上創建數據讀取器并進行實際讀取."

However, the docs clearly say that "the reader factory will be serialized and sent to executors, then the data reader will be created on executors and do the actual reading."

為了使消費者可靠,我必須僅在特定消息在 Spark 端提交后才對它進行 ACK.請注意消息必須在傳遞消息的同一連接上進行確認,但在驅動程序節點調用提交函數.如何在 worker/executor 節點上提交?

To make the consumer reliable, I have to ACK for a particular message only after it is committed at Spark side. Note that the messages have to be ACKed on the same connection that it has been delivered through, but commit function is called at driver node. How can I commit at the worker/executor node?

推薦答案

> 我正在返回一個工廠列表,并希望列表中的每個實例都用于創建一個讀取器,它也是一個消費者.這種方法正確嗎?源 [socket][1] 源實現有一個線程將消息推送到內部 ListBuffer.換句話說,有一個消費者(線程)填充了內部 ListBuffer,它**然后**被`planInputPartitions`劃分為多個分區(`createDataReaderFactories` [renamed][2] 到 `planInputPartitions`).此外,根據 [MicroBatchReadSupport][3] 的 Javadoc> 執行引擎將在流式查詢開始時創建一個微批處理讀取器,為每個要處理的批處理交替調用 setOffsetRange 和 createDataReaderFactories,然后在執行完成時調用 stop().請注意,由于重新啟動或故障恢復,單個查詢可能會執行多次.換句話說,`createDataReaderFactories` 應該被調用 **multiple** 次,據我了解,這表明每個 `DataReader` 負責一個靜態輸入分區,這意味著 DataReader 不應該是消費者.----------> 但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader ... 如果是這樣,那么 commit 函數的目的是什么?提交函數的部分基本原理可能是防止 MicroBatchReader 的內部緩沖區變大.通過提交偏移量,您可以有效地從緩沖區中刪除小于偏移量的元素,因為您承諾不再處理它們.您可以使用 `batches.trimStart(offsetDiff)` 在套接字源代碼中看到這種情況<小時><刪除>我不確定是否要實現一個可靠的接收器,所以我希望一個更有經驗的 Spark 人能過來解決你的問題,因為我也有興趣!希望這可以幫助!

編輯

我只研究了 socket 和 wiki-edit 來源.這些資源還沒有準備好生產,這是問題所在.相反, kafka 源是更好的起點,與前面提到的源不同,它有多個像作者一樣的消費者正在尋找.

I had only studied the socket, and wiki-edit sources. These sources are not production ready, which is something that the question was was not looking for. Instead, the kafka source is the better starting point which has, unlike the aforementioned sources, multiple consumers like the author was looking for.

但是,也許如果您正在尋找不可靠的來源,上面的套接字和 wikiedit 來源提供了一個不太復雜的解決方案.

However, perhaps if you're looking for unreliable sources, the socket and wikiedit sources above provide a less complicated solution.

這篇關于使用 RabbitMQ 源的 Spark 結構化流的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: pbootcms网站模板|织梦模板|网站源码|jquery建站特效-html5模板网 | 磁棒电感生产厂家-电感器厂家-电感定制-贴片功率电感供应商-棒形电感生产厂家-苏州谷景电子有限公司 | 哈尔滨京科脑康神经内科医院-哈尔滨治疗头痛医院-哈尔滨治疗癫痫康复医院 | PAS糖原染色-CBA流式多因子-明胶酶谱MMP-上海研谨生物科技有限公司 | 板框压滤机-隔膜压滤机配件生产厂家-陕西华星佳洋装备制造有限公司 | 骨密度检测仪_骨密度分析仪_骨密度仪_动脉硬化检测仪专业生产厂家【品源医疗】 | 电销卡_稳定企业大语音卡-归属地可选-世纪通信 | AGV叉车|无人叉车|AGV智能叉车|AGV搬运车-江西丹巴赫机器人股份有限公司 | 温州食堂承包 - 温州市尚膳餐饮管理有限公司 | 火锅加盟_四川成都火锅店加盟_中国火锅连锁品牌十强_朝天门火锅【官网】 | 矿用履带式平板车|探水钻机|气动架柱式钻机|架柱式液压回转钻机|履带式钻机-启睿探水钻机厂家 | 切铝机-数控切割机-型材切割机-铝型材切割机-【昆山邓氏精密机械有限公司】 | 磁力抛光机_磁力研磨机_磁力去毛刺机-冠古设备厂家|维修|租赁【官网】 | 镀锌角钢_槽钢_扁钢_圆钢_方矩管厂家_镀锌花纹板-海邦钢铁(天津)有限公司 | 翅片管换热器「型号全」_厂家-淄博鑫科环保 | 琉璃瓦-琉璃瓦厂家-安徽盛阳新型建材科技有限公司 | 选矿设备,选矿生产线,选矿工艺,选矿技术-昆明昆重矿山机械 | 阻燃剂-氢氧化镁-氢氧化铝-沥青阻燃剂-合肥皖燃新材料 | 医学动画公司-制作3d医学动画视频-医疗医学演示动画制作-医学三维动画制作公司 | 热缩管切管机-超声波切带机-织带切带机-无纺布切布机-深圳市宸兴业科技有限公司 | 昆明化妆培训-纹绣美甲-美容美牙培训-昆明博澜培训学校 | 洗石机-移动滚筒式,振动,螺旋,洗矿机-青州冠诚重工机械有限公司 | 耐火浇注料价格-高强高铝-刚玉碳化硅耐磨浇注料厂家【直销】 | 重庆私家花园设计-别墅花园-庭院-景观设计-重庆彩木园林建设有限公司 | 玉米深加工机械,玉米加工设备,玉米加工机械等玉米深加工设备制造商-河南成立粮油机械有限公司 | 智能楼宇-楼宇自控系统-楼宇智能化-楼宇自动化-三水智能化 | 范秘书_懂你的范文小秘书| 高铝矾土熟料_细粉_骨料_消失模_铸造用铝矾土_铝酸钙粉—嵩峰厂家 | 软文世界-软文推广-软文营销-新闻稿发布-一站式软文自助发稿平台 | 档案密集架,移动密集架,手摇式密集架,吉林档案密集架-厂家直销★价格公道★质量保证 | 电梯装饰-北京万达中意电梯装饰有限公司 | 托盘租赁_塑料托盘租赁_托盘出租_栈板出租_青岛托盘租赁-优胜必达 | 泰安办公家具-泰安派格办公用品有限公司 | 济南玻璃安装_济南玻璃门_济南感应门_济南玻璃隔断_济南玻璃门维修_济南镜片安装_济南肯德基门_济南高隔间-济南凯轩鹏宇玻璃有限公司 | 山东螺杆空压机,烟台空压机,烟台开山空压机-烟台开山机电设备有限公司 | 中国产业发展研究网 - 提供行业研究报告 可行性研究报告 投资咨询 市场调研服务 | 浙江红酒库-冰雕库-气调库-茶叶库安装-医药疫苗冷库-食品物流恒温恒湿车间-杭州领顺实业有限公司 | 浙江筋膜枪-按摩仪厂家-制造商-肩颈按摩仪哪家好-温州市合喜电子科技有限公司 | 企典软件一站式企业管理平台,可私有、本地化部署!在线CRM客户关系管理系统|移动办公OA管理系统|HR人事管理系统|人力 | 济南轻型钢结构/济南铁艺护栏/济南铁艺大门-济南燕翔铁艺制品有限公司 | 郑州外墙清洗_郑州玻璃幕墙清洗_郑州开荒保洁-河南三恒清洗服务有限公司 |