全体の作業工程
1. 学習用画像取得
2. 学習用画像切り出し
3. 学習用画像にラベルの付与
4. モデル作成( PyTorch & 深層学習プログラミング 赤石雅典著 日経BP社 12章カスタムデータの画像分類 )
5. モデルの保存(CPUモードで実施)
6. Pythonスクリプト、シェルスクリプト作成
macOS: Monterey 12.7.6
前提:
メール送信(mac)設定済み
下記をrootのcrontabに設定済
*/10 * * * * /usr/sbin/postfix start
シェル名: check_web_updates_v3.sh
①Webページのスクリーンショットを取得する
②画像を分割して分割後格納先に格納する
③下記条件に該当する画像のみターゲット判定ディレクトリに格納する
・soldでない
・前回存在しない
・高額でない
④ターゲット判定実行
⑤ターゲットがある場合、メール送信
⑥前回取得分を削除、今回ファイルを、前回ファイルにリネーム
注意事項:
OS再起動した場合、一度画面からログインしないとchromeを起動できない
shellでは全コマンドを原則、フルパスで記載する必要あり( cronで実行エラーとなるため )
confファイル:
check_web_updates_v3.conf
画像ファイル名:
<URLキー>_af.png
<URLキー>_bf.png
IMAGE_DIR_NAME=/Volumes/share/Image_v3
分割後ファイル格納先
${IMAGE_DIR_NAME}/split_af
${IMAGE_DIR_NAME}/split_bf
分割後ファイル名:
<URLキー>_x.png (x=1,2,3,4,5,…)
分割後ターゲット判定ファイル格納先
${IMAGE_DIR_NAME}/check/dummy/
■(1) スクリーンショットを取得するPythonスクリプト
python3.11
cat <<-'EOF' > get_screenshots_v3.py
from selenium import webdriver
import chromedriver_binary
import time
import csv
import os
from selenium.webdriver.chrome.options import Options
from PIL import Image
base_dir_name = '/Users/testuser/check_web_updates_v3'
image_dir_name = '/Volumes/share/Image_v3'

# Take one headless-Chrome screenshot per row of check_web_updates_v3.conf
# and store it as <key>_af.png under image_dir_name.
# Columns read from each conf row: [0] key, [1] header scroll offset, [3] url.
# Column [2] (the similarity threshold read by del_not_new_v3.py) is not used here.
options = Options()
options.add_argument('--headless')
driver = webdriver.Chrome(options=options)
try:
    with open(base_dir_name + "/" + "check_web_updates_v3.conf", "r", encoding="utf8") as csv_file:
        conf_rows = csv.reader(csv_file, delimiter=",", doublequote=True, lineterminator="\n", quotechar='"', skipinitialspace=False)
        for row in conf_rows:
            if not row:
                # csv.reader yields [] for blank lines; skip them instead of
                # failing with IndexError.
                continue
            key = row[0]
            header = int(row[1])
            url = row[3]
            screenshot_path = image_dir_name + '/' + key + '_af.png'

            driver.get(url)
            driver.maximize_window()
            # Fixed wait so the page can finish rendering.
            time.sleep(10)

            # Avoid headers whose image varies between visits: scroll down by
            # the configured offset before capturing.
            if header > 0:
                driver.execute_script("window.scrollBy(0, " + str(header) + ");")
                time.sleep(5)

            driver.save_screenshot(screenshot_path)

            # Keys containing 'xxx' are cropped to a fixed region, in place.
            if 'xxx' in key:
                with Image.open(screenshot_path) as im:
                    im_crop = im.crop((280, 200, 1200, 420))
                im_crop.save(screenshot_path, quality=95)
finally:
    # quit() ends the whole WebDriver session (browser and driver process);
    # close() would only close the current window. Running it in `finally`
    # keeps a failed page load from leaving Chrome processes behind.
    driver.quit()
EOF
pip3.11 install selenium
pip3.11 install chromedriver_binary
python3.11 get_screenshots_v3.py
python3.11
cat <<-'EOF' > split_screenshots_v3.py
import cv2
import numpy as np
import os
import glob
import csv
# Directory containing check_web_updates_v3.conf (read below).
base_dir_name = '/Users/testuser/check_web_updates_v3'
# Root directory of the <key>_af.png screenshots and of the split_af/ and
# check/dummy/ output directories written by crop_img().
image_dir_name = '/Volumes/share/Image_v3'
def crop_img(key,count):
    """Cut item tiles out of the screenshot for *key* and save them.

    The screenshot ``<key>_af.png`` is binarized (threshold 50) and its
    contours are extracted. Every contour with an area above 3000 whose
    bounding box is exactly 148 px wide or 148 px high is written twice:
    to ``split_af/<key>_<count>.png`` and to ``check/dummy/<key>_<count>.png``.

    Args:
        key: URL key from the conf file; selects the screenshot and
            prefixes the output file names.
        count: Number to use for the first tile that is saved.

    Returns:
        The next unused tile number (``count`` plus the number of tiles
        saved). ``count`` is returned unchanged when the screenshot cannot
        be read.
    """
    src = cv2.imread(image_dir_name + '/' + key + '_af.png', cv2.IMREAD_COLOR)
    if src is None:
        # cv2.imread signals a missing/unreadable file by returning None;
        # without this guard cvtColor raises and the remaining keys of the
        # conf file are never processed.
        print('screenshot not readable: ' + key)
        return count

    gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
    _, bw = cv2.threshold(gray, 50, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(bw, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)

    for contour in contours:
        area = cv2.contourArea(contour)
        if area <= 3000:
            continue
        print(area)
        x, y, w, h = cv2.boundingRect(contour)
        print(w, h)
        # boundingRect returns integers, so "147 < w < 149" is w == 148.
        if w == 148 or h == 148:
            tile = src[y:y + h, x:x + w]
            tile_name = key + '_' + str(count) + '.png'
            cv2.imwrite(image_dir_name + '/split_af/' + tile_name, tile)
            cv2.imwrite(image_dir_name + '/check/dummy/' + tile_name, tile)
            count = count + 1
    return count
# Split the current screenshot of every key listed in the conf file.
conf_path = base_dir_name + "/" + "check_web_updates_v3.conf"
with open(conf_path, "r", encoding="utf8") as csv_file:
    conf_rows = csv.reader(
        csv_file,
        delimiter=",",
        doublequote=True,
        lineterminator="\n",
        quotechar='"',
        skipinitialspace=False,
    )
    for conf_row in conf_rows:
        key = conf_row[0]
        print(key)
        # Tile numbering restarts at 1 for each key; crop_img returns the
        # next unused number.
        next_number = crop_img(key, 1)
        print(next_number)
EOF
python3.11 split_screenshots_v3.py
python3.11
cat <<-'EOF' > del_sold_v3.py
from PIL import Image
import numpy as np
import csv
import glob
import os
base_dir_name = '/Users/testuser/check_web_updates_v3'
image_dir_name = '/Volumes/share/Image_v3'
# RGB value that every probed pixel must have for a tile to count as sold.
red=np.array([255,2,17])


def _is_sold(pixels):
    """Return True when the whole diagonal probe band of *pixels* equals `red`.

    The band covers rows 0-39; in row i it spans columns 41-i .. 44-i.
    Tiles too small to contain the band are reported as not sold instead of
    raising IndexError (tiles are only guaranteed to have one 148 px side).
    """
    if pixels.ndim != 3 or pixels.shape[0] < 40 or pixels.shape[1] < 45:
        return False
    for i in range(40):
        if (pixels[i, 41 - i:45 - i] != red).any():
            # One mismatching pixel is enough; no need to scan further.
            return False
    return True


# Remove every candidate tile in check/dummy that is marked as sold.
with open(base_dir_name + "/" + "check_web_updates_v3.conf", "r", encoding="utf8") as csv_file:
    conf_rows = csv.reader(csv_file, delimiter=",", doublequote=True, lineterminator="\n", quotechar='"', skipinitialspace=False)
    for row in conf_rows:
        if not row:
            # Blank conf lines come back as [] from csv.reader.
            continue
        key = row[0]
        for f2 in glob.glob(image_dir_name +'/check/dummy/' + key +'_' + '*' + '.png'):
            with Image.open(f2) as im:
                # Force 3 channels so the comparison with `red` is well
                # defined even for RGBA or palette PNGs.
                array1 = np.array(im.convert('RGB'))
            if _is_sold(array1):
                os.remove(f2)
EOF
python3.11 del_sold_v3.py
python3.11
cat <<-'EOF' > del_not_new_v3.py
import numpy as np
import os
import csv
import sys
from PIL import Image
import glob
base_dir_name = '/Users/testuser/check_web_updates_v3'
image_dir_name = '/Volumes/share/Image_v3'

# Pass 1: drop candidate tiles that already existed in the previous run,
# judged by the fraction of identical pixel values after resizing both
# tiles to 148x148. The per-key threshold is column [2] of the conf file.
with open(base_dir_name + "/" + "check_web_updates_v3.conf", "r", encoding="utf8") as csv_file:
    conf_rows = csv.reader(csv_file, delimiter=",", doublequote=True, lineterminator="\n", quotechar='"', skipinitialspace=False)
    for row in conf_rows:
        if not row:
            # Blank conf lines come back as [] from csv.reader.
            continue
        key = row[0]
        threshold = float(row[2])

        # Decode the previous run's tiles once per key instead of once per
        # candidate tile; split_bf is not modified by this script.
        previous_arrays = []
        for f3 in glob.glob(image_dir_name +'/split_bf/' + key +'_' + '*' + '.png'):
            with Image.open(f3) as image3:
                previous_arrays.append(np.array(image3.convert('RGB').resize((148,148))))

        for f2 in glob.glob(image_dir_name +'/check/dummy/' + key +'_' + '*' + '.png'):
            with Image.open(f2) as image2:
                # Same mode and size on both sides keeps the element-wise
                # comparison shape-compatible.
                array2 = np.array(image2.convert('RGB').resize((148,148)))
            for array3 in previous_arrays:
                similarity = np.count_nonzero(array2 == array3) / array2.size
                if similarity > threshold :
                    os.remove(f2)
                    break
# Pass 2: pixel-value comparison alone sometimes fails to remove duplicates,
# so a histogram-based comparison is applied as well.
import cv2
import os


def _channel2_hist(path):
    """Return the 256-bin histogram of channel 2 of the image at *path*.

    The image is resized to 148x148 first. Returns None when OpenCV cannot
    read the file (cv2.imread returns None in that case).
    """
    image = cv2.imread(path)
    if image is None:
        return None
    image = cv2.resize(image, (148,148))
    return cv2.calcHist([image], [2], None, [256], [0, 256])


with open(base_dir_name + "/" + "check_web_updates_v3.conf", "r", encoding="utf8") as csv_file:
    conf_rows = csv.reader(csv_file, delimiter=",", doublequote=True, lineterminator="\n", quotechar='"', skipinitialspace=False)
    for row in conf_rows:
        if not row:
            # Blank conf lines come back as [] from csv.reader.
            continue
        key = row[0]
        threshold = float(row[2])

        # Histograms of the previous run's tiles, computed once per key.
        previous_hists = []
        for f3 in glob.glob(image_dir_name +'/split_bf/' + key +'_' + '*' + '.png'):
            hist3 = _channel2_hist(f3)
            if hist3 is not None:
                previous_hists.append(hist3)

        for f2 in glob.glob(image_dir_name +'/check/dummy/' + key +'_' + '*' + '.png'):
            image2_hist = _channel2_hist(f2)
            if image2_hist is None:
                continue
            for image3_hist in previous_hists:
                # HISTCMP_CORREL is comparison method 0 (correlation).
                similarity = cv2.compareHist(image2_hist, image3_hist, cv2.HISTCMP_CORREL)
                if similarity > threshold :
                    os.remove(f2)
                    break
EOF
python3.11 del_not_new_v3.py
python3.11
cat <<-'EOF' > check_target_v3.py
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torchvision.datasets as datasets
from torchvision import models
import os
base_dir_name = '/Users/testuser/check_web_updates_v3'
image_dir_name = '/Volumes/share/Image_v3'

# Use the first CUDA device when available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# Preprocessing for evaluation data: resize/crop to 224x224, convert to a
# tensor and normalize with mean 0.5 / std 0.5 (no augmentation).
test_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5)
])

# VGG-19 with batch normalization, created without pretrained weights
# (`weights=None` replaces the deprecated `pretrained=False`); the trained
# weights are loaded from model_cpu.pth below.
net = models.vgg19_bn(weights=None)

# Replace the final classifier layer so that it outputs 2 classes.
in_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(in_features, 2)
net = net.to(device)

# Class names indexed by the network's output index.
classes = ['target', 'other']

# map_location remaps the stored tensors onto `device`, so the checkpoint
# loads regardless of the device it was saved from.
net.load_state_dict(torch.load(base_dir_name + "/" + "model_cpu.pth", map_location=device))
###
import os
from torch.utils.data import Dataset
from PIL import Image
class CustomDataset(Dataset):
    """Unlabelled image dataset over the files directly inside one directory.

    Files whose names end in ``.png``, ``.jpg`` or ``.jpeg`` are collected
    in sorted order. Hidden files (names starting with ``.``) are skipped,
    e.g. macOS ``._*`` sidecar files that can appear on network shares and
    are not decodable images.

    Each item is ``(image, image_path)`` so that the caller can act on the
    file (e.g. delete it) after classification.
    """

    def __init__(self, root_dir, transform=None):
        """Collect the image paths found in *root_dir*.

        Args:
            root_dir: Directory to scan (not recursive).
            transform: Optional callable applied to each RGB PIL image.
        """
        self.root_dir = root_dir
        self.transform = transform
        # os.listdir order is arbitrary; sorting makes runs reproducible.
        self.filenames = [
            os.path.join(root_dir, filename)
            for filename in sorted(os.listdir(root_dir))
            if not filename.startswith('.')
            and filename.endswith(('.png', '.jpg', '.jpeg'))
        ]

    def __len__(self):
        """Return the number of image files found."""
        return len(self.filenames)

    def __getitem__(self, idx):
        """Return ``(image, image_path)`` for the file at position *idx*.

        The image is opened as RGB and passed through ``transform`` when
        one was given.
        """
        image_path = self.filenames[idx]
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, image_path
###
# Classify every tile left in check/dummy and delete those predicted 'other'.
test_data3 = CustomDataset(root_dir=image_dir_name +'/check/dummy', transform=test_transform)
test_loader3 = DataLoader(test_data3,batch_size=100, shuffle=False)

if len(test_loader3) == 0:
    print("DataLoader is empty!")
    raise SystemExit(0)

# Inference mode: without eval() the BatchNorm layers use per-batch
# statistics and Dropout stays active, so predictions depend on the batch
# contents and on random masks.
net.eval()

# Gradients are not needed for prediction.
with torch.no_grad():
    # Iterate over ALL batches; stopping after the first one would leave
    # any tile beyond the first 100 unclassified in check/dummy.
    for images, filenames in test_loader3:
        inputs = images.to(device)

        # Predicted class = index of the largest output per image.
        outputs = net(inputs)
        predicted = torch.max(outputs,1)[1]

        for filename, class_index in zip(filenames, predicted.tolist()):
            print(filename)
            predicted_name = classes[class_index]
            print(predicted_name)
            if predicted_name == 'other':
                os.remove(filename)
EOF
python3.11 check_target_v3.py
cat <<-'EOF' > del_high_price_v3.py
from PIL import Image
import numpy as np
import csv
import glob
import os
import pytesseract
import re
from PIL import ImageOps
base_dir_name = '/Users/testuser/check_web_updates_v3'
image_dir_name = '/Volumes/share/Image_v3'
pytesseract.pytesseract.tesseract_cmd = r'/usr/local/bin/tesseract'

# Tiles whose recognized price is greater than this value are removed.
PRICE_LIMIT = 999999


def _parse_price(ocr_text):
    """Extract the price from raw OCR output, or return None.

    OCR may return several lines; the longest one is used (the first one
    when several share the maximum length). Lines of 3 characters or fewer
    are ignored. ',' and '.' are dropped, every other non-digit becomes a
    space, and the first remaining token is the price.
    """
    # split() always yields at least one element, so max() is safe here
    # even when OCR recognized nothing.
    longest = max(ocr_text.split("\n"), key=len)
    if len(longest) <= 3:
        return None
    without_separators = longest.replace(",", "").replace(".", "")
    digits_and_spaces = re.sub(r'\D', ' ', without_separators)
    first_token = digits_and_spaces.lstrip().split(" ")[0]
    if not first_token.isdigit():
        return None
    return int(first_token)


# Remove every candidate tile in check/dummy whose OCR'd price is too high.
with open(base_dir_name + "/" + "check_web_updates_v3.conf", "r", encoding="utf8") as csv_file:
    conf_rows = csv.reader(csv_file, delimiter=",", doublequote=True, lineterminator="\n", quotechar='"', skipinitialspace=False)
    for row in conf_rows:
        if not row:
            # Blank conf lines come back as [] from csv.reader.
            continue
        key = row[0]
        for f2 in glob.glob(image_dir_name +'/check/dummy/' + key +'_' + '*' + '.png'):
            print(f2)
            with Image.open(f2) as img:
                # Resize and convert to grayscale.
                im_grayscale = img.resize((256, 256)).convert('L')
            # Binarize at 230, then invert.
            binary_img = im_grayscale.point(lambda x: 0 if x < 230 else 255, '1')
            img_inverted = ImageOps.invert(binary_img.convert('RGB'))
            # Crop the fixed region that is passed to OCR.
            im_crop = img_inverted.crop((30, 190, 200, 230))
            text = pytesseract.image_to_string(im_crop, lang='eng', config='--psm 3 --oem 1')
            print(text.split("\n"))

            price = _parse_price(text)
            print(price)
            if price is not None and price > PRICE_LIMIT:
                os.remove(f2)
EOF
python3.11 del_high_price_v3.py
■本体シェル
cat <<-'EOF' > check_web_updates_v3.sh
#!/bin/bash
#
# check_web_updates_v3.sh
#
# Rotates the previous run's images, takes new screenshots, filters the
# split tiles (sold / already seen / not a target / too expensive) and mails
# whatever is left in check/dummy. Every external command is called by full
# path because the script runs from cron.

BASE_DIR_NAME=/Users/testuser/check_web_updates_v3
IMAGE_DIR_NAME=/Volumes/share/Image_v3
CONF_FILE="${BASE_DIR_NAME}/check_web_updates_v3.conf"
LOG_FILE="${BASE_DIR_NAME}/check_web_updates_v3.log"
WORK_FILE="${BASE_DIR_NAME}/wk"
PYTHON=/usr/local/bin/python3.11

# Append a timestamped line to the job log.
log() {
    /bin/echo "[$(/bin/date '+%Y/%m/%d %H:%M:%S')] $1" >> "${LOG_FILE}"
}

# Start log
log "job start"

# 1. Do nothing when the PC at 10.11.21.1 does not answer a ping.
if ! /sbin/ping -c 1 10.11.21.1 ; then
    exit 0
fi

# 2. Mount the image share.
# NOTE(review): the SMB user and password are embedded in this URL in
# plain text; consider keeping them in the keychain instead.
/usr/bin/open "smb://admin:admin@10.11.21.4/share"
/bin/sleep 20
if [ ! -d "${IMAGE_DIR_NAME}" ] ; then
    # Without the share every later step would only produce errors.
    log "mount failed: ${IMAGE_DIR_NAME} not found"
    exit 1
fi
/bin/mkdir -p "${IMAGE_DIR_NAME}/split_af" "${IMAGE_DIR_NAME}/split_bf" "${IMAGE_DIR_NAME}/check/dummy"

# 3. Delete the previous screenshots and rename the current ones to "previous".
#    The conf file is read line by line (first CSV field = key) so that
#    whitespace inside a line cannot split it into bogus keys. The
#    `|| [ -n ... ]` part handles a last line without trailing newline.
while IFS=, read -r key _rest || [ -n "${key}" ] ; do
    if [ -z "${key}" ] ; then
        continue
    fi
    /bin/rm -f "${IMAGE_DIR_NAME}/${key}_bf.png"
    /bin/mv "${IMAGE_DIR_NAME}/${key}_af.png" "${IMAGE_DIR_NAME}/${key}_bf.png"
done < "${CONF_FILE}"

# Same rotation for the split tiles: delete the previous set, then move
# the current set into its place.
/bin/rm -rf "${IMAGE_DIR_NAME}"/split_bf/*.png
/bin/mv "${IMAGE_DIR_NAME}"/split_af/*.png "${IMAGE_DIR_NAME}"/split_bf/

# Clear the target-judgement directory.
/bin/rm -rf "${IMAGE_DIR_NAME}"/check/dummy/*.png

# 4. Take screenshots
"${PYTHON}" "${BASE_DIR_NAME}/get_screenshots_v3.py"

# 5. Split images
"${PYTHON}" "${BASE_DIR_NAME}/split_screenshots_v3.py"

# 6. Exclude sold items
"${PYTHON}" "${BASE_DIR_NAME}/del_sold_v3.py"

# 7. Exclude items already seen in the previous run
"${PYTHON}" "${BASE_DIR_NAME}/del_not_new_v3.py"

# 8. Exclude non-targets
"${PYTHON}" "${BASE_DIR_NAME}/check_target_v3.py"

# Exclude high-priced items
"${PYTHON}" "${BASE_DIR_NAME}/del_high_price_v3.py"

# 9. When tiles remain, list them in the mail body, append each one
#    uuencoded, and send the mail.
#    nullglob makes the pattern expand to nothing (instead of itself) when
#    there is no match, so no `ls` parsing is needed.
shopt -s nullglob
targets=( "${IMAGE_DIR_NAME}"/check/dummy/*.png )
shopt -u nullglob

for path in "${targets[@]}" ; do
    builtin printf '%s\n' "${path##*/}"
done

: > "${WORK_FILE}"
if [ "${#targets[@]}" -gt 0 ] ; then
    {
        for path in "${targets[@]}" ; do
            builtin printf '\n%s' "${path##*/}"
        done
        builtin printf '\n'
        for path in "${targets[@]}" ; do
            /usr/bin/uuencode "${path}" "${path##*/}"
        done
    } >> "${WORK_FILE}"
    /usr/bin/mail -s "website_updated_v3" hoge@example.com < "${WORK_FILE}"
fi

# 10. Unmount the image share
/sbin/umount /Volumes/share

# End log
log "job end"

exit 0
EOF
chmod +x check_web_updates_v3.sh
./check_web_updates_v3.sh
■cron登録
crontab -e
15,45 * * * * /Users/testuser/check_web_updates_v3/check_web_updates_v3.sh
crontab -l
log stream --info --predicate 'process == "cron"'
log show --info --predicate 'process == "cron"' --start '2017-05-25'