add device anomaly detection demo#163
add device anomaly detection demo#163HaojieZhang6848 wants to merge 2 commits intokubeedge:masterfrom
Conversation
|
Welcome @HaojieZhang6848! It looks like this is your first PR to kubeedge/examples 🎉 |
Summary of ChangesHello @HaojieZhang6848, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers a fully functional demonstration of device anomaly detection integrated with KubeEdge. It provides a practical example for developers to understand and implement ML-driven anomaly detection for IoT devices, leveraging KubeEdge's device management and communication protocols to update device statuses in Kubernetes. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a KubeEdge-based device anomaly detection demo. The changes include a new external-anomaly-detector Python gRPC service that uses a Bayesian Network model to detect anomalies in IoT device data, a room-simulator FastAPI application to simulate light devices and a brightness sensor with fault injection capabilities, and modifications to the me-restful-mapper Go application to integrate with the anomaly detection service via gRPC. The mapper now collects device property updates and sends them to the external anomaly detector, which then updates Kubernetes DeviceStatus CRDs with anomaly information. New Protobuf definitions, Docker Compose configurations, and detailed READMEs (English and Chinese) are added to support the demo. Review comments highlight several issues: a critical logical error where the anomaly status is updated for all devices instead of the specific one, masked errors in the Go mapper's SetDeviceData and GetDeviceData functions, inconsistencies in port numbers between the architecture diagram and implementation, incorrect plain text vs. JSON body expectations in fault injection examples across documentation and code, an improper string formatting in SetDeviceValue, and a security concern regarding the use of grpc.WithInsecure() in the demo. Additionally, there are suggestions to replace print statements with proper logging and to handle Kubernetes config loading errors more gracefully.
| for d_name in ["light-1", "light-2", "brightness-sensor-1"]: | ||
| api.patch_namespaced_custom_object( | ||
| group="devices.kubeedge.io", | ||
| version="v1beta1", | ||
| namespace="default", | ||
| plural="devicestatuses", | ||
| name=d_name, |
There was a problem hiding this comment.
The update_anomaly_status function currently updates the anomaly status for all three devices (light-1, light-2, brightness-sensor-1) regardless of which device the anomaly was detected for. This is a critical logical error. The anomaly status should only be updated for the specific device where the anomaly was observed (e.g., brightness-sensor-1 if its predicted brightness doesn't match the actual value).
api.patch_namespaced_custom_object(
group="devices.kubeedge.io",
version="v1beta1",
namespace="default",
plural="devicestatuses",
name="brightness-sensor-1", # Only update the sensor that reports brightness
body=patch
)| return nil | ||
| } |
| klog.Errorf("GetDeviceData failed for http://%s:%d%s, error: %v", c.ProtocolConfig.ConfigData.Ip, c.ProtocolConfig.ConfigData.Port, c.ProtocolConfig.ConfigData.Url, err) | ||
| return nil, nil |
There was a problem hiding this comment.
When an error occurs in GetDeviceData, it returns nil, nil. This masks the error, making it difficult for callers to distinguish between a successful call with no data and a failed call. It should return nil, err to propagate the error.
| klog.Errorf("GetDeviceData failed for http://%s:%d%s, error: %v", c.ProtocolConfig.ConfigData.Ip, c.ProtocolConfig.ConfigData.Port, c.ProtocolConfig.ConfigData.Url, err) | |
| return nil, nil | |
| return nil, err |
| subgraph "Cloud/Control Plane" | ||
| CloudCore[CloudCore] | ||
| K8s[Kubernetes API Server] | ||
| Detector[External Anomaly Detector<br/>gRPC Service<br/>Port 5000] |
There was a problem hiding this comment.
The architecture diagram indicates the External Anomaly Detector runs on Port 5000. However, the docker-compose.yaml and external-anomaly-detector/main.py configure it to run on Port 6804. This discrepancy should be corrected in the diagram for accuracy.
Detector[External Anomaly Detector<br/>gRPC Service<br/>Port 6804]
| response = requests.post( | ||
| f"{base_url}/lights/light1/fault", | ||
| json={"fault": True} |
There was a problem hiding this comment.
The Python requests example for injecting faults uses json={"fault": True}. This is inconsistent with the Python main.py implementation which expects a plain text body ('0' or '1'). Please correct the example to match the expected plain text input.
| response = requests.post( | |
| f"{base_url}/lights/light1/fault", | |
| json={"fault": True} | |
| response = requests.post( | |
| f"{base_url}/lights/light1/fault", | |
| data="1", | |
| headers={"Content-Type": "text/plain"} | |
| ) |
| # Fetch current value from k8s apiserver and store in memory | ||
| self._store_device_data(request.deviceName, request.propertyName, request.propertyValue) |
There was a problem hiding this comment.
The comment Fetch current value from k8s apiserver and store in memory is misleading. The propertyValue is received directly from the gRPC request, not fetched from the Kubernetes API server at this point.
| # Fetch current value from k8s apiserver and store in memory | |
| self._store_device_data(request.deviceName, request.propertyName, request.propertyValue) | |
| # Store received device data in memory |
| # Fetch current value from k8s apiserver and store in memory | ||
| self._store_device_data(request.deviceName, request.propertyName, request.propertyValue) |
There was a problem hiding this comment.
The comment Fetch current value from k8s apiserver and store in memory is misleading. The propertyValue is received directly from the gRPC request, not fetched from the Kubernetes API server at this point.
| # Fetch current value from k8s apiserver and store in memory | |
| self._store_device_data(request.deviceName, request.propertyName, request.propertyValue) | |
| # Store received device data in memory |
| ports: | ||
| - "6804:6804" | ||
| volumes: | ||
| - ~/.kube/config:/root/.kube/config:ro |
There was a problem hiding this comment.
Mounting ~/.kube/config directly into the container can be a security risk as it exposes the host's Kubernetes credentials. While acceptable for a demo, consider more secure methods for production deployments, such as using Kubernetes Secrets or service accounts, or limiting the scope of the mounted config.
| $ docker ps | ||
| a2cc34c56101 huajuan6848/external-anomaly-detector:1.0.0 "uv run python main.…" About an hour ago Up About an hour 0.0.0.0:6804->6804/tcp, :::6804->6804/tcp external-anomaly-detector | ||
| 6bc6d74a36de huajuan6848/room-simulator:1.0.0 "uv run uvicorn main…" About an hour ago Up About an hour 0.0.0.0:8000->8000/tcp, :::8000->8000/tcp room-simulator | ||
| 098a371611f7 huajuan6848/me-restful-mappper:1.0.0 "/kubeedge/main --co…" About an hour ago Up About an hour me-restful-mapper |
There was a problem hiding this comment.
Typo: "mappper" should be "mapper".
| 098a371611f7 huajuan6848/me-restful-mappper:1.0.0 "/kubeedge/main --co…" About an hour ago Up About an hour me-restful-mapper | |
| 098a371611f7 huajuan6848/me-restful-mapper:1.0.0 "/kubeedge/main --co…" About an hour ago Up About an hour me-restful-mapper |
| level=logging.INFO, | ||
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | ||
| handlers=[ | ||
| logging.FileHandler('room_simulator.log'), | ||
| logging.StreamHandler() | ||
| ] |
Signed-off-by: Haojie Zhang <hjzhang6848@163.com>
31c541e to
338cb78
Compare
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: HaojieZhang6848 The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
Core logic changes:
- Change brightness calculation from a simple light count
(light1 + light2, range 0-2) to a weighted sum
(light1×1 + light2×2, range 0-3), making each combination of
light states uniquely observable and enabling unambiguous
per-device fault attribution
- Regenerate observed_data.csv and retrain the Bayesian Network
model with the new brightness scheme
- Add locate_fault() to external-anomaly-detector: when an anomaly
is detected, systematically flip each light's reported state and
re-run inference to identify the faulty device
- Refactor update_anomaly_status() to accept a per-device message
dict and update each device's DeviceStatus CRD independently,
instead of writing a single message to all devices
Bug fixes (me-restful-mapper):
- Fix type assertion panic: use safe type assertion with error
handling instead of direct cast for req.Data in driver.go
- Fix grpc_connection_manager: compare connectivity state using
typed constants (connectivity.Ready/Idle) instead of string
comparison; close stale connections before removing from pool;
remove redundant TCP pre-check before gRPC dial
- Fix http_client.go: pass string value directly instead of
fmt.Sprintf("%d", value) which produced wrong output for strings
- Fix adHandler call: remove unused dataModel and adConfig
parameters from signature
- Fix reportCycle guard: use <= 0 instead of == 0
Minor improvements:
- Replace print() with logger calls in model_training.py
- Use sys.exit(1) instead of re-raising on k8s config load failure
- Fix typo "Theorically" → "Theoretically" in README.md
- Fix device CRD filename references: brightness-sensor.yaml →
brightness-sensor-1.yaml in README and kubectl commands
- Update all docs to reflect new brightness scale and behavior
Signed-off-by: Haojie Zhang <hjzhang6848@163.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive device anomaly detection demo. The implementation is well-structured, covering device simulation, mapper integration, and an ML-based anomaly detection service. My review focuses on improving code clarity, robustness, and adherence to best practices. Key suggestions include refining the gRPC API definition for better clarity, improving error handling in the Python service, and updating to non-deprecated Go functions for better maintainability.
| ret, err := GetDeviceValue(c.ProtocolConfig.ConfigData.Ip, c.ProtocolConfig.ConfigData.Port, c.ProtocolConfig.ConfigData.Url) | ||
| if err != nil { | ||
| klog.Errorf("GetDeviceData failed for http://%s:%d%s, error: %v", c.ProtocolConfig.ConfigData.Ip, c.ProtocolConfig.ConfigData.Port, c.ProtocolConfig.ConfigData.Url, err) | ||
| return nil, nil |
| ## How to Run the Demo(One-Click with Docker Compose) | ||
|
|
||
| Before running the demo, please ensure your KubeEdge `cloudcore` and `edgecore` are properly set up and running, and include codes from PR [#6543](https://github.com/kubeedge/kubeedge/pull/6543). For convenience, the `room-simulator`, `external-anomaly-detector`, and `me-restful-mapper` are planned to be run in Docker containers, so please ensure Docker is installed on your system. If you prefer to run them directly on your host machine, please use `uv` to install the required Python dependencies for `room-simulator` and `external-anomaly-detector`, and build binary with Go for `me-restful-mapper`. |
There was a problem hiding this comment.
There are some minor grammatical and formatting issues in the documentation that could be improved for clarity.
| ## How to Run the Demo(One-Click with Docker Compose) | |
| Before running the demo, please ensure your KubeEdge `cloudcore` and `edgecore` are properly set up and running, and include codes from PR [#6543](https://github.com/kubeedge/kubeedge/pull/6543). For convenience, the `room-simulator`, `external-anomaly-detector`, and `me-restful-mapper` are planned to be run in Docker containers, so please ensure Docker is installed on your system. If you prefer to run them directly on your host machine, please use `uv` to install the required Python dependencies for `room-simulator` and `external-anomaly-detector`, and build binary with Go for `me-restful-mapper`. | |
| ## How to Run the Demo (One-Click with Docker Compose) | |
| Before running the demo, please ensure your KubeEdge `cloudcore` and `edgecore` are properly set up and running, and include the changes from PR [#6543](https://github.com/kubeedge/kubeedge/pull/6543). |
| service ExternalAnomalyDetector { | ||
| // Detects anomalies based on the provided data | ||
| rpc DetectAnomaly (AnomalyRequest) returns (AnomalyResponse); |
There was a problem hiding this comment.
The RPC name DetectAnomaly is a bit misleading. Based on its usage, this RPC is for reporting device property data, while the actual anomaly detection happens asynchronously. Renaming it to something like ReportPropertyData would better reflect its purpose and improve the API's clarity.
| service ExternalAnomalyDetector { | |
| // Detects anomalies based on the provided data | |
| rpc DetectAnomaly (AnomalyRequest) returns (AnomalyResponse); | |
| service ExternalAnomalyDetector { | |
| // Reports property data for anomaly detection | |
| rpc ReportPropertyData (AnomalyRequest) returns (AnomalyResponse); |
| # Add externalanomalydetectorpb to sys.path to fix generated code imports | ||
| sys.path.append(os.path.join(os.path.dirname(__file__), 'externalanomalydetectorpb')) |
There was a problem hiding this comment.
Modifying sys.path at runtime is generally discouraged as it can make the project harder to maintain and package. Since externalanomalydetectorpb is defined as a package in pyproject.toml, it should be directly importable after installation (e.g., with uv sync or pip install -e .). Please remove this sys.path manipulation.
| def _store_device_data(self, device_name, property_name, property_value): | ||
| """store device data in memory""" | ||
| with data_lock: | ||
| data_store[device_name] = int(property_value) | ||
| logger.info(f"Stored {device_name}.{property_name} = {property_value} in memory") |
There was a problem hiding this comment.
The int(property_value) conversion on line 60 could raise a ValueError. While the calling function DetectAnomaly has a broad except Exception block, handling this specific error within _store_device_data would allow for more precise logging and better error diagnostics.
def _store_device_data(self, device_name, property_name, property_value):
"""store device data in memory"""
try:
value = int(property_value)
with data_lock:
data_store[device_name] = value
logger.info(f"Stored {device_name}.{property_name} = {value} in memory")
except ValueError:
logger.error(f"Could not convert property value '{property_value}' to an integer for {device_name}.{property_name}")
raise| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
| defer cancel() | ||
|
|
||
| conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) |
There was a problem hiding this comment.
grpc.WithInsecure() is deprecated. Please use grpc.WithTransportCredentials(insecure.NewCredentials()) instead. You will also need to add "google.golang.org/grpc/credentials/insecure" to your imports.
| conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) | |
| conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) |
| return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) | ||
| } | ||
|
|
||
| body, err := ioutil.ReadAll(resp.Body) |
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR adds a comprehensive Device Anomaly Detection Demo to showcase the device anomaly detection capabilities introduced in KubeEdge PR #6543. The demo provides a complete, runnable example that demonstrates:
This example serves as a reference implementation for developers who want to:
Which issue(s) this PR fixes:
Relates to kubeedge/kubeedge#6543
Special notes for your reviewer:
docker-compose upcommand