Я работаю над созданием подключаемого модуля Logstash Input для использования ADAL для интеграции с API-интерфейсами управления активностью Office 365. Я написал отдельные компоненты для получения токена, использования этого токена для подписки и получения данных журнала активности.
Сейчас я работаю над интеграцией в инфраструктуру Logstash и сталкиваюсь с проблемами, когда Logstash жалуется на то, что не знает, что такое ADAL, даже если он мне необходим.
Все тот же код работает независимо от Logstash, только не внутри класса плагина.
Это мой первый набег на Руби, так что я довольно озадачен. Любая помощь?
Сообщение об ошибке из Logstash:
[2018-09-16T00:51:32,816][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-09-16T00:51:33,921][INFO ][logstash.inputs.office365managementapi] Starting Office 365 Management API input...
[2018-09-16T00:51:34,246][ERROR][logstash.pipeline ] Error registering plugin {:pipeline_id=>"main", :plugin=>"<LogStash::Inputs::Office365ManagementApi client_id=>\"redacted\", tenant_id=>\"redacted\", tenant_domain=>\"redacted\", private_key=>\"/tmp/o365.pfx\", subscriptions=>[\"Audit.AzureActiveDirectory\", \"Audit.Exchange\", \"Audit.SharePoint\", \"Audit.General\", \"DLP.All\"], id=>\"fb61b83b76494f098a0a7e24391779ee1212f0d9adf8ef8dedae4424e8dedb57\", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>\"plain_c7c9d514-5d23-459d-98ea-87d250e7a00c\", enable_metric=>true, charset=>\"UTF-8\">, resource=>\"https://manage.office.com\">", :error=>"uninitialized constant LogStash::Inputs::Office365ManagementApi::ADAL::Logging\nDid you mean? LogStash::Logging", :thread=>"#<Thread:0xca2e135 run>"}
[2018-09-16T00:51:34,367][ERROR][logstash.pipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<NameError: uninitialized constant LogStash::Inputs::Office365ManagementApi::ADAL::Logging
Did you mean? LogStash::Logging>, :backtrace=>["org/jruby/RubyModule.java:3343:in `const_missing'", "/usr/local/Cellar/logstash/6.2.4/libexec/vendor/local_gems/82bdbf8d/logstash-input-office365_management_api-1.0.0/lib/logstash/inputs/office365_management_api.rb:70:in `register'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:342:in `register_plugin'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:353:in `block in register_plugins'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:353:in `register_plugins'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:500:in `start_inputs'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:394:in `start_workers'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:290:in `run'", "/usr/local/Cellar/logstash/6.2.4/libexec/logstash-core/lib/logstash/pipeline.rb:250:in `block in start'"], :thread=>"#<Thread:0xca2e135 run>"}
[2018-09-16T00:51:34,418][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: LogStash::PipelineAction::Create/pipeline_id:main, action_result: false", :backtrace=>nil}
Код ниже:
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname
require "json"
require 'net/http'
require 'uri'
# Using this input you can receive activities from the Office 365 Management API
# ==== Security
# This plugin utilizes certificate authentication with the Office 365 Management API
# to generate an access token, which is then used for all subsequent API activities.
# If the token expires, the plugin will request a new token automatically.
# All communication for this plugin is encrypted by SSL/TLS communication.
class LogStash::Inputs::Office365ManagementApi < LogStash::Inputs::Base
config_name "office365_management_api"
# Codec used to decode the incoming data.
# This codec will be used as a fall-back if the content-type
# is not found in the "additional_codecs" hash
default :codec, "plain"
# Fix for broken ruby ADAL
module ADAL
class TokenRequest
module GrantType
JWT_BEARER = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer'
# Client ID generated through your custom application in Azure AD
# https://portal.azure.com/#blade/Microsoft_AAD_IAM/ActiveDirectoryMenuBlade/RegisteredApps
config :client_id, :validate => :string, :required => true
# Tenant ID/Directory ID of your Office 365 tenant
# https://portal.azure.com/#blade/Microsoft_AAD_IAM/ActiveDirectoryMenuBlade/Properties
config :tenant_id, :validate => :string, :required => true
# Your Office 365 tenant domain, ie. yourdomain.onmicrosoft.com
config :tenant_domain, :validate => :string, :required => true
# Resource you are requesting access to. This defaults to https://manage.office.com and shouldn't change unless necessary.
config :resource, :validate => :string, :default => 'https://manage.office.com'
# PFX Private key for your Application Certificate you created
config :private_key, :validate => :path
# Private key password if one was used
config :private_key_password, :validate => :string, :default => nil
# Activity subscriptions you want to monitor
# These can be one or many of:
# Audit.AzureActiveDirectory
# Audit.Exchange
# Audit.Sharepoint
# Audit.General
# DLP.All
config :subscriptions, :validate => :array, :default => ["Audit.AzureActiveDirectory", "Audit.Exchange", "Audit.SharePoint", "Audit.General", "DLP.All"]
def register
require "adal"
@logger.info("Starting Office 365 Management API input...")
@host = Socket.gethostname
# ADAL supports four logging options: VERBOSE, INFO, WARN and ERROR.
ADAL::Logging.log_level = ADAL::Logger::VERBOSE
end # def register
def get_token
@logger.info("Generating access token...")
if @private_key_password.nil?
pfx = OpenSSL::PKCS12.new(File.read(@private_key))
pfx = OpenSSL::PKCS12.new(File.read(@private_key), @private_key_password)
authority = ADAL::Authority.new("login.microsoftonline.com", @tenant_domain)
client_cred = ADAL::ClientAssertionCertificate.new(authority, @client_id, pfx)
result = ADAL::AuthenticationContext
.new("login.microsoftonline.com", @tenant_domain)
.acquire_token_for_client(@resource, client_cred)
case result
when ADAL::SuccessResponse
puts 'Successfully authenticated with client credentials. Received access ' "token: #{result.access_token}."
# Create class variable for reuse of Access Token
@access_token = result.access_token
@http_headers = {
'Authorization' => "Bearer #{@access_token}",
'Content-Type' => 'application/x-www-form-urlencoded'
when ADAL::FailureResponse
puts 'Failed to authenticate with client credentials. Received error: ' "#{result.error} and error description: #{result.error_description}."
exit 1
end #def get_token
def check_subscription
@logger.info("Checking for proper subscriptions...")
@subscriptions.each do |sub|
sub_uri = URI("https://manage.office.com/api/v1.0/#{@tenant_id}/activity/feed/subscriptions/start?contentType=#{sub}")
sub_http = Net::HTTP.new(sub_uri.host, sub_uri.port)
sub_http.use_ssl = true
sub_resp = http.post(sub_uri.request_uri, data = "", @http_headers)
case sub_resp
when Net::HTTPSuccess
puts "Created subscription to #{sub} in tenant #{@tenant_id}..."
when Net::HTTPUnauthorized
puts "Authentication Error Encountered: #{sub_resp.message}"
when Net::HTTPServerError
puts "Server Error Encountered: #{sub_resp.message}"
puts "Unknown Error Encountered: #{sub_resp.message}"
end #def check_subscription
def run(queue)
# we can abort the loop if stop? becomes true
while !stop?
#event = LogStash::Event.new("message" => @message, "host" => @host)
#queue << event
raise 'Error getting token' unless get_token().status == 0
# because the sleep interval can be big, when shutdown happens
# we want to be able to abort the sleep
# Stud.stoppable_sleep will frequently evaluate the given block
# and abort the sleep(@interval) if the return value is true
Stud.stoppable_sleep(@interval) { stop? }
end # loop
end # def run
def stop
# nothing to do in this case so it is not necessary to define stop
# examples of common "stop" tasks:
# * close sockets (unblocking blocking reads/accepts)
# * cleanup temporary files
# * terminate spawned threads
end # class LogStash::Inputs::Office365ManagementApi