Jenkinsのpipelineを使うと、Jenkinsfileとしてジョブの定義・連携をGroovyで柔軟に記述できるようになります。そこでは例えば「変数を用いてノードやステージを横断する連携を実装する」「いろいろな外部スクリプトを実行して最適な実行条件を分析する」といった処理を容易に実装できます。
今回はその具体例として、pytestのテスト実行時間の最適化を行うJenkinsfileを題材にします。
pytestのテスト並列実行の時間を最適化する
対象は、pytestの2並列実行のジョブです。具体的には、Jenkins Pipelineで指定されたgitリポジトリから必要なファイルをチェックアウトし、その中のpytestのテストケースをノードサーバで実行して、結果をプッシュ& Jenkins Test Report出力する処理とします。実現にあたっては、(仮にテストケースが長大であるとして)ノードサーバを2つ用意し、テストを2分割してそれぞれのノードで並列実行させることで、テスト実行時間の短縮を狙います。
このテストの2分割ですが、旧来の静的なジョブ定義では、テストケース数などで事前に静的分割せざるを得ない場合が多いです。一方でJenkins Pipelineを用いると、テスト実行時間が均等に分割されるように、テストケースを動的に2分割する処理を容易に実装できます。
Jenkinsfileの実装
そのJenkinsfileの具体例を示します。ここでは以下を実装しています。
- Prepareステージで、create_test_selection_file.py(実装例は後述)をシェル実行し、その標準出力をtest_selectionに格納します。
- Testステージで、2つのノード:node1、node2にて、pytestのテストケースを並行実行します。pytestには、前ステージで得たtest_selectionを引数として指定します。
- Reportステージで、pytestが生成したテストレポートファイルのプッシュと、Jenkins Test Reportへの出力を行います。
まとめると外に用意した集計スクリプトを呼び出して、ノードで実行するテストケースを動的に選択しています。
Jenkinsfileの実装例:
pipeline {
environment {
test_selection = ""
}
agent any
stages {
stage('Prepare') {
steps {
script {
def output = sh (
script: 'python create_test_selection_file.py',
returnStdout: true
).trim()
test_selection = output.split(',')
}
}
}
stage('Test') {
steps {
parallel 'automation test':{
node('node1') {
checkout scm
sh "pytest ${test_selection[0]} --junitxml=testresult/result1.xml"
sh "python push_test_report.py testresult"
}
node('node2') {
checkout scm
sh "pytest ${test_selection[1]} --junitxml=testresult/result2.xml"
sh "python push_test_report.py testresult"
}
}
}
}
stage('Report'){
steps {
git branch: 'main', url: 'git://hoge/integration-test.git'
}
post {
always {
junit 'intagration_test/testresult/*.xml'
}
}
}
}
}
create_test_selection_file.pyの実装
(主題ではないため簡易的なサンプルですが)例えば以下のようなものになります。
import glob import xml.etree.ElementTree as ET import numpy as np NUMBER_OF_TESTNODE = 2 def select_test_by_time(test_files, resultfiles): test_result = {} all_time = 0.0 for file in resultfiles: root = ET.parse(file).getroot() all_time += float(root.attrib['time']) for testcase in root.iter('testcase'): test_result[testcase.attrib['file']] = test_result.get(testcase.attrib['file'], 0) + float(testcase.attrib['time']) testsuite_time = 0.0 node_no = 0 output = "" for key, value in test_result.items(): testsuite_time += value output += key + ' ' if testsuite_time > all_time / NUMBER_OF_TESTNODE and node_no < NUMBER_OF_TESTNODE - 1: testsuite_time = 0 output += "," node_no += 1 output = output.replace('\\','/') print(output) def select_test_by_number(test_files): test_group = list(np.array_split(test_files, NUMBER_OF_TESTNODE)) output = '' for tests in test_group: output += ' '.join(tests) + ',' output = output.replace('\\','/') print(output) def create_test_selection_file(): test_file = 'test/test*.py' input_file = 'testresult/*.xml' test_files = glob.glob(test_file) resultfiles = glob.glob(input_file) if resultfiles: select_test_by_time(test_files, resultfiles) else: select_test_by_number(glob.glob(test_file)) if __name__ == '__main__': create_test_selection_file()