Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>

#include "minifi-c.h"
#include "minifi-cpp/core/PropertyDefinition.h"
#include "nonstd/expected.hpp"

namespace org::apache::nifi::minifi::api::core {

class ControllerServiceContext {
public:
explicit ControllerServiceContext(MinifiControllerServiceContext* impl) : impl_(impl) {}

nonstd::expected<std::string, std::error_code> getProperty(std::string_view name) const;
nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference) const {
return getProperty(property_reference.name);
}

private:
MinifiControllerServiceContext* impl_;
};

} // namespace org::apache::nifi::minifi::api::core
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <string>

#include "ControllerServiceContext.h"
#include "minifi-cpp/core/ControllerServiceMetadata.h"
#include "minifi-cpp/utils/Id.h"
#include "utils/SmallString.h"

namespace org::apache::nifi::minifi::api {

class Connection;
Comment thread
martinzink marked this conversation as resolved.

namespace core {

class ControllerServiceImpl {
public:
explicit ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata);

ControllerServiceImpl(const ControllerServiceImpl&) = delete;
ControllerServiceImpl(ControllerServiceImpl&&) = delete;
ControllerServiceImpl& operator=(const ControllerServiceImpl&) = delete;
ControllerServiceImpl& operator=(ControllerServiceImpl&&) = delete;

virtual ~ControllerServiceImpl();

MinifiStatus enable(ControllerServiceContext&);
void notifyStop();

[[nodiscard]] std::string getName() const;
[[nodiscard]] minifi::utils::Identifier getUUID() const;
[[nodiscard]] minifi::utils::SmallString<36> getUUIDStr() const;

protected:
virtual MinifiStatus enableImpl(api::core::ControllerServiceContext&) = 0;
virtual void notifyStopImpl() {}

minifi::core::ControllerServiceMetadata metadata_;

std::shared_ptr<minifi::core::logging::Logger> logger_;
};

} // namespace core
} // namespace org::apache::nifi::minifi::api
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ProcessContext {
nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const {
return getProperty(property_reference.name, flow_file);
}
nonstd::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const;

bool hasNonEmptyProperty(std::string_view name) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
#pragma once

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -53,7 +51,7 @@ class ProcessorImpl {

virtual ~ProcessorImpl();

void setTriggerWhenEmpty(bool trigger_when_empty) {
void setTriggerWhenEmpty(const bool trigger_when_empty) {
trigger_when_empty_ = trigger_when_empty;
}

Expand All @@ -74,8 +72,8 @@ class ProcessorImpl {
static constexpr auto OutputAttributes = std::array<minifi::core::OutputAttributeReference, 0>{};

std::string getName() const;
utils::Identifier getUUID() const;
utils::SmallString<36> getUUIDStr() const;
minifi::utils::Identifier getUUID() const;
minifi::utils::SmallString<36> getUUIDStr() const;

virtual PublishedMetrics calculateMetrics() const {return {};}

Expand All @@ -89,9 +87,6 @@ class ProcessorImpl {
std::atomic<bool> trigger_when_empty_;

std::shared_ptr<minifi::core::logging::Logger> logger_;

private:
mutable std::mutex mutex_;
};

} // namespace core
Expand Down
66 changes: 55 additions & 11 deletions extension-framework/cpp-extension-lib/include/api/core/Resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@
#define WIN32_LEAN_AND_MEAN 1
#endif

#include "minifi-c.h"
#include "core/ClassName.h"
#include "api/utils/minifi-c-utils.h"
#include "ControllerServiceContext.h"
#include "FlowFile.h"
#include "ProcessContext.h"
#include "ProcessSession.h"
#include "FlowFile.h"
#include "minifi-cpp/core/ProcessorMetadata.h"
#include "api/utils/minifi-c-utils.h"
#include "core/ClassName.h"
#include "logging/Logger.h"
#include "minifi-c.h"
#include "minifi-cpp/core/ControllerServiceMetadata.h"
#include "minifi-cpp/core/ProcessorMetadata.h"

namespace org::apache::nifi::minifi::api::core {

Expand Down Expand Up @@ -94,11 +96,12 @@ void useProcessorClassDescription(Fn&& fn) {

.callbacks = MinifiProcessorCallbacks{
.create = [] (MinifiProcessorMetadata metadata) -> MINIFI_OWNED void* {
return new Class{minifi::core::ProcessorMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)
}};
try {
return new Class{minifi::core::ProcessorMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
} catch (...) { return nullptr; }
},
.destroy = [] (MINIFI_OWNED void* self) -> void {
delete static_cast<Class*>(self);
Expand Down Expand Up @@ -127,7 +130,9 @@ void useProcessorClassDescription(Fn&& fn) {
}
},
.onUnSchedule = [] (void* self) -> void {
static_cast<Class*>(self)->onUnSchedule();
try {
static_cast<Class*>(self)->onUnSchedule();
} catch (...) {}
},
.calculateMetrics = [] (void* self) -> MINIFI_OWNED MinifiPublishedMetrics* {
auto metrics = static_cast<Class*>(self)->calculateMetrics();
Expand All @@ -145,4 +150,43 @@ void useProcessorClassDescription(Fn&& fn) {
fn(description);
}

template<typename Class, typename Fn>
void useControllerServiceClassDescription(Fn&& fn) {
std::vector<std::vector<MinifiStringView>> string_vector_cache;

const auto full_name = minifi::core::className<Class>();

std::vector<MinifiPropertyDefinition> class_properties = utils::toProperties(Class::Properties, string_vector_cache);

MinifiControllerServiceClassDefinition description{.full_name = utils::toStringView(full_name),
.description = utils::toStringView(Class::Description),
.class_properties_count = gsl::narrow<uint32_t>(class_properties.size()),
.class_properties_ptr = class_properties.data(),

.callbacks = MinifiControllerServiceCallbacks{
.create = [](MinifiControllerServiceMetadata metadata) -> MINIFI_OWNED void* {
try {
return new Class{minifi::core::ControllerServiceMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
} catch (...) { return nullptr; }
},
.destroy = [](MINIFI_OWNED void* self) -> void { delete static_cast<Class*>(self); },
.enable = [](void* self, MinifiControllerServiceContext* context) -> MinifiStatus {
ControllerServiceContext context_wrapper(context);
try {
return static_cast<Class*>(self)->enable(context_wrapper);
} catch (...) { return MINIFI_STATUS_UNKNOWN_ERROR; }
},
.notifyStop = [](void* self) -> void {
try {
static_cast<Class*>(self)->notifyStop();
} catch (...) {}
},
}};

fn(description);
}

} // namespace org::apache::nifi::minifi::api::core
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,28 @@ std::optional<T> parseOptionalEnumProperty(const core::ProcessContext& context,
return result.value();
}

template<typename ControllerServiceType>
ControllerServiceType* parseOptionalControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) {
const auto controller_service_name = context.getProperty(prop.name);
if (!controller_service_name || controller_service_name->empty()) {
return nullptr;
}

nonstd::expected<MinifiControllerService*, std::error_code> service = context.getControllerService(*controller_service_name, minifi::core::className<ControllerServiceType>());
if (!service) {
return nullptr;
}

return reinterpret_cast<ControllerServiceType*>(*service);
}
Comment thread
martinzink marked this conversation as resolved.

template<typename ControllerServiceType>
gsl::not_null<ControllerServiceType*> parseControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) {
auto controller_service = parseOptionalControllerService<ControllerServiceType>(context, prop);
if (!controller_service) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Required controller service property '{}' is missing", prop.name));
}
return gsl::make_not_null(controller_service);
}

} // namespace org::apache::nifi::minifi::api::utils
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -17,22 +17,40 @@
*/
#pragma once

#include "api/core/Resource.h"
#include "core/Processor.h"
#include "core/controller/ControllerService.h"
#include "minifi-cpp/core/ControllerServiceMetadata.h"
#include "minifi-cpp/core/ProcessorMetadata.h"
#include "api/core/Resource.h"
#include "utils/CControllerService.h"
#include "utils/CProcessor.h"
#include "minifi-cpp/agent/agent_docs.h"

namespace org::apache::nifi::minifi::test::utils {

template<typename T, typename ...Args>
std::unique_ptr<minifi::core::Processor> make_custom_c_processor(minifi::core::ProcessorMetadata metadata, Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward)
template<typename T, typename... Args>
std::unique_ptr<minifi::core::Processor> make_custom_c_processor(minifi::core::ProcessorMetadata metadata,
Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward)
std::unique_ptr<minifi::core::ProcessorApi> processor_impl;
minifi::api::core::useProcessorClassDescription<T>([&] (const MinifiProcessorClassDefinition& description) {
minifi::utils::useCProcessorClassDescription(description, [&] (const auto&, auto c_description) {
minifi::api::core::useProcessorClassDescription<T>([&](const MinifiProcessorClassDefinition& description) {
minifi::utils::useCProcessorClassDescription(description, [&](const auto&, auto c_description) {
processor_impl = std::make_unique<minifi::utils::CProcessor>(std::move(c_description), metadata, new T(metadata, std::forward<Args>(args)...));
});
});
return std::make_unique<minifi::core::Processor>(metadata.name, metadata.uuid, std::move(processor_impl));
}

template<typename T, typename... Args>
std::shared_ptr<minifi::core::controller::ControllerService> make_custom_c_controller_service(minifi::core::ControllerServiceMetadata metadata,
Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward)
std::unique_ptr<minifi::core::controller::ControllerServiceApi> controller_service_impl;
minifi::api::core::useControllerServiceClassDescription<T>([&](const MinifiControllerServiceClassDefinition& description) {
minifi::utils::useCControllerServiceClassDescription(description, [&](const auto&, auto c_description) {
controller_service_impl = std::make_unique<minifi::utils::CControllerService>(std::move(c_description),
metadata,
new T(metadata, std::forward<Args>(args)...));
});
});
return std::make_shared<minifi::core::controller::ControllerService>(metadata.name, metadata.uuid, std::move(controller_service_impl));
}
} // namespace org::apache::nifi::minifi::test::utils
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "api/core/ControllerServiceContext.h"
#include "api/utils/minifi-c-utils.h"

namespace org::apache::nifi::minifi::api::core {

nonstd::expected<std::string, std::error_code> ControllerServiceContext::getProperty(const std::string_view name) const {
std::optional<std::string> value = std::nullopt;
const MinifiStatus status = MinifiControllerServiceContextGetProperty(impl_, utils::toStringView(name),
[] (void* data, const MinifiStringView result) {
(*static_cast<std::optional<std::string>*>(data)) = std::string(result.data, result.length);
}, &value);

if (status != MINIFI_STATUS_SUCCESS) {
return nonstd::make_unexpected(utils::make_error_code(status));
}

if (!value) {
return nonstd::make_unexpected(utils::make_error_code(MINIFI_STATUS_UNKNOWN_ERROR));
}
return value.value();
}
Comment thread
martinzink marked this conversation as resolved.

} // namespace org::apache::nifi::minifi::api::core
Loading
Loading