Slackデータの保管に関する考察 (API)
ファイルのカジュアルな復号化に関する考察(Base64→DataURL)最新版 2.1.4
ディレクトリ構造の精査とマッピングに関する小考察
PythonでPostgreSQLとやりとりする https://zenn.dev/collabostyle/articles/36e822520182d3 psycopg2 https://pypi.org/project/psycopg2/
ディレクトリ内の構造とファイルのハッシュ値を取得して整合性を検証するスクリプト
# Converters between 7-digit wareki codes (era digit + 2-digit era year + MMDD)
# and Gregorian ("seireki") dates.
def wareki_to_seireki(wareki: int) -> str:
    """Convert a 7-digit wareki code to a Gregorian date string.

    Example: 5190701 (Reiwa 19, July 1st) -> "2037年07月01日 (令和19年)".
    Raises ValueError for an unknown era digit.
    """
    # Era digit -> (first Gregorian year of the era, era name).
    era_table = {
        1: (1868, "明治"),
        2: (1912, "大正"),
        3: (1926, "昭和"),
        4: (1989, "平成"),
        5: (2019, "令和"),
    }
    code = str(wareki).zfill(7)  # left-pad so the era digit is always index 0
    era_key = int(code[0])
    if era_key not in era_table:
        raise ValueError("無効な年号の値です")
    era_year = int(code[1:3])
    first_year, era_label = era_table[era_key]
    gregorian_year = first_year + (era_year - 1)
    return f"{gregorian_year}年{code[3:5]}月{code[5:7]}日 ({era_label}{era_year}年)"

def seireki_to_wareki(seireki: int, month: int, day: int) -> str:
    """Convert a Gregorian year/month/day to the 7-digit wareki code string.

    Scans eras from newest to oldest; raises ValueError for years before 1868.
    """
    era_table = (
        (2019, "令和", 5),
        (1989, "平成", 4),
        (1926, "昭和", 3),
        (1912, "大正", 2),
        (1868, "明治", 1),
    )
    for first_year, _era_label, era_digit in era_table:
        if seireki >= first_year:
            era_year = seireki - first_year + 1
            return "{0}{1}{2}{3}".format(
                era_digit,
                str(era_year).zfill(2),
                str(month).zfill(2),
                str(day).zfill(2),
            )
    raise ValueError("無効な西暦の値です")

# Usage examples
print(wareki_to_seireki(5190701))    # Reiwa 19, 7/1 -> 2037-07-01
print(seireki_to_wareki(2037, 7, 1)) # 2037-07-01 -> 5190701
# Constant management script (定数管理スクリプト)
#!/usr/bin/python
# -*- coding: utf-8 -*-
import site
import sys, warnings, copy  # NOTE(review): site/sys/warnings look unused here — possibly needed by unseen code
from types import MappingProxyType


class ConstantError(TypeError):
    """Exception raised when code tries to rebind or delete a constant."""

    def __init__(self, message):
        super().__init__(message)


class Constant:
    """Attribute container whose attributes become write-once constants.

    Assigned values are converted to read-only container types
    (tuple / frozenset / MappingProxyType) and deep-copied so callers
    cannot mutate them through retained references.
    """

    def __init__(self, recursion_limit=20):
        # Only accept a plain int/float limit; anything else falls back to 0.
        if recursion_limit is not None and type(recursion_limit) in [int, float]:
            self.set_constant_class_recursion_limit(recursion_limit)
        else:
            self.set_constant_class_recursion_limit(0)

    def __getattr__(self, name):
        # Called only when normal lookup fails -> the constant was never set.
        raise AttributeError("'{0}' does not exist. You can't refer to something that doesn't exist.".format(name))

    def __setattr__(self, name, value):
        """Bind *name* once; reject rebinds and names already reserved on the class."""
        if name in self.__dict__:  # already bound as a constant on this instance
            raise ConstantError("The constant named '{0}' cannot be changed.".format(name))
        elif name in set(dir(self)):  # clashes with a method/class attribute
            raise ConstantError("'{0}' is already reserved.".format(name))
        else:
            try:
                # Deep-copy so the caller's reference can't mutate the stored value.
                self.__dict__[name] = copy.deepcopy(self.read_only_object_converter(value))
            except TypeError as e:  # unpicklable -> deep copy impossible
                try:
                    self.__dict__[name] = copy.copy(self.read_only_object_converter(value))
                except TypeError as e:  # shallow copy also impossible -> store as-is
                    self.__dict__[name] = self.read_only_object_converter(value)

    def __delattr__(self, name):
        """Forbid deletion of constants and reserved names."""
        if name in self.__dict__:
            raise ConstantError("The constant named '{0}' cannot be deleted.".format(name))
        elif name in set(dir(self)):
            raise ConstantError("'{0}' is already reserved.".format(name))
        else:
            raise AttributeError("'{0}' does not exist. You can't delete something that doesn't exist.".format(name))

    def read_only_object_converter(self, value, recursion_limit=None):
        """Recursively convert dict/set/list to their read-only counterparts.

        Recursion depth is capped by CONSTANT_CLASS_RECURSION_LIMIT (or the
        explicit *recursion_limit*); past the cap the value is returned as-is.
        """
        recursion_limit_buffer = None
        if recursion_limit is None:
            try:
                recursion_limit_buffer = int(self.CONSTANT_CLASS_RECURSION_LIMIT)
            except Exception:  # limit unreadable -> treat as exhausted
                recursion_limit_buffer = 0
        else:
            try:
                recursion_limit_buffer = int(recursion_limit)
            except Exception:
                recursion_limit_buffer = 0
        try:
            if recursion_limit_buffer < 1:
                raise RecursionError("'recursion_limit's value is zero.")
            if type(value) in (dict, MappingProxyType):
                value_buffer = dict()
                for k, v in dict(value).items():
                    value_buffer[k] = self.read_only_object_converter(v, recursion_limit=recursion_limit_buffer-1)
                return MappingProxyType(value_buffer)
            elif type(value) in (set, frozenset):
                value_buffer = set()
                for v in list(value):
                    value_buffer.add(self.read_only_object_converter(v, recursion_limit=recursion_limit_buffer-1))
                return frozenset(value_buffer)
            elif type(value) in (list, tuple):
                value_buffer = list()
                for v in list(value):
                    value_buffer.append(self.read_only_object_converter(v, recursion_limit=recursion_limit_buffer-1))
                return tuple(value_buffer)
            else:
                return value
        except RecursionError:
            # Depth cap reached: give back the value unconverted.
            return value

    def read_only_object_reverse_converter(self, value, recursion_limit=None):
        """Inverse of read_only_object_converter: tuple/frozenset/MappingProxyType back to list/set/dict."""
        recursion_limit_buffer = None
        if recursion_limit is None:
            try:
                recursion_limit_buffer = int(self.CONSTANT_CLASS_RECURSION_LIMIT)
            except Exception:
                recursion_limit_buffer = 0
        else:
            try:
                recursion_limit_buffer = int(recursion_limit)
            except Exception:
                recursion_limit_buffer = 0
        try:
            if recursion_limit_buffer < 1:
                raise RecursionError("'recursion_limit's value is zero.")
            if type(value) in (dict, MappingProxyType):
                value_buffer = dict()
                for k, v in dict(value).items():
                    value_buffer[k] = self.read_only_object_reverse_converter(v, recursion_limit=recursion_limit_buffer-1)
                return dict(value_buffer)
            elif type(value) in (set, frozenset):
                value_buffer = set()
                for v in list(value):
                    value_buffer.add(self.read_only_object_reverse_converter(v, recursion_limit=recursion_limit_buffer-1))
                return set(value_buffer)
            elif type(value) in (list, tuple):
                value_buffer = list()
                for v in list(value):
                    value_buffer.append(self.read_only_object_reverse_converter(v, recursion_limit=recursion_limit_buffer-1))
                return list(value_buffer)
            else:
                return value
        except RecursionError:
            return value

    def get_constant(self, constant_name_list=None, ignore_mode=True):
        """Return a dict of stored constants (optionally filtered by name list); None on error when ignore_mode."""
        try:
            return_dict = {}
            for k, v in self.__dict__.items():
                if constant_name_list is None or k in constant_name_list:
                    return_dict[k] = v
            return return_dict
        except BaseException as e:
            # KeyboardInterrupt is always re-raised, even in ignore mode.
            if not(ignore_mode) or type(e) == KeyboardInterrupt:
                raise
            return None

    def show_constant(self, constant_name_list=None, ignore_mode=True):
        """Print an aligned table of the stored constants; True on success, False on swallowed error."""
        try:
            show_dict = self.get_constant(constant_name_list=constant_name_list, ignore_mode=False)
            show_dict_key_max_len = max([len(str(k)) for k in show_dict.keys()]) if len(show_dict) > 0 else 0
            show_dict_value_max_len = max([len(str(v)) for v in show_dict.values()]) if len(show_dict) > 0 else 0
            print("[ Constant list ({0}) ]".format(len(show_dict)))
            counter, counter_max_len = 0, len(str(len(show_dict) - 1))
            for k, v in sorted(show_dict.items(), key=lambda x:x[0]):
                v_buffer = str(v).replace("\n", "\\n")  # keep each row on one line
                v_buffer = "'{0}'".format(v_buffer) if v is not None and type(v) == str else v_buffer
                print("{3:{4}d} : {0:<{2}} = {1:<{5}}".format(str(k).replace("\n", "\\n"), v_buffer, show_dict_key_max_len, counter, counter_max_len, show_dict_value_max_len))
                counter += 1
            print()
            return True
        except BaseException as e:
            if not(ignore_mode) or type(e) == KeyboardInterrupt:
                raise
            return False

    def get_reserved_object(self, ignore_mode=True):
        """Return the names visible on the instance that are NOT stored constants (methods etc.)."""
        return_list = []
        try:
            for n in list(set(dir(self)) - set(self.__dict__.keys())):
                return_list.append(n)
            return return_list
        except BaseException as e:
            if not(ignore_mode) or type(e) == KeyboardInterrupt:
                raise
            return None

    def show_reserved_object(self, ignore_mode=True):
        """Print an aligned table of the reserved (non-constant) names."""
        try:
            show_list = self.get_reserved_object(ignore_mode=False)
            show_list_max_len = max([len(str(n)) for n in show_list]) if len(show_list) > 0 else 0
            print("[ Reserved object list ({0}) ]".format(len(show_list)))
            counter, counter_max_len = 0, len(str(len(show_list) - 1))
            for n in sorted(show_list):
                print("{1:{2}d} : {0:<{3}}".format(str(n).replace("\n", "\\n"), counter, counter_max_len, show_list_max_len))
                counter += 1
            print()
            return True
        except BaseException as e:
            if not(ignore_mode) or type(e) == KeyboardInterrupt:
                raise
            return False

    def overwrite_constant(self, constant_name, new_constant_value, check_for_reserved_object_flag=False, ignore_mode=True):
        """Force-overwrite a constant, bypassing __setattr__ protection. Discouraged."""
        try:
            if constant_name in set(dir(self)) and constant_name not in self.__dict__ and check_for_reserved_object_flag:
                raise ConstantError("'{0}' is already reserved.".format(constant_name))
            self.__dict__[constant_name] = new_constant_value
            return True
        except (ConstantError, KeyError) as e:
            if not(ignore_mode):
                raise
            return False

    def delete_constant(self, constant_name, ignore_mode=True):
        """Force-delete a constant, bypassing __delattr__ protection. Discouraged."""
        try:
            if constant_name in set(dir(self)) and constant_name not in self.__dict__:
                raise ConstantError("'{0}' is already reserved.".format(constant_name))
            del self.__dict__[constant_name]
            return True
        except (ConstantError, KeyError) as e:
            if not(ignore_mode):
                raise
            return False

    def set_constant_class_recursion_limit(self, recursion_limit=None, ignore_mode=True):
        """Set (when a value is given) and return the conversion recursion limit; negative input clamps to 0."""
        try:
            if recursion_limit is not None:
                recursion_limit_buffer = 0 if int(recursion_limit) < 0 else int(recursion_limit)
                # Written through __dict__ to bypass the __setattr__ write-once guard.
                self.__dict__["CONSTANT_CLASS_RECURSION_LIMIT"] = recursion_limit_buffer
            return self.__dict__["CONSTANT_CLASS_RECURSION_LIMIT"]
        except (KeyError, ValueError):
            if not(ignore_mode):
                raise
            return None


CONST = Constant()  # module-level singleton for constants
# "埋めたい" — pad every element of a list to a common display width.
def padding_text_list(text_list, to_length=None, padding_chr=" ", right_mode=False):
    """Return text_list with each element str()-ified, stripped of existing
    padding on the pad side, and padded to a common width.

    to_length: target width; defaults to the longest element's length.
    padding_chr: pad character (first character only; falls back to a space).
    right_mode: True pads on the left (right-align), False on the right.
    """
    if len(text_list) < 1:
        return list()
    if padding_chr is None or len(str(padding_chr)) < 1:
        pad = " "
    else:
        pad = str(padding_chr)[0]
    if to_length is None:
        width = max(len(str(item)) for item in text_list)
    else:
        width = int(to_length)
    padded = []
    for item in text_list:
        text = str(item)
        if right_mode:
            padded.append(text.rstrip(pad).rjust(width, pad))
        else:
            padded.append(text.lstrip(pad).ljust(width, pad))
    return padded
# ソートからの変換器生成 — build a lookup dict from a 2-D list, optionally pre-sorted.
def create_convert_dict(data_2d_list, key_function, value_function, sort_key_function=None):
    """Map key_function(row) -> value_function(row) over data_2d_list.

    When sort_key_function is given, rows are sorted first; later rows
    overwrite earlier ones on key collisions, so sort order decides the winner.
    """
    rows = data_2d_list
    if sort_key_function:
        rows = sorted(rows, key=sort_key_function)
    mapping = {}
    for row in rows:
        mapping[key_function(row)] = value_function(row)
    return mapping

# Usage example
dict_of_key_value = create_convert_dict(
    data_2d_list=[["1", "A", "う"], ["2", "B", "い"], ["2", "C", "あ"]],
    key_function=lambda x: x[0],
    value_function=lambda x: x[2],
    sort_key_function=lambda x: (x[0], x[1]),
)
# 固定長ファイル変換その1(骨格) — fixed-length record file converter (skeleton)
import unicodedata


def get_display_width(text):
    """Return the display width of *text*: East-Asian wide/full/ambiguous chars count as 2, others 1."""
    return sum(2 if unicodedata.east_asian_width(ch) in ("A", "F", "W") else 1 for ch in text)


class Fixed_length_file_converter:
    """Read a binary file as fixed-length records described by a field config list."""

    def __init__(self, config_dict_list):
        """config_dict_list: iterable of dicts, each requiring 'NAME' and 'SIZE'
        (bytes) and optionally 'FUNCTION' (a decoder applied to the raw field bytes).
        Raises Exception if a dict is missing 'NAME' or 'SIZE'."""
        self.config_dict_list = []
        self.record_size = 0
        counter = 1
        for cd in config_dict_list:
            if "SIZE" in cd and "NAME" in cd:
                self.config_dict_list.append({
                    "NUMBER": counter,
                    "NAME": cd["NAME"],
                    "SIZE": cd["SIZE"],
                    "FUNCTION": cd.get("FUNCTION", None),
                })
                self.record_size += cd["SIZE"]
            else:
                raise Exception("The config dictionary list must have 'NAME' and 'SIZE' parameters.")
            counter += 1

    def print_config(self):
        """Print the configured field layout as a simple fixed-width table."""
        # BUG FIX: original read `["NUMBER", ...]. [8, 20, 6, 20]` — a '.' where
        # the ',' belongs, which is a SyntaxError.
        headers, col_widths = ["NUMBER", "NAME", "SIZE", "FUNCTION"], [8, 20, 6, 20]
        line = ""
        for header, width in zip(headers, col_widths):
            line += header.ljust(width)
        print(line)
        print("-" * sum(col_widths))
        for cd in self.config_dict_list:
            line = ""
            line += str(cd["NUMBER"]).ljust(col_widths[0])
            line += str(cd["NAME"]).ljust(col_widths[1])
            line += str(cd["SIZE"]).ljust(col_widths[2])
            func_name = cd["FUNCTION"].__name__ if cd["FUNCTION"] else "None"
            line += func_name.ljust(col_widths[3])
            print(line)

    def read(self, path):
        """Read *path* and return a list of records, each a list of decoded field values.

        Raises Exception (wrapping the original error) on I/O failure or when
        the file size is not an exact multiple of the record size."""
        records = []
        try:
            with open(path, mode="rb") as bfr:
                data = bfr.read()
            total_size = len(data)
            # Reject a trailing partial record.
            remainder = total_size % self.record_size
            if remainder != 0:
                # BUG FIX: original raised the undefined name `ValueException`
                # and closed the .format( call with '}' instead of ')'.
                raise ValueError(
                    "{0} bytes leftover at the end of file. (total: {1} bytes, record_size: {2})".format(
                        remainder, total_size, self.record_size
                    )
                )
            for i in range(0, total_size - remainder, self.record_size):
                record_data = data[i:i + self.record_size]
                record = []
                offset = 0
                for field in self.config_dict_list:
                    size = field["SIZE"]
                    field_data = record_data[offset:offset + size]
                    # Raw bytes unless the field declares a decoder.
                    decoded = field["FUNCTION"](field_data) if field["FUNCTION"] else field_data
                    record.append(decoded)
                    offset += size
                records.append(record)
        except Exception as e:
            raise Exception("Failed to read file: [{0}]: {1}".format(type(e).__name__, str(e)))
        return records
# 固定長ファイル変換その2 — fixed-length converter with mainframe field decoders
import unicodedata


def get_display_width(text):
    """Return the display width of *text*: East-Asian wide/full/ambiguous chars count as 2, others 1."""
    return sum(2 if unicodedata.east_asian_width(ch) in ("A", "F", "W") else 1 for ch in text)


def convert_1byte_ebcdic(data):
    """Decode an EBCDIC (Japanese, code page 930) byte field to str.

    NOTE(review): 'cp930' is NOT a Python standard-library codec — this call
    needs a third-party codec package at runtime; confirm the environment.
    """
    return data.decode("cp930")


def convert_1byte_pac10(data):
    """Convert a packed-decimal (COMP-3) byte field to a signed int.

    Each nibble is one decimal digit; the final nibble is the sign
    (0xC positive, 0xD negative). Raises ValueError on invalid nibbles.
    """
    nibbles = list()
    for byte in data:
        nibbles.append((byte & 0b11110000) >> 4)
        nibbles.append(byte & 0b00001111)
    for s in nibbles[:-1]:
        if not(0b0000 <= s and s <= 0b1001):
            raise ValueError("'{0}' is not a valid PAC decimal number.".format(s))
    sign = "+"
    if nibbles[-1] not in (0b1100, 0b1101):
        raise ValueError("'{0}' is not a valid PAC sign.".format(nibbles[-1]))
    elif nibbles[-1] == 0b1101:
        sign = "-"
    digits = "".join([str(s) for s in nibbles[:-1]])
    return int("{0}{1}".format(sign, digits))


def convert_2byte_jef(data):
    """Decode a JEF double-byte field (JIS X 0208 code + 0x8080) to a str.

    BUG FIX: the parameter was named `bytes`, shadowing the builtin, so the
    later `bytes([high, low])` call crashed with TypeError on every non-empty
    input. Also, raw JIS bytes are not valid iso2022_jp on their own — the
    ESC $ B / ESC ( B shift sequences are required for the decode to yield
    the intended character.
    """
    result = []
    i = 0
    while i < len(data):
        if i + 1 >= len(data):
            break  # ignore a trailing odd byte
        b1 = data[i]
        b2 = data[i + 1]
        jef_code = (b1 << 8) | b2
        # JEF stores JIS X 0208 + 0x8080 (EUC-style); undo the offset.
        jis_code = jef_code - 0x8080
        high = (jis_code >> 8) & 0xFF
        low = jis_code & 0xFF
        # Wrap in ISO-2022-JP shift sequences so the codec enters JIS X 0208 mode.
        jis_bytes = b"\x1b$B" + bytes([high, low]) + b"\x1b(B"
        try:
            result.append(jis_bytes.decode("iso2022_jp"))
        except Exception:
            result.append("?")  # substitute on decode error
        i += 2
    return "".join(result)


class Fixed_length_file_converter:
    """Read a binary file as fixed-length records described by a field config list."""

    def __init__(self, config_dict_list):
        """config_dict_list: iterable of dicts, each requiring 'NAME' and 'SIZE'
        (bytes) and optionally 'FUNCTION' (a decoder applied to the raw field bytes)."""
        self.config_dict_list = []
        self.record_size = 0
        counter = 1
        for cd in config_dict_list:
            if "SIZE" in cd and "NAME" in cd:
                self.config_dict_list.append({
                    "NUMBER": counter,
                    "NAME": cd["NAME"],
                    "SIZE": cd["SIZE"],
                    "FUNCTION": cd.get("FUNCTION", None),
                })
                self.record_size += cd["SIZE"]
            else:
                raise Exception("The config dictionary list must have 'NAME' and 'SIZE' parameters.")
            counter += 1

    def print_config(self):
        """Print the configured field layout as a simple fixed-width table."""
        headers, col_widths = ["NUMBER", "NAME", "SIZE", "FUNCTION"], [8, 20, 6, 20]
        line = ""
        for header, width in zip(headers, col_widths):
            line += header.ljust(width)
        print(line)
        print("-" * sum(col_widths))
        for cd in self.config_dict_list:
            line = ""
            line += str(cd["NUMBER"]).ljust(col_widths[0])
            line += str(cd["NAME"]).ljust(col_widths[1])
            line += str(cd["SIZE"]).ljust(col_widths[2])
            func_name = cd["FUNCTION"].__name__ if cd["FUNCTION"] else "None"
            line += func_name.ljust(col_widths[3])
            print(line)

    def read(self, path):
        """Read *path* and return a list of records, each a list of decoded field values.

        Raises Exception (wrapping the original error) on I/O failure or when
        the file size is not an exact multiple of the record size."""
        records = []
        try:
            with open(path, mode="rb") as bfr:
                data = bfr.read()
            total_size = len(data)
            remainder = total_size % self.record_size
            if remainder != 0:
                # BUG FIX: was the undefined name `ValueException`.
                raise ValueError(
                    "{0} bytes leftover at the end of file. (total: {1} bytes, record_size: {2})".format(
                        remainder, total_size, self.record_size
                    )
                )
            for i in range(0, total_size - remainder, self.record_size):
                record_data = data[i:i + self.record_size]
                record = []
                offset = 0
                for field in self.config_dict_list:
                    size = field["SIZE"]
                    field_data = record_data[offset:offset + size]
                    decoded = field["FUNCTION"](field_data) if field["FUNCTION"] else field_data
                    record.append(decoded)
                    offset += size
                records.append(record)
        except Exception as e:
            raise Exception("Failed to read file: [{0}]: {1}".format(type(e).__name__, str(e)))
        return records


if "__main__" == __name__:
    print(convert_1byte_pac10([0x08, 0x12, 0x34, 0x5D]))
# 住民票コード (resident record code) — per 平成14年総務省告示第436号.
# Check digit scheme: modulus 11, weights cycling 2..7 from the rightmost digit.
def get_checkdigit(source_code_string):
    """Return the modulus-11 check digit for *source_code_string*.

    Digits are weighted 2,3,4,5,6,7,2,3,... starting from the rightmost digit.
    If the weighted sum mod 11 is 0 or 1, the check digit is 0; otherwise it
    is 11 minus the remainder.
    """
    digits = [int(c) for c in reversed(str(source_code_string))]
    weights = [i % 6 + 2 for i in range(len(source_code_string))]
    total = sum(d * w for d, w in zip(digits, weights))
    remainder = total % 11
    return 0 if remainder < 2 else 11 - remainder
Slackアーカイブシステム(g_ver=2.3)
Oracle接続系をベースにしたプログラムの塊(設定削除済み)
・PostgreSQL https://www.postgresql.org/download/windows/ ・PythonでのPostgreSQLデータベースラッパー(ビルドがめんどいからBinaryでヨシ) https://pypi.org/project/psycopg2-binary/
#SlackCollector[v48-7_20250601].zip
# の分割データ / Oracle接続コア部分 / TIFFファイルをbase64 CSVにするコード
# Scan a directory, base64-encode each matching file, and write one CSV row per file.
import os, base64, csv, datetime

# Default input directory (./DATA next to this script) and output CSV path.
LOAD_DIRECTORY = os.path.join(os.path.dirname(os.path.abspath(__file__)), "DATA")
OUTPUT_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "OUTPUT.csv")
# File filter: keep only *.tiff (case-insensitive).
IF_FUNCTION = lambda x: str(x).lower().endswith(".tiff")
# One cell builder per CSV column; each receives (file_path, base64_data).
ROW_FUNCTION = [
    lambda x, y: "{0:0>10s}".format(os.path.splitext(os.path.basename(str(x)))[0]),
    lambda x, y: str(os.path.basename(x)),
    lambda x, y: str(y)
]
TITLE_LIST = ["登録番号", "ファイル名", "データ"]


def package_binary_to_2d_list(
    if_function=IF_FUNCTION,
    row_function=ROW_FUNCTION,
    load_directory_path=LOAD_DIRECTORY,
    output_file_path=OUTPUT_FILE_PATH,
    title_list=TITLE_LIST
):
    """Write a CSV (header = title_list) with one row per file in
    load_directory_path that passes if_function; each row's cells come from
    row_function applied to (file_path, base64_of_file_contents)."""
    with open(output_file_path, "w", newline="", encoding="utf-8") as fw:
        cfw = csv.writer(fw)
        cfw.writerow(title_list)
        for file_name in os.listdir(load_directory_path):
            file_path = os.path.join(load_directory_path, file_name)
            # BUG FIX: the filter callback was never applied — the original
            # condition was the always-truthy `(file_name)`, so every file
            # (not just *.tiff) was packaged. Apply if_function here.
            if os.path.isfile(file_path) and if_function(file_name):
                with open(file_path, mode="rb") as fr:
                    binary_data = fr.read()
                base64_data = base64.b64encode(binary_data).decode("ascii")
                row = list()
                for f in row_function:
                    row.append(f(file_path, base64_data))
                cfw.writerow(row)


if __name__ == "__main__":
    print("開始: {0}".format(datetime.datetime.now()))
    package_binary_to_2d_list(
        if_function=IF_FUNCTION,
        row_function=ROW_FUNCTION,
        load_directory_path=LOAD_DIRECTORY,
        output_file_path=OUTPUT_FILE_PATH,
        title_list=TITLE_LIST
    )
    print("終了: {0}".format(datetime.datetime.now()))
順番に辞書入りリストを並べたいdef reorder_dict_list_with_validation(data): if not data: return [] mynum_to_dict = {} nextnum_set = set() # 準備と検査:辞書作成 & 重複 mynum チェック for d in data: mynum = d.get("mynum") nextnum = d.get("nextnum") if mynum in mynum_to_dict: raise ValueError("重複した「自ノード」が存在します: {0}".format(mynum)) mynum_to_dict[mynum] = d if nextnum is not None: nextnum_set.add(nextnum) # 始点を決定:他のノードの nextnum に登場しない mynum が始点 start_candidates = set(mynum_to_dict.keys()) - nextnum_set if len(start_candidates) != 1: raise ValueError("始点が1つに特定できません(候補: {0})".format(start_candidates)) start = start_candidates.pop() # 順に辿る:ループや存在しない nextnum を検出 visited, result = set(), list() while start is not None: if start in visited: raise ValueError("循環を検出しました: {0} が再訪されました".format(start)) node = mynum_to_dict.get(start) if node is None: raise ValueError("存在しない「次ノード」を参照しています: {0}".format(start)) result.append(node) visited.add(start) start = node["nextnum"] # 全ノードが使用されたか確認 if len(visited) != len(data): unused = set(mynum_to_dict.keys()) - visited raise ValueError("未使用のノードが存在します: {0}".format(unused)) return result def test(data): print("-"*60) try: ordered = reorder_dict_list_with_validation(data) for item in ordered: print(item) except Exception as e: print("[{0}]: {1}".format(type(e).__name__, e)) print() data = [{"mynum":1, "nextnum":3}, {"mynum":2, "nextnum":1}, {"mynum":3, "nextnum":None}] test(data) # 循環あり data = [{"mynum":1, "nextnum":2}, {"mynum":2, "nextnum":1}] test(data) # 存在しない nextnum を参照 data = [{"mynum":1, "nextnum":2}] test(data) # 始点が複数 data = [{"mynum":1, "nextnum":3}, {"mynum":2, "nextnum":3}, {"mynum":3, "nextnum":None}] test(data) # mynum 重複 data = [{"mynum":1, "nextnum":2}, {"mynum":1, "nextnum":None}] test(data)
# 生成_CSV→表 — "CSVをしゃれたHTMLテーブルにしたいだろ?": turn a CSV-like 2-D list
# into a nested, styled HTML table with rowspan merging of shared prefixes.
import os, html
from collections import OrderedDict
import pprint  # NOTE(review): pprint looks unused in this section — possibly needed by unseen code

OUTPUT_NAME = "ジョブ構成"       # page <title>
TITLE_STRINGS = "サンプルデータ"  # visible title bar text
TITLE_SMALL_FLAG = True          # True = shorter tooltip text on each cell

# Sample input: each row is a path through the hierarchy (col 0 = top level).
BUFFER = [
    ["#あ", "あい", "123", "0"],
    ["#か", "かき", "456", "htr", "0"],
    ["#か", "かく", "456", "hr", "0"],
    ["#か", "かけ", "123", "", "1"],
    ["#さ", "さし", "ramen", "なぜ"],
    ["#さ", "さす", "sarada", "なぜ"],
    ["#さ", "させ", "YAYAYA"],
    ["#さ", "さそ", "OPOPOP", "なぜ"],
]

# Maps a history-path prefix (tuple of cell values) to an inline CSS rule;
# the empty tuple is the catch-all default.
STYLE_DICT = {
    tuple() : "background-color: rgb(242,242,242);",
    ("#あ", "あい",) : "background-color: rgb(211,235,247);",
    ("#か", "かき",) : "background-color: rgb(226,239,218);",
    ("#か", "かく",) : "background-color: rgb(235,242,204);",
    ("#か", "かけ",) : "background-color: rgb(254,214,221);",
    ("#さ", "さし",) : "background-color: rgb(214,220,228);",
    ("#さ", "さす",) : "background-color: rgb(219,183,255);",
    ("#さ", "させ",) : "background-color: rgb(217,217,217);",
    ("#さ", "さそ",) : "background-color: rgb(252,228,214);",
}


def split_style_dict(style_dict=STYLE_DICT):
    """Split style_dict into (path-prefix -> class name, class name -> CSS rule).

    Class names are generated as element_0, element_1, ... in iteration order.
    """
    counter = 0
    a_dict, b_dict = dict(), dict()
    for k, v in style_dict.items():
        a_dict[k] = "element_{0}".format(counter)
        b_dict["element_{0}".format(counter)] = v
        counter += 1
    return (a_dict, b_dict)


def convert_table_list(table_list):
    """Recursively group rows by their first cell into nested OrderedDicts.

    Rows whose first cell is missing/None/empty are dropped; each level keys
    by column 0 and recurses on the remaining columns.
    """
    merge_table_dict = OrderedDict()
    for table_list_one in table_list:
        if len(table_list_one) > 0 and table_list_one[0] is not None and len(table_list_one[0]) > 0:
            if table_list_one[0] not in merge_table_dict:
                merge_table_dict[table_list_one[0]] = list()
            merge_table_dict[table_list_one[0]].append(table_list_one[1:])
    return_dict = OrderedDict()
    for k, v in merge_table_dict.items():
        return_dict[k] = convert_table_list(v)
    return return_dict


def convert_dict_to_command_list(table_dict, y_point=0, x_point=0, history_list=list(), mother_option_set=set()):
    """Flatten the nested dict into cell "commands" with grid coordinates.

    Each command dict carries: X/Y (grid position), V (cell text), H (path
    history), O (tag set: group/deep boundaries, inherited mother_*/children_*
    tags), N (rowspan = leaf count under the cell), D (depth).
    Returns (command_list, rows_consumed, last cell's tag set).
    NOTE(review): the mutable defaults are never mutated in place here
    (history_list is only used via `history_list+[k]`, mother_option_set is
    copied with set()), so the shared-default pitfall is not triggered.
    """
    return_list, y_one_point, simple_count = list(), 0, 0
    for k, v in table_dict.items():
        data_buffer = {
            "X" : x_point,
            "Y" : y_point,
            "V" : str(k),
            "H" : history_list+[k],
            "O" : set(mother_option_set),
        }
        # First/last sibling markers, globally and per depth.
        if simple_count == 0:
            data_buffer["O"].add("group_start")
            data_buffer["O"].add("group_start_d{0}".format(len(history_list)))
        if simple_count == len(table_dict) - 1:
            data_buffer["O"].add("group_end")
            data_buffer["O"].add("group_end_d{0}".format(len(history_list)))
        if x_point == 0:
            data_buffer["O"].add("deep_start")  # leftmost column
        if len(v) == 0:
            data_buffer["O"].add("deep_end")    # leaf cell (no children)
        #data_buffer["O"].add("simple_count_{0}".format(simple_count))
        simple_count += 1
        t_c = 1
        if len(v) > 0:
            # Recurse into children; pass this cell's tags down as mother_* tags.
            buffer = convert_dict_to_command_list(
                v,
                x_point=x_point+1,
                y_point=y_point,
                history_list=history_list+[k],
                mother_option_set={"mother_{0}".format(j) if j[:7] != "mother_" else str(j) for j in list(data_buffer["O"])}
            )
            return_list.extend(buffer[0])
            y_point += buffer[1]
            y_one_point += buffer[1]
            t_c = buffer[1]  # rowspan = number of leaf rows below
            # Bubble the children's tags up as children_* tags.
            for b2 in buffer[2]:
                data_buffer["O"].add("children_{0}".format(b2) if b2[:9] != "children_" else str(b2))
        else:
            y_point += 1
            y_one_point += 1
        data_buffer.update({
            "N" : t_c,
            "D" : len(history_list+[k]),
        })
        return_list.append(data_buffer)
    return (return_list, y_one_point, set(data_buffer["O"]))


def have_histry(histry, style_class_dict={}):
    """Return the CSS class for the longest style prefix matching *histry* (default "n_box").

    Prefix test is done on '/'-joined strings; longer prefixes win because
    candidates are tried in ascending key length.
    (NOTE: name keeps the original "histry" spelling for compatibility.)
    """
    return_style="n_box"
    style_dict_index = sorted([k for k in style_class_dict], key=lambda x: len(x))
    for style_dict_index_one in style_dict_index:
        if "/".join(map(str, histry)).startswith("/".join(map(str, style_dict_index_one))):
            return_style = style_class_dict[style_dict_index_one]
    return return_style


def add_external_adjacency_tag(command_list):
    """Tag each cell with external_adjacency_{top,bottom,left,right} where no
    neighbouring cell occupies the adjacent grid position (table outline edges)."""
    new_command_list = list()
    n_set = set()
    # Collect every occupied (X, Y) grid position, expanding rowspans.
    for command_one in command_list:
        if "X" in command_one and "Y" in command_one and "N" in command_one:
            for nn in range(command_one["N"]):
                n_set.add((command_one["X"], command_one["Y"]+nn))
    for command_one in command_list:
        command_one_buffer = dict(command_one)  # shallow copy; "O" set is shared with the original
        if "X" in command_one and "Y" in command_one and "N" in command_one and "O" in command_one:
            # Check all four neighbours of this cell's top-left anchor.
            top_flag = (command_one["X"], command_one["Y"]-1) in n_set
            bottom_flag = (command_one["X"], command_one["Y"]+command_one["N"]) in n_set
            left_flag = (command_one["X"]-1, command_one["Y"]) in n_set
            right_flag = (command_one["X"]+1, command_one["Y"]) in n_set
            option_set = set()  # NOTE(review): unused local, kept as-is
            if not(top_flag):
                command_one_buffer["O"].add("external_adjacency_top")
            if not(bottom_flag):
                command_one_buffer["O"].add("external_adjacency_bottom")
            if not(left_flag):
                command_one_buffer["O"].add("external_adjacency_left")
            if not(right_flag):
                command_one_buffer["O"].add("external_adjacency_right")
        new_command_list.append(command_one_buffer)
    return new_command_list


def convert_command_list_to_html_strings(command_list, style_class_dict=dict(), title_small_flag=TITLE_SMALL_FLAG):
    """Render the command list as a <table> string: one <tr> per Y row, one
    <td rowspan=N> per cell, with positional/tag classes, an id, and a tooltip
    (short form when title_small_flag)."""
    y_point_set = {c["Y"] for c in command_list if "Y" in c}
    table_html_list = []
    for y in sorted(y_point_set):
        y_list = list()
        # Cells anchored on this row, left to right.
        for co in sorted([c for c in command_list if "Y" in c and c["Y"] == y], key=lambda x: x["X"]):
            if "V" in co and "N" in co and "X" in co and "H" in co and "D" in co and "O" in co:
                y_list.append("<td rowspan=\"{0}\" class=\"{2} {3} {4} {5} {8}\" id=\"{6}\" title=\"{7}\">{1}</td>".format(
                    co["N"],
                    html.escape(co["V"]),
                    "x_cell-{0}".format(co["X"]),
                    " ".join(["y_cell-{0}".format(dummy_x) for dummy_x in range(co["Y"], co["Y"]+co["N"])]),
                    " ".join(["xy_cell-{0}-{1}".format(co["X"], dummy_y) for dummy_y in range(co["Y"], co["Y"]+co["N"])]),
                    " ".join(list(sorted(co["O"]))),
                    "xy_direct_cell-{0}-{1}".format(co["X"], co["Y"]),
                    " ".join([z for z in [
                        html.escape("Name: '{0}'".format(co["V"])),
                        html.escape("Path: '/{0}'".format("/".join(co["H"]))),
                        html.escape("Tag: {0}".format(" ".join(["{{{0}}}".format(o) for o in sorted(co["O"])]))) if not(title_small_flag) else None,
                        html.escape("X: {0}".format(co["X"])) if not(title_small_flag) else None,
                        html.escape("Y: {0}".format(
                            ", ".join([str(dummy_x) for dummy_x in range(co["Y"], co["Y"]+co["N"])])
                        )) if not(title_small_flag) else None
                    ] if z is not None]),
                    have_histry(co["H"], style_class_dict=style_class_dict)
                ))
        table_html_list.append("<tr>{0}</tr>".format("".join(y_list)))
    return "<table class=\"data_frame\"><tbody>{0}</tbody></table>".format("".join(table_html_list))
def html_table_to_html(table_strings, title_strings=None, title="タイトル", css_plus_strings=""):
    """Wrap a pre-rendered <table> fragment in a complete standalone HTML page.

    table_strings    -- the <table>...</table> markup to embed in #table_block.
    title_strings    -- text for the black title bar; None hides the bar entirely.
    title            -- document <title> (HTML-escaped).
    css_plus_strings -- extra CSS rules appended verbatim at the end of <style>.

    Returns the stripped HTML document as a string.

    BUGFIX: previously `html.escape(title_strings)` was called unconditionally,
    so passing title_strings=None raised TypeError even though the template
    explicitly supports None (it hides the title bar via display:none).  None
    is now mapped to an empty escaped string, making the documented default
    actually usable.
    """
    # NOTE: the template uses {{ }} brace escaping; only {0}..{4} are substituted.
    return """ <html> <head> <!-- T.T. (update: 2023/02/16) --> <meta name="viewport" content="width=1200"> <meta charset="utf-8"> <link rel="icon" type="image/vnd.microsoft.icon" href="data:image/x-icon;base64,AAABAAEAEBAAAAEAIABoBAAAFgAAACgAAAAQAAAAIAAAAAEAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A////AP///wD///8A//8AAP//AAD//wAA//8AAP//AAD//wAA//8AAP//AAD//wAA//8AAP//AAD//wAA//8AAP//AAD//wAA//8AAA=="> <title>{2}</title> <style type="text/css"> body {{ width: max-content; margin: 0px; padding: 0px; }} div#title_block {{ background-color: #000000; color: #FFFFFF; font-weight: bold; width: calc(100% - 1em); font-size: 0.8em; padding: 0em 0.5em 0em 0.5em; margin: 0em 0em 1em 0em; /*font-family: \"Courier New\", Consolas, Menlo, Monaco, monospace;*/ }} div#table_block {{ background-color: rgb(166,166,166); padding: 0em 1em 1em 0em; }} table.data_frame {{ margin: 0px; background-color: rgb(166,166,166); border-collapse: collapse; }} table.data_frame > tbody > tr > td {{ text-align: left; vertical-align: top; white-space: nowrap; border: 1px solid #000000; padding: 0.05em 0.3em; white-space: pre; /*font-family: \"Courier New\", Consolas, Menlo, Monaco, monospace;*/ }} /* 外部が空白でああるか? */ table.data_frame > tbody > tr > td.external_adjacency_top {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.external_adjacency_bottom {{ border-bottom: 3px solid #000000; }} table.data_frame > tbody > tr > td.external_adjacency_left {{ border-left: 3px solid #000000; }} table.data_frame > tbody > tr > td.external_adjacency_right {{ border-right: 3px solid #000000; }} /* 横方向の最初と最後 */ table.data_frame > tbody > tr > td.deep_start {{ border-left: 3px solid #000000; }} table.data_frame > tbody > tr > td.deep_end {{ border-right: 3px solid #000000; }} /* グループ区切りA */ table.data_frame > tbody > tr > td.x_cell-0 {{ border: 3px solid #000000; }} table.data_frame > tbody > tr > td.x_cell-1 {{ border: 3px solid #000000; }} table.data_frame > tbody > tr > td.x_cell-2.deep_end {{ border: 3px solid #000000; }} /* グループ区切りA\B */ table.data_frame > tbody > tr > td.group_start_d2 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_end_d2 {{ border-bottom: 3px solid #000000; }} /* グループ区切りC */ table.data_frame > tbody > tr > td.x_cell-1.children_group_start_d3 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.x_cell-1.children_group_end_d3 {{ border-bottom: 3px solid #000000; }} table.data_frame > tbody > tr > td.x_cell-2.children_group_start_d3 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.x_cell-2.children_group_end_d3 {{ border-bottom: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_start_d3 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_end_d3 {{ border-bottom: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_start_d4.mother_group_start_d3 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_end_d4.mother_group_end_d3 {{ border-bottom: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_start_d5.mother_group_start_d3 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_end_d5.mother_group_end_d3 {{ border-bottom: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_start_d6.mother_group_start_d3 {{ border-top: 3px solid #000000; }} table.data_frame > tbody > tr > td.group_end_d6.mother_group_end_d3 {{ border-bottom: 3px solid #000000; }} {4} </style> </head> <body> <div id=\"title_block\"{3}>{1}</div> <div id=\"table_block\"{3}> {0} </div> </body> </html> """.format(
        table_strings,
        html.escape(title_strings) if title_strings is not None else "",  # BUGFIX: None crashed html.escape
        html.escape(title),
        " style=\"display: none;\"" if title_strings is None else "",
        css_plus_strings
    ).strip()


if "__main__" == __name__:
    # Pad every row of BUFFER (defined earlier in this file) to the longest row
    # length so the table is rectangular before conversion.
    buffer, max_len = list(), 0
    for b in BUFFER:
        if max_len < len(b):
            max_len = len(b)
    for b in BUFFER:
        max_len_one = len(b)
        buffer.append([b[i] for i in range(len(b))] + ["" for _ in range(max_len - len(b))])
    dump_list = [[None for _ in range(len(buffer))] for _ in range(max_len)]
    # Convert the rectangular table into the renderer's command list.
    buffer, counter, children_option = convert_dict_to_command_list(
        convert_table_list(buffer)
    )
    for b in buffer:
        print("■", b)
    # STYLE_DICT is split into per-cell class names and the CSS rule bodies.
    style_class_dict, css_dict = split_style_dict(style_dict=STYLE_DICT)
    data = html_table_to_html(
        convert_command_list_to_html_strings(
            add_external_adjacency_tag(buffer),
            style_class_dict=style_class_dict
        ),
        title_strings=TITLE_STRINGS,
        title=OUTPUT_NAME,
        css_plus_strings=" ".join(["table.data_frame > tbody > tr > td.{0} {{ {1} }}".format(kb, vb) for kb, vb in css_dict.items()])
    )
    # utf-8-sig: BOM so Excel/Windows viewers detect the encoding.
    with open(os.path.join(os.path.dirname(__file__), "#{0}_v21.html".format(OUTPUT_NAME)), "w", encoding="utf-8-sig") as fw:
        fw.write(data)
# Log-capture utility: run an arbitrary command and append its combined
# stdout/stderr (with a "#Command" header) to a log file, retrying the write
# while the target file is locked.
import sys, subprocess, time, os, unicodedata, traceback

def len_2b(text):
    """Return the display width of *text*: East-Asian Ambiguous/Full/Wide
    characters count as 2 columns, everything else as 1; empty input is 0."""
    return sum([2 if unicodedata.east_asian_width(s) in "AFW" else 1 for s in str(text)]) if len(str(text)) else 0

def ensure_directory_exists(file_path):
    """Create the parent directory of *file_path* if it does not exist yet."""
    directory_path = os.path.dirname(os.path.abspath(file_path))
    if not(os.path.exists(directory_path)):
        os.makedirs(directory_path, exist_ok=True)

def write_with_retry(path, content, retry_interval=10, max_retries=None, encoding="utf-8", overwrite_mode=True):
    """Write *content* to *path*, retrying every *retry_interval* seconds when
    the file is temporarily unwritable (PermissionError/OSError, e.g. locked).

    max_retries=None retries forever; otherwise RuntimeError is raised once
    the limit is reached.  Returns True on success.

    NOTE(review): despite its name, overwrite_mode=True opens in append mode
    ("a") and False truncates ("w") -- kept as-is because callers depend on
    the append behaviour.
    """
    retries, flag = 0, False
    ensure_directory_exists(path)
    while True:
        try:
            with open(path, "a" if overwrite_mode else "w", encoding=encoding) as f:
                f.write(content)
            flag = True
            break
        except (PermissionError, OSError) as e:
            retries += 1
            if max_retries is not None and retries >= max_retries:
                raise RuntimeError("Max retries reached while writing to \"{0}\": {1}".format(path, e))
            time.sleep(retry_interval)
    return flag

def main(argv):
    """CLI entry point: argv = [script, output_file_path, command, args...].

    Exit codes: 0 success, 2 write failed, 3 usage error, 4 command not
    found, 5 unexpected error.
    """
    return_flag = 255
    if len(argv) < 3:
        print("Usage: python {0} <output_file_path> <command> [args...]".format(argv[0]))
        return_flag = 3
    else:
        output_path, command = argv[1], argv[2:]
        try:
            try:
                result = subprocess.run(command, capture_output=True, text=True)
                # BUGFIX: the placeholder "{0}" was missing, so the executed
                # command line never appeared in the log header.
                buffer = "#Command: {0}".format(" ".join(command))
                content = "\n".join([buffer, "-"*len_2b(buffer), result.stdout, result.stderr])
                flag = write_with_retry(output_path, content)
                return_flag = 0 if flag else 2
            except FileNotFoundError as e:
                # BUGFIX: `content` was written before it was built (NameError),
                # which escaped to the outer handler -- exit code 4 was
                # unreachable.  Build the report first, then persist and print.
                # BUGFIX: "[0]" was a literal, not the "{0}" placeholder.
                content = "\n".join([
                    "#Command : {0}".format(" ".join(command)),
                    "#Exception: {0}: {1}".format(type(e).__name__, e),
                    traceback.format_exc()
                ])
                write_with_retry(output_path, content)
                print(content)
                return_flag = 4
        except Exception as e:
            # Last-resort handler: report but never crash the wrapper itself.
            content = "\n".join([
                "#Command : {0}".format(" ".join(command)),
                "#Exception: {0}: {1}".format(type(e).__name__, e),
                traceback.format_exc()
            ])
            print(content)
            return_flag = 5
    return return_flag

if __name__ == "__main__":
    return_flag = main(sys.argv)
    sys.exit(return_flag)
# Codec probe: decode one mystery hex byte sequence under every codec Python
# ships with, so the original encoding can be identified by eye.
hex_text = "C5BEBDD0BCE8BEC3"
encode_list = [
    "ascii", "big5", "big5hkscs", "cp037", "cp273", "cp424", "cp437", "cp500", "cp720", "cp737",
    "cp775", "cp850", "cp852", "cp855", "cp856", "cp857", "cp858", "cp860", "cp861", "cp862",
    "cp863", "cp864", "cp865", "cp866", "cp869", "cp874", "cp875", "cp932", "cp949", "cp950",
    "cp1006", "cp1026", "cp1125", "cp1140", "cp1250", "cp1251", "cp1252", "cp1253", "cp1254",
    "cp1255", "cp1256", "cp1257", "cp1258", "euc_jp", "euc_jis_2004", "euc_jisx0213", "euc_kr",
    "gb2312", "gbk", "gb18030", "hz", "iso2022_jp", "iso2022_jp_1", "iso2022_jp_2",
    "iso2022_jp_2004", "iso2022_jp_3", "iso2022_jp_ext", "iso2022_kr", "latin_1", "iso8859_2",
    "iso8859_3", "iso8859_4", "iso8859_5", "iso8859_6", "iso8859_7", "iso8859_8", "iso8859_9",
    "iso8859_10", "iso8859_11", "iso8859_13", "iso8859_14", "iso8859_15", "iso8859_16", "johab",
    "koi8_r", "koi8_t", "koi8_u", "kz1048", "mac_cyrillic", "mac_greek", "mac_iceland",
    "mac_latin2", "mac_roman", "mac_turkish", "ptcp154", "shift_jis", "shift_jis_2004",
    "shift_jisx0213", "utf_32", "utf_32_be", "utf_32_le", "utf_16", "utf_16_be", "utf_16_le",
    "utf_7", "utf_8"
]

print("変換前: {0}".format(hex_text))
print("-" * 60)
# Hoisted out of the loop: the byte string never changes between codecs.
raw_bytes = bytes.fromhex(hex_text)
for codec_name in encode_list:
    decoded = raw_bytes.decode(codec_name, errors="replace")
    print("変換後: {0:<15s}: {1}".format(codec_name, decoded))
# Jobnet visualization: dump nodes+edges into a self-contained Cytoscape.js HTML page.
import json
from pathlib import Path

def export_custom_graph_html(nodes, edges, html_path="graph.html"):
    """Write an interactive HTML view of a Systemwalker jobnet to *html_path*.

    nodes/edges are Cytoscape.js element dicts ({"data": {...}, "position": ...});
    they are concatenated and injected into the template as one JSON literal.
    Doubled braces {{ }} in the template survive str.format -- only {data} is
    substituted.  The generated page colors nodes by data('type'), shows a
    node's attributes in the side panel on click, and logs dragged node
    positions to the browser console (for persisting layouts by hand).
    """
    html_template = """<!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> <title>Systemwalkerジョブネット可視化</title> <script src="https://unpkg.com/cytoscape@3.26.0/dist/cytoscape.min.js"></script> <style> html, body {{ margin: 0; padding: 0; height: 100%; overflow: hidden; font-family: sans-serif; }} #container {{ display: flex; height: 100%; }} #cy {{ flex: 1; background: #FFFFFF; }} #info {{ width: 300px; background: #F7F7F7; border-left: 1px solid #ccc; padding: 10px; overflow-y: auto; font-size: 13px; }} #info h2 {{ font-size: 16px; border-bottom: 1px solid #ccc; margin-bottom: 5px; }} table {{ width: 100%; border-collapse: collapse; }} td {{ padding: 4px 6px; border-bottom: 1px solid #ddd; }} td.key {{ font-weight: bold; width: 40%; color: #333; }} </style> </head> <body> <div id="container"> <div id="cy"></div> <div id="info"> <h2>ノード情報</h2> <p>ノードをクリックすると属性が表示されます。</p> </div> </div> <script> const elements = {data}; const cy = cytoscape({{ container: document.getElementById('cy'), elements: elements, layout: {{ name: 'preset', padding: 50 }}, style: [ {{ selector: 'node', style: {{ 'label': 'data(label)', 'text-valign': 'center', 'text-halign': 'center', 'font-size': 12, 'color': '#FFFFFF', 'width': 'label', 'padding': '8px', 'shape': 'roundrectangle', 'background-color': ele => {{ if (ele.data('type') === 'message') return '#FF9900'; else if (ele.data('type') === 'jobnet') return '#3C8DBC'; else if (ele.data('type') === 'group') return 'transparent'; else return '#4CAF50'; }}, }} }}, {{ selector: 'node[type="group"]', style: {{ 'shape': 'roundrectangle', 'background-opacity': 0.05, 'border-color': '#888', 'border-width': 2, 'label': 'data(label)', 'font-size': 16, 'text-valign': 'top', 'text-halign': 'center' }} }}, {{ selector: '$node > node', style: {{ 'padding': '10px' }} }}, {{ selector: 'edge', style: {{ 'width': 2, 'line-color': '#888', 'target-arrow-shape': 'triangle', 'target-arrow-color': '#888', 'line-style': ele => ele.data('type') === 'message' ? 'dashed' : 'solid', 'label': 'data(label)', 'font-size': 10, 'color': '#333', 'curve-style': 'bezier' }} }} ] }}); // クリックされたノードの情報を表示 cy.on('tap', 'node', function(evt) {{ const data = evt.target.data(); const info = document.getElementById('info'); let html = "<h2>ノード情報</h2><table>"; for (const [key, value] of Object.entries(data)) {{ if (key != 'id') {{ let true_key = String(key).replace('label', '名称') html += '<tr><td class="key">' + true_key + '</td><td>' + value + '</td></tr>'; }} }} html += "</table>"; info.innerHTML = html; }}); // 位置保存ログ(ドラッグ時) cy.on('dragfree', function(evt) {{ let positions = cy.nodes().map(n => ({{ id: n.id(), position: n.position() }})); console.log("ノード位置情報:", JSON.stringify(positions, null, 2)); }}); </script> </body> </html> """
    Path(html_path).write_text(html_template.format(
        data=json.dumps(nodes+edges, ensure_ascii=False, indent=2)
    ), encoding="utf-8")
    print(f":white_check_mark: HTML出力完了: {html_path}")

# Demo data: two jobs plus a message node that triggers J002.  Positions are
# preset canvas coordinates (the page uses the 'preset' layout).
nodes = [
    {"data": {"id": "J001", "label": "ジョブ1", "属性":"G"}, "position": {"x": 100, "y": 200}},
    {"data": {"id": "J002", "label": "ジョブ2"}, "position": {"x": 300, "y": 200}},
    {"data": {"id": "MSG01", "label": ":envelope_with_arrow: メッセージA", "type": "message"}, "position": {"x": 200, "y": 50}},
]
edges = [
    {"data": {"source": "J001", "target": "J002", "label": "依存"}},
    {"data": {"source": "MSG01", "target": "J002", "label": "起動", "type": "message"}},
]
export_custom_graph_html(nodes, edges, html_path="graph.html")
# Flatten parent-relative child coordinates into absolute canvas coordinates:
# each parent gets a bounding box sized from its children, parent boxes are
# packed left-to-right into rows (grid-like), then every child is offset by
# its parent's origin plus a margin.
import random  # NOTE(review): unused in this script; kept to avoid changing the module surface

# Child nodes defined relative to their parent, keyed by parent id.
parents = {
    "P1": [{"id": "C1", "x": 0, "y": 0}, {"id": "C2", "x": 100, "y": 50}],
    "P2": [{"id": "C3", "x": 0, "y": 0}, {"id": "C4", "x": 150, "y": 100}],
    "P3": [{"id": "C5", "x": 0, "y": 0}, {"id": "C6", "x": 50, "y": 50}]
}

def compute_parent_size(children, margin=20):
    """Return (width, height) of the box enclosing *children* plus *margin* on every side."""
    xs = [child["x"] for child in children]
    ys = [child["y"] for child in children]
    width = max(xs) - min(xs) + margin * 2
    height = max(ys) - min(ys) + margin * 2
    return width, height

def layout_parents(parents, margin=50):
    """Place parent boxes left-to-right with *margin* gaps, wrapping to a new
    row once a box would cross the 800px row width.  Returns {pid: (x, y)}."""
    positions = {}
    cursor_x, cursor_y, tallest_in_row = 0, 0, 0
    max_row_width = 800
    for pid, children in parents.items():
        width, height = compute_parent_size(children)
        if cursor_x + width > max_row_width:
            # Wrap: next row starts below the tallest box of the finished row.
            cursor_x = 0
            cursor_y += tallest_in_row + margin
            tallest_in_row = 0
        positions[pid] = (cursor_x, cursor_y)
        cursor_x += width + margin
        tallest_in_row = max(tallest_in_row, height)
    return positions

def compute_absolute_positions(parents, parent_positions, margin=20):
    """Translate every child into absolute coordinates (parent origin + offset + margin)."""
    return [
        {
            "id": child["id"],
            "parent": pid,
            "x": parent_positions[pid][0] + child["x"] + margin,
            "y": parent_positions[pid][1] + child["y"] + margin,
        }
        for pid, children in parents.items()
        for child in children
    ]

# Run the layout and print the resulting absolute coordinates.
parent_positions = layout_parents(parents)
absolute_children = compute_absolute_positions(parents, parent_positions)
for node in absolute_children:
    print(f"Node {node['id']} (parent {node['parent']}) => absolute x={node['x']}, y={node['y']}")
出典:「国土数値情報(行政区域データ)」(国土交通省)(https://nlftp.mlit.go.jp/ksj/gml/datalist/KsjTmplt-N03-v3_0.html)(2025年08月04日取得)を加工して作成
# Municipality rendering v1: fill one municipality (JIS LG code, column N03_007)
# over a background image, using the MLIT "N03" administrative-boundary GeoJSON.
import geopandas as gpd
import matplotlib.pyplot as plt
from PIL import Image, PngImagePlugin #pillow
import os, io, sys

# --- Author's earlier experiment (silhouette compositing), disabled; kept verbatim ---
"""
# 背景画像の読み込み
background = Image.open("background.png")
# GeoJSON(市町村境界)の読み込み
gdf = gpd.read_file("N03-21_210101.geojson", encoding="utf-8") # 静岡県用(22000)
print(gdf)
# 市町村名でフィルタ
target_city = "16201"
target = gdf[gdf["N03_007"] == target_city] # N03_004 = 市区町村名
# シルエットを描画(白黒)
fig, ax = plt.subplots(figsize=(5, 5))
target.plot(ax=ax, color='black', aspect=1)
ax.axis('off')
# シルエット画像を保存
plt.savefig("silhouette.png", transparent=True, bbox_inches='tight', pad_inches=0)
plt.close()
# シルエット画像を合成
silhouette = Image.open("silhouette.png").convert("RGBA")
silhouette = silhouette.resize(background.size)
# 貼り付け(上に重ねる)
composite = Image.alpha_composite(background.convert("RGBA"), silhouette)
composite.save("output.png")
"""
# --- Author's second experiment (outline + resize compositing), disabled; kept verbatim ---
"""
def draw_pref_outline_with_city( geojson_path: str, target_code: str, background_path: str, output_path: str ):
    # GeoJSON読み込み
    gdf = gpd.read_file(geojson_path, encoding="utf-8")
    # 対象市町村
    city = gdf[gdf["N03_007"] == target_code]
    if city.empty:
        raise ValueError(f"団体コード {target_code} に該当する市町村が見つかりません")
    # 都道府県取得
    pref_name = city.iloc[0]["N03_001"]
    pref = gdf[gdf["N03_001"] == pref_name]
    # 県全体の範囲(描画サイズに使用)
    bounds = pref.total_bounds # [minx, miny, maxx, maxy]
    "#{:02x}{:02x}{:02x}".format(*rgb_code)
    # matplotlibで県全体+市町村塗りつぶしを描画
    fig, ax = plt.subplots(figsize=(6, 6), dpi=1200)
    pref.boundary.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(158, 234, 245)), linewidth=0.6)
    city.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(158, 234, 245)), edgecolor='none', linewidth=0) #94, 224, 240
    # 軸設定:県全体に合わせる
    margin_ratio = 0.1 # 少し余白を持たせる
    dx = bounds[2] - bounds[0]
    dy = bounds[3] - bounds[1]
    ax.set_xlim(bounds[0] - dx * margin_ratio, bounds[2] + dx * margin_ratio)
    ax.set_ylim(bounds[1] - dy * margin_ratio, bounds[3] + dy * margin_ratio)
    ax.axis("off")
    # 保存(透過)
    temp_overlay_path = "temp_overlay.png"
    plt.savefig(temp_overlay_path, transparent=True, bbox_inches='tight', pad_inches=0, dpi=1200)
    plt.close()
    # 背景画像と合成
    background = Image.open(background_path).convert("RGBA")
    overlay = Image.open(temp_overlay_path).convert("RGBA")
    # overlay を背景サイズにフィット(アスペクト比維持)
    bg_w, bg_h = background.size
    ov_w, ov_h = overlay.size
    ratio = min(bg_w / ov_w, bg_h / ov_h)
    new_size = (int(ov_w * ratio), int(ov_h * ratio))
    #overlay_resized = overlay.resize(new_size, Image.LANCZOS)
    overlay_resized = overlay.resize(new_size, Image.NEAREST)
    # 中央に貼り付け
    paste_x = (bg_w - new_size[0]) // 2
    paste_y = (bg_h - new_size[1]) // 2
    composite = background.copy()
    composite.alpha_composite(overlay_resized, dest=(paste_x, paste_y))
    # 保存
    composite.save(output_path)

# 使用例(浜松市 22130)
draw_pref_outline_with_city( geojson_path="N03-21_210101.geojson", target_code="16201", background_path="background.png", output_path="final_output.png" )
"""

def draw_direct_on_background( geojson_path: str, target_code: str, background_path: str, output_path: str, meta_text: str, only_mode: bool, current_dir ):
    """Render the target municipality (filled) over its prefecture outline,
    sized to exactly match the background image, then composite and save.

    only_mode=True draws the municipality alone (no prefecture outline).
    meta_text is stored in the output PNG's "caption" metadata.
    Raises ValueError when target_code matches no N03_007 row.
    """
    # 1. Load the boundary GeoJSON.
    gdf = gpd.read_file(geojson_path, encoding="utf-8")
    # 2. Select the target municipality by LG code.
    city = gdf[gdf["N03_007"] == target_code]
    if city.empty:
        raise ValueError(f"団体コード {target_code} に該当する市町村が見つかりません")
    # 3. Select the whole prefecture containing it.
    pref_name = city.iloc[0]["N03_001"]
    pref = gdf[gdf["N03_001"] == pref_name]
    if only_mode:
        pref = city
    # 4. Background image size drives the figure size.
    background = Image.open(background_path).convert("RGBA")
    bg_w, bg_h = background.size
    # 5. Geographic extent of the prefecture.
    minx, miny, maxx, maxy = pref.total_bounds
    dx = maxx - minx
    dy = maxy - miny
    # 6. Aspect ratios.  NOTE(review): computed but never used below.
    aspect_img = bg_w / bg_h
    aspect_map = dx / dy
    # 7. Figure size in inches so the saved raster matches the background pixels.
    dpi = 1200
    figsize = (bg_w / dpi, bg_h / dpi)
    # 8. Draw prefecture outline + filled municipality (same pale cyan).
    fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
    pref.boundary.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(158, 234, 245)), linewidth=0.02)
    city.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(158, 234, 245)), edgecolor='none')
    # 9. View range = prefecture bounds (keeps the map aspect).
    ax.set_xlim(minx, maxx)
    ax.set_ylim(miny, maxy)
    ax.axis('off')
    # 10. Save the overlay, binarize its alpha, then composite onto the background.
    overlay_path_raw = os.path.join(current_dir ,"temp_overlay_exact_raw", "{0}.png".format(target_code))
    overlay_path = os.path.join(current_dir ,"temp_overlay_exact", "{0}.png".format(target_code))
    os.makedirs(os.path.dirname(overlay_path_raw), exist_ok=True)
    os.makedirs(os.path.dirname(overlay_path), exist_ok=True)
    plt.savefig(overlay_path_raw, transparent=True, bbox_inches=None, pad_inches=0)
    plt.close()
    make_alpha_opaque(overlay_path_raw, overlay_path)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    paste_center(background_path, overlay_path, output_path, meta_text)

# Author's note kept verbatim: direct Pillow compositing alternative.
"""
# 11. Pillowで合成(サイズ一致するのでズレなし)
overlay = Image.open(overlay_path).convert("RGBA")
final = Image.alpha_composite(background, overlay)
final.save(output_path)
"""

def make_alpha_opaque(image_path, output_path):
    """Binarize the alpha channel: alpha > 63 becomes fully opaque, else fully
    transparent (removes matplotlib's anti-aliased semi-transparent fringe)."""
    img = Image.open(image_path).convert("RGBA")
    datas = img.getdata()
    new_data = []
    for r, g, b, a in datas:
        if a > 63:
            new_data.append((r, g, b, 255)) # semi-transparent -> fully opaque
        else:
            new_data.append((r, g, b, 0)) # keep fully transparent
    img.putdata(new_data)
    img.save(output_path)

def paste_center(background_path, overlay_path, output_path, meta_text=None):
    """Paste the overlay centred on the background (alpha-masked) and save,
    embedding *meta_text* (source attribution) as PNG "caption" metadata."""
    bg = Image.open(background_path).convert("RGBA")
    ov = Image.open(overlay_path).convert("RGBA")
    bg_w, bg_h = bg.size
    ov_w, ov_h = ov.size
    # Centre offset (may be negative if the overlay is larger).
    paste_x = (bg_w - ov_w) // 2
    paste_y = (bg_h - ov_h) // 2
    composite = bg.copy()
    composite.paste(ov, (paste_x, paste_y), mask=ov)
    # PNG metadata (source attribution required by the data licence).
    metadata = PngImagePlugin.PngInfo()
    if meta_text is not None:
        metadata.add_text("caption", str(meta_text))
    composite.save(output_path, pnginfo=metadata)

# Usage.  NOTE(review): CITY_CODE is empty in this version -- fill in target LG
# codes before running; failures are reported per code and do not abort the loop.
CITY_CODE = []
for i in CITY_CODE:
    ii = str(i)
    print(ii)
    try:
        draw_direct_on_background(
            geojson_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "N03-21_210101.geojson"),
            target_code=ii,
            background_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "background.png"),
            output_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source_sub", "{0}.png".format(ii)),
            meta_text="出典:「国土数値情報(行政区域データ)」(国土交通省)(https://nlftp.mlit.go.jp/ksj/gml/datalist/KsjTmplt-N03-v3_0.html)(2025年08月04日取得)を加工して作成",
            only_mode=False,
            current_dir=os.path.dirname(os.path.abspath(__file__))
        )
    except Exception as e:
        print(" ⇒ miss", e)
# Municipality rendering v2.  WARNING (author): because the source is plain
# lat/lon (Mercator-like display), re-projecting onto a transverse Mercator
# centred on the target prefecture reduces shape distortion.
import geopandas as gpd
import matplotlib.pyplot as plt
from PIL import Image, PngImagePlugin #pillow
import os, io, sys
import pyproj
from shapely.geometry import MultiPolygon
from pyproj import CRS

def draw_direct_on_background( geojson_path: str, target_code: str, background_path: str, output_path: str, meta_text: str, only_mode: bool, current_dir, gdf=None ):
    """Render the target municipality filled over its prefecture outline,
    re-projected onto a prefecture-centred transverse Mercator, sized to the
    background image, then composited and saved.

    gdf may be passed pre-loaded (already to_crs(4326)) to avoid re-reading
    the GeoJSON for every code; None loads it from geojson_path.
    only_mode=True draws the municipality alone.  Raises ValueError when
    target_code matches no N03_007 row.
    """
    # 1. Load GeoJSON (assumed WGS84 / EPSG:4326).
    if gdf is None:
        gdf = gpd.read_file(geojson_path, encoding="utf-8")
        gdf = gdf.to_crs(epsg=4326) # normalize to lat/lon explicitly
    # 2. Target municipality.
    city = gdf[gdf["N03_007"] == target_code]
    if city.empty:
        raise ValueError(f"団体コード {target_code} に該当する市町村が見つかりません")
    # 3. Whole prefecture of that municipality.
    pref_name = city.iloc[0]["N03_001"]
    pref = gdf[gdf["N03_001"] == pref_name]
    # 4. Prefecture centroid -> projection origin (transverse Mercator).
    pref_geometry = pref.union_all() # merge the prefecture into one geometry
    center_x = pref_geometry.centroid.x
    center_y = pref_geometry.centroid.y
    # 5. Custom CRS: tmerc centred on the prefecture centroid.
    custom_crs = CRS.from_proj4(
        f"+proj=tmerc +lat_0={center_y} +lon_0={center_x} +k=1 +x_0=0 +y_0=0 +datum=WGS84 +units=m +no_defs"
    )
    # 6. Re-project and re-select (the filtered frames must come from the projected gdf).
    gdf = gdf.to_crs(custom_crs)
    city = gdf[gdf["N03_007"] == target_code]
    pref = gdf[gdf["N03_001"] == pref_name]
    if only_mode:
        pref = city
    # 4. Background image size.
    background = Image.open(background_path).convert("RGBA")
    bg_w, bg_h = background.size
    # 5. Geographic extent of the prefecture (projected metres).
    minx, miny, maxx, maxy = pref.total_bounds
    dx = maxx - minx
    dy = maxy - miny
    # 6. Aspect ratios.  NOTE(review): computed but never used below.
    aspect_img = bg_w / bg_h
    aspect_map = dx / dy
    # 7. Figure size in inches so the raster matches the background pixels.
    dpi = 1200
    figsize = (bg_w / dpi, bg_h / dpi)
    # 8. Draw prefecture outline + filled municipality.
    fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
    pref.boundary.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(158, 234, 245)), linewidth=0.02)
    city.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(158, 234, 245)), edgecolor='none')
    # 9. View range = prefecture bounds (keeps the map aspect).
    ax.set_xlim(minx, maxx)
    ax.set_ylim(miny, maxy)
    ax.axis('off')
    # 10. Save overlay, binarize alpha, composite onto the background.
    overlay_path_raw = os.path.join(current_dir ,"temp_overlay_exact_raw", "{0}.png".format(target_code))
    overlay_path = os.path.join(current_dir ,"temp_overlay_exact", "{0}.png".format(target_code))
    os.makedirs(os.path.dirname(overlay_path_raw), exist_ok=True)
    os.makedirs(os.path.dirname(overlay_path), exist_ok=True)
    plt.savefig(overlay_path_raw, transparent=True, bbox_inches=None, pad_inches=0)
    plt.close()
    make_alpha_opaque(overlay_path_raw, overlay_path)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    paste_center(background_path, overlay_path, output_path, meta_text)

# Author's note kept verbatim: direct Pillow compositing alternative.
"""
# 11. Pillowで合成(サイズ一致するのでズレなし)
overlay = Image.open(overlay_path).convert("RGBA")
final = Image.alpha_composite(background, overlay)
final.save(output_path)
"""

def make_alpha_opaque(image_path, output_path):
    """Binarize the alpha channel: alpha > 63 -> fully opaque, else fully
    transparent (removes anti-aliased semi-transparent fringe)."""
    img = Image.open(image_path).convert("RGBA")
    datas = img.getdata()
    new_data = []
    for r, g, b, a in datas:
        if a > 63:
            new_data.append((r, g, b, 255)) # semi-transparent -> fully opaque
        else:
            new_data.append((r, g, b, 0)) # keep fully transparent
    img.putdata(new_data)
    img.save(output_path)

def paste_center(background_path, overlay_path, output_path, meta_text=None):
    """Paste the overlay centred on the background (alpha-masked) and save,
    embedding *meta_text* as PNG "caption" metadata."""
    bg = Image.open(background_path).convert("RGBA")
    ov = Image.open(overlay_path).convert("RGBA")
    bg_w, bg_h = bg.size
    ov_w, ov_h = ov.size
    # Centre offset.
    paste_x = (bg_w - ov_w) // 2
    paste_y = (bg_h - ov_h) // 2
    composite = bg.copy()
    composite.paste(ov, (paste_x, paste_y), mask=ov)
    # PNG metadata (source attribution required by the data licence).
    metadata = PngImagePlugin.PngInfo()
    if meta_text is not None:
        metadata.add_text("caption", str(meta_text))
    composite.save(output_path, pnginfo=metadata)

# Usage: target LG codes (partial sample list in this version).
CITY_CODE = ["01000", "01100", "01202", "01203", "01204", "01205", "01206", "01207", "47329", "47348", "47350", "47353", "47354", "47355", "47356", "47357", "47358", "47359", "47360", "47361", "47362", "47375", "47381", "47382",]
if "__main__" == __name__:
    geojson_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "N03-21_210101.geojson")
    # Read once and reuse across all codes (the per-call gdf parameter).
    gdf = gpd.read_file(geojson_path, encoding="utf-8")
    gdf = gdf.to_crs(epsg=4326) # normalize to lat/lon explicitly
    for i in CITY_CODE:
        ii = str(i)
        print(ii)
        try:
            draw_direct_on_background(
                geojson_path=geojson_path,
                target_code=ii,
                background_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "background.png"),
                output_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source_sub", "{0}.png".format(ii)),
                meta_text="出典:「国土数値情報(行政区域データ)」(国土交通省)(https://nlftp.mlit.go.jp/ksj/gml/datalist/KsjTmplt-N03-v3_0.html)(2025年08月04日取得)を加工して作成",
                only_mode=False,
                current_dir=os.path.dirname(os.path.abspath(__file__)),
                gdf=gdf
            )
        except Exception as e:
            print(" ⇒ miss", e)
# Municipality rendering v3 ("full mode support"): adds look_city_mode (zoom to
# the municipality with 50% padding), per-mode output tags, and an optional
# frame image composited on top of the result.
import geopandas as gpd
import matplotlib.pyplot as plt
from PIL import Image, PngImagePlugin #pillow
import os, io, sys
import pyproj
from shapely.geometry import MultiPolygon
from pyproj import CRS

def draw_direct_on_background( geojson_path: str, target_code: str, background_path: str, output_path: str, meta_text: str, only_mode: bool, look_city_mode: bool, current_dir, gdf=None, tag=None, frame_path=None ):
    """Render the target municipality over a background image and save it.

    only_mode=True      -- draw the municipality alone (no prefecture outline).
    look_city_mode=True -- frame the view on the municipality itself, padded by
                           expand_ratio on each side; skips the custom
                           re-projection step.
    gdf                 -- optional pre-loaded GeoDataFrame (already EPSG:4326)
                           so the GeoJSON is read only once per batch.
    tag                 -- suffix for the per-mode temp directories.
    frame_path          -- optional frame PNG composited over the final image.
    Raises ValueError when target_code matches no N03_007 row.
    """
    tagw = "_{0}".format(tag) if tag is not None else ""
    expand_ratio = 0.5
    # 1. Load GeoJSON (assumed WGS84 / EPSG:4326).
    if gdf is None:
        gdf = gpd.read_file(geojson_path, encoding="utf-8")
        gdf = gdf.to_crs(epsg=4326) # normalize to lat/lon explicitly
    # 2. Target municipality.
    city = gdf[gdf["N03_007"] == target_code]
    if city.empty:
        raise ValueError(f"団体コード {target_code} に該当する市町村が見つかりません")
    # 3. Whole prefecture of that municipality.
    pref_name = city.iloc[0]["N03_001"]
    pref = gdf[gdf["N03_001"] == pref_name]
    if not(look_city_mode):
        # 4. Prefecture centroid -> projection origin (transverse Mercator).
        #    (Author's note: definition possibly debatable -- "やっぱ定義が違うのでは?")
        pref_geometry = pref.union_all() # merge the prefecture into one geometry
        center_x = pref_geometry.centroid.x
        center_y = pref_geometry.centroid.y
        # 5. Custom CRS centred on the prefecture centroid.
        custom_crs = CRS.from_proj4(
            "+proj=tmerc +lat_0={0} +lon_0={1} +k=1 +x_0=0 +y_0=0 +datum=WGS84 +units=m +no_defs".format(
                center_y, center_x
            )
        )
        # 6. Re-project and re-select from the projected frame.
        gdf = gdf.to_crs(custom_crs)
        city = gdf[gdf["N03_007"] == target_code]
        pref = gdf[gdf["N03_001"] == pref_name]
    if only_mode:
        pref = city
    # 4. Background image size.
    background = Image.open(background_path).convert("RGBA")
    bg_w, bg_h = background.size
    # 5. View extent: prefecture bounds by default, padded city bounds in look_city_mode.
    minx, miny, maxx, maxy = pref.total_bounds
    dx = maxx - minx
    dy = maxy - miny
    xlim = (minx, maxx)
    ylim = (miny, maxy)
    if look_city_mode:
        minx, miny, maxx, maxy = city.total_bounds
        dx = maxx - minx
        dy = maxy - miny
        xlim = (minx - dx * expand_ratio, maxx + dx * expand_ratio)
        ylim = (miny - dy * expand_ratio, maxy + dy * expand_ratio)
    # 6. Aspect ratios.  NOTE(review): computed but never used below.
    aspect_img = bg_w / bg_h
    aspect_map = dx / dy
    # 7. Figure size from DPI (enlarged in look_city_mode to cover the padding).
    dpi = 2400
    figsize = (bg_w / dpi, bg_h / dpi)
    if look_city_mode:
        figsize = (bg_w*(1+expand_ratio) / dpi, bg_h*(1+expand_ratio) / dpi)
    #print("({0}, {1}) = ({2}, {3})".format(dx, dy, bg_w, bg_h))
    # 8. Draw: outline (lighter cyan) + filled municipality (darker cyan).
    fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
    fig.patch.set_alpha(0) # transparent figure background
    ax.axis("off") # no axis ticks
    if not(only_mode):
        pref.boundary.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(148, 234, 245)), linewidth=0.04 if look_city_mode else 0.02)
    city.plot(ax=ax, color="#{:02x}{:02x}{:02x}".format(*(94, 224, 240)), edgecolor='none')
    # 9. Apply the selected view range (keeps the map aspect).
    ax.set_xlim(*xlim)
    ax.set_ylim(*ylim)
    #ax.axis('off')
    # 10. Save overlay (tagged temp dirs), binarize alpha, composite, optional frame.
    overlay_path_raw = os.path.join(current_dir ,"temp", "temp_overlay_exact_raw{0}".format(tagw), "{0}.png".format(target_code))
    overlay_path = os.path.join(current_dir ,"temp", "temp_overlay_exact{0}".format(tagw), "{0}.png".format(target_code))
    os.makedirs(os.path.dirname(overlay_path_raw), exist_ok=True)
    os.makedirs(os.path.dirname(overlay_path), exist_ok=True)
    plt.savefig(
        overlay_path_raw,
        transparent=True,
        #facecolor="#{:02x}{:02x}{:02x}".format(*(214, 214, 214)),
        #bbox_inches=None,
        bbox_inches="tight",
        pad_inches=0
    )
    plt.close()
    make_alpha_opaque(overlay_path_raw, overlay_path)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    paste_center(background_path, overlay_path, output_path, meta_text)
    if frame_path is not None:
        overlay_png_exact(output_path, frame_path, output_path) # add frame on top

# Author's note kept verbatim: direct Pillow compositing alternative.
"""
# 11. Pillowで合成(サイズ一致するのでズレなし)
overlay = Image.open(overlay_path).convert("RGBA")
final = Image.alpha_composite(background, overlay)
final.save(output_path)
"""

def overlay_png_exact(base_path, overlay_path, output_path): # simple stacked compositing
    """
    base_path : 背景PNG画像(RGBA)
    overlay_path : 重ねるPNG画像(RGBA、透明部分あり)
    output_path : 合成結果を保存するパス
    """
    base = Image.open(base_path).convert("RGBA")
    overlay = Image.open(overlay_path).convert("RGBA")
    if base.size != overlay.size:
        raise ValueError("画像サイズが一致しません")
    # Composite using the overlay's alpha channel.
    result = Image.alpha_composite(base, overlay)
    result.save(output_path)

def make_alpha_opaque(image_path, output_path):
    """Binarize the alpha channel: alpha > 63 -> fully opaque, else fully
    transparent (removes anti-aliased semi-transparent fringe)."""
    img = Image.open(image_path).convert("RGBA")
    datas = img.getdata()
    new_data = []
    for r, g, b, a in datas:
        if a > 63:
            new_data.append((r, g, b, 255)) # semi-transparent -> fully opaque
        else:
            new_data.append((r, g, b, 0)) # keep fully transparent
    img.putdata(new_data)
    img.save(output_path)

def paste_center(background_path, overlay_path, output_path, meta_text=None, frame_path=None):
    """Paste the overlay centred on the background (alpha-masked) and save with
    *meta_text* embedded as PNG "caption" metadata.

    NOTE(review): frame_path is accepted but never used here -- the frame is
    applied separately via overlay_png_exact() in draw_direct_on_background.
    """
    bg = Image.open(background_path).convert("RGBA")
    ov = Image.open(overlay_path).convert("RGBA")
    bg_w, bg_h = bg.size
    ov_w, ov_h = ov.size
    # Centre offset.
    paste_x = (bg_w - ov_w) // 2
    paste_y = (bg_h - ov_h) // 2
    composite = bg.copy()
    composite.paste(ov, (paste_x, paste_y), mask=ov)
    # PNG metadata (source attribution required by the data licence).
    metadata = PngImagePlugin.PngInfo()
    if meta_text is not None:
        metadata.add_text("caption", str(meta_text))
    composite.save(output_path, pnginfo=metadata)

# Usage: target LG codes (Hokkaido through Fukushima in this batch).
CITY_CODE = ["01000", "01100", "01202", "01203", "01204", "01205", "01206", "01207", "01208", "01209", "01210", "01211", "01212", "01213", "01214", "01215", "01216", "01217", "01218", "01219", "01220", "01221", "01222", "01223", "01224", "01225", "01226", "01227", "01228", "01229", "01230", "01231", "01233", "01234", "01235", "01236", "01303", "01304", "01331", "01332", "01333", "01334", "01337", "01343", "01345", "01346", "01347", "01361", "01362", "01363", "01364", "01367", "01370", "01371", "01391", "01392", "01393", "01394", "01395", "01396", "01397", "01398", "01399", "01400", "01401", "01402", "01403", "01404", "01405", "01406", "01407", "01408", "01409", "01423", "01424", "01425", "01427", "01428", "01429", "01430", "01431", "01432", "01433", "01434", "01436", "01437", "01438", "01452", "01453", "01454", "01455", "01456", "01457", "01458", "01459", "01460", "01461", "01462", "01463", "01464", "01465", "01468", "01469", "01470", "01471", "01472", "01481", "01482", "01483", "01484", "01485", "01486", "01487", "01511", "01512", "01513", "01514", "01516", "01517", "01518", "01519", "01520", "01543", "01544", "01545", "01546", "01547", "01549", "01550", "01552", "01555", "01559", "01560", "01561", "01562", "01563", "01564", "01571", "01575", "01578", "01581", "01584", "01585", "01586", "01601", "01602", "01604", "01607", "01608", "01609", "01610", "01631", "01632", "01633", "01634", "01635", "01636", "01637", "01638", "01639", "01641", "01642", "01643", "01644", "01645", "01646", "01647", "01648", "01649", "01661", "01662", "01663", "01664", "01665", "01667", "01668", "01691", "01692", "01693", "01694", "01695", "01696", "01697", "01698", "01699", "01700", "02000", "02201", "02202", "02203", "02204", "02205", "02206", "02207", "02208", "02209", "02210", "02301", "02303", "02304", "02307", "02321", "02323", "02343", "02361", "02362", "02367", "02381", "02384", "02387", "02401", "02402", "02405", "02406", "02408", "02411", "02412", "02423", "02424", "02425", "02426", "02441", "02442", "02443", "02445", "02446", "02450", "03000", "03201", "03202", "03203", "03205", "03206", "03207", "03208", "03209", "03210", "03211", "03213", "03214", "03215", "03216", "03301", "03302", "03303", "03321", "03322", "03366", "03381", "03402", "03441", "03461", "03482", "03483", "03484", "03485", "03501", "03503", "03506", "03507", "03524", "04000", "04100", "04202", "04203", "04205", "04206", "04207", "04208", "04209", "04211", "04212", "04213", "04214", "04215", "04216", "04301", "04302", "04321", "04322", "04323", "04324", "04341", "04361", "04362", "04401", "04404", "04406", "04421", "04422", "04424", "04444", "04445", "04501", "04505", "04581", "04606", "05000", "05201", "05202", "05203", "05204", "05206", "05207", "05209", "05210", "05211", "05212", "05213", "05214", "05215", "05303", "05327", "05346", "05348", "05349", "05361", "05363", "05366", "05368", "05434", "05463", "05464", "06000", "06201", "06202", "06203", "06204", "06205", "06206", "06207", "06208", "06209", "06210", "06211", "06212", "06213", "06301", "06302", "06321", "06322", "06323", "06324", "06341", "06361", "06362", "06363", "06364", "06365", "06366", "06367", "06381", "06382", "06401", "06402", "06403", "06426", "06428", "06461", "07000",]
TAG = "OPEN"
# (output-dir tag, look_city_mode, only_mode) combinations to render per code.
CONTROL_LIST = [
    ("市町村視点_都道府県除外", True, True),
    #("都道府県視点_都道府県除外", False, True), # pointless combination
    ("市町村視点_都道府県付き", True, False),
    ("都道府県視点_都道府県付き", False, False),
]
if "__main__" == __name__:
    geojson_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "N03-21_210101.geojson")
    # Read once and reuse across every code/mode combination.
    gdf = gpd.read_file(geojson_path, encoding="utf-8")
    gdf = gdf.to_crs(epsg=4326) # normalize to lat/lon explicitly
    for i in CITY_CODE:
        for tag, look_city_mode, only_mode in CONTROL_LIST:
            ii = str(i)
            print(ii, "(", tag, ")")
            try:
                draw_direct_on_background(
                    geojson_path=geojson_path,
                    target_code=ii,
                    background_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "background.png"),
                    output_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source_sub_{0}".format(tag), "{0}.png".format(ii)),
                    meta_text="出典:「国土数値情報(行政区域データ)」(国土交通省)(https://nlftp.mlit.go.jp/ksj/gml/datalist/KsjTmplt-N03-v3_0.html)(2025年08月04日取得)を加工して作成",
                    only_mode=only_mode, # draw the municipality alone (ignore the prefecture)
                    look_city_mode=look_city_mode, # frame the view on the municipality
                    current_dir=os.path.dirname(os.path.abspath(__file__)),
                    gdf=gdf,
                    tag=tag,
                    frame_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "source", "frame.png")
                )
            except Exception as e:
                print(" ⇒ miss", e)
データなどは国土地理院からダウンロードする
独立ファイル構成チェック機構(interpreter除外)
独立ファイル構成チェック機構(バッチ修正)
独立ファイル構成チェック機構: Ver:1.6.0(プロセスに比較辞書を渡す部分を高速化)
独立ファイル構成チェック機構: Ver:1.6.4(設定変更無視フラグの宣伝性強化)
独立ファイル構成チェック機構: Ver:1.6.5(PermissionError時にエラーが波及する問題があったため、修正)
sample json{ "CONFIG_HASH": "5bf50300e7d96519c3a806f049992cf0cf871539e9086bf5835cf461fd4a6d4bed40a8d25e3db34e601d024d4380fdfac69da47fb7ad0933aedb96a21da7be95", "RAW_SYSTEM_STRUCTURE_HASH": "7bb1f3e592104f780428816f14534468c4ed64e06be4d664989460ac9ffddfbf35b3e1797bbd4a2c4be56495ed1283f7fb465c05fb898fbc4fe70dcb3444eeac", "RAW_SYSTEM_STRUCTURE_LIST": [ "Inspecter", "Inspecter/Execute.bat", "Inspecter/Interpreter", "Inspecter/Interpreter/LICENSE.txt", "Inspecter/Interpreter/_asyncio.pyd", "Inspecter/Interpreter/_bz2.pyd", "Inspecter/Interpreter/_ctypes.pyd", "Inspecter/Interpreter/_decimal.pyd", "Inspecter/Interpreter/_elementtree.pyd", "Inspecter/Interpreter/_hashlib.pyd", "Inspecter/Interpreter/_lzma.pyd", "Inspecter/Interpreter/_msi.pyd", "Inspecter/Interpreter/_multiprocessing.pyd", "Inspecter/Interpreter/_overlapped.pyd", "Inspecter/Interpreter/_queue.pyd", "Inspecter/Interpreter/_socket.pyd", "Inspecter/Interpreter/_sqlite3.pyd", "Inspecter/Interpreter/_ssl.pyd", "Inspecter/Interpreter/_uuid.pyd", "Inspecter/Interpreter/_zoneinfo.pyd", "Inspecter/Interpreter/libcrypto-1_1.dll", "Inspecter/Interpreter/libffi-8.dll", "Inspecter/Interpreter/libssl-1_1.dll", "Inspecter/Interpreter/pyexpat.pyd", "Inspecter/Interpreter/python.cat", "Inspecter/Interpreter/python.exe", "Inspecter/Interpreter/python3.dll", "Inspecter/Interpreter/python311._pth", "Inspecter/Interpreter/python311.dll", "Inspecter/Interpreter/python311.zip", "Inspecter/Interpreter/pythonw.exe", "Inspecter/Interpreter/select.pyd", "Inspecter/Interpreter/sqlite3.dll", "Inspecter/Interpreter/unicodedata.pyd", "Inspecter/Interpreter/vcruntime140.dll", "Inspecter/Interpreter/vcruntime140_1.dll", "Inspecter/Interpreter/winsound.pyd", "Inspecter/Scripts", "Inspecter/Scripts/config.py", "Inspecter/Scripts/constant.py", "Inspecter/Scripts/inspect.py", "Inspecter/Scripts/memorize.py", "Inspecter_1_6_5.zip", "Inspecter_1_6_5_NonInterpreter.zip", "bk", "bk/Inspecter_1_4_1.zip", "bk/Inspecter_1_4_1_NonInta.zip", 
"bk/Inspecter_1_4_2.zip", "bk/Inspecter_1_4_2_NonInterpreter.zip", "bk/Inspecter_1_4_3.zip", "bk/Inspecter_1_4_3_NonInterpreter.zip", "bk/Inspecter_1_5_0.zip", "bk/Inspecter_1_5_0_NonInterpreter.zip", "bk/Inspecter_1_6_0.zip", "bk/Inspecter_1_6_0_NonInterpreter.zip", "bk/Inspecter_1_6_1.zip", "bk/Inspecter_1_6_1_NonInterpreter.zip", "bk/Inspecter_1_6_2.zip", "bk/Inspecter_1_6_2_NonInterpreter.zip", "bk/Inspecter_1_6_3.zip", "bk/Inspecter_1_6_3_NonInterpreter.zip", "bk/Inspecter_1_6_4.zip", "bk/Inspecter_1_6_4_NonInterpreter.zip" ],
"SYSTEM_STRUCTURE_HASH": "776cdd1e5f33ca3044b9daa35bd80c4d51905a196a6bf627eedf60350b4d3b3dd11adfd4193a80f8c129ea1daa2345963e63549fe3a8e15024d02035598d0abb", "SYSTEM_STRUCTURE_LIST": [ "Inspecter", "Inspecter/Execute.bat", "Inspecter/Interpreter", "Inspecter/Interpreter/LICENSE.txt", "Inspecter/Interpreter/_asyncio.pyd", "Inspecter/Interpreter/_bz2.pyd", "Inspecter/Interpreter/_ctypes.pyd", "Inspecter/Interpreter/_decimal.pyd", "Inspecter/Interpreter/_elementtree.pyd", "Inspecter/Interpreter/_hashlib.pyd", "Inspecter/Interpreter/_lzma.pyd", "Inspecter/Interpreter/_msi.pyd", "Inspecter/Interpreter/_multiprocessing.pyd", "Inspecter/Interpreter/_overlapped.pyd", "Inspecter/Interpreter/_queue.pyd", "Inspecter/Interpreter/_socket.pyd", "Inspecter/Interpreter/_sqlite3.pyd", "Inspecter/Interpreter/_ssl.pyd", "Inspecter/Interpreter/_uuid.pyd", "Inspecter/Interpreter/_zoneinfo.pyd", "Inspecter/Interpreter/libcrypto-1_1.dll", "Inspecter/Interpreter/libffi-8.dll", "Inspecter/Interpreter/libssl-1_1.dll", "Inspecter/Interpreter/pyexpat.pyd", "Inspecter/Interpreter/python.cat", "Inspecter/Interpreter/python.exe", "Inspecter/Interpreter/python3.dll", "Inspecter/Interpreter/python311._pth", "Inspecter/Interpreter/python311.dll", "Inspecter/Interpreter/python311.zip", "Inspecter/Interpreter/pythonw.exe", "Inspecter/Interpreter/select.pyd", "Inspecter/Interpreter/sqlite3.dll", "Inspecter/Interpreter/unicodedata.pyd", "Inspecter/Interpreter/vcruntime140.dll", "Inspecter/Interpreter/vcruntime140_1.dll", "Inspecter/Interpreter/winsound.pyd", "Inspecter/Scripts", "Inspecter/Scripts/constant.py", "Inspecter/Scripts/inspect.py", "Inspecter/Scripts/memorize.py", "Inspecter_1_6_5.zip", "Inspecter_1_6_5_NonInterpreter.zip", "bk", "bk/Inspecter_1_4_1.zip", "bk/Inspecter_1_4_1_NonInta.zip", "bk/Inspecter_1_4_2.zip", "bk/Inspecter_1_4_2_NonInterpreter.zip", "bk/Inspecter_1_4_3.zip", "bk/Inspecter_1_4_3_NonInterpreter.zip", "bk/Inspecter_1_5_0.zip", "bk/Inspecter_1_5_0_NonInterpreter.zip", 
"bk/Inspecter_1_6_0.zip", "bk/Inspecter_1_6_0_NonInterpreter.zip", "bk/Inspecter_1_6_1.zip", "bk/Inspecter_1_6_1_NonInterpreter.zip", "bk/Inspecter_1_6_2.zip", "bk/Inspecter_1_6_2_NonInterpreter.zip", "bk/Inspecter_1_6_3.zip", "bk/Inspecter_1_6_3_NonInterpreter.zip", "bk/Inspecter_1_6_4.zip", "bk/Inspecter_1_6_4_NonInterpreter.zip" ], "SYSTEM_FILE_HASH_MAP": { "Inspecter/Interpreter/select.pyd": "4fb23e92d729d2e10bd9baf6652878891a44067456ef7ac188bb2f82b0e99fc70c7a65e24b114a6fca77f7ae94a5c620df57c4c6359e78429bdf553001d4acbb", "Inspecter_1_6_5.zip": "529e9bae4a028f9ae6653ae49715c72a335f886e583f8e1f5fa72df7809d90a6e54890451fde541fe2129ef1a718d750160e17dccb8dce45311fdd599e58fcf1", "Inspecter/Scripts/inspect.py": "e0c85d347b61bcfcd14c35748951b979416ceaaabc2453997236db511660d49c5b52e1b0ec6b0cae5a50c071592e5f147e151d47d829df591bc68078d794e11d", "bk/Inspecter_1_4_3_NonInterpreter.zip": "473042612d7f5452c1518e92edd4ccff2fdccbf9d6841bb36ba185ec89cbf57d9758706eeca1d4de7e6e2d3fff062b054c79e1a4977c44d8ab43c630ef79f191", "Inspecter/Interpreter/_queue.pyd": "9ab7639055e358854a602d05797a7fa9c0b62a3f53b017d8a4cd614444da3a18d9a444f5cf7002b257a00f00fae917df9122bc1a0dfee2817ee2867a47eadd2b", "Inspecter/Interpreter/LICENSE.txt": "7a1d89b9772d70372d16729667952c65cf5db1f9c87ec5cc2dce7d7028ab8ea76cad4e518be650b3b4d5d96294aea78bcea216c65244c3b1c00b2a07305baf42", "Inspecter/Interpreter/_uuid.pyd": "9e1beb704be9731a7c3d453da77ae97751db596890b107bbf9ad402b92e1fb3b9ce55adf870e44b464ccae4df667ad62d58ef6e669db47191b589c0e01ac0086", "bk/Inspecter_1_6_2_NonInterpreter.zip": "9df1f640908befcadea3a3e519c1dd4d4a952d28f57507b1c7fd7c11387228e05c892e12b72cca865f25b4614979be0d24824757003a301c8ca3bc96198789b8", "Inspecter/Interpreter/python311.dll": "c68dea707258b51e047379f9ecc7f84d09cdbc8e2a393c4795ce99792b69e5366b43309250de45ac57d56847234b2c54a869a9f0f5ff7b955299e22375b532c2", "bk/Inspecter_1_4_1.zip": 
"1f29395f7bbc5f40c3ec98b9c9e91cf50db3af83f34b2ab6b715177cf511362cbb595a27a6fc37f9987eb301b1ff7233f864c3fb8ce4923d557eeb7d0322ea75", "bk/Inspecter_1_6_3.zip": "21667739c00c2db4bf3a97b38c152ea71dde9cf06521d68bb98a6edf842f78e6fd1177efa52b503aec5965acbe211230a2280c4aa83e04f85766c593ae48aff3", "Inspecter/Scripts/config.py": "7ab6444f35f49edb645046715b1dbc44a2d67482ea83431a91e945dcdf4c485c4123fa5d5104e77d120a489ce387ff8d27c31d95522907aa955d382d4bd44614", "Inspecter/Interpreter/_ctypes.pyd": "f8d288e6371fd436b59c8868bba83943be9dd5a677ec2706004f698bb74211c0bc87ab02aeff007196fa409eede8d2a415ca34f765c47110280f43991cb59eda", "Inspecter_1_6_5_NonInterpreter.zip": "96c09ac1bfbf6cd5e6df5187c840b352eae27d0f8da0c94d279529f4bf2d8e9ba36537e6598a4b87b6cf2932809bae5e8eb51bbc0235fcaafa071bba7009bf14", "bk/Inspecter_1_6_2.zip": "9df1f640908befcadea3a3e519c1dd4d4a952d28f57507b1c7fd7c11387228e05c892e12b72cca865f25b4614979be0d24824757003a301c8ca3bc96198789b8", "bk/Inspecter_1_5_0_NonInterpreter.zip": "e89177bd87c5fd8a31e39b3e936b3d0f0582d4319ce18dceb3af34b0ca61c2632bea32e8d1fa38a7d67759ed4e0654cfb694f21dc3f2d839d154404e32703d95", "Inspecter/Interpreter/sqlite3.dll": "9e69a1ea979ecc8278bff83646b01e62c3e1cf611edc4e96ce3dc034bc609b7ea48c5571c4f051952ef2803c35b8c0afb58ecee33e8fb35df32a24662046f53e", "Inspecter/Interpreter/_overlapped.pyd": "89e9883a168c41c959da249802192675b774be2e2e4af9853445efe484c453537a7a96e313d96b570a70b95e78d22e838c03d432f4404cddd247df0282cc52bf", "Inspecter/Interpreter/python311._pth": "25fd39dbb61c7f10fefd8c15726e52351ce5cfca3d69a8f4accf7f3c9850410e325d723d3f9fdc935a3a4c9eabf1507f667fcbb850deb9d3c593957b9c1be544", "bk/Inspecter_1_4_2.zip": "a8843c3caf9fd860dce8a0b11db7404e4957203a97dbbe083fe9ada6b69a193cd9991b4474c8607c1d61f03d8c1c59208a5c2e7eececb9490c65b48d2836c8ab", "bk/Inspecter_1_6_1_NonInterpreter.zip": "bbc5ec66f35c1ef4b07eeb5a84a05a97464fc0bad45bedd7d66005df6c787c831b8e1352008cafe206953f2341ca1a741001bce60d2e8d8db28dad7167e6b7f3", 
"bk/Inspecter_1_4_2_NonInterpreter.zip": "06b2c1344697b2e3bd9184d0772133ad897c795b3e4e526b6cb914bd4db729e62ce7ae18ead1f4e6631ebfe11dd51a04184e552960965b2722cc5bbdfbb98c71", "Inspecter/Interpreter/pythonw.exe": "395dc6db063022be6f0455208233713259da8fdefec3fdf90be38f59acafe19ae00141307ae06d6db768022fa8d198a2cdc020b3a98389d8821b7aebb9d1ebe1", "Inspecter/Scripts/constant.py": "273b0db930a01149db30d8c66a292f5cf514fadfcf4bc24e6c52e14afa8b57ea7f4bbd7d70d6496bdea6c82d8f9e6cc62f9e802e92eece3d05452ae46f7a03ab", "Inspecter/Interpreter/libcrypto-1_1.dll": "7876d16ff23a548929704401c3c21eb52573109ef56d30be403c1e28abf2fc7adb8755fe88a325f225811bb90e3f36dc24bbc52a1067cc6e50b5ee4b43ced1d0", "Inspecter/Interpreter/python311.zip": "eee265ed168c3301f686418571d3bac1febb9f8ed9aefcccfa40c196a5a9874f80d0543979794c9971f8a0099ff992bc8609fa61d50b4e4136bde03128861d22", "Inspecter/Interpreter/_asyncio.pyd": "e9e5c593b9cdb0df6305eca3be172013a9151ce2c7059128b4fad8846368b832c12a23fcd9ca6435374c3775a69c2428c2bd30d03b23744a83c3cfbeb1cb31a0", "Inspecter/Interpreter/libssl-1_1.dll": "c3d9b36563b56e2b56395e08b3839e7b9f4142f6ab4550644743ac29e65e761e7ba0cd1bbd74ddde90ca069792f9bbf146df99982dba8f04992b14db9cd528aa", "Inspecter/Interpreter/_lzma.pyd": "b7391f04a2b0e6732ba43f554f2a37c78eeae47989a1bd9be5102d66db0bb99432061b93ebaffce0024b8257a0cce622f070a38fb447a2f355deb86de5dd1103", "Inspecter/Interpreter/unicodedata.pyd": "7fb46a3f8b9d29c87e630b75034178f5baaaac8627ea0a7ab9fc632a3366ad1c1d4e404878324059d0ad25ab53c9bb7b201456ff69ab1a3fa3de4e3353f61f1c", "Inspecter/Interpreter/_multiprocessing.pyd": "e013ecc410de96c348bbcbb4105e2862b2080440872aa1d36458e7f9710e5ed246eb0b437b31654f2bd885d8d2fc2291094c019d97d647eb47ee79bc97183a70", "Inspecter/Interpreter/pyexpat.pyd": "2bd5c5b796ab879ba5aabc38d9f5e137630eb71aeb7b1ef7f2ac85abc597eb3d2ff2ce02e2a78f84ee33281baa42706bb6044a0f9156e08037541182735e63d0", "bk/Inspecter_1_6_0_NonInterpreter.zip": 
"ca41d0120667552bc3690264cbf009ff1b1d21ad3c554f8fd0d7fe29abb692a7be424e3c6cca6792bec019b56dcf4159a81fe840ef48a67f6641fed7684c8055", "Inspecter/Interpreter/_socket.pyd": "e84d0994e3740768320d06a5f8c853466513c6164226f4d53c02d2702931c7b21068ceaa3a843d18e411cfede7f7573c08504a2cd6f7399609f4e1ce33d01e85", "bk/Inspecter_1_6_3_NonInterpreter.zip": "e453e58bc6c6774b641e86f69e73fb89932f364d75b03a4970ef515385f306a09ffb3abefc7da6b2c126936332f8e8a0e8d9198d2e10cfe705076ee0b5842f05", "bk/Inspecter_1_6_1.zip": "838f4ccd060a1075815a87c72b5634a16a8d2042523029d443f4d87291a12bf2c7d7bbdc3073c77542a452a88d9afef603b93116f6ebb40318aa03da87352ffd",
"Inspecter/Interpreter/python3.dll": "909ff94e7be5d1a0bf6ffeb8807d10bd21babd5bb2ee3acdaabef3557e5df8dd5d3b3cb96f4ca7b85559349d91d707c7ceb65348a80dbf3c21fceca4143bd01d", "bk/Inspecter_1_6_4.zip": "a46c5080e58ab7d4987a794518c9bd305b576b85ddd441e12e82e5a9dd77d8c2ded20f58055295150f421f769b3025553269ae31ccbc13de90e0a962f6901604", "Inspecter/Interpreter/_elementtree.pyd": "5dd4d03427b6a061a7f81a8cd27589dd971c88f89588603e7c847f653eea57ada4414637b4ea9c82916a30a56c55a8c449b84b8b4758892dc7e006523e9f4afa", "Inspecter/Interpreter/_bz2.pyd": "e27a60b2309cff4abeefffff008d2393d531fc6da056d7af4237061dee14a12189151c8034d66b554877a4aa5aa9660baf1ccd124f3326f8d179dc86d5704d40", "Inspecter/Interpreter/_decimal.pyd": "7df06e7f2be248f97d889eb09bf90c92f9783dfef8f877800675b5778cc05d40c42dba5447412f329c5401a83d9ea9cd874f241f5e0fe86f3c102a0fb6eea577", "bk/Inspecter_1_6_0.zip": "90fe75bebb179a09fb18e613c91ba3d33f20eb62076bb60c37ab3e48388065ef55774bf3364094442fe3ff6e082924ed47bf8d7f16a9529677c0b2e9e7634efa", "Inspecter/Interpreter/libffi-8.dll": "ed547ef2d350139718edabcaf6733a6e1808497d367212170f14845f5e8dee0487a32cffb1a2c8e178ef0763c21d6f9c765a490091e45eb63a3399c87113c996", "bk/Inspecter_1_4_3.zip": "e38a212fc7db0bb2b924e0cac3b5aeca278def8f31f70f3daf017c32f5107ed2976426c209ff4b3eb64323a9848cabfb864812a9ee7db334ebdf6b274ff595be", "Inspecter/Interpreter/_zoneinfo.pyd": "5792254b2e0a4351abba9d39b0b160bd2c857a909bcbb40ba5c43c6aa64795df0862298f5e90b28d488c7d8b4efbc07f92cb7c563075864b3b3ce2979be00f6f", "Inspecter/Interpreter/vcruntime140.dll": "a97535b563c71f3b5b352112421de92ea078be50024ec6c39a8cf5cc43d46c1b4aec837370c80fcdebacf545f933a8ddcf81cab459f7fd0e6f7e3cbf67437040", "bk/Inspecter_1_5_0.zip": "7b65db430f48375a69834a642f37901497d662aa3994d9c48f547516968b40a6898828a17f3ee5ac5e1d8234d03a29b4c2bfec3cdd3a9a4ce8a5bcdadd7be597", "Inspecter/Interpreter/python.exe": "7c622f14764679bd54b0db9dfd318a03c8fb131433655b1f15541f5134d032bb55388670162727973991be60ce2c5dc91ef4de635d80b9d9061ef6eb51fc9bfa", 
"Inspecter/Scripts/memorize.py": "ba5747c50adb40a8fc1d6453fc64cf483bec7c5702bd1454d58762a02cc320114a9f455856645ff95f676210cb5dc76607b57dbd498a70f7e41d75ce7387ae20", "Inspecter/Interpreter/_hashlib.pyd": "7c0b42e1c6c093ff18bbdbd166bffd078c5d29ae1a1455027adbfb96e7fef0b434e504e86f51a32390ff4771f3793e66642ae6677b4e69e753aac71c95c5579b", "Inspecter/Interpreter/_sqlite3.pyd": "38dd4f3ebdde122808b8d9dda75521fa2338d4484ffa8008121cf0d998624a5d1f40952232a082ee6f581b84331cbcfb500b0bf4183ee940cd928102299d6c1f", "bk/Inspecter_1_4_1_NonInta.zip": "d7d2c7dde93a6ea1d1c2b668d61229e66b914fa3c5336e2c52e42a326ab1e45dcedd5145dadbfbca4687258ff68bad4e55d72a583c08997b787fa86f24c076f4", "bk/Inspecter_1_6_4_NonInterpreter.zip": "be3cc4ae8306afe849262c54cb00721d6b9b98ef1eacaea4e84589e1d994206c90b950aa9f0eaf86285f7111cae7a818955159ac86de65312dbbd8d114ee453f", "Inspecter/Interpreter/_msi.pyd": "cc412e9e49a2c982cc380988db050ca3d9394de40b21505ad390ff4baa78767a41efeb68b2f348dad65b17cb6c5aca90581982657136aff5aeca6ddaa6a4cdf2", "Inspecter/Execute.bat": "50fce428dbeba065c509191d4e8fbc5e164e5819e17e3e7df2fdff1f93d7895f9eb527bd0cabc15fe3e4f9fb4e534cbb25dab66c3cddae021132f062f96462ff", "Inspecter/Interpreter/winsound.pyd": "11cc8013f1e146ee09be2dcb89ace2d8976d6cdba080c500904a4b55f0070348eacceba6b91572f131651718f4a619fcaa06a7ae0540ce3b3c422e79a1468221", "Inspecter/Interpreter/_ssl.pyd": "99f309054c03f9bb389f1732d25661b50affe0e540a0fcc59774829e225a0fc8b1d220cdb443f41133992c092be02d9d01b4668d7f1f634c22671f13ef6d72a8", "Inspecter/Interpreter/vcruntime140_1.dll": "8cfdb0e2b340efd6099659f05822e40df6a067c553292ac12513c09393b78b5be60f4c42fe6f904acf079ff1d5b036b65fc07260c305dee66ad9df4f7d8c8f50", "Inspecter/Interpreter/python.cat": "6ee6f190b3faefcb81d1bf1be51c83914ef3c2323b19e7aaf4190e37d76430b7b1a486a9e3d74e81ec1a155dea1077755ed83eac2ea5b9e5cd71895bcf54109c" }, "CREATE_START_DATETIME": "2025/09/30 08:11:36", "CREATE_FINISH_DATETIME": "2025/09/30 08:11:37" }
# V10 CSV series utilities
import csv


def read_csv(path, encoding="utf-8"):
    """Read the CSV file at *path* and return every row as a list of lists of strings."""
    with open(path, mode="r", encoding=encoding, newline="") as f:
        return [row for row in csv.reader(f)]


def write_csv(path, rows, encoding="utf-8"):
    """Write *rows* to *path* as CSV, quoting every field and using LF line endings."""
    with open(path, mode="w", encoding=encoding, newline="") as f:
        csv.writer(f, quoting=csv.QUOTE_ALL, lineterminator="\n").writerows(rows)
# Automatic image stitching tool (joins vertically scrolled screenshots).
import os
os.chdir(os.path.dirname(os.path.abspath(__file__)))  # run relative to this script's directory
import cv2
import numpy as np
from pathlib import Path
from collections import defaultdict

# ==============================
# Directory settings
# ==============================
INPUT_ROOT = Path("./INPUT")
OUTPUT_ROOT = Path("./OUTPUT")

# ==============================
# Parameter settings (tune as needed)
# ==============================
TOP_IGNORE_PX = 80          # fixed region at the top of the lower image (skipped when matching)
BOTTOM_IGNORE_PX = 80       # fixed region at the bottom of the upper image (skipped when matching)
MIN_OVERLAP_PX = 30         # smallest overlap height tried
MAX_OVERLAP_PX = 500        # largest overlap height tried
MAX_IDENTICAL_RATIO = 0.01  # candidate overlap is rejected if more than this ratio of rows match exactly


# ==============================
# Ratio of exactly identical rows
# ==============================
def identical_row_ratio(a, b):
    """Return the fraction of rows that are byte-identical between arrays a and b.

    Both arguments are expected to have the same number of rows (the caller
    always passes equal-height slices); an empty input yields 1.0.
    """
    identical = 0
    h = a.shape[0]
    for i in range(h):
        if np.array_equal(a[i], b[i]):
            identical += 1
    return identical / h if h > 0 else 1.0


# ==============================
# Overlap detection
# ==============================
def detect_overlap(img_upper, img_lower):
    """Find the overlap height (in pixels) between the bottom of img_upper and the top of img_lower.

    Converts both BGR images to grayscale, then slides a candidate overlap
    height from MIN_OVERLAP_PX upward, comparing the strip just above the
    upper image's bottom ignore band against the strip just below the lower
    image's top ignore band with normalized cross-correlation
    (cv2.matchTemplate on equal-size strips yields a single score).
    Returns the best-scoring overlap, or 0 if none was accepted.
    """
    h1 = img_upper.shape[0]
    h2 = img_lower.shape[0]
    gray1 = cv2.cvtColor(img_upper, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img_lower, cv2.COLOR_BGR2GRAY)
    upper_end = h1 - BOTTOM_IGNORE_PX
    lower_start = TOP_IGNORE_PX
    best_score = -1.0
    best_overlap = 0
    max_search = min(
        MAX_OVERLAP_PX,
        upper_end,
        h2 - lower_start
    )
    # NOTE(review): range() excludes max_search itself, so an overlap of
    # exactly max_search pixels is never tested — confirm whether the upper
    # bound was meant to be inclusive (range(..., max_search + 1)).
    for overlap in range(MIN_OVERLAP_PX, max_search):
        part1 = gray1[upper_end - overlap : upper_end]
        part2 = gray2[lower_start : lower_start + overlap]
        # Reject candidates where too many rows match exactly
        # (likely a solid-color / fixed UI region, not real content).
        if identical_row_ratio(part1, part2) > MAX_IDENTICAL_RATIO:
            continue
        score = cv2.matchTemplate(
            part1, part2, cv2.TM_CCOEFF_NORMED
        )[0][0]
        if score > best_score:
            best_score = score
            best_overlap = overlap
    return best_overlap


# ==============================
# Image joining
# ==============================
def stitch_images(images):
    """Vertically concatenate *images* in order, trimming detected overlaps.

    NOTE(review): the overlap is detected in the band ending at
    h1 - BOTTOM_IGNORE_PX, but the trim below removes the last `overlap`
    rows of the absolute bottom (and the first `overlap` rows of the
    absolute top of the next image), i.e. it does not account for the
    ignore bands — confirm this offset is intentional.
    """
    result = images[0]
    for next_img in images[1:]:
        overlap = detect_overlap(result, next_img)
        if overlap > 0:
            upper = result[:-overlap, :]
            lower = next_img[overlap:, :]
            result = np.vstack([upper, lower])
        else:
            # No overlap detected: fall back to a plain vertical concat.
            result = np.vstack([result, next_img])
    return result
# ==============================
# Per-directory processing
# ==============================
def process_directory(input_dir: Path):
    """Group the *.bmp files of *input_dir* by the first character of their
    filename and write one stitched image per group into OUTPUT_ROOT."""
    buckets = defaultdict(list)
    for bmp_path in sorted(input_dir.glob("*.bmp")):
        buckets[bmp_path.name[0]].append(bmp_path)

    out_dir = OUTPUT_ROOT / input_dir.name
    out_dir.mkdir(parents=True, exist_ok=True)

    for group_key, files in buckets.items():
        loaded = [cv2.imread(str(p)) for p in files]
        if any(im is None for im in loaded):
            raise RuntimeError(f"画像読み込み失敗: {files}")
        merged = stitch_images(loaded)
        output_path = out_dir / f"{group_key}.bmp"
        cv2.imwrite(str(output_path), merged)
        print(f"[OK] 出力: {output_path}")


# ==============================
# Main entry point
# ==============================
def main():
    """Process every subdirectory found directly under INPUT_ROOT."""
    OUTPUT_ROOT.mkdir(exist_ok=True)
    for subdir in INPUT_ROOT.iterdir():
        if not subdir.is_dir():
            continue
        print(f"処理中: {subdir}")
        process_directory(subdir)


if __name__ == "__main__":
    main()
# Prototype CSV analyzer: scans a 2D table for "target" cells and writes a
# styled Excel report plus a log file and a summary text file.
import logging
import openpyxl
from openpyxl.styles import PatternFill, Font, Border, Side
from openpyxl.cell.text import InlineFont
from openpyxl.cell.rich_text import TextBlock, CellRichText
from openpyxl.utils import get_column_letter
from datetime import datetime
from typing import List, Any, Callable, Tuple, Set, Dict, Optional


class ExcelDataProcessor:
    """Analyze a 2D list with a user-supplied check function and export the
    result to Excel with target cells highlighted and target substrings
    rendered in colored rich text.

    Row/column design modes (``'NONE'`` / ``'HIDE'`` / ``'DELETE'``) control
    whether non-target rows/columns are kept, hidden, or omitted from output.
    """

    def __init__(
        self,
        data_2d_list: List[List[Any]],
        output_excel_file_path: str,
        output_log_file_path: str,
        output_summary_file_path: str,
        header_list: Optional[List[str]] = None,
        output_excel_design_row_mode: str = 'NONE',
        output_excel_design_column_mode: str = 'NONE',
        filter_function: Callable[[Any], str] = lambda x: str(x).strip() if x is not None else "",
        check_function: Callable[[str], Tuple[bool, Set[int]]] = lambda x: (False, set()),
        target_cell_color: str = "FFFF00",    # target cell/row fill (yellow)
        target_char_color: str = "FF0000",    # target character font color (red)
        header_default_color: str = "D9D9D9",      # default header background (gray)
        no_column_default_color: str = "E0E0E0"    # default "No." column background (light gray)
    ):
        """Store configuration, normalize colors/modes, and set up logging.

        *filter_function* maps each raw cell to the string that is checked;
        *check_function* returns ``(is_target, set_of_target_char_indices)``
        for that string.  If *header_list* is None, the first row of
        *data_2d_list* is treated as the header row.
        """
        self.raw_data = data_2d_list
        self.excel_path = output_excel_file_path
        self.log_path = output_log_file_path
        self.summary_path = output_summary_file_path
        self.header_list = header_list
        self.row_mode = output_excel_design_row_mode.upper()
        self.col_mode = output_excel_design_column_mode.upper()
        self.filter_func = filter_function
        self.check_func = check_function
        # Color settings: strip a leading '#' so "#FFFF00" and "FFFF00" both work.
        self.target_bg_color = target_cell_color.replace("#", "")
        self.target_font_color = target_char_color.replace("#", "")
        self.header_default_bg = header_default_color.replace("#", "")
        self.no_col_default_bg = no_column_default_color.replace("#", "")
        self._setup_logger()
        self.analysis_result = []
        # Column indices in which at least one target cell was found.
        self.target_cols_indices = set()
        self.summary_stats = {"total_rows": 0, "target_rows": 0, "total_targets_found": 0}

    def _setup_logger(self):
        """Configure file logging.

        NOTE(review): basicConfig(force=True) reconfigures the *root* logger,
        which affects logging of the whole process — confirm that is intended.
        """
        logging.basicConfig(
            filename=self.log_path,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            force=True
        )
        self.logger = logging.getLogger(__name__)

    def _prepare_data(self):
        """Return ``(headers, data_rows)``, taking headers from *header_list*
        when given, otherwise from the first row of the raw data.

        Raises:
            ValueError: if no header list was given and the data is empty.
        """
        if self.header_list:
            headers = self.header_list
            data_rows = self.raw_data
        else:
            if not self.raw_data:
                raise ValueError("データが存在しません。")
            headers = self.raw_data[0]
            data_rows = self.raw_data[1:]
        return headers, data_rows

    def analyze(self):
        """Run filter/check over every cell and build ``self.processed_data``.

        Side effects: fills ``self.headers``, ``self.processed_data``,
        ``self.target_cols_indices`` and ``self.summary_stats``.
        """
        self.logger.info("解析を開始します。")
        headers, data_rows = self._prepare_data()
        processed_rows = []
        for r_idx, row in enumerate(data_rows):
            row_info = {
                "original_row_idx": r_idx,
                "has_target": False,
                "cells": []
            }
            for c_idx, cell_raw in enumerate(row):
                filtered_val = self.filter_func(cell_raw)
                is_target, target_indices = self.check_func(filtered_val)
                cell_info = {
                    "value": filtered_val,
                    "is_target": is_target,
                    "target_indices": target_indices
                }
                row_info["cells"].append(cell_info)
                if is_target:
                    row_info["has_target"] = True
                    self.target_cols_indices.add(c_idx)
                    self.summary_stats["total_targets_found"] += 1
            processed_rows.append(row_info)
            if row_info["has_target"]:
                self.summary_stats["target_rows"] += 1
        self.summary_stats["total_rows"] = len(processed_rows)
        self.headers = headers
        self.processed_data = processed_rows
        self.logger.info(f"解析完了: {self.summary_stats}")

    def _create_rich_text(self, text: str, target_indices: Set[int]) -> CellRichText:
        """Build a CellRichText where characters at *target_indices* are bold
        and colored, by splitting *text* into runs of target/non-target chars
        (uses InlineFont, which is what CellRichText requires)."""
        if not target_indices:
            return CellRichText(text)
        rich_parts = []
        default_font = InlineFont(color="000000")
        target_font = InlineFont(color=self.target_font_color, b=True)
        current_str = ""
        current_is_target = -1  # -1 = no run started yet; 0/1 = run's target flag
        for i, char in enumerate(text):
            is_target_char = 1 if i in target_indices else 0
            if current_is_target == -1:
                current_is_target = is_target_char
                current_str += char
            elif current_is_target == is_target_char:
                current_str += char
            else:
                # Run boundary: flush the accumulated run with its font.
                font_to_use = target_font if current_is_target == 1 else default_font
                rich_parts.append(TextBlock(font_to_use, current_str))
                current_str = char
                current_is_target = is_target_char
        if current_str:
            font_to_use = target_font if current_is_target == 1 else default_font
            rich_parts.append(TextBlock(font_to_use, current_str))
        return CellRichText(rich_parts)

    def export_excel(self):
        """Write the analysis result to ``self.excel_path``.

        Layout: column A is a "No." column holding the 1-based original row
        number; data columns follow.  Target rows/cells are filled with the
        target color, target headers with gold, and HIDE/DELETE modes hide or
        drop non-target rows/columns.

        Raises:
            Exception: re-raises whatever ``wb.save`` fails with (after logging).
        """
        self.logger.info("Excel出力を開始します。")
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "CheckResult"
        # --- Style definitions ---
        # 1. Fills
        fill_target = PatternFill(start_color=self.target_bg_color, end_color=self.target_bg_color, fill_type="solid")
        fill_header_target = PatternFill(start_color="FFD700", end_color="FFD700", fill_type="solid")  # target column header (gold)
        # Default backgrounds
        fill_header_default = PatternFill(start_color=self.header_default_bg, end_color=self.header_default_bg, fill_type="solid")
        fill_no_col_default = PatternFill(start_color=self.no_col_default_bg, end_color=self.no_col_default_bg, fill_type="solid")
        # Grid border on every written cell
        thin_side = Side(style='thin', color="000000")
        border_all = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side)
        # --- Column / row filtering ---
        output_col_indices = []
        for i in range(len(self.headers)):
            if self.col_mode == 'DELETE':
                # DELETE mode keeps only columns that contained a target.
                if i in self.target_cols_indices:
                    output_col_indices.append(i)
            else:
                output_col_indices.append(i)
        rows_to_write = []
        for row_info in self.processed_data:
            if self.row_mode == 'DELETE' and not row_info["has_target"]:
                continue
            rows_to_write.append(row_info)
        # --- Header row ---
        # A1: header of the "No." column
        cell_no = ws.cell(row=1, column=1, value="No.")
        cell_no.fill = fill_header_default
        cell_no.border = border_all
        excel_col_idx = 2
        for original_col_idx in output_col_indices:
            cell = ws.cell(row=1, column=excel_col_idx, value=str(self.headers[original_col_idx]))
            # Gold for target columns, default header color otherwise.
            if original_col_idx in self.target_cols_indices:
                cell.fill = fill_header_target
            else:
                cell.fill = fill_header_default
            cell.border = border_all
            # HIDE mode: keep the column but hide it.
            if self.col_mode == 'HIDE' and original_col_idx not in self.target_cols_indices:
                col_letter = get_column_letter(excel_col_idx)
                ws.column_dimensions[col_letter].hidden = True
            excel_col_idx += 1
        # --- Data rows ---
        current_excel_row = 2
        for row_info in rows_to_write:
            # 1. "No." column: 1-based original row number.
            no_cell = ws.cell(row=current_excel_row, column=1, value=row_info["original_row_idx"] + 1)
            # Target rows get the target fill; others get the default No.-column fill.
            if row_info["has_target"]:
                no_cell.fill = fill_target
            else:
                no_cell.fill = fill_no_col_default
            no_cell.border = border_all
            # HIDE mode: keep the row but hide it.
            if self.row_mode == 'HIDE' and not row_info["has_target"]:
                ws.row_dimensions[current_excel_row].hidden = True
            # 2. Actual data columns.
            excel_col_idx = 2
            for original_col_idx in output_col_indices:
                # Guard against ragged rows shorter than the header.
                if original_col_idx < len(row_info["cells"]):
                    cell_data = row_info["cells"][original_col_idx]
                    cell = ws.cell(row=current_excel_row, column=excel_col_idx)
                    # Rich text only when there are concrete character hits.
                    if cell_data["is_target"] and cell_data["target_indices"]:
                        cell.value = self._create_rich_text(cell_data["value"], cell_data["target_indices"])
                    else:
                        cell.value = cell_data["value"]
                    # Background fill for target cells.
                    if cell_data["is_target"]:
                        cell.fill = fill_target
                    cell.border = border_all
                excel_col_idx += 1
            current_excel_row += 1
        try:
            wb.save(self.excel_path)
            self.logger.info(f"Excel保存成功: {self.excel_path}")
        except Exception as e:
            self.logger.error(f"Excel保存失敗: {e}")
            raise

    def export_summary(self):
        """Write the summary statistics to ``self.summary_path`` (best effort:
        failures are logged, not raised)."""
        try:
            with open(self.summary_path, 'w', encoding='utf-8') as f:
                f.write("=== Summary Report ===\n")
                f.write(f"Executed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total Rows Processed: {self.summary_stats['total_rows']}\n")
                f.write(f"Target Rows Found: {self.summary_stats['target_rows']}\n")
                f.write(f"Total Target Cells/Hits: {self.summary_stats['total_targets_found']}\n")
            self.logger.info(f"集計ファイル保存成功: {self.summary_path}")
        except Exception as e:
            self.logger.error(f"集計ファイル保存失敗: {e}")

    def execute(self):
        """Run the full pipeline: analyze, export Excel, export summary."""
        self.analyze()
        self.export_excel()
        self.export_summary()
        print(f"処理完了。ログを確認してください: {self.log_path}")


# --- Usage example ---
if __name__ == "__main__":
    def my_filter(val):
        # Normalize a raw cell to a stripped string; None becomes "".
        return str(val).strip() if val is not None else ""

    def my_check(text):
        # Mark every character index covered by an occurrence of "Error".
        target = "Error"
        indices = set()
        if target in text:
            # Simple scan (overlapping occurrences not deduplicated).
            start = text.find(target)
            while start != -1:
                for i in range(start, start + len(target)):
                    indices.add(i)
                start = text.find(target, start + 1)
            return True, indices
        return False, set()

    raw_data = [
        ["ID", "Log Message", "Status"],
        [1, "System started", "OK"],
        [2, "Connection Error occurred", "Fail"],
        [3, "Process completed", "OK"],
        [4, "Unknown Error code 99", "Fail"]
    ]
    processor = ExcelDataProcessor(
        data_2d_list=raw_data,
        output_excel_file_path="result_v2.xlsx",
        output_log_file_path="process_v2.log",
        output_summary_file_path="summary_v2.txt",
        header_default_color="#B9B9B9",    # header gray
        no_column_default_color="#E0E0E0", # light gray for the No. column
        target_cell_color="#FFFF00",       # yellow for target cells
        filter_function=my_filter,
        check_function=my_check
    )
    processor.execute()
import os
import shutil
import hashlib
import re
import logging
import argparse                      # FIX: was used by the entry point but never imported
from datetime import datetime        # FIX: datetime.now() was used but never imported
from enum import Enum                # FIX: TransferMode(Enum) raised NameError without this

# ==========================================
# Settings / constants
# ==========================================
class ExitCode:
    """Process exit codes, one per failure category."""
    SUCCESS = 0
    ERR_NOT_FOUND = 11      # source file missing (Step 1)
    ERR_HASH_CALC = 12      # could not hash the source (Step 2)
    ERR_VERIFY_FAIL = 20    # transfer or hash verification failed (Steps 3-6)
    ERR_CLEANUP_FAIL = 31   # post-success backup move failed (Step 7)
    ERR_CRITICAL = 32       # failure while handling a failed transfer (Step 8)


class TransferMode(Enum):
    MOVE = 'move'  # on success, move the source file into backup/
    COPY = 'copy'  # on success, leave the source file in place


# ==========================================
# Utility functions
# ==========================================
def calculate_file_hash(filepath, algorithm="sha256"):
    """Return the hex digest of *filepath* using *algorithm*, or None on any error.

    Reads in 8 KiB chunks so arbitrarily large files are handled without
    loading them into memory. Errors are logged rather than raised so the
    caller can treat None as "hash unavailable".
    """
    hash_func = hashlib.new(algorithm)
    chunk_size = 1024 * 8
    try:
        with open(filepath, "rb") as f:
            while chunk := f.read(chunk_size):
                hash_func.update(chunk)
        return hash_func.hexdigest()
    except Exception as e:
        logging.error(f"ハッシュ計算エラー: {filepath} - {e}")
        return None


def setup_logger(log_dir):
    """Configure root logging to a timestamped file plus the console; return the log path."""
    # exist_ok avoids the check-then-create race of the original exists()/makedirs() pair.
    os.makedirs(log_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"transfer_{timestamp}.log")
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    return log_file


# ==========================================
# Main processing class
# ==========================================
class FileTransferProcessor:
    """Copy files matching a regex from input_dir to output_dir with hash verification.

    On verified success the source is optionally moved to <input_dir>/backup
    (MOVE mode); on any failure the incomplete destination file is quarantined
    in <output_dir>/error.
    """

    def __init__(self, input_dir, file_regex, output_dir, mode, log_dir):
        self.input_dir = input_dir
        self.file_pattern = re.compile(file_regex)
        self.output_dir = output_dir
        self.mode = mode
        # Backup/Error folders (flow diagram Steps 7 and 8)
        self.backup_dir = os.path.join(input_dir, 'backup')
        self.error_dir = os.path.join(output_dir, 'error')
        # Initialize logging once per processor
        setup_logger(log_dir)

    def _ensure_dirs(self):
        """Create the output/backup/error directories if they do not exist."""
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.backup_dir, exist_ok=True)
        os.makedirs(self.error_dir, exist_ok=True)

    def process_single_file(self, filename):
        """Transfer one file through the 8-step verified-copy flow.

        Returns an ExitCode constant describing the outcome.
        """
        source_path = os.path.join(self.input_dir, filename)
        dest_path = os.path.join(self.output_dir, filename)
        # FIX: restored the {filename} placeholder (the f-string logged a literal placeholder)
        logging.info(f"--- 処理開始: {filename} ---")

        # -------------------------------------------------
        # (1) Confirm the target file exists
        # -------------------------------------------------
        if not os.path.exists(source_path):
            logging.error(f"Step1: ファイルが存在しません: {source_path}")
            return ExitCode.ERR_NOT_FOUND

        # -------------------------------------------------
        # (2) Record the source hash (pre-transfer)
        # -------------------------------------------------
        src_hash_pre = calculate_file_hash(source_path)
        if src_hash_pre is None:
            logging.error("Step2: 事前ハッシュ取得失敗")
            return ExitCode.ERR_HASH_CALC
        logging.info(f"Step2: 元ハッシュ(Pre): {src_hash_pre}")

        # -------------------------------------------------
        # (3) Copy the target file
        # -------------------------------------------------
        copy_success = False
        try:
            shutil.copy2(source_path, dest_path)
            copy_success = True
            logging.info("Step3: コピー実行完了")
        except Exception as e:
            logging.error(f"Step3: コピー処理中にエラー発生: {e}")
            copy_success = False

        # -------------------------------------------------
        # (4) Re-hash the source: detect modification during the copy
        # -------------------------------------------------
        src_hash_post = calculate_file_hash(source_path)
        source_unchanged = (src_hash_pre == src_hash_post)
        if not source_unchanged:
            logging.warning(f"Step4: 警告!コピー中に元ファイルが変更されました。\nPre: {src_hash_pre}\nPost:{src_hash_post}")

        # -------------------------------------------------
        # (5) Hash the destination: confirm the copy is byte-identical
        # -------------------------------------------------
        dest_hash = None
        transfer_valid = False
        if os.path.exists(dest_path):
            dest_hash = calculate_file_hash(dest_path)
            transfer_valid = (src_hash_pre == dest_hash)
        if not transfer_valid:
            logging.warning(f"Step5: 警告!転送先ファイルのハッシュが一致しません。\nSrc: {src_hash_pre}\nDst: {dest_hash}")

        # -------------------------------------------------
        # (6) Verdict: all three checks must pass
        # -------------------------------------------------
        is_success = copy_success and source_unchanged and transfer_valid
        if is_success:
            logging.info("Step6: 判定OK - 全整合性確認完了")
            # -------------------------------------------------
            # (7) Success: move the source to backup (MOVE) or keep it (COPY)
            # -------------------------------------------------
            try:
                if self.mode == TransferMode.MOVE:
                    shutil.move(source_path, os.path.join(self.backup_dir, filename))
                    # FIX: restored the {filename} placeholder
                    logging.info(f"Step7: 元ファイルをBackupへ移動しました: {filename}")
                else:
                    logging.info("Step7: Copyモードのため元ファイルは保持されます")
                return ExitCode.SUCCESS
            except Exception as e:
                logging.error(f"Step7: 事後処理(Backup移動)でエラー: {e}")
                return ExitCode.ERR_CLEANUP_FAIL
        else:
            logging.error("Step6: 判定NG - 転送または検証に失敗しました")
            # -------------------------------------------------
            # (8) Failure: quarantine the incomplete destination file in error/
            # -------------------------------------------------
            try:
                if os.path.exists(dest_path):
                    target_error_path = os.path.join(self.error_dir, filename)
                    # Overwrite a previous error file of the same name
                    if os.path.exists(target_error_path):
                        os.remove(target_error_path)
                    shutil.move(dest_path, target_error_path)
                    # FIX: restored the {filename} placeholder
                    logging.info(f"Step8: 不完全な転送ファイルをErrorへ移動しました: {filename}")
                else:
                    logging.info("Step8:転送先ファイルが存在しないため移動処理スキップ")
                # Generic verification-failure code (causes are distinguished in the log)
                return ExitCode.ERR_VERIFY_FAIL
            except Exception as e:
                logging.error(f"Step8: エラー処理中の致命的エラー: {e}")
                return ExitCode.ERR_CRITICAL

    def run(self):
        """Scan the input directory and process every file matching the pattern."""
        logging.info("=== ファイル転送バッチ開始 ===")
        self._ensure_dirs()
        try:
            files = os.listdir(self.input_dir)
        except FileNotFoundError:
            logging.critical(f"入力ディレクトリが見つかりません: {self.input_dir}")
            return
        target_files = [f for f in files if self.file_pattern.match(f)]
        logging.info(f"対象ファイル数: {len(target_files)}")
        for filename in target_files:
            result = self.process_single_file(filename)
            if result == ExitCode.SUCCESS:
                # FIX: restored the {filename} placeholders in both log lines
                logging.info(f"[{filename}] 完了: SUCCESS")
            else:
                logging.error(f"[{filename}] 終了: ERROR Code {result}")
        logging.info("=== ファイル転送バッチ終了 ===")


# ==========================================
# Entry point
# ==========================================
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='堅牢なファイル転送プログラム')
    # Required arguments
    parser.add_argument('--input_dir', required=True, help='入力ディレクトリパス')
    # FIX: raw string — ".*\.txt" in a plain literal is an invalid escape (SyntaxWarning on 3.12+)
    parser.add_argument('--regex', required=True, help=r'入力ファイル正規表現 (例: ".*\.txt")')
    parser.add_argument('--output_dir', required=True, help='出力ディレクトリパス')
    parser.add_argument('--log_dir', required=True, help='ログ出力ディレクトリ')
    # Optional arguments
    parser.add_argument('--mode', choices=['move', 'copy'], default='copy',
                        help='動作モード (move: 元ファイルをBackupへ移動, copy: 元ファイルを維持)')
    args = parser.parse_args()

    mode_enum = TransferMode.MOVE if args.mode == 'move' else TransferMode.COPY
    processor = FileTransferProcessor(
        input_dir=args.input_dir,
        file_regex=args.regex,
        output_dir=args.output_dir,
        mode=mode_enum,
        log_dir=args.log_dir
    )
    processor.run()
| 構成日時 | : | 2026-04-20 00:17:12 |
| 現在ページ番号 | : | 1 |
| 最大ページ数 | : | 1 |
| 最古メッセージ日時 | : | 2024-10-22 21:08:35 |
| 最新メッセージ日時 | : | 2026-02-08 21:06:53 |
| メインスレッド数 | : | 50 |
| サブスレッド数 | : | 31 |
| 推定ページサイズ | : | 225.544KiB |