在其文档字符串 elasticsearch.helpers.async_bulk
自己描述为一种
帮助:甲基:
~elasticsearch.AsyncElasticsearch.bulk
api 一个更加人友好的界面--它消耗的一个迭代的行动和 将它们发送以在块。 来源
上下文
我已经被使用 AsyncElasticsearch.bulk()
成功地发送大熊猫数据帧一些ES实例
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
的问题
然而,当涉及到 async_bulk
我得到 index is missing
错误。
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
试着为你 _rec_to_actions()
在几个方面没有多少效果。
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
我想主要的问题是,我不是很肯定知道 什么是行动,在上下文中略任. 这个概念是无处不在的文件,但没有一个明确的 数据结构 的对应方在该图书馆 源代码 (没有,我能找到,无论如何)
究竟是什么一个 行动 和应该如何我为你我的发电机发送df的数据 self.index
?
环境
- 蟒蛇="3.9.5"
- 略任="7.14.1"