# Runs an action against a Flink job on an HDInsight on AKS cluster and waits
# for the action to finish.
#
# Flow:
#   1. Sign in with a service principal (Azure CLI) and obtain an ARM token.
#   2. Look up the job named $env:JOB_NAME and read its current status.
#   3. Submit one of SAVEPOINT / DELETE / UPDATE / NEW through the runJob API.
#   4. Poll the jobs API every 10 seconds until the action result is no
#      longer IN_PROGRESS; exit 1 when the result is FAILED.
#
# Environment variables read:
#   HILO_CLUSTER_SUBSCRIPTION, HILO_CLUSTER_RESOURCE_GROUP, HILO_CLUSTER_POOL,
#   HILO_CLUSTER_NAME, TENANT_ID, CLIENT_ID, CLIENT_SECRET, JOB_NAME
#       - always required.
#   CLUSTER_STORAGE_ACCOUNT, CLUSTER_STORAGE_CONTAINER, JOB_JAR_STORAGE_PATH,
#   ENTRY_CLASS, PARALLELISM
#       - only used when a NEW job is launched.
#   JOB_ACTION
#       - "SAVEPOINT", "DELETE", or anything else (deploy: UPDATE when the job
#         is running, NEW otherwise).

# Abort on the first failing cmdlet. Without this a failed REST call only
# prints an error and the script keeps going with empty or stale data.
$ErrorActionPreference = 'Stop'

$clusterSubscription = $env:HILO_CLUSTER_SUBSCRIPTION
$clusterResourceGroup = $env:HILO_CLUSTER_RESOURCE_GROUP
$hiloClusterPool = $env:HILO_CLUSTER_POOL
$hiloClusterName = $env:HILO_CLUSTER_NAME
$tenantId = $env:TENANT_ID
$clientId = $env:CLIENT_ID
$clientSecret = $env:CLIENT_SECRET
$storageAccount = $env:CLUSTER_STORAGE_ACCOUNT
$storageContainer = $env:CLUSTER_STORAGE_CONTAINER
$jobJarPath = $env:JOB_JAR_STORAGE_PATH
$jobName = $env:JOB_NAME
$parallelism = $env:PARALLELISM
$entryclass = $env:ENTRY_CLASS
$action = $env:JOB_ACTION

# Fail early with a readable message instead of sending malformed requests.
$requiredSettings = [ordered]@{
    HILO_CLUSTER_SUBSCRIPTION   = $clusterSubscription
    HILO_CLUSTER_RESOURCE_GROUP = $clusterResourceGroup
    HILO_CLUSTER_POOL           = $hiloClusterPool
    HILO_CLUSTER_NAME           = $hiloClusterName
    TENANT_ID                   = $tenantId
    CLIENT_ID                   = $clientId
    CLIENT_SECRET               = $clientSecret
    JOB_NAME                    = $jobName
}
$missingSettings = @(
    $requiredSettings.GetEnumerator() |
        Where-Object { [string]::IsNullOrWhiteSpace($_.Value) } |
        ForEach-Object { $_.Key }
)
if ($missingSettings.Count -gt 0) {
    throw "Missing required environment variable(s): $($missingSettings -join ', ')"
}

# Get the access token.
# NOTE(review): the client secret is passed on the command line, as in the
# original script; it may be visible in process listings on the build agent.
# "--output none" keeps the account details returned by az login out of the log.
az login --service-principal -u $clientId -p $clientSecret --tenant $tenantId --output none
if ($LASTEXITCODE -ne 0) {
    throw "az login failed with exit code $LASTEXITCODE."
}

$tokenJson = az account get-access-token --resource=https://management.azure.com/
if ($LASTEXITCODE -ne 0) {
    throw "az account get-access-token failed with exit code $LASTEXITCODE."
}
$tok = ($tokenJson | ConvertFrom-Json).accessToken
if ([string]::IsNullOrWhiteSpace($tok)) {
    throw "No access token was returned by az account get-access-token."
}
$authHeaders = @{ Authorization = "Bearer $tok" }

# Both endpoints hang off the same cluster resource; build the prefix once.
$clusterUri = 'https://management.azure.com/subscriptions/' + $clusterSubscription +
    '/resourceGroups/' + $clusterResourceGroup +
    '/providers/Microsoft.HDInsight/clusterpools/' + $hiloClusterPool +
    '/clusters/' + $hiloClusterName
$jobsUri = $clusterUri + '/jobs?api-version=2021-09-15-preview'
$runJobUri = $clusterUri + '/runJob?api-version=2021-09-15-preview'

# Returns the job entry whose properties.jobName equals $jobName, or nothing
# when the cluster does not list such a job. Only the first match is returned
# so that status comparisons always operate on a single value.
function Get-FlinkJob {
    $listing = Invoke-RestMethod -Uri $jobsUri -Method GET -Headers $authHeaders
    $listing.value |
        Where-Object { $_.properties.jobName -eq $jobName } |
        Select-Object -First 1
}

# Find the job and extract its status
$azurePipelineJob = Get-FlinkJob
$azurePipelineJobStatus = $azurePipelineJob.properties.status

# Common properties for job operations
$jsonData = @{
    properties = @{
        jobType = "FlinkJob"
        jobName = "$jobName"
    }
}

if ($action -eq "SAVEPOINT") {
    if ($azurePipelineJobStatus -ne "RUNNING") {
        # A savepoint request must never fall through to the deploy branch and
        # start a job as a side effect; there is nothing to snapshot, so stop.
        Write-Warning "The $jobName job is not running (status: '$azurePipelineJobStatus'). Skipping savepoint."
        exit 0
    }
    Write-Host "The $jobName job is currently running. Taking savepoint"
    $jsonData.properties.action = "SAVEPOINT"
}
elseif ($action -eq "DELETE") {
    Write-Host "Deleting job $jobName"
    $jsonData.properties.action = "DELETE"
}
elseif ($azurePipelineJobStatus -eq "RUNNING") {
    # NOTE(review): as in the original script, UPDATE is sent without jar
    # details - confirm against the runJob API contract.
    Write-Host "The $jobName job is currently running. Updating current job"
    $jsonData.properties.action = "UPDATE"
}
else {
    Write-Host "The $jobName job is not running. Launching new job"

    # JOB_JAR_STORAGE_PATH is a path inside the storage container, not a local
    # file. Split-Path is a filesystem helper and presumably emits "\" on
    # Windows agents, so normalise the directory part back to "/".
    $jarPath = (Split-Path $jobJarPath -Parent) -replace '\\', '/'
    $jarName = Split-Path $jobJarPath -Leaf
    $jarStorageUrl = "abfs://{0}@{1}.dfs.core.windows.net/{2}" -f $storageContainer, $storageAccount, $jarPath

    $jsonData.properties.action = "NEW"
    $jsonData.properties.jobJarDirectory = $jarStorageUrl
    $jsonData.properties.jarName = $jarName
    $jsonData.properties.entryClass = $entryclass
    $jsonData.properties.flinkConfiguration = @{
        parallelism = $parallelism
    }
}

# Convert the data to JSON. The depth is explicit so the nested
# flinkConfiguration table is never flattened to a type name.
$jsonString = $jsonData | ConvertTo-Json -Depth 5

# Submit the action
$response = Invoke-RestMethod -Uri $runJobUri -Method POST -Headers $authHeaders -Body $jsonString -ContentType "application/json"
Write-Host "API Response: $($response | ConvertTo-Json -Depth 4)"

# Check status: poll until the service stops reporting IN_PROGRESS.
do {
    $azurePipelineJob = Get-FlinkJob
    $azurePipelineActionResult = $azurePipelineJob.properties.actionResult
    $azurePipelineActionOutput = $azurePipelineJob.properties.jobOutput
    Write-Host "The job action result : $azurePipelineActionOutput , and action result is $azurePipelineActionResult"

    if ($azurePipelineActionResult -eq "FAILED") {
        Write-Host "The job $jobName last action has failed."
        exit 1
    }

    if ($azurePipelineActionResult -eq "IN_PROGRESS") {
        # Sleep for a while before making the next API call
        Start-Sleep -Seconds 10 # Adjust the sleep interval as needed
    }
} while ($azurePipelineActionResult -eq "IN_PROGRESS")

Write-Host "The job $jobName operation $action successful."