概述
业务中有时会需要解析excel中的数据,按照要求处理后,写入到db中;
用python处理这个正好简便快捷
demo
没有依赖就 pip install pymysql一下
import pymysql from pymysql.converters import escape_string from openpyxl import load_workbook from Snowflake import Snowflake def load_excel_data(snowflake): # 连接到MySQL数据库 mydb = pymysql.connect( host="xxx.xxx.xxx.xxx", port=3306, user="xxx", passwd="xxx", db="xxxx" ) # 打开Excel文件 wb = load_workbook(filename=r'D:\xx\test.xlsx') sheet = wb.active # 获取表头 header = [cell.value for cell in sheet[1]] column_header = [] # 表头转换列名 for excel_head_name in header: if '11' == excel_head_name: column_header.append("xx") elif '22' == excel_head_name: column_header.append("xx") elif '33' == excel_head_name: column_header.append("xx") elif '1122' == excel_head_name: column_header.append("xx") # 遍历每一行数据,并将其插入到数据库中 cursor = mydb.cursor() count = 0 defaultUser = "'xxx'" for row in sheet.iter_rows(min_row=2, values_only=True): cId = snowflake.next_id() date = row[0] # datetime 转 date date = date.date() a2 = row[1] reason = row[2] detail = row[3] # \'%s\' 将含有特殊内容的字符串整个塞进去 sql = f"INSERT INTO test_table (id, store_id, num, handler, create_by, update_by, date, a2, reason, detail) VALUES ({cId}, 3, 0, 43, {defaultUser}, {defaultUser}, \'%s\', \'%s\', \'%s\', \'%s\')" % (date, self_escape_string(a2), self_escape_string(reason), self_escape_string(detail)) print(sql) # cursor.execute(sql, row) cursor.execute(sql) count += 1 print(f"正在插入{count}条数据") # 提交更改并关闭数据库连接 mydb.commit() cursor.close() mydb.close() # 将字符串中的特殊字符转义 # python中没有null只有None def self_escape_string(data): if data is None: return "" return escape_string(data) if __name__ == '__main__': worker_id = 1 data_center_id = 1 snowflake = Snowflake(worker_id, data_center_id) load_excel_data(snowflake)
雪花id生成主键
import time import random class Snowflake: def __init__(self, worker_id, data_center_id): ### 机器标识ID self.worker_id = worker_id ### 数据中心ID self.data_center_id = data_center_id ### 计数序列号 self.sequence = 0 ### 时间戳 self.last_timestamp = -1 def next_id(self): timestamp = int(time.time() * 1000) if timestamp < self.last_timestamp: raise Exception( "Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp)) if timestamp == self.last_timestamp: self.sequence = (self.sequence + 1) & 4095 if self.sequence == 0: timestamp = self.wait_for_next_millis(self.last_timestamp) else: self.sequence = 0 self.last_timestamp = timestamp return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence def next_id(self): timestamp = int(time.time() * 1000) if timestamp < self.last_timestamp: raise Exception("Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp)) if timestamp == self.last_timestamp: self.sequence = (self.sequence + 1) & 4095 if self.sequence == 0: timestamp = self.wait_for_next_millis(self.last_timestamp) else: self.sequence = 0 self.last_timestamp = timestamp return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence def wait_for_next_millis(self, last_timestamp): timestamp = int(time.time() * 1000) while timestamp <= last_timestamp: timestamp = int(time.time() * 1000) return timestamp
还没有评论,来说两句吧...