Skip to content

Commit

Permalink
feat: Add additional scheduled pipelines client getters and unit tests.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 540643400
  • Loading branch information
vertex-sdk-bot authored and copybara-github committed Jun 15, 2023
1 parent 8191035 commit 9371b4f
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 0 deletions.
40 changes: 40 additions & 0 deletions google/cloud/aiplatform/preview/schedule/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,46 @@ def state(self) -> Optional[gca_schedule.Schedule.State]:
self._sync_gca_resource()
return self._gca_resource.state

@property
def max_run_count(self) -> int:
"""Current Schedule max_run_count.
Returns:
Schedule max_run_count.
"""
self._sync_gca_resource()
return self._gca_resource.max_run_count

@property
def cron_expression(self) -> str:
"""Current Schedule cron expression.
Returns:
Schedule cron expression.
"""
self._sync_gca_resource()
return self._gca_resource.cron

@property
def max_concurrent_run_count(self) -> int:
"""Current Schedule max_concurrent_run_count.
Returns:
Schedule max_concurrent_run_count.
"""
self._sync_gca_resource()
return self._gca_resource.max_concurrent_run_count

@property
def allow_queueing(self) -> bool:
"""Whether current Schedule allows queueing.
Returns:
Schedule allow_queueing.
"""
self._sync_gca_resource()
return self._gca_resource.allow_queueing

def _block_until_complete(self) -> None:
"""Helper method to block and check on Schedule until complete."""
# Used these numbers so failures surface fast
Expand Down
208 changes: 208 additions & 0 deletions tests/unit/aiplatform/test_pipeline_job_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1377,3 +1377,211 @@ def test_call_schedule_service_update_before_create(
assert e.match(
regexp=r"Not updating PipelineJobSchedule: PipelineJobSchedule must be active or completed."
)

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_get_max_run_count_before_create(
self,
mock_schedule_service_create,
mock_schedule_service_get,
mock_schedule_bucket_exists,
job_spec,
mock_load_yaml_and_json,
):
"""Gets the PipelineJobSchedule max_run_count before creating.
Raises error because PipelineJobSchedule should be created first.
"""
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
enable_caching=True,
)

pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
pipeline_job=job,
display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
)

with pytest.raises(RuntimeError) as e:
pipeline_job_schedule.max_run_count

assert e.match(regexp=r"PipelineJobSchedule resource has not been created.")

pipeline_job_schedule.create(
cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
)

pipeline_job_schedule.max_run_count

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_get_cron_expression_before_create(
self,
mock_schedule_service_create,
mock_schedule_service_get,
mock_schedule_bucket_exists,
job_spec,
mock_load_yaml_and_json,
):
"""Gets the PipelineJobSchedule cron before creating.
Raises error because PipelineJobSchedule should be created first.
"""
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
enable_caching=True,
)

pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
pipeline_job=job,
display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
)

with pytest.raises(RuntimeError) as e:
pipeline_job_schedule.cron_expression

assert e.match(regexp=r"PipelineJobSchedule resource has not been created.")

pipeline_job_schedule.create(
cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
)

pipeline_job_schedule.cron_expression

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_get_max_concurrent_run_count_before_create(
self,
mock_schedule_service_create,
mock_schedule_service_get,
mock_schedule_bucket_exists,
job_spec,
mock_load_yaml_and_json,
):
"""Gets the PipelineJobSchedule max_concurrent_run_count before creating.
Raises error because PipelineJobSchedule should be created first.
"""
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
enable_caching=True,
)

pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
pipeline_job=job,
display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
)

with pytest.raises(RuntimeError) as e:
pipeline_job_schedule.max_concurrent_run_count

assert e.match(regexp=r"PipelineJobSchedule resource has not been created.")

pipeline_job_schedule.create(
cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
)

pipeline_job_schedule.max_concurrent_run_count

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_get_allow_queueing_before_create(
self,
mock_schedule_service_create,
mock_schedule_service_get,
mock_schedule_bucket_exists,
job_spec,
mock_load_yaml_and_json,
):
"""Gets the PipelineJobSchedule allow_queueing before creating.
Raises error because PipelineJobSchedule should be created first.
"""
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
enable_caching=True,
)

pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
pipeline_job=job,
display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
)

with pytest.raises(RuntimeError) as e:
pipeline_job_schedule.allow_queueing

assert e.match(regexp=r"PipelineJobSchedule resource has not been created.")

pipeline_job_schedule.create(
cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION,
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
create_request_timeout=None,
)

pipeline_job_schedule.allow_queueing

0 comments on commit 9371b4f

Please sign in to comment.