问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

请问如何使用使用python实现并行处理

发布网友 发布时间:2022-04-13 00:01

我来回答

2个回答

懂视网 时间:2022-04-13 04:22

本篇文章给大家带来的内容是关于如何实现python3实现并发访问水平切分表,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

场景说明

假设有一个mysql表被水平切分,分散到多个host中,每个host拥有n个切分表。
如果需要并发去访问这些表,快速得到查询结果, 应该怎么做呢?
这里提供一种方案,利用python3的asyncio异步io库及aiomysql异步库去实现这个需求。

代码演示

import logging
import random
import asynciofrom aiomysql 
import create_pool
# 假设mysql表分散在8个host, 每个host有16张子表
TBLES = { "192.168.1.01": "table_000-015", 
# 000-015表示该ip下的表明从table_000一直连续到table_015
 "192.168.1.02": "table_016-031", 
 "192.168.1.03": "table_032-047", 
 "192.168.1.04": "table_048-063", 
  "192.168.1.05": "table_064-079", 
  "192.168.1.06": "table_080-095", 
  "192.168.1.07": "table_096-0111", 
  "192.168.1.08": "table_112-0127",
}
USER = "xxx"PASSWD = "xxxx"# wrapper函数,用于捕捉异常def query_wrapper(func):
 async def wrapper(*args, **kwargs):
 try:
  await func(*args, **kwargs) except Exception as e:
  print(e) return wrapper
  # 实际的sql访问处理函数,通过aiomysql实现异步非阻塞请求@
  query_wrapperasync def query_do_something(ip, db, table):
 async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:
 async with pool.get() as conn:
  async with conn.cursor() as cur:
  sql = ("select xxx from {} where xxxx")
  await cur.execute(sql.format(table))
  res = await cur.fetchall() 
 # then do something...# 生成sql访问队列, 队列的每个元素包含要对某个表进行访问的函数及参数def gen_tasks():
 tasks = [] for ip, tbls in TBLES.items():
 cols = re.split('_|-', tbls)
 tblpre = "_".join(cols[:-2])
 min_num = int(cols[-2])
 max_num = int(cols[-1]) 
  for num in range(min_num, max_num+1):
  tasks.append(
  (query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num))
  )

 random.shuffle(tasks) 
 return tasks# 按批量运行sql访问请求队列def run_tasks(tasks, batch_len):
 try: 
 for idx in range(0, len(tasks), batch_len):
  batch_tasks = tasks[idx:idx+batch_len]
  logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks))) 
   for i in range(0, len(batch_tasks)):
  l = batch_tasks[i]
  batch_tasks[i] = asyncio.ensure_future(
   l[0](*l[1:])
  )
  loop.run_until_complete(asyncio.gather(*batch_tasks)) 
  except Exception as e:
 logging.warn(e)# main方法, 通过asyncio实现函数异步调用def main():
 loop = asyncio.get_event_loop()

 tasks = gen_tasks()
 batch_len = len(TBLES.keys()) * 5 # all up to you
 run_tasks(tasks, batch_len)

 loop.close()

热心网友 时间:2022-04-13 01:30

9000 行数据很小了, 慢的话 要是你每条数据处理逻辑都不复杂的话 那就是频繁的读写数据库 耗费了时间 。
1.不知道你是不是读一条 处理一条 入库一条
要是这样 你把逻辑改一下 :新建一个数组,读一条 处理一条 把处理好的数据放到数组中,再接着处理下有一条 ,以此类推,数据字段不多(不要搞得内存溢出)的话, 9000条都可以加载到数组里面了 ,然后利用python executemany(sql,values) 把这个数组一次性插入数据库
2.看看A表的数据能不能分组
例如 假设表A的id字段如下
id
1
2
3
4
5
.。
那么就可以利用id值的奇偶性 分成 2块数据 1,3,5 .---- 2,4
然后python起俩个进程 分别处理这俩块数据 ,根据Abiao实际情况 多分几个数据块 用python多个进程一起处理

祝你好运
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
说课包括哪些方面 说课内容包括()。 如何在手机百度上删除对话记录? 结核病是什么样的疾病? 曹丕17岁得了肺痨,明知自己命不长久,还要强争王位,是不是很自私呢?_百... 古代小说常出现的病名 急求一篇"生活小窍门"(500字)的作文 至今最有什么小妙招 健康的戒烟方法 笔记本电池锁死是什么原因引起的? 宝宝十个月得了慢性支气管炎怎么办 10个月宝宝支气管炎反复发烧怎么办 十一个月大的宝宝经常咳嗽还带点支气管炎,怎么办 中国人寿鸿福至尊年金险哪里买?靠谱吗? 微信被人投诉,能转账吗 太平人寿财富安赢年金险哪里买?便宜吗? 微信收到转账1000,然后对方给我投诉了,我该怎么办 手机3G时代什么时候来,是不是3G时代后现在的国产手机就无法用了? 太平人寿财富安赢年金险哪里买?每年花多少钱? 平安人寿金瑞人生年金险哪里买?注意哪些问题? 3G时代是怎么样的一个时代 3G时代是什么样的时代? 平安人寿智能星年金险哪里买?靠谱吗? 微信转账对方已经收款,可以申诉撤回吗 3g是什么哪年出的 平安人寿智能星年金险哪里买?注意哪些问题? 什么是3G网络时代? 大都会人寿宝贝B款年金险哪里买?好吗? 什么3G时代 大都会人寿宝贝A款年金险哪里买?好吗? 水墨画初学者要注意哪些东西? 一见韩信就下跪磕头的樊哙,真的是一员猛将吗? 樊哙和韩信 韩信凭什么看不起樊哙呢? 韩信被贬后,樊哙为何仍对他行跪拜之礼呢? 韩信被夺取兵权,路过君王妹夫家,说了句什么自嘲的话,成为千古名言? 樊哙为什么会对被贬为淮阴侯的韩信磕头? 梦见在乔上和乔大钓鱼是什么意思 西汉初期樊哙拜会韩信,是否意味吕后的拉拢呢? 右附件常大欠清。 《淮阴侯列传》中,韩信去拜访樊哙说什么生乃与哙等为伍,他很看不起樊哙啊,为何? 左附件区可见大小约29x20mm囊性,壁厚,内液尚清,右侧卵巢显示欠清,盆腔内未见 为什么韩信会羞于和樊哙这样的人为伍呢? 樊哙出自那本书?标准答案 樊哙见他都跪拜,韩信的地位在当时究竟有多高? 三国中有关樊哙的故事情节 用简洁的语言叙述一个与刘邦或韩信有关的故事 左卵巢无回声区 直径16MM 右卵巢显示欠清 韩信、彭越、英布、龙且、樊哙谁比较厉害? 华为nova2plus存储卡放哪个位置